#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
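"""Parallel downloader orchestration.

Loads preprocessed link/worker assignments from ``r_fetch_preprocessed.json``,
fans each worker's link list out to a ``ProcessPoolExecutor`` (each link is
fetched by a ``gallery_dl_scriptable.py`` subprocess), tracks per-worker
progress through small state files under ``i_gdl_w``, and persists the
link/file bookkeeping into the ``i_gdl_*.json`` files.
"""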

import json
import os
import shutil
import subprocess
import sys
import traceback
from concurrent.futures import ProcessPoolExecutor as PoolExecutor
from io import StringIO
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple

import colored as clrlib

from .system.cmdline_parser import parse_cmdline
from .system.downloader.cache import get_path_for_caching

STOP_JOBS_FLAG_PATH = Path('stop_jobs.flag')
FORBIDDEN_WORKER_SPLITS = {
    'deviantart',
}

MAX_WORKERS = 12
SPLIT_WORKER_AFTER_N_LINKS = 10000
USE_FIREFOX_COOKIES = True
DEBUG_WORKER = None
IGNORE_WORKERS: Set[str] = set()
REDOWNLOAD = False
REDOWNLOAD_EMPTIES = False
CUSTOM_WORKER_SPLITS: Dict[str, int] = {}
SKIP_INDEXED_FILES = True
RETRY_ERROR_MASK = 0  # bitmask over GDL_ERRORS bits whose links get retried
DOWNLOAD_SINGLE_MANUAL_LINK: Optional[str] = None


GDL_ERRORS = [
    'GENERIC_ERR',          # 1
    'UNKNOWN_ERR',          # 2
    'HTTP_ERR',             # 4
    '404_ERR',              # 8
    'AUTH_ERR',             # 16
    'FORMAT_ERR',           # 32
    'LACKS_EXTRACTOR_ERR',  # 64
    'OS_ERR',               # 128
    'NOT_IN_DISK_ERR',      # 256
    'LIBRARY_ERROR',        # 512
]
GDL_ERRORS_DICT = {(1 << k): v for k, v in enumerate(GDL_ERRORS)}
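

# Illustrative helper (editorial sketch, not used elsewhere in this module):
# the status codes the workers store are bitmasks over GDL_ERRORS_DICT and
# can be expanded back into names, e.g. decode_error_mask(0b1100) yields
# ['HTTP_ERR', '404_ERR'].
def decode_error_mask(mask: int) -> List[str]:
    return [name for bit, name in GDL_ERRORS_DICT.items() if mask & bit]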


def cmdline(encoded_args: Optional[str] = None):
    if encoded_args is None:
        return run_with_config()
    else:
        return parse_cmdline(run_with_config, encoded_args)


def run_with_config(
    max_workers: Optional[int] = None,
    debug_worker: Optional[str] = None,
    manual_link: Optional[str] = None,
):
    global DEBUG_WORKER
    global MAX_WORKERS
    global DOWNLOAD_SINGLE_MANUAL_LINK
    DOWNLOAD_SINGLE_MANUAL_LINK = manual_link
    DEBUG_WORKER = debug_worker
    if max_workers is not None:
        MAX_WORKERS = max_workers
    return main_loading_preprocessed()
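

# Example invocation (assumed; mirrors the signature above): run a single
# worker in the foreground with a smaller pool for debugging:
#
#     run_with_config(max_workers=4, debug_worker='deviantart')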


def main_loading_preprocessed():
    print('Initial data loading...')
    kwargs = json.loads(Path('r_fetch_preprocessed.json').read_text())
    global SKIP_INDEXED_FILES, RETRY_ERROR_MASK, CUSTOM_WORKER_SPLITS
    global SPLIT_WORKER_AFTER_N_LINKS, REDOWNLOAD_EMPTIES, REDOWNLOAD
    SKIP_INDEXED_FILES = kwargs.pop('SKIP_INDEXED_FILES')
    RETRY_ERROR_MASK = kwargs.pop('RETRY_ERROR_MASK')
    CUSTOM_WORKER_SPLITS = kwargs.pop('CUSTOM_WORKER_SPLITS')
    SPLIT_WORKER_AFTER_N_LINKS = kwargs.pop('SPLIT_WORKER_AFTER_N_LINKS')
    REDOWNLOAD_EMPTIES = kwargs.pop('REDOWNLOAD_EMPTIES')
    REDOWNLOAD = kwargs.pop('REDOWNLOAD')
    kwargs['workers_state_path'] = Path(kwargs['workers_state_path'])
    main(**kwargs)
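

# Expected shape of r_fetch_preprocessed.json, inferred from the pops above
# and from main()'s signature (the producer of this file is not shown here):
#
#     {
#       "SKIP_INDEXED_FILES": true, "RETRY_ERROR_MASK": 0,
#       "CUSTOM_WORKER_SPLITS": {"<worker>": 5000},
#       "SPLIT_WORKER_AFTER_N_LINKS": 10000,
#       "REDOWNLOAD_EMPTIES": false, "REDOWNLOAD": false,
#       "files_from_links": {"<link>": ["<file>", ...]},
#       "links_no_files": ["<link>", ...],
#       "files_sizes": {"<file>": 12345},
#       "link_statuses": {"<link>": 0},
#       "workers_nicely_grouped": ["<worker>", ...],
#       "workers_state_path": "i_gdl_w",
#       "links_to_worker": {"<worker>": ["<link>", ...]},
#       "link_keys": {"<link>": "<key>"}
#     }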


def main(files_from_links: Dict[str, List[str]],
         links_no_files: List[str],
         files_sizes: Dict[str, int],
         link_statuses: Dict[str, int],
         workers_nicely_grouped: List[str],
         workers_state_path: Path,
         links_to_worker: Dict[str, List[str]],
         link_keys: Dict[str, str]):
    global DEBUG_WORKER
    if len(links_to_worker) == 0:
        raise Exception('No work to do.')
    # if DEBUG_WORKER is None:
    #     raise Exception('DEBUG_ONLY: preventing cron jobs from running')
    # Note: this reassignment shadows the workers_state_path parameter;
    # every run starts from a clean 'i_gdl_w' state directory.
    workers_state_path = Path('i_gdl_w')
    workers_state_path.mkdir(exist_ok=True, parents=True)
    for wsp in workers_state_path.iterdir():
        wsp.unlink()

    print('Initial data loaded.')

    def save_ending_files():
        nonlocal links_no_files
        # Links whose file lists came back empty join the "no files" list;
        # only links that actually produced files stay in the mapping.
        links_no_files2 = [link
                           for link, files in files_from_links.items()
                           if len(files) <= 0 and link not in links_no_files
                           ] + links_no_files
        files_from_links2 = {link: files
                             for link, files in files_from_links.items()
                             if len(files) > 0}
        links_no_files2_sorted = sorted(links_no_files2)
        # Invert link -> files into file -> links.
        links_for_files = dict()
        for link, files in files_from_links2.items():
            for file in files:
                if file not in links_for_files:
                    links_for_files[file] = list()
                links_for_files[file].append(link)
        os.sync()
        Path('i_gdl_lnf.json').write_text(
            json.dumps(links_no_files2_sorted, indent=1))
        Path('i_gdl_ffl.json').write_text(json.dumps(
            files_from_links2, indent=1, sort_keys=True))
        Path('i_gdl_lff.json').write_text(json.dumps(
            links_for_files, indent=1, sort_keys=True))
        Path('i_gdl_fsz.json').write_text(
            json.dumps(files_sizes, indent=1, sort_keys=True))
        Path('i_gdl_spl.json').write_text(json.dumps(
            link_statuses, indent=1, sort_keys=True))
        os.sync()

    print('Performing partial save...')
    save_ending_files()
    print('Performed partial save.')

    totalfiles = 0

    thread_ids = workers_nicely_grouped.copy()
    for line, thread_id in enumerate(thread_ids):
        workers_state_path.joinpath(thread_id + '=line').write_text(str(line))
        linkcount = len(links_to_worker[thread_id])
        workers_state_path.joinpath(thread_id).write_text(
            f'waiting:{linkcount}:{linkcount}:0:0')
    do_fancy_multithreading_panel = False
    thread_id_count = len(thread_ids)
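
    # Worker state files (one per thread id under i_gdl_w) hold a small
    # colon-separated record; from the writes in this module the fields are
    #     status:total_links:remaining_links:bytes_downloaded:files_downloaded[:current_link]
    # with status one of: waiting, enqueued, running, scrubbing, finished,
    # failed.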
    if DOWNLOAD_SINGLE_MANUAL_LINK is not None or DEBUG_WORKER is not None:
        if DOWNLOAD_SINGLE_MANUAL_LINK is not None:
            DEBUG_WORKER = 'manual'
            links_to_worker[DEBUG_WORKER] = [DOWNLOAD_SINGLE_MANUAL_LINK]
        print(f'Will debug {repr(DEBUG_WORKER)}.')
        thread_id = DEBUG_WORKER
        links_list = links_to_worker[DEBUG_WORKER]
        download_link_list(
            links_list,
            thread_id,
            None,
            f'Debugging {repr(DEBUG_WORKER)}...',
            workers_state_path.joinpath(thread_id),
        )
        return
    if links_to_worker:
        with PoolExecutor(min(MAX_WORKERS, thread_id_count)) as pe:
            if do_fancy_multithreading_panel:
                print('\033[2J', end='', flush=True)    # clear screen
                print('\033[0;0H', end='', flush=True)  # cursor to top-left
            print('Downloading...', flush=True)
            if do_fancy_multithreading_panel:
                print('\033[0;0H', end='', flush=True)
            largest_tid_size = max(map(len, thread_ids))
            line2tid = dict()

            def done_callback_generator(line):

                def terminate_process_pool():
                    # Last-resort teardown: flush disk buffers, SIGTERM any
                    # leftover './redditgetter.py' processes, then exit.
                    os.system('sync')
                    os.system(
                        "bash -c \"ps -aux | grep './redditgetter.py' | grep -v grep | sed -e 's/  */ /g' | cut -d' ' -f2 | xargs -r -- kill -15\"")
                    sys.exit(0xFF)

                def done_callback(job):
                    nonlocal totalfiles
                    thread_id = line2tid[line]
                    links_list = links_to_worker[thread_id]
                    try:
                        workers_state_path.joinpath(thread_id).write_text(
                            f'finished:{len(links_list)}:0:0:0')
                        print(clrlib.stylize(
                            f'Received job #{line}: {thread_id}', [
                                clrlib.fg('white'),
                                clrlib.bg('green'),
                                clrlib.attr('bold'),
                            ]
                        ))
                        totalbytes = 0
                        thisfiles = 0
                        downloaded_links = job.result()
                        for link, files in downloaded_links:
                            # Refresh this link's status from the on-disk
                            # status cache the worker just wrote.
                            statusdir = get_path_for_caching(
                                link, Path('i_gdl_s'))
                            statusdir.mkdir(parents=True, exist_ok=True)
                            statusfile = statusdir.joinpath(
                                '_gdl_status.json')
                            statuses = dict()
                            if statusfile.exists():
                                statuses = json.loads(
                                    statusfile.read_text())
                            link_statuses[link] = statuses.get(link, 0xFF)
                            if link not in files_from_links:
                                files_from_links[link] = list()
                            lenfiles = len(files)
                            totalfiles += lenfiles
                            for file in files:
                                filepath = Path(file)
                                thisfiles += 1
                                if filepath.exists():
                                    files_from_links[link].append(file)
                                    st_size = filepath.stat().st_size
                                    files_sizes[file] = st_size
                                    totalbytes += st_size
                        workers_state_path.joinpath(thread_id).write_text(
                            f'finished:{len(links_list)}:0:{totalbytes}:{thisfiles}')
                        save_ending_files()
                    except Exception:
                        sio = StringIO()
                        traceback.print_exc(file=sio)
                        excTxt = sio.getvalue()
                        try:
                            workers_state_path.joinpath(thread_id).write_text(
                                f'failed:{len(links_list)}:0:0:0')
                        except Exception:
                            pass
                        try:
                            workers_state_path.joinpath(
                                thread_id + '=exc').write_text(excTxt)
                        except Exception:
                            pass
                        try:
                            pe.shutdown(wait=False)
                        except Exception:
                            pass
                        print(excTxt)
                        terminate_process_pool()
                        return
                return done_callback
            for line, thread_id in enumerate(thread_ids):
                line2tid[line] = thread_id
                links_list = links_to_worker[thread_id]
                workers_state_path.joinpath(thread_id).write_text(
                    f'enqueued:{len(links_list)}:{len(links_list)}:0:0')
                print(clrlib.stylize(f'Starting job #{line}: {thread_id}', [
                    clrlib.fg('white'),
                    clrlib.bg('light_red'),
                    clrlib.attr('bold'),
                ]))
                jobstartedmsg = clrlib.stylize(f'Starting job #{line}: {thread_id}', [
                    clrlib.fg('black'),
                    clrlib.bg('light_yellow'),
                    clrlib.attr('bold'),
                ])
                # Pad the thread id so multi-line progress panels align.
                thread_id_display = thread_id.ljust(largest_tid_size)
                job = pe.submit(
                    download_link_list,
                    links_list,
                    thread_id_display,
                    line + 3 if do_fancy_multithreading_panel else None,
                    jobstartedmsg,
                    workers_state_path.joinpath(thread_id),
                )
                job.add_done_callback(done_callback_generator(line))
            save_ending_files()
    if (p := Path('latest_image_download.txt')).exists():
        p.unlink()
    if workers_state_path.exists():
        for p in workers_state_path.glob('*'):
            p.unlink()
        shutil.rmtree(workers_state_path)
    print(f'Downloaded {totalfiles} files')


def download_link_list(links: List[str],
                       thread_id: str,
                       line: Optional[int] = None,
                       job_started_msg: Optional[str] = None,
                       thread_state_path: Optional[Path] = None,
                       ) -> List[Tuple[str, List[str]]]:
    '''Downloads a link list inside a ProcessPoolExecutor.'''
    if STOP_JOBS_FLAG_PATH.exists():
        raise InterruptedError(STOP_JOBS_FLAG_PATH)
    if job_started_msg is not None:
        print(job_started_msg)
    has_its_own_line = line is not None
    link_count = len(links)
    remaining_links = link_count
    if thread_state_path is not None:
        thread_state_path.write_text(
            f'running:{link_count}:{remaining_links}:0:0')
    result = list()
    totalbytes = 0
    totalfiles = 0
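    # Per-link cache layout used below: get_path_for_caching maps each link
    # to a directory under 'i_gdl_c' holding '_gdl_meta.json' (link -> list
    # of downloaded files) and one under 'i_gdl_s' holding '_gdl_status.json'
    # (link -> exit-status bitmask, see GDL_ERRORS_DICT).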
    try:
        for link in links:
            scrubbing = True
            cachedir = get_path_for_caching(link, Path('i_gdl_c'))
            statusdir = get_path_for_caching(link, Path('i_gdl_s'))
            cachedir.mkdir(parents=True, exist_ok=True)
            statusdir.mkdir(parents=True, exist_ok=True)
            metafile = cachedir.joinpath('_gdl_meta.json')
            statusfile = statusdir.joinpath('_gdl_status.json')
            meta = dict()
            statuses = dict()
            link_already_downloaded = False
            if metafile.exists():
                try:
                    meta = json.loads(metafile.read_text())
                except json.JSONDecodeError:
                    pass
            if statusfile.exists():
                try:
                    statuses = json.loads(statusfile.read_text())
                except json.JSONDecodeError:
                    pass
            if link in meta and link in statuses:
                link_already_downloaded = True
                rc = statuses.get(link, 0xFF)
                if rc == 0:
                    # A clean status only counts if every recorded file is
                    # still on disk.
                    for fl in meta[link]:
                        pth = Path(fl)
                        try:
                            if not pth.exists():
                                link_already_downloaded = False
                                break
                        except OSError:
                            link_already_downloaded = False
                            break
                if len(meta[link]) == 0 and REDOWNLOAD_EMPTIES:
                    link_already_downloaded = False
                if (rc & RETRY_ERROR_MASK) != 0:
                    link_already_downloaded = False
            if not link_already_downloaded or REDOWNLOAD:
                scrubbing = False
                if thread_state_path is not None:
                    thread_state_path.write_text(
                        f'running:{link_count}:{remaining_links}:{totalbytes}:{totalfiles}:{link}')
                # Each link is fetched by a helper script in a separate
                # process; it reports its result as JSON on stdout.
                spr = subprocess.run(
                    [
                        'python3',
                        str(Path(__file__).parent.joinpath(
                            'gallery_dl_scriptable.py')),
                        str(thread_id),
                        str(remaining_links),
                        'i_gdl',
                        link,
                        'latest_image_download.txt',
                    ],
                    text=False,
                    stdin=sys.stdin,
                    stdout=subprocess.PIPE,
                    stderr=sys.stdout,
                    env={
                        'FORCE_COLOR': '1' if clrlib.colored('black').enabled() else '0',
                        **os.environ,
                    },
                )
                # Fallback result: LIBRARY_ERROR (512) with no files, used
                # when the helper fails or prints unparseable output.
                jobres = {'status': 512,
                          'link_requested': link,
                          'link_effective': link,
                          'logs': [],
                          'files': []}
                if spr.returncode == 0:
                    try:
                        jobres = json.loads(spr.stdout)
                    except Exception:
                        pass
                rc: int = jobres['status']
                os.sync()
                files: List[str] = jobres['files'].copy()
                files = list(filter(len, files))
                # Some reported paths may have been renamed on disk (e.g. a
                # corrected extension); remap each missing path to the
                # shortest-named sibling sharing its prefix, or flag
                # NOT_IN_DISK_ERR (256) when no candidate exists.
                has_changed = True
                while has_changed:
                    has_changed = False
                    for seq, fl in enumerate(files):
                        if not (pth := Path(fl)).exists():
                            candidates = sorted(
                                [p for p in pth.parent.iterdir()
                                 if (p.name.startswith(pth.name)
                                     and p.suffix != '.part'
                                     and p.suffix != '.json')],
                                key=lambda p: len(p.name))
                            if len(candidates) > 0:
                                files[seq] = str(candidates[0])
                                has_changed = True
                                break
                            else:
                                rc |= 256
                                # raise Exception(pth.name, candidates, files)
                del has_changed
                meta[link] = files
                statuses[link] = rc
                metafile.write_text(json.dumps(meta, indent=1))
                statusfile.write_text(json.dumps(statuses, indent=1))
                os.sync()
            for fl in meta[link]:
                code = statuses[link]
                pth = Path(fl)
                if not pth.exists():
                    if code != 0:
                        continue
                    else:
                        # A zero status with a missing file means the
                        # bookkeeping is inconsistent; fail loudly.
                        raise FileNotFoundError((link,
                                                 link_already_downloaded,
                                                 meta[link]))
                else:
                    totalfiles += 1
                    totalbytes += pth.stat().st_size
            result.append((link, meta[link]))
            remaining_links -= 1
            if thread_state_path is not None:
                scrubbing_running = 'scrubbing' if scrubbing else 'running'
                thread_state_path.write_text(
                    f'{scrubbing_running}:{link_count}:{remaining_links}:{totalbytes}:{totalfiles}:{link}')
            if STOP_JOBS_FLAG_PATH.exists():
                raise InterruptedError(STOP_JOBS_FLAG_PATH)
    finally:
        print((f'\033[{line};0H' if has_its_own_line else '') +
              clrlib.stylize(thread_id.strip(), [clrlib.fg('yellow'), clrlib.attr('bold')]) +
              clrlib.stylize('#', [clrlib.fg('light_red')]) +
              clrlib.stylize('Done', [clrlib.fg('light_green')]) +
              ('\033[K' if has_its_own_line else ''))
    return result
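

# Illustrative sketch (editorial addition, not used by this module): read
# the i_gdl_w state files written above into a progress snapshot, assuming
# the 'status:total:remaining:bytes:files[:link]' format documented in
# main().
def read_worker_states(state_dir: Path = Path('i_gdl_w')) -> Dict[str, Tuple[str, int, int]]:
    states: Dict[str, Tuple[str, int, int]] = {}
    for p in state_dir.glob('*'):
        if '=' in p.name:  # skip '=line' and '=exc' sidecar files
            continue
        status, total, remaining = p.read_text().split(':')[:3]
        states[p.name] = (status, int(total), int(remaining))
    return states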