reddit-image-wall-rs/gallery-dl-scriptable

358 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import argparse
import json
import logging
import os
import pickle
import shutil
import sys
from pathlib import Path
from typing import Any, Dict, List, Tuple
import colored as clrlib
import gallery_dl
import gallery_dl.config
import gallery_dl.extractor
import gallery_dl.job
import gallery_dl.path
import gallery_dl.postprocessor.common
import gallery_dl.util
# Default FORCE_COLOR to '1' when the variable is unset or blank, so that a
# value explicitly chosen by the caller is never overridden.
if not os.environ.get('FORCE_COLOR', '').strip():
    os.environ['FORCE_COLOR'] = '1'

# The pristine gallery-dl PathFormat class. configure_gdl() stashes it as
# ``gallery_dl.path.PathFormatOriginal`` before replacing
# ``gallery_dl.path.PathFormat``, so the lookup has to happen on that same
# module; otherwise a second evaluation of this file would pick up the
# already-overridden class instead of the original one.
PathFormatGalleryDl: type = getattr(
    gallery_dl.path, 'PathFormatOriginal', gallery_dl.path.PathFormat)
def parse_args(args: List[str]) -> argparse.Namespace:
    """Parses the command line of this script.

    Args:
        args: The argument strings, without the program name.

    Returns:
        A namespace with ``runner_identification``, ``remaining_links``,
        ``destination_folder``, ``desired_url``, ``latest_image_txt``
        (defaults to ``latest_image_download.txt``) and ``verbose``.
    """
    parser = argparse.ArgumentParser(
        description='Uses Gallery-DL to download a single link.')
    # (name, converter, help text) of the mandatory positionals, in order.
    required_positionals = (
        ('runner_identification', str, 'The identification for the logs'),
        ('remaining_links', int, 'The number of links remaining'),
        ('destination_folder', Path,
         'The destination folder for downloaded data'),
        ('desired_url', str, 'The URL to be fetched'),
    )
    for name, converter, description in required_positionals:
        parser.add_argument(name, type=converter, help=description)
    # Optional trailing positional: file that receives the latest success.
    parser.add_argument(
        'latest_image_txt', type=Path, nargs='?',
        default=Path('latest_image_download.txt'), help='')
    parser.add_argument(
        '--verbose', action='store_true', help='Be verbose on logs')
    return parser.parse_args(args)
def get_printer(has_its_own_line: int, remaining_links: int, thread_id: str, latest_success_fetch_store: Path) -> 'ColoredLineOutput':
    """Builds the status printer used for one download job.

    Every printed line is prefixed with ``<remaining>@<thread_id>= ``, each
    piece in its own color.

    Args:
        has_its_own_line: Truthy when the printer rewrites a single terminal
            line; the suffix then erases the rest of the line and moves the
            cursor to the top-left corner.
        remaining_links: Number shown, right-aligned to 9 columns, at the
            start of the prefix.
        thread_id: Identification shown after the ``@``.
        latest_success_fetch_store: File that receives the path of every
            successfully downloaded file.

    Returns:
        The configured ``ColoredLineOutput``.
    """
    # (plain text, foreground color) of each prefix piece, in display order.
    prefix_pieces = (
        ('%9d' % remaining_links, 'light_cyan'),
        ('@', 'light_red'),
        (thread_id, 'yellow'),
        ('= ', 'dark_gray'),
    )
    styled_prefix = ''.join(
        clrlib.stylize(text, [clrlib.fg(color)])
        for text, color in prefix_pieces
    )
    # Count the characters that are actually visible (escape codes excluded)
    # so that shortened messages fit within the terminal width.
    visible_prefix_width = sum(len(text) for text, _ in prefix_pieces)
    return ColoredLineOutput(
        has_its_own_line,
        prefix=styled_prefix,
        suffix=('\033[K\033[0;0H' if has_its_own_line else ''),
        prefixsz=visible_prefix_width,
        suffixsz=0,  # the suffix is made of escape codes only
        write_successes_to=latest_success_fetch_store,
    )
class SaverLoggingHandler(logging.Handler):
    """Logging handler that keeps every emitted record in memory.

    Records are stored in ``messages`` as
    ``(levelno, levelname, formatted message)`` tuples, in emission order.
    """

    def __init__(self, level=logging.NOTSET):
        super().__init__(level)
        self.messages: List[Tuple[int, str, str]] = []

    def emit(self, record):
        """Formats ``record`` and appends it to ``messages``.

        A ``RecursionError`` is propagated; any other exception is routed
        to ``handleError``.
        """
        try:
            formatted = self.format(record)
            entry = (record.levelno, record.levelname, formatted)
            self.messages.append(entry)
            self.flush()
        except RecursionError:
            raise
        except Exception:
            self.handleError(record)
def _find_download_candidates(expected: Path) -> List[Path]:
    """Lists the siblings of ``expected`` that may be the file really written.

    A candidate has a name starting with the expected file name and is
    neither a ``.part`` nor a ``.json`` file. Candidates are ordered by name
    length, shortest first. An unreadable or missing parent directory yields
    no candidates instead of raising.
    """
    try:
        siblings = list(expected.parent.iterdir())
    except OSError:
        # Nothing was written there (or it cannot be listed).
        return []
    return sorted(
        (sibling for sibling in siblings
         if sibling.name.startswith(expected.name)
         and sibling.suffix not in ('.part', '.json')),
        key=lambda sibling: len(sibling.name),
    )


def main(args: List[str] = sys.argv[1:]) -> int:
    """Downloads a single link and prints a JSON report to stdout.

    The report holds the job status, the requested and effective links,
    the captured log records and the paths of the files the job prepared.
    A prepared path that does not exist is replaced by the best matching
    sibling file; when there is none, bit 256 is set in the status.

    Args:
        args: Command line arguments, without the program name.

    Returns:
        Always 0; the outcome of the download is the ``status`` field of
        the JSON report.
    """
    parsed = parse_args(args)
    log_handler = SaverLoggingHandler(level=logging.DEBUG)
    configure_gdl(parsed.destination_folder, parsed.verbose, log_handler)
    job = DownloadJobWithCallSaverPostProcessor(parsed.desired_url)
    job.out = get_printer(0, parsed.remaining_links,
                          parsed.runner_identification, parsed.latest_image_txt)
    job.out.message(parsed.desired_url, clrlib.fg('light_magenta'))
    rc = job.run()
    # Paths recorded by the 'prepare' hook, minus the empty ones.
    files = [fl for fl in job.cspp.calls['prepare'] if len(fl) > 0]
    # One pass is enough: a replacement comes straight from the directory
    # listing, so it never needs to be resolved again.
    for seq, fl in enumerate(files):
        expected = Path(fl)
        if expected.exists():
            continue
        candidates = _find_download_candidates(expected)
        if candidates:
            files[seq] = str(candidates[0])
        else:
            # The prepared file is nowhere to be found on disk.
            rc |= 256
    os.sync()
    sys.stdout.write(json.dumps(dict(
        status=rc,
        link_requested=parsed.desired_url,
        link_effective=job.extractor.url,
        logs=log_handler.messages,
        files=files,
    ), indent=4))
    return 0
def configure_gdl(destination_folder: Path, verbose: bool, log_handler: logging.Handler = None):
    '''Configures Gallery-DL for usage.

    Parses a fixed set of gallery-dl command line options, applies the
    resulting configuration, installs this script's output and path format
    classes into gallery-dl and, when given, attaches ``log_handler`` to the
    root logger (which is then set to DEBUG).

    Args:
        destination_folder: Passed to gallery-dl as ``--dest``.
        verbose: Adds ``--verbose`` to the gallery-dl options.
        log_handler: Optional handler added to the root logger.
    '''
    # Previously commented-out alternatives, kept for reference:
    # --write-tags, --write-log=i_gdl_log.txt,
    # --write-unsupported=i_gdl_unsupported.txt, --quiet, --retries=7,
    # --limit-rate=1500k
    cli_options = [
        f'--dest={destination_folder!s}',
        '--write-metadata',
    ]
    if verbose:
        cli_options.append('--verbose')
    cli_options.append('--retries=1')
    args = gallery_dl.option.build_parser().parse_args(cli_options)

    gallery_dl.output.initialize_logging(args.loglevel)

    # configuration
    config = gallery_dl.config
    if args.load_config:
        config.load()
    if args.cfgfiles:
        config.load(args.cfgfiles, strict=True)
    if args.yamlfiles:
        config.load(args.yamlfiles, strict=True, fmt="yaml")
    if args.postprocessors:
        config.set((), "postprocessors", args.postprocessors)
    if args.abort:
        config.set((), "skip", "abort:" + str(args.abort))
    for option in args.options:
        config.set(*option)

    # loglevels
    gallery_dl.output.configure_logging(args.loglevel)

    # Swap in this script's output and path format implementations, keeping
    # a reference to the original PathFormat class on the module.
    gallery_dl.output.select = ColoredLineOutput
    gallery_dl.path.PathFormatOriginal = PathFormatGalleryDl
    gallery_dl.path.PathFormat = OverriddenPathFormat

    if log_handler is None:
        return
    root_logger = logging.getLogger()
    root_logger.addHandler(log_handler)
    root_logger.setLevel(logging.DEBUG)
class DownloadJobWithCallSaverPostProcessor(gallery_dl.job.DownloadJob):
    """Download job that records the paths prepared for its files.

    The recorder (``cspp``) is created by the top-level job and shared with
    every job constructed with that job as ``parent``.
    """

    def __init__(self, url, parent=None):
        super().__init__(url, parent)
        if parent is None:
            self.cspp = CallSaverPostProcessor(self)
        else:
            self.cspp = parent.cspp

    def initialize(self, kwdict=None):
        """Initializes the job, then registers the recorder's prepare hook."""
        super().initialize(kwdict)
        hooks = self.hooks
        # NOTE(review): hooks appears to be a tuple when gallery-dl set up no
        # hook lists; there is nothing to append to in that case - confirm.
        if isinstance(hooks, tuple):
            return
        hooks['prepare'].append(self.cspp.prepare)
class ColoredLineOutput(gallery_dl.output.TerminalOutput):
    """Terminal output that prints colored, prefixed status lines to stderr.

    Args:
        sameline: When truthy, lines end with a carriage return instead of a
            line feed.
        prefix: Text written before every message.
        suffix: Text written after every message.
        prefixsz: Number of terminal columns taken by ``prefix``.
        suffixsz: Number of terminal columns taken by ``suffix``.
        write_successes_to: Optional ``Path`` overwritten with the path of
            each successfully downloaded file.
    """

    def __init__(self, sameline=False, prefix="", suffix="", prefixsz=0, suffixsz=0, write_successes_to=None):
        super().__init__()
        self.sameline = sameline
        self.eol = '\r' if sameline else '\n'
        self.prefix = prefix
        self.suffix = suffix
        self.prefixsz = prefixsz
        self.suffixsz = suffixsz
        self.write_successes_to = write_successes_to
        self._termsize_update()

    def start(self, path):
        """Reports that the download of ``path`` is starting."""
        self.message(path, clrlib.fg("light_yellow"))

    def skip(self, path):
        """Reports that ``path`` was skipped."""
        self.message(path, clrlib.attr('dim'))

    def success(self, path, tries=None):
        """Reports that ``path`` was downloaded and records it, if requested."""
        self.message(path, clrlib.attr('bold'), clrlib.fg('light_green'))
        if self.write_successes_to is not None:
            self.write_successes_to.write_text(path)

    def progress(self, bytes_total, bytes_downloaded, bytes_per_second):
        """Reports the download progress.

        The percentage is left out when the total size is unknown (``None``)
        or zero, since it cannot be computed then.
        """
        bdl = gallery_dl.util.format_value(bytes_downloaded)
        bps = gallery_dl.util.format_value(bytes_per_second)
        if not bytes_total:
            text = "{:>7}B {:>7}B/s ".format(bdl, bps)
        else:
            text = "{:>3}% {:>7}B {:>7}B/s ".format(
                bytes_downloaded * 100 // bytes_total, bdl, bps)
        self.message(text, clrlib.fg('dark_gray'))

    def message(self, txt: str, *attrs: str, do_print: bool = True) -> str:
        """Prints a message with given formatters

        Args:
            txt: The message; it is shortened to fit the terminal.
            *attrs: Style codes applied to the message.
            do_print: When false, the line is only built, not printed.

        Returns:
            The prefix, the styled message and the suffix, joined.
        """
        clrtxt = clrlib.stylize(self.shorten(txt), attrs)
        fmtd = f"{self.prefix}{clrtxt}{self.suffix}"
        if do_print:
            print(fmtd, file=sys.stderr, flush=True, end=self.eol)
        return fmtd

    def shorten(self, txt):
        """Shortens ``txt`` to the columns left after prefix and suffix."""
        # The terminal may have been resized since the last message.
        self._termsize_update()
        self.width = self.termsize - self.prefixsz - self.suffixsz - 1
        return super().shorten(txt)

    def _termsize_update(self):
        """Refreshes the cached terminal width."""
        self.termsize = shutil.get_terminal_size().columns
class OverriddenPathFormat(PathFormatGalleryDl):
    """Path format whose ``clean_path`` also fixes every path segment.

    The ``clean_path`` callable set up by the parent class is wrapped in a
    ``FixFileNameFormatterWrapper``.
    """

    def __init__(self, extractor):
        super().__init__(extractor)
        inherited_clean_path = self.clean_path
        self.clean_path = FixFileNameFormatterWrapper(inherited_clean_path)
class CallSaverPostProcessor(gallery_dl.postprocessor.common.PostProcessor):
    """Post processor that records the path seen by each of its hooks.

    ``calls`` maps a hook name to the list of values recorded for it:
    paths for every hook except ``run_final``, which stores
    ``(path, status)`` tuples.
    """

    # Attributes detached from a path format while it is cloned through
    # pickle; they are handed over to the clone by reference afterwards.
    _DETACHED_ATTRIBUTES = (
        'directory_formatters',
        'filename_formatter',
        'clean_segment',
        'clean_path',
    )

    def __init__(self, job):
        super().__init__(job)
        self.calls = {
            'prepare': [],
            'run': [],
            'run_metadata': [],
            'run_after': [],
            'run_final': [],
        }

    def prepare(self, pathfmt: gallery_dl.path.PathFormat):
        """Update file paths, etc.

        Records the path ``pathfmt`` would build, without touching
        ``pathfmt`` itself: the path is built on a clone obtained from a
        pickle round-trip.

        Returns:
            The ``pathfmt`` that was given, in its original state.
        """
        detached = {name: getattr(pathfmt, name)
                    for name in self._DETACHED_ATTRIBUTES}
        kwdict_fallback = pathfmt.kwdict.get('_fallback', None)
        try:
            for name in detached:
                setattr(pathfmt, name, None)
            if kwdict_fallback is not None:
                pathfmt.kwdict['_fallback'] = None
            cloned_pathfmt: gallery_dl.path.PathFormat = pickle.loads(
                pickle.dumps(pathfmt))
        finally:
            # Whatever happens while cloning, the live path format must get
            # its formatters back, or the running job would be left broken.
            for name, value in detached.items():
                setattr(pathfmt, name, value)
            if kwdict_fallback is not None:
                pathfmt.kwdict['_fallback'] = kwdict_fallback
        for name, value in detached.items():
            setattr(cloned_pathfmt, name, value)
        if kwdict_fallback is not None:
            cloned_pathfmt.kwdict['_fallback'] = kwdict_fallback
        cloned_pathfmt.build_path()
        self.calls['prepare'].append(cloned_pathfmt.path)
        return pathfmt

    def run(self, pathfmt: gallery_dl.path.PathFormat):
        """Execute the postprocessor for a file"""
        self.calls['run'].append(pathfmt.path)

    def run_metadata(self, pathfmt: gallery_dl.path.PathFormat):
        """Execute the postprocessor for a file"""
        self.calls['run_metadata'].append(pathfmt.path)

    def run_after(self, pathfmt: gallery_dl.path.PathFormat):
        """Execute postprocessor after moving a file to its target location"""
        self.calls['run_after'].append(pathfmt.path)

    def run_final(self, pathfmt: gallery_dl.path.PathFormat, status: int):
        """Postprocessor finalization after all files have been downloaded"""
        self.calls['run_final'].append((pathfmt.path, status))
class FixFileNameFormatterWrapper:
    """Wraps file name formatter for ensuring a valid file name length"""

    def __init__(self, formatter: gallery_dl.formatter.StringFormatter):
        self.formatter = formatter

    def __call__(self, *args, **kwargs) -> str:
        """Calls the wrapped formatter and fixes each segment of its result.

        Each segment has its ending fixed, is then cut to a valid length and
        has its ending fixed again, as the cut may expose a new bad ending.
        A trailing path separator in the formatter's result is preserved.
        """
        raw_path = self.formatter(*args, **kwargs)
        trailing_separator = os.sep if raw_path.endswith(os.sep) else ''
        fixed_segments = []
        for segment in Path(raw_path).parts:
            segment = fix_filename_ending_extension(segment)
            segment = fix_filename_length(segment)
            segment = fix_filename_ending_extension(segment)
            fixed_segments.append(segment)
        return str(Path(*fixed_segments)) + trailing_separator

    def format_map(self, kwdict):
        """Calls this wrapper with ``kwdict`` as a keyword argument."""
        return self(kwdict=kwdict)
def fix_filename_length(filename: str, max_bytes: int = 240) -> str:
    """Ensures a segment has a valid file name length

    A name whose UTF-8 encoding is longer than ``max_bytes`` has its stem cut
    so that stem plus extension fit; a multi-byte character split by the cut
    is dropped entirely.

    Args:
        filename: A single path segment.
        max_bytes: Largest accepted size of the encoded name.

    Returns:
        ``filename`` itself when it already fits, the shortened name
        otherwise.
    """
    encoded = filename.encode()
    if len(encoded) <= max_bytes:
        return filename
    as_path = Path(filename)
    extension = as_path.suffix
    extension_size = len(extension.encode())
    if extension_size > max_bytes:
        # Everything after the last dot is too long to be kept whole (e.g. a
        # sentence containing a dot), so cut the name as a single piece.
        return encoded[:max_bytes].decode(errors="ignore")
    stem_budget = max_bytes - extension_size
    fixed_stem = as_path.stem.encode()[:stem_budget].decode(errors="ignore")
    return fixed_stem + extension
def fix_filename_ending_extension(filename: str) -> str:
    """Strips dots and spaces found right before the extension of a name.

    When the stem of ``filename`` ends with a dot or a space, all of its
    trailing dots and spaces are removed and the extension is put back;
    any other name is returned unchanged.
    """
    as_path = Path(filename)
    stem = as_path.stem
    if not stem.endswith(('.', ' ')):
        return filename
    cleaned_name = stem.rstrip('. ') + as_path.suffix
    return str(as_path.parent / cleaned_name)
if __name__ == "__main__":
    # Called without arguments, main() uses its default: the arguments of
    # this process. Its return value becomes the exit status.
    sys.exit(main())