#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

import json
import operator
import os
import time
from concurrent.futures import ProcessPoolExecutor as PoolExecutor
from pathlib import Path
from typing import List, Dict, Union, Any
from urllib.error import ContentTooShortError, HTTPError, URLError

import colored as clrlib

from .system import simpleDownloader
from .system.cmdline_parser import parse_cmdline
from .system.subredditTools import (GATEWAY_LINK_ARGS, build_gateway_link,
                                    getEmptySubredditData,
                                    getSubredditPageJsonInfo)

UNDERSAMPLED_CHECK_TIME = 6*3600
TOLERABLE_ERROR_LOG_SIZE = 5
MAX_WORKERS = 16
IGNORE_STATISTICAL_LEVERAGER = False
TOLERABLE_OUTDATENESS = 5
MINIMUM_POST_TIME = 60


def cmdline(encoded_args: str = None):
    if encoded_args is None:
        return run_with_config()
    else:
        return parse_cmdline(run_with_config, encoded_args)


def run_with_config(max_workers: int = None,
                    ignore_statistical_leverager: bool = False,
                    tolerable_error_log_size: int = 5,
                    undersampled_check_time: int = 6 * 3600,
                    tolerable_outdatedness: int = 5,
                    minimum_post_interval: int = 60,
                    ):
    global MAX_WORKERS
    global IGNORE_STATISTICAL_LEVERAGER
    global TOLERABLE_ERROR_LOG_SIZE
    global UNDERSAMPLED_CHECK_TIME
    global TOLERABLE_OUTDATENESS
    global MINIMUM_POST_TIME
    TOLERABLE_OUTDATENESS = tolerable_outdatedness
    MINIMUM_POST_TIME = minimum_post_interval
    TOLERABLE_ERROR_LOG_SIZE = tolerable_error_log_size
    IGNORE_STATISTICAL_LEVERAGER = ignore_statistical_leverager
    UNDERSAMPLED_CHECK_TIME = undersampled_check_time
    if max_workers is not None:
        MAX_WORKERS = max_workers
        MAX_WORKERS += MAX_WORKERS % 2
    return main()
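
# Usage sketch (illustrative only; the console-script wiring that reaches
# cmdline() lives outside this module):
#
#     run_with_config(max_workers=8, ignore_statistical_leverager=True)
#
# crawls every subreddit folder under ./r regardless of the posting-rate
# heuristic (subreddits over the error-log tolerance are still skipped).
# main() splits MAX_WORKERS evenly between a fetching pool and a writing
# pool, so 8 workers means 4 fetchers plus 4 writers; odd values are rounded
# up to the next even number above.
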
simpleDownloader.setCookies({'over18': 1})

wdir = os.path.abspath('.')


class NoneResponseError(ValueError):
    @classmethod
    def check(cls, value):
        if value is None:
            raise cls('None value was found')
        return value


def subreddit_error_append(subreddit: str,
                           error: str,
                           timestamp: Union[int, float] = None,
                           ):
    errors_path = Path(
        os.path.abspath(os.path.join(wdir, 'r', subreddit, 'errors.json'))
    )
    if timestamp is None:
        timestamp = time.time()
    errors: List[Dict[str, Any]] = []
    if errors_path.is_file():
        errors = json.loads(errors_path.read_bytes())
    errors.append(dict(error=error, timestamp=timestamp))
    errors_path.write_text(json.dumps(errors, indent=1))


def subreddit_error_empty(subreddit: str):
    errors_path = Path(
        os.path.abspath(os.path.join(wdir, 'r', subreddit, 'errors.json'))
    )
    if errors_path.is_file():
        errors_path.unlink()


def subreddit_error_within_tolerance(subreddit: str):
    if TOLERABLE_ERROR_LOG_SIZE < 0:
        return True
    errors_path = Path(
        os.path.abspath(os.path.join(wdir, 'r', subreddit, 'errors.json'))
    )
    errors: List[Dict[str, Any]] = []
    if errors_path.is_file():
        errors = json.loads(errors_path.read_bytes())
    return len(errors) <= TOLERABLE_ERROR_LOG_SIZE


def process_subreddit(subreddit, srdt, jsonPageSr):
    simpleDownloader.setCookies({'over18': 1})
    srp = os.path.abspath(os.path.join(wdir, 'r', subreddit))
    nextpage = build_gateway_link(subreddit)
    pageno = 0
    ygst = srdt['date_first']
    while nextpage:
        pageno += 1
        print(clrlib.stylize(
            ('/r/{0:<20} loading page #%05d' % pageno).format(subreddit),
            [clrlib.fg('light_yellow')]))
        print(clrlib.stylize(
            ' >> %s' % (nextpage.replace(GATEWAY_LINK_ARGS, '[...]'),),
            [clrlib.fg('light_yellow'), clrlib.attr('dim')]))
        redditBytes = None
        try:
            redditBytes = NoneResponseError.check(
                simpleDownloader.getUrlBytes(nextpage))
        except (HTTPError, URLError,
                ContentTooShortError, NoneResponseError) as error:
            print(clrlib.stylize(
                ('/r/{0:<20} loading page #%05d' % pageno).format(subreddit),
                [clrlib.fg('light_red'), clrlib.attr('bold')]))
            print(clrlib.stylize(
                f" >> HTTP Error: Skipping: {error}",
                [clrlib.fg('light_red'),
                 clrlib.attr('bold'),
                 clrlib.attr('dim')]))
            subreddit_error_append(subreddit, str(error))
            break
        subreddit_error_empty(subreddit)
        jsonPage = json.loads(redditBytes)
        getSubredditPageJsonInfoResult = None
        try:
            getSubredditPageJsonInfoResult = (
                getSubredditPageJsonInfo(jsonPage, subreddit, pageno))
        except IndexError:
            print(clrlib.stylize(
                ('/r/{0:<20} loading page #%05d' % pageno).format(subreddit),
                [clrlib.fg('light_gray'), clrlib.attr('dim')]))
            print(clrlib.stylize(
                " >> Empty subreddit: Skipping...",
                [clrlib.fg('light_gray'), clrlib.attr('dim')]))
            break
        first, last, nextpage, links = getSubredditPageJsonInfoResult
        if ygst >= first:
            # The newest stored post is at least as recent as the newest post
            # on this page, so the stored data is already up to date.
            nextpage = None
        srdt['date_first'] = max(first, srdt['date_first'])
        srdt['date_last'] = min(last, srdt['date_last'])
        for link in links[::-1]:
            if link not in srdt['links']:
                srdt['links'].append(link)
        srid = next(iter(set.intersection(
            set(jsonPage['subreddits'].keys()),
            set(jsonPage['postFlair'].keys()),
            set(jsonPage['subredditAboutInfo'].keys())
        )))
        jsonPageSr = dict(
            id=srid,
            name=subreddit,
            definition=jsonPage['subreddits'][srid],
            about=jsonPage['subredditAboutInfo'][srid],
            flair=jsonPage['postFlair'][srid],
        )
    srdt['links'] = list(
        filter(lambda a: len(a['datakey']) < 20, srdt['links']))
    srdt['links'] = sorted(srdt['links'], key=lambda a: -a['timestamp'])
    if len(srdt['links']) >= 1:
        srdt['date_first'] = srdt['links'][0]['timestamp']   # max (newest)
        srdt['date_last'] = srdt['links'][-1]['timestamp']   # min (oldest)
    return (subreddit, srp, srdt, jsonPageSr)


def statistically_expects_post(srdt,
                               latest_file_change=None,
                               use_current_time_with_average=1
                               ) -> bool:
    return _statistically_expects_post(srdt,
                                       latest_file_change,
                                       use_current_time_with_average
                                       ) < 0
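

# Worked example (illustrative numbers, not real data): suppose the 21 newest
# stored links are spaced about 600 s apart and subreddit.json was last
# written 2400 s ago.  With the defaults TOLERABLE_OUTDATENESS = 5 and
# MINIMUM_POST_TIME = 60, the next crawl only becomes due about
# 5 * max(60, 600) = 3000 s after the last write, so _statistically_expects_post
# returns +600 ("check again in roughly ten minutes") and
# statistically_expects_post is False.  As the gap keeps growing the return
# value eventually goes negative and the subreddit is queued for a new crawl;
# the 5x-overdue recursion below first folds the idle gap into the average,
# so dormant subreddits are rechecked less aggressively.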
def _statistically_expects_post(srdt,
                                latest_file_change=None,
                                use_current_time_with_average=1
                                ) -> int:
    # Returns the number of seconds until the next post is expected to arrive;
    # a negative value means a post is deemed to be already available.
    if srdt['date_last'] >= srdt['date_first'] or len(srdt['links']) < 2:
        # Not enough history to estimate a posting rate.
        if latest_file_change is None:
            return -1
        elif (time_due := int(
                time.time() - latest_file_change - UNDERSAMPLED_CHECK_TIME
        )) > 0:
            return -time_due
        return UNDERSAMPLED_CHECK_TIME
    recent_timestamps = [link['timestamp'] for link in srdt['links'][:21]]
    recent_timestamps_zipped = list(
        zip([time.time(), *recent_timestamps], [*recent_timestamps, 0]))
    recent_timediffs_with_zeros = list(map(
        lambda a: operator.sub(*a),
        recent_timestamps_zipped))[use_current_time_with_average:-1]
    recent_timediffs = list(filter(bool, recent_timediffs_with_zeros))
    if len(recent_timediffs) <= 0:
        return 1  # no usable intervals; report "not due yet, check again soon"
    recent_timediffs_avg = sum(recent_timediffs) / len(recent_timediffs)
    if latest_file_change is None:
        latest_file_change = srdt['links'][0]['timestamp']
    if (
        (use_current_time_with_average != 0)
        and
        (int(time.time()) >= latest_file_change + recent_timediffs_avg * 5)
    ):
        # Heavily overdue: recompute with the gap since the newest stored post
        # included in the average interval.
        return _statistically_expects_post(srdt, latest_file_change, 0)
    else:
        recent_timediffs_avg = max(1, TOLERABLE_OUTDATENESS) * max(
            MINIMUM_POST_TIME, recent_timediffs_avg)
        expected_next_post_timestamp = int(
            latest_file_change + recent_timediffs_avg)
        return expected_next_post_timestamp - int(time.time())


def main():
    print('Building summary...')
    srs, srsi, srf, srmt = build_summary()
    print('Download...')
    subreddits = sorted(filter(
        lambda sr: os.path.isdir(os.path.join(wdir, 'r', sr)),
        os.listdir(os.path.join(wdir, 'r'))))
    print('Opening process pool...')
    skipped_processing = False
    statistical_leverager_pendings = list()
    with PoolExecutor(MAX_WORKERS//2) as pe2:
        def process_subreddit_done_callback_inner(job):
            (subreddit, srp, srdt, jsonPageSr) = job.result()
            del job
            process_subreddit_done_callback(
                subreddit, srp, srdt, jsonPageSr, pe2, srs, srsi)
            return
        with PoolExecutor(MAX_WORKERS//2) as pe:
            print('Opened process pool')
            ignored_count = 0
            for subreddit in subreddits:
                if subreddit not in srs:
                    srs[subreddit] = getEmptySubredditData(subreddit)
                if subreddit not in srsi:
                    srsi[subreddit] = None
                if IGNORE_STATISTICAL_LEVERAGER or statistically_expects_post(
                        srs[subreddit], srmt.get(subreddit)):
                    if subreddit_error_within_tolerance(subreddit):
                        job = pe.submit(
                            process_subreddit,
                            subreddit,
                            srs[subreddit],
                            srsi[subreddit],
                        )
                        job.add_done_callback(
                            process_subreddit_done_callback_inner)
                    else:
                        ignored_count += 1
                else:
                    ignored_count += 1
                    statistical_leverager_pendings.append(
                        _statistically_expects_post(
                            srs[subreddit], srmt.get(subreddit))
                    )
            if ignored_count == len(subreddits):
                skipped_processing = True
            print('Closing process pool...')
    print('Closed process pool')
    if not skipped_processing:
        print('Writing summary...')
        write_summary(srs, srsi, srf)
        print('Done')
    else:
        next_workload = min(
            [*list(filter(lambda a: a >= 0, statistical_leverager_pendings)),
             365*24*3600])
        raise Exception(f'No new work to do. Wait {next_workload} seconds')
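

# Hand-off between the two pools in main(): process_subreddit() runs in the
# fetch pool, its done-callback fires back in the parent process, and the
# callback below stores the result in the in-memory summaries before
# submitting the disk write to the second (writer) pool, so writing results
# does not block further fetching.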
def process_subreddit_done_callback(subreddit, srp, srdt, jsonPageSr, pe,
                                    srs, srsi):
    srs[subreddit] = srdt
    srsi[subreddit] = jsonPageSr
    print(clrlib.stylize(f' @> Writing /r/{subreddit}', [
        clrlib.fg('light_cyan'),
    ]))
    job = pe.submit(
        post_processing_saver,
        subreddit, srp, srdt, jsonPageSr
    )


def post_processing_saver(subreddit, srp, srdt, jsonPageSr):
    write_json(Path(os.path.join(srp, 'subreddit.json')), srdt,
               sort_keys=True)
    if jsonPageSr is not None:
        write_json(Path(os.path.join(srp, 'meta.json')), jsonPageSr,
                   sort_keys=True)
    print(clrlib.stylize(f' @> Written /r/{subreddit}', [
        clrlib.fg('light_green'),
    ]))


def build_summary():
    rjpath = Path(wdir, 'r.json')
    rijpath = Path(wdir, 'ri.json')
    rfpath = Path(wdir, 'rf.json')
    oldsrs = dict()
    oldsrsi = dict()
    if rjpath.exists():
        oldsrs = json.loads(rjpath.read_bytes())
    if rijpath.exists():
        oldsrsi = json.loads(rijpath.read_bytes())
    srs = dict()
    srsi = dict()
    srmt = dict()
    nodownloadfilter = dict()
    nosfwfilter = dict()
    nonsfwfilter = dict()
    wallpaperfilter = dict()
    with PoolExecutor(MAX_WORKERS) as pe:
        def on_data_read(job):
            (sr, srp, srip, srd, srid, sripe) = job.result()
            if srd is not None:
                srs[sr] = srd
            else:
                if sr not in oldsrs:
                    srp.unlink()
                    if sr in srmt:
                        del srmt[sr]
                else:
                    print('Restoring old data for corrupted subreddit %r' % sr)
                    srs[sr] = oldsrs[sr]
                    srp.write_text(json.dumps(oldsrs[sr], indent=1))
                    srmt[sr] = rjpath.stat().st_mtime
            if sripe:
                if srid is not None:
                    srsi[sr] = srid
                else:
                    if sr not in oldsrsi:
                        srip.unlink()
                    else:
                        print('Restoring old data for corrupted subreddit %r'
                              % sr)
                        srsi[sr] = oldsrsi[sr]
                        srip.write_text(json.dumps(oldsrsi[sr], indent=1))

        for srp in Path(wdir, 'r').glob('*/subreddit.json'):
            sr = srp.parent.name.lower()
            nodownloadfilter[sr] = srp.parent.joinpath(
                'nodownload.flag').exists()
            nosfwfilter[sr] = srp.parent.joinpath('nosfw.flag').exists()
            nonsfwfilter[sr] = srp.parent.joinpath('nonsfw.flag').exists()
            wallpaperfilter[sr] = srp.parent.joinpath(
                'wallpaper.flag').exists()
            srmt[sr] = srp.stat().st_mtime
            srip = srp.parent.joinpath('meta.json')
            job = pe.submit(read_disk_summary, sr, srp, srip)
            job.add_done_callback(on_data_read)
    srf = dict(
        no_download=nodownloadfilter,
        no_sfw=nosfwfilter,
        no_nsfw=nonsfwfilter,
        wallpaper=wallpaperfilter,
    )
    return srs, srsi, srf, srmt


def read_disk_summary(sr, srp, srip):
    srd = None
    srid = None
    sripe = srip.exists()
    try:
        srd = json.loads(srp.read_bytes())
    except json.decoder.JSONDecodeError:
        pass
    if sripe:
        try:
            srid = json.loads(srip.read_bytes())
        except json.decoder.JSONDecodeError:
            pass
    return (sr, srp, srip, srd, srid, sripe)


def write_summary(srs, srsi, srf):
    rjpath = Path(wdir, 'r.json')
    rijpath = Path(wdir, 'ri.json')
    rfpath = Path(wdir, 'rf.json')
    with PoolExecutor(MAX_WORKERS) as pe:
        pe.submit(write_json, rjpath, srs)
        pe.submit(write_json, rijpath, srsi)
        pe.submit(write_json, rfpath, srf)


def write_json(path, data, **kwargs):
    path.write_text(json.dumps(data, indent=1, **kwargs))


if __name__ == '__main__':
    main()
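
# On-disk layout assumed by this module (inferred from the paths used above):
#   ./r/<subreddit>/subreddit.json  - per-subreddit link data (srdt)
#   ./r/<subreddit>/meta.json       - per-subreddit metadata (jsonPageSr)
#   ./r/<subreddit>/errors.json     - rolling fetch-error log
#   ./r.json, ./ri.json, ./rf.json  - aggregate summaries from write_summary()
# Running the file directly calls main() with the module-level defaults;
# cmdline()/run_with_config() is the path that overrides them.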