#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
"""Download a single link with Gallery-DL and report the outcome as JSON.

The JSON report (status, requested/effective URL, captured log records and
the list of files) is written to stdout; progress lines go to stderr.
"""

import argparse
import json
import logging
import os
import pickle
import shutil
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Type

import colored as clrlib
import gallery_dl
import gallery_dl.config
import gallery_dl.extractor
import gallery_dl.formatter
import gallery_dl.job
import gallery_dl.option
import gallery_dl.output
import gallery_dl.path
import gallery_dl.postprocessor.common
import gallery_dl.util

if os.environ.get('FORCE_COLOR', '').strip() == '':
    os.environ['FORCE_COLOR'] = '1'

# The pristine PathFormat class.  configure_gdl() stashes it on
# ``gallery_dl.path.PathFormatOriginal`` before monkey-patching, so look
# there first; ``gallery_dl.util`` is kept as a fallback for the location
# this lookup historically used.
PathFormatGalleryDl: Type[gallery_dl.path.PathFormat] = getattr(
    gallery_dl.path,
    'PathFormatOriginal',
    getattr(gallery_dl.util, 'PathFormatOriginal',
            gallery_dl.path.PathFormat),
)


def parse_args(args: List[str]) -> argparse.Namespace:
    """Parse the command-line arguments of this runner.

    Args:
        args: Argument strings, without the program name.

    Returns:
        Namespace with ``runner_identification``, ``remaining_links``,
        ``destination_folder``, ``desired_url``, ``latest_image_txt`` and
        ``verbose``.
    """
    parser = argparse.ArgumentParser(
        description='Uses Gallery-DL to download a single link.')
    parser.add_argument('runner_identification', type=str,
                        help='The identification for the logs')
    parser.add_argument('remaining_links', type=int,
                        help='The number of links remaining')
    parser.add_argument('destination_folder', type=Path,
                        help='The destination folder for downloaded data')
    parser.add_argument('desired_url', type=str,
                        help='The URL to be fetched')
    parser.add_argument('latest_image_txt', type=Path,
                        default=Path('latest_image_download.txt'), nargs='?',
                        help='')
    parser.add_argument('--verbose', default=False, const=True,
                        action='store_const',
                        help='Be verbose on logs')
    return parser.parse_args(args)


def get_printer(has_its_own_line: int,
                remaining_links: int,
                thread_id: str,
                latest_success_fetch_store: Path):
    """Build the ColoredLineOutput used to report progress of one download.

    Args:
        has_its_own_line: Truthy to rewrite a single terminal line (carriage
            return plus "erase line / cursor home" escape suffix) instead of
            emitting one line per message.
        remaining_links: Number shown at the start of every line.
        thread_id: Runner identification shown after the ``@``.
        latest_success_fetch_store: File that receives the path of the most
            recent successful download.
    """
    # NOTE(review): the visible prefix is 12 + len(thread_id) characters
    # ('%9d' + '@' + thread_id + '= '), while prefixsz below counts
    # 10 + len(thread_id) -- confirm whether the difference is intentional.
    return ColoredLineOutput(
        has_its_own_line,
        prefix='' +
        clrlib.stylize('%9d' % remaining_links, [clrlib.fg('light_cyan')]) +
        clrlib.stylize('@', [clrlib.fg('light_red')]) +
        clrlib.stylize(thread_id, [clrlib.fg('yellow')]) +
        clrlib.stylize('= ', [clrlib.fg('dark_gray')]),
        suffix=('\033[K\033[0;0H' if has_its_own_line else ''),
        prefixsz=len(('%9d' % 0)+' '+thread_id),
        suffixsz=0,
        write_successes_to=latest_success_fetch_store,
    )


class SaverLoggingHandler(logging.Handler):
    """Logging handler that keeps every formatted record in memory.

    Records are stored in ``messages`` as
    ``(level number, level name, formatted message)`` tuples.
    """

    def __init__(self, level=logging.NOTSET):
        super().__init__(level)
        self.messages: List[Tuple[int, str, str]] = []

    def emit(self, record):
        """Format *record* and append it to ``messages``."""
        try:
            msg = self.format(record)
            self.messages.append((record.levelno, record.levelname, msg))
            self.flush()
        except RecursionError:
            raise
        except Exception:
            # Same best-effort contract as the stdlib handlers.
            self.handleError(record)


def _find_actual_file(expected: Path) -> Optional[Path]:
    """Return the sibling of *expected* that most likely replaced it.

    Candidates are entries of the same directory whose name starts with
    ``expected.name`` and whose suffix is neither ``.part`` nor ``.json``;
    the one with the shortest name wins.  Returns ``None`` when there is no
    candidate or the directory cannot be listed (e.g. it was never created
    because the download failed).
    """
    try:
        siblings = list(expected.parent.iterdir())
    except OSError:
        return None
    candidates = [
        entry for entry in siblings
        if entry.name.startswith(expected.name)
        and entry.suffix not in ('.part', '.json')
    ]
    # min() returns the first minimal element, matching a stable sort.
    return min(candidates, key=lambda entry: len(entry.name), default=None)


def _resolve_downloaded_files(files: List[str]) -> Tuple[List[str], bool]:
    """Replace paths that do not exist by their best on-disk candidate.

    Returns:
        The list of (possibly substituted) paths, in the original order, and
        a flag telling whether at least one path could not be resolved.
        Unresolved paths are kept unchanged in the list.
    """
    resolved: List[str] = []
    any_missing = False
    for expected in files:
        expected_path = Path(expected)
        if not expected_path.exists():
            actual = _find_actual_file(expected_path)
            if actual is None:
                any_missing = True
            else:
                expected = str(actual)
        resolved.append(expected)
    return resolved, any_missing


def main(args: List[str] = sys.argv[1:]) -> int:
    """Download one URL and print a JSON report to stdout.

    The report's ``status`` is the job's return code, with bit 256 set when
    a file announced by the job cannot be found on disk.

    Returns:
        Always 0; the download outcome is carried by the JSON report.
    """
    parsed = parse_args(args)
    log_handler = SaverLoggingHandler(level=logging.DEBUG)
    configure_gdl(parsed.destination_folder, parsed.verbose, log_handler)
    job = DownloadJobWithCallSaverPostProcessor(parsed.desired_url)
    job.out = get_printer(0,
                          parsed.remaining_links,
                          parsed.runner_identification,
                          parsed.latest_image_txt)
    job.out.message(parsed.desired_url, clrlib.fg('light_magenta'))
    rc = job.run()
    # Drop empty paths recorded by the 'prepare' hook.
    files = [path for path in job.cspp.calls['prepare'] if path]
    files, any_missing = _resolve_downloaded_files(files)
    if any_missing:
        rc |= 256
    if hasattr(os, 'sync'):  # os.sync() is only available on Unix
        os.sync()
    sys.stdout.write(json.dumps(dict(
        status=rc,
        link_requested=parsed.desired_url,
        link_effective=job.extractor.url,
        logs=log_handler.messages,
        files=files,
    ), indent=4))
    return 0


def configure_gdl(destination_folder: Path,
                  verbose: bool,
                  log_handler: Optional[logging.Handler] = None):
    '''Configures Gallery-DL for usage.

    Builds Gallery-DL's own option parser, feeds it a fixed argument list,
    applies the resulting configuration, and replaces Gallery-DL's output
    selector and PathFormat class with the ones defined in this module.

    Args:
        destination_folder: Passed to Gallery-DL as ``--dest``.
        verbose: Adds ``--verbose`` when true.
        log_handler: Optional handler attached to the root logger, whose
            level is then set to DEBUG.
    '''
    parser = gallery_dl.option.build_parser()
    args = parser.parse_args([
        f'--dest={str(destination_folder)}',
        '--write-metadata',
        # '--write-tags',
        # '--write-log=i_gdl_log.txt',
        # '--write-unsupported=i_gdl_unsupported.txt',
        # '--quiet',
        *(['--verbose'] if verbose else []),
        '--retries=1',
        # '--retries=7',
        # '--limit-rate=1500k',
    ])
    gallery_dl.output.initialize_logging(args.loglevel)

    # configuration
    if args.load_config:
        gallery_dl.config.load()
    if args.cfgfiles:
        gallery_dl.config.load(args.cfgfiles, strict=True)
    if args.yamlfiles:
        gallery_dl.config.load(args.yamlfiles, strict=True, fmt="yaml")
    if args.postprocessors:
        gallery_dl.config.set((), "postprocessors", args.postprocessors)
    if args.abort:
        gallery_dl.config.set((), "skip", "abort:" + str(args.abort))
    for opts in args.options:
        gallery_dl.config.set(*opts)

    # loglevels
    gallery_dl.output.configure_logging(args.loglevel)

    # Monkey-patch Gallery-DL; the original PathFormat stays reachable as
    # gallery_dl.path.PathFormatOriginal.
    gallery_dl.output.select = ColoredLineOutput
    gallery_dl.path.PathFormatOriginal = PathFormatGalleryDl
    gallery_dl.path.PathFormat = OverriddenPathFormat

    if log_handler is not None:
        root_logger = logging.getLogger()
        root_logger.addHandler(log_handler)
        root_logger.setLevel(logging.DEBUG)


class DownloadJobWithCallSaverPostProcessor(gallery_dl.job.DownloadJob):
    """DownloadJob that records 'prepare' hook calls in a shared
    CallSaverPostProcessor (``self.cspp``); child jobs reuse the parent's.
    """

    def __init__(self, url, parent=None):
        super().__init__(url, parent)
        self.cspp = CallSaverPostProcessor(
            self) if parent is None else parent.cspp

    def initialize(self, kwdict=None):
        """Initialize the job, then register the 'prepare' recorder."""
        super().initialize(kwdict)
        # hooks is only mutated when it is not a tuple.
        if not isinstance(self.hooks, tuple):
            self.hooks['prepare'].append(self.cspp.prepare)


class ColoredLineOutput(gallery_dl.output.TerminalOutput):
    """Terminal output that prints colored, prefixed lines to stderr."""

    def __init__(self, sameline=False, prefix="", suffix="",
                 prefixsz=0, suffixsz=0, write_successes_to=None):
        """
        Args:
            sameline: End messages with ``\\r`` instead of ``\\n``.
            prefix: Text (may contain escape codes) put before each message.
            suffix: Text put after each message.
            prefixsz: Columns reserved for the prefix when shortening.
            suffixsz: Columns reserved for the suffix when shortening.
            write_successes_to: Optional Path overwritten with the path of
                every successful download.
        """
        super().__init__()
        self.sameline = sameline
        self.eol = '\r' if sameline else '\n'
        self.prefix = prefix
        self.suffix = suffix
        self.prefixsz = prefixsz
        self.suffixsz = suffixsz
        self.write_successes_to = write_successes_to
        self._termsize_update()

    def start(self, path):
        """Report that the download of *path* started."""
        self.message(path,
                     clrlib.fg("light_yellow"),
                     )

    def skip(self, path):
        """Report that *path* was skipped."""
        self.message(path,
                     clrlib.attr('dim'),
                     )

    def success(self, path, tries=None):
        """Report a finished download and remember its path."""
        self.message(path,
                     clrlib.attr('bold'),
                     clrlib.fg('light_green'),
                     )
        if self.write_successes_to is not None:
            self.write_successes_to.write_text(path)

    def progress(self, bytes_total, bytes_downloaded, bytes_per_second):
        """Report transfer progress; the percentage is omitted when
        *bytes_total* is None."""
        bdl = gallery_dl.util.format_value(bytes_downloaded)
        bps = gallery_dl.util.format_value(bytes_per_second)
        if bytes_total is None:
            self.message("{:>7}B {:>7}B/s ".format(bdl, bps),
                         clrlib.fg('dark_gray'),
                         )
        else:
            self.message("{:>3}% {:>7}B {:>7}B/s ".format(
                bytes_downloaded * 100 // bytes_total, bdl, bps),
                clrlib.fg('dark_gray'),
            )

    def message(self, txt: str, *attrs: str, do_print: bool = True) -> str:
        """Prints a message with given formatters

        Args:
            txt: Message text; shortened to the available width first.
            *attrs: Style codes handed to ``colored.stylize``.
            do_print: When false, only build and return the line.

        Returns:
            The full line: prefix, styled text and suffix.
        """
        clrtxt = clrlib.stylize(self.shorten(txt), attrs)
        fmtd = f"{self.prefix}{clrtxt}{self.suffix}"
        if do_print:
            print(fmtd, file=sys.stderr, flush=True, end=self.eol)
        return fmtd

    def shorten(self, txt):
        """Shorten *txt* using the width left after prefix and suffix."""
        self._termsize_update()
        self.width = self.termsize - self.prefixsz - self.suffixsz - 1
        return super().shorten(txt)

    def _termsize_update(self):
        """Refresh the cached terminal width."""
        self.termsize = shutil.get_terminal_size().columns


class OverriddenPathFormat(PathFormatGalleryDl):
    """PathFormat whose ``clean_path`` also fixes file name length/endings."""

    def __init__(self, extractor):
        super().__init__(extractor)
        self.clean_path = FixFileNameFormatterWrapper(self.clean_path)


class CallSaverPostProcessor(gallery_dl.postprocessor.common.PostProcessor):
    """Post-processor that records the paths it is called with.

    ``calls`` maps each hook name to the list of recorded values.
    """

    # Attributes detached from the PathFormat while it is copied through
    # pickle -- presumably because they hold callables that cannot be
    # pickled (TODO confirm).
    _DETACHED_ATTRS = (
        'directory_formatters',
        'filename_formatter',
        'clean_segment',
        'clean_path',
    )

    def __init__(self, job):
        super().__init__(job)
        self.calls = dict(
            prepare=[],
            run=[],
            run_metadata=[],
            run_after=[],
            run_final=[],
        )

    def prepare(self, pathfmt: gallery_dl.path.PathFormat):
        """Update file paths, etc.

        Builds the path on a pickled copy of *pathfmt* (so the live object's
        state is not advanced) and records it under ``calls['prepare']``.
        The copy is made from this process's own object, never from
        external data.

        Returns:
            *pathfmt*, with all temporarily detached attributes restored.
        """
        detached = {name: getattr(pathfmt, name)
                    for name in self._DETACHED_ATTRS}
        kwdict_fallback = pathfmt.kwdict.get('_fallback', None)

        for name in detached:
            setattr(pathfmt, name, None)
        if kwdict_fallback is not None:
            pathfmt.kwdict['_fallback'] = None
        try:
            cloned_pathfmt: gallery_dl.path.PathFormat = pickle.loads(
                pickle.dumps(pathfmt))
        finally:
            # Always restore the live object, even if the copy failed.
            for name, value in detached.items():
                setattr(pathfmt, name, value)
            if kwdict_fallback is not None:
                pathfmt.kwdict['_fallback'] = kwdict_fallback

        for name, value in detached.items():
            setattr(cloned_pathfmt, name, value)
        if kwdict_fallback is not None:
            cloned_pathfmt.kwdict['_fallback'] = kwdict_fallback
        cloned_pathfmt.build_path()
        self.calls['prepare'].append(cloned_pathfmt.path)
        return pathfmt

    def run(self, pathfmt: gallery_dl.path.PathFormat):
        """Execute the postprocessor for a file"""
        self.calls['run'].append(pathfmt.path)

    def run_metadata(self, pathfmt: gallery_dl.path.PathFormat):
        """Execute the postprocessor for a file"""
        self.calls['run_metadata'].append(pathfmt.path)

    def run_after(self, pathfmt: gallery_dl.path.PathFormat):
        """Execute postprocessor after moving a file to its target location"""
        self.calls['run_after'].append(pathfmt.path)

    def run_final(self, pathfmt: gallery_dl.path.PathFormat, status: int):
        """Postprocessor finalization after all files have been downloaded"""
        self.calls['run_final'].append((pathfmt.path, status))


class FixFileNameFormatterWrapper:
    """Wraps file name formatter for ensuring a valid file name length"""

    def __init__(self, formatter: gallery_dl.formatter.StringFormatter):
        self.formatter = formatter

    def __call__(self, *args, **kwargs) -> str:
        """Call the wrapped formatter and fix every segment of its result.

        Each path segment has trailing dots/spaces removed from its stem,
        is shortened, and is trimmed again (shortening may expose new
        trailing characters).  A trailing separator is preserved.
        """
        path = self.formatter(*args, **kwargs)
        ends_with_sep = len(path) > 0 and path[-1] == os.sep
        parts = [
            fix_filename_ending_extension(
                fix_filename_length(
                    fix_filename_ending_extension(part)))
            for part in Path(path).parts
        ]
        return str(Path(*parts)) + (os.sep if ends_with_sep else '')

    def format_map(self, kwdict):
        """Formatter-style entry point; forwards *kwdict* as a keyword."""
        return self(kwdict=kwdict)


def fix_filename_length(filename: str, max_bytes: int = 240) -> str:
    """Ensures a segment has a valid file name length

    Args:
        filename: A single path segment.
        max_bytes: Maximum encoded (UTF-8) size of the result.

    Returns:
        *filename* unchanged when it fits; otherwise the stem is cut so that
        stem + extension fits in *max_bytes*.  When the "extension" alone is
        larger than *max_bytes* (a long name that merely contains a dot),
        the whole name is cut instead.  Multi-byte characters split by the
        cut are dropped.
    """
    encoded = filename.encode()
    if len(encoded) <= max_bytes:
        return filename
    segment = Path(filename)
    extension = segment.suffix
    extension_size = len(extension.encode())
    if extension_size > max_bytes:
        # A negative slice bound would leave the name over the limit.
        return encoded[:max_bytes].decode(errors="ignore")
    stem_bytes = segment.stem.encode()[:max_bytes - extension_size]
    return stem_bytes.decode(errors="ignore") + extension


def fix_filename_ending_extension(filename: str) -> str:
    """Strip trailing dots and spaces from the stem of a path segment.

    Segments consisting solely of dots and spaces (notably ``'..'``) are
    returned unchanged: stripping them would yield ``'.'``, which silently
    removes the component once the path is joined again.
    """
    fp = Path(filename)
    if fp.stem[-1:] not in ('.', ' '):
        return filename
    trimmed_stem = fp.stem.rstrip('. ')
    if not trimmed_stem and not fp.suffix:
        return filename
    return str(fp.parent.joinpath(f"{trimmed_stem}{fp.suffix}"))


if __name__ == "__main__":
    kwargs: Dict[str, Any] = dict()
    sys.exit(main(**kwargs))