#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
"""Distributed thumbnail-hashing worker.

Each worker process repeatedly asks a coordinator (plain HTTP) for a job id,
downloads the job description and the referenced image, verifies the image's
SHA-256 against the job, computes MD5 hashes for a grid of thumbnail variants
(fill/center x background color x size x posterization bit depth) and posts
the result back.  Errors and timing data are reported to the coordinator's
log endpoints.
"""

import hashlib
import json
import multiprocessing
import socket
import sys
import time
import traceback
from concurrent.futures import Future, ProcessPoolExecutor
from io import BytesIO, StringIO
from pathlib import Path
from typing import Any, Callable, Dict, List, Tuple, Type

import requests
from PIL import Image, ImageDraw, ImageOps


class HashMismatch(Exception):
    """Raised when a downloaded file's SHA-256 does not match the job's hash."""
    pass


# Sentinel coordinates used by edge_propagation_scaling's paste_position.
# They are huge negative values so they can never collide with a real
# pixel coordinate a caller might pass.
START = -(sys.maxsize // 2) + 0
END = -(sys.maxsize // 2) + 1
CENTER = -(sys.maxsize // 2) + 2

POS_TOP_LEFT = (START, START)
POS_CENTER_LEFT = (START, CENTER)
POS_BOTTOM_LEFT = (START, END)
POS_TOP_CENTER = (CENTER, START)
POS_CENTER_CENTER = (CENTER, CENTER)
POS_BOTTOM_CENTER = (CENTER, END)
POS_TOP_RIGHT = (END, START)
POS_CENTER_RIGHT = (END, CENTER)
POS_BOTTOM_RIGHT = (END, END)

# Short aliases for the position constants above.
P_TL = POS_TOP_LEFT
P_CL = POS_CENTER_LEFT
P_BL = POS_BOTTOM_LEFT
P_TC = POS_TOP_CENTER
P_CC = POS_CENTER_CENTER
P_BC = POS_BOTTOM_CENTER
P_TR = POS_TOP_RIGHT
P_CR = POS_CENTER_RIGHT
P_BR = POS_BOTTOM_RIGHT


def edge_propagation_scaling(image: Image.Image,
                             desired_size: Tuple[int, int],
                             paste_position: Tuple[int, int] = P_CC,
                             ) -> Image.Image:
    """Paste *image* onto a canvas of *desired_size*, filling the margins by
    propagating the image's outermost edge rows/columns (stretched) and its
    corner pixels (as solid rectangles).

    *paste_position* is either a concrete (x, y) or a pair of the
    START/CENTER/END sentinels selecting the placement along each axis.
    Returns a new image; the input is not modified.
    """
    image = image.copy()
    scaled = Image.new(image.mode, desired_size)

    # Resolve the sentinel position into a concrete top-left paste offset.
    new_placement_: List[int] = list()
    for isz, ssz, pp in zip(image.size, scaled.size, paste_position,):
        if pp == START:
            new_placement_.append(0)
        elif pp == END:
            new_placement_.append(ssz - isz)
        elif pp == CENTER:
            new_placement_.append((ssz - isz) // 2)
        else:
            new_placement_.append(pp)
    new_placement: Tuple[int, int] = new_placement_[0], new_placement_[1]
    del new_placement_
    scaled.paste(image, new_placement)

    # One-pixel-thick strips from each edge of the original image.
    parts = dict(
        # crop box is (left, upper, right, lower)
        t=image.copy().crop((0, 0, image.size[0], 1)),
        b=image.copy().crop((0, image.size[1] - 1, image.size[0], image.size[1])),
        l=image.copy().crop((0, 0, 1, image.size[1])),
        r=image.copy().crop((image.size[0] - 1, 0, image.size[0], image.size[1])),
    )
    # Stretch each edge strip to cover the margin on its side, if any.
    if (sz := new_placement[1]) > 0:
        part = parts['t'].copy()
        resized = part.resize((part.size[0], sz))
        scaled.paste(resized, (new_placement[0], 0))
    if (sz := scaled.size[1] - (dsp := new_placement[1] + image.size[1])) > 0:
        part = parts['b'].copy()
        resized = part.resize((part.size[0], sz))
        scaled.paste(resized, (new_placement[0], dsp))
    if (sz := new_placement[0]) > 0:
        part = parts['l'].copy()
        resized = part.resize((sz, part.size[1]))
        scaled.paste(resized, (0, new_placement[1]))
    if (sz := scaled.size[0] - (dsp := new_placement[0] + image.size[0])) > 0:
        part = parts['r'].copy()
        resized = part.resize((sz, part.size[1]))
        scaled.paste(resized, (dsp, new_placement[1]))
    del parts

    # Fill the four corner regions with the matching corner pixel color.
    corners = dict(
        tl=image.getpixel((0, 0)),
        tr=image.getpixel((image.size[0] - 1, 0)),
        bl=image.getpixel((0, image.size[1] - 1)),
        br=image.getpixel((image.size[0] - 1, image.size[1] - 1)),
    )
    draw: ImageDraw.ImageDraw = ImageDraw.Draw(scaled)
    szt = new_placement[1]
    szb = scaled.size[1] - (dspb := new_placement[1] + image.size[1])
    szl = new_placement[0]
    szr = scaled.size[0] - (dspr := new_placement[0] + image.size[0])
    if szt > 0 and szl > 0:
        draw.rectangle(((0, 0), (szl - 1, szt - 1)), corners['tl'])
    if szt > 0 and szr > 0:
        draw.rectangle(((dspr, 0), (scaled.size[0], szt - 1)), corners['tr'])
    if szb > 0 and szl > 0:
        draw.rectangle(((0, dspb), (szl - 1, scaled.size[1])), corners['bl'])
    if szb > 0 and szr > 0:
        draw.rectangle(((dspr, dspb), scaled.size), corners['br'])
    del dspr
    del dspb
    del szt
    del szb
    del szl
    del szr
    return scaled


def calculate_thumbnail_hashes(image: Image.Image
                               ) -> Dict[str, Dict[str, Dict[str, Dict[str, str]]]]:
    """Return MD5 hashes of many thumbnail renderings of *image*.

    Structure of the result:
        {'fill'|'center': {background: {size: {bit_depth: md5hex}}}}
    where 'fill' squares the image via edge_propagation_scaling and 'center'
    pastes it centered on a transparent square.
    """
    out_dict: Dict[str, Dict[str, Dict[str, Dict[str, str]]]] = dict()
    max_dimen = max(image.size)
    for filling in [True, False]:
        transparent_square = None
        if filling:
            transparent_square = edge_propagation_scaling(image, (max_dimen, max_dimen))
        else:
            transparent_square = Image.new('RGBA', (max_dimen, max_dimen))
            transparent_square.paste(image, (
                (max_dimen - image.size[0]) // 2,
                (max_dimen - image.size[1]) // 2,
            ))
        backgrounds: Dict[str, Dict[str, Dict[str, str]]] = dict()
        for background in ['#000000', '#FFFFFF']:
            backgrounded = Image.new('RGB', transparent_square.size, background)
            # NOTE(review): paste() without a mask ignores the alpha channel,
            # so the background color never shows through transparent areas and
            # both background variants hash identically.  Compositing was
            # probably intended (paste(img, mask=img)), but changing it would
            # change every published hash — confirm before fixing.
            backgrounded.paste(transparent_square)
            sizes: Dict[str, Dict[str, str]] = dict()
            for size in [4, 8, 16, 24, 32, 48, 64, 72, 96, 128]:
                resized = backgrounded.copy()
                resized = resized.resize((size, size))
                bit_depths: Dict[str, str] = dict()
                for bit_depth in range(1, 9):
                    posterized: Image.Image = resized.copy()
                    posterized = ImageOps.posterize(posterized, bit_depth)
                    bio = BytesIO()
                    posterized.save(bio, format='BMP')
                    hashsum = hexhashof(bio.getvalue(), hashlib.md5)
                    bit_depths[str(bit_depth)] = hashsum
                sizes[str(size)] = bit_depths
            backgrounds[background] = sizes
        out_dict['fill' if filling else 'center'] = backgrounds
    return out_dict


def hexhashof(bts: bytes, using: Callable[[], Any]) -> str:
    """Hash *bts* with the hashlib constructor *using* and return hex digest."""
    m = using()
    m.update(bts)
    return m.hexdigest()


def upload_log(url: str, sender: str, **content) -> None:
    """POST a log entry, retrying forever until the server accepts it.

    Best-effort by design: failures are printed and retried.  KeyboardInterrupt
    is re-raised so the worker stays killable (the original bare `except:`
    swallowed it).
    """
    sent = False
    while not sent:
        try:
            requests.post(url, data={
                'sender': json.dumps(sender),
                'content': json.dumps(content),
            }).raise_for_status()
            sent = True
        except KeyboardInterrupt:
            raise
        except Exception:
            traceback.print_exc()


def upload_job(url: str, **content) -> None:
    """POST a job result as JSON, retrying forever until accepted.

    Same retry/interrupt semantics as upload_log.
    """
    sent = False
    while not sent:
        try:
            requests.post(url, json=content).raise_for_status()
            sent = True
        except KeyboardInterrupt:
            raise
        except Exception:
            traceback.print_exc()


def do_work(base_address: str, worker_id: str):
    """Main worker loop: fetch job ids from the coordinator until it says
    'done', processing each job (download, verify, hash, report)."""
    while True:
        tick_set = time.time()
        job_id = None
        try:
            job_id = requests.get(f'{base_address}job').text
        except KeyboardInterrupt:
            raise
        except Exception:
            # Transient network failure: just ask again.
            pass
        if job_id is None:
            continue
        elif job_id == 'done':
            break
        else:
            # --- download the job description, retrying until it parses ---
            tick_downloading_job_started = time.time()
            tick_downloading_job_retry_started = tick_downloading_job_started
            tick_downloading_job_retry_count = 0
            job = None
            while job is None:
                try:
                    tick_downloading_job_retry_started = time.time()
                    with requests.get(f'{base_address}job/{job_id}') as response:
                        response.raise_for_status()
                        job = response.json()
                except KeyboardInterrupt:
                    raise
                except Exception:
                    tick_downloading_job_retry_count += 1
                    sio = StringIO()
                    traceback.print_exc(file=sio)
                    formatted_exception = sio.getvalue()
                    print(formatted_exception, file=sys.stderr)
                    upload_log(
                        f'{base_address}log/error',
                        worker_id,
                        during='JobDownload',
                        tick_set=tick_set,
                        traceback=formatted_exception,
                        job_id=job_id,
                        tick_downloading_job_started=tick_downloading_job_started,
                        tick_downloading_job_retry_started=tick_downloading_job_retry_started,
                        tick_downloading_job_retry_count=tick_downloading_job_retry_count,
                    )
            tick_downloading_job_ended = time.time()

            # --- download the image, verifying its SHA-256 against the job ---
            tick_downloading_image_started = time.time()
            tick_downloading_image_retry_started = tick_downloading_image_started
            tick_downloading_image_retry_count = 0
            tick_downloading_image_retry_mismatch = 0
            image_bytes = None
            while image_bytes is None:
                try:
                    # FIX: original updated tick_downloading_job_retry_started
                    # here, clobbering the job timestamp and never refreshing
                    # the image one.
                    tick_downloading_image_retry_started = time.time()
                    with requests.get(f'{base_address}{job["file"]}') as response:
                        if response.status_code == 404:
                            break  # image is gone; report NoValidImageData below
                        response.raise_for_status()
                        response.raw.decode_content = True
                        if hexhashof(response.content, hashlib.sha256) == job['hsh']:
                            image_bytes = response.content
                        else:
                            raise HashMismatch()
                except KeyboardInterrupt:
                    raise
                except Exception as exception:
                    tick_downloading_image_retry_count += 1
                    if isinstance(exception, HashMismatch):
                        tick_downloading_image_retry_mismatch += 1
                    sio = StringIO()
                    traceback.print_exc(file=sio)
                    formatted_exception = sio.getvalue()
                    print(formatted_exception, file=sys.stderr)
                    upload_log(
                        f'{base_address}log/error',
                        worker_id,
                        during='ImageDownload',
                        tick_set=tick_set,
                        traceback=formatted_exception,
                        job_id=job_id,
                        file=job["file"],
                        hash=job["hsh"],
                        tick_downloading_job_started=tick_downloading_job_started,
                        tick_downloading_job_retry_started=tick_downloading_job_retry_started,
                        tick_downloading_job_retry_count=tick_downloading_job_retry_count,
                        tick_downloading_job_ended=tick_downloading_job_ended,
                        tick_downloading_image_started=tick_downloading_image_started,
                        tick_downloading_image_retry_started=tick_downloading_image_retry_started,
                        tick_downloading_image_retry_count=tick_downloading_image_retry_count,
                        tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch,
                    )
                    # Give up after too many hash mismatches: the stored file
                    # is probably genuinely different from the job's hash.
                    if tick_downloading_image_retry_mismatch >= 10:
                        break
            tick_downloading_image_ended = time.time()

            if image_bytes is None:
                upload_job(
                    f'{base_address}job/{job_id}',
                    status='NoValidImageData',
                    tick_set=tick_set,
                    job_id=job_id,
                    file=job["file"],
                    hash=job["hsh"],
                    tick_downloading_job_started=tick_downloading_job_started,
                    tick_downloading_job_retry_started=tick_downloading_job_retry_started,
                    tick_downloading_job_retry_count=tick_downloading_job_retry_count,
                    tick_downloading_job_ended=tick_downloading_job_ended,
                    tick_downloading_image_started=tick_downloading_image_started,
                    tick_downloading_image_retry_started=tick_downloading_image_retry_started,
                    tick_downloading_image_retry_count=tick_downloading_image_retry_count,
                    tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch,
                    tick_downloading_image_ended=tick_downloading_image_ended,
                )
            else:
                # --- decode ---
                tick_image_decoding_start = time.time()
                image = None
                try:
                    # .copy() forces a full decode so truncated data fails here.
                    image = Image.open(BytesIO(image_bytes)).copy()
                except KeyboardInterrupt:
                    raise
                except Exception:
                    pass  # undecodable image: reported as ImageIsBroken below
                tick_image_decoding_ended = time.time()
                if image is None:
                    upload_job(
                        f'{base_address}job/{job_id}',
                        status='ImageIsBroken',
                        tick_set=tick_set,
                        job_id=job_id,
                        file=job["file"],
                        hash=job["hsh"],
                        tick_downloading_job_started=tick_downloading_job_started,
                        tick_downloading_job_retry_started=tick_downloading_job_retry_started,
                        tick_downloading_job_retry_count=tick_downloading_job_retry_count,
                        tick_downloading_job_ended=tick_downloading_job_ended,
                        tick_downloading_image_started=tick_downloading_image_started,
                        tick_downloading_image_retry_started=tick_downloading_image_retry_started,
                        tick_downloading_image_retry_count=tick_downloading_image_retry_count,
                        tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch,
                        tick_downloading_image_ended=tick_downloading_image_ended,
                        tick_image_decoding_start=tick_image_decoding_start,
                        tick_image_decoding_ended=tick_image_decoding_ended,
                    )
                else:
                    # --- hash thumbnails and report completion ---
                    tick_image_thumbnailing_start = time.time()
                    calculated_thumbnail_hashes = calculate_thumbnail_hashes(image)
                    tick_image_thumbnailing_ended = time.time()
                    tick_uploading_started = time.time()
                    upload_job(
                        f'{base_address}job/{job_id}',
                        status='Complete',
                        tick_set=tick_set,
                        job_id=job_id,
                        file=job["file"],
                        hash=job["hsh"],
                        calculated_thumbnail_hashes=calculated_thumbnail_hashes,
                        tick_downloading_job_started=tick_downloading_job_started,
                        tick_downloading_job_retry_started=tick_downloading_job_retry_started,
                        tick_downloading_job_retry_count=tick_downloading_job_retry_count,
                        tick_downloading_job_ended=tick_downloading_job_ended,
                        tick_downloading_image_started=tick_downloading_image_started,
                        tick_downloading_image_retry_started=tick_downloading_image_retry_started,
                        tick_downloading_image_retry_count=tick_downloading_image_retry_count,
                        tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch,
                        tick_downloading_image_ended=tick_downloading_image_ended,
                        tick_image_decoding_start=tick_image_decoding_start,
                        tick_image_decoding_ended=tick_image_decoding_ended,
                        tick_image_thumbnailing_start=tick_image_thumbnailing_start,
                        tick_image_thumbnailing_ended=tick_image_thumbnailing_ended,
                    )
                    tick_uploading_ended = time.time()
                    upload_log(
                        f'{base_address}log/performance',
                        worker_id,
                        file=job["file"],
                        hash=job["hsh"],
                        tick_downloading_job_started=tick_downloading_job_started,
                        tick_downloading_job_retry_started=tick_downloading_job_retry_started,
                        tick_downloading_job_retry_count=tick_downloading_job_retry_count,
                        tick_downloading_job_ended=tick_downloading_job_ended,
                        tick_downloading_image_started=tick_downloading_image_started,
                        tick_downloading_image_retry_started=tick_downloading_image_retry_started,
                        tick_downloading_image_retry_count=tick_downloading_image_retry_count,
                        tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch,
                        tick_downloading_image_ended=tick_downloading_image_ended,
                        tick_image_decoding_start=tick_image_decoding_start,
                        tick_image_decoding_ended=tick_image_decoding_ended,
                        tick_image_thumbnailing_start=tick_image_thumbnailing_start,
                        tick_image_thumbnailing_ended=tick_image_thumbnailing_ended,
                        tick_uploading_started=tick_uploading_started,
                        tick_uploading_ended=tick_uploading_ended,
                    )
                    print(f"Done: {job['hsh']}")


def kickstart(base_address: str):
    """Spawn 2x CPU worker processes, each running do_work until 'done'.

    The done-callback re-raises any worker exception so failures are visible
    instead of silently discarded by the executor.
    """
    job_count: int = multiprocessing.cpu_count() * 2
    # job_count = 1
    hostname: str = socket.gethostname()
    with ProcessPoolExecutor(job_count) as pe:
        for job_seq in range(job_count):
            job_id = f'{hostname}-{job_seq}'

            def on_completed(job: Future):
                job.result()

            pe.submit(
                do_work,
                worker_id=job_id,
                base_address=base_address,
            ).add_done_callback(on_completed)


def main():
    """Entry point: normalize the server address argument and start workers."""
    if len(sys.argv) == 2:
        base_address = sys.argv[1]
        if not base_address.startswith('http'):
            base_address = 'http://' + base_address
        if not base_address.endswith('/'):
            base_address += '/'
        kickstart(base_address)
    else:
        # FIX: the argument placeholder was missing from the usage message
        # (the original string ended with a dangling space).
        print(f'Usage:\n {sys.argv[0]} <base_address>')


if __name__ == "__main__":
    main()