#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

import datetime
import hashlib
import json
import multiprocessing
import time
import traceback
from pathlib import Path
from typing import Dict, FrozenSet, List, Optional, Tuple, TypeVar

import colored

from .system.cmdline_parser import parse_cmdline
from .system.format_file_size import format_power10
from .system.hexhashof import hexhashof

WORKERS = 4

T = TypeVar('T')


def cmdline(encoded_args: Optional[str] = None):
    if encoded_args is None:
        return run_with_config()
    else:
        return parse_cmdline(run_with_config, encoded_args)


def run_with_config():
    main()


def get_first_elem(e):
    return e[0]


def get_second_elem(e):
    return e[1]


def reverse_key_value(e):
    return e[1], e[0]


class FileHash:
    def __init__(self, file: str, hash: str):
        self.file = file
        self.hash = hash


class FileHasher(multiprocessing.Process):
    '''Worker process: pops file paths from file_queue, pushes FileHash
    results into hash_queue, and forwards a None sentinel when done.'''

    def __init__(self, file_queue, hash_queue):
        multiprocessing.Process.__init__(self)
        self.file_queue = file_queue
        self.hash_queue = hash_queue

    def run(self):
        proc_name = self.name
        try:
            while True:
                seq, total, file = self.file_queue.get()
                if file is not None:
                    print(colored.stylize(
                        f'{proc_name}:{colored.fg("cyan")} {seq}/{total}:'
                        f'{colored.attr("reset")}{colored.attr("dim")} {file}',
                        [colored.fg('yellow')]))
                    self.hash_queue.put(FileHash(
                        str(file),
                        hexhashof(Path(file).read_bytes(), hashlib.sha256)
                    ))
                else:
                    # A None path is the shutdown sentinel for this worker.
                    print(colored.stylize(f'{proc_name}: Exiting', [
                        colored.fg('red'), colored.attr('bold')]))
                    self.hash_queue.put(None)
                    break
        except BaseException:
            # Forward the sentinel even on failure so the consumer loop
            # cannot block forever waiting for a dead worker.
            traceback.print_exc()
            try:
                self.hash_queue.put(None)
            except BaseException:
                pass


def main():
    print(colored.stylize('Reading files...', [
        colored.fg('light_cyan'), colored.attr('bold')]))
    hashes_path = Path('i_gdl_hashes.txt')
    if not hashes_path.exists():
        hashes_path.write_text('')
    hashes_text = list(filter(len, hashes_path.read_text().splitlines()))
    # Each line reads 'hash|file'; reverse_key_value yields (file, hash).
    hashes: FrozenSet[Tuple[str, str]] = frozenset(map(
        reverse_key_value,
        filter(get_second_elem,
               map(lambda a: a.split('|', 1), hashes_text))))
    files_size: Dict[str, int] = json.loads(
        Path('i_gdl_fsz.json').read_text())
    # Keep only hashes whose file is still known and non-empty.
    hashes = frozenset(filter(lambda a: files_size.get(a[0], 0), hashes))
    non_empty_files_size_dict = dict(
        filter(lambda a: a[1], files_size.items()))
    non_empty_files = frozenset(non_empty_files_size_dict.keys())
    downloaded_files_to_be_hashed = non_empty_files.difference(
        map(get_first_elem, hashes))
    total_file_size_bytes = sum(files_size.values())
    total_file_size = format_power10(total_file_size_bytes)
    hashed_size_bytes = sum(
        map(lambda l: files_size[l], map(get_first_elem, hashes)))
    total = len(downloaded_files_to_be_hashed)
    files_queue = multiprocessing.Queue()
    hashes_queue = multiprocessing.Queue()
    print(colored.stylize('Filling queues...', [
        colored.fg('light_cyan'), colored.attr('bold')]))
    for enumeration, downloaded_file_to_be_hashed in enumerate(
            sorted(downloaded_files_to_be_hashed), start=1):
        files_queue.put(
            (enumeration, total, str(downloaded_file_to_be_hashed)))
    del total
    # One sentinel per worker so every worker terminates exactly once.
    for _ in range(WORKERS):
        files_queue.put((0, 0, None))
    print(colored.stylize('Starting processes...', [
        colored.fg('light_cyan'), colored.attr('bold')]))
    workers = list()
    for _ in range(WORKERS):
        worker = FileHasher(files_queue, hashes_queue)
        workers.append(worker)
    for worker in workers:
        worker.start()
    print(colored.stylize('Listening queues...', [
        colored.fg('light_cyan'), colored.attr('bold')]))
    timings: List[Tuple[float, float]] = list()
    with hashes_path.open('at') as hashes_handler:
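        # ETA estimation: up to 128 recent (progress, timestamp) samples
        # are kept, and the remaining time is linearly extrapolated from
        # the recent throughput: seconds_left = (1 - progress) * (dt / dp),
        # where dt / dp is elapsed seconds per unit of progress.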
        active_workers = WORKERS
        while active_workers > 0:
            file_hash: Optional[FileHash] = hashes_queue.get()
            if file_hash is not None:
                hashed_size_bytes += files_size[file_hash.file]
                progress_pct = hashed_size_bytes / max(1, total_file_size_bytes)
                timings.append((progress_pct, time.time()))
                # Keep only the most recent 128 samples.
                while len(timings) > 128:
                    del timings[0]
                end_prediction = ''
                if len(timings) > 1:
                    dp = timings[0][0] - timings[-1][0]
                    dt = timings[0][1] - timings[-1][1]
                    secs_pred = (1 - progress_pct) * (dt / dp)
                    td = datetime.timedelta(seconds=secs_pred)
                    end_prediction = (f' - {td}'
                                      f' - {datetime.datetime.now() + td}')
                print(colored.stylize(
                    '%11.6f%% - %s of %s%s' % (
                        100 * progress_pct,
                        format_power10(hashed_size_bytes),
                        total_file_size,
                        end_prediction),
                    [colored.fg('light_green'), colored.attr('bold')]))
                hashes_handler.write(f'{file_hash.hash}|{file_hash.file}\n')
            else:
                # Each worker sends one None right before exiting.
                active_workers -= 1
            del file_hash
        del active_workers
    print(colored.stylize('Stopping processes...', [
        colored.fg('light_cyan'), colored.attr('bold')]))
    for worker in workers:
        worker.join()
    del workers
    files_queue.close()
    files_queue.join_thread()
    hashes_queue.close()
    hashes_queue.join_thread()
    print(colored.stylize('Sorting output file...', [
        colored.fg('light_cyan'), colored.attr('bold')]))
    # Rewrite the hash list sorted, skipping blank lines and entries whose
    # file has since vanished or become empty.
    hashes_path.write_text(
        '\n'.join(
            list(filter(
                lambda a: files_size.get(a.split('|', 1)[1], 0),
                sorted(filter(len, hashes_path.read_text().splitlines()))
            ))
        ) + '\n')
    print(colored.stylize('Pointing out repeated hashes...', [
        colored.fg('light_cyan'), colored.attr('bold')]))
    repeated_hashes: Dict[str, List[str]] = dict()
    for hashed, location in map(
            lambda a: a.split('|', 1),
            sorted(filter(len, hashes_path.read_text().splitlines()))):
        if hashed not in repeated_hashes:
            repeated_hashes[hashed] = list()
        repeated_hashes[hashed].append(location)
    # Report only hashes that occur more than once, largest groups first,
    # ties broken by hash.
    Path('i_gdl_rh.json').write_text(json.dumps(
        dict(sorted(
            filter(lambda a: len(a[1]) > 1, repeated_hashes.items()),
            key=lambda a: (-len(a[1]), a[0])
        )),
        indent=1,
    ))
    print(colored.stylize(
        'Done', [colored.fg('light_cyan'), colored.attr('bold')]))
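

if __name__ == '__main__':
    # Hypothetical convenience entry point; the project normally invokes
    # cmdline() through its own command-line dispatcher. The relative
    # imports above require package context, so run this module as
    # `python -m <package>.<this_module>` (package name not shown here).
    cmdline()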