#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
"""Terminal dashboard that monitors per-job state files in a directory.

Every refresh re-reads the job files found in the workers state directory
and redraws, with ANSI escape sequences, per-state counters, a grid of jobs
and a throughput / ETA estimate derived from the files' modification times.
"""

import datetime
import importlib
import shutil
import time
import traceback
from pathlib import Path
from time import sleep

import colored

ESC = '\033'
LINK_MEMORY_SIZE = 64

# ANSI control sequences used for drawing.
_CLEAR_SCREEN = ESC + '[2J'
_CURSOR_HOME = ESC + '[0;0H'
_CLEAR_LINE_END = ESC + '[K'
_CLEAR_SCREEN_END = ESC + '[0J'

# States whose jobs are displayed as "<enqueued>/<queue>" instead of "<queue>".
_ACTIVE_STATES = ('running', 'scrubbing')


def reverse_mapping_list(d: dict) -> dict:
    """Invert *d*, grouping all keys that share a value into a list.

    Returns a dict mapping each value of *d* to the list of keys that held
    it, in the iteration order of *d*.
    """
    grouped = dict()
    for key, value in d.items():
        grouped.setdefault(value, list()).append(key)
    return grouped


def reverse_mapping(d: dict) -> dict:
    """Invert *d*; when several keys share a value, the last key wins."""
    return {value: key for key, value in d.items()}


def frequency_dict(l: list) -> dict:
    """Return a dict mapping each element of *l* to its occurrence count."""
    counts = dict()
    for element in l:
        counts[element] = counts.get(element, 0) + 1
    return counts


def _read_job_states(workers_state_path: Path, jobs: list) -> tuple:
    """Read the state files of *jobs*, each file exactly once.

    For every job two files are consulted inside *workers_state_path*:
    ``<job>=line`` (a single integer) and ``<job>`` (colon-separated fields:
    a state name followed by four integers).

    Returns a tuple ``(lines, queues, enqueueds, states, dates)`` of dicts
    keyed by job name, where ``dates`` holds the state file's ``st_mtime_ns``.

    Raises OSError, ValueError or IndexError when a file is missing,
    unreadable or does not have the expected shape (for instance while it is
    being rewritten).
    """
    lines, queues, enqueueds, states, dates = {}, {}, {}, {}, {}
    for job in jobs:
        lines[job] = int(
            workers_state_path.joinpath(job + '=line').read_text())
        state_file = workers_state_path.joinpath(job)
        # A single read gives a consistent snapshot of all the fields.
        fields = state_file.read_text().split(':')
        states[job] = fields[0]
        queues[job] = int(fields[1])
        enqueueds[job] = int(fields[2])
        # Fields 3 and 4 are not displayed, but a file lacking them (or with
        # non-integer values there) is still treated as unreadable.
        int(fields[3])
        int(fields[4])
        dates[job] = state_file.stat().st_mtime_ns
    return lines, queues, enqueueds, states, dates


def print_terminal(workers_state_path: Path, keep_to_next_cycle=None):
    """Draw one frame of the dashboard on standard output.

    Parameters:
        workers_state_path: directory holding the job state files. Entries
            whose name contains ``=`` are auxiliary files, not jobs.
        keep_to_next_cycle: dict returned by the previous call (or None on
            the first call). Its ``'link_processing_timestamps'`` entry keeps
            the most recent modification timestamps seen so far.

    Returns:
        The state dict to hand to the next call. None is returned when there
        are no jobs, so the remembered timestamps are dropped in that case.
        When a state file cannot be read or parsed nothing is drawn and
        *keep_to_next_cycle* is returned unchanged.
    """
    jobs = [
        entry.name
        for entry in workers_state_path.iterdir()
        if '=' not in entry.name
    ]
    if not jobs:
        print(_CLEAR_SCREEN, end='', flush=True)
        print(_CURSOR_HOME, end='', flush=True)
        print(
            f'Waiting for jobs @ {datetime.datetime.now()}',
            end='', flush=True)
        print(_CLEAR_LINE_END, flush=True)
        return None

    try:
        (jobs_lines, jobs_queues, jobs_enqueueds,
         jobs_states, jobs_dates) = _read_job_states(workers_state_path, jobs)
    except (OSError, ValueError, IndexError):
        # Best effort: a file vanished or was read half-written. Skip this
        # frame and try again on the next cycle.
        return keep_to_next_cycle

    # Only move the cursor home; the frame is overwritten in place and each
    # line is cleared to its end (presumably to avoid a full-clear flicker).
    print(_CURSOR_HOME, end='', flush=True)
    if keep_to_next_cycle is None:
        keep_to_next_cycle = dict()
    displayatbottom = ''

    # Width of each job cell: "<job>@[<enqueued>/]<queue>".
    jobs_sizes = {
        job: len(
            job + '@' +
            (str(jobs_enqueueds[job]) + '/'
             if jobs_states[job] in _ACTIVE_STATES else '') +
            str(jobs_queues[job])
        )
        for job in jobs_lines
    }
    max_job_size = max([*jobs_sizes.values(), 0])

    state_stats = frequency_dict(list(jobs_states.values()))
    links_stats = dict(waiting=0, enqueued=0, scrubbing=0,
                       running1=0, running2=0, finished=0)
    for job in jobs:
        state = jobs_states[job]
        jq1 = jobs_queues.get(job, 0)
        jq2 = jobs_enqueueds.get(job, 0)
        if state == 'running':
            links_stats['running1'] += jq2
            links_stats['running2'] += jq1 - jq2
        elif state == 'scrubbing':
            links_stats[state] = links_stats.get(state, 0) + jq2
        else:
            links_stats[state] = links_stats.get(state, 0) + jq1

    term_sz = shutil.get_terminal_size()
    # At least one job per row: a zero here (terminal narrower than a single
    # cell) would make the grid loop below unable to advance.
    per_column = max(1, term_sz.columns // (max_job_size + 1))

    # Order jobs by their '=line' value. The job name breaks ties so that
    # jobs sharing a line value are all displayed.
    jobs_sorted = sorted(jobs_lines, key=lambda job: (jobs_lines[job], job))

    # Rank 0 is the most recently modified state file; equal timestamps
    # share the lowest rank among them.
    date_rank = dict()
    for rank, date in enumerate(sorted(jobs_dates.values(), reverse=True)):
        date_rank.setdefault(date, rank)
    jobs_daterank = {job: date_rank[date] for job, date in jobs_dates.items()}

    # Background shades, from lightest to darkest.
    bg_rank_color_names = [
        "grey_62",  # 16
        "grey_58",  # 15
        "grey_54",  # 14
        "grey_50",  # 13
        "grey_46",  # 12
        "grey_42",  # 11
        "grey_39",  # 10
        "grey_35",  # 9
        "grey_30",  # 8
        "grey_27",  # 7
        "grey_23",  # 6
        "grey_19",  # 5
        "grey_15",  # 4
        "grey_11",  # 3
        "grey_7",  # 2
        "grey_3",  # 1
    ]
    bg_rank = [colored.bg(clr) for clr in bg_rank_color_names]
    # Keep as many shades as there are running jobs (one if none is running,
    # at most the whole palette); the remaining ranks get no background.
    bg_rank = bg_rank[-state_stats.get('running', 1):]
    bg_rang_programmed_len = len(bg_rank)
    bg_rank += [''] * (len(jobs_dates) - len(bg_rank))

    # Merge the timestamps seen now with the remembered ones, newest first,
    # bounded by max(running jobs, LINK_MEMORY_SIZE).
    link_processing_timestamps = sorted(
        set(keep_to_next_cycle.get('link_processing_timestamps', list()))
        | set(jobs_dates.values()),
        reverse=True,
    )[:max(state_stats.get("running", 0), LINK_MEMORY_SIZE)]
    keep_to_next_cycle['link_processing_timestamps'] = (
        link_processing_timestamps)

    # Seconds elapsed between consecutive timestamps, starting from "now".
    newer_timestamps = (
        [time.time() * 10**9] + link_processing_timestamps[:-1])
    link_processing_deltas = [
        (newer - older) / 10**9
        for newer, older in zip(newer_timestamps, link_processing_timestamps)
    ]
    delta_count = len(link_processing_deltas)
    link_processing_deltas_avg = (
        sum(link_processing_deltas) / max(1, delta_count))
    links_per_sec = 1 / max(2**-30, link_processing_deltas_avg)
    link_processing_deltas_var = 0
    if delta_count > 0:
        link_processing_deltas_var = sum(
            (delta - link_processing_deltas_avg) ** 2
            for delta in link_processing_deltas
        ) / delta_count
    # Relative standard deviation in percent; 0 when the average is zero
    # rather than dividing by it.
    relative_error = (
        100 * (link_processing_deltas_var ** .5) / link_processing_deltas_avg
        if link_processing_deltas_avg else 0.0
    )

    download_pending_count = (
        links_stats.get("waiting", 0) +
        links_stats.get("enqueued", 0) +
        links_stats.get("running1", 0)
    )
    seconds_to_finish = download_pending_count * link_processing_deltas_avg
    datetime_when_finished = (
        datetime.datetime.now() +
        datetime.timedelta(seconds=seconds_to_finish))
    time_to_finish = '%2dd %2dh %2dm %2ds' % (
        seconds_to_finish // (3600 * 24),
        (seconds_to_finish % (3600 * 24)) // 3600,
        (seconds_to_finish % 3600) // 60,
        seconds_to_finish % 60,
    )
    displayatbottom += f'Speed: {"%.3f" % links_per_sec} links/s | '
    displayatbottom += f'ETA: {time_to_finish} | '
    displayatbottom += f'ETL: {datetime_when_finished} | '
    displayatbottom += f'Error: \u00b1{"%6.2f" % relative_error}%'

    # number1colors styles a job cell; number2colors styles the link
    # counters and the "<enqueued>" part of an active job cell.
    number1colors = dict(
        waiting=[
            colored.fg('light_gray'),
            colored.attr('dim'),
        ],
        enqueued=[
            colored.fg('light_red'),
        ],
        scrubbing=[
            colored.fg('light_cyan')
        ],
        running=[
            colored.fg('light_yellow')
        ],
        finished=[
            colored.fg('light_green')
        ],
    )
    number2colors = number1colors.copy()
    number2colors['running1'] = number2colors['running']
    number2colors['running'] = [
        colored.fg('light_cyan'),
    ]
    number2colors['scrubbing'] = [
        colored.fg('light_magenta'),
    ]
    number2colors['running2'] = number2colors['running']

    print(
        f'# Monitoring {len(jobs)} jobs @ {datetime.datetime.now()}',
        end='', flush=True)
    print(_CLEAR_LINE_END, flush=True)
    print(_CLEAR_LINE_END, flush=True)

    workers_done_pct = (
        100 * state_stats.get("finished", 0) /
        max(1, sum(state_stats.values())))
    print(
        'Workers: ' +
        '%.4f%% | ' % workers_done_pct +
        colored.stylize(
            f'{state_stats.get("waiting", 0)} waiting',
            number1colors['waiting'],
        ) +
        ' - ' +
        colored.stylize(
            f'{state_stats.get("enqueued", 0)} enqueued',
            number1colors['enqueued'],
        ) +
        ' - ' +
        colored.stylize(
            f'{state_stats.get("running", 0)} running',
            number1colors['running'],
        ) +
        ' - ' +
        colored.stylize(
            f'{state_stats.get("finished", 0)} finished',
            number1colors['finished'],
        ),
        end='')
    print(_CLEAR_LINE_END, flush=True)

    links_done_pct = (
        100 *
        (links_stats.get("running2", 0) + links_stats.get("finished", 0)) /
        max(1, sum(links_stats.values())))
    print(
        'Links: ' +
        '%.4f%% | ' % links_done_pct +
        colored.stylize(
            f'{links_stats.get("waiting", 0)} w.',
            number2colors['waiting'],
        ) +
        ' - ' +
        colored.stylize(
            f'{links_stats.get("enqueued", 0)} e.',
            number2colors['enqueued'],
        ) +
        ' - ' +
        colored.stylize(
            f'{links_stats.get("running1", 0)} s.',
            number2colors['running1'],
        ) +
        ' \u00b7 ' +
        colored.stylize(
            f'{links_stats.get("scrubbing", 0)} s.',
            number2colors['scrubbing'],
        ) +
        ' \u00b7 ' +
        colored.stylize(
            f'{links_stats.get("running2", 0)} d.',
            number2colors['running2'],
        ) +
        ' - ' +
        colored.stylize(
            f'{links_stats.get("finished", 0)} f.',
            number2colors['finished'],
        ),
        end='')
    print(_CLEAR_LINE_END, flush=True)

    # Legend: one numbered swatch per shade in use, then an unshaded "--"
    # entry standing for every rank beyond the palette.
    print('Latest updates gradient: ', end='')
    bg_rang_programmed_len_digits = len('%d' % (bg_rang_programmed_len + 1))
    for i in range(bg_rang_programmed_len + 1):
        if i == bg_rang_programmed_len:
            print(' ', end='')
            print('-' * bg_rang_programmed_len_digits, end='')
        else:
            print(bg_rank[i], end='')
            print(' ', end='')
            print(('%%0%dd' % bg_rang_programmed_len_digits) % (i + 1),
                  end='')
        print(' ', end='')
        print(colored.attr('reset'), end='')
    print(_CLEAR_LINE_END, flush=True)
    print(_CLEAR_LINE_END, flush=True)

    # Job grid: per_column right-aligned cells on each terminal row.
    for row_start in range(0, len(jobs_sorted), per_column):
        for current_job in jobs_sorted[row_start:row_start + per_column]:
            current_state = jobs_states[current_job]
            number1color = number1colors.get(current_state, '')
            number2color = number2colors.get(current_state, '')
            print(''.join(number1color), end='')
            print(bg_rank[jobs_daterank[current_job]], end='')
            print(' ' * (max_job_size - jobs_sizes[current_job]), end='')
            print(current_job, end='')
            print('@', end='')
            if current_state in _ACTIVE_STATES:
                print(''.join(number2color), end='')
                print(str(jobs_enqueueds[current_job]), end='')
                print(''.join(number1color), end='')
                print('/', end='')
            print(str(jobs_queues[current_job]), end='')
            print(colored.attr('reset'), end='')
            print(' ', end='')
        print(_CLEAR_LINE_END, flush=False)

    print(displayatbottom, end=_CLEAR_LINE_END, flush=True)
    # Erase whatever a previous, taller frame left below this one.
    print(_CLEAR_SCREEN_END, end='', flush=True)
    print(_CURSOR_HOME, end='', flush=True)
    return keep_to_next_cycle


def do_cycle_sleep():
    """Pause between two frames (1/60 of a second)."""
    sleep(1 / 60)


def main():
    """Redraw the dashboard for as long as the ``i_gdl_w`` directory exists.

    This module is imported under its dotted name and reloaded before every
    frame, so edits to this file take effect without restarting. Any error
    raised while reloading or drawing is printed, followed by a one second
    pause; KeyboardInterrupt clears the screen and propagates.

    Raises:
        ValueError: if this file is not located under the current working
            directory (from ``Path.relative_to``).
    """
    selfmodule_path = (
        Path(__file__).absolute().relative_to(Path('.').absolute()))
    # Build "pkg.sub.module" from the path components. A script sitting
    # directly in the working directory yields just "module".
    selfmodule_name = '.'.join(
        [*selfmodule_path.parent.parts, selfmodule_path.stem])
    selfmodule = importlib.import_module(selfmodule_name)
    workers_state_path = Path('i_gdl_w')
    from_exc = False
    keep_to_next_cycle = None
    print(_CLEAR_SCREEN, end='', flush=True)
    print(_CURSOR_HOME, end='', flush=True)
    while workers_state_path.exists():
        try:
            selfmodule = importlib.reload(selfmodule)
            if from_exc:
                # Wipe the traceback left on screen by the failed cycle.
                from_exc = False
                print(_CLEAR_SCREEN, end='', flush=True)
                print(_CURSOR_HOME, end='', flush=True)
            keep_to_next_cycle = selfmodule.print_terminal(
                workers_state_path, keep_to_next_cycle)
        except KeyboardInterrupt:
            print(_CLEAR_SCREEN, end='', flush=True)
            print(_CURSOR_HOME, end='', flush=True)
            raise
        except BaseException:
            print(_CLEAR_SCREEN, end='', flush=True)
            print(_CURSOR_HOME, end='', flush=True)
            traceback.print_exc()
            from_exc = True
            sleep(1)
        selfmodule.do_cycle_sleep()
    print(_CURSOR_HOME, end='', flush=True)
    print(_CLEAR_LINE_END, end='', flush=True)
    print('Queue is empty')


if __name__ == "__main__":
    main()