reddit-image-wall-getter/reddit_imgs/sync.py

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import json
import operator
import os
import time
from concurrent.futures import ProcessPoolExecutor as PoolExecutor
from pathlib import Path
from typing import List, Dict, Union, Any
from urllib.error import ContentTooShortError, HTTPError, URLError

import colored as clrlib

from .system import simpleDownloader
from .system.cmdline_parser import parse_cmdline
from .system.subredditTools import (GATEWAY_LINK_ARGS, build_gateway_link,
                                    getEmptySubredditData,
                                    getSubredditPageJsonInfo)

UNDERSAMPLED_CHECK_TIME = 6 * 3600
TOLERABLE_ERROR_LOG_SIZE = 5
MAX_WORKERS = 16
IGNORE_STATISTICAL_LEVERAGER = False
TOLERABLE_OUTDATENESS = 5
MINIMUM_POST_TIME = 60


def cmdline(encoded_args: str = None):
    if encoded_args is None:
        return run_with_config()
    else:
        return parse_cmdline(run_with_config, encoded_args)


def run_with_config(max_workers: int = None,
                    ignore_statistical_leverager: bool = False,
                    tolerable_error_log_size: int = 5,
                    undersampled_check_time: int = 6 * 3600,
                    tolerable_outdatedness: int = 5,
                    minimum_post_interval: int = 60,
                    ):
    global MAX_WORKERS
    global IGNORE_STATISTICAL_LEVERAGER
    global TOLERABLE_ERROR_LOG_SIZE
    global UNDERSAMPLED_CHECK_TIME
    global TOLERABLE_OUTDATENESS
    global MINIMUM_POST_TIME
    TOLERABLE_OUTDATENESS = tolerable_outdatedness
    MINIMUM_POST_TIME = minimum_post_interval
    TOLERABLE_ERROR_LOG_SIZE = tolerable_error_log_size
    IGNORE_STATISTICAL_LEVERAGER = ignore_statistical_leverager
    UNDERSAMPLED_CHECK_TIME = undersampled_check_time
    if max_workers is not None:
        MAX_WORKERS = max_workers
        # round an odd count up so it splits cleanly into the two
        # half-sized pools opened by main()
        MAX_WORKERS += MAX_WORKERS % 2
    return main()
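
# Usage sketch (hypothetical values; assumes the package is importable as
# reddit_imgs, as implied by the relative imports above):
#
#     from reddit_imgs import sync
#     sync.run_with_config(max_workers=8,
#                          ignore_statistical_leverager=True,
#                          tolerable_error_log_size=10)
#
# run_with_config() only overrides the module-level tunables before delegating
# to main(); cmdline() does the same via parse_cmdline for encoded arguments.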


simpleDownloader.setCookies({'over18': 1})

wdir = os.path.abspath('.')


class NoneResponseError(ValueError):
    @classmethod
    def check(cls, value):
        if value is None:
            raise cls('None value was found')
        return value


def subreddit_error_append(subreddit: str,
                           error: str,
                           timestamp: Union[int, float] = None,
                           ):
    errors_path = Path(
        os.path.abspath(os.path.join(wdir, 'r', subreddit, 'errors.json'))
    )
    if timestamp is None:
        timestamp = time.time()
    errors: List[Dict[str, Any]] = []
    if errors_path.is_file():
        errors = json.loads(errors_path.read_bytes())
    errors.append(dict(error=error, timestamp=timestamp))
    errors_path.write_text(json.dumps(errors, indent=1))


def subreddit_error_empty(subreddit: str):
    errors_path = Path(
        os.path.abspath(os.path.join(wdir, 'r', subreddit, 'errors.json'))
    )
    if errors_path.is_file():
        errors_path.unlink()


def subreddit_error_within_tolerance(subreddit: str):
    if TOLERABLE_ERROR_LOG_SIZE < 0:
        return True
    errors_path = Path(
        os.path.abspath(os.path.join(wdir, 'r', subreddit, 'errors.json'))
    )
    errors: List[Dict[str, Any]] = []
    if errors_path.is_file():
        errors = json.loads(errors_path.read_bytes())
    return len(errors) <= TOLERABLE_ERROR_LOG_SIZE
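
# The per-subreddit error log written by the helpers above is a flat JSON list,
# one entry per failed page fetch, e.g. (illustrative sample, not real data):
#
#     [
#      {"error": "HTTP Error 429: Too Many Requests", "timestamp": 1610000000.0}
#     ]
#
# Once it grows past TOLERABLE_ERROR_LOG_SIZE entries the subreddit is skipped
# by main() until a successful fetch clears the file via subreddit_error_empty();
# a negative TOLERABLE_ERROR_LOG_SIZE disables the check entirely.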


def process_subreddit(subreddit, srdt, jsonPageSr):
    simpleDownloader.setCookies({'over18': 1})
    srp = os.path.abspath(os.path.join(wdir, 'r', subreddit))
    nextpage = build_gateway_link(subreddit)
    pageno = 0
    ygst = srdt['date_first']
    while nextpage:
        pageno += 1
        print(clrlib.stylize(('/r/{0:<20} loading page #%05d' % pageno).format(subreddit), [
            clrlib.fg('light_yellow'),
        ]))
        print(clrlib.stylize(' >> %s' % (nextpage.replace(GATEWAY_LINK_ARGS, '[...]'),), [
            clrlib.fg('light_yellow'), clrlib.attr('dim'),
        ]))
        redditBytes = None
        try:
            redditBytes = NoneResponseError.check(
                simpleDownloader.getUrlBytes(
                    nextpage
                )
            )
        except (HTTPError, URLError, ContentTooShortError, NoneResponseError) as error:
            print(clrlib.stylize(('/r/{0:<20} loading page #%05d' % pageno).format(subreddit), [
                clrlib.fg('light_red'), clrlib.attr('bold'),
            ]))
            print(clrlib.stylize(f" >> HTTP Error: Skipping: {error}", [
                clrlib.fg('light_red'), clrlib.attr('bold'), clrlib.attr('dim'),
            ]))
            subreddit_error_append(subreddit, str(error))
            break
        subreddit_error_empty(subreddit)
        jsonPage = json.loads(redditBytes)
        getSubredditPageJsonInfoResult = None
        try:
            getSubredditPageJsonInfoResult = (
                getSubredditPageJsonInfo(jsonPage, subreddit, pageno))
        except IndexError:
            print(clrlib.stylize(('/r/{0:<20} loading page #%05d' % pageno).format(subreddit), [
                clrlib.fg('light_gray'), clrlib.attr('dim'),
            ]))
            print(clrlib.stylize(" >> Empty subreddit: Skipping...", [
                clrlib.fg('light_gray'), clrlib.attr('dim'),
            ]))
            break
        first, last, nextpage, links = getSubredditPageJsonInfoResult
        if ygst >= first:
            # the newest stored post is at least as recent as the newest
            # downloaded post, so the subreddit is already up to date
            nextpage = None
        srdt['date_first'] = max(first, srdt['date_first'])
        srdt['date_last'] = min(last, srdt['date_last'])
        for link in links[::-1]:
            if link not in srdt['links']:
                srdt['links'].append(link)
        srid = next(iter(set.intersection(
            set(jsonPage['subreddits'].keys()),
            set(jsonPage['postFlair'].keys()),
            set(jsonPage['subredditAboutInfo'].keys())
        )))
        jsonPageSr = dict(
            id=srid,
            name=subreddit,
            definition=jsonPage['subreddits'][srid],
            about=jsonPage['subredditAboutInfo'][srid],
            flair=jsonPage['postFlair'][srid],
        )
    srdt['links'] = list(
        filter(lambda a: len(a['datakey']) < 20, srdt['links']))
    srdt['links'] = sorted(srdt['links'], key=lambda a: -a['timestamp'])
    if len(srdt['links']) >= 1:
        srdt['date_first'] = srdt['links'][0]['timestamp']   # max (newest)
        srdt['date_last'] = srdt['links'][-1]['timestamp']   # min (oldest)
    return (subreddit, srp, srdt, jsonPageSr)


def statistically_expects_post(srdt,
                               latest_file_change=None,
                               use_current_time_with_average=1
                               ) -> bool:
    return _statistically_expects_post(srdt,
                                       latest_file_change,
                                       use_current_time_with_average
                                       ) < 0


def _statistically_expects_post(srdt,
                                latest_file_change=None,
                                use_current_time_with_average=1
                                ) -> int:
    # Returns the number of seconds until the next post is expected to arrive;
    # a negative value means a new post is deemed to be available already.
    if srdt['date_last'] >= srdt['date_first'] or len(srdt['links']) < 2:
        # not enough history to estimate a posting rate: fall back to a
        # fixed re-check interval
        if latest_file_change is None:
            return -1
        elif (time_due := int(
            time.time() - latest_file_change - UNDERSAMPLED_CHECK_TIME
        )) > 0:
            return -time_due
        return UNDERSAMPLED_CHECK_TIME
    recent_timestamps = [link['timestamp'] for link in srdt['links'][:21]]
    recent_timestamps_zipped = list(
        zip([time.time(), *recent_timestamps], [*recent_timestamps, 0]))
    recent_timediffs_with_zeros = list(map(lambda a: operator.sub(
        *a), recent_timestamps_zipped))[use_current_time_with_average:-1]
    recent_timediffs = list(filter(bool, recent_timediffs_with_zeros))
    if len(recent_timediffs) <= 0:
        return 1  # no usable time gaps between samples
    recent_timediffs_avg = sum(recent_timediffs) / len(recent_timediffs)
    if latest_file_change is None:
        latest_file_change = srdt['links'][0]['timestamp']
    if (
        (use_current_time_with_average != 0) and
        (int(time.time()) >= latest_file_change + recent_timediffs_avg * 5)
    ):
        # the latest change is older than five average gaps: redo the estimate
        # with the gap up to the current time included
        return _statistically_expects_post(srdt, latest_file_change, 0)
    else:
        recent_timediffs_avg = max(1, TOLERABLE_OUTDATENESS) * max(
            MINIMUM_POST_TIME, recent_timediffs_avg)
        expected_next_post_timestamp = int(
            latest_file_change + recent_timediffs_avg)
        return expected_next_post_timestamp - int(time.time())
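
# Worked example (assumed figures, for illustration only): if the 21 newest
# links average one post every 7200 s (2 h), the latest change is 10800 s (3 h)
# old, and TOLERABLE_OUTDATENESS = 5 with MINIMUM_POST_TIME = 60, the tolerated
# gap becomes max(1, 5) * max(60, 7200) = 36000 s, so
# _statistically_expects_post() returns about 36000 - 10800 = 25200 (> 0) and
# statistically_expects_post() reports False: no refresh is scheduled yet.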


def main():
    print('Building summary...')
    srs, srsi, srf, srmt = build_summary()
    print('Download...')
    subreddits = sorted(filter(lambda sr: os.path.isdir(
        os.path.join(wdir, 'r', sr)), os.listdir(os.path.join(wdir, 'r'))))
    print('Opening process pool...')
    skipped_processing = False
    statistical_leverager_pendings = list()
    with PoolExecutor(MAX_WORKERS // 2) as pe2:
        def process_subreddit_done_callback_inner(job):
            (subreddit, srp, srdt, jsonPageSr) = job.result()
            del job
            process_subreddit_done_callback(
                subreddit, srp, srdt, jsonPageSr, pe2, srs, srsi)
            return
        with PoolExecutor(MAX_WORKERS // 2) as pe:
            print('Opened process pool')
            ignored_count = 0
            for subreddit in subreddits:
                if subreddit not in srs:
                    srs[subreddit] = getEmptySubredditData(subreddit)
                if subreddit not in srsi:
                    srsi[subreddit] = None
                if IGNORE_STATISTICAL_LEVERAGER or statistically_expects_post(
                        srs[subreddit], srmt.get(subreddit)):
                    if subreddit_error_within_tolerance(subreddit):
                        job = pe.submit(
                            process_subreddit,
                            subreddit,
                            srs[subreddit],
                            srsi[subreddit],
                        )
                        job.add_done_callback(
                            process_subreddit_done_callback_inner)
                    else:
                        ignored_count += 1
                else:
                    ignored_count += 1
                    statistical_leverager_pendings.append(
                        _statistically_expects_post(
                            srs[subreddit], srmt.get(subreddit))
                    )
            if ignored_count == len(subreddits):
                skipped_processing = True
            print('Closing process pool...')
        print('Closed process pool')
    if not skipped_processing:
        print('Writing summary...')
        write_summary(srs, srsi, srf)
        print('Done')
    else:
        next_workload = min(
            [*list(filter(lambda a: a >= 0, statistical_leverager_pendings)), 365 * 24 * 3600])
        raise Exception(f'No new work to do. Wait {next_workload} seconds')


def process_subreddit_done_callback(subreddit, srp, srdt, jsonPageSr, pe, srs, srsi):
    srs[subreddit] = srdt
    srsi[subreddit] = jsonPageSr
    print(clrlib.stylize(f' @> Writing /r/{subreddit}', [
        clrlib.fg('light_cyan'),
    ]))
    job = pe.submit(
        post_processing_saver,
        subreddit, srp, srdt, jsonPageSr
    )


def post_processing_saver(subreddit, srp, srdt, jsonPageSr):
    write_json(Path(os.path.join(srp, 'subreddit.json')), srdt, sort_keys=True)
    if jsonPageSr is not None:
        write_json(Path(os.path.join(srp, 'meta.json')),
                   jsonPageSr, sort_keys=True)
    print(clrlib.stylize(f' @> Written /r/{subreddit}', [
        clrlib.fg('light_green'),
    ]))


def build_summary():
    rjpath = Path(wdir, 'r.json')
    rijpath = Path(wdir, 'ri.json')
    rfpath = Path(wdir, 'rf.json')
    oldsrs = dict()
    oldsrsi = dict()
    if rjpath.exists():
        oldsrs = json.loads(rjpath.read_bytes())
    if rijpath.exists():
        oldsrsi = json.loads(rijpath.read_bytes())
    srs = dict()
    srsi = dict()
    srmt = dict()
    nodownloadfilter = dict()
    nosfwfilter = dict()
    nonsfwfilter = dict()
    wallpaperfilter = dict()
    with PoolExecutor(MAX_WORKERS) as pe:
        def on_data_read(job):
            (sr, srp, srip, srd, srid, sripe) = job.result()
            if srd is not None:
                srs[sr] = srd
            else:
                if sr not in oldsrs:
                    srp.unlink()
                    if sr in srmt:
                        del srmt[sr]
                else:
                    print('Restoring old data for corrupted subreddit %r' % sr)
                    srs[sr] = oldsrs[sr]
                    srp.write_text(json.dumps(oldsrs[sr], indent=1))
                    srmt[sr] = rjpath.stat().st_mtime
            if sripe:
                if srid is not None:
                    srsi[sr] = srid
                else:
                    if sr not in oldsrsi:
                        srip.unlink()
                    else:
                        print('Restoring old data for corrupted subreddit %r' % sr)
                        srsi[sr] = oldsrsi[sr]
                        srip.write_text(json.dumps(oldsrsi[sr], indent=1))
        for srp in Path(wdir, 'r').glob('*/subreddit.json'):
            sr = srp.parent.name.lower()
            nodownloadfilter[sr] = srp.parent.joinpath(
                'nodownload.flag').exists()
            nosfwfilter[sr] = srp.parent.joinpath('nosfw.flag').exists()
            nonsfwfilter[sr] = srp.parent.joinpath('nonsfw.flag').exists()
            wallpaperfilter[sr] = srp.parent.joinpath(
                'wallpaper.flag').exists()
            srmt[sr] = srp.stat().st_mtime
            srip = srp.parent.joinpath('meta.json')
            job = pe.submit(read_disk_summary, sr, srp, srip)
            job.add_done_callback(on_data_read)
    srf = dict(
        no_download=nodownloadfilter,
        no_sfw=nosfwfilter,
        no_nsfw=nonsfwfilter,
        wallpaper=wallpaperfilter,
    )
    return srs, srsi, srf, srmt


def read_disk_summary(sr, srp, srip):
    srd = None
    srid = None
    sripe = srip.exists()
    try:
        srd = json.loads(srp.read_bytes())
    except json.decoder.JSONDecodeError:
        pass
    if sripe:
        try:
            srid = json.loads(srip.read_bytes())
        except json.decoder.JSONDecodeError:
            pass
    return (sr, srp, srip, srd, srid, sripe)


def write_summary(srs, srsi, srf):
    rjpath = Path(wdir, 'r.json')
    rijpath = Path(wdir, 'ri.json')
    rfpath = Path(wdir, 'rf.json')
    with PoolExecutor(MAX_WORKERS) as pe:
        pe.submit(write_json, rjpath, srs)
        pe.submit(write_json, rijpath, srsi)
        pe.submit(write_json, rfpath, srf)


def write_json(path, data, **kwargs):
    path.write_text(json.dumps(data, indent=1, **kwargs))
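
# r.json (srs) holds the per-subreddit link data, ri.json (srsi) the gateway
# metadata and rf.json (srf) the flag filters gathered by build_summary();
# write_summary() submits the three dumps to a process pool so they are
# written in parallel.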


if __name__ == '__main__':
    main()