reddit-image-wall-getter/reddit_imgs/fetch4.py

449 lines
18 KiB
Python

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import json
import os
import shutil
import subprocess
import sys
import traceback
from concurrent.futures import ProcessPoolExecutor as PoolExecutor
from io import StringIO
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
import colored as clrlib
from .system.cmdline_parser import parse_cmdline
from .system.downloader.cache import get_path_for_caching
# While this file exists, download_link_list() raises InterruptedError
# instead of starting (or continuing with) a link list.
STOP_JOBS_FLAG_PATH = Path('stop_jobs.flag')

# NOTE(review): not referenced in the visible part of this module; presumably
# names workers whose link lists must not be split -- confirm against callers.
FORBIDDEN_WORKER_SPLITS = {'deviantart'}

# Upper bound for the process pool size; overridable via run_with_config().
MAX_WORKERS = 12
# Overwritten from 'r_fetch_preprocessed.json' by main_loading_preprocessed().
SPLIT_WORKER_AFTER_N_LINKS = 10000
USE_FIREFOX_COOKIES = True
# When set, main() downloads only this worker's links, without a pool.
DEBUG_WORKER = None
IGNORE_WORKERS: Set[str] = set()
# Download again even links already marked as downloaded.
REDOWNLOAD = False
# Download again links whose previous download produced no files.
REDOWNLOAD_EMPTIES = False
CUSTOM_WORKER_SPLITS: Dict[str, int] = {}
SKIP_INDEXED_FILES = True
# Links whose stored status shares any bit with this mask are retried.
RETRY_ERROR_MASK = 0
# When set, main() downloads only this link, under the 'manual' worker.
DOWNLOAD_SINGLE_MANUAL_LINK: Optional[str] = None

# Names of the download status bits; the entry at index k names bit (1 << k).
GDL_ERRORS = [
    'GENERIC_ERR',  # 1
    'UNKNOWN_ERR',  # 2
    'HTTP_ERR',  # 4
    '404_ERR',  # 8
    'AUTH_ERR',  # 16
    'FORMAT_ERR',  # 32
    'LACKS_EXTRACTOR_ERR',  # 64
    'OS_ERR',  # 128
    'NOT_IN_DISK_ERR',  # 256
    'LIBRARY_ERROR',  # 512
]
# Maps each status bit value to its name.
GDL_ERRORS_DICT = {(1 << k): v for k, v in enumerate(GDL_ERRORS)}
def cmdline(encoded_args: str = None):
    '''Command-line entry point.

    Without ``encoded_args`` the downloader runs with its default
    configuration; otherwise ``parse_cmdline`` is given ``run_with_config``
    and the encoded arguments, and its result is returned.
    '''
    if encoded_args is not None:
        return parse_cmdline(run_with_config, encoded_args)
    return run_with_config()
def run_with_config(
    max_workers: int = None,
    debug_worker: str = None,
    manual_link: str = None,
):
    '''Store the run options in the module globals and start the download.

    ``max_workers`` replaces MAX_WORKERS only when it is given;
    ``debug_worker`` and ``manual_link`` always overwrite DEBUG_WORKER and
    DOWNLOAD_SINGLE_MANUAL_LINK (including with None).
    '''
    global DEBUG_WORKER, MAX_WORKERS, DOWNLOAD_SINGLE_MANUAL_LINK
    if max_workers is not None:
        MAX_WORKERS = max_workers
    DEBUG_WORKER = debug_worker
    DOWNLOAD_SINGLE_MANUAL_LINK = manual_link
    return main_loading_preprocessed()
def main_loading_preprocessed():
    '''Load 'r_fetch_preprocessed.json' and hand its contents to main().

    The configuration entries stored in the JSON document are moved into the
    matching module globals; every remaining entry is passed to main() as a
    keyword argument.
    '''
    global SKIP_INDEXED_FILES, RETRY_ERROR_MASK, CUSTOM_WORKER_SPLITS
    global SPLIT_WORKER_AFTER_N_LINKS, REDOWNLOAD_EMPTIES, REDOWNLOAD
    print('Initial data loading...')
    preprocessed = json.loads(Path('r_fetch_preprocessed.json').read_text())
    # Settings travel in the same document as the data; pop them so that only
    # main()'s own parameters are left in the mapping.
    SKIP_INDEXED_FILES = preprocessed.pop('SKIP_INDEXED_FILES')
    RETRY_ERROR_MASK = preprocessed.pop('RETRY_ERROR_MASK')
    CUSTOM_WORKER_SPLITS = preprocessed.pop('CUSTOM_WORKER_SPLITS')
    SPLIT_WORKER_AFTER_N_LINKS = preprocessed.pop('SPLIT_WORKER_AFTER_N_LINKS')
    REDOWNLOAD_EMPTIES = preprocessed.pop('REDOWNLOAD_EMPTIES')
    REDOWNLOAD = preprocessed.pop('REDOWNLOAD')
    # JSON stores the path as a string; main() is annotated to take a Path.
    state_path = Path(preprocessed['workers_state_path'])
    preprocessed['workers_state_path'] = state_path
    main(**preprocessed)
def main(files_from_links: Dict[str, List[str]],
         links_no_files: List[str],
         files_sizes: Dict[str, int],
         link_statuses: Dict[str, int],
         workers_nicely_grouped: List[str],
         workers_state_path: Path,
         links_to_worker: Dict[str, List[str]],
         link_keys: Dict[str, str]):
    '''Download the links of every worker and keep the JSON indexes updated.

    One job per worker is submitted to a process pool. As each job finishes,
    its results are merged into ``files_from_links``, ``files_sizes`` and
    ``link_statuses`` (all mutated in place) and the indexes are written to
    the ``i_gdl_*.json`` files.

    When DOWNLOAD_SINGLE_MANUAL_LINK or DEBUG_WORKER is set, only that single
    worker is downloaded, in this process, and the function returns right
    after it.

    Args:
        files_from_links: files already known for each link.
        links_no_files: links already known to yield no files.
        files_sizes: size in bytes of each known file.
        link_statuses: status bits of each link (see GDL_ERRORS).
        workers_nicely_grouped: worker ids, in submission order.
        workers_state_path: ignored; the state directory is always 'i_gdl_w'.
        links_to_worker: links to download, per worker id.
        link_keys: not used by this function.

    Raises:
        Exception: when ``links_to_worker`` is empty.
    '''
    global DEBUG_WORKER
    if not links_to_worker:
        raise Exception('No work to do.')
    # if DEBUG_WORKER is None:
    #     raise Exception('DEBUG_ONLY: preventing cron jobs from running')
    # The argument is deliberately overridden: state files always live here.
    workers_state_path = Path('i_gdl_w')
    workers_state_path.mkdir(exist_ok=True, parents=True)
    for wsp in workers_state_path.iterdir():
        wsp.unlink()
    print('Initial data loaded.')

    def save_ending_files():
        '''Write the current indexes to the i_gdl_*.json files.'''
        # A set keeps the membership test O(1) per link.
        known_no_files = set(links_no_files)
        newly_empty = [
            link for link, files in files_from_links.items()
            if len(files) <= 0 and link not in known_no_files
        ]
        links_no_files_sorted = sorted(newly_empty + links_no_files)
        links_with_files = {
            link: files
            for link, files in files_from_links.items()
            if len(files) > 0
        }
        # Reverse index: which links produced each file.
        links_for_files: Dict[str, List[str]] = {}
        for link, files in links_with_files.items():
            for file in files:
                links_for_files.setdefault(file, []).append(link)
        os.sync()
        Path('i_gdl_lnf.json').write_text(
            json.dumps(links_no_files_sorted, indent=1))
        Path('i_gdl_ffl.json').write_text(json.dumps(
            links_with_files, indent=1, sort_keys=True))
        Path('i_gdl_lff.json').write_text(json.dumps(
            links_for_files, indent=1, sort_keys=True))
        Path('i_gdl_fsz.json').write_text(
            json.dumps(files_sizes, indent=1, sort_keys=True))
        Path('i_gdl_spl.json').write_text(json.dumps(
            link_statuses, indent=1, sort_keys=True))
        os.sync()

    print('Performing partial save...')
    save_ending_files()
    print('Performed partial save.')
    totalfiles = 0
    thread_ids = workers_nicely_grouped.copy()
    # State file content: 'status:link_count:remaining:bytes:files'.
    for line, thread_id in enumerate(thread_ids):
        workers_state_path.joinpath(thread_id+'=line').write_text(str(line))
        linkcount = len(links_to_worker[thread_id])
        workers_state_path.joinpath(thread_id).write_text(
            f'waiting:{linkcount}:{linkcount}:0:0')
    do_fancy_multithreading_panel = False
    thread_id_count = len(thread_ids)
    if DOWNLOAD_SINGLE_MANUAL_LINK is not None or DEBUG_WORKER is not None:
        if DOWNLOAD_SINGLE_MANUAL_LINK is not None:
            DEBUG_WORKER = 'manual'
            links_to_worker[DEBUG_WORKER] = [DOWNLOAD_SINGLE_MANUAL_LINK]
        print(f'Will debug {repr(DEBUG_WORKER)}.')
        thread_id = DEBUG_WORKER
        links_list = links_to_worker[DEBUG_WORKER]
        download_link_list(
            links_list,
            thread_id,
            None,
            f'Debugging {repr(DEBUG_WORKER)}...',
            workers_state_path.joinpath(thread_id),
        )
        return
    # Without workers there is nothing to submit; an empty pool would also be
    # rejected by the executor (max_workers must be greater than 0).
    if thread_ids:
        with PoolExecutor(min(MAX_WORKERS, thread_id_count)) as pe:
            if do_fancy_multithreading_panel:
                print('\033[2J', end='', flush=True)
                print('\033[0;0H', end='', flush=True)
            print('Downloading...', flush=True)
            if do_fancy_multithreading_panel:
                print('\033[0;0H', end='', flush=True)
            largest_tid_size = max(map(len, thread_ids))
            line2tid = dict()

            def done_callback_generator(line):
                '''Build the completion callback of job number ``line``.'''

                def terminate_process_pool():
                    '''Flush disks, SIGTERM the downloader processes, exit.'''
                    os.sync()
                    # 'tr -s' squeezes runs of spaces so that the PID is
                    # always the second space-separated field for 'cut'.
                    os.system(
                        "bash -c \"ps -aux | grep './redditgetter.py' | grep -v grep | tr -s ' ' | cut -d' ' -f2 | xargs -r -- kill -15\"")
                    sys.exit(0xFF)

                def done_callback(job):
                    '''Merge the result of a finished job into the indexes.'''
                    nonlocal totalfiles
                    thread_id = line2tid[line]
                    links_list = links_to_worker[thread_id]
                    try:
                        workers_state_path.joinpath(thread_id).write_text(
                            f'finished:{len(links_list)}:0:0:0')
                        print(clrlib.stylize(
                            f'Received job #{line}: {thread_id}', [
                                clrlib.fg('white'),
                                clrlib.bg('green'),
                                clrlib.attr('bold'),
                            ]
                        ))
                        totalbytes = 0
                        thisfiles = 0
                        # Re-raises here whatever the worker raised.
                        downloaded_links = job.result()
                        for link, files in downloaded_links:
                            statusdir = get_path_for_caching(
                                link, Path('i_gdl_s'))
                            statusdir.mkdir(parents=True, exist_ok=True)
                            statusfile = statusdir.joinpath(
                                '_gdl_status.json')
                            statuses = dict()
                            if statusfile.exists():
                                statuses = json.loads(
                                    statusfile.read_text())
                            # 0xFF marks a link with no recorded status.
                            link_statuses[link] = statuses.get(link, 0xFF)
                            if link not in files_from_links:
                                files_from_links[link] = list()
                            totalfiles += len(files)
                            for file in files:
                                filepath = Path(file)
                                thisfiles += 1
                                # Only files present on disk are indexed.
                                if filepath.exists():
                                    files_from_links[link].append(file)
                                    st_size = filepath.stat().st_size
                                    files_sizes[file] = st_size
                                    totalbytes += st_size
                        workers_state_path.joinpath(thread_id).write_text(
                            f'finished:{len(links_list)}:0:{totalbytes}:{thisfiles}')
                        save_ending_files()
                    except BaseException:
                        # Any failure aborts the whole run: record it, then
                        # stop the pool and terminate the program.
                        excTxt = traceback.format_exc()
                        # The three steps below are best effort; a failure in
                        # one must not prevent the termination further down.
                        try:
                            workers_state_path.joinpath(thread_id).write_text(
                                f'failed:{len(links_list)}:0:0:0')
                        except Exception:
                            pass
                        try:
                            workers_state_path.joinpath(
                                thread_id+'=exc').write_text(excTxt)
                        except Exception:
                            pass
                        try:
                            pe.shutdown(wait=False)
                        except Exception:
                            pass
                        print(excTxt)
                        terminate_process_pool()
                        return
                return done_callback

            for line, thread_id in enumerate(thread_ids):
                line2tid[line] = thread_id
                links_list = links_to_worker[thread_id]
                workers_state_path.joinpath(thread_id).write_text(
                    f'enqueued:{len(links_list)}:{len(links_list)}:0:0')
                print(clrlib.stylize(f'Starting job #{line}: {thread_id}', [
                    clrlib.fg('white'),
                    clrlib.bg('light_red'),
                    clrlib.attr('bold'),
                ]))
                # Printed by the worker itself once the job really starts.
                jobstardedmsg = clrlib.stylize(f'Starting job #{line}: {thread_id}', [
                    clrlib.fg('black'),
                    clrlib.bg('light_yellow'),
                    clrlib.attr('bold'),
                ])
                # Pad the id so that all workers print aligned names.
                thread_id_nmsz = len(thread_id)
                thread_id_display = thread_id + ' ' * \
                    (largest_tid_size - thread_id_nmsz)
                job = pe.submit(
                    download_link_list,
                    links_list,
                    thread_id_display,
                    line+3 if do_fancy_multithreading_panel else None,
                    jobstardedmsg,
                    workers_state_path.joinpath(thread_id),
                )
                job.add_done_callback(done_callback_generator(line))
    save_ending_files()
    if (p := Path('latest_image_download.txt')).exists():
        p.unlink()
    if workers_state_path.exists():
        for p in workers_state_path.glob('*'):
            p.unlink()
        shutil.rmtree(workers_state_path)
    print(f'Downloaded {totalfiles} files')
def download_link_list(links: List[str],
                       thread_id: str,
                       line: Optional[int] = None,
                       job_started_msg: Optional[str] = None,
                       thread_state_path: Optional[Path] = None,
                       ) -> List[Tuple[str, List[str]]]:
    '''Downloads a link list inside a ProcessPoolExecutor

    Each link that is not already downloaded is fetched by running
    'gallery_dl_scriptable.py' in a subprocess. The resulting file list and
    status bits are stored in '_gdl_meta.json' and '_gdl_status.json' inside
    the link's cache and status directories.

    Args:
        links: links to download, processed in order.
        thread_id: worker name; used in the output and passed to the script.
        line: terminal row for the final message, or None to print normally.
        job_started_msg: message printed when the job starts, if any.
        thread_state_path: file receiving progress updates in the form
            'status:link_count:remaining:bytes:files[:link]', if any.

    Returns:
        One ``(link, files)`` pair per processed link.

    Raises:
        InterruptedError: when STOP_JOBS_FLAG_PATH exists at the start or
            after any link.
        FileNotFoundError: when a link with status 0 lists a missing file.
    '''
    if STOP_JOBS_FLAG_PATH.exists():
        raise InterruptedError(STOP_JOBS_FLAG_PATH)
    if job_started_msg is not None:
        print(job_started_msg)
    has_its_own_line = line is not None
    link_count = len(links)
    remaining_links = link_count
    if thread_state_path is not None:
        thread_state_path.write_text(
            f'running:{link_count}:{remaining_links}:0:0')
    result = list()
    totalbytes = 0
    totalfiles = 0
    try:
        for link in links:
            # 'scrubbing' means the link was only re-checked, not downloaded.
            scrubbing = True
            cachedir = get_path_for_caching(link, Path('i_gdl_c'))
            statusdir = get_path_for_caching(link, Path('i_gdl_s'))
            cachedir.mkdir(parents=True, exist_ok=True)
            statusdir.mkdir(parents=True, exist_ok=True)
            metafile = cachedir.joinpath('_gdl_meta.json')
            statusfile = statusdir.joinpath('_gdl_status.json')
            meta = dict()
            statuses = dict()
            link_already_downloaded = False
            # An unreadable cache file is treated as if it were empty.
            if metafile.exists():
                try:
                    meta = json.loads(metafile.read_text())
                except json.JSONDecodeError:
                    pass
            if statusfile.exists():
                try:
                    statuses = json.loads(statusfile.read_text())
                except json.JSONDecodeError:
                    pass
            if link in meta and link in statuses:
                link_already_downloaded = True
                rc = statuses.get(link, 0xFF)
                if rc == 0:
                    # A successful download only counts while every file it
                    # produced is still on disk.
                    for fl in meta[link]:
                        pth = Path(fl)
                        try:
                            if not pth.exists():
                                link_already_downloaded = False
                                break
                        except OSError:
                            link_already_downloaded = False
                            break
                if len(meta[link]) == 0 and REDOWNLOAD_EMPTIES:
                    link_already_downloaded = False
                if (rc & RETRY_ERROR_MASK) != 0:
                    link_already_downloaded = False
            if not link_already_downloaded or REDOWNLOAD:
                scrubbing = False
                if thread_state_path is not None:
                    thread_state_path.write_text(
                        f'running:{link_count}:{remaining_links}:{totalbytes}:{totalfiles}:{link}')
                spr = subprocess.run(
                    [
                        'python3',
                        str(Path(__file__).parent.joinpath(
                            'gallery_dl_scriptable.py')),
                        str(thread_id),
                        str(remaining_links),
                        'i_gdl',
                        link,
                        'latest_image_download.txt'
                    ],
                    text=False,
                    stdin=sys.stdin,
                    stdout=subprocess.PIPE,
                    stderr=sys.stdout,
                    env={
                        'FORCE_COLOR': '1' if clrlib.colored('black').enabled() else '0',
                        **os.environ
                    },
                )
                # Used when the script fails or prints invalid JSON:
                # status 512 is LIBRARY_ERROR.
                jobres = {'status': 512,
                          'link_requested': link,
                          'link_effective': link,
                          'logs': [],
                          'files': []}
                if spr.returncode == 0:
                    try:
                        jobres = json.loads(spr.stdout)
                    except ValueError:
                        # Covers both invalid JSON and undecodable bytes.
                        pass
                rc = jobres['status']
                os.sync()
                files: List[str] = jobres['files'].copy()
                files = list(filter(len, files))
                # A reported file may be on disk under a longer name that
                # starts with the reported one; replace it with the shortest
                # such name and rescan until nothing changes.
                has_changed = True
                while has_changed:
                    has_changed = False
                    for seq, fl in enumerate(files):
                        pth = Path(fl)
                        if pth.exists():
                            continue
                        # A missing parent directory has no candidates;
                        # iterating it would raise instead of flagging.
                        siblings = (pth.parent.iterdir()
                                    if pth.parent.is_dir() else ())
                        candidates = sorted(
                            (p for p in siblings
                             if p.name.startswith(pth.name)
                             and p.suffix != '.part'
                             and p.suffix != '.json'),
                            key=lambda p: len(p.name)
                        )
                        if len(candidates) > 0:
                            files[seq] = str(candidates[0])
                            has_changed = True
                            break
                        # 256 is NOT_IN_DISK_ERR.
                        rc |= 256
                meta[link] = files
                statuses[link] = rc
                metafile.write_text(json.dumps(meta, indent=1))
                statusfile.write_text(json.dumps(statuses, indent=1))
                os.sync()
            for fl in meta[link]:
                pth = Path(fl)
                if pth.exists():
                    totalfiles += 1
                    totalbytes += pth.stat().st_size
                elif statuses[link] == 0:
                    # A missing file is tolerated only for failed links.
                    raise FileNotFoundError((link,
                                             link_already_downloaded,
                                             meta[link]))
            result.append((link, meta[link]))
            remaining_links -= 1
            if thread_state_path is not None:
                scrubbing_running = 'scrubbing' if scrubbing else 'running'
                thread_state_path.write_text(
                    f'{scrubbing_running}:{link_count}:{remaining_links}:{totalbytes}:{totalfiles}:{link}')
            if STOP_JOBS_FLAG_PATH.exists():
                raise InterruptedError(STOP_JOBS_FLAG_PATH)
    finally:
        # Printed on success and on failure alike.
        print((f'\033[{line};0H' if has_its_own_line else '') +
              clrlib.stylize(thread_id.strip(), [clrlib.fg('yellow'), clrlib.attr('bold')]) +
              clrlib.stylize('#', [clrlib.fg('light_red')]) +
              clrlib.stylize('Done', [clrlib.fg('light_green')]) +
              ('\033[K' if has_its_own_line else '')
              )
    return result