reddit-image-wall-rs/gallery-dl-scriptable

302 lines
11 KiB
Plaintext
Raw Normal View History

2020-11-21 14:56:16 +00:00
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
2021-01-23 06:17:03 +00:00
import argparse
import json
import os
import pickle
import shutil
2020-11-21 14:56:16 +00:00
import sys
from pathlib import Path
2021-01-23 06:17:03 +00:00
from typing import Any, Dict, List
2020-11-21 14:56:16 +00:00
2021-01-23 06:17:03 +00:00
import colored as clrlib
import gallery_dl
import gallery_dl.config
import gallery_dl.extractor
import gallery_dl.job
import gallery_dl.postprocessor.common
import gallery_dl.util
2020-11-21 14:56:16 +00:00
2021-01-23 06:17:03 +00:00
# Handle on Gallery-DL's unpatched PathFormat class.
# configure_gdl() stores this value as gallery_dl.util.PathFormatOriginal right
# before it replaces gallery_dl.util.PathFormat, so the getattr() fallback below
# still resolves to the original class when the module was already patched.
PathFormatGalleryDl: gallery_dl.util.PathFormat = getattr(gallery_dl.util, 'PathFormatOriginal', gallery_dl.util.PathFormat)
2020-11-21 14:56:16 +00:00
2021-01-23 06:17:03 +00:00
def parse_args(args: List[str]) -> argparse.Namespace:
    """Parse the command-line arguments of the single-link downloader.

    Args:
        args: Argument strings, without the program name.

    Returns:
        A namespace with the attributes ``runner_identification`` (str),
        ``remaining_links`` (int), ``destination_folder`` (Path),
        ``desired_url`` (str), ``latest_image_txt`` (Path, defaults to
        ``latest_image_download.txt``) and ``verbose`` (bool).
    """
    parser = argparse.ArgumentParser(
        description='Uses Gallery-DL to download a single link.')
    parser.add_argument('runner_identification', type=str,
                        help='The identification for the logs')
    parser.add_argument('remaining_links', type=int,
                        help='The number of links remaining')
    parser.add_argument('destination_folder', type=Path,
                        help='The destination folder for downloaded data')
    parser.add_argument('desired_url', type=str,
                        help='The URL to be fetched')
    parser.add_argument('latest_image_txt', type=Path, nargs='?',
                        default=Path('latest_image_download.txt'),
                        help='File overwritten with the path of each '
                             'successfully downloaded file')
    # 'store_true' already implies default=False and const=True.
    parser.add_argument('--verbose', action='store_true',
                        help='Be verbose on logs')
    return parser.parse_args(args)
2020-11-21 14:56:16 +00:00
2021-01-23 06:17:03 +00:00
def get_printer(has_its_own_line: int, remaining_links: int, thread_id: str, latest_success_fetch_store: Path):
    """Build the colored line printer attached to a download job.

    Every printed line is prefixed with ``<remaining>@<thread_id>= `` where
    each piece has its own color.

    Args:
        has_its_own_line: Truthy to make the printer rewrite a single line;
            it is forwarded as the printer's ``sameline`` flag and also
            enables the ANSI suffix (erase to end of line, cursor to home).
        remaining_links: Number shown right-aligned in nine columns.
        thread_id: Identification of the runner shown after the ``@``.
        latest_success_fetch_store: File the printer overwrites with the
            path of each successful download.

    Returns:
        The configured ``ColoredLineOutput`` instance.
    """
    counter = clrlib.stylize('%9d' % remaining_links, [clrlib.fg('light_cyan')])
    at_sign = clrlib.stylize('@', [clrlib.fg('light_red')])
    runner = clrlib.stylize(thread_id, [clrlib.fg('yellow')])
    separator = clrlib.stylize('= ', [clrlib.fg('dark_gray')])
    colored_prefix = '' + counter + at_sign + runner + separator

    if has_its_own_line:
        line_suffix = '\033[K\033[0;0H'
    else:
        line_suffix = ''

    # Width reserved for the prefix, measured without the color codes.
    uncolored_prefix = ('%9d' % 0) + ' ' + thread_id

    return ColoredLineOutput(
        has_its_own_line,
        prefix=colored_prefix,
        suffix=line_suffix,
        prefixsz=len(uncolored_prefix),
        suffixsz=0,
        write_successes_to=latest_success_fetch_store,
    )
def _find_download_candidates(expected: Path) -> List[Path]:
    """Return siblings of *expected* that may be its final file, shortest name first.

    A sibling qualifies when its name starts with the expected file name and
    its suffix is neither ``.part`` nor ``.json``. A missing or non-directory
    parent yields an empty list instead of raising.
    """
    try:
        siblings = list(expected.parent.iterdir())
    except (FileNotFoundError, NotADirectoryError):
        return []
    return sorted(
        (p for p in siblings
         if p.name.startswith(expected.name)
         and p.suffix not in ('.part', '.json')),
        key=lambda p: len(p.name),
    )


def main(args: List[str] = sys.argv[1:]) -> int:
    """Download one URL with Gallery-DL and print a JSON report to stdout.

    The report holds the job status, the requested URL, the URL the
    extractor ended up with, and the list of downloaded files. Bit 256 is
    set in the status when a file recorded during the download cannot be
    located on disk.

    Args:
        args: Command-line arguments, without the program name.

    Returns:
        Always 0; the outcome of the download is carried by the ``status``
        field of the JSON report.
    """
    parsed = parse_args(args)
    configure_gdl(parsed.destination_folder, parsed.verbose)
    job = DownloadJobWithCallSaverPostProcessor(parsed.desired_url)
    job.out = get_printer(0, parsed.remaining_links, parsed.runner_identification, parsed.latest_image_txt)
    job.out.message(parsed.desired_url, clrlib.fg('light_magenta'))
    rc = job.run()

    # Paths recorded by the 'prepare' hook, minus the empty ones.
    files = [fl for fl in job.cspp.calls['prepare'] if len(fl)]

    # A recorded path may not exist as-is; the file on disk can carry extra
    # characters after the recorded name. One pass is enough: a replacement
    # comes from the directory listing, so no restart is needed (restarting
    # could loop forever on an entry that is listed but does not exist()).
    for seq, fl in enumerate(files):
        expected = Path(fl)
        if expected.exists():
            continue
        candidates = _find_download_candidates(expected)
        if candidates:
            files[seq] = str(candidates[0])
        else:
            rc |= 256

    os.sync()
    sys.stdout.write(json.dumps(dict(
        status=rc,
        link_requested=parsed.desired_url,
        link_effective=job.extractor.url,
        files=files,
    ), indent=4))
    return 0
def configure_gdl(destination_folder: Path, verbose: bool):
    """Configure Gallery-DL's global state for this script.

    Runs Gallery-DL's own option parser over a fixed set of command-line
    options, applies the resulting configuration and logging setup, then
    installs this file's output class and path format into Gallery-DL.

    Args:
        destination_folder: Passed to Gallery-DL as ``--dest``.
        verbose: When true, ``--verbose`` is added to the options.
    """
    cli_options = [
        '--dest=' + str(destination_folder),
        '--write-metadata',
        '--write-unsupported=i_gdl_unsupported.txt',
    ]
    if verbose:
        cli_options.append('--verbose')
    # Currently disabled options: --write-tags, --write-log=i_gdl_log.txt,
    # --quiet, --retries=7, --limit-rate=1500k.
    cli_options.append('--retries=1')

    args = gallery_dl.option.build_parser().parse_args(cli_options)
    output = gallery_dl.output
    config = gallery_dl.config

    output.initialize_logging(args.loglevel)

    # configuration
    if args.load_config:
        config.load()
    if args.cfgfiles:
        config.load(args.cfgfiles, strict=True)
    if args.yamlfiles:
        config.load(args.yamlfiles, strict=True, fmt="yaml")
    if args.postprocessors:
        config.set((), "postprocessors", args.postprocessors)
    if args.abort:
        config.set((), "skip", "abort:" + str(args.abort))
    for option in args.options:
        config.set(*option)

    # loglevels
    output.configure_logging(args.loglevel)

    # Replace Gallery-DL's output selector and path format with the classes
    # defined in this file, keeping the original path format reachable.
    output.select = ColoredLineOutput
    gallery_dl.util.PathFormatOriginal = PathFormatGalleryDl
    gallery_dl.util.PathFormat = OverriddenPathFormat
class DownloadJobWithCallSaverPostProcessor(gallery_dl.job.DownloadJob):
    """Download job that records prepared paths through a CallSaverPostProcessor.

    A job without a parent creates its own recorder; a job created with a
    parent reuses the parent's recorder, so all paths end up in one place.
    """

    def __init__(self, url, parent=None):
        super().__init__(url, parent)
        if parent is None:
            self.cspp = CallSaverPostProcessor(self)
        else:
            self.cspp = parent.cspp

    def initialize(self, kwdict=None):
        """Initialize the job, then register the recorder's 'prepare' hook."""
        super().initialize(kwdict)
        # NOTE(review): hooks can be a tuple instead of a mapping, presumably
        # when Gallery-DL registered no hooks - confirm; nothing is added then.
        if isinstance(self.hooks, tuple):
            return
        self.hooks['prepare'].append(self.cspp.prepare)
class ColoredLineOutput(gallery_dl.output.TerminalOutput):
    """Terminal output that prints colored, prefixed lines to stderr."""

    def __init__(self, sameline=False, prefix="", suffix="", prefixsz=0, suffixsz=0, write_successes_to=None):
        """Store the line decoration settings.

        Args:
            sameline: Truthy to end lines with a carriage return instead of
                a newline.
            prefix: Text printed before every message.
            suffix: Text printed after every message.
            prefixsz: Columns to reserve for the prefix when shortening.
            suffixsz: Columns to reserve for the suffix when shortening.
            write_successes_to: Optional path overwritten with the path of
                each successful download.
        """
        super().__init__()
        self.sameline = sameline
        if sameline:
            self.eol = '\r'
        else:
            self.eol = '\n'
        self.prefix = prefix
        self.suffix = suffix
        self.prefixsz = prefixsz
        self.suffixsz = suffixsz
        self.write_successes_to = write_successes_to
        self._termsize_update()

    def start(self, path):
        """Print *path* in light yellow."""
        self.message(path, clrlib.fg("light_yellow"))

    def skip(self, path):
        """Print *path* dimmed."""
        self.message(path, clrlib.attr('dim'))

    def success(self, path, tries):
        """Print *path* in bold light green and store it if a target file is set."""
        self.message(path, clrlib.attr('bold'), clrlib.fg('light_green'))
        if self.write_successes_to is None:
            return
        self.write_successes_to.write_text(path)

    def message(self, txt: str, *attrs: List[str], do_print: bool = True) -> str:
        """Format a shortened, styled message and optionally print it.

        Returns the formatted line (prefix, styled text, suffix) whether or
        not it was printed.
        """
        styled = clrlib.stylize(self.shorten(txt), attrs)
        line = "{}{}{}".format(self.prefix, styled, self.suffix)
        if not do_print:
            return line
        print(line, end=self.eol, file=sys.stderr, flush=True)
        return line

    def shorten(self, txt):
        """Shorten *txt* to the columns left after prefix, suffix and one spare."""
        self._termsize_update()
        reserved = self.prefixsz
        self.width = self.termsize - reserved - self.suffixsz - 1
        return super().shorten(txt)

    def _termsize_update(self):
        """Refresh the cached terminal width."""
        terminal = shutil.get_terminal_size()
        self.termsize = terminal.columns
class OverriddenPathFormat(PathFormatGalleryDl):
    """Path format whose ``clean_path`` result is passed through FixFileNameFormatterWrapper."""

    def __init__(self, extractor):
        super().__init__(extractor)
        # Wrap the cleaner set up by the parent class.
        inherited_clean_path = self.clean_path
        self.clean_path = FixFileNameFormatterWrapper(inherited_clean_path)
class CallSaverPostProcessor(gallery_dl.postprocessor.common.PostProcessor):
    """Post-processor that records the path seen by each of its hook methods.

    The recorded values live in ``self.calls``, a dict with one list per
    method name (``prepare``, ``run``, ``run_metadata``, ``run_after``,
    ``run_final``).
    """

    # Attributes detached from the path format while it is pickled.
    # NOTE(review): presumably they hold unpicklable callables - confirm.
    _DETACHED_ATTRS = (
        'directory_formatters',
        'filename_formatter',
        'clean_segment',
        'clean_path',
    )

    def __init__(self, job):
        super().__init__(job)
        self.calls = dict(
            prepare=[],
            run=[],
            run_metadata=[],
            run_after=[],
            run_final=[],
        )

    def prepare(self, pathfmt: gallery_dl.util.PathFormat):
        """Record the path *pathfmt* would build, without modifying it.

        The path is built on a pickled copy so the original object keeps its
        state. The detached attributes and ``kwdict['_fallback']`` are set to
        None only for the duration of the copy and are always restored, even
        when pickling raises.

        Returns:
            The *pathfmt* object that was passed in.
        """
        detached = {name: getattr(pathfmt, name) for name in self._DETACHED_ATTRS}
        kwdict_fallback = pathfmt.kwdict.get('_fallback', None)
        try:
            for name in detached:
                setattr(pathfmt, name, None)
            if kwdict_fallback is not None:
                pathfmt.kwdict['_fallback'] = None
            cloned_pathfmt: gallery_dl.util.PathFormat = pickle.loads(pickle.dumps(pathfmt))
        finally:
            # Never leave the live path format without its formatters.
            for name, value in detached.items():
                setattr(pathfmt, name, value)
            if kwdict_fallback is not None:
                pathfmt.kwdict['_fallback'] = kwdict_fallback

        # Give the copy the same helpers so it can build the path.
        for name, value in detached.items():
            setattr(cloned_pathfmt, name, value)
        if kwdict_fallback is not None:
            cloned_pathfmt.kwdict['_fallback'] = kwdict_fallback
        cloned_pathfmt.build_path()
        self.calls['prepare'].append(cloned_pathfmt.path)
        return pathfmt

    def run(self, pathfmt: gallery_dl.util.PathFormat):
        """Record ``pathfmt.path`` under 'run'."""
        self.calls['run'].append(pathfmt.path)

    def run_metadata(self, pathfmt: gallery_dl.util.PathFormat):
        """Record ``pathfmt.path`` under 'run_metadata'."""
        self.calls['run_metadata'].append(pathfmt.path)

    def run_after(self, pathfmt: gallery_dl.util.PathFormat):
        """Record ``pathfmt.path`` under 'run_after'."""
        self.calls['run_after'].append(pathfmt.path)

    def run_final(self, pathfmt: gallery_dl.util.PathFormat, status: int):
        """Record the ``(pathfmt.path, status)`` pair under 'run_final'."""
        self.calls['run_final'].append((pathfmt.path, status))
class FixFileNameFormatterWrapper:
    """Callable wrapper that sanitizes each component of a formatter's path."""

    def __init__(self, formatter: gallery_dl.util.Formatter):
        self.formatter = formatter

    def __call__(self, *args, **kwargs) -> str:
        """Call the wrapped formatter and fix every part of the path it returns.

        Each part has its ending fixed, then its length, then its ending
        again (shortening may expose a new trailing dot or space).
        """
        raw_path = self.formatter(*args, **kwargs)
        fixed_parts = [
            fix_filename_ending_extension(
                fix_filename_length(
                    fix_filename_ending_extension(part)))
            for part in Path(raw_path).parts
        ]
        return str(Path(*fixed_parts))
def fix_filename_length(filename: str, max_bytes: int = 240) -> str:
    """Shorten a path segment so its UTF-8 encoding fits in *max_bytes*.

    The suffix (extension) is kept and the stem is cut; a multi-byte
    character split by the cut is dropped entirely.

    Args:
        filename: A single path segment.
        max_bytes: Maximum encoded size of the result, 240 by default.

    Returns:
        *filename* unchanged when it already fits, else the shortened name.
    """
    encoded = filename.encode()
    if len(encoded) <= max_bytes:
        return filename
    segment = Path(filename)
    extension = segment.suffix
    stem_budget = max_bytes - len(extension.encode())
    if stem_budget < 0:
        # The suffix alone is over the limit, so it cannot be preserved
        # (a negative slice bound would keep the name too long): cut the
        # whole name instead.
        return encoded[:max_bytes].decode(errors="ignore")
    fixed_stem = segment.stem.encode()[:stem_budget].decode(errors="ignore")
    return fixed_stem + extension
def fix_filename_ending_extension(filename: str) -> str:
    """Strip dots and spaces that end the stem, right before the suffix.

    For example ``'name .jpg'`` and ``'name..jpg'`` both become
    ``'name.jpg'``. A name that would be reduced to nothing (such as the
    ``'..'`` path component) is returned unchanged, so the path it belongs
    to keeps its meaning.

    Args:
        filename: A file name, optionally preceded by parent directories.

    Returns:
        The fixed name, or *filename* itself when nothing has to change.
    """
    fp = Path(filename)
    stem = fp.stem
    if stem[-1:] not in ('.', ' '):
        return filename
    fixed_name = stem.rstrip('. ') + fp.suffix
    if not fixed_name:
        # Only dots/spaces: joining an empty name would yield the parent
        # ('.'), silently turning e.g. '..' into '.'.
        return filename
    return str(fp.parent.joinpath(fixed_name))
if __name__ == "__main__":
    # Script entry point: run with the process arguments and use the
    # value returned by main() as the exit status.
    sys.exit(main())