site_css_reliability/server-snpshtr.py

467 lines
15 KiB
Python
Raw Permalink Normal View History

2023-09-03 23:26:56 +00:00
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import hashlib
import json
2023-09-09 01:00:47 +00:00
import mimetypes
2023-09-05 03:05:54 +00:00
import shutil
2023-09-03 23:26:56 +00:00
import time
2023-09-09 01:00:47 +00:00
import zipfile
from collections import defaultdict
from io import BytesIO
from pathlib import Path
2023-09-05 03:05:54 +00:00
from typing import Any
2023-09-03 23:26:56 +00:00
from uuid import uuid4
2023-09-09 01:00:47 +00:00
import werkzeug.exceptions
from flask import (Flask, jsonify, make_response, redirect, request, send_file,
send_from_directory)
from flask_cors import CORS
2023-09-03 23:26:56 +00:00
# Shared secret; clients must present it as the `key` GET parameter.
APIKEY = Path('apikey.txt').read_text(encoding='utf-8').strip()
app = Flask(__name__, instance_relative_config=True)
CORS(app)

# On-disk JSON "databases": each is a single file seeded with an empty
# JSON object/array on first start.
UPTIME_DB = Path('uptime.json')    # worker name -> last-seen unix time
UPTIME2_DB = Path('uptime2.json')  # analyzer name -> last-seen unix time
ID_DB = Path('ids.json')           # sequence name -> last issued integer id
CRON_DB = Path('crons.json')       # list of recurring snapshot definitions
JOB_DB = Path('jobs.json')         # list of pending/kept jobs
ANAL_DB = Path('analysis.json')    # list of analysis records, one per job

for _db, _seed in ((UPTIME_DB, b'{}'), (UPTIME2_DB, b'{}'), (ID_DB, b'{}'),
                   (CRON_DB, b'[]'), (JOB_DB, b'[]'), (ANAL_DB, b'[]')):
    if not _db.exists():
        _db.write_bytes(_seed)

JOBS_PATH = Path('jobs')

# Remove temp files left over from a previous unclean shutdown.
# Fix: the original `for x in ...: x.unlink()` followed by `del x` raised
# NameError at import time when no *.temp file existed.
for _leftover in Path('.').glob('*.temp'):
    _leftover.unlink()

# Defaults merged under each cron/job definition before it is handed to a
# worker (see job_next_get).
JOB_DEFAULTS = dict(
    hideScrollbar=1,
    wait=0,
    scrolltoJs='',
    scrolltox=0,
    scrolltoy=0,
    preRunJs='',
    waitJs=0,
    checkReadyJs='',
)
2023-09-03 23:26:56 +00:00
class TempFile:
    """Context manager yielding a uniquely named ``*.temp`` file path.

    The classmethod helpers implement atomic writes: data is first written
    to a temp file in the current directory, then moved over the
    destination, so readers never observe a partially written file.
    """

    def __init__(self, place='/tmp') -> None:
        # uuid4 hex gives a collision-free name for concurrent writers.
        self.file = Path(f'{place}/{uuid4().hex}.temp')

    def __enter__(self) -> Path:
        self.file.touch()
        return self.file

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Clean up unless the file was already moved to its destination.
        if self.file.exists():
            self.file.unlink()

    @classmethod
    def save_bytes(cls, dest: Path, b: bytes):
        """Atomically write raw bytes to `dest`, creating parent dirs."""
        with cls('.') as f:
            f.write_bytes(b)
            dest.parent.mkdir(parents=True, exist_ok=True)
            # Fix: Path.rename raises FileExistsError on Windows when dest
            # already exists (these helpers overwrite the JSON DBs on every
            # save); Path.replace overwrites atomically on all platforms.
            f.replace(dest)

    @classmethod
    def save_utf8(cls, dest: Path, s: str):
        """Atomically write a UTF-8 string to `dest`, creating parent dirs."""
        with cls('.') as f:
            f.write_text(s, encoding='utf-8')
            dest.parent.mkdir(parents=True, exist_ok=True)
            f.replace(dest)
def next_id(name: str) -> int:
    """Issue and persist the next integer id for the sequence `name`.

    Counters live in ID_DB; the first id of a new sequence is 1.
    """
    counters: dict[str, int] = json.loads(ID_DB.read_text(encoding='utf-8'))
    counters[name] = counters.get(name, 0) + 1
    TempFile.save_utf8(ID_DB, json.dumps(counters, indent=2))
    return counters[name]
2023-09-03 23:26:56 +00:00
@app.route('/', methods=['HEAD', 'OPTIONS', 'GET'])
def index():
    """Landing-page placeholder; the API lives under the other routes."""
    return 'Nothing to see here'
2023-09-05 03:05:54 +00:00
def get_updated_job_list() -> list[dict[str, Any]]:
    """Materialize due cron runs into jobs, prune old history, return jobs.

    Side effects (only when at least one cron was due): rewrites ANAL_DB,
    CRON_DB and JOB_DB, and deletes the job directories of pruned jobs.
    """
    tm = time.time()
    anals: list[dict[str, Any]] = json.loads(
        ANAL_DB.read_text(encoding='utf-8'))
    crons: list[dict[str, Any]] = json.loads(
        CRON_DB.read_text(encoding='utf-8'))
    jobs: list[dict[str, Any]] = json.loads(JOB_DB.read_text(encoding='utf-8'))
    # A cron is due when more than `hours` hours elapsed since last schedule.
    pendingCrons: list[dict[str, Any]] = (
        [*filter(lambda c: (tm-c['hours']*3600) > c['lastScheduledSec'], crons)])
    for pendingCron in pendingCrons:
        pendingCron['lastScheduledSec'] = tm
        # New job = the cron definition plus a freshly issued jobId.
        jobs.append(dict(jobId=next_id('job'), **pendingCron))
    if len(pendingCrons):
        cronId2historySize: dict[int, int] = dict(
            map(lambda c: (c['cronId'], c['historySize']), crons))
        # Group job positions by owning cron so all but the newest
        # `historySize` jobs of each cron can be discarded.
        cronId2jobpos: dict[int, list[int]] = defaultdict(list)
        for i, job in enumerate(jobs):
            cronId2jobpos[job['cronId']].append(i)
        toDiscardPos: list[int] = list()
        for cronId, jobspos in cronId2jobpos.items():
            # NOTE(review): raises KeyError if a job's cron was deleted via
            # the form while its jobs remain — confirm whether that can occur.
            toDiscardPos += jobspos[:-round(cronId2historySize[cronId])]
        # Drop analysis records belonging to the jobs being discarded.
        for pos in toDiscardPos:
            anals = [*filter(
                lambda a: a["jobId"] != jobs[pos]["jobId"],
                anals)]
        # Pop from the highest position downward so indices stay valid.
        toDiscardPos.sort()
        toDiscardPos.reverse()
        for pos in toDiscardPos:
            job = jobs.pop(pos)
            job_path = JOBS_PATH.joinpath(f'{job["jobId"]:020d}')
            shutil.rmtree(job_path, ignore_errors=True)
        TempFile.save_utf8(ANAL_DB, json.dumps(anals, indent=2))
        TempFile.save_utf8(CRON_DB, json.dumps(crons, indent=2))
        TempFile.save_utf8(JOB_DB, json.dumps(jobs, indent=2))
    return jobs
2023-09-03 23:26:56 +00:00
def worker_lastseen_update(name: str):
    """Record `name` as seen now in the worker uptime DB (blank names ignored)."""
    worker = name.strip()
    if not worker:
        return
    seen: dict[str, float] = json.loads(
        UPTIME_DB.read_text(encoding='utf-8'))
    seen[worker] = time.time()
    TempFile.save_utf8(UPTIME_DB, json.dumps(seen, indent=2))
2023-09-05 02:26:54 +00:00
2023-09-06 00:51:16 +00:00
def analizer_lastseen_update(name: str):
    """Record `name` as seen now in the analyzer uptime DB (blank names ignored)."""
    analyzer = name.strip()
    if not analyzer:
        return
    seen: dict[str, float] = json.loads(
        UPTIME2_DB.read_text(encoding='utf-8'))
    seen[analyzer] = time.time()
    TempFile.save_utf8(UPTIME2_DB, json.dumps(seen, indent=2))
2023-09-06 00:51:16 +00:00
2023-09-05 02:26:54 +00:00
def worker_get_next_job(worker: str) -> dict | None:
    """Return the first job `worker` has not yet submitted a zip for, else None."""
    unsubmitted = (
        job for job in get_updated_job_list()
        if not JOBS_PATH.joinpath(f'{job["jobId"]:020d}/{worker}.zip').exists()
    )
    return next(unsubmitted, None)
2023-09-03 23:26:56 +00:00
2023-09-06 00:51:16 +00:00
def get_updated_analysis_list() -> list[dict[str, Any]]:
    """Sync analysis records with current job submissions and return them.

    One record exists per job. Whenever a job's submission count
    (`completeness`) changes, its record is reset (finished=False,
    analysis cleared) so the job gets re-analyzed with the new data.
    Rewrites ANAL_DB only when something actually changed.
    """
    jobs = raw_job_submission_get()
    anals: list[dict[str, Any]] = json.loads(
        ANAL_DB.read_text(encoding='utf-8'))
    upd: bool = False  # dirty flag: persist only on change
    for job in jobs:
        cronId = job['cronId']
        jobId = job['jobId']
        anal = next(filter(lambda a: a['jobId'] == jobId, anals), None)
        # completeness = number of workers that have uploaded a zip.
        completeness = len([*filter(
            lambda f: f is not None,
            job['workers'].values()
        )])
        if anal is None or anal['completeness'] != completeness:
            upd = True
            anal2 = dict(
                cronId=cronId,
                jobId=jobId,
                finished=False,
                assignee=None,
                assigneeTime=.0,
                completeness=completeness,
                workers=job['workers'],
                analysisFile=None,
                analysis=None,
            )
            if anal is None:
                anals.append(anal2)
            else:
                # Existing record is reset in place (keeps list position).
                anal.update(anal2)
    if upd:
        TempFile.save_utf8(ANAL_DB, json.dumps(anals, indent=2))
    return anals
def analyzer_get_next_job(worker: str) -> dict | None:
    """Pick (and assign) the next analysis record for `worker`, or None.

    Assignments act as 300-second leases via `assignee`/`assigneeTime`.
    """
    tm = time.time()
    anals = get_updated_analysis_list()
    for anal in anals:
        if not anal['finished'] and anal['completeness'] > 0:
            if anal['assignee'] == worker and anal['assigneeTime']+300 < tm:
                # Same worker asking again after its lease expired: hand the
                # record back without refreshing the lease or persisting.
                # NOTE(review): returning only when the lease has EXPIRED
                # looks inverted — confirm against the analyzer client.
                return anal
            elif not anal['assignee'] or anal['assigneeTime']+300 >= tm:
                # Unassigned — or assigned and the lease is still fresh:
                # take it over for `worker` and persist the assignment.
                # NOTE(review): `>= tm` lets a worker steal a record that is
                # still within another worker's lease; presumably `< tm`
                # (expired) was intended — verify.
                anal['assignee'] = worker
                anal['assigneeTime'] = tm
                TempFile.save_utf8(ANAL_DB, json.dumps(anals, indent=2))
                return anal
            else:
                # This unfinished job is considered assigned and still
                # running, so try the next one.
                pass
    # There is no analysis to be run.
    return None
2023-09-05 03:26:10 +00:00
@app.route('/job/next', methods=['HEAD', 'OPTIONS', 'GET'])
def job_next_get():
    """Hand the calling worker its next snapshot job (with defaults), or 404."""
    if request.args.get('key', '').strip() != APIKEY:
        resp = make_response('wrong value for GET parameter: key')
        resp.status_code = 404
        return resp
    worker = request.args.get('worker', '')
    worker_lastseen_update(worker)
    job = worker_get_next_job(worker)
    if job is None:
        resp = make_response('no new job')
        resp.status_code = 404
        return resp
    # Job fields override the defaults.
    return jsonify({**JOB_DEFAULTS, **job})
2023-09-03 23:26:56 +00:00
@app.route('/job', methods=['POST'])
def job_post():
    """Accept a worker's snapshot zip (request body) for its assigned job.

    The upload is verified against the `sha256` GET parameter and stored
    atomically under jobs/<jobId>/<worker>.zip.
    """
    if APIKEY != request.args.get('key', '').strip():
        resp = make_response('wrong value for GET parameter: key')
        resp.status_code = 404
        return resp
    worker = request.args.get('worker', '')
    if worker.strip() == '':
        raise Exception('Unknown worker')
    worker_lastseen_update(worker)
    next_job = worker_get_next_job(worker)
    jobId = int(request.args.get('jobId', '0'))
    # Fix: the original dereferenced next_job without a None check, raising
    # TypeError (HTTP 500) when the worker had no outstanding job.
    if next_job is None or next_job['jobId'] != jobId:
        raise ValueError('Wrong job')
    hashed = request.args.get('sha256', '')
    zfb = request.data
    # Reject transfers whose body does not match the announced digest.
    if hashlib.sha256(zfb).hexdigest() != hashed:
        raise ValueError('Sent data was not received right')
    TempFile.save_bytes(JOBS_PATH.joinpath(f'{jobId:020d}/{worker}.zip'), zfb)
    return jsonify('OK')
2023-09-05 02:26:54 +00:00
2023-09-05 03:26:10 +00:00
@app.route('/job', methods=['HEAD', 'OPTIONS', 'GET'])
def job_get():
    """Serve the raw jobs.json database file."""
    return send_file(JOB_DB)
2023-09-06 00:51:16 +00:00
def raw_job_submission_get() -> list[dict[str, Any]]:
    """Return jobs annotated with a `workers` map: name -> zip path or None.

    A worker entry is None while that worker has not yet submitted a zip.
    """
    jobs = json.loads(JOB_DB.read_text(encoding='utf-8'))
    uptimes = json.loads(UPTIME_DB.read_text(encoding='utf-8'))
    for job in jobs:
        job_dir = JOBS_PATH.joinpath(f'{job["jobId"]:020d}')
        job['workers'] = {
            worker: (str(job_dir / f'{worker}.zip')
                     if (job_dir / f'{worker}.zip').exists() else None)
            for worker in uptimes
        }
    return jobs
@app.route('/job/submission', methods=['HEAD', 'OPTIONS', 'GET'])
def job_submission_get():
    """Serve the job list with per-worker submission status as JSON."""
    return jsonify(raw_job_submission_get())
@app.route('/analysis', methods=['HEAD', 'OPTIONS', 'GET'])
def analysis_get():
    """Serve the raw analysis.json database file."""
    return send_file(ANAL_DB)
@app.route('/analysis/next', methods=['HEAD', 'OPTIONS', 'GET'])
def analysis_next_get():
    """Hand the calling analyzer its next analysis assignment, or 404."""
    if request.args.get('key', '').strip() != APIKEY:
        resp = make_response('wrong value for GET parameter: key')
        resp.status_code = 404
        return resp
    worker = request.args.get('worker', '')
    if not worker.strip():
        raise Exception('Unknown worker')
    analizer_lastseen_update(worker)
    assignment = analyzer_get_next_job(worker)
    if assignment is None:
        resp = make_response('no new job')
        resp.status_code = 404
        return resp
    return jsonify(assignment)
2023-09-06 00:51:16 +00:00
@app.route('/analysis', methods=['POST'])
def analysis_post():
    """Accept an analyzer's result zip for its assigned analysis.

    Verifies the upload against the `sha256` parameter, stores it as
    jobs/<jobId>/analysis.zip, and marks the matching record finished with
    the parsed contents of analysis.json from inside the zip.
    """
    if APIKEY != request.args.get('key', '').strip():
        resp = make_response('wrong value for GET parameter: key')
        resp.status_code = 404
        return resp
    worker = request.args.get('worker', '')
    if worker.strip() == '':
        raise Exception('Unknown worker')
    analizer_lastseen_update(worker)
    next_anal = analyzer_get_next_job(worker)
    jobId = int(request.args.get('jobId', '0'))
    completeness = int(request.args.get('completeness', '0'))
    # Fix: the original dereferenced next_anal without a None check, raising
    # TypeError (HTTP 500) when there was nothing to analyze.
    if next_anal is None or next_anal['assignee'] != worker:
        raise ValueError('Wrong job')
    if next_anal['jobId'] != jobId:
        raise ValueError('Wrong job')
    if next_anal['completeness'] != completeness:
        raise ValueError('Wrong job')
    hashed = request.args.get('sha256', '')
    zfb = request.data
    if hashlib.sha256(zfb).hexdigest() != hashed:
        raise ValueError('Sent data was not received right')
    analysisFile = JOBS_PATH.joinpath(f'{jobId:020d}/analysis.zip')
    TempFile.save_bytes(analysisFile, zfb)
    anals: list[dict[str, Any]] = json.loads(
        ANAL_DB.read_text(encoding='utf-8'))
    # Fix: close the archive deterministically (original leaked the ZipFile).
    with zipfile.ZipFile(BytesIO(zfb)) as zf:
        for anal in anals:
            if anal['jobId'] == next_anal['jobId']:
                anal['finished'] = True
                anal['analysisFile'] = str(analysisFile)
                anal['analysis'] = json.loads(
                    zf.read('analysis.json').decode(encoding='utf-8'))
                break
    TempFile.save_utf8(ANAL_DB, json.dumps(anals, indent=2))
    return jsonify('OK')
2023-09-05 03:34:53 +00:00
2023-09-05 03:26:10 +00:00
@app.route('/uptime', methods=['HEAD', 'OPTIONS', 'GET'])
def uptime_get():
    """Serve the raw worker uptime database file."""
    return send_file(UPTIME_DB)
2023-09-06 00:51:16 +00:00
@app.route('/uptime2', methods=['HEAD', 'OPTIONS', 'GET'])
def uptime2_get():
    """Serve the raw analyzer uptime database file."""
    return send_file(UPTIME2_DB)
2023-09-05 03:26:10 +00:00
@app.route('/cron', methods=['HEAD', 'OPTIONS', 'GET'])
def cron():
    """Serve the raw crons.json database file."""
    return send_file(CRON_DB)
2023-09-05 03:26:10 +00:00
@app.route('/cron/form', methods=['HEAD', 'OPTIONS', 'GET'])
def cron_form_get():
    """Serve the cron management form; bad apikeys bounce back to the bare form."""
    supplied = request.args.get('apikey', '').strip()
    if supplied and supplied != APIKEY:
        return redirect('/cron/form')
    return send_file(Path('cronform.html'))
@app.route('/cron/form', methods=['POST'])
def cron_form_post():
    """Handle cron form submissions: `action` is 'add' or 'delete'.

    A wrong apikey (or an unknown action) redirects back to the form.
    """
    if APIKEY != request.form.get('apikey', '').strip():
        return redirect('/cron/form')
    if request.form['action'] == 'add':
        # Hoisted: the original parsed request.form['hours'] twice.
        hours = float(request.form['hours'].strip())
        cronId = next_id('cron')
        cron = {
            **JOB_DEFAULTS,
            **dict(
                cronId=cronId,
                url=request.form['url'].strip(),
                hours=hours,
                historySize=float(request.form['historySize'].strip()),
                # Backdated one full period so the first job runs immediately.
                lastScheduledSec=time.time() - 3600*hours,
                preRunJs=request.form['preRunJs'].strip(),
                wait=float(request.form['wait'].strip()),
                scrolltoJs=request.form['scrolltoJs'].strip(),
                scrolltox=float(request.form['scrolltox'].strip()),
                scrolltoy=float(request.form['scrolltoy'].strip()),
                checkReadyJs=request.form['checkReadyJs'].strip(),
                waitJs=float(request.form['waitJs'].strip()),
            )
        }
        crons = json.loads(CRON_DB.read_text(encoding='utf-8'))
        crons.append(cron)
        TempFile.save_utf8(CRON_DB, json.dumps(crons, indent=2))
        return redirect('/cron/form?message=added%20successfully&apikey=' + request.form['apikey'])
    if request.form['action'] == 'delete':
        cronId = int(request.form['cronId'].strip())
        crons = json.loads(CRON_DB.read_text(encoding='utf-8'))
        crons = [*filter(lambda c: c['cronId'] != cronId, crons)]
        TempFile.save_utf8(CRON_DB, json.dumps(crons, indent=2))
        return redirect('/cron/form?message=deleted%20successfully&apikey=' + request.form['apikey'])
    # Fix: the original fell through returning None (HTTP 500) for any
    # unknown action value.
    return redirect('/cron/form')
2023-09-08 15:20:13 +00:00
@app.route('/jobs/<path:path>', methods=['HEAD', 'OPTIONS', 'GET'])
def jobs_static(path):
    """Serve raw files from jobs/ (send_from_directory sandboxes the path)."""
    return send_from_directory('jobs', path)
2023-09-09 01:00:47 +00:00
@app.route('/unzip/jobs', methods=['HEAD', 'OPTIONS', 'GET'])
def unzip_jobs():
    """List the top-level entries of the jobs/ directory as JSON."""
    return jsonify([entry.name for entry in Path('jobs').iterdir()])
@app.route('/unzip/jobs/<path:path>', methods=['HEAD', 'OPTIONS', 'GET'])
def unzip_jobs_path(path):
    """List a job directory, or the member names of a job zip, as JSON.

    Raises NotAcceptable for paths escaping jobs/, NotFound for missing ones.
    """
    target_zip = Path('jobs').joinpath(path)
    # Fix: the original prefix test str(...).startswith(str(...)) also
    # accepted sibling paths such as 'jobs2/...'; is_relative_to compares
    # whole path components.
    if not target_zip.resolve().is_relative_to(Path('jobs').resolve()):
        raise werkzeug.exceptions.NotAcceptable()
    if not target_zip.exists():
        raise werkzeug.exceptions.NotFound()
    if target_zip.is_dir():
        return jsonify([*map(lambda a: a.name, target_zip.iterdir())])
    with zipfile.ZipFile(target_zip, mode='r') as zf:
        return send_file(
            BytesIO(json.dumps(
                [i.filename for i in zf.infolist()], indent=2).encode('utf-8')),
            last_modified=target_zip.stat().st_mtime,
            mimetype='application/json'
        )
@app.route('/unzip/jobs/<path:path>.zip/<path:zippath>', methods=['HEAD', 'OPTIONS', 'GET'])
def unzip_jobs_path_inner(path, zippath):
    """Serve a single member `zippath` out of the zip jobs/<path>.zip.

    An empty member path falls back to listing the archive contents.
    """
    target_zip = Path('jobs').joinpath(path + '.zip')
    # Fix: same containment flaw as unzip_jobs_path — the original
    # startswith() prefix test also accepted sibling paths like 'jobs2/...'.
    if not target_zip.resolve().is_relative_to(Path('jobs').resolve()):
        raise werkzeug.exceptions.NotAcceptable()
    if not target_zip.exists():
        raise werkzeug.exceptions.NotFound()
    if zippath in ('', '/'):
        return unzip_jobs_path(path)
    with zipfile.ZipFile(target_zip, mode='r') as zf:
        try:
            zd = zf.read(zippath)
        except KeyError:
            # zipfile raises KeyError for unknown members; map it to 404.
            raise werkzeug.exceptions.NotFound()
        return send_file(
            BytesIO(zd),
            last_modified=target_zip.stat().st_mtime,
            mimetype=mimetypes.guess_type(zippath)[0]
        )