358 lines
13 KiB
Python
Executable File
358 lines
13 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# -*- encoding: utf-8 -*-
|
|
|
|
import argparse
import json
import logging
import os
import pickle
import shutil
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import colored as clrlib

import gallery_dl
import gallery_dl.config
import gallery_dl.extractor
import gallery_dl.formatter
import gallery_dl.job
import gallery_dl.option
import gallery_dl.output
import gallery_dl.path
import gallery_dl.postprocessor.common
import gallery_dl.util
|
|
|
|
# Force colored output from the `colored` package even when stdout/stderr is
# not a TTY, unless the caller already set FORCE_COLOR to something explicit.
if os.environ.get('FORCE_COLOR', '').strip() == '':
    os.environ['FORCE_COLOR'] = '1'

# Handle to the "original" (unpatched) PathFormat class; configure_gdl() later
# replaces gallery_dl.path.PathFormat with OverriddenPathFormat, so this keeps
# a reference to the real base class.
# NOTE(review): configure_gdl() stores the original under
# gallery_dl.path.PathFormatOriginal, while this getattr reads it from
# gallery_dl.util — the fallback would only trigger if something else set the
# attribute on gallery_dl.util. Verify which module is intended.
PathFormatGalleryDl: gallery_dl.path.PathFormat = getattr(
    gallery_dl.util, 'PathFormatOriginal', gallery_dl.path.PathFormat)
|
|
|
|
|
|
def parse_args(args: List[str]) -> argparse.Namespace:
    """Parse the command-line arguments for the single-link downloader.

    Positional arguments: runner id, remaining-link count, destination
    folder, URL, and an optional path that records the latest download.

    Raises SystemExit (via argparse) on invalid input.
    """
    parser = argparse.ArgumentParser(
        description='Uses Gallery-DL to download a single link.')
    parser.add_argument('runner_identification', type=str,
                        help='The identification for the logs')
    parser.add_argument('remaining_links', type=int,
                        help='The number of links remaining')
    parser.add_argument('destination_folder', type=Path,
                        help='The destination folder for downloaded data')
    parser.add_argument('desired_url', type=str,
                        help='The URL to be fetched')
    # Fix: this argument previously had an empty help string, so --help
    # gave the user no clue what it was for.
    parser.add_argument('latest_image_txt', type=Path,
                        default=Path('latest_image_download.txt'), nargs='?',
                        help='File that receives the path of the most '
                             'recently downloaded file')
    # store_true is the idiomatic spelling of default=False/const=True.
    parser.add_argument('--verbose', action='store_true',
                        help='Be verbose on logs')
    return parser.parse_args(args)
|
|
|
|
|
|
def get_printer(has_its_own_line: int, remaining_links: int, thread_id: str, latest_success_fetch_store: Path):
    """Build a ColoredLineOutput tagged with the remaining-link counter and
    the runner/thread id, optionally rewriting a single terminal line.

    latest_success_fetch_store is the file that receives the path of each
    successfully downloaded file.
    """
    counter_text = '%9d' % remaining_links
    styled_prefix = ''.join([
        clrlib.stylize(counter_text, [clrlib.fg('light_cyan')]),
        clrlib.stylize('@', [clrlib.fg('light_red')]),
        clrlib.stylize(thread_id, [clrlib.fg('yellow')]),
        clrlib.stylize('= ', [clrlib.fg('dark_gray')]),
    ])
    # When the printer owns a line, clear to end-of-line and home the cursor.
    styled_suffix = '\033[K\033[0;0H' if has_its_own_line else ''
    # Printable width of the prefix (9-digit counter + separator + id),
    # i.e. its length without the ANSI escape sequences.
    visible_prefix_width = len('%9d' % 0) + 1 + len(thread_id)
    return ColoredLineOutput(
        has_its_own_line,
        prefix=styled_prefix,
        suffix=styled_suffix,
        prefixsz=visible_prefix_width,
        suffixsz=0,
        write_successes_to=latest_success_fetch_store,
    )
|
|
|
|
|
|
class SaverLoggingHandler(logging.Handler):
    """Logging handler that keeps every record in memory.

    Each record is stored as a ``(levelno, levelname, formatted_message)``
    tuple in ``self.messages`` so callers can serialize the full log later.
    """

    def __init__(self, level=logging.NOTSET):
        super().__init__(level)
        # Accumulated records, in arrival order.
        self.messages: List[Tuple[int, str, str]] = []

    def emit(self, record):
        """Format *record* and append it; never let logging errors escape."""
        try:
            formatted = self.format(record)
            entry = (record.levelno, record.levelname, formatted)
            self.messages.append(entry)
            self.flush()
        except RecursionError:
            # Per the logging docs, RecursionError must propagate.
            raise
        except Exception:
            self.handleError(record)
|
|
|
|
|
|
def main(args: Optional[List[str]] = None) -> int:
    """Download a single URL with gallery_dl and print a JSON report to stdout.

    The report contains the status code, the requested and effective URLs,
    all captured log records, and the list of downloaded file paths.

    Fix: the previous signature used ``args=sys.argv[1:]`` as the default,
    which is evaluated once at import time (and would go stale if sys.argv
    changed); ``None`` defers the read to call time.
    """
    if args is None:
        args = sys.argv[1:]
    parsed = parse_args(args)
    log_handler = SaverLoggingHandler(level=logging.DEBUG)
    configure_gdl(parsed.destination_folder, parsed.verbose, log_handler)

    job = DownloadJobWithCallSaverPostProcessor(parsed.desired_url)
    job.out = get_printer(0, parsed.remaining_links,
                          parsed.runner_identification, parsed.latest_image_txt)
    job.out.message(parsed.desired_url, clrlib.fg('light_magenta'))
    rc = job.run()

    # Paths recorded by the 'prepare' hook; drop empty entries.
    files = [fl for fl in job.cspp.calls['prepare'] if fl]

    # gallery_dl may rename files after 'prepare' (e.g. append a real
    # extension). Re-resolve every recorded path that no longer exists;
    # restart the scan after each substitution since the list changed.
    has_changed = True
    while has_changed:
        has_changed = False
        for seq, fl in enumerate(files):
            pth = Path(fl)
            if pth.exists():
                continue
            replacement = _find_renamed_file(pth)
            if replacement is not None:
                files[seq] = str(replacement)
                has_changed = True
                break
            rc |= 256  # flag: a recorded file is missing and unrecoverable

    os.sync()  # POSIX-only: flush filesystem buffers before reporting
    sys.stdout.write(json.dumps(dict(
        status=rc,
        link_requested=parsed.desired_url,
        link_effective=job.extractor.url,
        logs=log_handler.messages,
        files=files,
    ), indent=4))
    return 0


def _find_renamed_file(pth: Path) -> Optional[Path]:
    """Return the shortest-named sibling whose name extends ``pth.name``,
    skipping in-progress (.part) and metadata (.json) files; None if absent."""
    candidates = [p for p in pth.parent.iterdir()
                  if p.name.startswith(pth.name)
                  and p.suffix not in ('.part', '.json')]
    if not candidates:
        return None
    return min(candidates, key=lambda p: len(p.name))
|
|
|
|
|
|
def configure_gdl(destination_folder: Path, verbose: bool,
                  log_handler: Optional[logging.Handler] = None) -> None:
    """Configures Gallery-DL for usage.

    Args:
        destination_folder: base directory handed to gallery_dl via --dest.
        verbose: forward --verbose to gallery_dl's CLI parser when true.
        log_handler: optional handler attached to the root logger (set to
            DEBUG) so callers can capture everything gallery_dl logs.

    Side effects: mutates gallery_dl global config/logging state and
    monkey-patches gallery_dl.output.select and gallery_dl.path.PathFormat.

    Fix: the log_handler annotation was the implicit-Optional
    ``logging.Handler = None``; PEP 484 requires spelling it Optional.
    """
    # Reuse gallery_dl's own CLI parser so defaults match the real tool.
    parser = gallery_dl.option.build_parser()
    args = parser.parse_args([
        f'--dest={str(destination_folder)}',
        '--write-metadata',
        *(['--verbose'] if verbose else []),
        '--retries=1',
    ])
    gallery_dl.output.initialize_logging(args.loglevel)

    # Configuration files / overrides, mirroring gallery_dl's main().
    if args.load_config:
        gallery_dl.config.load()
    if args.cfgfiles:
        gallery_dl.config.load(args.cfgfiles, strict=True)
    if args.yamlfiles:
        gallery_dl.config.load(args.yamlfiles, strict=True, fmt="yaml")
    if args.postprocessors:
        gallery_dl.config.set((), "postprocessors", args.postprocessors)
    if args.abort:
        gallery_dl.config.set((), "skip", "abort:" + str(args.abort))
    for opts in args.options:
        gallery_dl.config.set(*opts)

    # Log levels.
    gallery_dl.output.configure_logging(args.loglevel)

    # Route gallery_dl's progress output through our colored printer.
    gallery_dl.output.select = ColoredLineOutput

    # Swap in the filename-fixing PathFormat, keeping a reference to the
    # original class.
    gallery_dl.path.PathFormatOriginal = PathFormatGalleryDl
    gallery_dl.path.PathFormat = OverriddenPathFormat

    if log_handler is not None:
        root_logger = logging.getLogger()
        root_logger.addHandler(log_handler)
        root_logger.setLevel(logging.DEBUG)
|
|
|
|
|
|
class DownloadJobWithCallSaverPostProcessor(gallery_dl.job.DownloadJob):
    """DownloadJob that records every file path gallery_dl prepares,
    through a CallSaverPostProcessor shared across nested jobs."""

    def __init__(self, url, parent=None):
        super().__init__(url, parent)
        # Child jobs (spawned for nested extractors) reuse the parent's
        # collector so every recorded path lands in one place.
        self.cspp = CallSaverPostProcessor(
            self) if parent is None else parent.cspp

    def initialize(self, kwdict=None):
        super().initialize(kwdict)
        # Register our collector on the 'prepare' hook. gallery_dl uses a
        # dict of hook lists when postprocessors are active and a plain
        # tuple otherwise — only the dict form is appendable.
        # NOTE(review): presumably version-dependent gallery_dl internals;
        # confirm against the installed gallery_dl release.
        if not isinstance(self.hooks, tuple):
            self.hooks['prepare'].append(self.cspp.prepare)
|
|
|
|
|
|
class ColoredLineOutput(gallery_dl.output.TerminalOutput):
    """Terminal output for gallery_dl that colors each status message and
    wraps it in a fixed prefix/suffix (used to tag parallel runner lines)."""

    def __init__(self, sameline=False, prefix="", suffix="", prefixsz=0, suffixsz=0, write_successes_to=None):
        # sameline: when truthy, end messages with '\r' so the next message
        #   overwrites the current terminal line.
        # prefix / suffix: raw (possibly ANSI-styled) text printed around
        #   every message.
        # prefixsz / suffixsz: printable widths of prefix/suffix (i.e.
        #   excluding escape codes) used by shorten() to fit the terminal.
        # write_successes_to: optional Path that receives the path of each
        #   successfully downloaded file (overwritten every time).
        super().__init__()
        self.sameline = sameline
        self.eol = '\r' if sameline else '\n'
        self.prefix = prefix
        self.suffix = suffix
        self.prefixsz = prefixsz
        self.suffixsz = suffixsz
        self.write_successes_to = write_successes_to
        self._termsize_update()

    def start(self, path):
        # A download is starting for `path`.
        self.message(path,
                     clrlib.fg("light_yellow"),
                     )

    def skip(self, path):
        # `path` was skipped — render it dimmed.
        self.message(path,
                     clrlib.attr('dim'),
                     )

    def success(self, path, tries):
        # Finished download: bold green, and record the path on disk when a
        # target file was configured.
        self.message(path,
                     clrlib.attr('bold'),
                     clrlib.fg('light_green'),
                     )
        if self.write_successes_to is not None:
            self.write_successes_to.write_text(path)

    def progress(self, bytes_total, bytes_downloaded, bytes_per_second):
        # Progress line; bytes_total is None when the size is unknown.
        bdl = gallery_dl.util.format_value(bytes_downloaded)
        bps = gallery_dl.util.format_value(bytes_per_second)
        if bytes_total is None:
            self.message("{:>7}B {:>7}B/s ".format(bdl, bps),
                         clrlib.fg('dark_gray'),
                         )
        else:
            self.message("{:>3}% {:>7}B {:>7}B/s ".format(
                bytes_downloaded * 100 // bytes_total, bdl, bps),
                clrlib.fg('dark_gray'),
            )

    def message(self, txt: str, *attrs: str, do_print: bool = True) -> str:
        """Prints a message with given formatters.

        Returns the fully formatted (prefix + styled text + suffix) string;
        when do_print is true it is also written to stderr.
        """
        # Annotation fix: each element of *attrs is a single style, so the
        # varargs annotation is `str`, not `List[str]`.
        clrtxt = clrlib.stylize(self.shorten(txt), attrs)
        fmtd = f"{self.prefix}{clrtxt}{self.suffix}"
        if do_print:
            print(fmtd, file=sys.stderr, flush=True, end=self.eol)
        return fmtd

    def shorten(self, txt):
        # Recompute the usable width on every call — the terminal may have
        # been resized since the previous message.
        self._termsize_update()
        self.width = self.termsize - self.prefixsz - self.suffixsz - 1
        return super().shorten(txt)

    def _termsize_update(self):
        # Cache the current terminal column count.
        self.termsize = shutil.get_terminal_size().columns
|
|
|
|
|
|
class OverriddenPathFormat(PathFormatGalleryDl):
    """PathFormat variant whose cleaned paths are post-processed so every
    path segment stays within file-name length limits."""

    def __init__(self, extractor):
        super().__init__(extractor)
        # Wrap clean_path so each generated path segment is length-limited
        # and stripped of trailing dots/spaces (see FixFileNameFormatterWrapper).
        self.clean_path = FixFileNameFormatterWrapper(self.clean_path)
|
|
|
|
|
|
class CallSaverPostProcessor(gallery_dl.postprocessor.common.PostProcessor):
    """Postprocessor that records the path argument of every hook call so
    the caller can discover which files gallery_dl produced."""

    def __init__(self, job):
        super().__init__(job)
        # One list of recorded values per hook name.
        self.calls = dict(
            prepare=list(),
            run=list(),
            run_metadata=list(),
            run_after=list(),
            run_final=list(),
        )

    def prepare(self, pathfmt: gallery_dl.path.PathFormat):
        """Update file paths, etc.

        Records the fully built target path without disturbing the live
        ``pathfmt``: the formatter/cleaner attributes (and the
        ``_fallback`` entry of ``kwdict``) are detached — presumably
        because they are not picklable; verify against gallery_dl — the
        object is cloned via a pickle round-trip, then the attributes are
        restored on both the original and the clone. The clone builds its
        path, which is what gets recorded.
        """
        # Remember the attributes to detach.
        directory_formatters = pathfmt.directory_formatters
        filename_formatter = pathfmt.filename_formatter
        clean_segment = pathfmt.clean_segment
        clean_path = pathfmt.clean_path
        kwdict_fallback = pathfmt.kwdict.get('_fallback', None)

        # Detach them so the pickle round-trip below can succeed.
        pathfmt.directory_formatters = None
        pathfmt.filename_formatter = None
        pathfmt.clean_segment = None
        pathfmt.clean_path = None
        if kwdict_fallback is not None:
            pathfmt.kwdict['_fallback'] = None

        # Deep copy via pickle round-trip (the stripped object graph is
        # pickle-safe at this point). Data is local and trusted.
        cloned_pathfmt: gallery_dl.path.PathFormat = pickle.loads(
            pickle.dumps(pathfmt))

        # Restore the live object exactly as it was...
        pathfmt.directory_formatters = directory_formatters
        pathfmt.filename_formatter = filename_formatter
        pathfmt.clean_segment = clean_segment
        pathfmt.clean_path = clean_path
        if kwdict_fallback is not None:
            pathfmt.kwdict['_fallback'] = kwdict_fallback

        # ...and equip the clone with the same callables so it can build.
        cloned_pathfmt.directory_formatters = directory_formatters
        cloned_pathfmt.filename_formatter = filename_formatter
        cloned_pathfmt.clean_segment = clean_segment
        cloned_pathfmt.clean_path = clean_path
        if kwdict_fallback is not None:
            cloned_pathfmt.kwdict['_fallback'] = kwdict_fallback

        cloned_pathfmt.build_path()
        self.calls['prepare'].append(cloned_pathfmt.path)
        return pathfmt

    def run(self, pathfmt: gallery_dl.path.PathFormat):
        """Execute the postprocessor for a file"""
        self.calls['run'].append(pathfmt.path)

    def run_metadata(self, pathfmt: gallery_dl.path.PathFormat):
        """Execute the postprocessor for a file"""
        self.calls['run_metadata'].append(pathfmt.path)

    def run_after(self, pathfmt: gallery_dl.path.PathFormat):
        """Execute postprocessor after moving a file to its target location"""
        self.calls['run_after'].append(pathfmt.path)

    def run_final(self, pathfmt: gallery_dl.path.PathFormat, status: int):
        """Postprocessor finalization after all files have been downloaded"""
        self.calls['run_final'].append((pathfmt.path, status))
|
|
|
|
|
|
class FixFileNameFormatterWrapper:
    """Wraps file name formatter for ensuring a valid file name length"""

    def __init__(self, formatter: gallery_dl.formatter.StringFormatter):
        self.formatter = formatter

    def __call__(self, *args, **kwargs) -> str:
        """Format a path, then normalize every segment of it.

        Each segment gets trailing dots/spaces stripped, is truncated to a
        valid byte length, and is stripped again (truncation may expose a
        new trailing dot/space). A trailing separator is preserved.
        """
        path = self.formatter(*args, **kwargs)
        ends_with_sep = path.endswith(os.sep)
        fixed_parts = [
            fix_filename_ending_extension(
                fix_filename_length(
                    fix_filename_ending_extension(part)))
            for part in Path(path).parts
        ]
        rebuilt = str(Path(*fixed_parts))
        return rebuilt + os.sep if ends_with_sep else rebuilt

    def format_map(self, kwdict):
        # Formatter-protocol entry point: delegate to __call__.
        return self(kwdict=kwdict)
|
|
|
|
|
|
def fix_filename_length(filename: str) -> str:
    """Ensures a segment has a valid file name length.

    If the UTF-8 encoding of *filename* exceeds 240 bytes, the stem is
    byte-truncated (dropping any partially-cut character) so that stem
    plus extension fits in 240 bytes; otherwise it is returned unchanged.
    """
    if len(filename.encode()) <= 240:
        return filename
    as_path = Path(filename)
    suffix = as_path.suffix
    # Byte budget left for the stem once the extension is accounted for.
    stem_budget = 240 - len(suffix.encode())
    truncated_stem = as_path.stem.encode()[:stem_budget].decode(errors="ignore")
    return truncated_stem + suffix
|
|
|
|
|
|
def fix_filename_ending_extension(filename: str) -> str:
    """Strip trailing dots/spaces from a file name's stem.

    Windows (and some other filesystems) reject names whose stem ends in
    '.' or ' '; such stems are right-stripped while keeping the suffix.
    Names with a clean stem are returned unchanged.
    """
    fp = Path(filename)
    stem = fp.stem
    if not stem.endswith(('.', ' ')):
        return filename
    cleaned_name = stem.rstrip('. ') + fp.suffix
    return str(fp.parent.joinpath(cleaned_name))
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the downloader on sys.argv and propagate its exit status.
    sys.exit(main())
|