reddit-image-wall-getter/reddit_imgs/sync.py

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import json
import operator
import os
import time
from concurrent.futures import ProcessPoolExecutor as PoolExecutor
from pathlib import Path
from typing import List, Dict, Union, Any
from urllib.error import ContentTooShortError, HTTPError, URLError
import colored as clrlib
from .system import simpleDownloader
from .system.cmdline_parser import parse_cmdline
from .system.subredditTools import (GATEWAY_LINK_ARGS, build_gateway_link,
getEmptySubredditData,
getSubredditPageJsonInfo)
UNDERSAMPLED_CHECK_TIME = 6*3600
TOLERABLE_ERROR_LOG_SIZE = 5
MAX_WORKERS = 16
IGNORE_STATISTICAL_LEVERAGER = False
TOLERABLE_OUTDATEDNESS = 5
MINIMUM_POST_TIME = 60
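# Command-line entry point: with no encoded arguments it runs with the module
# defaults, otherwise it delegates to the shared cmdline parser, which feeds
# the decoded options into run_with_config.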
def cmdline(encoded_args: str = None):
if encoded_args is None:
return run_with_config()
else:
return parse_cmdline(run_with_config, encoded_args)
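# Applies the configuration overrides to the module-level tunables above and
# then hands control to main().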
def run_with_config(max_workers: int = None,
ignore_statistical_leverager: bool = False,
tolerable_error_log_size: int = 5,
undersampled_check_time: int = 6 * 3600,
tolerable_outdatedness: int = 5,
minimum_post_interval: int = 60,
):
global MAX_WORKERS
global IGNORE_STATISTICAL_LEVERAGER
global TOLERABLE_ERROR_LOG_SIZE
global UNDERSAMPLED_CHECK_TIME
global TOLERABLE_OUTDATEDNESS
global MINIMUM_POST_TIME
TOLERABLE_OUTDATEDNESS = tolerable_outdatedness
MINIMUM_POST_TIME = minimum_post_interval
TOLERABLE_ERROR_LOG_SIZE = tolerable_error_log_size
IGNORE_STATISTICAL_LEVERAGER = ignore_statistical_leverager
UNDERSAMPLED_CHECK_TIME = undersampled_check_time
if max_workers is not None:
MAX_WORKERS = max_workers
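# Round an odd worker count up to an even one so it splits cleanly between
# the two half-sized process pools used in main().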
MAX_WORKERS += MAX_WORKERS % 2
return main()
simpleDownloader.setCookies({'over18': 1})
wdir = os.path.abspath('.')
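# Raised when simpleDownloader returns None instead of a response body.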
class NoneResponseError(ValueError):
@classmethod
def check(cls, value):
if value is None:
raise cls('None value was found')
return value
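# Per-subreddit error log helpers: errors.json under r/<subreddit> accumulates
# download failures; it is cleared on the first successful page fetch, and once
# it grows past TOLERABLE_ERROR_LOG_SIZE entries the subreddit is skipped.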
def subreddit_error_append(subreddit: str,
error: str,
timestamp: Union[int, float] = None,
):
errors_path = Path(
os.path.abspath(os.path.join(wdir, 'r', subreddit, 'errors.json'))
)
if timestamp is None:
timestamp = time.time()
errors: List[Dict[str, Any]] = []
if errors_path.is_file():
errors = json.loads(errors_path.read_bytes())
errors.append(dict(error=error, timestamp=timestamp))
errors_path.write_text(json.dumps(errors, indent=1))
def subreddit_error_empty(subreddit: str):
errors_path = Path(
os.path.abspath(os.path.join(wdir, 'r', subreddit, 'errors.json'))
)
if errors_path.is_file():
errors_path.unlink()
def subreddit_error_within_tolerance(subreddit: str):
if TOLERABLE_ERROR_LOG_SIZE < 0:
return True
errors_path = Path(
os.path.abspath(os.path.join(wdir, 'r', subreddit, 'errors.json'))
)
errors: List[Dict[str, Any]] = []
if errors_path.is_file():
errors = json.loads(errors_path.read_bytes())
return len(errors) <= TOLERABLE_ERROR_LOG_SIZE
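# Worker: walks a subreddit's gateway listing page by page, merging new links
# into srdt until it reaches posts that are already stored, and returns the
# updated per-subreddit data plus the gateway metadata (jsonPageSr).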
def process_subreddit(subreddit, srdt, jsonPageSr):
simpleDownloader.setCookies({'over18': 1})
srp = os.path.abspath(os.path.join(wdir, 'r', subreddit))
nextpage = build_gateway_link(subreddit)
pageno = 0
ygst = srdt['date_first']
while nextpage:
pageno += 1
print(clrlib.stylize(('/r/{0:<20} loading page #%05d' % pageno).format(subreddit), [
clrlib.fg('light_yellow'),
]))
print(clrlib.stylize(' >> %s' % (nextpage.replace(GATEWAY_LINK_ARGS, '[...]'),), [
clrlib.fg('light_yellow'), clrlib.attr('dim'),
]))
redditBytes = None
try:
redditBytes = NoneResponseError.check(
simpleDownloader.getUrlBytes(
nextpage
)
)
except (HTTPError, URLError, ContentTooShortError, NoneResponseError) as error:
print(clrlib.stylize(('/r/{0:<20} loading page #%05d' % pageno).format(subreddit), [
clrlib.fg('light_red'), clrlib.attr('bold'),
]))
print(clrlib.stylize(f" >> HTTP Error: Skipping: {error}", [
clrlib.fg('light_red'), clrlib.attr(
'bold'), clrlib.attr('dim'),
]))
subreddit_error_append(subreddit, str(error))
break
subreddit_error_empty(subreddit)
jsonPage = json.loads(redditBytes)
getSubredditPageJsonInfoResult = None
try:
getSubredditPageJsonInfoResult = (
getSubredditPageJsonInfo(jsonPage, subreddit, pageno))
except IndexError:
print(clrlib.stylize(('/r/{0:<20} loading page #%05d' % pageno).format(subreddit), [
clrlib.fg('light_gray'), clrlib.attr('dim'),
]))
print(clrlib.stylize(" >> Empty subreddit: Skipping...", [
clrlib.fg('light_gray'), clrlib.attr('dim'),
]))
break
first, last, nextpage, links = getSubredditPageJsonInfoResult
if ygst >= first:  # stop paging once the newest stored post is at least as recent as the newest post just fetched
nextpage = None
srdt['date_first'] = max(first, srdt['date_first'])
srdt['date_last'] = min(last, srdt['date_last'])
for link in links[::-1]:
if link not in srdt['links']:
srdt['links'].append(link)
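# The subreddit id is whichever key is present in all three gateway maps.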
srid = next(iter(set.intersection(
set(jsonPage['subreddits'].keys()),
set(jsonPage['postFlair'].keys()),
set(jsonPage['subredditAboutInfo'].keys())
)))
jsonPageSr = dict(
id=srid,
name=subreddit,
definition=jsonPage['subreddits'][srid],
about=jsonPage['subredditAboutInfo'][srid],
flair=jsonPage['postFlair'][srid],
)
srdt['links'] = list(
filter(lambda a: len(a['datakey']) < 20, srdt['links']))
srdt['links'] = sorted(srdt['links'], key=lambda a: - a['timestamp'])
if len(srdt['links']) >= 1:
srdt['date_first'] = srdt['links'][0]['timestamp'] # max
srdt['date_last'] = srdt['links'][-1]['timestamp'] # min
return (subreddit, srp, srdt, jsonPageSr)
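# "Statistical leverager": estimates, from the spacing of recent posts, whether
# a new post is likely to be available already (negative estimate) or roughly
# how many seconds remain until one is expected.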
def statistically_expects_post(srdt,
latest_file_change=None,
use_current_time_with_average=1
) -> bool:
return _statistically_expects_post(srdt,
latest_file_change,
use_current_time_with_average
) < 0
def _statistically_expects_post(srdt,
latest_file_change=None,
use_current_time_with_average=1
) -> int:
# Returns the estimated number of seconds until the next post is expected;
# a negative value means a new post should already be available.
if srdt['date_last'] >= srdt['date_first'] or len(srdt['links']) < 2:
if latest_file_change is None:
return -1
elif (time_due := int(
time.time() - latest_file_change - UNDERSAMPLED_CHECK_TIME
)) > 0:
return -time_due
return UNDERSAMPLED_CHECK_TIME
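# Pairwise differences between the (up to) 21 most recent timestamps,
# optionally prefixed with the current time, yield the average posting
# interval; zero-length gaps are discarded.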
recent_timestamps = [link['timestamp'] for link in srdt['links'][:21]]
recent_timestamps_zipped = list(
zip([time.time(), *recent_timestamps], [*recent_timestamps, 0]))
recent_timediffs_with_zeros = list(map(lambda a: operator.sub(
*a), recent_timestamps_zipped))[use_current_time_with_average:-1]
recent_timediffs = list(filter(bool, recent_timediffs_with_zeros))
if len(recent_timediffs) <= 0:
return 1  # degenerate case (all sampled intervals were zero): assume a post is due in about a second
recent_timediffs_avg = sum(recent_timediffs) / len(recent_timediffs)
if latest_file_change is None:
latest_file_change = srdt['links'][0]['timestamp']
if (
(use_current_time_with_average != 0) and
(int(time.time()) >= latest_file_change + recent_timediffs_avg * 5)
):
return _statistically_expects_post(srdt, latest_file_change, 0)
else:
recent_timediffs_avg = max(1, TOLERABLE_OUTDATEDNESS) * max(
MINIMUM_POST_TIME, recent_timediffs_avg)
expected_next_post_timestamp = int(
latest_file_change + recent_timediffs_avg)
return expected_next_post_timestamp - int(time.time())
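# Orchestrates a sync run: builds the on-disk summary, decides per subreddit
# whether to refresh it (error log within tolerance and, unless disabled, the
# statistical leverager expecting a new post), fans the work out over two
# process pools and finally rewrites the global summary files.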
def main():
print('Building summary...')
srs, srsi, srf, srmt = build_summary()
print('Download...')
subreddits = sorted(filter(lambda sr: os.path.isdir(
os.path.join(wdir, 'r', sr)), os.listdir(os.path.join(wdir, 'r'))))
print('Opening process pool...')
skipped_processing = False
statistical_leverager_pendings = list()
with PoolExecutor(MAX_WORKERS//2) as pe2:
def process_subreddit_done_callback_inner(job):
(subreddit, srp, srdt, jsonPageSr) = job.result()
del job
process_subreddit_done_callback(
subreddit, srp, srdt, jsonPageSr, pe2, srs, srsi)
return
with PoolExecutor(MAX_WORKERS//2) as pe:
print('Opened process pool')
ignored_count = 0
for subreddit in subreddits:
if subreddit not in srs:
srs[subreddit] = getEmptySubredditData(subreddit)
if subreddit not in srsi:
srsi[subreddit] = None
if IGNORE_STATISTICAL_LEVERAGER or statistically_expects_post(
srs[subreddit], srmt.get(subreddit)):
if subreddit_error_within_tolerance(subreddit):
job = pe.submit(
process_subreddit,
subreddit,
srs[subreddit],
srsi[subreddit],
)
job.add_done_callback(
process_subreddit_done_callback_inner)
else:
ignored_count += 1
else:
ignored_count += 1
statistical_leverager_pendings.append(
_statistically_expects_post(
srs[subreddit], srmt.get(subreddit))
)
if ignored_count == len(subreddits):
skipped_processing = True
print('Closing process pool...')
print('Closed process pool')
if not skipped_processing:
print('Writing summary...')
write_summary(srs, srsi, srf)
print('Done')
else:
next_workload = min(
[*list(filter(lambda a: a >= 0, statistical_leverager_pendings)), 365*24*3600])
raise Exception(f'No new work to do. Wait {next_workload} seconds')
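# Collects a finished process_subreddit job and schedules the per-subreddit
# files to be written on the second pool.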
def process_subreddit_done_callback(subreddit, srp, srdt, jsonPageSr, pe, srs, srsi):
srs[subreddit] = srdt
srsi[subreddit] = jsonPageSr
print(clrlib.stylize(f' @> Writing /r/{subreddit}', [
clrlib.fg('light_cyan'),
]))
job = pe.submit(
post_processing_saver,
subreddit, srp, srdt, jsonPageSr
)
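# Persists subreddit.json (and meta.json when gateway metadata is available)
# for a single subreddit.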
def post_processing_saver(subreddit, srp, srdt, jsonPageSr):
write_json(Path(os.path.join(srp, 'subreddit.json')), srdt, sort_keys=True)
if jsonPageSr is not None:
write_json(Path(os.path.join(srp, 'meta.json')),
jsonPageSr, sort_keys=True)
print(clrlib.stylize(f' @> Written /r/{subreddit}', [
clrlib.fg('light_green'),
]))
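# Reads every r/*/subreddit.json (and meta.json) in parallel, restoring entries
# from the previous r.json/ri.json snapshots when a file on disk is corrupted,
# and returns the per-subreddit data, metadata, flag filters and mtimes.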
def build_summary():
rjpath = Path(wdir, 'r.json')
rijpath = Path(wdir, 'ri.json')
rfpath = Path(wdir, 'rf.json')
oldsrs = dict()
oldsrsi = dict()
if rjpath.exists():
oldsrs = json.loads(rjpath.read_bytes())
if rijpath.exists():
oldsrsi = json.loads(rijpath.read_bytes())
srs = dict()
srsi = dict()
srmt = dict()
nodownloadfilter = dict()
nosfwfilter = dict()
nonsfwfilter = dict()
wallpaperfilter = dict()
with PoolExecutor(MAX_WORKERS) as pe:
def on_data_read(job):
(sr, srp, srip, srd, srid, sripe) = job.result()
if srd is not None:
srs[sr] = srd
else:
if sr not in oldsrs:
srp.unlink()
if sr in srmt:
del srmt[sr]
else:
print('Restoring old data for corrupted subreddit %r' % sr)
srs[sr] = oldsrs[sr]
srp.write_text(json.dumps(oldsrs[sr], indent=1))
srmt[sr] = rjpath.stat().st_mtime
if sripe:
if srid is not None:
srsi[sr] = srid
else:
if sr not in oldsrsi:
srip.unlink()
else:
print('Restoring old data for corrupted subreddit %r' % sr)
srsi[sr] = oldsrsi[sr]
srip.write_text(json.dumps(oldsrsi[sr], indent=1))
for srp in Path(wdir, 'r').glob('*/subreddit.json'):
sr = srp.parent.name.lower()
nodownloadfilter[sr] = srp.parent.joinpath(
'nodownload.flag').exists()
nosfwfilter[sr] = srp.parent.joinpath('nosfw.flag').exists()
nonsfwfilter[sr] = srp.parent.joinpath('nonsfw.flag').exists()
wallpaperfilter[sr] = srp.parent.joinpath(
'wallpaper.flag').exists()
srmt[sr] = srp.stat().st_mtime
srip = srp.parent.joinpath('meta.json')
job = pe.submit(read_disk_summary, sr, srp, srip)
job.add_done_callback(on_data_read)
srf = dict(
no_download=nodownloadfilter,
no_sfw=nosfwfilter,
no_nsfw=nonsfwfilter,
wallpaper=wallpaperfilter,
)
return srs, srsi, srf, srmt
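# Loads one subreddit's JSON files from disk, tolerating corrupted content by
# returning None for whatever fails to parse.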
def read_disk_summary(sr, srp, srip):
srd = None
srid = None
sripe = srip.exists()
try:
srd = json.loads(srp.read_bytes())
except json.decoder.JSONDecodeError:
pass
if sripe:
try:
srid = json.loads(srip.read_bytes())
except json.decoder.JSONDecodeError:
pass
return (sr, srp, srip, srd, srid, sripe)
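# Writes the aggregated r.json, ri.json and rf.json summaries in parallel.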
def write_summary(srs, srsi, srf):
rjpath = Path(wdir, 'r.json')
rijpath = Path(wdir, 'ri.json')
rfpath = Path(wdir, 'rf.json')
with PoolExecutor(MAX_WORKERS) as pe:
pe.submit(write_json, rjpath, srs)
pe.submit(write_json, rijpath, srsi)
pe.submit(write_json, rfpath, srf)
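# Thin wrapper around json.dumps + Path.write_text, used directly and via the
# process pools above.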
def write_json(path, data, **kwargs):
path.write_text(json.dumps(data, indent=1, **kwargs))
if __name__ == '__main__':
main()