#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

import json
import operator
import os
import time
from concurrent.futures import ProcessPoolExecutor as PoolExecutor
from pathlib import Path
from typing import List, Dict, Union, Any
from urllib.error import ContentTooShortError, HTTPError, URLError

import colored as clrlib

from .system import simpleDownloader
from .system.cmdline_parser import parse_cmdline
from .system.subredditTools import (GATEWAY_LINK_ARGS, build_gateway_link,
                                    getEmptySubredditData,
                                    getSubredditPageJsonInfo)

UNDERSAMPLED_CHECK_TIME = 6*3600  # seconds between forced rechecks of rarely-posting subreddits
TOLERABLE_ERROR_LOG_SIZE = 5  # skip subreddits whose error log grew past this; negative disables the check
MAX_WORKERS = 16
IGNORE_STATISTICAL_LEVERAGER = False  # when True, fetch every subreddit regardless of the posting-frequency heuristic


def cmdline(encoded_args: str = None):
    if encoded_args is None:
        return run_with_config()
    else:
        return parse_cmdline(run_with_config, encoded_args)


def run_with_config(max_workers: int = None,
                    ignore_statistical_leverager: bool = False,
                    tolerable_error_log_size: int = 5,
                    undersampled_check_time: int = 6*3600,
                    ):
    global MAX_WORKERS
    global IGNORE_STATISTICAL_LEVERAGER
    global TOLERABLE_ERROR_LOG_SIZE
    global UNDERSAMPLED_CHECK_TIME
    TOLERABLE_ERROR_LOG_SIZE = tolerable_error_log_size
    IGNORE_STATISTICAL_LEVERAGER = ignore_statistical_leverager
    UNDERSAMPLED_CHECK_TIME = undersampled_check_time
    if max_workers is not None:
        MAX_WORKERS = max_workers
        MAX_WORKERS += MAX_WORKERS % 2  # keep it even so main() can split it across two pools
    return main()


simpleDownloader.setCookies({'over18': 1})  # allow NSFW subreddits to be fetched

wdir = os.path.abspath('.')


# Raised when the downloader returns None instead of a response body.
class NoneResponseError(ValueError):
    @classmethod
    def check(cls, value):
        if value is None:
            raise cls('None value was found')
        return value


def subreddit_error_append(subreddit: str,
                           error: str,
                           timestamp: Union[int, float] = None,
                           ):
    # Record a download error in r/<subreddit>/errors.json.
    errors_path = Path(
        os.path.abspath(os.path.join(wdir, 'r', subreddit, 'errors.json'))
    )
    if timestamp is None:
        timestamp = time.time()
    errors: List[Dict[str, Any]] = []
    if errors_path.is_file():
        errors = json.loads(errors_path.read_bytes())
    errors.append(dict(error=error, timestamp=timestamp))
    errors_path.write_text(json.dumps(errors, indent=1))


def subreddit_error_empty(subreddit: str):
    # Clear the error log after a successful download.
    errors_path = Path(
        os.path.abspath(os.path.join(wdir, 'r', subreddit, 'errors.json'))
    )
    if errors_path.is_file():
        errors_path.unlink()


def subreddit_error_within_tolerance(subreddit: str):
    if TOLERABLE_ERROR_LOG_SIZE < 0:
        return True
    errors_path = Path(
        os.path.abspath(os.path.join(wdir, 'r', subreddit, 'errors.json'))
    )
    errors: List[Dict[str, Any]] = []
    if errors_path.is_file():
        errors = json.loads(errors_path.read_bytes())
    return len(errors) <= TOLERABLE_ERROR_LOG_SIZE


def process_subreddit(subreddit, srdt, jsonPageSr):
    simpleDownloader.setCookies({'over18': 1})
    srp = os.path.abspath(os.path.join(wdir, 'r', subreddit))
    nextpage = build_gateway_link(subreddit)
    pageno = 0
    ygst = srdt['date_first']  # newest post already stored; used to stop paging once caught up
    while nextpage:
        pageno += 1
        print(clrlib.stylize(('/r/{0:<20} loading page #%05d' % pageno).format(subreddit), [
            clrlib.fg('light_yellow'),
        ]))
        print(clrlib.stylize(' >> %s' % (nextpage.replace(GATEWAY_LINK_ARGS, '[...]'),), [
            clrlib.fg('light_yellow'), clrlib.attr('dim'),
        ]))
        redditBytes = None
        try:
            redditBytes = NoneResponseError.check(
                simpleDownloader.getUrlBytes(nextpage)
            )
        except (HTTPError, URLError, ContentTooShortError, NoneResponseError) as error:
            print(clrlib.stylize(('/r/{0:<20} loading page #%05d' % pageno).format(subreddit), [
                clrlib.fg('light_red'), clrlib.attr('bold'),
            ]))
            print(clrlib.stylize(f" >> HTTP Error: Skipping: {error}", [
                clrlib.fg('light_red'), clrlib.attr('bold'), clrlib.attr('dim'),
            ]))
            subreddit_error_append(subreddit, str(error))
            break
        subreddit_error_empty(subreddit)
        jsonPage = json.loads(redditBytes)
        getSubredditPageJsonInfoResult = None
        try:
            getSubredditPageJsonInfoResult = (
                getSubredditPageJsonInfo(jsonPage, subreddit, pageno))
        except IndexError:
            print(clrlib.stylize(('/r/{0:<20} loading page #%05d' % pageno).format(subreddit), [
                clrlib.fg('light_gray'), clrlib.attr('dim'),
            ]))
            print(clrlib.stylize(" >> Empty subreddit: Skipping...", [
                clrlib.fg('light_gray'), clrlib.attr('dim'),
            ]))
            break
        first, last, nextpage, links = getSubredditPageJsonInfoResult
        if ygst >= first:  # the newest stored post is at least as recent as the newest downloaded one, so we are up to date
            nextpage = None
        srdt['date_first'] = max(first, srdt['date_first'])
        srdt['date_last'] = min(last, srdt['date_last'])
        for link in links[::-1]:
            if link not in srdt['links']:
                srdt['links'].append(link)
        # The gateway response keys these three maps by subreddit id; the id common to all of them belongs to this subreddit.
        srid = next(iter(set.intersection(
            set(jsonPage['subreddits'].keys()),
            set(jsonPage['postFlair'].keys()),
            set(jsonPage['subredditAboutInfo'].keys())
        )))
        jsonPageSr = dict(
            id=srid,
            name=subreddit,
            definition=jsonPage['subreddits'][srid],
            about=jsonPage['subredditAboutInfo'][srid],
            flair=jsonPage['postFlair'][srid],
        )
    srdt['links'] = list(filter(lambda a: len(a['datakey']) < 20, srdt['links']))
    srdt['links'] = sorted(srdt['links'], key=lambda a: -a['timestamp'])
    if len(srdt['links']) >= 1:
        srdt['date_first'] = srdt['links'][0]['timestamp']  # newest (max)
        srdt['date_last'] = srdt['links'][-1]['timestamp']  # oldest (min)
    return (subreddit, srp, srdt, jsonPageSr)


def statistically_expects_post(srdt, latest_file_change=None, use_current_time_with_average=1):
    # Heuristic: estimate the average gap between the most recent posts and only
    # report True once enough time has passed for a new post to be likely.
    if srdt['date_last'] >= srdt['date_first'] or len(srdt['links']) < 2:
        return latest_file_change is None or (time.time() - latest_file_change) > UNDERSAMPLED_CHECK_TIME
    recent_timestamps = [link['timestamp'] for link in srdt['links'][:21]]
    recent_timestamps_zipped = list(zip([time.time(), *recent_timestamps],
                                        [*recent_timestamps, 0]))
    recent_timediffs_with_zeros = list(map(
        lambda a: operator.sub(*a),
        recent_timestamps_zipped))[use_current_time_with_average:-1]
    recent_timediffs = list(filter(bool, recent_timediffs_with_zeros))
    if len(recent_timediffs) <= 0:
        return True
    recent_timediffs_avg = sum(recent_timediffs) / len(recent_timediffs)
    if latest_file_change is None:
        latest_file_change = srdt['links'][0]['timestamp']
    if (
            (use_current_time_with_average != 0)
            and
            (int(time.time()) >= latest_file_change + recent_timediffs_avg * 5)
    ):
        # Far past due: recompute without counting the current time into the average.
        return statistically_expects_post(srdt, latest_file_change, 0)
    else:
        return int(time.time()) >= latest_file_change + recent_timediffs_avg


def main():
    print('Building summary...')
    srs, srsi, srf, srmt = build_summary()
    print('Download...')
    subreddits = sorted(filter(
        lambda sr: os.path.isdir(os.path.join(wdir, 'r', sr)),
        os.listdir(os.path.join(wdir, 'r'))))
    print('Opening process pool...')
    # pe fetches subreddit pages; pe2 writes the results to disk.
    with PoolExecutor(MAX_WORKERS//2) as pe2:
        def process_subreddit_done_callback_inner(job):
            (subreddit, srp, srdt, jsonPageSr) = job.result()
            del job
            process_subreddit_done_callback(subreddit, srp, srdt, jsonPageSr,
                                            pe2, srs, srsi)
            return
        with PoolExecutor(MAX_WORKERS//2) as pe:
            print('Opened process pool')
            for subreddit in subreddits:
                if subreddit not in srs:
                    srs[subreddit] = getEmptySubredditData(subreddit)
                if subreddit not in srsi:
                    srsi[subreddit] = None
                if IGNORE_STATISTICAL_LEVERAGER or statistically_expects_post(
                        srs[subreddit], srmt.get(subreddit)):
                    if subreddit_error_within_tolerance(subreddit):
                        job = pe.submit(
                            process_subreddit,
                            subreddit,
                            srs[subreddit],
                            srsi[subreddit],
                        )
                        job.add_done_callback(process_subreddit_done_callback_inner)
            print('Closing process pool...')
        print('Closed process pool')
    print('Writing summary...')
    write_summary(srs, srsi, srf)
    print('Done')


def process_subreddit_done_callback(subreddit, srp, srdt, jsonPageSr, pe, srs, srsi):
    # Runs in the main process: store the fetched data and hand saving off to the writer pool.
    srs[subreddit] = srdt
    srsi[subreddit] = jsonPageSr
    print(clrlib.stylize(f' @> Writing /r/{subreddit}', [
        clrlib.fg('light_cyan'),
    ]))
    job = pe.submit(
        post_processing_saver,
        subreddit, srp, srdt, jsonPageSr
    )


def post_processing_saver(subreddit, srp, srdt, jsonPageSr):
    write_json(Path(os.path.join(srp, 'subreddit.json')), srdt, sort_keys=True)
    if jsonPageSr is not None:
        write_json(Path(os.path.join(srp, 'meta.json')), jsonPageSr, sort_keys=True)
    print(clrlib.stylize(f' @> Written /r/{subreddit}', [
        clrlib.fg('light_green'),
    ]))


def build_summary():
    # Load the cached per-subreddit data from disk, restoring entries from the
    # previous summaries (r.json / ri.json) when a file on disk is corrupted.
    rjpath = Path(wdir, 'r.json')
    rijpath = Path(wdir, 'ri.json')
    rfpath = Path(wdir, 'rf.json')
    oldsrs = dict()
    oldsrsi = dict()
    if rjpath.exists():
        oldsrs = json.loads(rjpath.read_bytes())
    if rijpath.exists():
        oldsrsi = json.loads(rijpath.read_bytes())
    srs = dict()
    srsi = dict()
    srmt = dict()
    nodownloadfilter = dict()
    nosfwfilter = dict()
    nonsfwfilter = dict()
    wallpaperfilter = dict()
    with PoolExecutor(MAX_WORKERS) as pe:
        def on_data_read(job):
            (sr, srp, srip, srd, srid, sripe) = job.result()
            if srd is not None:
                srs[sr] = srd
            else:
                if sr not in oldsrs:
                    srp.unlink()
                    if sr in srmt:
                        del srmt[sr]
                else:
                    print('Restoring old data for corrupted subreddit %r' % sr)
                    srs[sr] = oldsrs[sr]
                    srp.write_text(json.dumps(oldsrs[sr], indent=1))
                    srmt[sr] = rjpath.stat().st_mtime
            if sripe:
                if srid is not None:
                    srsi[sr] = srid
                else:
                    if sr not in oldsrsi:
                        srip.unlink()
                    else:
                        print('Restoring old data for corrupted subreddit %r' % sr)
                        srsi[sr] = oldsrsi[sr]
                        srip.write_text(json.dumps(oldsrsi[sr], indent=1))

        for srp in Path(wdir, 'r').glob('*/subreddit.json'):
            sr = srp.parent.name.lower()
            nodownloadfilter[sr] = srp.parent.joinpath('nodownload.flag').exists()
            nosfwfilter[sr] = srp.parent.joinpath('nosfw.flag').exists()
            nonsfwfilter[sr] = srp.parent.joinpath('nonsfw.flag').exists()
            wallpaperfilter[sr] = srp.parent.joinpath('wallpaper.flag').exists()
            srmt[sr] = srp.stat().st_mtime
            srip = srp.parent.joinpath('meta.json')
            job = pe.submit(read_disk_summary, sr, srp, srip)
            job.add_done_callback(on_data_read)
    srf = dict(
        no_download=nodownloadfilter,
        no_sfw=nosfwfilter,
        no_nsfw=nonsfwfilter,
        wallpaper=wallpaperfilter,
    )
    return srs, srsi, srf, srmt


def read_disk_summary(sr, srp, srip):
    srd = None
    srid = None
    sripe = srip.exists()
    try:
        srd = json.loads(srp.read_bytes())
    except json.decoder.JSONDecodeError:
        pass
    if sripe:
        try:
            srid = json.loads(srip.read_bytes())
        except json.decoder.JSONDecodeError:
            pass
    return (sr, srp, srip, srd, srid, sripe)


def write_summary(srs, srsi, srf):
    rjpath = Path(wdir, 'r.json')
    rijpath = Path(wdir, 'ri.json')
    rfpath = Path(wdir, 'rf.json')
    with PoolExecutor(MAX_WORKERS) as pe:
        pe.submit(write_json, rjpath, srs)
        pe.submit(write_json, rijpath, srsi)
        pe.submit(write_json, rfpath, srf)


def write_json(path, data, **kwargs):
    path.write_text(json.dumps(data, indent=1, **kwargs))


if __name__ == '__main__':
    main()