From 926d9e08ad2d0a3eb7e68386c5f31019b8e6aa6b Mon Sep 17 00:00:00 2001
From: Adler Neves
Date: Fri, 5 Jun 2020 19:19:45 -0300
Subject: [PATCH] has glitch

---
 .gitignore                                    |   2 +
 d/.gitignore                                  |   3 -
 d/About.md                                    |   1 -
 error_hashes.txt                              |   9 -
 i/.gitignore                                  |   3 -
 i/About.md                                    |   1 -
 r/.gitignore                                  |   3 -
 r/About.md                                    |   3 -
 reddit_imgs/display_current_download.py       |  98 +++--
 reddit_imgs/display_fetch_futures.py          | 412 ++++++++++++++++++
 reddit_imgs/fetch2.py                         | 239 ++++++++--
 reddit_imgs/runner.py                         |   4 +-
 reddit_imgs/sync.py                           |   5 +-
 reddit_imgs/system/.gitignore                 |   1 +
 reddit_imgs/system/edge_propagation_scaler.py | 131 ++++++
 reddit_imgs/system/eps_test.png               | Bin 0 -> 491 bytes
 showfetchworkersstate.py                      |   8 +
 w/.gitignore                                  |   3 -
 w/About.md                                    |   1 -
 19 files changed, 834 insertions(+), 93 deletions(-)
 delete mode 100644 d/.gitignore
 delete mode 100644 d/About.md
 delete mode 100644 error_hashes.txt
 delete mode 100644 i/.gitignore
 delete mode 100644 i/About.md
 delete mode 100644 r/.gitignore
 delete mode 100644 r/About.md
 create mode 100644 reddit_imgs/display_fetch_futures.py
 create mode 100644 reddit_imgs/system/.gitignore
 create mode 100755 reddit_imgs/system/edge_propagation_scaler.py
 create mode 100644 reddit_imgs/system/eps_test.png
 create mode 100755 showfetchworkersstate.py
 delete mode 100644 w/.gitignore
 delete mode 100644 w/About.md

diff --git a/.gitignore b/.gitignore
index 62ce857..f629e9b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,6 +5,8 @@ i
 i/**
 r
 r/**
+d
+d/**
 w
 w/**
 i_gdl*
diff --git a/d/.gitignore b/d/.gitignore
deleted file mode 100644
index b66ba8a..0000000
--- a/d/.gitignore
+++ /dev/null
@@ -1,3 +0,0 @@
-**
-!About.md
-!.gitignore
diff --git a/d/About.md b/d/About.md
deleted file mode 100644
index 56d8b27..0000000
--- a/d/About.md
+++ /dev/null
@@ -1 +0,0 @@
-This folder contains all images grouped by subreddit.
diff --git a/error_hashes.txt b/error_hashes.txt
deleted file mode 100644
index f88deba..0000000
--- a/error_hashes.txt
+++ /dev/null
@@ -1,9 +0,0 @@
-9b5936f4006146e4e1e9025b474c02863c0b5614132ad40db4b925a10e8bfbb9
-22f61e8d7fc7d46086cd8ee981ac52315e004c826d976f2fe0c4b57e9ec742a1
-eba24ca839aa66d6408f75d1ff6789d57867309c10dd19e24d122153b0dfb146
-a6e5c2ee46b23c78614b2891d73a92f10ec1d4bf0cc3b792cb13d338ec6085eb
-15a2f3731db495eb8c9d45c4c6a2591fb5efcf4443cb368450542617017b2508
-3d31dd5e68562ede60fb0ee29c2d743c8c378a91c77fe94d02fcd1ae5185bdf0
-194e03fb8c332f79be011cfc4f0c550ea51eac15d4d77a9b39c07e197c6ffd86
-19afab3de943b704ce686f9d015d6a8b65451a6a9501cf09c08380456982c9ad
-897994b46140642095f268442437a123e9b76805b85f1b5076c3f76379b99377
diff --git a/i/.gitignore b/i/.gitignore
deleted file mode 100644
index b66ba8a..0000000
--- a/i/.gitignore
+++ /dev/null
@@ -1,3 +0,0 @@
-**
-!About.md
-!.gitignore
diff --git a/i/About.md b/i/About.md
deleted file mode 100644
index b8f68f2..0000000
--- a/i/About.md
+++ /dev/null
@@ -1 +0,0 @@
-This folder holds the downloaded images grouped by its discussion id from reddit. Not user friendly.
diff --git a/r/.gitignore b/r/.gitignore
deleted file mode 100644
index b66ba8a..0000000
--- a/r/.gitignore
+++ /dev/null
@@ -1,3 +0,0 @@
-**
-!About.md
-!.gitignore
diff --git a/r/About.md b/r/About.md
deleted file mode 100644
index 6d9fdab..0000000
--- a/r/About.md
+++ /dev/null
@@ -1,3 +0,0 @@
-This folder contains the folders named as the subreddit name you want downloaded.
-
-As example, for downloading "/r/photoshopbattles/" you create a folder named "photoshopbattles".
diff --git a/reddit_imgs/display_current_download.py b/reddit_imgs/display_current_download.py index 8882540..b2563c6 100644 --- a/reddit_imgs/display_current_download.py +++ b/reddit_imgs/display_current_download.py @@ -1,56 +1,96 @@ #!/usr/bin/env python3 # -*- encoding: utf-8 -*- +import json +import tkinter import traceback from io import BytesIO from pathlib import Path from typing import Optional -from PIL import Image - -from .system.downloader.downloadedData import MainApp +from PIL import Image, ImageTk millisamount = 10 AnyInt = Optional[int] -def updateImage(tk: MainApp, old_url_modified: AnyInt, old_file_modified: AnyInt, old_file_sz: AnyInt): +class MainApp(tkinter.Tk): + def __init__(self): + super().__init__() + self.geometry('500x500') + self.known_width = 500 + self.known_height = 500 + self.resizable(width=True, height=True) + self.photo = Image.new('RGB', (256, 256), (0, 0, 0)) + self.display_photo = self.photo.copy() + self.image = ImageTk.PhotoImage(self.display_photo) + self.frame = tkinter.Frame(self) + self.frame.pack(fill=tkinter.BOTH, expand=tkinter.YES) + self.panel = tkinter.Label(self.frame) + self.panel.pack(fill=tkinter.BOTH, expand=tkinter.YES) + self.panel.bind('<Configure>', self._resize_image) + self.panel.configure(image=self.image) + self.update() + self._resize_image2() + self.update() + + def _resize_image(self, event): + self.known_width = event.width + self.known_height = event.height + self._resize_image2() + + def _resize_image2(self): + size_tuple = (self.known_width, self.known_height) + self.display_photo = self.photo.copy() + self.display_photo.thumbnail(size_tuple) + if self.display_photo is None: + self.display_photo = self.photo + self.image = ImageTk.PhotoImage(self.display_photo) + self.panel.configure(image=self.image) + + def update_image(self, image, windowTitle=None): + if windowTitle is not None: + self.winfo_toplevel().title(str(windowTitle)) + self.photo = image + self._resize_image2() + self.update() + + +def updateImage(tk: MainApp, old_fpp_modified: AnyInt): try: - path_file = Path('latest_put_image.file') - path_url = Path('latest_put_image.url') - new_url_modified = None - new_file_modified = None - new_file_sz = None + file_path_path = Path('latest_image_download.txt') + new_fpp_modified = None try: - st = path_url.stat() - new_url_modified = st.st_mtime_ns + st = file_path_path.stat() + new_fpp_modified = st.st_mtime_ns except BaseException: pass - try: - st = path_file.stat() - new_file_modified = st.st_mtime_ns - new_file_sz = st.st_size - except BaseException: - pass - tk.after(millisamount, updateImage, tk, - new_url_modified, new_file_modified, new_file_sz) - if old_url_modified != new_url_modified or old_file_modified != new_file_modified or old_file_sz != new_file_sz: - url = None - bts = None + tk.after(millisamount, updateImage, tk, new_fpp_modified) + if old_fpp_modified != new_fpp_modified: + file_path = None + info_path = None + file = None + info = None try: - url = path_url.read_text() - bts = path_file.read_bytes() + fpp = file_path_path.read_text() + file_path = Path(fpp) + info_path = Path(fpp+'.json') + file = file_path.read_bytes() + info = json.loads(info_path.read_text()) except BaseException: pass - if url is not None and bts is not None: + if file is not None and info is not None: try: - tk.update_image(Image.open(BytesIO(bts)), url) + tk.update_image( + Image.open(BytesIO(file)), + info.get('url', str(file_path).split('/', 1)[-1]) + ) except BaseException: print() - print("Exception on link %r" % url) +
print("Exception on entry %r" % file_path) traceback.print_exc() - if ((old_url_modified is not None or old_file_modified is not None) and (new_url_modified is None and new_file_modified is None)): + if (old_fpp_modified is not None and new_fpp_modified is None): tk.destroy() except BaseException: print() @@ -60,7 +100,7 @@ def updateImage(tk: MainApp, old_url_modified: AnyInt, old_file_modified: AnyInt def main(): tk = MainApp() - tk.after(1, updateImage, tk, None, None, None) + tk.after(1, updateImage, tk, None) tk.mainloop() diff --git a/reddit_imgs/display_fetch_futures.py b/reddit_imgs/display_fetch_futures.py new file mode 100644 index 0000000..fcfc755 --- /dev/null +++ b/reddit_imgs/display_fetch_futures.py @@ -0,0 +1,412 @@ +#!/usr/bin/env python3 +# -*- encoding: utf-8 -*- + +import datetime +import importlib +import shutil +import time +import traceback +from pathlib import Path +from time import sleep + +import colored + +ESC = '\033' +LINK_MEMORY_SIZE = 64 + + +def reverse_mapping_list(d: dict) -> dict: + r = {k: list() for k in set(d.values())} + for k, v in d.items(): + r[v].append(k) + return r + + +def reverse_mapping(d: dict) -> dict: + return {v: k for k, v in d.items()} + + +def frequency_dict(l: list) -> dict: + f = {e: 0 for e in set(l)} + for e in l: + f[e] += 1 + return f + + +def print_terminal(workers_state_path: Path, keep_to_next_cycle=None): + jobs = list(map(lambda a: a.name, filter( + lambda a: '=' not in a.name, + workers_state_path.iterdir()))) + if len(jobs) == 0: + print(ESC+'[2J', end='', flush=True) + print(ESC+'[0;0H', end='', flush=True) + print( + f'Waiting for jobs @ {datetime.datetime.now()}', end='', flush=True) + print(ESC+'[K', flush=True) + return + jobs_lines = dict() + jobs_queues = dict() + jobs_enqueueds = dict() + jobs_states = dict() + jobs_dates = dict() + try: + jobs_lines = { + job: int( + workers_state_path.joinpath(job+'=line') + .read_text() + ) + for job in jobs + } + jobs_queues = { + job: int( + workers_state_path.joinpath(job) + .read_text().split(':')[1] + ) + for job in jobs + } + jobs_enqueueds = { + job: int( + workers_state_path.joinpath(job) + .read_text().split(':')[2] + ) + for job in jobs + } + jobs_states = { + job: ( + workers_state_path.joinpath(job) + .read_text().split(':')[0] + ) + for job in jobs + } + jobs_dates = { + job: ( + workers_state_path.joinpath(job) + .stat().st_mtime_ns + ) + for job in jobs + } + jobs_files = { + job: int( + workers_state_path.joinpath(job) + .read_text().split(':')[4] + ) + for job in jobs + } + jobs_bytes = { + job: int( + workers_state_path.joinpath(job) + .read_text().split(':')[3] + ) + for job in jobs + } + except KeyboardInterrupt: + raise + except: + return keep_to_next_cycle + # print(ESC+'[2J', end='', flush=True) + print(ESC+'[0;0H', end='', flush=True) + if keep_to_next_cycle is None: + keep_to_next_cycle = dict() + displayatbottom = '' + jobs_sizes = {job: len( + job + + '@' + + (str(jobs_enqueueds[job])+'/' if jobs_states[job] in ('running', 'scrubbing') else '') + + str(jobs_queues[job]) + ) + for job in jobs_lines.keys()} + max_job_size = max([*jobs_sizes.values(), 0]) + state_stats = frequency_dict(list(jobs_states.values())) + links_stats = dict(waiting=0, enqueued=0, scrubbing=0, + running1=0, running2=0, finished=0) + for job in jobs: + state = jobs_states[job] + jq1 = jobs_queues.get(job, 0) + jq2 = jobs_enqueueds.get(job, 0) + if state not in ('running', 'scrubbing'): + if state not in links_stats: + links_stats[state] = 0 + links_stats[state] += jq1 + elif state 
== 'scrubbing': + if state not in links_stats: + links_stats[state] = 0 + links_stats[state] += jq2 + else: + links_stats['running1'] += jq2 + links_stats['running2'] += jq1-jq2 + term_sz = shutil.get_terminal_size() + per_column = term_sz.columns//(max_job_size+1) + jobs_sorted = list(map(lambda a: a[1], sorted( + reverse_mapping(jobs_lines).items()))) + jobsdates_list = list(reversed(sorted(jobs_dates.values()))) + jobs_daterank = { + job: jobsdates_list.index(date) + for job, date in jobs_dates.items()} + bg_rank_color_names = [ + # "grey_93", # 24 + # "grey_89", # 23 + # "grey_85", # 22 + # "grey_82", # 21 + # "grey_78", # 20 + # "grey_74", # 19 + # "grey_70", # 18 + # "grey_66", # 17 + "grey_62", # 16 + "grey_58", # 15 + "grey_54", # 14 + "grey_50", # 13 + "grey_46", # 12 + "grey_42", # 11 + "grey_39", # 10 + "grey_35", # 9 + "grey_30", # 8 + "grey_27", # 7 + "grey_23", # 6 + "grey_19", # 5 + "grey_15", # 4 + "grey_11", # 3 + "grey_7", # 2 + "grey_3", # 1 + ] + bg_rank = [ + colored.bg(clr) + for clr in bg_rank_color_names + ] + bg_rank = bg_rank[-state_stats.get('running', 1):] + bg_rang_programmed_len = len(bg_rank) + bg_rank += ['']*(len(jobs_dates)-len(bg_rank)) + + link_processing_timestamps = keep_to_next_cycle.get( + 'link_processing_timestamps', list()) + for link_processing_timestamp in jobs_dates.values(): + if link_processing_timestamp not in link_processing_timestamps: + link_processing_timestamps.append(link_processing_timestamp) + link_processing_timestamps = list(reversed(sorted(link_processing_timestamps)))[ + :max(state_stats.get("running", 0), LINK_MEMORY_SIZE)] + keep_to_next_cycle['link_processing_timestamps'] = link_processing_timestamps + + link_processing_deltas = list(map( + lambda t: (t[0]-t[1])/10**9, + zip( + [time.time()*10**9]+link_processing_timestamps, + link_processing_timestamps+[link_processing_timestamps[-1]] + )))[0:-1] + link_processing_deltas_avg = sum( + link_processing_deltas+[0])/max(1, len(link_processing_deltas)) + links_per_sec = 1/max(2**-30, link_processing_deltas_avg) + + link_processing_deltas_var = 0 + if (l := len(link_processing_deltas)) > 0: + diff = list(map( + lambda lpd: (lpd - link_processing_deltas_avg), + link_processing_deltas + )) + diffsqd = list(map( + lambda d: d**2, + diff + )) + link_processing_deltas_var = sum(diffsqd)/l + + download_pending_count = ( + links_stats.get("waiting", 0) + + links_stats.get("enqueued", 0) + + links_stats.get("running1", 0) + ) + + seconds_to_finish = download_pending_count*link_processing_deltas_avg + datetime_when_finished = datetime.datetime.now( + ) + datetime.timedelta(seconds=seconds_to_finish) + + time_to_finish = '%2dd %2dh %2dm %2ds' % ( + seconds_to_finish//(3600*24), + (seconds_to_finish % (3600*24))//3600, + (seconds_to_finish % 3600)//60, + seconds_to_finish % 60, + ) + + displayatbottom += f'Speed: {"%.3f" % links_per_sec} links/s | ' + displayatbottom += f'ETA: {time_to_finish} | ' + displayatbottom += f'ETL: {datetime_when_finished} | ' + displayatbottom += f'Error: \u00b1{"%6.2f" % (100*(link_processing_deltas_var**.5)/link_processing_deltas_avg)}%' + # displayatbottom += str(link_processing_deltas) + + number1colors = dict( + waiting=[ + colored.fg('light_gray'), + colored.attr('dim'), + ], + enqueued=[ + colored.fg('light_red'), + ], + scrubbing=[ + colored.fg('light_cyan') + ], + running=[ + colored.fg('light_yellow') + ], + finished=[ + colored.fg('light_green') + ], + ) + number2colors = number1colors.copy() + number2colors['running1'] = number2colors['running'] + 
number2colors['running'] = [ + colored.fg('light_cyan'), + ] + number2colors['scrubbing'] = [ + colored.fg('light_magenta'), + ] + number2colors['running2'] = number2colors['running'] + print( + f'# Monitoring {len(jobs)} jobs @ {datetime.datetime.now()}', end='', flush=True) + print(ESC+'[K', flush=True) + print(ESC+'[K', flush=True) + print('Workers: ' + + '%.4f%% | ' % (100*state_stats.get("finished", 0)/(max(1, sum(state_stats.values())))) + + colored.stylize( + f'{state_stats.get("waiting", 0)} waiting', + number1colors['waiting'], + ) + + ' - ' + + colored.stylize( + f'{state_stats.get("enqueued", 0)} enqueued', + number1colors['enqueued'], + ) + + ' - ' + + colored.stylize( + f'{state_stats.get("running", 0)} running', + number1colors['running'], + ) + + ' - ' + + colored.stylize( + f'{state_stats.get("finished", 0)} finished', + number1colors['finished'], + ), + end='') + print(ESC+'[K', flush=True) + print('Links: ' + + '%.4f%% | ' % (100*(links_stats.get("running2", 0)+links_stats.get("finished", 0))/(max(1, sum(links_stats.values())))) + + colored.stylize( + f'{links_stats.get("waiting", 0)} w.', + number2colors['waiting'], + ) + + ' - ' + + colored.stylize( + f'{links_stats.get("enqueued", 0)} e.', + number2colors['enqueued'], + ) + + ' - ' + + colored.stylize( + f'{links_stats.get("running1", 0)} s.', + number2colors['running1'], + ) + + ' \u00b7 ' + + colored.stylize( + f'{links_stats.get("scrubbing", 0)} s.', + number2colors['scrubbing'], + ) + + ' \u00b7 ' + + colored.stylize( + f'{links_stats.get("running2", 0)} d.', + number2colors['running2'], + ) + + ' - ' + + colored.stylize( + f'{links_stats.get("finished", 0)} f.', + number2colors['finished'], + ), + end='') + print(ESC+'[K', flush=True) + print('Latest updates gradient: ', end='') + bg_rang_programmed_len_digits = len('%d' % (bg_rang_programmed_len+1)) + for i in range(bg_rang_programmed_len+1): + if i == bg_rang_programmed_len: + print(' ', end='') + print('-'*bg_rang_programmed_len_digits, end='') + else: + print(bg_rank[i], end='') + print(' ', end='') + print(('%%0%dd' % bg_rang_programmed_len_digits) % (i+1), end='') + print(' ', end='') + print(colored.attr('reset'), end='') + + print(ESC+'[K', flush=True) + print(ESC+'[K', flush=True) + current_jobs = jobs_sorted.copy() + while len(current_jobs) > 0: + for _ in range(per_column): + if len(current_jobs) > 0: + current_job, *current_jobs = current_jobs + current_state = jobs_states[current_job] + number1color = number1colors.get(current_state, '') + number2color = number2colors.get(current_state, '') + print(''.join(number1color), end='') + print(bg_rank[jobs_daterank[current_job]], end='') + print(' '*(max_job_size-jobs_sizes[current_job]), end='') + print(current_job, end='') + print('@', end='') + if current_state in ('running', 'scrubbing'): + print(''.join(number2color), end='') + print(str(jobs_enqueueds[current_job]), end='') + print(''.join(number1color), end='') + print('/', end='') + print(str(jobs_queues[current_job]), end='') + print(colored.attr('reset'), end='') + print(' ', end='') + print(ESC+'[K', flush=False) + print(displayatbottom, end=ESC+'[K', flush=True) + print(ESC+'[0J', end='', flush=True) + print(ESC+'[0;0H', end='', flush=True) + return keep_to_next_cycle + + +def do_cycle_sleep(): + sleep(1/60) + + +def main(): + selfmodule_path = (Path(__file__) + .absolute() + .relative_to(Path('.').absolute() + )) + selfmodule_name = ( + str(selfmodule_path.parent).replace('/', '.') + + '.' 
+ + selfmodule_path.stem) + selfmodule_name = ( + selfmodule_name[1:] + if selfmodule_name.startswith('.') else + selfmodule_name) + selfmodule = importlib.import_module(selfmodule_name) + workers_state_path = Path('i_gdl_w') + from_exc = False + keep_to_next_cycle = None + print(ESC+'[2J', end='', flush=True) + print(ESC+'[0;0H', end='', flush=True) + while workers_state_path.exists(): + try: + selfmodule = importlib.reload(selfmodule) + if from_exc: + from_exc = False + print(ESC+'[2J', end='', flush=True) + print(ESC+'[0;0H', end='', flush=True) + keep_to_next_cycle = selfmodule.print_terminal( + workers_state_path, keep_to_next_cycle) + except KeyboardInterrupt: + print(ESC+'[2J', end='', flush=True) + print(ESC+'[0;0H', end='', flush=True) + raise + except BaseException: + print(ESC+'[2J', end='', flush=True) + print(ESC+'[0;0H', end='', flush=True) + traceback.print_exc() + from_exc = True + sleep(1) + selfmodule.do_cycle_sleep() + print(ESC+'[0;0H', end='', flush=True) + print(ESC+'[K', end='', flush=True) + print('Queue is empty') + + +if __name__ == "__main__": + main() diff --git a/reddit_imgs/fetch2.py b/reddit_imgs/fetch2.py index db24c7b..1fc99ab 100644 --- a/reddit_imgs/fetch2.py +++ b/reddit_imgs/fetch2.py @@ -2,11 +2,13 @@ # -*- encoding: utf-8 -*- import json -from concurrent.futures import Future +import math +import shutil +import sys +import traceback from concurrent.futures import ProcessPoolExecutor as PoolExecutor from pathlib import Path -import shutil -from typing import Any, List, Optional, Tuple +from typing import List, Optional, Tuple import colored as clrlib import gallery_dl @@ -20,6 +22,12 @@ import reddit_imgs.sync from .system.downloader.cache import get_normalized_link, get_path_for_caching from .system.urlmatcher import search_urls +MAX_WORKERS = 12 +SPLIT_WORKER_AFTER_N_LINKS = 1000 +FORBIDDEN_WORKER_SPLITS = { + 'deviantart', +} + def main(): subreddit_data_path = Path('r.json') @@ -27,6 +35,10 @@ def main(): print("Executing prerrequisite...") reddit_imgs.sync.main() print('Loading posts...') + workers_state_path = Path('i_gdl_w') + workers_state_path.mkdir(exist_ok=True, parents=True) + for wsp in workers_state_path.iterdir(): + wsp.unlink() subreddit_data = json.loads(subreddit_data_path.read_text()) links = dict() postsl = [ @@ -85,7 +97,7 @@ def main(): link_extractors = dict() if r_gdl_le_path.exists(): link_extractors = json.loads(r_gdl_le_path.read_text()) - for seq, link in enumerate(links.keys()): + for link in links.keys(): if link not in link_extractors or link_extractors[link] == '': ext = None try: @@ -112,6 +124,50 @@ def main(): print(f'{len(links_by_extractor)} extractors found') Path('r_gdl_lbe.json').write_text(json.dumps(links_by_extractor, indent=1)) + ignored_links = set() + if (p := Path('i_gdl_ignored.txt')).exists(): + ignored_links = set(list(filter(len, p.read_text().splitlines()))) + + max_expected_jobs_for_extractor = 0 + for extractor, links in links_by_extractor.items(): + links = [link for link in links if link not in ignored_links] + workers = math.ceil(len(links)/SPLIT_WORKER_AFTER_N_LINKS) + if workers <= 1 or extractor in FORBIDDEN_WORKER_SPLITS: + workers = 1 + max_expected_jobs_for_extractor = max( + max_expected_jobs_for_extractor, + workers + ) + worker_by_seq = [list() for _ in range(max_expected_jobs_for_extractor)] + + links_to_worker = dict() + for extractor, links in links_by_extractor.items(): + links = [link for link in links if link not in ignored_links] + workers = 
math.ceil(len(links)/SPLIT_WORKER_AFTER_N_LINKS) + if workers <= 1 or extractor in FORBIDDEN_WORKER_SPLITS: + links_to_worker[extractor] = links + worker_by_seq[0].append(extractor) + else: + digits = math.ceil(math.log10(max(1, workers+1))) + fmt = "%%0%dd" % digits + for worker_no in range(workers): + lowerlimit = (worker_no+0)*SPLIT_WORKER_AFTER_N_LINKS + upperlimit = (worker_no+1)*SPLIT_WORKER_AFTER_N_LINKS + thisrange = links[lowerlimit:upperlimit] + worker_nm = extractor + ':' + (fmt % (worker_no)) + links_to_worker[worker_nm] = thisrange + worker_by_seq[worker_no].append(worker_nm) + for w in worker_by_seq: + w.sort() + workers_nicely_grouped = [ + worker + for workergroup in worker_by_seq + for worker in workergroup + if worker != '' + ] + print(f'{len(links_to_worker)} workers to be spawned') + + configure_gdl() gallery_dl.output.select = lambda: ColoredLineOutput(False) @@ -120,40 +176,110 @@ def main(): totalfiles = 0 - thread_ids = sorted(list(links_by_extractor.keys())) + thread_ids = workers_nicely_grouped.copy() + for line, thread_id in enumerate(thread_ids): + workers_state_path.joinpath(thread_id+'=line').write_text(str(line)) + linkcount = len(links_to_worker[thread_id]) + workers_state_path.joinpath(thread_id).write_text(f'waiting:{linkcount}:{linkcount}:0:0') do_fancy_multithreading_panel = False - with PoolExecutor(len(thread_ids)) as pe: - jobs: List[Future] = list() + thread_id_count = len(thread_ids) + with PoolExecutor(min(MAX_WORKERS, thread_id_count)) as pe: if do_fancy_multithreading_panel: print(f'\033[2J', end='', flush=True) print(f'\033[0;0H', end='', flush=True) print('Downloading...', flush=True) if do_fancy_multithreading_panel: print(f'\033[0;0H', end='', flush=True) + largest_tid_size = max(map(len, thread_ids)) + line2tid = dict() + + def done_callback_generator(line): + nonlocal totalfiles + def done_callback(job): + nonlocal totalfiles + thread_id = line2tid[line] + links_list = links_to_worker[thread_id] + workers_state_path.joinpath(thread_id).write_text(f'finished:{len(links_list)}:0:0:0') + print(clrlib.stylize( + f'Received job #{line}: {thread_id}', [ + clrlib.fg('white'), + clrlib.bg('green'), + clrlib.attr('bold'), + ] + )) + totalbytes = 0 + thisfiles = 0 + generator = list() + try: + generator = job.result() + except: + with workers_state_path.joinpath(thread_id+'=exc').open('wt') as f: + traceback.print_exc(file=f) + traceback.print_exc() + sys.exit(255) + for link, files in generator: + files_from_links[link] = files + lenfiles = len(files) + totalfiles += lenfiles + for file in files: + st = Path(file).stat() + totalbytes += st.st_size + thisfiles += lenfiles + workers_state_path.joinpath(thread_id).write_text(f'finished:{len(links_list)}:0:{totalbytes}:{thisfiles}') + + + return done_callback for line, thread_id in enumerate(thread_ids): + line2tid[line] = thread_id + links_list = links_to_worker[thread_id] + workers_state_path.joinpath(thread_id).write_text(f'enqueued:{len(links_list)}:{len(links_list)}:0:0') + print(clrlib.stylize(f'Starting job #{line}: {thread_id}', [ + clrlib.fg('white'), + clrlib.bg('light_red'), + clrlib.attr('bold'), + ])) + jobstardedmsg = clrlib.stylize(f'Starting job #{line}: {thread_id}', [ + clrlib.fg('black'), + clrlib.bg('light_yellow'), + clrlib.attr('bold'), + ]) + thread_id_nmsz = len(thread_id) + thread_id_display = thread_id + ' ' * \ + (largest_tid_size - thread_id_nmsz) job = pe.submit( download_link_list, - links_by_extractor[thread_id], - thread_id, + links_list, + thread_id_display, line+3 if 
do_fancy_multithreading_panel else None, + jobstardedmsg, + workers_state_path.joinpath(thread_id), ) - jobs.append(job) - for job in jobs: - generator = job.result() - for k, v in generator: - files_from_links[k] = v - totalfiles += len(v) + job.add_done_callback(done_callback_generator(line)) Path('i_gdl_ffl.json').write_text(json.dumps(files_from_links, indent=1)) + if (p := Path('latest_image_download.txt')).exists(): + p.unlink() + if workers_state_path.exists(): + for p in workers_state_path.glob('*'): + p.unlink() + shutil.rmtree(workers_state_path) print(f'Downloaded {totalfiles} files') def download_link_list(links: List[str], thread_id: str, line: Optional[int] = None, + job_started_msg: Optional[str] = None, + thread_state_path: Optional[Path] = None, ) -> List[Tuple[str, List[str]]]: + '''Downloads a link list inside a ProcessPoolExecutor''' + if job_started_msg is not None: + print(job_started_msg) has_its_own_line = line is not None - remaining_links = len(links) + link_count = len(links) + remaining_links = link_count configure_gdl() + if thread_state_path is not None: + thread_state_path.write_text(f'running:{link_count}:{remaining_links}:0:0') def get_printer(): return ColoredLineOutput( @@ -164,33 +290,59 @@ def download_link_list(links: List[str], clrlib.stylize(thread_id, [clrlib.fg('yellow')]) + clrlib.stylize('=', [clrlib.fg('dark_gray')]), suffix=('\033[K\033[0;0H' if has_its_own_line else ''), - prefixsz=len(('% 9d'%0)+' '+thread_id), + prefixsz=len(('% 9d' % 0)+' '+thread_id), suffixsz=0, + write_successes_to=Path('latest_image_download.txt'), ) gallery_dl.output.select = get_printer result = list() + totalbytes = 0 + totalfiles = 0 try: for link in links: + scrubbing = True cachedir = get_path_for_caching(link, Path('i_gdl_c')) cachedir.mkdir(parents=True, exist_ok=True) metafile = cachedir.joinpath('_gdl_meta.json') meta = dict() + link_already_downloaded = False if metafile.exists(): meta = json.loads(metafile.read_text()) - if link not in meta: + if link in meta: + link_already_downloaded = True + for fl in meta[link]: + pth = Path(fl) + if not pth.exists(): + link_already_downloaded = False + break + if not link_already_downloaded: + scrubbing = False + if thread_state_path is not None: + thread_state_path.write_text(f'running:{link_count}:{remaining_links}:{totalbytes}:{totalfiles}') job = DownloadJobWithCallSaverPostProcessor(link) job.out = get_printer() - job.out.start(clrlib.stylize(link, [clrlib.fg('light_magenta')])) + job.out.message(link, clrlib.fg('light_magenta')) job.run() files = list(map(lambda a: a[0], job.cspp.calls['run_final'])) + files = list(filter(lambda a: Path(a).exists(), files)) meta[link] = files metafile.write_text(json.dumps(meta, indent=1)) + for fl in meta[link]: + pth = Path(fl) + if not pth.exists(): + raise FileNotFoundError((link, link_already_downloaded, meta[link])) + st = pth.stat() + totalbytes += st.st_size + totalfiles += 1 result.append((link, meta[link])) remaining_links -= 1 + if thread_state_path is not None: + scrubbing_running = 'scrubbing' if scrubbing else 'running' + thread_state_path.write_text(f'{scrubbing_running}:{link_count}:{remaining_links}:{totalbytes}:{totalfiles}') finally: print((f'\033[{line};0H' if has_its_own_line else '') + - clrlib.stylize(thread_id, [clrlib.fg('yellow')]) + + clrlib.stylize(thread_id.strip(), [clrlib.fg('yellow'), clrlib.attr('bold')]) + clrlib.stylize('#', [clrlib.fg('light_red')]) + clrlib.stylize('Done', [clrlib.fg('light_green')]) + ('\033[K' if has_its_own_line else '') @@ 
-199,15 +351,18 @@ def download_link_list(links: List[str], def configure_gdl(): + '''Configures Gallery-DL for usage.''' parser = gallery_dl.option.build_parser() args = parser.parse_args([ - '--download-archive=i_gdl.sqlite3', + '--download-archive=i_gdl/archive.db', '--dest=i_gdl', '--write-metadata', '--write-tags', - '--write-log=i_gdl_log.txt', - '--write-unsupported=i_gdl_unsupported.txt', + # '--write-log=i_gdl_log.txt', + # '--write-unsupported=i_gdl_unsupported.txt', '--quiet', + '--retries=15', + # '--limit-rate=1500k', ]) gallery_dl.output.initialize_logging(args.loglevel) @@ -242,7 +397,7 @@ class DownloadJobWithCallSaverPostProcessor(gallery_dl.job.DownloadJob): class ColoredLineOutput(gallery_dl.output.TerminalOutput): - def __init__(self, sameline=False, prefix="", suffix="", prefixsz=0, suffixsz=0): + def __init__(self, sameline=False, prefix="", suffix="", prefixsz=0, suffixsz=0, write_successes_to=None): super().__init__() self.sameline = sameline self.eol = '\r' if sameline else '\n' @@ -250,24 +405,42 @@ class ColoredLineOutput(gallery_dl.output.TerminalOutput): self.suffix = suffix self.prefixsz = prefixsz self.suffixsz = suffixsz - self.termsize = shutil.get_terminal_size().columns + self.write_successes_to = write_successes_to + self._termsize_update() def start(self, path): - print(f'{self.prefix}{clrlib.stylize(self.shorten(path), [clrlib.fg("light_yellow")])}{self.suffix}', - flush=True, end=self.eol) + self.message(path, + clrlib.fg("light_yellow"), + ) def skip(self, path): - print(f"{self.prefix}\033[2m{self.shorten(path)}\033[0m{self.suffix}", - flush=True, end=self.eol) + self.message(path, + clrlib.attr('dim'), + ) def success(self, path, tries): - print(f"{self.prefix}\033[1;32m{self.shorten(path)}\033[0m{self.suffix}", - flush=True, end=self.eol) - + self.message(path, + clrlib.attr('bold'), + clrlib.fg('light_green'), + ) + if self.write_successes_to is not None: + self.write_successes_to.write_text(path) + + def message(self, txt: str, *attrs: List[str], do_print: bool = True) -> str: + """Prints a message with given formatters""" + clrtxt = clrlib.stylize(self.shorten(txt), attrs) + fmtd = f"{self.prefix}{clrtxt}{self.suffix}" + if do_print: + print(fmtd, flush=True, end=self.eol) + return fmtd + def shorten(self, txt): - self.width = self.termsize - self.prefixsz - self.suffixsz -1 + self._termsize_update() + self.width = self.termsize - self.prefixsz - self.suffixsz - 1 return super().shorten(txt) + def _termsize_update(self): + self.termsize = shutil.get_terminal_size().columns class CallSaverPostProcessor(gallery_dl.postprocessor.common.PostProcessor): diff --git a/reddit_imgs/runner.py b/reddit_imgs/runner.py index 5bfc1b4..2903436 100755 --- a/reddit_imgs/runner.py +++ b/reddit_imgs/runner.py @@ -159,8 +159,8 @@ def cmdline(): cmds = sys.argv[1:] available_commands = (( ('sync', reddit_imgs.sync.main), - ('fetch', reddit_imgs.fetch.main), - ('fetch2', reddit_imgs.fetch2.main), + ('fetch', reddit_imgs.fetch2.main), + ('fetch_old', reddit_imgs.fetch.main), ('fetchretryingemptyalbuns', reddit_imgs.fetch.retry), ('cachedhash', reddit_imgs.cachedhash.main), ('hashit', reddit_imgs.hashit.main), diff --git a/reddit_imgs/sync.py b/reddit_imgs/sync.py index 5524a40..a2c1db3 100755 --- a/reddit_imgs/sync.py +++ b/reddit_imgs/sync.py @@ -30,7 +30,8 @@ def process_subreddit(subreddit): try: with open(os.path.join(srp, 'subreddit.json')) as f: srdt = json.loads(f.read()) - except: pass + except BaseException: + pass #srdt = getEmptySubredditData(subreddit) 
pageno = 0 ygst = srdt['date_first'] @@ -42,7 +43,7 @@ def process_subreddit(subreddit): redditBytes = None try: redditBytes = simpleDownloader.getUrlBytes(nextpage) - except (HTTPError, URLError, ContentTooShortError) as e: + except (HTTPError, URLError, ContentTooShortError): print(" >> HTTP Error with code: Skipping...") break if redditBytes is None: diff --git a/reddit_imgs/system/.gitignore b/reddit_imgs/system/.gitignore new file mode 100644 index 0000000..c279aea --- /dev/null +++ b/reddit_imgs/system/.gitignore @@ -0,0 +1 @@ +eps_test_out_*.png \ No newline at end of file diff --git a/reddit_imgs/system/edge_propagation_scaler.py b/reddit_imgs/system/edge_propagation_scaler.py new file mode 100755 index 0000000..29b26e3 --- /dev/null +++ b/reddit_imgs/system/edge_propagation_scaler.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python3 +# -*- encoding: utf-8 -*- + +import sys +from pathlib import Path +from typing import Tuple, List + +from PIL import Image, ImageDraw + + +START = -(sys.maxsize // 2) + 0 +END = -(sys.maxsize // 2) + 1 +CENTER = -(sys.maxsize // 2) + 2 + +POS_TOP_LEFT = (START, START) +POS_CENTER_LEFT = (START, CENTER) +POS_BOTTOM_LEFT = (START, END) + +POS_TOP_CENTER = (CENTER, START) +POS_CENTER_CENTER = (CENTER, CENTER) +POS_BOTTOM_CENTER = (CENTER, END) + +POS_TOP_RIGHT = (END, START) +POS_CENTER_RIGHT = (END, CENTER) +POS_BOTTOM_RIGHT = (END, END) + +P_TL = POS_TOP_LEFT +P_CL = POS_CENTER_LEFT +P_BL = POS_BOTTOM_LEFT + +P_TC = POS_TOP_CENTER +P_CC = POS_CENTER_CENTER +P_BC = POS_BOTTOM_CENTER + +P_TR = POS_TOP_RIGHT +P_CR = POS_CENTER_RIGHT +P_BR = POS_BOTTOM_RIGHT + + +def edge_propagation_scaling(image: Image.Image, + desired_size: Tuple[int, int], + paste_position: Tuple[int, int] = P_CC, + ) -> Image.Image: + image = image.copy() + scaled = Image.new(image.mode, desired_size) + new_placement_: List[int] = list() + for isz, ssz, pp in zip(image.size, scaled.size, paste_position,): + if pp == START: + new_placement_.append(0) + elif pp == END: + new_placement_.append(ssz-isz) + elif pp == CENTER: + new_placement_.append((ssz-isz)//2) + else: + new_placement_.append(pp) + new_placement: Tuple[int, int] = new_placement_[0], new_placement_[1] + del new_placement_ + scaled.paste(image, new_placement) + parts = dict( + # left, upper, right, lower + t=image.copy().crop( + (0, 0, image.size[0], 1)), + b=image.copy().crop( + (0, image.size[1]-1, image.size[0], image.size[1])), + l=image.copy().crop( + (0, 0, 1, image.size[1])), + r=image.copy().crop( + (image.size[0]-1, 0, image.size[0], image.size[1])), + ) + if (sz := new_placement[1]) > 0: + part = parts['t'].copy() + resized = part.resize((part.size[0], sz)) + scaled.paste(resized, (new_placement[0], 0)) + if (sz := scaled.size[1]-(dsp := new_placement[1]+image.size[1])) > 0: + part = parts['b'].copy() + resized = part.resize((part.size[0], sz)) + scaled.paste(resized, (new_placement[0], dsp)) + if (sz := new_placement[0]) > 0: + part = parts['l'].copy() + resized = part.resize((sz, part.size[1])) + scaled.paste(resized, (0, new_placement[1])) + if (sz := scaled.size[0]-(dsp := new_placement[0]+image.size[0])) > 0: + part = parts['r'].copy() + resized = part.resize((sz, part.size[1])) + scaled.paste(resized, (dsp, new_placement[1])) + del parts + corners = dict( + tl=image.getpixel((0, 0)), + tr=image.getpixel((image.size[0]-1, 0)), + bl=image.getpixel((0, image.size[1]-1)), + br=image.getpixel((image.size[0]-1, image.size[1]-1)), + ) + draw: ImageDraw.ImageDraw = ImageDraw.Draw(scaled) + szt = new_placement[1] + szb = 
scaled.size[1]-(dspb := new_placement[1]+image.size[1]) + szl = new_placement[0] + szr = scaled.size[0]-(dspr := new_placement[0]+image.size[0]) + if szt > 0 and szl > 0: + draw.rectangle(((0, 0), (szl-1, szt-1)), corners['tl']) + if szt > 0 and szr > 0: + draw.rectangle(((dspr, 0), (scaled.size[0], szt-1)), corners['tr']) + if szb > 0 and szl > 0: + draw.rectangle(((0, dspb), (szl-1, scaled.size[1])), corners['bl']) + if szb > 0 and szr > 0: + draw.rectangle(((dspr, dspb), scaled.size), corners['br']) + del dspr + del dspb + del szt + del szb + del szl + del szr + return scaled + + +def main(path: Path = Path('eps_test.png')): + im: Image.Image = Image.open(path) + + ptd = dict(a=START, b=CENTER, c=END) + + for xp in 'abc': + for yp in 'abc': + im2 = edge_propagation_scaling( + im, + tuple(map(lambda i: int(i*2.5), im.size)), + (ptd[xp], ptd[yp]), + ) + im2.save(Path(f'{path.stem}_out_{yp}{xp}{path.suffix}')) + + +if __name__ == "__main__": + main() diff --git a/reddit_imgs/system/eps_test.png b/reddit_imgs/system/eps_test.png new file mode 100644 index 0000000000000000000000000000000000000000..ba2eba3068f28faba75e54acb617d8e2d4491410 GIT binary patch literal 491 zcmV4Tx0C=2zkv&MmKpe$iKeRAFB6^c+H)C#RSm|XfHG-*gu zTpR`0f`cE6RRU7TlmpZjxkD|wRvK7n|K>4rtTPCT_~ z>74h8qpTz;#OK7L23?T&k?XR{Z=8z``*~*6$fW0pqr^h7jpa6GB|{~iB#tSnM*04% z%L?Z$&T6^Jn)l={jO4ZDC9cyPL=p>Fga`pLs@Omo7UHyOq?kz4e!{~);P^#y$>iDq zBgZ@{P$4;f@IUz7ty!2DcatIspyS21KZb$OF3_mi_V=-EH%@@SGjOG~{FOQ|{YiSQ zrNxebo^9abx~0i`z~v6m|D;QXz>%AxT6*RCt_Y`2YVu0|Nu`Bm+sR|Ns9_ hqUuo_Mr{~C8vxl?evm+N=G*`P002ovPDHLkV1jV4;sgKy literal 0 HcmV?d00001 diff --git a/showfetchworkersstate.py b/showfetchworkersstate.py new file mode 100755 index 0000000..978426d --- /dev/null +++ b/showfetchworkersstate.py @@ -0,0 +1,8 @@ +#!/usr/bin/env python3 +# -*- encoding: utf-8 -*- + +from reddit_imgs.display_fetch_futures import main + +if __name__ == '__main__': + main() + diff --git a/w/.gitignore b/w/.gitignore deleted file mode 100644 index b66ba8a..0000000 --- a/w/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -** -!About.md -!.gitignore diff --git a/w/About.md b/w/About.md deleted file mode 100644 index 4625178..0000000 --- a/w/About.md +++ /dev/null @@ -1 +0,0 @@ -This folder contains all wallpapers grouped by safety.