reddit-image-wall-getter/reddit_imgs/_fetch2.py


#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import json
import math
import os
import pickle
import shutil
import subprocess
import sys
import traceback
from collections import OrderedDict
from concurrent.futures import ProcessPoolExecutor as PoolExecutor
from io import StringIO
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple, Type
import colored as clrlib
import gallery_dl
import gallery_dl.config
import gallery_dl.exception  # used in prerrun(); imported explicitly instead of relying on transitive imports
import gallery_dl.extractor
import gallery_dl.job
import gallery_dl.option  # used in configure_gdl()
import gallery_dl.output  # used when selecting the output printer
import gallery_dl.postprocessor.common
import gallery_dl.util
import reddit_imgs.sync
from .system.cmdline_parser import parse_cmdline
from .system.downloader.cache import get_normalized_link, get_path_for_caching
from .system.flattener import flatten_generator
from .system.urlmatcher import search_urls
gdl_pf: Type[gallery_dl.util.PathFormat] = (
gallery_dl.util.PathFormat
if not hasattr(gallery_dl.util, 'PathFormatOriginal') else
gallery_dl.util.PathFormatOriginal)
STOP_JOBS_FLAG_PATH = Path('stop_jobs.flag')
FORBIDDEN_WORKER_SPLITS = {
'deviantart',
}
MAX_WORKERS = 12
SPLIT_WORKER_AFTER_N_LINKS = 10000
USE_FIREFOX_COOKIES = True
DEBUG_WORKER = None
IGNORE_WORKERS = set()
REDOWNLOAD = False
REDOWNLOAD_EMPTIES = False
CUSTOM_WORKER_SPLITS = dict()
SKIP_INDEXED_FILES = True
RETRY_ERROR_MASK = 0
GDL_ERRORS = [
'GENERIC_ERR', # 1
'UNKNOWN_ERR', # 2
'HTTP_ERR', # 4
'404_ERR', # 8
'AUTH_ERR', # 16
'FORMAT_ERR', # 32
'LACKS_EXTRACTOR_ERR', # 64
'OS_ERR', # 128
'NOT_IN_DISK_ERR', # 256
]
GDL_ERRORS_DICT = {(1 << k): v for k, v in enumerate(GDL_ERRORS)}
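# Illustrative decode of a gallery-dl exit status against the bit names above
# (the value 12 is made up for the example):
#     [name for bit, name in GDL_ERRORS_DICT.items() if 12 & bit]
#     # -> ['HTTP_ERR', '404_ERR']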
HTML_SPECIAL_CHARS_REPLACE: List[Tuple[str, str]] = [
('&lt;', '<'),
('&gt;', '>'),
('&quot;', '"'),
('&#039;', '\''),
('&amp;', '&'),
]
HTML_SPECIAL_CHARS: List[str] = list(map(lambda a: a[0], HTML_SPECIAL_CHARS_REPLACE))
def contains_any(s: str, l: List[str]) -> bool:
for i in l:
if i in s:
return True
return False
def replace_many(s: str, l: List[Tuple[str, str]]) -> str:
for f, t in l:
s = s.replace(f, t)
return s
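# Minimal usage sketch for the two helpers above (inputs are made up):
#     contains_any('a &amp; b', HTML_SPECIAL_CHARS)             # -> True
#     replace_many('&lt;tag&gt;', HTML_SPECIAL_CHARS_REPLACE)   # -> '<tag>'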
def cmdline(encoded_args: Optional[str] = None):
if encoded_args is None:
return run_with_config()
else:
return parse_cmdline(run_with_config, encoded_args)
def run_with_config(redownload_empties: bool = False,
redownload_all: bool = False,
use_firefox_cookies: bool = True,
retry_generic_errors: bool = False,
retry_unknown_errors: bool = False,
retry_network_errors: bool = False,
retry_not_found_errors: bool = False,
retry_auth_errors: bool = False,
retry_format_errors: bool = False,
retry_extractor_errors: bool = False,
retry_os_errors: bool = False,
retry_not_in_disk_errors: bool = False,
retry_gdl_mask: int = 0,
max_workers: int = None,
split_workers: int = None,
debug_worker: str = None,
ignore_workers: Set[str] = set(),
custom_worker_splits: dict = None,
skip_indexed_files: bool = True,
):
global SPLIT_WORKER_AFTER_N_LINKS
global CUSTOM_WORKER_SPLITS
global USE_FIREFOX_COOKIES
global SKIP_INDEXED_FILES
global REDOWNLOAD_EMPTIES
global RETRY_ERROR_MASK
global IGNORE_WORKERS
global DEBUG_WORKER
global MAX_WORKERS
global REDOWNLOAD
REDOWNLOAD = redownload_all
DEBUG_WORKER = debug_worker
IGNORE_WORKERS = ignore_workers
SKIP_INDEXED_FILES = skip_indexed_files
REDOWNLOAD_EMPTIES = redownload_empties
USE_FIREFOX_COOKIES = use_firefox_cookies
RETRY_ERROR_MASK |= retry_gdl_mask
if retry_generic_errors:
RETRY_ERROR_MASK |= 1 << 0
if retry_unknown_errors:
RETRY_ERROR_MASK |= 1 << 1
if retry_network_errors:
RETRY_ERROR_MASK |= 1 << 2
if retry_not_found_errors:
RETRY_ERROR_MASK |= 1 << 3
if retry_auth_errors:
RETRY_ERROR_MASK |= 1 << 4
if retry_format_errors:
RETRY_ERROR_MASK |= 1 << 5
if retry_extractor_errors:
RETRY_ERROR_MASK |= 1 << 6
if retry_os_errors:
RETRY_ERROR_MASK |= 1 << 7
if retry_not_in_disk_errors:
RETRY_ERROR_MASK |= 1 << 8
if max_workers is not None:
MAX_WORKERS = max_workers
if split_workers is not None:
SPLIT_WORKER_AFTER_N_LINKS = split_workers
if debug_worker is not None:
DEBUG_WORKER = debug_worker
if custom_worker_splits is not None:
CUSTOM_WORKER_SPLITS = custom_worker_splits
return main()
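# Illustrative call (argument values are hypothetical): running
#     run_with_config(retry_network_errors=True, retry_not_found_errors=True,
#                     max_workers=4, split_workers=5000)
# leaves RETRY_ERROR_MASK == (1 << 2) | (1 << 3) == 12, caps the process pool
# at 4 workers and re-chunks each extractor every 5000 links before main().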
def prerrun():
subreddit_data_path = Path('r.json')
if not subreddit_data_path.exists():
print("Executing prerrequisite...")
reddit_imgs.sync.main()
if USE_FIREFOX_COOKIES:
print('Getting cookies from Firefox...')
subprocess.run([
'reddit_imgs/get_firefox_cookies.sh',
'i_gdl/.cookies'],
).check_returncode()
subreddit_filters_path = Path('rf.json')
print('Loading posts from disk...')
Path('i_gdl').mkdir(exist_ok=True, parents=True)
workers_state_path = Path('i_gdl_w')
workers_state_path.mkdir(exist_ok=True, parents=True)
for wsp in workers_state_path.iterdir():
wsp.unlink()
if STOP_JOBS_FLAG_PATH.exists():
STOP_JOBS_FLAG_PATH.unlink()
subreddit_data = json.loads(subreddit_data_path.read_text())
subreddit_filters = json.loads(subreddit_filters_path.read_bytes())
print('Loading posts...')
posts = prerrun_flatten_subreddits_into_posts(subreddit_data, subreddit_filters)
print(f'{len(posts)} posts identified.')
print(f'Identifying alternative trivial links...')
prerrun_posts_re_sort(posts)
Path('r_gdl_p.json').write_text(
json.dumps(posts, indent=1, sort_keys=True))
print(f'Grouping links with the posts they show up in...')
links = OrderedDict()
for dk, post in posts.items():
for link in post['links']:
if link not in links:
links[link] = list()
links[link].append(dk)
Path('r_gdl_lp.json').write_text(
json.dumps(links, indent=1, sort_keys=True))
known_link_set = set(links.keys())
print(f'{len(links)} links found')
print(f'Checking if there is an extractor for each link...')
r_gdl_le_path = Path('r_gdl_le.json')
link_extractors = dict()
if r_gdl_le_path.exists():
link_extractors = json.loads(r_gdl_le_path.read_text())
for link in links.keys():
if link not in link_extractors or link_extractors[link] == '':
ext = None
try:
ext = gallery_dl.extractor.find(link)
except gallery_dl.exception.NotFoundError:
pass
if ext is not None and type(ext).category == 'reddit' and type(ext).subcategory in ('subreddit', 'user'):
link_extractors[link] = f'{type(ext).category}_{type(ext).subcategory}'
else:
link_extractors[link] = (type(ext).category
if ext is not None else
'')
for discarded_link in set(link_extractors.keys()).difference(known_link_set):
del link_extractors[discarded_link]
r_gdl_le_path.write_text(json.dumps(
link_extractors, indent=1, sort_keys=True))
links_by_extractor = {
extractor: list()
for extractor in list(set(link_extractors.values()))
}
for link, extractor in link_extractors.items():
links_by_extractor[extractor].append(link)
not_downloadable_links = dict()
not_downloadable_links[''] = links_by_extractor.get('', [])
not_downloadable_links['reddit_user'] = links_by_extractor.get('reddit_user', [])
not_downloadable_links['reddit_subreddit'] = links_by_extractor.get('reddit_subreddit', [])
Path('i_undownloadable.json').write_text(
json.dumps(not_downloadable_links, indent=1))
if '' in links_by_extractor:
del links_by_extractor['']
if 'reddit_user' in links_by_extractor:
del links_by_extractor['reddit_user']
if 'reddit_subreddit' in links_by_extractor:
del links_by_extractor['reddit_subreddit']
not_downloadable_link_set = frozenset(flatten_generator(not_downloadable_links.values()))
print(f'{len(links)-len(not_downloadable_link_set)} downloadable links found')
print(f'{len(not_downloadable_link_set)} undownloadable links found')
print(f'{len(links_by_extractor)} extractors found')
Path('r_gdl_lbe.json').write_text(json.dumps(links_by_extractor, indent=1, sort_keys=True))
files_from_links: Dict[str, List[str]] = dict()
links_no_files: List[str] = list()
files_sizes: Dict[str, int] = dict()
link_statuses: Dict[str, int] = dict()
ignored_links: Set[str] = set()
if (pth := Path('i_gdl_ffl.json')).exists():
try:
files_from_links = json.loads(pth.read_text())
        except Exception:
pass
if (pth := Path('i_gdl_lnf.json')).exists():
try:
links_no_files = json.loads(pth.read_text())
        except Exception:
pass
if (pth := Path('i_gdl_fsz.json')).exists():
try:
files_sizes = json.loads(pth.read_text())
        except Exception:
pass
if (pth := Path('i_gdl_spl.json')).exists():
try:
link_statuses = json.loads(pth.read_text())
        except Exception:
pass
for discarded_link in set(links_no_files).difference(known_link_set):
links_no_files.remove(discarded_link)
discarded_files = set()
for discarded_link in set(files_from_links.keys()).difference(known_link_set):
if discarded_link in files_from_links:
files_in_link = files_from_links[discarded_link]
for file_in_link in files_in_link:
discarded_files.add(file_in_link)
if discarded_link in link_statuses:
del link_statuses[discarded_link]
del files_from_links[discarded_link]
files_to_keep = set()
for files_from_link in files_from_links.values():
for file_from_link in files_from_link:
if file_from_link not in files_to_keep:
files_to_keep.add(file_from_link)
for discarded_file in discarded_files.difference(files_to_keep):
if discarded_file in files_sizes:
del files_sizes[discarded_file]
for missing_file_size in files_to_keep.difference(set(files_sizes.keys())):
p = Path(missing_file_size)
if not p.exists():
raise FileNotFoundError(missing_file_size)
else:
files_sizes[missing_file_size] = p.stat().st_size
print('Re-filled files_sizes for %r' % p)
if (p := Path('i_gdl_ignored.txt')).exists():
ignored_links = set(list(filter(len, p.read_text().splitlines())))
links_no_files = list(filter(lambda a: a not in ignored_links,
links_no_files))
link_statuses = dict(filter(lambda a: a[0] not in ignored_links,
link_statuses.items()))
files_from_links = dict(filter(lambda a: a[0] not in ignored_links,
files_from_links.items()))
checked_links = list(files_from_links.keys()) + links_no_files
checked_links = frozenset(checked_links)
max_expected_jobs_for_extractor = 0
for extractor, links in links_by_extractor.items():
links = [link
for link in links
if
link not in ignored_links
and
(
link not in checked_links
or
not SKIP_INDEXED_FILES
or
(link_statuses.get(link, 0xFF) & RETRY_ERROR_MASK) != 0
)]
if len(links) <= 0:
continue
this_worker_split_after_n_links = CUSTOM_WORKER_SPLITS.get(
extractor, SPLIT_WORKER_AFTER_N_LINKS)
workers = math.ceil(len(links)/this_worker_split_after_n_links)
if workers <= 1 or extractor in FORBIDDEN_WORKER_SPLITS:
workers = 1
max_expected_jobs_for_extractor = max(
max_expected_jobs_for_extractor,
workers
)
worker_by_seq = [list() for _ in range(max_expected_jobs_for_extractor)]
links_to_worker = dict()
for extractor, links in links_by_extractor.items():
links = [link
for link in links
if
link not in ignored_links
and
(
link not in checked_links
or
not SKIP_INDEXED_FILES
or
(link_statuses.get(link, 0xFF) & RETRY_ERROR_MASK) != 0
)]
if len(links) <= 0:
continue
this_worker_split_after_n_links = CUSTOM_WORKER_SPLITS.get(
extractor, SPLIT_WORKER_AFTER_N_LINKS)
workers = math.ceil(len(links)/this_worker_split_after_n_links)
if workers <= 1 or extractor in FORBIDDEN_WORKER_SPLITS:
if extractor in IGNORE_WORKERS:
continue
links_to_worker[extractor] = links
worker_by_seq[0].append(extractor)
else:
digits = math.ceil(math.log10(max(1, workers+1)))
fmt = "%%0%dd" % digits
for worker_no in range(workers):
lowerlimit = (worker_no+0)*this_worker_split_after_n_links
upperlimit = (worker_no+1)*this_worker_split_after_n_links
thisrange = links[lowerlimit:upperlimit]
worker_nm = extractor + ':' + (fmt % (worker_no))
if worker_nm in IGNORE_WORKERS:
continue
links_to_worker[worker_nm] = thisrange
worker_by_seq[worker_no].append(worker_nm)
for w in worker_by_seq:
w.sort()
workers_nicely_grouped = [
worker
for workergroup in worker_by_seq
for worker in workergroup
if worker != ''
]
print(f'{len(links_to_worker)} workers to be spawned')
return (files_from_links,
links_no_files,
files_sizes,
link_statuses,
workers_nicely_grouped,
workers_state_path,
links_to_worker,
)
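# Rough shape of what prerrun() hands to main() (extractor names and counts
# below are only an example): links_to_worker maps a worker name to its slice
# of links, e.g. {'imgur': [...], 'gfycat:00': [...], 'gfycat:01': [...]},
# where split workers carry a ':NN' chunk suffix; workers_nicely_grouped
# orders names so every extractor's first chunk is scheduled before any
# extractor's second chunk.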
def prerrun_flatten_subreddits_into_posts(subreddit_data, subreddit_filters):
postsl = [
{'subreddit': subreddit, **post}
for subreddit, srdt in subreddit_data.items()
for post in srdt['links']
]
postsl.sort(key=lambda a: (-a['timestamp'], a['datakey']))
postsd = dict()
for post in postsl:
dk = post['datakey']
sr = post['subreddit']
if subreddit_filters['no_download'][sr]:
continue
if subreddit_filters['no_sfw'][sr] and not post['nsfw']:
continue
if subreddit_filters['no_nsfw'][sr] and post['nsfw']:
continue
if dk not in postsd:
postsd[dk] = post.copy()
postsd[dk]['subreddits'] = list()
postsd[dk]['links'] = list()
del postsd[dk]['subreddit']
del postsd[dk]['link']
del postsd[dk]['domain']
if (sr := post['subreddit']) not in (srs := postsd[dk]['subreddits']):
srs.append(sr)
if (lnk := get_normalized_link(post['link'])) not in (lnks := postsd[dk]['links']):
lnks.append(lnk)
return postsd
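# Example of one merged entry produced above (field values are invented):
#     postsd['t3_abc123'] == {'datakey': 't3_abc123', 'timestamp': 1590000000,
#                             'nsfw': False, 'subreddits': ['pics', 'art'],
#                             'links': ['https://i.imgur.com/xyz.jpg'], ...}
# i.e. one record per datakey, with the per-subreddit 'subreddit', 'link' and
# 'domain' fields folded into the 'subreddits' and 'links' lists.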
def prerrun_posts_re_sort(posts):
for post in sorted(posts.values(), key=lambda a: (-a['timestamp'], a['datakey'])):
post['subreddits'].sort()
dk = post['datakey']
post_links = post['links']
has_changed_any_link = True
while has_changed_any_link:
has_changed_any_link = False
for link in post_links:
if '<!--' in link or '-->' in link:
for linkcopy in search_urls(link):
linkcopy = get_normalized_link(linkcopy)
linkcopy = replace_many(linkcopy, HTML_SPECIAL_CHARS_REPLACE)
if linkcopy not in post_links:
post_links.append(linkcopy)
has_changed_any_link = True
while link in post_links:
post_links.remove(link)
has_changed_any_link = True
break
else:
linkcopy = link
linkcopy = get_normalized_link(linkcopy)
if linkcopy not in post_links:
post_links.append(linkcopy)
has_changed_any_link = True
break
if '?' in link:
linkcopy = link.split('?', 1)[0]
linkcopy = get_normalized_link(linkcopy)
if linkcopy not in post_links:
post_links.append(linkcopy)
has_changed_any_link = True
break
if '#' in link:
linkcopy = link.split('#', 1)[0]
linkcopy = get_normalized_link(linkcopy)
if linkcopy not in post_links:
post_links.append(linkcopy)
has_changed_any_link = True
break
if contains_any(linkcopy, HTML_SPECIAL_CHARS):
linkcopy = replace_many(linkcopy, HTML_SPECIAL_CHARS_REPLACE)
if linkcopy not in post_links:
post_links.append(linkcopy)
has_changed_any_link = True
break
if linkcopy[-1:] in ('/', '#', '?'):
while linkcopy[-1:] in ('/', '#', '?'):
linkcopy = linkcopy[:-1]
linkcopy = get_normalized_link(linkcopy)
if linkcopy not in post_links:
post_links.append(linkcopy)
has_changed_any_link = True
if link.strip() == '':
while link in post_links:
post_links.remove(link)
has_changed_any_link = True
break
if link.startswith('/'):
while link in post_links:
post_links.remove(link)
has_changed_any_link = True
break
if link.startswith('#'):
while link in post_links:
post_links.remove(link)
has_changed_any_link = True
break
if link.startswith('mailto'):
while link in post_links:
post_links.remove(link)
has_changed_any_link = True
break
if (proto := (tpl := link.split(':', 1))[0]).lower() in ('http', 'https') and proto not in ('http', 'https'):
lst = list(tpl)
lst[0] = lst[0].lower()
linkcopy = ':'.join(lst)
post_links.remove(link)
if linkcopy not in post_links:
post_links.append(linkcopy)
has_changed_any_link = True
break
post['links'] = list(filter(lambda link: (
not link.startswith('https://preview.redd.it/')
or
(
(('?width=' in link) or ('&width=' in link))
and
(('?format=' in link) or ('&format=' in link))
and
(('?auto=' in link) or ('&auto=' in link))
and
(('?s=' in link) or ('&s=' in link))
)
), post['links']))
post['links'].sort()
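# Illustrative effect of the normalization loop above, assuming
# get_normalized_link leaves these made-up URLs untouched: a post whose links
# start as
#     ['HTTP://example.com/a?x=1', '#comments', '']
# ends with the scheme lower-cased, a query-stripped twin added, and the
# empty/anchor-only entries removed, roughly
#     ['http://example.com/a', 'http://example.com/a?x=1']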
def main():
(files_from_links,
links_no_files,
files_sizes,
link_statuses,
workers_nicely_grouped,
workers_state_path,
links_to_worker,
) = prerrun()
configure_gdl()
def save_ending_files():
nonlocal links_no_files
links_no_files2 = list(map(lambda a: a[0],
filter(lambda a: len(a[1]) <= 0 and a[0] not in links_no_files,
files_from_links.items()))) + links_no_files
files_from_links2 = dict(
filter(lambda a: len(a[1]) > 0,
files_from_links.items()))
links_no_files2_sorted = sorted(links_no_files2)
links_for_files = dict()
for link, files in files_from_links2.items():
for file in files:
if file not in links_for_files:
links_for_files[file] = list()
links_for_files[file].append(link)
del file
del link
del files
os.sync()
Path('i_gdl_lnf.json').write_text(
json.dumps(links_no_files2_sorted, indent=1))
Path('i_gdl_ffl.json').write_text(json.dumps(
files_from_links2, indent=1, sort_keys=True))
Path('i_gdl_lff.json').write_text(json.dumps(
links_for_files, indent=1, sort_keys=True))
Path('i_gdl_fsz.json').write_text(
json.dumps(files_sizes, indent=1, sort_keys=True))
Path('i_gdl_spl.json').write_text(json.dumps(
link_statuses, indent=1, sort_keys=True))
os.sync()
save_ending_files()
gallery_dl.output.select = lambda: ColoredLineOutput(False)
totalfiles = 0
thread_ids = workers_nicely_grouped.copy()
for line, thread_id in enumerate(thread_ids):
workers_state_path.joinpath(thread_id+'=line').write_text(str(line))
linkcount = len(links_to_worker[thread_id])
workers_state_path.joinpath(thread_id).write_text(
f'waiting:{linkcount}:{linkcount}:0:0')
do_fancy_multithreading_panel = False
thread_id_count = len(thread_ids)
if DEBUG_WORKER is not None:
print(f'Will debug {repr(DEBUG_WORKER)}.')
thread_id = DEBUG_WORKER
links_list = links_to_worker[DEBUG_WORKER]
download_link_list(
links_list,
thread_id,
None,
f'Debugging {repr(DEBUG_WORKER)}...',
workers_state_path.joinpath(thread_id),
)
return
if links_to_worker:
with PoolExecutor(min(MAX_WORKERS, thread_id_count)) as pe:
if do_fancy_multithreading_panel:
print(f'\033[2J', end='', flush=True)
print(f'\033[0;0H', end='', flush=True)
print('Downloading...', flush=True)
if do_fancy_multithreading_panel:
print(f'\033[0;0H', end='', flush=True)
largest_tid_size = max(map(len, thread_ids))
line2tid = dict()
def done_callback_generator(line):
nonlocal totalfiles
def terminate_process_pool():
os.system('sync')
os.system("bash -c \"ps -aux | grep './redditgetter.py' | grep -v grep | sed -e 's/ */ /g' | cut -d' ' -f2 | xargs -r -- kill -15\"")
sys.exit(0xFF)
def done_callback(job):
nonlocal totalfiles
thread_id = line2tid[line]
links_list = links_to_worker[thread_id]
try:
workers_state_path.joinpath(thread_id).write_text(
f'finished:{len(links_list)}:0:0:0')
print(clrlib.stylize(
f'Received job #{line}: {thread_id}', [
clrlib.fg('white'),
clrlib.bg('green'),
clrlib.attr('bold'),
]
))
downloaded_links = list()
totalbytes = 0
thisfiles = 0
true = True
downloaded_links = job.result()
for link, files in downloaded_links:
if true:
statusdir = get_path_for_caching(
link, Path('i_gdl_s'))
statusdir.mkdir(parents=True, exist_ok=True)
statusfile = statusdir.joinpath('_gdl_status.json')
statuses = dict()
if statusfile.exists():
statuses = json.loads(statusfile.read_text())
link_statuses[link] = statuses.get(link, 0xFF)
if link not in files_from_links:
files_from_links[link] = list()
lenfiles = len(files)
totalfiles += lenfiles
for file in files:
filepath = Path(file)
thisfiles += 1
if filepath.exists():
files_from_links[link].append(file)
st_size = filepath.stat().st_size
files_sizes[file] = st_size
totalbytes += st_size
workers_state_path.joinpath(thread_id).write_text(
f'finished:{len(links_list)}:0:{totalbytes}:{thisfiles}')
save_ending_files()
except:
sio = StringIO()
traceback.print_exc(file=sio)
excTxt = sio.getvalue()
try:
workers_state_path.joinpath(thread_id).write_text(
f'failed:{len(links_list)}:0:0:0')
except:
pass
try:
workers_state_path.joinpath(thread_id+'=exc').write_text(excTxt)
except:
pass
try:
pe.shutdown(wait=False)
except:
pass
print(excTxt)
terminate_process_pool()
return
return done_callback
for line, thread_id in enumerate(thread_ids):
line2tid[line] = thread_id
links_list = links_to_worker[thread_id]
workers_state_path.joinpath(thread_id).write_text(
f'enqueued:{len(links_list)}:{len(links_list)}:0:0')
print(clrlib.stylize(f'Starting job #{line}: {thread_id}', [
clrlib.fg('white'),
clrlib.bg('light_red'),
clrlib.attr('bold'),
]))
jobstardedmsg = clrlib.stylize(f'Starting job #{line}: {thread_id}', [
clrlib.fg('black'),
clrlib.bg('light_yellow'),
clrlib.attr('bold'),
])
thread_id_nmsz = len(thread_id)
thread_id_display = thread_id + ' ' * (largest_tid_size - thread_id_nmsz)
job = pe.submit(
download_link_list,
links_list,
thread_id_display,
line+3 if do_fancy_multithreading_panel else None,
jobstardedmsg,
workers_state_path.joinpath(thread_id),
)
job.add_done_callback(done_callback_generator(line))
save_ending_files()
if (p := Path('latest_image_download.txt')).exists():
p.unlink()
if workers_state_path.exists():
for p in workers_state_path.glob('*'):
p.unlink()
shutil.rmtree(workers_state_path)
print(f'Downloaded {totalfiles} files')
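# Note on the i_gdl_w/<worker-name> state files written throughout main() and
# download_link_list(): each holds a single colon-separated line of the form
#     <phase>:<total links>:<remaining>:<bytes downloaded>:<files downloaded>
# optionally followed by ':<link currently being processed>' while running;
# <phase> is one of waiting/enqueued/running/scrubbing/finished/failed, and
# the '<worker>=line' / '<worker>=exc' companions hold the assigned screen
# line and any captured traceback.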
def download_link_list(links: List[str],
thread_id: str,
line: Optional[int] = None,
job_started_msg: Optional[str] = None,
thread_state_path: Optional[Path] = None,
) -> List[Tuple[str, List[str]]]:
'''Downloads a link list inside a ProcessPoolExecutor'''
if STOP_JOBS_FLAG_PATH.exists():
raise InterruptedError(STOP_JOBS_FLAG_PATH)
if job_started_msg is not None:
print(job_started_msg)
has_its_own_line = line is not None
link_count = len(links)
remaining_links = link_count
configure_gdl()
if thread_state_path is not None:
thread_state_path.write_text(
f'running:{link_count}:{remaining_links}:0:0')
def get_printer():
return ColoredLineOutput(
has_its_own_line,
prefix=(f'\033[{line};0H' if has_its_own_line else '') +
clrlib.stylize('%9d' % remaining_links, [clrlib.fg('light_cyan')]) +
clrlib.stylize('@', [clrlib.fg('light_red')]) +
clrlib.stylize(thread_id, [clrlib.fg('yellow')]) +
clrlib.stylize('= ', [clrlib.fg('dark_gray')]),
suffix=('\033[K\033[0;0H' if has_its_own_line else ''),
prefixsz=len(('%9d' % 0)+' '+thread_id),
suffixsz=0,
write_successes_to=Path('latest_image_download.txt'),
)
gallery_dl.output.select = get_printer
result = list()
totalbytes = 0
totalfiles = 0
try:
for link in links:
scrubbing = True
cachedir = get_path_for_caching(link, Path('i_gdl_c'))
statusdir = get_path_for_caching(link, Path('i_gdl_s'))
cachedir.mkdir(parents=True, exist_ok=True)
statusdir.mkdir(parents=True, exist_ok=True)
metafile = cachedir.joinpath('_gdl_meta.json')
statusfile = statusdir.joinpath('_gdl_status.json')
meta = dict()
statuses = dict()
link_already_downloaded = False
if metafile.exists():
try:
meta = json.loads(metafile.read_text())
except json.JSONDecodeError:
pass
if statusfile.exists():
try:
statuses = json.loads(statusfile.read_text())
except json.JSONDecodeError:
pass
if link in meta and link in statuses:
link_already_downloaded = True
rc = statuses.get(link, 0xFF)
if rc == 0:
for fl in meta[link]:
pth = Path(fl)
try:
if not pth.exists():
link_already_downloaded = False
break
except OSError:
link_already_downloaded = False
break
if len(meta[link]) == 0 and REDOWNLOAD_EMPTIES:
link_already_downloaded = False
if (rc & RETRY_ERROR_MASK) != 0:
link_already_downloaded = False
if not link_already_downloaded or REDOWNLOAD:
scrubbing = False
if thread_state_path is not None:
thread_state_path.write_text(
f'running:{link_count}:{remaining_links}:{totalbytes}:{totalfiles}:{link}')
job = DownloadJobWithCallSaverPostProcessor(link)
job.out = get_printer()
job.out.message(link, clrlib.fg('light_magenta'))
rc = job.run()
os.sync()
# print('FINAL', job.cspp.calls)
# raise Exception(job.cspp.calls)
# files = job.cspp.calls['run_final'].copy() # Only brings the last element
files = job.cspp.calls['prepare'].copy()
files = list(filter(len, files))
has_changed = True
while has_changed:
has_changed = False
for seq, fl in enumerate(files):
if not (pth := Path(fl)).exists():
candidates = sorted(list(filter(
lambda p: (p.name.startswith(pth.name)
and
p.suffix != '.part'
and
p.suffix != '.json'),
pth.parent.iterdir())),
key=lambda p: len(p.name)
)
if len(candidates) > 0:
files[seq] = str(candidates[0])
has_changed = True
break
else:
rc |= 256
# raise Exception(pth.name, candidates, files)
del has_changed
meta[link] = files
statuses[link] = rc
metafile.write_text(json.dumps(meta, indent=1))
statusfile.write_text(json.dumps(statuses, indent=1))
os.sync()
for fl in meta[link]:
code = statuses[link]
pth = Path(fl)
if not pth.exists():
if code != 0:
continue
else:
raise FileNotFoundError((link,
link_already_downloaded,
meta[link]))
else:
totalfiles += 1
totalbytes += pth.stat().st_size
result.append((link, meta[link]))
remaining_links -= 1
if thread_state_path is not None:
scrubbing_running = 'scrubbing' if scrubbing else 'running'
thread_state_path.write_text(
f'{scrubbing_running}:{link_count}:{remaining_links}:{totalbytes}:{totalfiles}:{link}')
if STOP_JOBS_FLAG_PATH.exists():
raise InterruptedError(STOP_JOBS_FLAG_PATH)
finally:
print((f'\033[{line};0H' if has_its_own_line else '') +
clrlib.stylize(thread_id.strip(), [clrlib.fg('yellow'), clrlib.attr('bold')]) +
clrlib.stylize('#', [clrlib.fg('light_red')]) +
clrlib.stylize('Done', [clrlib.fg('light_green')]) +
('\033[K' if has_its_own_line else '')
)
return result
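# Per-link cache layout used by download_link_list() (directories come from
# get_path_for_caching, whose exact naming scheme lives in
# .system.downloader.cache):
#     i_gdl_c/<per-link dir>/_gdl_meta.json    {link: [downloaded file paths]}
#     i_gdl_s/<per-link dir>/_gdl_status.json  {link: gallery-dl return code}
# A link is skipped when its status is 0, every recorded file still exists and
# no retry bit in RETRY_ERROR_MASK matches (unless REDOWNLOAD or
# REDOWNLOAD_EMPTIES forces another attempt).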
def configure_gdl():
'''Configures Gallery-DL for usage.'''
parser = gallery_dl.option.build_parser()
args = parser.parse_args([
*([] if USE_FIREFOX_COOKIES else ['--cookies=i_gdl/.cookies']),
'--dest=i_gdl',
'--write-metadata',
# '--write-tags',
# '--write-log=i_gdl_log.txt',
'--write-unsupported=i_gdl_unsupported.txt',
# '--quiet',
*(['--verbose'] if DEBUG_WORKER else []),
'--retries=1',
# '--retries=7',
# '--limit-rate=1500k',
])
gallery_dl.output.initialize_logging(args.loglevel)
# configuration
if args.load_config:
gallery_dl.config.load()
if args.cfgfiles:
gallery_dl.config.load(args.cfgfiles, strict=True)
if args.yamlfiles:
gallery_dl.config.load(args.yamlfiles, strict=True, fmt="yaml")
if args.postprocessors:
gallery_dl.config.set((), "postprocessors", args.postprocessors)
if args.abort:
gallery_dl.config.set((), "skip", "abort:" + str(args.abort))
for opts in args.options:
gallery_dl.config.set(*opts)
# loglevels
gallery_dl.output.configure_logging(args.loglevel)
gallery_dl.output.select = ColoredLineOutput
gallery_dl.util.PathFormatOriginal = gdl_pf
gallery_dl.util.PathFormat = OverriddenPathFormat
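# configure_gdl() runs both in the parent process (main) and again inside each
# pool worker (download_link_list), so gallery-dl's module-level settings and
# the PathFormat monkey-patch above are (re)applied wherever downloads
# actually happen.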
class DownloadJobWithCallSaverPostProcessor(gallery_dl.job.DownloadJob):
def __init__(self, url, parent=None):
super().__init__(url, parent)
self.cspp = CallSaverPostProcessor(
self) if parent is None else parent.cspp
def initialize(self, kwdict=None):
if not isinstance(self.hooks, tuple):
print('ADDED!!')
self.hooks['prepare'].append(self.cspp.prepare)
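# DownloadJobWithCallSaverPostProcessor rides on top of gallery-dl's normal
# DownloadJob: CallSaverPostProcessor.prepare is appended to the job's
# 'prepare' hook list, so the target path gallery-dl resolves for each file is
# recorded in job.cspp.calls['prepare'] without changing the download itself.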
class ColoredLineOutput(gallery_dl.output.TerminalOutput):
def __init__(self, sameline=False, prefix="", suffix="", prefixsz=0, suffixsz=0, write_successes_to=None):
super().__init__()
self.sameline = sameline
self.eol = '\r' if sameline else '\n'
self.prefix = prefix
self.suffix = suffix
self.prefixsz = prefixsz
self.suffixsz = suffixsz
self.write_successes_to = write_successes_to
self._termsize_update()
def start(self, path):
self.message(path,
clrlib.fg("light_yellow"),
)
def skip(self, path):
self.message(path,
clrlib.attr('dim'),
)
def success(self, path, tries):
self.message(path,
clrlib.attr('bold'),
clrlib.fg('light_green'),
)
if self.write_successes_to is not None:
self.write_successes_to.write_text(path)
    def message(self, txt: str, *attrs: str, do_print: bool = True) -> str:
"""Prints a message with given formatters"""
clrtxt = clrlib.stylize(self.shorten(txt), attrs)
fmtd = f"{self.prefix}{clrtxt}{self.suffix}"
if do_print:
print(fmtd, flush=True, end=self.eol)
return fmtd
def shorten(self, txt):
self._termsize_update()
self.width = self.termsize - self.prefixsz - self.suffixsz - 1
return super().shorten(txt)
def _termsize_update(self):
self.termsize = shutil.get_terminal_size().columns
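# ColoredLineOutput is what gallery_dl.output.select is pointed at while jobs
# run: with sameline=True it redraws a single terminal row (the '\r'
# end-of-line), and the prefix/suffix slots carry the ANSI cursor-positioning
# escapes used by the optional fancy multithreading panel.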
class OverriddenPathFormat(gdl_pf):
def __init__(self, extractor):
super().__init__(extractor)
self.clean_path = FixFileNameFormatterWrapper(self.clean_path)
class CallSaverPostProcessor(gallery_dl.postprocessor.common.PostProcessor):
def __init__(self, job):
super().__init__(job)
self.calls = dict(
prepare=list(),
run=list(),
run_metadata=list(),
run_after=list(),
run_final=list(),
)
def prepare(self, pathfmt: gallery_dl.util.PathFormat):
"""Update file paths, etc."""
directory_formatters = pathfmt.directory_formatters
filename_formatter = pathfmt.filename_formatter
clean_segment = pathfmt.clean_segment
clean_path = pathfmt.clean_path
pathfmt.directory_formatters = None
pathfmt.filename_formatter = None
pathfmt.clean_segment = None
pathfmt.clean_path = None
cloned_pathfmt: gallery_dl.util.PathFormat = pickle.loads(pickle.dumps(pathfmt))
pathfmt.directory_formatters = directory_formatters
pathfmt.filename_formatter = filename_formatter
pathfmt.clean_segment = clean_segment
pathfmt.clean_path = clean_path
cloned_pathfmt.directory_formatters = directory_formatters
cloned_pathfmt.filename_formatter = filename_formatter
cloned_pathfmt.clean_segment = clean_segment
cloned_pathfmt.clean_path = clean_path
cloned_pathfmt.build_path()
# print(cloned_pathfmt.path)
# print(cloned_pathfmt.filename)
# print(cloned_pathfmt.kwdict)
# print(cloned_pathfmt)
self.calls['prepare'].append(cloned_pathfmt.path)
return pathfmt
def run(self, pathfmt: gallery_dl.util.PathFormat):
"""Execute the postprocessor for a file"""
self.calls['run'].append(pathfmt.path)
def run_metadata(self, pathfmt: gallery_dl.util.PathFormat):
"""Execute the postprocessor for a file"""
self.calls['run_metadata'].append(pathfmt.path)
def run_after(self, pathfmt: gallery_dl.util.PathFormat):
"""Execute postprocessor after moving a file to its target location"""
self.calls['run_after'].append(pathfmt.path)
def run_final(self, pathfmt: gallery_dl.util.PathFormat, status: int):
"""Postprocessor finalization after all files have been downloaded"""
self.calls['run_final'].append((pathfmt.path, status))
class FixFileNameFormatterWrapper:
"""Wraps file name formatter for ensuring a valid file name length"""
def __init__(self, formatter: gallery_dl.util.Formatter):
self.formatter = formatter
def __call__(self, *args, **kwargs) -> str:
path = self.formatter(*args, **kwargs)
parts = list(map(fix_filename_ending_extension,
map(fix_filename_length,
map(fix_filename_ending_extension,
Path(path).parts))))
return str(Path(*parts))
def fix_filename_length(filename: str) -> str:
"""Ensures a segment has a valid file name length"""
if len(filename.encode()) > 240:
extension = Path(filename).suffix
extension_bytes_length = len(extension.encode())
stem_bytes = Path(filename).stem.encode()
fixed_stem_bytes = stem_bytes[:240-extension_bytes_length]
fixed_stem = fixed_stem_bytes.decode(errors="ignore")
return fixed_stem + extension
return filename
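# Example (hypothetical name): a 300-byte stem with a '.jpg' suffix gets cut
# so that stem + suffix fit within 240 bytes; a multi-byte UTF-8 character
# split at the cut point is silently dropped by decode(errors="ignore").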
def fix_filename_ending_extension(filename: str) -> str:
if (fp := Path(filename)).stem[-1:] in ('.', ' '):
return str(fp.parent.joinpath(f"{fp.stem.rstrip('. ')}{fp.suffix}"))
return filename
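# Example: 'picture ..jpg' -> 'picture.jpg' (trailing dots and spaces are
# stripped from the stem, since some filesystems reject them).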
if __name__ == "__main__":
main()