#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

import json
import os
import shutil
import subprocess
import sys
import traceback
from concurrent.futures import ProcessPoolExecutor as PoolExecutor
from io import StringIO
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple

import colored as clrlib

from .system.cmdline_parser import parse_cmdline
from .system.downloader.cache import get_path_for_caching

STOP_JOBS_FLAG_PATH = Path('stop_jobs.flag')
FORBIDDEN_WORKER_SPLITS = {
    'deviantart',
}
MAX_WORKERS = 12
SPLIT_WORKER_AFTER_N_LINKS = 10000
USE_FIREFOX_COOKIES = True
DEBUG_WORKER: Optional[str] = None
IGNORE_WORKERS: Set[str] = set()
REDOWNLOAD = False
REDOWNLOAD_EMPTIES = False
CUSTOM_WORKER_SPLITS: Dict[str, int] = {}
SKIP_INDEXED_FILES = True
RETRY_ERROR_MASK = 0
DOWNLOAD_SINGLE_MANUAL_LINK: Optional[str] = None

GDL_ERRORS = [
    'GENERIC_ERR',          # 1
    'UNKNOWN_ERR',          # 2
    'HTTP_ERR',             # 4
    '404_ERR',              # 8
    'AUTH_ERR',             # 16
    'FORMAT_ERR',           # 32
    'LACKS_EXTRACTOR_ERR',  # 64
    'OS_ERR',               # 128
    'NOT_IN_DISK_ERR',      # 256
    'LIBRARY_ERROR',        # 512
]
# Maps each error bit to its name, e.g. {1: 'GENERIC_ERR', 2: 'UNKNOWN_ERR', ...}.
GDL_ERRORS_DICT = {(1 << k): v for k, v in enumerate(GDL_ERRORS)}


def cmdline(encoded_args: Optional[str] = None):
    if encoded_args is None:
        return run_with_config()
    else:
        return parse_cmdline(run_with_config, encoded_args)


def run_with_config(
    max_workers: Optional[int] = None,
    debug_worker: Optional[str] = None,
    manual_link: Optional[str] = None,
):
    global DEBUG_WORKER
    global MAX_WORKERS
    global DOWNLOAD_SINGLE_MANUAL_LINK
    DOWNLOAD_SINGLE_MANUAL_LINK = manual_link
    DEBUG_WORKER = debug_worker
    if max_workers is not None:
        MAX_WORKERS = max_workers
    return main_loading_preprocessed()


def main_loading_preprocessed():
    print('Initial data loading...')
    kwargs = json.loads(Path('r_fetch_preprocessed.json').read_text())
    global SKIP_INDEXED_FILES, RETRY_ERROR_MASK, CUSTOM_WORKER_SPLITS, \
        SPLIT_WORKER_AFTER_N_LINKS, REDOWNLOAD_EMPTIES, REDOWNLOAD
    SKIP_INDEXED_FILES = kwargs.pop('SKIP_INDEXED_FILES')
    RETRY_ERROR_MASK = kwargs.pop('RETRY_ERROR_MASK')
    CUSTOM_WORKER_SPLITS = kwargs.pop('CUSTOM_WORKER_SPLITS')
    SPLIT_WORKER_AFTER_N_LINKS = kwargs.pop('SPLIT_WORKER_AFTER_N_LINKS')
    REDOWNLOAD_EMPTIES = kwargs.pop('REDOWNLOAD_EMPTIES')
    REDOWNLOAD = kwargs.pop('REDOWNLOAD')
    kwargs['workers_state_path'] = Path(kwargs['workers_state_path'])
    main(**kwargs)


def main(files_from_links: Dict[str, List[str]],
         links_no_files: List[str],
         files_sizes: Dict[str, int],
         link_statuses: Dict[str, int],
         workers_nicely_grouped: List[str],
         workers_state_path: Path,
         links_to_worker: Dict[str, List[str]],
         link_keys: Dict[str, str]):
    global DEBUG_WORKER
    if len(links_to_worker) == 0:
        raise Exception('No work to do.')
    # if DEBUG_WORKER is None:
    #     raise Exception('DEBUG_ONLY: preventing cron jobs from running')
    workers_state_path = Path('i_gdl_w')
    workers_state_path.mkdir(exist_ok=True, parents=True)
    for wsp in workers_state_path.iterdir():
        wsp.unlink()
    print('Initial data loaded.')

    def save_ending_files():
        nonlocal links_no_files
        links_no_files2 = list(map(
            lambda a: a[0],
            filter(lambda a: len(a[1]) <= 0 and a[0] not in links_no_files,
                   files_from_links.items()))) + links_no_files
        files_from_links2 = dict(
            filter(lambda a: len(a[1]) > 0,
                   files_from_links.items()))
        links_no_files2_sorted = sorted(links_no_files2)
        links_for_files = dict()
        for link, files in files_from_links2.items():
            for file in files:
                if file not in links_for_files:
                    links_for_files[file] = list()
                links_for_files[file].append(link)
                del file
            del link
            del files
        os.sync()
        Path('i_gdl_lnf.json').write_text(
            json.dumps(links_no_files2_sorted,
                       indent=1))
        Path('i_gdl_ffl.json').write_text(json.dumps(
            files_from_links2, indent=1, sort_keys=True))
        Path('i_gdl_lff.json').write_text(json.dumps(
            links_for_files, indent=1, sort_keys=True))
        Path('i_gdl_fsz.json').write_text(
            json.dumps(files_sizes, indent=1, sort_keys=True))
        Path('i_gdl_spl.json').write_text(json.dumps(
            link_statuses, indent=1, sort_keys=True))
        os.sync()

    print('Performing partial save...')
    save_ending_files()
    print('Performed partial save.')
    totalfiles = 0
    thread_ids = workers_nicely_grouped.copy()
    for line, thread_id in enumerate(thread_ids):
        workers_state_path.joinpath(thread_id+'=line').write_text(str(line))
        linkcount = len(links_to_worker[thread_id])
        # Worker state file format: status:total:remaining:bytes:files
        workers_state_path.joinpath(thread_id).write_text(
            f'waiting:{linkcount}:{linkcount}:0:0')
    do_fancy_multithreading_panel = False
    thread_id_count = len(thread_ids)
    if DOWNLOAD_SINGLE_MANUAL_LINK is not None or DEBUG_WORKER is not None:
        if DOWNLOAD_SINGLE_MANUAL_LINK is not None:
            DEBUG_WORKER = 'manual'
            links_to_worker[DEBUG_WORKER] = [DOWNLOAD_SINGLE_MANUAL_LINK]
        print(f'Will debug {repr(DEBUG_WORKER)}.')
        thread_id = DEBUG_WORKER
        links_list = links_to_worker[DEBUG_WORKER]
        download_link_list(
            links_list,
            thread_id,
            None,
            f'Debugging {repr(DEBUG_WORKER)}...',
            workers_state_path.joinpath(thread_id),
        )
        return
    if links_to_worker:
        with PoolExecutor(min(MAX_WORKERS, thread_id_count)) as pe:
            if do_fancy_multithreading_panel:
                print('\033[2J', end='', flush=True)
                print('\033[0;0H', end='', flush=True)
            print('Downloading...', flush=True)
            if do_fancy_multithreading_panel:
                print('\033[0;0H', end='', flush=True)
            largest_tid_size = max(map(len, thread_ids))
            line2tid = dict()

            def done_callback_generator(line):
                nonlocal totalfiles

                def terminate_process_pool():
                    # Kill every running './redditgetter.py' process so the
                    # whole batch stops once one worker fails.
                    os.system('sync')
                    os.system(
                        "bash -c \"ps -aux | grep './redditgetter.py' | grep -v grep | sed -e 's/  */ /g' | cut -d' ' -f2 | xargs -r -- kill -15\"")
                    sys.exit(0xFF)

                def done_callback(job):
                    nonlocal totalfiles
                    thread_id = line2tid[line]
                    links_list = links_to_worker[thread_id]
                    try:
                        workers_state_path.joinpath(thread_id).write_text(
                            f'finished:{len(links_list)}:0:0:0')
                        print(clrlib.stylize(
                            f'Received job #{line}: {thread_id}', [
                                clrlib.fg('white'),
                                clrlib.bg('green'),
                                clrlib.attr('bold'),
                            ]
                        ))
                        downloaded_links = list()
                        totalbytes = 0
                        thisfiles = 0
                        true = True
                        downloaded_links = job.result()
                        for link, files in downloaded_links:
                            if true:
                                statusdir = get_path_for_caching(
                                    link, Path('i_gdl_s'))
                                statusdir.mkdir(parents=True, exist_ok=True)
                                statusfile = statusdir.joinpath(
                                    '_gdl_status.json')
                                statuses = dict()
                                if statusfile.exists():
                                    statuses = json.loads(
                                        statusfile.read_text())
                                link_statuses[link] = statuses.get(link, 0xFF)
                            if link not in files_from_links:
                                files_from_links[link] = list()
                            lenfiles = len(files)
                            totalfiles += lenfiles
                            for file in files:
                                filepath = Path(file)
                                thisfiles += 1
                                if filepath.exists():
                                    files_from_links[link].append(file)
                                    st_size = filepath.stat().st_size
                                    files_sizes[file] = st_size
                                    totalbytes += st_size
                        workers_state_path.joinpath(thread_id).write_text(
                            f'finished:{len(links_list)}:0:{totalbytes}:{thisfiles}')
                        save_ending_files()
                    except:
                        sio = StringIO()
                        traceback.print_exc(file=sio)
                        excTxt = sio.getvalue()
                        try:
                            workers_state_path.joinpath(thread_id).write_text(
                                f'failed:{len(links_list)}:0:0:0')
                        except:
                            pass
                        try:
                            workers_state_path.joinpath(
                                thread_id+'=exc').write_text(excTxt)
                        except:
                            pass
                        try:
                            pe.shutdown(wait=False)
                        except:
                            pass
                        print(excTxt)
                        terminate_process_pool()
                    return
                return done_callback

            for line, thread_id in enumerate(thread_ids):
                line2tid[line] = thread_id
                links_list = links_to_worker[thread_id]
                workers_state_path.joinpath(thread_id).write_text(
                    f'enqueued:{len(links_list)}:{len(links_list)}:0:0')
                print(clrlib.stylize(f'Starting job #{line}: {thread_id}', [
                    clrlib.fg('white'),
                    clrlib.bg('light_red'),
                    clrlib.attr('bold'),
                ]))
                jobstartedmsg = clrlib.stylize(f'Starting job #{line}: {thread_id}', [
                    clrlib.fg('black'),
                    clrlib.bg('light_yellow'),
                    clrlib.attr('bold'),
                ])
                thread_id_nmsz = len(thread_id)
                thread_id_display = thread_id + ' ' * \
                    (largest_tid_size - thread_id_nmsz)
                job = pe.submit(
                    download_link_list,
                    links_list,
                    thread_id_display,
                    line+3 if do_fancy_multithreading_panel else None,
                    jobstartedmsg,
                    workers_state_path.joinpath(thread_id),
                )
                job.add_done_callback(done_callback_generator(line))
    save_ending_files()
    if (p := Path('latest_image_download.txt')).exists():
        p.unlink()
    if workers_state_path.exists():
        for p in workers_state_path.glob('*'):
            p.unlink()
        shutil.rmtree(workers_state_path)
    print(f'Downloaded {totalfiles} files')


def download_link_list(links: List[str],
                       thread_id: str,
                       line: Optional[int] = None,
                       job_started_msg: Optional[str] = None,
                       thread_state_path: Optional[Path] = None,
                       ) -> List[Tuple[str, List[str]]]:
    '''Downloads a link list inside a ProcessPoolExecutor'''
    if STOP_JOBS_FLAG_PATH.exists():
        raise InterruptedError(STOP_JOBS_FLAG_PATH)
    if job_started_msg is not None:
        print(job_started_msg)
    has_its_own_line = line is not None
    link_count = len(links)
    remaining_links = link_count
    if thread_state_path is not None:
        thread_state_path.write_text(
            f'running:{link_count}:{remaining_links}:0:0')
    result = list()
    totalbytes = 0
    totalfiles = 0
    try:
        for link in links:
            scrubbing = True
            # i_gdl_c caches the per-link file list (_gdl_meta.json);
            # i_gdl_s caches the per-link status code (_gdl_status.json).
            cachedir = get_path_for_caching(link, Path('i_gdl_c'))
            statusdir = get_path_for_caching(link, Path('i_gdl_s'))
            cachedir.mkdir(parents=True, exist_ok=True)
            statusdir.mkdir(parents=True, exist_ok=True)
            metafile = cachedir.joinpath('_gdl_meta.json')
            statusfile = statusdir.joinpath('_gdl_status.json')
            meta = dict()
            statuses = dict()
            link_already_downloaded = False
            if metafile.exists():
                try:
                    meta = json.loads(metafile.read_text())
                except json.JSONDecodeError:
                    pass
            if statusfile.exists():
                try:
                    statuses = json.loads(statusfile.read_text())
                except json.JSONDecodeError:
                    pass
            if link in meta and link in statuses:
                link_already_downloaded = True
                rc = statuses.get(link, 0xFF)
                if rc == 0:
                    for fl in meta[link]:
                        pth = Path(fl)
                        try:
                            if not pth.exists():
                                link_already_downloaded = False
                                break
                        except OSError:
                            link_already_downloaded = False
                            break
                if len(meta[link]) == 0 and REDOWNLOAD_EMPTIES:
                    link_already_downloaded = False
                if (rc & RETRY_ERROR_MASK) != 0:
                    link_already_downloaded = False
            if not link_already_downloaded or REDOWNLOAD:
                scrubbing = False
                if thread_state_path is not None:
                    thread_state_path.write_text(
                        f'running:{link_count}:{remaining_links}:{totalbytes}:{totalfiles}:{link}')
                spr = subprocess.run(
                    [
                        'python3',
                        str(Path(__file__).parent.joinpath(
                            'gallery_dl_scriptable.py')),
                        str(thread_id),
                        str(remaining_links),
                        'i_gdl',
                        link,
                        'latest_image_download.txt'
                    ],
                    text=False,
                    stdin=sys.stdin,
                    stdout=subprocess.PIPE,
                    stderr=sys.stdout,
                    env={
                        'FORCE_COLOR': '1' if clrlib.colored('black').enabled() else '0',
                        **os.environ
                    },
                )
                jobres = {'status': 512,
                          'link_requested': link,
                          'link_effective': link,
                          'logs': [],
                          'files': []}
                if spr.returncode == 0:
                    try:
                        jobres = json.loads(spr.stdout)
                    except Exception:
                        pass
                rc: int = jobres['status']
                os.sync()
                files: List[str] = jobres['files'].copy()
                files = list(filter(len, files))
                has_changed = True
                while has_changed:
                    has_changed = False
                    for seq, fl in enumerate(files):
                        if not (pth := Path(fl)).exists():
                            # The reported file is missing on disk; look for a
                            # sibling whose name extends it (e.g. a different
                            # extension), ignoring .part and .json leftovers.
                            candidates = sorted(list(filter(
                                lambda p: (p.name.startswith(pth.name)
                                           and p.suffix != '.part'
                                           and p.suffix != '.json'),
                                pth.parent.iterdir())),
                                key=lambda p: len(p.name))
                            if len(candidates) > 0:
                                files[seq] = str(candidates[0])
                                has_changed = True
                                break
                            else:
                                # No candidate found: flag NOT_IN_DISK_ERR (256).
                                rc |= 256
                                # raise Exception(pth.name, candidates, files)
                del has_changed
                meta[link] = files
                statuses[link] = rc
                metafile.write_text(json.dumps(meta, indent=1))
                statusfile.write_text(json.dumps(statuses, indent=1))
                os.sync()
            for fl in meta[link]:
                code = statuses[link]
                pth = Path(fl)
                if not pth.exists():
                    if code != 0:
                        continue
                    else:
                        raise FileNotFoundError(
                            (link, link_already_downloaded, meta[link]))
                else:
                    totalfiles += 1
                    totalbytes += pth.stat().st_size
            result.append((link, meta[link]))
            remaining_links -= 1
            if thread_state_path is not None:
                scrubbing_running = 'scrubbing' if scrubbing else 'running'
                thread_state_path.write_text(
                    f'{scrubbing_running}:{link_count}:{remaining_links}:{totalbytes}:{totalfiles}:{link}')
            if STOP_JOBS_FLAG_PATH.exists():
                raise InterruptedError(STOP_JOBS_FLAG_PATH)
    finally:
        print((f'\033[{line};0H' if has_its_own_line else '') +
              clrlib.stylize(thread_id.strip(), [clrlib.fg('yellow'),
                                                 clrlib.attr('bold')]) +
              clrlib.stylize('#', [clrlib.fg('light_red')]) +
              clrlib.stylize('Done', [clrlib.fg('light_green')]) +
              ('\033[K' if has_its_own_line else ''))
    return result
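
# Usage sketch (illustrative only, not part of the original pipeline): this module is
# meant to be imported from its package after the preprocessing step has written
# r_fetch_preprocessed.json into the working directory. The import path below is
# hypothetical; adjust it to wherever this file lives in your package layout.
#
#     from reddit_imgs.fetch import run_with_config  # hypothetical path
#
#     run_with_config(max_workers=4)               # parallel run through the pool
#     run_with_config(manual_link='https://...')   # debug one link, bypassing the pool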