#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

import json
import math
from collections import OrderedDict, defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple

import reddit_imgs.sync

from .system import libgallerydl
from .system.cmdline_parser import parse_cmdline
from .system.downloader.cache import get_normalized_link, get_path_for_caching
from .system.flattener import flatten_generator
from .system.urlmatcher import search_urls

FORBIDDEN_WORKER_SPLITS = {'deviantart'}

SKIP_INDEXED_FILES = True
RETRY_ERROR_MASK = 0
CUSTOM_WORKER_SPLITS: Dict[str, int] = {}
SPLIT_WORKER_AFTER_N_LINKS = 10000
IGNORE_WORKERS: Set[str] = set()
REDOWNLOAD_EMPTIES = False
REDOWNLOAD = False

STOP_JOBS_FLAG_PATH = Path('stop_jobs.flag')

HTML_SPECIAL_CHARS_REPLACE: List[Tuple[str, str]] = [
    ('&lt;', '<'),
    ('&gt;', '>'),
    ('&quot;', '"'),
    ('&#039;', '\''),
    ('&amp;', '&'),
]
HTML_SPECIAL_CHARS: List[str] = list(
    map(lambda a: a[0], HTML_SPECIAL_CHARS_REPLACE))


def contains_any(s: str, l: List[str]) -> bool:
    for i in l:
        if i in s:
            return True
    return False


def replace_many(s: str, l: List[Tuple[str, str]]) -> str:
    for f, t in l:
        s = s.replace(f, t)
    return s


def cmdline(encoded_args: str = None):
    if encoded_args is None:
        return run_with_config()
    else:
        return parse_cmdline(run_with_config, encoded_args)


def run_with_config(redownload_empties: bool = False,
                    redownload_all: bool = False,
                    retry_generic_errors: bool = False,
                    retry_unknown_errors: bool = False,
                    retry_network_errors: bool = False,
                    retry_not_found_errors: bool = False,
                    retry_auth_errors: bool = False,
                    retry_format_errors: bool = False,
                    retry_extractor_errors: bool = False,
                    retry_os_errors: bool = False,
                    retry_not_in_disk_errors: bool = False,
                    retry_gdl_mask: int = 0,
                    split_workers: int = None,
                    custom_worker_splits: dict = None,
                    skip_indexed_files: bool = True,
                    ignore_workers: Set[str] = set(),
                    ):
    global SPLIT_WORKER_AFTER_N_LINKS
    global CUSTOM_WORKER_SPLITS
    global SKIP_INDEXED_FILES
    global REDOWNLOAD_EMPTIES
    global RETRY_ERROR_MASK
    global REDOWNLOAD
    global IGNORE_WORKERS
    IGNORE_WORKERS = ignore_workers
    REDOWNLOAD = redownload_all
    SKIP_INDEXED_FILES = skip_indexed_files
    REDOWNLOAD_EMPTIES = redownload_empties
    RETRY_ERROR_MASK |= retry_gdl_mask
    # Each retry_* flag enables one bit of the per-link error status mask.
    if retry_generic_errors:
        RETRY_ERROR_MASK |= 1 << 0
    if retry_unknown_errors:
        RETRY_ERROR_MASK |= 1 << 1
    if retry_network_errors:
        RETRY_ERROR_MASK |= 1 << 2
    if retry_not_found_errors:
        RETRY_ERROR_MASK |= 1 << 3
    if retry_auth_errors:
        RETRY_ERROR_MASK |= 1 << 4
    if retry_format_errors:
        RETRY_ERROR_MASK |= 1 << 5
    if retry_extractor_errors:
        RETRY_ERROR_MASK |= 1 << 6
    if retry_os_errors:
        RETRY_ERROR_MASK |= 1 << 7
    if retry_not_in_disk_errors:
        RETRY_ERROR_MASK |= 1 << 8
    if split_workers is not None:
        SPLIT_WORKER_AFTER_N_LINKS = split_workers
    if custom_worker_splits is not None:
        CUSTOM_WORKER_SPLITS = custom_worker_splits
    return main()


def main():
    subreddit_data_path = Path('r.json')
    if not subreddit_data_path.exists():
        print("Executing prerequisite...")
        reddit_imgs.sync.main()
    subreddit_filters_path = Path('rf.json')
    print('Loading posts from disk...')
    Path('i_gdl').mkdir(exist_ok=True, parents=True)
    workers_state_path = Path('i_gdl_w')
    workers_state_path.mkdir(exist_ok=True, parents=True)
    for wsp in workers_state_path.iterdir():
        wsp.unlink()
    if STOP_JOBS_FLAG_PATH.exists():
        STOP_JOBS_FLAG_PATH.unlink()
    subreddit_data = json.loads(subreddit_data_path.read_text())
    subreddit_filters = json.loads(subreddit_filters_path.read_bytes())
    print('Loading posts...')
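    # Shape assumptions, inferred from how the data is consumed below:
    # r.json maps each subreddit to a dict whose 'links' list holds post
    # dicts carrying at least 'datakey', 'timestamp', 'nsfw', 'link' and
    # 'domain'; rf.json holds the per-subreddit 'no_download', 'no_sfw'
    # and 'no_nsfw' filter tables.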
    posts = prerrun_flatten_subreddits_into_posts(
        subreddit_data, subreddit_filters)
    print(f'{len(posts)} posts identified.')
    print(f'Identifying alternative trivial links...')
    prerrun_posts_re_sort(posts)
    Path('r_gdl_p.json').write_text(
        json.dumps(posts, indent=1, sort_keys=True))
    print(f'Grouping links with the posts they show up in...')
    links = OrderedDict()
    for dk, post in posts.items():
        for link in post['links']:
            if link not in links:
                links[link] = list()
            links[link].append(dk)
    Path('r_gdl_lp.json').write_text(
        json.dumps(links, indent=1, sort_keys=True))
    known_link_set = set(links.keys())
    print(f'{len(links)} links found')
    print(f'Checking if there is an extractor for each link...')
    r_gdl_lk_path = Path('r_gdl_lk.json')
    r_gdl_le_path = Path('r_gdl_le.json')
    r_gdl_le_cs_path = Path('r_gdl_le_cs.json')
    r_gdl_le_cs_uv_path = Path('r_gdl_le_cs_uv.json')
    r_gdl_le_cs_uv_iu_path = Path('r_gdl_le_cs_uv_iu.json')
    link_keys = dict()
    if r_gdl_lk_path.exists():
        link_keys = json.loads(r_gdl_lk_path.read_text())
    for link in links.keys():
        if link not in link_keys or link_keys[link] == '':
            key = libgallerydl.find_extractor_archival_key(link)
            if key is None:
                key = ''
            link_keys[link] = key
            del key
    link_extractors_category_subcategory: Dict[str,
                                                Optional[Tuple[str, str]]] = dict()
    link_extractors_category_subcategory_in_use: Dict[str, List[str]] = defaultdict(
        list)
    link_extractors: Dict[str, str] = dict()
    if r_gdl_le_cs_path.exists():
        link_extractors_category_subcategory = dict(map(
            lambda a: (a[0], None if a[1] is None else tuple(a[1])),
            json.loads(r_gdl_le_cs_path.read_text()).items()
        ))
    # if r_gdl_le_path.exists():
    #     link_extractors = json.loads(r_gdl_le_path.read_text())
    for link in links.keys():
        if link_extractors_category_subcategory.get(link) is None:
            category_subcategory = libgallerydl.find_extractor_category_subcategory(
                link)
            link_extractors_category_subcategory[link] = category_subcategory
    for link in links.keys():
        category_subcategory = link_extractors_category_subcategory[link]
        if category_subcategory is None:
            link_extractors[link] = ''
        else:
            category = category_subcategory[0]
            if category_subcategory in [
                ('blogger', 'blog'),
                ('blogger', 'search'),
                ('deviantart', 'collection'),
                ('deviantart', 'folder'),
                ('deviantart', 'gallery'),
                ('deviantart', 'popular'),
                ('deviantart', 'user'),
                ('deviantart', 'tag'),
                ('e621', 'tag'),
                ('exhentai', 'gallery'),
                ('furaffinity', 'favorite'),
                ('furaffinity', 'gallery'),
                ('furaffinity', 'user'),
                ('imgur', 'subreddit'),
                ('imgur', 'user'),
                ('inkbunny', 'user'),
                ('instagram', 'tag'),
                ('instagram', 'user'),
                ('newgrounds', 'user'),
                ('paheal', 'tag'),
                ('pinterest', 'pin'),
                ('pinterest', 'pinit'),
                ('pornhub', 'user'),
                ('reddit', 'subreddit'),
                ('reddit', 'user'),
                ('sankakucomplex', 'article'),
                ('subscribestar', 'user'),
                ('tumblr', 'tag'),
                ('tumblr', 'user'),
                ('twitter', 'timeline'),
            ]:
                link_extractors[link] = ''
            else:
                link_extractors[link] = category
            link_extractors_category_subcategory_in_use['_'.join(
                category_subcategory)].append(link)
    for discarded_link in set(link_extractors.keys()).difference(known_link_set):
        del link_extractors[discarded_link]
    for discarded_link in set(link_extractors_category_subcategory.keys()).difference(known_link_set):
        del link_extractors_category_subcategory[discarded_link]
    r_gdl_le_cs_path.write_text(json.dumps(
        link_extractors_category_subcategory, indent=1, sort_keys=True))
    r_gdl_le_cs_uv_path.write_text(json.dumps(
        sorted(list(set(filter(lambda a: a is not None,
                               link_extractors_category_subcategory.values())))),
        indent=1, sort_keys=True))
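    # The dump below records, per 'category_subcategory' pair, how many
    # links resolved to it ('frequency') alongside the full link lists
    # ('full'); the pair blacklist above keeps aggregate pages (galleries,
    # user profiles, tags, subreddits) out of the download queue.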
    r_gdl_le_cs_uv_iu_path.write_text(json.dumps(
        dict(frequency=dict(map(lambda a: (a[0], len(a[1])),
                                link_extractors_category_subcategory_in_use.items())),
             full=link_extractors_category_subcategory_in_use),
        indent=1, sort_keys=True))
    r_gdl_le_path.write_text(json.dumps(
        link_extractors, indent=1, sort_keys=True))
    r_gdl_lk_path.write_text(json.dumps(
        link_keys, indent=1, sort_keys=True))
    links_by_extractor = {
        extractor: list()
        for extractor in list(set(link_extractors.values()))
    }
    for link, extractor in link_extractors.items():
        links_by_extractor[extractor].append(link)
    not_downloadable_links = dict()
    not_downloadable_links[''] = links_by_extractor.get('', [])
    not_downloadable_links['reddit_user'] = links_by_extractor.get(
        'reddit_user', [])
    not_downloadable_links['reddit_subreddit'] = links_by_extractor.get(
        'reddit_subreddit', [])
    Path('i_undownloadable.json').write_text(
        json.dumps(not_downloadable_links, indent=1))
    if '' in links_by_extractor:
        del links_by_extractor['']
    if 'reddit_user' in links_by_extractor:
        del links_by_extractor['reddit_user']
    if 'reddit_subreddit' in links_by_extractor:
        del links_by_extractor['reddit_subreddit']
    not_downloadable_link_set = frozenset(
        flatten_generator(not_downloadable_links.values()))
    print(f'{len(links)-len(not_downloadable_link_set)} downloadable links found')
    print(f'{len(not_downloadable_link_set)} undownloadable links found')
    print(f'{len(links_by_extractor)} extractors found')
    Path('r_gdl_lbe.json').write_text(json.dumps(
        links_by_extractor, indent=1, sort_keys=True))
    files_from_links: Dict[str, List[str]] = dict()
    links_no_files: List[str] = list()
    files_sizes: Dict[str, int] = dict()
    link_statuses: Dict[str, int] = dict()
    ignored_links: Set[str] = set()
    if (pth := Path('i_gdl_ffl.json')).exists():
        try:
            files_from_links = json.loads(pth.read_text())
        except:
            pass
    if (pth := Path('i_gdl_lnf.json')).exists():
        try:
            links_no_files = json.loads(pth.read_text())
        except:
            pass
    if (pth := Path('i_gdl_fsz.json')).exists():
        try:
            files_sizes = json.loads(pth.read_text())
        except:
            pass
    if (pth := Path('i_gdl_spl.json')).exists():
        try:
            link_statuses = json.loads(pth.read_text())
        except:
            pass
    for discarded_link in set(links_no_files).difference(known_link_set):
        links_no_files.remove(discarded_link)
    discarded_files = set()
    for discarded_link in set(files_from_links.keys()).difference(known_link_set):
        files_in_link = files_from_links[discarded_link]
        for file_in_link in files_in_link:
            discarded_files.add(file_in_link)
        if discarded_link in link_statuses:
            del link_statuses[discarded_link]
        del files_from_links[discarded_link]
    files_to_keep = set()
    for files_from_link in files_from_links.values():
        for file_from_link in files_from_link:
            if file_from_link not in files_to_keep:
                files_to_keep.add(file_from_link)
    for discarded_file in discarded_files.difference(files_to_keep):
        if discarded_file in files_sizes:
            del files_sizes[discarded_file]
    for missing_file_size in files_to_keep.difference(set(files_sizes.keys())):
        p = Path(missing_file_size)
        if not p.exists():
            raise FileNotFoundError(missing_file_size)
        else:
            files_sizes[missing_file_size] = p.stat().st_size
            print('Re-filled files_sizes for %r' % p)
    if (p := Path('i_gdl_ignored.txt')).exists():
        ignored_links = set(list(filter(len, p.read_text().splitlines())))
    links_no_files = list(filter(lambda a: a not in ignored_links,
                                 links_no_files))
    link_statuses = dict(filter(lambda a: a[0] not in ignored_links,
                                link_statuses.items()))
    files_from_links = dict(filter(lambda a: a[0] not in ignored_links,
                                   files_from_links.items()))
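    # Re-queue rule used by both worker-splitting passes below: a link is
    # (re)tried when it was never indexed, when SKIP_INDEXED_FILES is off,
    # or when its stored status shares at least one bit with
    # RETRY_ERROR_MASK (unknown links default to 0xFF, i.e. "retry if any
    # retry bit is enabled").  For example, retry_network_errors=True sets
    # bit 2, so a link whose recorded status is 0b100 is retried, while one
    # whose status is 0b001 is not.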
    checked_links = list(files_from_links.keys()) + links_no_files
    checked_links = frozenset(checked_links)
    max_expected_jobs_for_extractor = 0
    for extractor, links in links_by_extractor.items():
        links = [link for link in links
                 if link not in ignored_links and (
                     link not in checked_links
                     or not SKIP_INDEXED_FILES
                     or (link_statuses.get(link, 0xFF) & RETRY_ERROR_MASK) != 0
                 )]
        if len(links) <= 0:
            continue
        this_worker_split_after_n_links = CUSTOM_WORKER_SPLITS.get(
            extractor, SPLIT_WORKER_AFTER_N_LINKS)
        workers = math.ceil(len(links)/this_worker_split_after_n_links)
        if workers <= 1 or extractor in FORBIDDEN_WORKER_SPLITS:
            workers = 1
        max_expected_jobs_for_extractor = max(
            max_expected_jobs_for_extractor,
            workers
        )
    worker_by_seq = [list() for _ in range(max_expected_jobs_for_extractor)]
    links_to_worker = dict()
    for extractor, links in links_by_extractor.items():
        links = [link for link in links
                 if link not in ignored_links and (
                     link not in checked_links
                     or not SKIP_INDEXED_FILES
                     or (link_statuses.get(link, 0xFF) & RETRY_ERROR_MASK) != 0
                 )]
        if len(links) <= 0:
            continue
        this_worker_split_after_n_links = CUSTOM_WORKER_SPLITS.get(
            extractor, SPLIT_WORKER_AFTER_N_LINKS)
        workers = math.ceil(len(links)/this_worker_split_after_n_links)
        if workers <= 1 or extractor in FORBIDDEN_WORKER_SPLITS:
            if extractor in IGNORE_WORKERS:
                continue
            links_to_worker[extractor] = links
            worker_by_seq[0].append(extractor)
        else:
            digits = math.ceil(math.log10(max(1, workers+1)))
            fmt = "%%0%dd" % digits
            for worker_no in range(workers):
                lowerlimit = (worker_no+0)*this_worker_split_after_n_links
                upperlimit = (worker_no+1)*this_worker_split_after_n_links
                thisrange = links[lowerlimit:upperlimit]
                worker_nm = extractor + ':' + (fmt % (worker_no))
                if worker_nm in IGNORE_WORKERS:
                    continue
                links_to_worker[worker_nm] = thisrange
                worker_by_seq[worker_no].append(worker_nm)
    for w in worker_by_seq:
        w.sort()
    workers_nicely_grouped = [
        worker
        for workergroup in worker_by_seq
        for worker in workergroup
        if worker != ''
    ]
    print(f'{len(links_to_worker)} workers to be spawned')
    response_dict = dict(
        files_from_links=files_from_links,
        links_no_files=links_no_files,
        files_sizes=files_sizes,
        link_statuses=link_statuses,
        workers_nicely_grouped=workers_nicely_grouped,
        workers_state_path=str(workers_state_path),
        links_to_worker=links_to_worker,
        link_keys=link_keys,
        SKIP_INDEXED_FILES=SKIP_INDEXED_FILES,
        RETRY_ERROR_MASK=RETRY_ERROR_MASK,
        CUSTOM_WORKER_SPLITS=CUSTOM_WORKER_SPLITS,
        SPLIT_WORKER_AFTER_N_LINKS=SPLIT_WORKER_AFTER_N_LINKS,
        REDOWNLOAD_EMPTIES=REDOWNLOAD_EMPTIES,
        REDOWNLOAD=REDOWNLOAD,
    )
    Path('r_fetch_preprocessed.json').write_text(
        json.dumps(response_dict, indent=1))
    if len(links_to_worker) == 0:
        raise Exception('No work to do.')
    return response_dict


def prerrun_flatten_subreddits_into_posts(subreddit_data, subreddit_filters):
    postsl = [
        {'subreddit': subreddit, **post}
        for subreddit, srdt in subreddit_data.items()
        for post in srdt['links']
    ]
    postsl.sort(key=lambda a: (-a['timestamp'], a['datakey']))
    postsd = dict()
    for post in postsl:
        dk = post['datakey']
        sr = post['subreddit']
        if subreddit_filters['no_download'][sr]:
            continue
        if subreddit_filters['no_sfw'][sr] and not post['nsfw']:
            continue
        if subreddit_filters['no_nsfw'][sr] and post['nsfw']:
            continue
        if dk not in postsd:
            postsd[dk] = post.copy()
            postsd[dk]['subreddits'] = list()
            postsd[dk]['links'] = list()
            del postsd[dk]['subreddit']
            del postsd[dk]['link']
            del postsd[dk]['domain']
        if (sr := post['subreddit']) not in (srs := postsd[dk]['subreddits']):
            srs.append(sr)
        if (lnk := get_normalized_link(post['link'])) not in (lnks := postsd[dk]['links']):
            lnks.append(lnk)
    return postsd

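# prerrun_posts_re_sort (below) keeps expanding each post's link list until
# it stabilizes: escaped-HTML blobs are split into the URLs they contain,
# query-string-less, fragment-less and trailing-slash-less variants are
# added, relative, anchor-only, empty and mailto links are dropped, and
# uppercase http/https schemes are lowercased.  Finally, preview.redd.it
# links are kept only when they still carry their width/format/auto/s
# query parameters.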
def prerrun_posts_re_sort(posts):
    for post in sorted(posts.values(), key=lambda a: (-a['timestamp'], a['datakey'])):
        post['subreddits'].sort()
        dk = post['datakey']
        post_links = post['links']
        has_changed_any_link = True
        while has_changed_any_link:
            has_changed_any_link = False
            for link in post_links:
                if '&lt;' in link:
                    # The link field still carries escaped HTML markup:
                    # pull every URL out of it, unescape and normalize
                    # them, then drop the original blob.
                    for linkcopy in search_urls(link):
                        linkcopy = get_normalized_link(linkcopy)
                        linkcopy = replace_many(
                            linkcopy, HTML_SPECIAL_CHARS_REPLACE)
                        if linkcopy not in post_links:
                            post_links.append(linkcopy)
                            has_changed_any_link = True
                    while link in post_links:
                        post_links.remove(link)
                        has_changed_any_link = True
                    break
                else:
                    linkcopy = link
                    linkcopy = get_normalized_link(linkcopy)
                    if linkcopy not in post_links:
                        post_links.append(linkcopy)
                        has_changed_any_link = True
                        break
                    if '?' in link:
                        linkcopy = link.split('?', 1)[0]
                        linkcopy = get_normalized_link(linkcopy)
                        if linkcopy not in post_links:
                            post_links.append(linkcopy)
                            has_changed_any_link = True
                            break
                    if '#' in link:
                        linkcopy = link.split('#', 1)[0]
                        linkcopy = get_normalized_link(linkcopy)
                        if linkcopy not in post_links:
                            post_links.append(linkcopy)
                            has_changed_any_link = True
                            break
                    if contains_any(linkcopy, HTML_SPECIAL_CHARS):
                        linkcopy = replace_many(
                            linkcopy, HTML_SPECIAL_CHARS_REPLACE)
                        if linkcopy not in post_links:
                            post_links.append(linkcopy)
                            has_changed_any_link = True
                            break
                    if linkcopy[-1:] in ('/', '#', '?'):
                        while linkcopy[-1:] in ('/', '#', '?'):
                            linkcopy = linkcopy[:-1]
                        linkcopy = get_normalized_link(linkcopy)
                        if linkcopy not in post_links:
                            post_links.append(linkcopy)
                            has_changed_any_link = True
                    if link.strip() == '':
                        while link in post_links:
                            post_links.remove(link)
                            has_changed_any_link = True
                        break
                    if link.startswith('/'):
                        while link in post_links:
                            post_links.remove(link)
                            has_changed_any_link = True
                        break
                    if link.startswith('#'):
                        while link in post_links:
                            post_links.remove(link)
                            has_changed_any_link = True
                        break
                    if link.startswith('mailto'):
                        while link in post_links:
                            post_links.remove(link)
                            has_changed_any_link = True
                        break
                    if (proto := (tpl := link.split(':', 1))[0]).lower() in ('http', 'https') and proto not in ('http', 'https'):
                        lst = list(tpl)
                        lst[0] = lst[0].lower()
                        linkcopy = ':'.join(lst)
                        post_links.remove(link)
                        if linkcopy not in post_links:
                            post_links.append(linkcopy)
                        has_changed_any_link = True
                        break
        post['links'] = list(filter(lambda link: (
            not link.startswith('https://preview.redd.it/')
            or (
                (('?width=' in link) or ('&width=' in link))
                and
                (('?format=' in link) or ('&format=' in link))
                and
                (('?auto=' in link) or ('&auto=' in link))
                and
                (('?s=' in link) or ('&s=' in link))
            )
        ), post['links']))
        post['links'].sort()
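

# Minimal direct-invocation sketch; an assumption rather than part of the
# original pipeline, which may already dispatch cmdline() through its own
# runner.
if __name__ == '__main__':
    cmdline()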