reddit-image-wall-getter/hash_compressor_distributed/worker.py

276 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import bz2
import hashlib
import json
import lzma
import multiprocessing
import socket
import sys
import time
import traceback
import zlib
from concurrent.futures import Future, ProcessPoolExecutor
from io import BytesIO, StringIO
from pathlib import Path
from typing import Any, Callable, Dict, List, Tuple, Type
import requests
import zstandard as zstd
def _keep_uncompressed(payload: bytes) -> bytes:
    """Baseline entry: hand the payload back untouched."""
    return payload


def _zlib_level_9(payload: bytes) -> bytes:
    """Compress the payload with zlib at level 9."""
    return zlib.compress(payload, 9)


def _bz2_level_9(payload: bytes) -> bytes:
    """Compress the payload with bz2 at compresslevel 9."""
    return bz2.compress(payload, 9)


def _zstd_level_22(payload: bytes) -> bytes:
    """Compress the payload with a fresh zstd compressor at level 22."""
    compressor = zstd.ZstdCompressor(level=22)
    return compressor.compress(payload)


# Compression schemes to benchmark, as (label, compress-function) pairs.
# Every function takes the raw bytes and returns the compressed bytes;
# lzma is used with its library defaults.
COMPRESSORS: List[Tuple[str, Callable[[bytes], bytes]]] = [
    ('uncompressed', _keep_uncompressed),
    ('zlib', _zlib_level_9),
    ('bz2', _bz2_level_9),
    ('lzma', lzma.compress),
    ('zstd', _zstd_level_22),
]
class HashMismatch(Exception):
    """Signals that downloaded bytes do not hash to the digest the job announced."""
def check_best_compressions(uncompressed: bytes, compressors=None) -> Dict[str, Tuple[int, float]]:
    """Benchmark every compressor against the same payload.

    Args:
        uncompressed: The raw bytes to compress.
        compressors: Optional iterable of ``(name, callback)`` pairs, where
            ``callback`` maps bytes to compressed bytes. Defaults to the
            module-level ``COMPRESSORS`` table.

    Returns:
        A dict mapping each compressor name to a
        ``(compressed_size_in_bytes, elapsed_seconds)`` tuple.
    """
    if compressors is None:
        compressors = COMPRESSORS
    algos: Dict[str, Tuple[int, float]] = {}
    for name, callback in compressors:
        # perf_counter is monotonic and high-resolution, so the measured
        # duration cannot be skewed by wall-clock adjustments.
        time_start = time.perf_counter()
        compressed_size = len(callback(uncompressed))
        elapsed = time.perf_counter() - time_start
        algos[name] = (compressed_size, elapsed)
    return algos
def hexhashof(bts: bytes, using: Callable[[], Any]) -> str:
    """Return the hexadecimal digest of ``bts``.

    ``using`` is called with no arguments and must return a hash object
    exposing ``update`` and ``hexdigest`` (for example ``hashlib.sha256``).
    """
    hasher = using()
    hasher.update(bts)
    digest = hasher.hexdigest()
    return digest
def upload_log(url, sender, **content):
    """POST a log record to ``url``, retrying until the server accepts it.

    The record is sent as form data with two fields, ``sender`` and
    ``content``, each holding a JSON document.

    Args:
        url: Endpoint that receives the log record.
        sender: JSON-serialisable identifier of the reporting worker.
        **content: JSON-serialisable fields that make up the record.

    Raises:
        TypeError: If ``sender`` or ``content`` cannot be serialised to JSON.
            This is raised before any request is made, because retrying
            could never succeed.
    """
    # Serialise once, outside the retry loop: the payload never changes and
    # a serialisation error must not be retried forever.
    payload = {
        'sender': json.dumps(sender),
        'content': json.dumps(content),
    }
    request_timeout = 60  # seconds; without it a dead connection hangs forever
    retry_delay = 0.5
    max_retry_delay = 30.0
    while True:
        try:
            requests.post(url, data=payload, timeout=request_timeout).raise_for_status()
            return
        except Exception:
            # Deliberately not a bare ``except``: KeyboardInterrupt and
            # SystemExit must be able to break out of this endless retry loop.
            traceback.print_exc()
            # Back off so an unreachable server is not hammered in a busy loop.
            time.sleep(retry_delay)
            retry_delay = min(retry_delay * 2, max_retry_delay)
def upload_job(url, **content):
    """POST a job result to ``url`` as a JSON body, retrying until accepted.

    Args:
        url: Endpoint that receives the job result.
        **content: JSON-serialisable fields that make up the result.

    Raises:
        TypeError: If ``content`` cannot be serialised to JSON. This is
            raised before any request is made, because retrying could never
            succeed.
    """
    # Serialise once, outside the retry loop: the body never changes and a
    # serialisation error must not be retried forever.
    body = json.dumps(content).encode('utf-8')
    headers = {'Content-Type': 'application/json'}
    request_timeout = 60  # seconds; without it a dead connection hangs forever
    retry_delay = 0.5
    max_retry_delay = 30.0
    while True:
        try:
            requests.post(
                url,
                data=body,
                headers=headers,
                timeout=request_timeout,
            ).raise_for_status()
            return
        except Exception:
            # Deliberately not a bare ``except``: KeyboardInterrupt and
            # SystemExit must be able to break out of this endless retry loop.
            traceback.print_exc()
            # Back off so an unreachable server is not hammered in a busy loop.
            time.sleep(retry_delay)
            retry_delay = min(retry_delay * 2, max_retry_delay)
# Seconds to wait on the server before a GET is treated as failed.
_REQUEST_TIMEOUT = 60
# Pause between polls when the job queue cannot be reached (avoids a busy loop).
_POLL_RETRY_DELAY = 1.0
# Number of hash mismatches after which the image is given up on.
_MAX_HASH_MISMATCHES = 10


def _format_current_exception() -> str:
    """Return the traceback of the exception being handled, echoing it to stderr."""
    formatted_exception = traceback.format_exc()
    print(formatted_exception, file=sys.stderr)
    return formatted_exception


def _poll_job_id(base_address: str):
    """Ask the server for the next job id; return None when the request fails."""
    try:
        with requests.get(f'{base_address}job', timeout=_REQUEST_TIMEOUT) as response:
            response.raise_for_status()
            return response.text
    except Exception:
        # Best effort: report the failure and let the caller poll again.
        traceback.print_exc()
        return None


def _download_job(base_address: str, worker_id: str, job_id: str,
                  tick_set: float, ticks: Dict[str, Any]):
    """Fetch the job description, retrying (and logging each failure) until it arrives.

    Records the ``tick_downloading_job_*`` entries in ``ticks`` and returns
    the decoded JSON job.
    """
    ticks['tick_downloading_job_started'] = time.time()
    ticks['tick_downloading_job_retry_started'] = ticks['tick_downloading_job_started']
    ticks['tick_downloading_job_retry_count'] = 0
    job = None
    while job is None:
        try:
            ticks['tick_downloading_job_retry_started'] = time.time()
            with requests.get(f'{base_address}job/{job_id}',
                              timeout=_REQUEST_TIMEOUT) as response:
                response.raise_for_status()
                job = response.json()
        except Exception:
            ticks['tick_downloading_job_retry_count'] += 1
            upload_log(
                f'{base_address}log/error',
                worker_id,
                during='JobDownload',
                tick_set=tick_set,
                traceback=_format_current_exception(),
                job_id=job_id,
                **ticks
            )
    ticks['tick_downloading_job_ended'] = time.time()
    return job


def _download_image(base_address: str, worker_id: str, job_id: str,
                    tick_set: float, job: Dict[str, Any], ticks: Dict[str, Any]):
    """Download the job's file and verify its SHA-256 against ``job['hsh']``.

    Records the ``tick_downloading_image_*`` entries in ``ticks``. Returns
    the verified bytes, or None when the server answers 404 or the hash
    mismatched ``_MAX_HASH_MISMATCHES`` times. Other failures are logged and
    retried without limit.
    """
    ticks['tick_downloading_image_started'] = time.time()
    ticks['tick_downloading_image_retry_started'] = ticks['tick_downloading_image_started']
    ticks['tick_downloading_image_retry_count'] = 0
    ticks['tick_downloading_image_retry_mismatch'] = 0
    image_bytes = None
    while image_bytes is None:
        try:
            # Fix: this used to overwrite tick_downloading_job_retry_started,
            # so the image retry timestamp was never refreshed.
            ticks['tick_downloading_image_retry_started'] = time.time()
            with requests.get(f'{base_address}{job["file"]}',
                              timeout=_REQUEST_TIMEOUT) as response:
                if response.status_code == 404:
                    break
                response.raise_for_status()
                response.raw.decode_content = True
                if hexhashof(response.content, hashlib.sha256) == job['hsh']:
                    image_bytes = response.content
                else:
                    raise HashMismatch()
        except Exception as exception:
            ticks['tick_downloading_image_retry_count'] += 1
            if isinstance(exception, HashMismatch):
                ticks['tick_downloading_image_retry_mismatch'] += 1
            upload_log(
                f'{base_address}log/error',
                worker_id,
                during='ImageDownload',
                tick_set=tick_set,
                traceback=_format_current_exception(),
                job_id=job_id,
                file=job["file"],
                hash=job["hsh"],
                **ticks
            )
            if ticks['tick_downloading_image_retry_mismatch'] >= _MAX_HASH_MISMATCHES:
                break
    ticks['tick_downloading_image_ended'] = time.time()
    return image_bytes


def _process_job(base_address: str, worker_id: str, job_id: str, tick_set: float):
    """Run one job end to end: download, benchmark the compressors, report."""
    # Accumulates every tick_* measurement so each report can forward
    # exactly the timings known at that point.
    ticks: Dict[str, Any] = {}
    job = _download_job(base_address, worker_id, job_id, tick_set, ticks)
    image_bytes = _download_image(base_address, worker_id, job_id, tick_set, job, ticks)
    job_url = f'{base_address}job/{job_id}'

    if image_bytes is None:
        upload_job(
            job_url,
            status='NoValidImageData',
            tick_set=tick_set,
            job_id=job_id,
            file=job["file"],
            hash=job["hsh"],
            **ticks
        )
        return

    ticks['tick_image_compress_start'] = time.time()
    compressions = check_best_compressions(image_bytes)
    ticks['tick_image_compress_ended'] = time.time()

    tick_uploading_started = time.time()
    upload_job(
        job_url,
        status='Complete',
        worker=worker_id,
        tick_set=tick_set,
        job_id=job_id,
        file=job["file"],
        hash=job["hsh"],
        compressions=compressions,
        **ticks
    )
    tick_uploading_ended = time.time()
    upload_log(
        f'{base_address}log/performance',
        worker_id,
        file=job["file"],
        hash=job["hsh"],
        tick_uploading_started=tick_uploading_started,
        tick_uploading_ended=tick_uploading_ended,
        **ticks
    )
    # NOTE(review): the original indentation was lost; this message is
    # assumed to belong to the successful branch only - confirm.
    print(f"Done: {job['hsh']}")


def do_work(base_address: str, worker_id: str):
    """Worker loop: pull jobs from the server until it answers ``done``.

    For each job the worker GETs ``{base_address}job`` for a job id, GETs
    ``{base_address}job/{job_id}`` for the JSON description (keys ``file``
    and ``hsh`` are read), downloads ``{base_address}{file}``, verifies its
    SHA-256, times every compressor on it and POSTs the outcome back to
    ``{base_address}job/{job_id}``. Errors go to ``{base_address}log/error``
    and timings to ``{base_address}log/performance``.

    Args:
        base_address: Server base URL, expected to end with ``/``.
        worker_id: Identifier sent along with logs and completed jobs.
    """
    while True:
        tick_set = time.time()
        job_id = _poll_job_id(base_address)
        if job_id is None:
            # The queue is unreachable: wait a little instead of spinning.
            time.sleep(_POLL_RETRY_DELAY)
            continue
        if job_id == 'done':
            break
        _process_job(base_address, worker_id, job_id, tick_set)
def _report_worker_outcome(future: Future) -> None:
    """Done-callback: print the traceback of a worker that ended with an error."""
    if future.cancelled():
        return
    error = future.exception()
    if error is not None:
        # Report explicitly instead of re-raising inside the callback, where
        # the executor would merely log the exception and drop it.
        traceback.print_exception(type(error), error, error.__traceback__)


def kickstart(base_address: str):
    """Start ``2 * cpu_count`` worker processes running ``do_work``.

    Each worker is given the id ``<hostname>-<sequence number>``. Leaving
    the executor's ``with`` block waits for every worker to finish.

    Args:
        base_address: Server base URL handed to every worker.
    """
    worker_count: int = multiprocessing.cpu_count() * 2
    # worker_count = 1
    hostname: str = socket.gethostname()
    with ProcessPoolExecutor(worker_count) as pe:
        for worker_seq in range(worker_count):
            pe.submit(
                do_work,
                worker_id=f'{hostname}-{worker_seq}',
                base_address=base_address,
            ).add_done_callback(_report_worker_outcome)
        # NOTE(review): the original indentation was lost; 'Ready' is assumed
        # to be announced once all workers have been submitted - confirm.
        print('Ready')
def _normalize_base_address(address: str) -> str:
    """Return ``address`` with a URL scheme and a trailing slash guaranteed."""
    # Look for an explicit scheme separator rather than a bare 'http' prefix,
    # so a host such as 'httpserver:8080' still gets 'http://' prepended.
    if '://' not in address:
        address = 'http://' + address
    if not address.endswith('/'):
        address += '/'
    return address


def main():
    """Command-line entry point: ``worker.py <ip_address:port>``.

    Expects exactly one argument, the server address. A missing scheme
    defaults to ``http://`` and a trailing ``/`` is appended when absent.
    With any other argument count a usage message is printed instead.
    """
    if len(sys.argv) != 2:
        print(f'Usage:\n {sys.argv[0]} <ip_address:port>')
        return
    kickstart(_normalize_base_address(sys.argv[1]))
# Run the worker only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()