From e58cc9bf774c7c892cc84524b592d3d18afda585 Mon Sep 17 00:00:00 2001 From: Adler Neves Date: Sat, 23 Jan 2021 03:15:31 -0300 Subject: [PATCH] savestate --- .gitignore | 8 +- reddit_imgs/{fetch2.py => _fetch2.py} | 8 +- reddit_imgs/fetch3.py | 672 +++++++++++++++++++++++++ reddit_imgs/fetchpreprocess.py | 505 +++++++++++++++++++ reddit_imgs/runner.py | 12 +- reddit_imgs/sync.py | 136 ++++- reddit_imgs/system/libgallerydl.py | 57 +++ reddit_imgs/system/simpleDownloader.py | 27 +- reddit_imgs/wallpapers2.py | 14 +- 9 files changed, 1389 insertions(+), 50 deletions(-) rename reddit_imgs/{fetch2.py => _fetch2.py} (99%) create mode 100644 reddit_imgs/fetch3.py create mode 100644 reddit_imgs/fetchpreprocess.py create mode 100644 reddit_imgs/system/libgallerydl.py diff --git a/.gitignore b/.gitignore index 65278e5..c69bc08 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ tmp tmp/** -r.json +/*.json +/*.txt +/display_fetch_futures.trace i i/** r @@ -15,10 +17,6 @@ r_gdl* r_gdl*/** i_c i_c/** -i_*.json -fetch_missing.json -most_repeated_hashes.json -display_fetch_futures.trace i_h i_h/** i_h_n diff --git a/reddit_imgs/fetch2.py b/reddit_imgs/_fetch2.py similarity index 99% rename from reddit_imgs/fetch2.py rename to reddit_imgs/_fetch2.py index 4fde6c7..5cdd231 100644 --- a/reddit_imgs/fetch2.py +++ b/reddit_imgs/_fetch2.py @@ -65,11 +65,11 @@ GDL_ERRORS = [ GDL_ERRORS_DICT = {(1 << k): v for k, v in enumerate(GDL_ERRORS)} HTML_SPECIAL_CHARS_REPLACE: List[Tuple[str, str]] = [ - ('&', '&'), ('<', '<'), ('>', '>'), ('"', '"'), (''', '\''), + ('&', '&'), ] HTML_SPECIAL_CHARS: List[str] = list(map(lambda a: a[0], HTML_SPECIAL_CHARS_REPLACE)) @@ -917,8 +917,9 @@ class DownloadJobWithCallSaverPostProcessor(gallery_dl.job.DownloadJob): self) if parent is None else parent.cspp def initialize(self, kwdict=None): - super().initialize(kwdict) - self.postprocessors.append(self.cspp) + if not isinstance(self.hooks, tuple): + print('ADDED!!') + 
self.hooks['prepare'].append(self.cspp.prepare) class ColoredLineOutput(gallery_dl.output.TerminalOutput): @@ -1015,6 +1016,7 @@ class CallSaverPostProcessor(gallery_dl.postprocessor.common.PostProcessor): # print(cloned_pathfmt.kwdict) # print(cloned_pathfmt) self.calls['prepare'].append(cloned_pathfmt.path) + return pathfmt def run(self, pathfmt: gallery_dl.util.PathFormat): """Execute the postprocessor for a file""" diff --git a/reddit_imgs/fetch3.py b/reddit_imgs/fetch3.py new file mode 100644 index 0000000..fa28820 --- /dev/null +++ b/reddit_imgs/fetch3.py @@ -0,0 +1,672 @@ +#!/usr/bin/env python3 +# -*- encoding: utf-8 -*- + +import pprint +import json +import math +import os +import pickle +import shutil +import subprocess +import sys +import traceback +from collections import OrderedDict +from concurrent.futures import ProcessPoolExecutor as PoolExecutor +from io import StringIO +from pathlib import Path +from typing import Any, Dict, List, Optional, Set, Tuple, Type + +import colored as clrlib +import gallery_dl +import gallery_dl.config +import gallery_dl.extractor +import gallery_dl.job +import gallery_dl.postprocessor.common +import gallery_dl.util + +import reddit_imgs.sync + +from .system.cmdline_parser import parse_cmdline +from .system.downloader.cache import get_normalized_link, get_path_for_caching +from .system.flattener import flatten_generator +from .system.urlmatcher import search_urls + +gdl_pf: Type[gallery_dl.util.PathFormat] = ( + gallery_dl.util.PathFormat + if not hasattr(gallery_dl.util, 'PathFormatOriginal') else + gallery_dl.util.PathFormatOriginal) + +STOP_JOBS_FLAG_PATH = Path('stop_jobs.flag') +FORBIDDEN_WORKER_SPLITS = { + 'deviantart', +} + +MAX_WORKERS = 12 +SPLIT_WORKER_AFTER_N_LINKS = 10000 +USE_FIREFOX_COOKIES = True +DEBUG_WORKER = None +IGNORE_WORKERS: Set[str] = set() +REDOWNLOAD = False +REDOWNLOAD_EMPTIES = False +CUSTOM_WORKER_SPLITS: Dict[str, int] = {} +SKIP_INDEXED_FILES = True +RETRY_ERROR_MASK = 0 
+FORBIDDEN_WORKER_SPLITS = {'deviantart'} +DOWNLOAD_SINGLE_MANUAL_LINK: Optional[str] = None + + +GDL_ERRORS = [ + 'GENERIC_ERR', # 1 + 'UNKNOWN_ERR', # 2 + 'HTTP_ERR', # 4 + '404_ERR', # 8 + 'AUTH_ERR', # 16 + 'FORMAT_ERR', # 32 + 'LACKS_EXTRACTOR_ERR', # 64 + 'OS_ERR', # 128 + 'NOT_IN_DISK_ERR', # 256 +] +GDL_ERRORS_DICT = {(1 << k): v for k, v in enumerate(GDL_ERRORS)} + + +def cmdline(encoded_args: str = None): + if encoded_args is None: + return run_with_config() + else: + return parse_cmdline(run_with_config, encoded_args) + + +def run_with_config( + max_workers: int = None, + debug_worker: str = None, + manual_link: str = None, +): + global DEBUG_WORKER + global MAX_WORKERS + global DOWNLOAD_SINGLE_MANUAL_LINK + DOWNLOAD_SINGLE_MANUAL_LINK = manual_link + DEBUG_WORKER = debug_worker + if max_workers is not None: + MAX_WORKERS = max_workers + return main_loading_preprocessed() + + +def main_loading_preprocessed(): + print('Initial data loading...') + kwargs = json.loads(Path('r_fetch_preprocessed.json').read_text()) + global SKIP_INDEXED_FILES, RETRY_ERROR_MASK, CUSTOM_WORKER_SPLITS, SPLIT_WORKER_AFTER_N_LINKS, REDOWNLOAD_EMPTIES, REDOWNLOAD + SKIP_INDEXED_FILES = kwargs.pop('SKIP_INDEXED_FILES') + RETRY_ERROR_MASK = kwargs.pop('RETRY_ERROR_MASK') + CUSTOM_WORKER_SPLITS = kwargs.pop('CUSTOM_WORKER_SPLITS') + SPLIT_WORKER_AFTER_N_LINKS = kwargs.pop('SPLIT_WORKER_AFTER_N_LINKS') + REDOWNLOAD_EMPTIES = kwargs.pop('REDOWNLOAD_EMPTIES') + REDOWNLOAD = kwargs.pop('REDOWNLOAD') + kwargs['workers_state_path'] = Path(kwargs['workers_state_path']) + main(**kwargs) + + +def main(files_from_links: Dict[str, List[str]], + links_no_files: List[str], + files_sizes: Dict[str, int], + link_statuses: Dict[str, int], + workers_nicely_grouped: List[str], + workers_state_path: Path, + links_to_worker: Dict[str, List[str]], + link_keys: Dict[str, str]): + global DEBUG_WORKER + # if DEBUG_WORKER is None: + # raise Exception('DEBUG_ONLY: preventing cron jobs from running') + 
workers_state_path = Path('i_gdl_w') + workers_state_path.mkdir(exist_ok=True, parents=True) + for wsp in workers_state_path.iterdir(): + wsp.unlink() + + print('Initial data loaded.') + + def save_ending_files(): + nonlocal links_no_files + links_no_files2 = list(map(lambda a: a[0], + filter(lambda a: len(a[1]) <= 0 and a[0] not in links_no_files, + files_from_links.items()))) + links_no_files + files_from_links2 = dict( + filter(lambda a: len(a[1]) > 0, + files_from_links.items())) + links_no_files2_sorted = sorted(links_no_files2) + links_for_files = dict() + for link, files in files_from_links2.items(): + for file in files: + if file not in links_for_files: + links_for_files[file] = list() + links_for_files[file].append(link) + del file + del link + del files + os.sync() + Path('i_gdl_lnf.json').write_text( + json.dumps(links_no_files2_sorted, indent=1)) + Path('i_gdl_ffl.json').write_text(json.dumps( + files_from_links2, indent=1, sort_keys=True)) + Path('i_gdl_lff.json').write_text(json.dumps( + links_for_files, indent=1, sort_keys=True)) + Path('i_gdl_fsz.json').write_text( + json.dumps(files_sizes, indent=1, sort_keys=True)) + Path('i_gdl_spl.json').write_text(json.dumps( + link_statuses, indent=1, sort_keys=True)) + os.sync() + + print('Performing partial save...') + save_ending_files() + print('Performed partial save.') + + configure_gdl() + + gallery_dl.output.select = lambda: ColoredLineOutput(False) + + totalfiles = 0 + + thread_ids = workers_nicely_grouped.copy() + for line, thread_id in enumerate(thread_ids): + workers_state_path.joinpath(thread_id+'=line').write_text(str(line)) + linkcount = len(links_to_worker[thread_id]) + workers_state_path.joinpath(thread_id).write_text( + f'waiting:{linkcount}:{linkcount}:0:0') + do_fancy_multithreading_panel = False + thread_id_count = len(thread_ids) + if DOWNLOAD_SINGLE_MANUAL_LINK is not None or DEBUG_WORKER is not None: + if DOWNLOAD_SINGLE_MANUAL_LINK is not None: + DEBUG_WORKER = 'manual' + 
links_to_worker[DEBUG_WORKER] = [DOWNLOAD_SINGLE_MANUAL_LINK] + print(f'Will debug {repr(DEBUG_WORKER)}.') + thread_id = DEBUG_WORKER + links_list = links_to_worker[DEBUG_WORKER] + download_link_list( + links_list, + thread_id, + None, + f'Debugging {repr(DEBUG_WORKER)}...', + workers_state_path.joinpath(thread_id), + ) + return + if links_to_worker: + with PoolExecutor(min(MAX_WORKERS, thread_id_count)) as pe: + if do_fancy_multithreading_panel: + print(f'\033[2J', end='', flush=True) + print(f'\033[0;0H', end='', flush=True) + print('Downloading...', flush=True) + if do_fancy_multithreading_panel: + print(f'\033[0;0H', end='', flush=True) + largest_tid_size = max(map(len, thread_ids)) + line2tid = dict() + + def done_callback_generator(line): + nonlocal totalfiles + + def terminate_process_pool(): + os.system('sync') + os.system("bash -c \"ps -aux | grep './redditgetter.py' | grep -v grep | sed -e 's/ */ /g' | cut -d' ' -f2 | xargs -r -- kill -15\"") + sys.exit(0xFF) + + def done_callback(job): + nonlocal totalfiles + thread_id = line2tid[line] + links_list = links_to_worker[thread_id] + try: + workers_state_path.joinpath(thread_id).write_text( + f'finished:{len(links_list)}:0:0:0') + print(clrlib.stylize( + f'Received job #{line}: {thread_id}', [ + clrlib.fg('white'), + clrlib.bg('green'), + clrlib.attr('bold'), + ] + )) + downloaded_links = list() + totalbytes = 0 + thisfiles = 0 + true = True + downloaded_links = job.result() + for link, files in downloaded_links: + if true: + statusdir = get_path_for_caching( + link, Path('i_gdl_s')) + statusdir.mkdir(parents=True, exist_ok=True) + statusfile = statusdir.joinpath('_gdl_status.json') + statuses = dict() + if statusfile.exists(): + statuses = json.loads(statusfile.read_text()) + link_statuses[link] = statuses.get(link, 0xFF) + if link not in files_from_links: + files_from_links[link] = list() + lenfiles = len(files) + totalfiles += lenfiles + for file in files: + filepath = Path(file) + thisfiles += 1 + if 
filepath.exists(): + files_from_links[link].append(file) + st_size = filepath.stat().st_size + files_sizes[file] = st_size + totalbytes += st_size + workers_state_path.joinpath(thread_id).write_text( + f'finished:{len(links_list)}:0:{totalbytes}:{thisfiles}') + save_ending_files() + except: + sio = StringIO() + traceback.print_exc(file=sio) + excTxt = sio.getvalue() + try: + workers_state_path.joinpath(thread_id).write_text( + f'failed:{len(links_list)}:0:0:0') + except: + pass + try: + workers_state_path.joinpath(thread_id+'=exc').write_text(excTxt) + except: + pass + try: + pe.shutdown(wait=False) + except: + pass + print(excTxt) + terminate_process_pool() + return + return done_callback + for line, thread_id in enumerate(thread_ids): + line2tid[line] = thread_id + links_list = links_to_worker[thread_id] + workers_state_path.joinpath(thread_id).write_text( + f'enqueued:{len(links_list)}:{len(links_list)}:0:0') + print(clrlib.stylize(f'Starting job #{line}: {thread_id}', [ + clrlib.fg('white'), + clrlib.bg('light_red'), + clrlib.attr('bold'), + ])) + jobstardedmsg = clrlib.stylize(f'Starting job #{line}: {thread_id}', [ + clrlib.fg('black'), + clrlib.bg('light_yellow'), + clrlib.attr('bold'), + ]) + thread_id_nmsz = len(thread_id) + thread_id_display = thread_id + ' ' * (largest_tid_size - thread_id_nmsz) + job = pe.submit( + download_link_list, + links_list, + thread_id_display, + line+3 if do_fancy_multithreading_panel else None, + jobstardedmsg, + workers_state_path.joinpath(thread_id), + ) + job.add_done_callback(done_callback_generator(line)) + save_ending_files() + if (p := Path('latest_image_download.txt')).exists(): + p.unlink() + if workers_state_path.exists(): + for p in workers_state_path.glob('*'): + p.unlink() + shutil.rmtree(workers_state_path) + print(f'Downloaded {totalfiles} files') + + +def download_link_list(links: List[str], + thread_id: str, + line: Optional[int] = None, + job_started_msg: Optional[str] = None, + thread_state_path: 
Optional[Path] = None, + ) -> List[Tuple[str, List[str]]]: + '''Downloads a link list inside a ProcessPoolExecutor''' + if STOP_JOBS_FLAG_PATH.exists(): + raise InterruptedError(STOP_JOBS_FLAG_PATH) + if job_started_msg is not None: + print(job_started_msg) + has_its_own_line = line is not None + link_count = len(links) + remaining_links = link_count + configure_gdl() + if thread_state_path is not None: + thread_state_path.write_text( + f'running:{link_count}:{remaining_links}:0:0') + + def get_printer(): + return ColoredLineOutput( + has_its_own_line, + prefix=(f'\033[{line};0H' if has_its_own_line else '') + + clrlib.stylize('%9d' % remaining_links, [clrlib.fg('light_cyan')]) + + clrlib.stylize('@', [clrlib.fg('light_red')]) + + clrlib.stylize(thread_id, [clrlib.fg('yellow')]) + + clrlib.stylize('= ', [clrlib.fg('dark_gray')]), + suffix=('\033[K\033[0;0H' if has_its_own_line else ''), + prefixsz=len(('%9d' % 0)+' '+thread_id), + suffixsz=0, + write_successes_to=Path('latest_image_download.txt'), + ) + + gallery_dl.output.select = get_printer + result = list() + totalbytes = 0 + totalfiles = 0 + try: + for link in links: + scrubbing = True + cachedir = get_path_for_caching(link, Path('i_gdl_c')) + statusdir = get_path_for_caching(link, Path('i_gdl_s')) + cachedir.mkdir(parents=True, exist_ok=True) + statusdir.mkdir(parents=True, exist_ok=True) + metafile = cachedir.joinpath('_gdl_meta.json') + statusfile = statusdir.joinpath('_gdl_status.json') + meta = dict() + statuses = dict() + link_already_downloaded = False + if metafile.exists(): + try: + meta = json.loads(metafile.read_text()) + except json.JSONDecodeError: + pass + if statusfile.exists(): + try: + statuses = json.loads(statusfile.read_text()) + except json.JSONDecodeError: + pass + if link in meta and link in statuses: + link_already_downloaded = True + rc = statuses.get(link, 0xFF) + if rc == 0: + for fl in meta[link]: + pth = Path(fl) + try: + if not pth.exists(): + link_already_downloaded = False + 
break + except OSError: + link_already_downloaded = False + break + if len(meta[link]) == 0 and REDOWNLOAD_EMPTIES: + link_already_downloaded = False + if (rc & RETRY_ERROR_MASK) != 0: + link_already_downloaded = False + if not link_already_downloaded or REDOWNLOAD: + scrubbing = False + if thread_state_path is not None: + thread_state_path.write_text( + f'running:{link_count}:{remaining_links}:{totalbytes}:{totalfiles}:{link}') + job = DownloadJobWithCallSaverPostProcessor(link) + job.out = get_printer() + job.out.message(link, clrlib.fg('light_magenta')) + rc = job.run() + os.sync() + # print('FINAL', job.cspp.calls) + # raise Exception(job.cspp.calls) + # files = job.cspp.calls['run_final'].copy() # Only brings the last element + files = job.cspp.calls['prepare'].copy() + files = list(filter(len, files)) + has_changed = True + while has_changed: + has_changed = False + for seq, fl in enumerate(files): + if not (pth := Path(fl)).exists(): + candidates = sorted(list(filter( + lambda p: (p.name.startswith(pth.name) + and + p.suffix != '.part' + and + p.suffix != '.json'), + pth.parent.iterdir())), + key=lambda p: len(p.name) + ) + if len(candidates) > 0: + files[seq] = str(candidates[0]) + has_changed = True + break + else: + rc |= 256 + # raise Exception(pth.name, candidates, files) + del has_changed + meta[link] = files + statuses[link] = rc + metafile.write_text(json.dumps(meta, indent=1)) + statusfile.write_text(json.dumps(statuses, indent=1)) + os.sync() + for fl in meta[link]: + code = statuses[link] + pth = Path(fl) + if not pth.exists(): + if code != 0: + continue + else: + raise FileNotFoundError((link, + link_already_downloaded, + meta[link])) + else: + totalfiles += 1 + totalbytes += pth.stat().st_size + result.append((link, meta[link])) + remaining_links -= 1 + if thread_state_path is not None: + scrubbing_running = 'scrubbing' if scrubbing else 'running' + thread_state_path.write_text( + 
f'{scrubbing_running}:{link_count}:{remaining_links}:{totalbytes}:{totalfiles}:{link}') + if STOP_JOBS_FLAG_PATH.exists(): + raise InterruptedError(STOP_JOBS_FLAG_PATH) + finally: + print((f'\033[{line};0H' if has_its_own_line else '') + + clrlib.stylize(thread_id.strip(), [clrlib.fg('yellow'), clrlib.attr('bold')]) + + clrlib.stylize('#', [clrlib.fg('light_red')]) + + clrlib.stylize('Done', [clrlib.fg('light_green')]) + + ('\033[K' if has_its_own_line else '') + ) + return result + + +def configure_gdl(): + '''Configures Gallery-DL for usage.''' + parser = gallery_dl.option.build_parser() + args = parser.parse_args([ + *([] if USE_FIREFOX_COOKIES else ['--cookies=i_gdl/.cookies']), + '--dest=i_gdl', + '--write-metadata', + # '--write-tags', + # '--write-log=i_gdl_log.txt', + '--write-unsupported=i_gdl_unsupported.txt', + # '--quiet', + *(['--verbose'] if DEBUG_WORKER else []), + '--retries=1', + # '--retries=7', + # '--limit-rate=1500k', + ]) + gallery_dl.output.initialize_logging(args.loglevel) + + # configuration + if args.load_config: + gallery_dl.config.load() + if args.cfgfiles: + gallery_dl.config.load(args.cfgfiles, strict=True) + if args.yamlfiles: + gallery_dl.config.load(args.yamlfiles, strict=True, fmt="yaml") + if args.postprocessors: + gallery_dl.config.set((), "postprocessors", args.postprocessors) + if args.abort: + gallery_dl.config.set((), "skip", "abort:" + str(args.abort)) + for opts in args.options: + gallery_dl.config.set(*opts) + + # loglevels + gallery_dl.output.configure_logging(args.loglevel) + + gallery_dl.output.select = ColoredLineOutput + + gallery_dl.util.PathFormatOriginal = gdl_pf + gallery_dl.util.PathFormat = OverriddenPathFormat + + +class DownloadJobWithCallSaverPostProcessor(gallery_dl.job.DownloadJob): + def __init__(self, url, parent=None): + super().__init__(url, parent) + self.cspp = CallSaverPostProcessor( + self) if parent is None else parent.cspp + + def initialize(self, kwdict=None): + super().initialize(kwdict) + if 
not isinstance(self.hooks, tuple): + self.hooks['prepare'].append(self.cspp.prepare) + + +class ColoredLineOutput(gallery_dl.output.TerminalOutput): + def __init__(self, sameline=False, prefix="", suffix="", prefixsz=0, suffixsz=0, write_successes_to=None): + super().__init__() + self.sameline = sameline + self.eol = '\r' if sameline else '\n' + self.prefix = prefix + self.suffix = suffix + self.prefixsz = prefixsz + self.suffixsz = suffixsz + self.write_successes_to = write_successes_to + self._termsize_update() + + def start(self, path): + self.message(path, + clrlib.fg("light_yellow"), + ) + + def skip(self, path): + self.message(path, + clrlib.attr('dim'), + ) + + def success(self, path, tries): + self.message(path, + clrlib.attr('bold'), + clrlib.fg('light_green'), + ) + if self.write_successes_to is not None: + self.write_successes_to.write_text(path) + + def message(self, txt: str, *attrs: List[str], do_print: bool = True) -> str: + """Prints a message with given formatters""" + clrtxt = clrlib.stylize(self.shorten(txt), attrs) + fmtd = f"{self.prefix}{clrtxt}{self.suffix}" + if do_print: + print(fmtd, flush=True, end=self.eol) + return fmtd + + def shorten(self, txt): + self._termsize_update() + self.width = self.termsize - self.prefixsz - self.suffixsz - 1 + return super().shorten(txt) + + def _termsize_update(self): + self.termsize = shutil.get_terminal_size().columns + + +class OverriddenPathFormat(gdl_pf): + def __init__(self, extractor): + super().__init__(extractor) + self.clean_path = FixFileNameFormatterWrapper(self.clean_path) + + +class CallSaverPostProcessor(gallery_dl.postprocessor.common.PostProcessor): + def __init__(self, job): + super().__init__(job) + self.calls = dict( + prepare=list(), + run=list(), + run_metadata=list(), + run_after=list(), + run_final=list(), + ) + + def prepare(self, pathfmt: gallery_dl.util.PathFormat): + """Update file paths, etc.""" + + directory_formatters = pathfmt.directory_formatters + filename_formatter = 
pathfmt.filename_formatter + clean_segment = pathfmt.clean_segment + clean_path = pathfmt.clean_path + kwdict_fallback = pathfmt.kwdict.get('_fallback', None) + + # pp = pprint.PrettyPrinter() + # pp.pprint(pathfmt) + # pp.pprint(pathfmt.__dict__) + + pathfmt.directory_formatters = None + pathfmt.filename_formatter = None + pathfmt.clean_segment = None + pathfmt.clean_path = None + if kwdict_fallback is not None: + pathfmt.kwdict['_fallback'] = None + + cloned_pathfmt: gallery_dl.util.PathFormat = pickle.loads(pickle.dumps(pathfmt)) + + pathfmt.directory_formatters = directory_formatters + pathfmt.filename_formatter = filename_formatter + pathfmt.clean_segment = clean_segment + pathfmt.clean_path = clean_path + if kwdict_fallback is not None: + pathfmt.kwdict['_fallback'] = kwdict_fallback + + cloned_pathfmt.directory_formatters = directory_formatters + cloned_pathfmt.filename_formatter = filename_formatter + cloned_pathfmt.clean_segment = clean_segment + cloned_pathfmt.clean_path = clean_path + if kwdict_fallback is not None: + cloned_pathfmt.kwdict['_fallback'] = kwdict_fallback + + cloned_pathfmt.build_path() + # print(cloned_pathfmt.path) + # print(cloned_pathfmt.filename) + # print(cloned_pathfmt.kwdict) + # print(cloned_pathfmt) + self.calls['prepare'].append(cloned_pathfmt.path) + return pathfmt + + def run(self, pathfmt: gallery_dl.util.PathFormat): + """Execute the postprocessor for a file""" + self.calls['run'].append(pathfmt.path) + + def run_metadata(self, pathfmt: gallery_dl.util.PathFormat): + """Execute the postprocessor for a file""" + self.calls['run_metadata'].append(pathfmt.path) + + def run_after(self, pathfmt: gallery_dl.util.PathFormat): + """Execute postprocessor after moving a file to its target location""" + self.calls['run_after'].append(pathfmt.path) + + def run_final(self, pathfmt: gallery_dl.util.PathFormat, status: int): + """Postprocessor finalization after all files have been downloaded""" + 
self.calls['run_final'].append((pathfmt.path, status)) + + +class FixFileNameFormatterWrapper: + """Wraps file name formatter for ensuring a valid file name length""" + + def __init__(self, formatter: gallery_dl.util.Formatter): + self.formatter = formatter + + def __call__(self, *args, **kwargs) -> str: + path = self.formatter(*args, **kwargs) + parts = list(map(fix_filename_ending_extension, + map(fix_filename_length, + map(fix_filename_ending_extension, + Path(path).parts)))) + return str(Path(*parts)) + + +def fix_filename_length(filename: str) -> str: + """Ensures a segment has a valid file name length""" + if len(filename.encode()) > 240: + extension = Path(filename).suffix + extension_bytes_length = len(extension.encode()) + stem_bytes = Path(filename).stem.encode() + fixed_stem_bytes = stem_bytes[:240-extension_bytes_length] + fixed_stem = fixed_stem_bytes.decode(errors="ignore") + return fixed_stem + extension + return filename + + +def fix_filename_ending_extension(filename: str) -> str: + if (fp := Path(filename)).stem[-1:] in ('.', ' '): + return str(fp.parent.joinpath(f"{fp.stem.rstrip('. 
')}{fp.suffix}")) + return filename + + +if __name__ == "__main__": + kwargs: Dict[str, Any] = dict() + main(**kwargs) diff --git a/reddit_imgs/fetchpreprocess.py b/reddit_imgs/fetchpreprocess.py new file mode 100644 index 0000000..724ce7e --- /dev/null +++ b/reddit_imgs/fetchpreprocess.py @@ -0,0 +1,505 @@ +#!/usr/bin/env python3 +# -*- encoding: utf-8 -*- + +import json +import math +from collections import OrderedDict +from pathlib import Path +from typing import Dict, List, Set, Tuple + +import reddit_imgs.sync + +from .system import libgallerydl +from .system.cmdline_parser import parse_cmdline +from .system.downloader.cache import get_normalized_link, get_path_for_caching +from .system.flattener import flatten_generator +from .system.urlmatcher import search_urls + +FORBIDDEN_WORKER_SPLITS = {'deviantart'} +SKIP_INDEXED_FILES = True +RETRY_ERROR_MASK = 0 +CUSTOM_WORKER_SPLITS: Dict[str, int] = {} +SPLIT_WORKER_AFTER_N_LINKS = 10000 +IGNORE_WORKERS: Set[str] = set() +REDOWNLOAD_EMPTIES = False +REDOWNLOAD = False + +STOP_JOBS_FLAG_PATH = Path('stop_jobs.flag') +HTML_SPECIAL_CHARS_REPLACE: List[Tuple[str, str]] = [ + ('<', '<'), + ('>', '>'), + ('"', '"'), + (''', '\''), + ('&', '&'), +] +HTML_SPECIAL_CHARS: List[str] = list( + map(lambda a: a[0], HTML_SPECIAL_CHARS_REPLACE)) + + +def contains_any(s: str, l: List[str]) -> bool: + for i in l: + if i in s: + return True + return False + + +def replace_many(s: str, l: List[Tuple[str, str]]) -> str: + for f, t in l: + s = s.replace(f, t) + return s + + +def cmdline(encoded_args: str = None): + if encoded_args is None: + return run_with_config() + else: + return parse_cmdline(run_with_config, encoded_args) + + +def run_with_config(redownload_empties: bool = False, + redownload_all: bool = False, + retry_generic_errors: bool = False, + retry_unknown_errors: bool = False, + retry_network_errors: bool = False, + retry_not_found_errors: bool = False, + retry_auth_errors: bool = False, + retry_format_errors: bool = 
False, + retry_extractor_errors: bool = False, + retry_os_errors: bool = False, + retry_not_in_disk_errors: bool = False, + retry_gdl_mask: int = 0, + split_workers: int = None, + custom_worker_splits: dict = None, + skip_indexed_files: bool = True, + ignore_workers: Set[str] = set(), + ): + global SPLIT_WORKER_AFTER_N_LINKS + global CUSTOM_WORKER_SPLITS + global SKIP_INDEXED_FILES + global REDOWNLOAD_EMPTIES + global RETRY_ERROR_MASK + global REDOWNLOAD + global IGNORE_WORKERS + IGNORE_WORKERS = ignore_workers + REDOWNLOAD = redownload_all + SKIP_INDEXED_FILES = skip_indexed_files + REDOWNLOAD_EMPTIES = redownload_empties + RETRY_ERROR_MASK |= retry_gdl_mask + if retry_generic_errors: + RETRY_ERROR_MASK |= 1 << 0 + if retry_unknown_errors: + RETRY_ERROR_MASK |= 1 << 1 + if retry_network_errors: + RETRY_ERROR_MASK |= 1 << 2 + if retry_not_found_errors: + RETRY_ERROR_MASK |= 1 << 3 + if retry_auth_errors: + RETRY_ERROR_MASK |= 1 << 4 + if retry_format_errors: + RETRY_ERROR_MASK |= 1 << 5 + if retry_extractor_errors: + RETRY_ERROR_MASK |= 1 << 6 + if retry_os_errors: + RETRY_ERROR_MASK |= 1 << 7 + if retry_not_in_disk_errors: + RETRY_ERROR_MASK |= 1 << 8 + if split_workers is not None: + SPLIT_WORKER_AFTER_N_LINKS = split_workers + if custom_worker_splits is not None: + CUSTOM_WORKER_SPLITS = custom_worker_splits + return main() + + +def main(): + subreddit_data_path = Path('r.json') + if not subreddit_data_path.exists(): + print("Executing prerrequisite...") + reddit_imgs.sync.main() + subreddit_filters_path = Path('rf.json') + print('Loading posts from disk...') + Path('i_gdl').mkdir(exist_ok=True, parents=True) + workers_state_path = Path('i_gdl_w') + workers_state_path.mkdir(exist_ok=True, parents=True) + for wsp in workers_state_path.iterdir(): + wsp.unlink() + if STOP_JOBS_FLAG_PATH.exists(): + STOP_JOBS_FLAG_PATH.unlink() + subreddit_data = json.loads(subreddit_data_path.read_text()) + subreddit_filters = json.loads(subreddit_filters_path.read_bytes()) + 
print('Loading posts...') + posts = prerrun_flatten_subreddits_into_posts( + subreddit_data, subreddit_filters) + print(f'{len(posts)} posts identified.') + print(f'Identifying alternative trivial links...') + prerrun_posts_re_sort(posts) + Path('r_gdl_p.json').write_text( + json.dumps(posts, indent=1, sort_keys=True)) + print(f'Grouping links with the posts they show up in...') + links = OrderedDict() + for dk, post in posts.items(): + for link in post['links']: + if link not in links: + links[link] = list() + links[link].append(dk) + Path('r_gdl_lp.json').write_text( + json.dumps(links, indent=1, sort_keys=True)) + known_link_set = set(links.keys()) + print(f'{len(links)} links found') + print(f'Checking if there is an extractor for each link...') + r_gdl_lk_path = Path('r_gdl_lk.json') + r_gdl_le_path = Path('r_gdl_le.json') + link_keys = dict() + if r_gdl_lk_path.exists(): + link_keys = json.loads(r_gdl_lk_path.read_text()) + for link in links.keys(): + if link not in link_keys or link_keys[link] == '': + key = libgallerydl.find_extractor_archival_key(link) + if key is None: + key = '' + link_keys[link] = key + del key + link_extractors = dict() + if r_gdl_le_path.exists(): + link_extractors = json.loads(r_gdl_le_path.read_text()) + for link in links.keys(): + if link not in link_extractors or link_extractors[link] == '': + category_subcategory = libgallerydl.find_extractor_category_subcategory( + link) + if category_subcategory is None: + link_extractors[link] = '' + else: + category, subcategory = category_subcategory + if category == 'reddit' and subcategory in ('subreddit', 'user'): + link_extractors[link] = '' + else: + link_extractors[link] = category + for discarded_link in set(link_extractors.keys()).difference(known_link_set): + del link_extractors[discarded_link] + r_gdl_le_path.write_text(json.dumps( + link_extractors, indent=1, sort_keys=True)) + r_gdl_lk_path.write_text(json.dumps( + link_keys, indent=1, sort_keys=True)) + links_by_extractor = { + 
extractor: list() + for extractor in list(set(link_extractors.values())) + } + for link, extractor in link_extractors.items(): + links_by_extractor[extractor].append(link) + not_downloadable_links = dict() + not_downloadable_links[''] = links_by_extractor.get('', []) + not_downloadable_links['reddit_user'] = links_by_extractor.get( + 'reddit_user', []) + not_downloadable_links['reddit_subreddit'] = links_by_extractor.get( + 'reddit_subreddit', []) + Path('i_undownloadable.json').write_text( + json.dumps(not_downloadable_links, indent=1)) + if '' in links_by_extractor: + del links_by_extractor[''] + if 'reddit_user' in links_by_extractor: + del links_by_extractor['reddit_user'] + if 'reddit_subreddit' in links_by_extractor: + del links_by_extractor['reddit_subreddit'] + not_downloadable_link_set = frozenset( + flatten_generator(not_downloadable_links.values())) + + print(f'{len(links)-len(not_downloadable_link_set)} downloadable links found') + print(f'{len(not_downloadable_link_set)} undownloadable links found') + print(f'{len(links_by_extractor)} extractors found') + Path('r_gdl_lbe.json').write_text(json.dumps( + links_by_extractor, indent=1, sort_keys=True)) + + files_from_links: Dict[str, List[str]] = dict() + links_no_files: List[str] = list() + files_sizes: Dict[str, int] = dict() + link_statuses: Dict[str, int] = dict() + ignored_links: Set[str] = set() + + if (pth := Path('i_gdl_ffl.json')).exists(): + try: + files_from_links = json.loads(pth.read_text()) + except: + pass + + if (pth := Path('i_gdl_lnf.json')).exists(): + try: + links_no_files = json.loads(pth.read_text()) + except: + pass + + if (pth := Path('i_gdl_fsz.json')).exists(): + try: + files_sizes = json.loads(pth.read_text()) + except: + pass + + if (pth := Path('i_gdl_spl.json')).exists(): + try: + link_statuses = json.loads(pth.read_text()) + except: + pass + + for discarded_link in set(links_no_files).difference(known_link_set): + links_no_files.remove(discarded_link) + discarded_files = 
set() + for discarded_link in set(files_from_links.keys()).difference(known_link_set): + if discarded_link in files_from_links: + files_in_link = files_from_links[discarded_link] + for file_in_link in files_in_link: + discarded_files.add(file_in_link) + if discarded_link in link_statuses: + del link_statuses[discarded_link] + del files_from_links[discarded_link] + files_to_keep = set() + for files_from_link in files_from_links.values(): + for file_from_link in files_from_link: + if file_from_link not in files_to_keep: + files_to_keep.add(file_from_link) + for discarded_file in discarded_files.difference(files_to_keep): + if discarded_file in files_sizes: + del files_sizes[discarded_file] + for missing_file_size in files_to_keep.difference(set(files_sizes.keys())): + p = Path(missing_file_size) + if not p.exists(): + raise FileNotFoundError(missing_file_size) + else: + files_sizes[missing_file_size] = p.stat().st_size + print('Re-filled files_sizes for %r' % p) + if (p := Path('i_gdl_ignored.txt')).exists(): + ignored_links = set(list(filter(len, p.read_text().splitlines()))) + + links_no_files = list(filter(lambda a: a not in ignored_links, + links_no_files)) + + link_statuses = dict(filter(lambda a: a[0] not in ignored_links, + link_statuses.items())) + + files_from_links = dict(filter(lambda a: a[0] not in ignored_links, + files_from_links.items())) + + checked_links = list(files_from_links.keys()) + links_no_files + + checked_links = frozenset(checked_links) + + max_expected_jobs_for_extractor = 0 + for extractor, links in links_by_extractor.items(): + links = [link + for link in links + if + link not in ignored_links + and + ( + link not in checked_links + or + not SKIP_INDEXED_FILES + or + (link_statuses.get(link, 0xFF) & RETRY_ERROR_MASK) != 0 + )] + if len(links) <= 0: + continue + this_worker_split_after_n_links = CUSTOM_WORKER_SPLITS.get( + extractor, SPLIT_WORKER_AFTER_N_LINKS) + workers = math.ceil(len(links)/this_worker_split_after_n_links) + if workers 
<= 1 or extractor in FORBIDDEN_WORKER_SPLITS: + workers = 1 + max_expected_jobs_for_extractor = max( + max_expected_jobs_for_extractor, + workers + ) + worker_by_seq = [list() for _ in range(max_expected_jobs_for_extractor)] + + links_to_worker = dict() + for extractor, links in links_by_extractor.items(): + links = [link + for link in links + if + link not in ignored_links + and + ( + link not in checked_links + or + not SKIP_INDEXED_FILES + or + (link_statuses.get(link, 0xFF) & RETRY_ERROR_MASK) != 0 + )] + if len(links) <= 0: + continue + this_worker_split_after_n_links = CUSTOM_WORKER_SPLITS.get( + extractor, SPLIT_WORKER_AFTER_N_LINKS) + workers = math.ceil(len(links)/this_worker_split_after_n_links) + if workers <= 1 or extractor in FORBIDDEN_WORKER_SPLITS: + if extractor in IGNORE_WORKERS: + continue + links_to_worker[extractor] = links + worker_by_seq[0].append(extractor) + else: + digits = math.ceil(math.log10(max(1, workers+1))) + fmt = "%%0%dd" % digits + for worker_no in range(workers): + lowerlimit = (worker_no+0)*this_worker_split_after_n_links + upperlimit = (worker_no+1)*this_worker_split_after_n_links + thisrange = links[lowerlimit:upperlimit] + worker_nm = extractor + ':' + (fmt % (worker_no)) + if worker_nm in IGNORE_WORKERS: + continue + links_to_worker[worker_nm] = thisrange + worker_by_seq[worker_no].append(worker_nm) + for w in worker_by_seq: + w.sort() + workers_nicely_grouped = [ + worker + for workergroup in worker_by_seq + for worker in workergroup + if worker != '' + ] + print(f'{len(links_to_worker)} workers to be spawned') + response_dict = dict( + files_from_links=files_from_links, + links_no_files=links_no_files, + files_sizes=files_sizes, + link_statuses=link_statuses, + workers_nicely_grouped=workers_nicely_grouped, + workers_state_path=str(workers_state_path), + links_to_worker=links_to_worker, + link_keys=link_keys, + SKIP_INDEXED_FILES=SKIP_INDEXED_FILES, + RETRY_ERROR_MASK=RETRY_ERROR_MASK, + 
def prerrun_flatten_subreddits_into_posts(subreddit_data, subreddit_filters):
    """Merge every subreddit's post list into one dict keyed by datakey.

    Posts that appear in several subreddits are collapsed into a single
    record carrying a ``subreddits`` list and a ``links`` list (normalized,
    de-duplicated).  Subreddit-level filter flags (``no_download``,
    ``no_sfw``, ``no_nsfw``) are applied while merging.

    :param subreddit_data: mapping subreddit name -> {'links': [post, ...]}
    :param subreddit_filters: mapping filter name -> {subreddit: bool}
    :return: dict datakey -> merged post record
    """
    # Flatten to one list, tagging each post with its source subreddit.
    flat = [
        {'subreddit': name, **entry}
        for name, data in subreddit_data.items()
        for entry in data['links']
    ]
    # Newest first; datakey as deterministic tie-breaker.
    flat.sort(key=lambda entry: (-entry['timestamp'], entry['datakey']))
    merged = {}
    for entry in flat:
        datakey = entry['datakey']
        name = entry['subreddit']
        if subreddit_filters['no_download'][name]:
            continue
        if subreddit_filters['no_sfw'][name] and not entry['nsfw']:
            continue
        if subreddit_filters['no_nsfw'][name] and entry['nsfw']:
            continue
        if datakey not in merged:
            # First sighting: seed the merged record from this post, but
            # replace the single-subreddit fields with aggregate lists.
            combined = entry.copy()
            combined['subreddits'] = []
            combined['links'] = []
            del combined['subreddit']
            del combined['link']
            del combined['domain']
            merged[datakey] = combined
        sub_names = merged[datakey]['subreddits']
        if name not in sub_names:
            sub_names.append(name)
        normalized = get_normalized_link(entry['link'])
        known_links = merged[datakey]['links']
        if normalized not in known_links:
            known_links.append(normalized)
    return merged
in post_links: + post_links.append(linkcopy) + has_changed_any_link = True + break + if '?' in link: + linkcopy = link.split('?', 1)[0] + linkcopy = get_normalized_link(linkcopy) + if linkcopy not in post_links: + post_links.append(linkcopy) + has_changed_any_link = True + break + if '#' in link: + linkcopy = link.split('#', 1)[0] + linkcopy = get_normalized_link(linkcopy) + if linkcopy not in post_links: + post_links.append(linkcopy) + has_changed_any_link = True + break + if contains_any(linkcopy, HTML_SPECIAL_CHARS): + linkcopy = replace_many( + linkcopy, HTML_SPECIAL_CHARS_REPLACE) + if linkcopy not in post_links: + post_links.append(linkcopy) + has_changed_any_link = True + break + if linkcopy[-1:] in ('/', '#', '?'): + while linkcopy[-1:] in ('/', '#', '?'): + linkcopy = linkcopy[: -1] + linkcopy = get_normalized_link(linkcopy) + if linkcopy not in post_links: + post_links.append(linkcopy) + has_changed_any_link = True + if link.strip() == '': + while link in post_links: + post_links.remove(link) + has_changed_any_link = True + break + if link.startswith('/'): + while link in post_links: + post_links.remove(link) + has_changed_any_link = True + break + if link.startswith('#'): + while link in post_links: + post_links.remove(link) + has_changed_any_link = True + break + if link.startswith('mailto'): + while link in post_links: + post_links.remove(link) + has_changed_any_link = True + break + if (proto := (tpl := link.split(':', 1))[0]).lower() in ('http', 'https') and proto not in ('http', 'https'): + lst = list(tpl) + lst[0] = lst[0].lower() + linkcopy = ':'.join(lst) + post_links.remove(link) + if linkcopy not in post_links: + post_links.append(linkcopy) + has_changed_any_link = True + break + post['links'] = list(filter(lambda link: ( + not link.startswith('https://preview.redd.it/') + or + ( + (('?width=' in link) or ('&width=' in link)) + and + (('?format=' in link) or ('&format=' in link)) + and + (('?auto=' in link) or ('&auto=' in link)) + and + 
(('?s=' in link) or ('&s=' in link)) + ) + ), post['links'])) + post['links'].sort() diff --git a/reddit_imgs/runner.py b/reddit_imgs/runner.py index 2c32208..af487df 100755 --- a/reddit_imgs/runner.py +++ b/reddit_imgs/runner.py @@ -9,7 +9,8 @@ from pathlib import Path import reddit_imgs.condensate_hashes import reddit_imgs.download_pruner -import reddit_imgs.fetch2 +import reddit_imgs.fetchpreprocess +import reddit_imgs.fetch3 import reddit_imgs.hashit2 import reddit_imgs.linguisticdictanal import reddit_imgs.sizebysubreddit @@ -32,11 +33,7 @@ def ensureFolderAvailability(): def main(): - # ensureFolderAvailability() - if len(sys.argv) > 1: - cmdline() - else: - mainmenu() + cmdline() def main_unshared(): @@ -47,7 +44,8 @@ def cmdline(run_each_on_subprocess=False): cmds = sys.argv[1:] available_commands = (( ('sync', reddit_imgs.sync.cmdline), - ('fetch', reddit_imgs.fetch2.cmdline), + ('fetchpreprocess', reddit_imgs.fetchpreprocess.cmdline), + ('fetch', reddit_imgs.fetch3.cmdline), ('suggest_subreddits_from_links', reddit_imgs.suggest_subreddits_from_links.cmdline), ('prune_downloads', reddit_imgs.download_pruner.cmdline), ('hashit', reddit_imgs.hashit2.cmdline), diff --git a/reddit_imgs/sync.py b/reddit_imgs/sync.py index 4e73c44..295439a 100755 --- a/reddit_imgs/sync.py +++ b/reddit_imgs/sync.py @@ -2,9 +2,12 @@ # -*- encoding: utf-8 -*- import json +import operator import os +import time from concurrent.futures import ProcessPoolExecutor as PoolExecutor from pathlib import Path +from typing import List, Dict, Union, Any from urllib.error import ContentTooShortError, HTTPError, URLError import colored as clrlib @@ -15,7 +18,10 @@ from .system.subredditTools import (GATEWAY_LINK_ARGS, build_gateway_link, getEmptySubredditData, getSubredditPageJsonInfo) +UNDERSAMPLED_CHECK_TIME = 6*3600 +TOLERABLE_ERROR_LOG_SIZE = 5 MAX_WORKERS = 16 +IGNORE_STATISTICAL_LEVERAGER = False def cmdline(encoded_args: str = None): @@ -26,10 +32,20 @@ def cmdline(encoded_args: str = 
def run_with_config(max_workers: int = None,
                    ignore_statistical_leverager: bool = False,
                    tolerable_error_log_size: int = 5,
                    undersampled_check_time: int = 6*3600,
                    ):
    """Apply command-line overrides to this module's tunables, then run main().

    :param max_workers: pool size override; rounded up to an even number
        because main() splits the pool into two halves.
    :param ignore_statistical_leverager: when True, re-scan every subreddit
        instead of only those statistically expected to have new posts.
    :param tolerable_error_log_size: max errors.json entries before a
        subreddit is skipped; negative disables the limit.
    :param undersampled_check_time: seconds between forced re-checks of
        subreddits with degenerate posting history.
    """
    global MAX_WORKERS
    global IGNORE_STATISTICAL_LEVERAGER
    global TOLERABLE_ERROR_LOG_SIZE
    global UNDERSAMPLED_CHECK_TIME
    TOLERABLE_ERROR_LOG_SIZE = tolerable_error_log_size
    IGNORE_STATISTICAL_LEVERAGER = ignore_statistical_leverager
    UNDERSAMPLED_CHECK_TIME = undersampled_check_time
    if max_workers is not None:
        MAX_WORKERS = max_workers
        # Keep the worker count even: main() divides it by two.
        MAX_WORKERS += MAX_WORKERS % 2
    return main()


class NoneResponseError(ValueError):
    """Raised when a download helper returned None instead of bytes."""

    @classmethod
    def check(cls, value):
        """Return *value* unchanged, raising NoneResponseError if it is None."""
        if value is None:
            raise cls('None value was found')
        return value


def _subreddit_errors_path(subreddit: str) -> Path:
    # Single authority for where a subreddit's error log lives.
    return Path(
        os.path.abspath(os.path.join(wdir, 'r', subreddit, 'errors.json'))
    )


def subreddit_error_append(subreddit: str,
                           error: str,
                           timestamp: Union[int, float] = None,
                           ):
    """Append one error record to the subreddit's errors.json log.

    :param timestamp: event time; defaults to now.
    """
    errors_path = _subreddit_errors_path(subreddit)
    if timestamp is None:
        timestamp = time.time()
    errors: List[Dict[str, Any]] = []
    if errors_path.is_file():
        errors = json.loads(errors_path.read_bytes())
    errors.append(dict(error=error, timestamp=timestamp))
    errors_path.write_text(json.dumps(errors, indent=1))


def subreddit_error_empty(subreddit: str):
    """Clear the subreddit's error log (called after a successful fetch)."""
    errors_path = _subreddit_errors_path(subreddit)
    if errors_path.is_file():
        errors_path.unlink()


def subreddit_error_within_tolerance(subreddit: str):
    """Return True while the subreddit's error count is acceptable.

    A negative TOLERABLE_ERROR_LOG_SIZE disables the limit entirely.
    """
    if TOLERABLE_ERROR_LOG_SIZE < 0:
        return True
    errors_path = _subreddit_errors_path(subreddit)
    errors: List[Dict[str, Any]] = []
    if errors_path.is_file():
        errors = json.loads(errors_path.read_bytes())
    return len(errors) <= TOLERABLE_ERROR_LOG_SIZE
def statistically_expects_post(srdt, latest_file_change=None, use_current_time_with_average=1):
    """Estimate whether a subreddit is due for a new post.

    Looks at the gaps between the (up to 21) most recent post timestamps and
    compares the average gap against the time since the last known activity.
    When the average including "now" suggests the subreddit went quiet
    (5x the average gap has elapsed), the estimate is recomputed without
    the current time to avoid skewing the average.

    :param srdt: subreddit data dict with 'links' (newest first),
        'date_first' (newest timestamp) and 'date_last' (oldest timestamp).
    :param latest_file_change: mtime of the on-disk data, if known.
    :param use_current_time_with_average: 1 to include "now" in the gap
        average, 0 to exclude it (used by the recursive re-check).
    :return: True when a fetch is advisable.
    """
    # Degenerate history (single post or non-decreasing range): fall back
    # to a fixed re-check interval based on the file's age.
    if srdt['date_last'] >= srdt['date_first'] or len(srdt['links']) < 2:
        if latest_file_change is None:
            return True
        return (time.time() - latest_file_change) > UNDERSAMPLED_CHECK_TIME
    newest_first = [entry['timestamp'] for entry in srdt['links'][:21]]
    # Pair each timestamp with its predecessor; "now" heads the newer side
    # and a sentinel 0 tails the older side.
    newer_side = [time.time(), *newest_first]
    older_side = [*newest_first, 0]
    gaps = [a - b for a, b in zip(newer_side, older_side)]
    # Drop the sentinel gap at the end and, when requested, the "now" gap
    # at the front; discard zero-length gaps.
    gaps = gaps[use_current_time_with_average:-1]
    gaps = [gap for gap in gaps if gap]
    if not gaps:
        return True
    average_gap = sum(gaps) / len(gaps)
    if latest_file_change is None:
        latest_file_change = srdt['links'][0]['timestamp']
    now = int(time.time())
    if use_current_time_with_average != 0 and now >= latest_file_change + average_gap * 5:
        # Suspiciously long silence: re-estimate without "now" in the average.
        return statistically_expects_post(srdt, latest_file_change, 0)
    return now >= latest_file_change + average_gap
def find_extractor(link) -> Optional[gallery_dl.extractor.common.Extractor]:
    """Return the gallery-dl extractor matching *link*, or None if no
    extractor claims the URL.

    NOTE(review): relies on ``gallery_dl.exception`` being reachable as a
    package attribute even though this module never imports it explicitly;
    confirm gallery_dl's own imports guarantee that.
    """
    try:
        return gallery_dl.extractor.find(link)
    except gallery_dl.exception.NotFoundError:
        return None


def find_extractor_category_subcategory(link) -> Optional[Tuple[str, str]]:
    """Return ``(category, subcategory)`` of the extractor class matching
    *link*, or None when no extractor matches."""
    extractor = find_extractor(link)
    if extractor is None:
        return None
    cls = type(extractor)
    return (cls.category, cls.subcategory)


def find_extractor_archival_key(link) -> Optional[str]:
    """Return the cache-path-derived archival key for *link*, or None when
    no extractor matches.

    The extractor is used only as a match test; the key comes from the local
    cache-path scheme, avoiding the network access a simulated gallery-dl
    job (the previous approach) would perform.
    """
    if find_extractor(link) is None:
        return None
    return str(get_path_for_caching(link, Path('')))


class FakeArchive(gallery_dl.util.DownloadArchive):
    """In-memory DownloadArchive that records every archive key it is asked
    to add instead of persisting anything to disk."""

    def __init__(self, path, extractor):
        # *path* is accepted for signature compatibility but ignored:
        # the underlying sqlite archive is kept in memory.
        super().__init__(':memory:', extractor)
        self.captured_keys = list()

    def add(self, kwdict):
        # Capture the key exactly as DownloadArchive would compute it.
        self.captured_keys.append(
            kwdict.get('_archive_key') or self.keygen(kwdict))
        return super().add(kwdict)
def firefox_version_from_output(version_output: str) -> str:
    """Extract the leading 'major.minor' token from `firefox --version`
    output (e.g. 'Mozilla Firefox 84.0.2' -> '84.0').

    Raises IndexError when no version token is present, matching the
    previous inline behavior.
    """
    return re.findall(r'(\d+\.\d+)', version_output.strip())[0]


def firefox_platform_os_token() -> str:
    """Return the OS fragment of a Firefox User-Agent string for the
    current platform ('' for unrecognized platforms)."""
    system = platform.system()
    if system == 'Linux':
        return 'X11; Linux x86_64; '
    if system == 'Darwin':
        return 'Macintosh; Intel Mac OS X 11.0; '
    if system == 'Windows':
        return 'Windows NT 10.0; Win64; x64; '
    return ''


def getFirefoxUserAgent():
    """Build a User-Agent string matching the locally installed Firefox.

    Invokes ``firefox --version`` (raises CalledProcessError if the binary
    is missing or fails) and combines the reported version with an
    OS-appropriate platform token.
    """
    result = subprocess.run(['firefox', '--version'],
                            stdout=subprocess.PIPE, text=True, check=True)
    ver = firefox_version_from_output(result.stdout)
    os_string = firefox_platform_os_token()
    return f'Mozilla/5.0 ({os_string}rv:{ver}) Gecko/20100101 Firefox/{ver}'
shutil.copyfile(src, dst) - except KeyboardInterrupt as e: + except KeyboardInterrupt as exc: print() print('\r'+' '*79+'\r'+'Deleting interrupted file...', end='') os.remove(dst) print('\r'+' '*79+'\r'+'Aborted safely', end='') print() - raise e + raise exc print() print() print('{0:>5} files were kept'.format(kept))