#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import bz2
import hashlib
import json
import lzma
import multiprocessing
import socket
import sys
import time
import traceback
import zlib
from concurrent.futures import Future, ProcessPoolExecutor
from typing import Any, Callable, Dict, List, Tuple

import requests
import zstandard as zstd

# Candidate compressors, each paired with a callable that compresses raw bytes
# at (roughly) its strongest setting. 'uncompressed' is the baseline.
COMPRESSORS: List[Tuple[str, Callable[[bytes], bytes]]] = [
    ('uncompressed', lambda uncompressed: uncompressed),
    ('zlib', lambda uncompressed: zlib.compress(uncompressed, 9)),
    ('bz2', lambda uncompressed: bz2.compress(uncompressed, 9)),
    ('lzma', lzma.compress),
    ('zstd', lambda uncompressed: zstd.ZstdCompressor(
        level=22).compress(uncompressed)),
]


class HashMismatch(Exception):
    pass


def check_best_compressions(uncompressed: bytes) -> Dict[str, Tuple[int, float]]:
    """Run every compressor over the payload, recording size and wall-clock time."""
    algos = dict()
    for name, callback in COMPRESSORS:
        time_start = time.time()
        compressed_size = len(callback(uncompressed))
        time_end = time.time()
        algos[name] = (compressed_size, time_end - time_start)
    return algos

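
# The mapping returned by check_best_compressions() is keyed by compressor name
# and holds (compressed size in bytes, elapsed seconds) pairs. The figures below
# are purely illustrative, not measurements from the source:
#
#   {'uncompressed': (1048576, 0.00), 'zlib': (763211, 0.42),
#    'bz2': (702900, 1.10), 'lzma': (651003, 2.35), 'zstd': (648421, 1.81)}
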

def hexhashof(bts: bytes, using: Callable[[], Any]) -> str:
    """Hex digest of `bts` using a hashlib constructor such as hashlib.sha256."""
    m = using()
    m.update(bts)
    return m.hexdigest()


def upload_log(url, sender, **content):
    """POST a log entry, retrying until the server accepts it."""
    sent = False
    while not sent:
        try:
            requests.post(url, data={
                'sender': json.dumps(sender),
                'content': json.dumps(content),
            }).raise_for_status()
            sent = True
        except Exception:
            traceback.print_exc()


def upload_job(url, **content):
    """POST a job result as JSON, retrying until the server accepts it."""
    sent = False
    while not sent:
        try:
            requests.post(url, json=content).raise_for_status()
            sent = True
        except Exception:
            traceback.print_exc()

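
# A job fetched from `{base_address}job/{job_id}` is expected to be a JSON
# object with at least 'file' (a path relative to the base address) and 'hsh'
# (the SHA-256 hex digest of that file). Illustrative shape, not real data:
#
#   {"file": "images/0001.png",
#    "hsh": "<sha256 hex digest of images/0001.png>"}
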

def do_work(base_address: str, worker_id: str):
    """Poll the server for jobs, download and verify each file, benchmark the
    compressors on it, and upload the results, until the server reports 'done'."""
    while True:
        try:
            tick_set = time.time()
            job_id = None
            try:
                with requests.get(f'{base_address}job') as response:
                    response.raise_for_status()
                    job_id = response.text
            except KeyboardInterrupt:
                raise
            except Exception:
                pass
            if job_id is None:
                # Poll failed; wait briefly so an unreachable server is not hammered.
                time.sleep(1)
                continue
            elif job_id == 'done':
                break
            else:
                tick_downloading_job_started = time.time()
                tick_downloading_job_retry_started = tick_downloading_job_started
                tick_downloading_job_retry_count = 0
                job = None
                while job is None:
                    try:
                        tick_downloading_job_retry_started = time.time()
                        with requests.get(f'{base_address}job/{job_id}') as response:
                            response.raise_for_status()
                            job = response.json()
                    except KeyboardInterrupt:
                        raise
                    except Exception:
                        tick_downloading_job_retry_count += 1
                        formatted_exception = traceback.format_exc()
                        print(formatted_exception, file=sys.stderr)
                        upload_log(
                            f'{base_address}log/error',
                            worker_id,
                            during='JobDownload',
                            tick_set=tick_set,
                            traceback=formatted_exception,
                            job_id=job_id,
                            tick_downloading_job_started=tick_downloading_job_started,
                            tick_downloading_job_retry_started=tick_downloading_job_retry_started,
                            tick_downloading_job_retry_count=tick_downloading_job_retry_count,
                        )
                tick_downloading_job_ended = time.time()
                tick_downloading_image_started = time.time()
                tick_downloading_image_retry_started = tick_downloading_image_started
                tick_downloading_image_retry_count = 0
                tick_downloading_image_retry_mismatch = 0
                image_bytes = None
                while image_bytes is None:
                    try:
                        tick_downloading_image_retry_started = time.time()
                        with requests.get(f'{base_address}{job["file"]}') as response:
                            if response.status_code == 404:
                                break
                            response.raise_for_status()
                            response.raw.decode_content = True
                            if hexhashof(response.content, hashlib.sha256) == job['hsh']:
                                image_bytes = response.content
                            else:
                                raise HashMismatch()
                    except KeyboardInterrupt:
                        raise
                    except Exception as exception:
                        tick_downloading_image_retry_count += 1
                        if isinstance(exception, HashMismatch):
                            tick_downloading_image_retry_mismatch += 1
                        formatted_exception = traceback.format_exc()
                        print(formatted_exception, file=sys.stderr)
                        upload_log(
                            f'{base_address}log/error',
                            worker_id,
                            during='ImageDownload',
                            tick_set=tick_set,
                            traceback=formatted_exception,
                            job_id=job_id,
                            file=job["file"],
                            hash=job["hsh"],
                            tick_downloading_job_started=tick_downloading_job_started,
                            tick_downloading_job_retry_started=tick_downloading_job_retry_started,
                            tick_downloading_job_retry_count=tick_downloading_job_retry_count,
                            tick_downloading_job_ended=tick_downloading_job_ended,
                            tick_downloading_image_started=tick_downloading_image_started,
                            tick_downloading_image_retry_started=tick_downloading_image_retry_started,
                            tick_downloading_image_retry_count=tick_downloading_image_retry_count,
                            tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch,
                        )
                        # Stop retrying once ten downloads have failed the hash check.
                        if tick_downloading_image_retry_mismatch >= 10:
                            break
                tick_downloading_image_ended = time.time()
                if image_bytes is None:
                    upload_job(
                        f'{base_address}job/{job_id}',
                        status='NoValidImageData',
                        tick_set=tick_set,
                        job_id=job_id,
                        file=job["file"],
                        hash=job["hsh"],
                        tick_downloading_job_started=tick_downloading_job_started,
                        tick_downloading_job_retry_started=tick_downloading_job_retry_started,
                        tick_downloading_job_retry_count=tick_downloading_job_retry_count,
                        tick_downloading_job_ended=tick_downloading_job_ended,
                        tick_downloading_image_started=tick_downloading_image_started,
                        tick_downloading_image_retry_started=tick_downloading_image_retry_started,
                        tick_downloading_image_retry_count=tick_downloading_image_retry_count,
                        tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch,
                        tick_downloading_image_ended=tick_downloading_image_ended,
                    )
                else:
                    tick_image_compress_start = time.time()
                    compressions = check_best_compressions(image_bytes)
                    tick_image_compress_ended = time.time()
                    tick_uploading_started = time.time()
                    upload_job(
                        f'{base_address}job/{job_id}',
                        status='Complete',
                        worker=worker_id,
                        tick_set=tick_set,
                        job_id=job_id,
                        file=job["file"],
                        hash=job["hsh"],
                        compressions=compressions,
                        tick_downloading_job_started=tick_downloading_job_started,
                        tick_downloading_job_retry_started=tick_downloading_job_retry_started,
                        tick_downloading_job_retry_count=tick_downloading_job_retry_count,
                        tick_downloading_job_ended=tick_downloading_job_ended,
                        tick_downloading_image_started=tick_downloading_image_started,
                        tick_downloading_image_retry_started=tick_downloading_image_retry_started,
                        tick_downloading_image_retry_count=tick_downloading_image_retry_count,
                        tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch,
                        tick_downloading_image_ended=tick_downloading_image_ended,
                        tick_image_compress_start=tick_image_compress_start,
                        tick_image_compress_ended=tick_image_compress_ended,
                    )
                    tick_uploading_ended = time.time()
                    upload_log(
                        f'{base_address}log/performance',
                        worker_id,
                        file=job["file"],
                        hash=job["hsh"],
                        tick_downloading_job_started=tick_downloading_job_started,
                        tick_downloading_job_retry_started=tick_downloading_job_retry_started,
                        tick_downloading_job_retry_count=tick_downloading_job_retry_count,
                        tick_downloading_job_ended=tick_downloading_job_ended,
                        tick_downloading_image_started=tick_downloading_image_started,
                        tick_downloading_image_retry_started=tick_downloading_image_retry_started,
                        tick_downloading_image_retry_count=tick_downloading_image_retry_count,
                        tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch,
                        tick_downloading_image_ended=tick_downloading_image_ended,
                        tick_image_compress_start=tick_image_compress_start,
                        tick_image_compress_ended=tick_image_compress_ended,
                        tick_uploading_started=tick_uploading_started,
                        tick_uploading_ended=tick_uploading_ended,
                    )
                    print(f"Done: {job['hsh']}")
        except KeyboardInterrupt:
            raise
        except Exception:
            # Anything unexpected kills this worker; kickstart()'s done-callback
            # surfaces the failure via job.result() so it is not silent.
            raise


def kickstart(base_address: str):
    # Twice as many workers as CPUs: each worker alternates between network I/O
    # and CPU-bound compression.
    job_count: int = multiprocessing.cpu_count() * 2
    # job_count = 1
    hostname: str = socket.gethostname()
    with ProcessPoolExecutor(job_count) as pe:
        for job_seq in range(job_count):
            worker_id = f'{hostname}-{job_seq}'

            def on_completed(job: Future):
                # Re-raise any exception from the worker process.
                job.result()

            pe.submit(
                do_work,
                worker_id=worker_id,
                base_address=base_address,
            ).add_done_callback(on_completed)
        print('Ready')


def main():
    if len(sys.argv) == 2:
        base_address = sys.argv[1]
        if not base_address.startswith('http'):
            base_address = 'http://' + base_address
        if not base_address.endswith('/'):
            base_address += '/'
        kickstart(base_address)
    else:
        print(f'Usage:\n  {sys.argv[0]} <base_address>')


if __name__ == "__main__":
    main()
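
# Example invocation (script name and address are placeholders, not taken from
# the source):
#
#   python3 compression_worker.py 192.0.2.10:8080
#
# main() normalizes the argument to 'http://192.0.2.10:8080/' before handing it
# to kickstart(), which spawns one do_work() process per pool slot.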