reddit-image-wall-getter/hash_compressor_distributed/worker_thumbnailer.py

423 lines
19 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import hashlib
import json
import multiprocessing
import socket
import sys
import time
import traceback
from concurrent.futures import Future, ProcessPoolExecutor
from io import BytesIO, StringIO
from pathlib import Path
from typing import Any, Callable, Dict, List, Tuple, Type
import requests
from PIL import Image, ImageDraw, ImageOps
class HashMismatch(Exception):
    """Raised when a downloaded payload's SHA-256 digest differs from the
    hash the job descriptor promised (``job['hsh']``)."""
# Sentinel values used as symbolic paste positions by
# edge_propagation_scaling. They are derived from sys.maxsize so they
# cannot realistically collide with a genuine pixel offset passed by a
# caller (any other value is treated as an explicit coordinate).
START = -(sys.maxsize // 2) + 0
END = -(sys.maxsize // 2) + 1
CENTER = -(sys.maxsize // 2) + 2
# Anchor pairs are (horizontal, vertical).
POS_TOP_LEFT = (START, START)
POS_CENTER_LEFT = (START, CENTER)
POS_BOTTOM_LEFT = (START, END)
POS_TOP_CENTER = (CENTER, START)
POS_CENTER_CENTER = (CENTER, CENTER)
POS_BOTTOM_CENTER = (CENTER, END)
POS_TOP_RIGHT = (END, START)
POS_CENTER_RIGHT = (END, CENTER)
POS_BOTTOM_RIGHT = (END, END)
# Short aliases for the anchors above.
P_TL = POS_TOP_LEFT
P_CL = POS_CENTER_LEFT
P_BL = POS_BOTTOM_LEFT
P_TC = POS_TOP_CENTER
P_CC = POS_CENTER_CENTER
P_BC = POS_BOTTOM_CENTER
P_TR = POS_TOP_RIGHT
P_CR = POS_CENTER_RIGHT
P_BR = POS_BOTTOM_RIGHT
def edge_propagation_scaling(image: Image.Image,
                             desired_size: Tuple[int, int],
                             paste_position: Tuple[int, int] = P_CC,
                             ) -> Image.Image:
    """Paste *image* onto a new ``desired_size`` canvas and fill the
    uncovered margins by stretching the image's outermost rows/columns
    outward, flood-filling the four corner regions with the corner pixels.

    ``paste_position`` is a pair of either concrete pixel offsets or the
    START/CENTER/END sentinels (default ``P_CC`` centers the image).
    Returns a new image in *image*'s mode; *image* itself is not modified.
    """
    image = image.copy()
    scaled = Image.new(image.mode, desired_size)
    # Resolve sentinel anchors into a concrete (x, y) paste offset.
    new_placement_: List[int] = list()
    for isz, ssz, pp in zip(image.size, scaled.size, paste_position,):
        if pp == START:
            new_placement_.append(0)
        elif pp == END:
            new_placement_.append(ssz-isz)
        elif pp == CENTER:
            new_placement_.append((ssz-isz)//2)
        else:
            # Any non-sentinel value is taken as an explicit pixel offset.
            new_placement_.append(pp)
    new_placement: Tuple[int, int] = new_placement_[0], new_placement_[1]
    del new_placement_
    scaled.paste(image, new_placement)
    # One-pixel-thick strips of the image's four borders.
    parts = dict(
        # left, upper, right, lower
        t=image.copy().crop(
            (0, 0, image.size[0], 1)),
        b=image.copy().crop(
            (0, image.size[1]-1, image.size[0], image.size[1])),
        l=image.copy().crop(
            (0, 0, 1, image.size[1])),
        r=image.copy().crop(
            (image.size[0]-1, 0, image.size[0], image.size[1])),
    )
    # Stretch each border strip across the adjacent uncovered margin
    # (only when that margin actually exists, i.e. its size is positive).
    if (sz := new_placement[1]) > 0:  # top margin
        part = parts['t'].copy()
        resized = part.resize((part.size[0], sz))
        scaled.paste(resized, (new_placement[0], 0))
    if (sz := scaled.size[1]-(dsp := new_placement[1]+image.size[1])) > 0:  # bottom margin
        part = parts['b'].copy()
        resized = part.resize((part.size[0], sz))
        scaled.paste(resized, (new_placement[0], dsp))
    if (sz := new_placement[0]) > 0:  # left margin
        part = parts['l'].copy()
        resized = part.resize((sz, part.size[1]))
        scaled.paste(resized, (0, new_placement[1]))
    if (sz := scaled.size[0]-(dsp := new_placement[0]+image.size[0])) > 0:  # right margin
        part = parts['r'].copy()
        resized = part.resize((sz, part.size[1]))
        scaled.paste(resized, (dsp, new_placement[1]))
    del parts
    # Corner pixel values, used to flood-fill the four corner rectangles.
    corners = dict(
        tl=image.getpixel((0, 0)),
        tr=image.getpixel((image.size[0]-1, 0)),
        bl=image.getpixel((0, image.size[1]-1)),
        br=image.getpixel((image.size[0]-1, image.size[1]-1)),
    )
    draw: ImageDraw.ImageDraw = ImageDraw.Draw(scaled)
    szt = new_placement[1]  # top margin height
    szb = scaled.size[1]-(dspb := new_placement[1]+image.size[1])  # bottom margin height
    szl = new_placement[0]  # left margin width
    szr = scaled.size[0]-(dspr := new_placement[0]+image.size[0])  # right margin width
    if szt > 0 and szl > 0:
        draw.rectangle(((0, 0), (szl-1, szt-1)), corners['tl'])
    if szt > 0 and szr > 0:
        draw.rectangle(((dspr, 0), (scaled.size[0], szt-1)), corners['tr'])
    if szb > 0 and szl > 0:
        draw.rectangle(((0, dspb), (szl-1, scaled.size[1])), corners['bl'])
    if szb > 0 and szr > 0:
        draw.rectangle(((dspr, dspb), scaled.size), corners['br'])
    del dspr
    del dspb
    del szt
    del szb
    del szl
    del szr
    return scaled
def calculate_thumbnail_hashes(image: Image.Image) -> Dict[str, Dict[str, Dict[str, Dict[str, str]]]]:
    """Compute MD5 fingerprints for many normalized renditions of *image*.

    The image is squared two ways ('fill' = edge-propagated to a square,
    'center' = centered on a transparent square), composited over black and
    white backgrounds, resized to several square sizes, and posterized at
    bit depths 1..8; each rendition's BMP encoding is hashed.

    Returns: ``{'fill'|'center': {background: {size: {bit_depth: md5_hex}}}}``
    (all inner keys are strings).
    """
    out_dict: Dict[str, Dict[str, Dict[str, Dict[str, str]]]] = dict()
    max_dimen = max(image.size)
    for filling in [True, False]:
        if filling:
            # Square canvas obtained by stretching the image's edges outward.
            squared = edge_propagation_scaling(image, (max_dimen, max_dimen))
        else:
            # Transparent square canvas with the image centered on it.
            squared = Image.new('RGBA', (max_dimen, max_dimen))
            squared.paste(image, (
                (max_dimen - image.size[0]) // 2,
                (max_dimen - image.size[1]) // 2,
            ))
        # Normalize to RGBA so the alpha channel can serve as a paste mask.
        squared = squared.convert('RGBA')
        backgrounds: Dict[str, Dict[str, Dict[str, str]]] = dict()
        for background in ['#000000', '#FFFFFF']:
            backgrounded = Image.new('RGB', squared.size, background)
            # BUGFIX: pass the RGBA image as its own mask. Without the mask
            # argument, paste() ignored transparency entirely, so the
            # background color never showed through and both backgrounds
            # produced identical hashes.
            backgrounded.paste(squared, (0, 0), squared)
            sizes: Dict[str, Dict[str, str]] = dict()
            for size in [4, 8, 16, 24, 32, 48, 64, 72, 96, 128]:
                # resize() returns a new image; no defensive copy needed.
                resized = backgrounded.resize((size, size))
                bit_depths: Dict[str, str] = dict()
                for bit_depth in range(1, 9):
                    # posterize() also returns a new image.
                    posterized = ImageOps.posterize(resized, bit_depth)
                    bio = BytesIO()
                    posterized.save(bio, format='BMP')
                    bit_depths[str(bit_depth)] = hexhashof(bio.getvalue(), hashlib.md5)
                sizes[str(size)] = bit_depths
            backgrounds[background] = sizes
        out_dict['fill' if filling else 'center'] = backgrounds
    return out_dict
def hexhashof(bts: bytes, using: Callable[[], Any]) -> str:
    """Return the hexadecimal digest of *bts* computed with the hash
    algorithm built by the zero-argument constructor *using*
    (e.g. ``hashlib.md5``)."""
    hasher = using()
    hasher.update(bts)
    return hasher.hexdigest()
def upload_log(url, sender, **content):
    """POST a log record to *url*, retrying until it is delivered.

    *sender* identifies this worker; the remaining keyword arguments form
    the log record. Both are JSON-encoded into form fields. Failures are
    printed and retried after a short pause.

    The bare ``except:`` this replaces also swallowed KeyboardInterrupt and
    SystemExit, making the retry loop impossible to interrupt; catching
    only Exception lets those propagate.
    """
    while True:
        try:
            requests.post(url, data={
                'sender': json.dumps(sender),
                'content': json.dumps(content),
            }).raise_for_status()
            return
        except Exception:
            traceback.print_exc()
            time.sleep(1)  # brief backoff so a dead server is not hammered
def upload_job(url, **content):
    """POST a job result (*content* as a JSON body) to *url*, retrying
    until delivery succeeds.

    Failures are printed and retried after a short pause. Catches only
    Exception — unlike the bare ``except:`` it replaces — so
    KeyboardInterrupt/SystemExit can still stop the worker.
    """
    while True:
        try:
            requests.post(url, json=content).raise_for_status()
            return
        except Exception:
            traceback.print_exc()
            time.sleep(1)  # brief backoff between delivery attempts
def do_work(base_address: str, worker_id: str):
    """Worker loop: process jobs from the coordinator at *base_address*.

    Repeatedly polls ``{base_address}job`` for a job id, downloads the job
    descriptor and its image (verifying the image against the descriptor's
    SHA-256 ``hsh``), computes thumbnail hashes and uploads the result —
    until the coordinator answers ``'done'``. Each stage records wall-clock
    ``tick_*`` timestamps that accompany error logs, performance logs and
    the uploaded result. KeyboardInterrupt propagates to the caller.
    """
    while True:
        tick_set = time.time()
        # --- poll the coordinator for the next job id ----------------
        job_id = None
        try:
            job_id = requests.get(f'{base_address}job').text
        except Exception:
            pass  # transient network error; poll again below
        if job_id is None:
            time.sleep(1)  # back off instead of hot-spinning on a dead server
            continue
        if job_id == 'done':
            break  # coordinator has no more work
        # --- download the job descriptor (retry until it arrives) ----
        tick_downloading_job_started = time.time()
        tick_downloading_job_retry_started = tick_downloading_job_started
        tick_downloading_job_retry_count = 0
        job = None
        while job is None:
            try:
                tick_downloading_job_retry_started = time.time()
                with requests.get(f'{base_address}job/{job_id}') as response:
                    response.raise_for_status()
                    job = response.json()
            except Exception:
                tick_downloading_job_retry_count += 1
                sio = StringIO()
                traceback.print_exc(file=sio)
                formatted_exception = sio.getvalue()
                print(formatted_exception, file=sys.stderr)
                upload_log(
                    f'{base_address}log/error',
                    worker_id,
                    during='JobDownload',
                    tick_set=tick_set,
                    traceback=formatted_exception,
                    job_id=job_id,
                    tick_downloading_job_started=tick_downloading_job_started,
                    tick_downloading_job_retry_started=tick_downloading_job_retry_started,
                    tick_downloading_job_retry_count=tick_downloading_job_retry_count,
                )
        tick_downloading_job_ended = time.time()
        # --- download the image bytes, verifying the SHA-256 ---------
        tick_downloading_image_started = time.time()
        tick_downloading_image_retry_started = tick_downloading_image_started
        tick_downloading_image_retry_count = 0
        tick_downloading_image_retry_mismatch = 0
        image_bytes = None
        while image_bytes is None:
            try:
                # BUGFIX: this assignment previously clobbered
                # tick_downloading_job_retry_started (copy-paste error), so
                # the image retry timestamp was never updated.
                tick_downloading_image_retry_started = time.time()
                with requests.get(f'{base_address}{job["file"]}') as response:
                    if response.status_code == 404:
                        break  # permanently missing; reported as NoValidImageData
                    response.raise_for_status()
                    response.raw.decode_content = True
                    if hexhashof(response.content, hashlib.sha256) == job['hsh']:
                        image_bytes = response.content
                    else:
                        raise HashMismatch()
            except Exception as exception:
                tick_downloading_image_retry_count += 1
                if isinstance(exception, HashMismatch):
                    tick_downloading_image_retry_mismatch += 1
                sio = StringIO()
                traceback.print_exc(file=sio)
                formatted_exception = sio.getvalue()
                print(formatted_exception, file=sys.stderr)
                upload_log(
                    f'{base_address}log/error',
                    worker_id,
                    during='ImageDownload',
                    tick_set=tick_set,
                    traceback=formatted_exception,
                    job_id=job_id,
                    file=job["file"],
                    hash=job["hsh"],
                    tick_downloading_job_started=tick_downloading_job_started,
                    tick_downloading_job_retry_started=tick_downloading_job_retry_started,
                    tick_downloading_job_retry_count=tick_downloading_job_retry_count,
                    tick_downloading_job_ended=tick_downloading_job_ended,
                    tick_downloading_image_started=tick_downloading_image_started,
                    tick_downloading_image_retry_started=tick_downloading_image_retry_started,
                    tick_downloading_image_retry_count=tick_downloading_image_retry_count,
                    tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch,
                )
                if tick_downloading_image_retry_mismatch >= 10:
                    break  # payload is persistently corrupt; give up on it
        tick_downloading_image_ended = time.time()
        if image_bytes is None:
            # Either a 404 or ten hash mismatches: report and move on.
            upload_job(
                f'{base_address}job/{job_id}',
                status='NoValidImageData',
                tick_set=tick_set,
                job_id=job_id,
                file=job["file"],
                hash=job["hsh"],
                tick_downloading_job_started=tick_downloading_job_started,
                tick_downloading_job_retry_started=tick_downloading_job_retry_started,
                tick_downloading_job_retry_count=tick_downloading_job_retry_count,
                tick_downloading_job_ended=tick_downloading_job_ended,
                tick_downloading_image_started=tick_downloading_image_started,
                tick_downloading_image_retry_started=tick_downloading_image_retry_started,
                tick_downloading_image_retry_count=tick_downloading_image_retry_count,
                tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch,
                tick_downloading_image_ended=tick_downloading_image_ended,
            )
        else:
            # --- decode the image ------------------------------------
            tick_image_decoding_start = time.time()
            image = None
            try:
                # .copy() forces a full decode now, so corrupt data fails
                # here rather than mid-thumbnailing.
                image = Image.open(BytesIO(image_bytes)).copy()
            except Exception:
                pass  # undecodable data is reported as ImageIsBroken below
            tick_image_decoding_ended = time.time()
            if image is None:
                upload_job(
                    f'{base_address}job/{job_id}',
                    status='ImageIsBroken',
                    tick_set=tick_set,
                    job_id=job_id,
                    file=job["file"],
                    hash=job["hsh"],
                    tick_downloading_job_started=tick_downloading_job_started,
                    tick_downloading_job_retry_started=tick_downloading_job_retry_started,
                    tick_downloading_job_retry_count=tick_downloading_job_retry_count,
                    tick_downloading_job_ended=tick_downloading_job_ended,
                    tick_downloading_image_started=tick_downloading_image_started,
                    tick_downloading_image_retry_started=tick_downloading_image_retry_started,
                    tick_downloading_image_retry_count=tick_downloading_image_retry_count,
                    tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch,
                    tick_downloading_image_ended=tick_downloading_image_ended,
                    tick_image_decoding_start=tick_image_decoding_start,
                    tick_image_decoding_ended=tick_image_decoding_ended,
                )
            else:
                # --- compute thumbnail hashes and upload the result --
                tick_image_thumbnailing_start = time.time()
                calculated_thumbnail_hashes = calculate_thumbnail_hashes(image)
                tick_image_thumbnailing_ended = time.time()
                tick_uploading_started = time.time()
                upload_job(
                    f'{base_address}job/{job_id}',
                    status='Complete',
                    tick_set=tick_set,
                    job_id=job_id,
                    file=job["file"],
                    hash=job["hsh"],
                    calculated_thumbnail_hashes=calculated_thumbnail_hashes,
                    tick_downloading_job_started=tick_downloading_job_started,
                    tick_downloading_job_retry_started=tick_downloading_job_retry_started,
                    tick_downloading_job_retry_count=tick_downloading_job_retry_count,
                    tick_downloading_job_ended=tick_downloading_job_ended,
                    tick_downloading_image_started=tick_downloading_image_started,
                    tick_downloading_image_retry_started=tick_downloading_image_retry_started,
                    tick_downloading_image_retry_count=tick_downloading_image_retry_count,
                    tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch,
                    tick_downloading_image_ended=tick_downloading_image_ended,
                    tick_image_decoding_start=tick_image_decoding_start,
                    tick_image_decoding_ended=tick_image_decoding_ended,
                    tick_image_thumbnailing_start=tick_image_thumbnailing_start,
                    tick_image_thumbnailing_ended=tick_image_thumbnailing_ended,
                )
                tick_uploading_ended = time.time()
                upload_log(
                    f'{base_address}log/performance',
                    worker_id,
                    file=job["file"],
                    hash=job["hsh"],
                    tick_downloading_job_started=tick_downloading_job_started,
                    tick_downloading_job_retry_started=tick_downloading_job_retry_started,
                    tick_downloading_job_retry_count=tick_downloading_job_retry_count,
                    tick_downloading_job_ended=tick_downloading_job_ended,
                    tick_downloading_image_started=tick_downloading_image_started,
                    tick_downloading_image_retry_started=tick_downloading_image_retry_started,
                    tick_downloading_image_retry_count=tick_downloading_image_retry_count,
                    tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch,
                    tick_downloading_image_ended=tick_downloading_image_ended,
                    tick_image_decoding_start=tick_image_decoding_start,
                    tick_image_decoding_ended=tick_image_decoding_ended,
                    tick_image_thumbnailing_start=tick_image_thumbnailing_start,
                    tick_image_thumbnailing_ended=tick_image_thumbnailing_ended,
                    tick_uploading_started=tick_uploading_started,
                    tick_uploading_ended=tick_uploading_ended,
                )
        # Printed for every processed job, successful or not (as before);
        # the redundant tick_uploading_ended re-assignment that preceded it
        # was a dead store and has been removed.
        print(f"Done: {job['hsh']}")
def kickstart(base_address: str):
    """Launch one do_work() process per slot (2x CPU count) against the
    coordinator at *base_address* and block until they all finish."""
    slot_count: int = multiprocessing.cpu_count() * 2
    hostname: str = socket.gethostname()

    def propagate_failure(finished: Future):
        # Re-raise any exception the worker died with instead of
        # discarding it silently.
        finished.result()

    with ProcessPoolExecutor(slot_count) as executor:
        for slot in range(slot_count):
            future = executor.submit(
                do_work,
                worker_id=f'{hostname}-{slot}',
                base_address=base_address,
            )
            future.add_done_callback(propagate_failure)
def main():
if len(sys.argv) == 2:
base_address = sys.argv[1]
if not base_address.startswith('http'):
base_address = 'http://'+base_address
if not base_address.endswith('/'):
base_address += '/'
kickstart(base_address)
else:
print(f'Usage:\n {sys.argv[0]} <ip_address:port>')
if __name__ == "__main__":
main()