From 4ce83f0c5fead0222a322ef678c88655cdae8979 Mon Sep 17 00:00:00 2001
From: Adler Neves
Date: Sun, 19 Jul 2020 22:54:26 -0300
Subject: [PATCH] big refactoring

---
 .gitignore | 3 +
 hash_thumbnailer_distributed/.gitignore | 9 +
 hash_thumbnailer_distributed/Makefile | 17 +
 hash_thumbnailer_distributed/manage.py | 21 +
 .../webproj/__init__.py | 0
 .../webproj/adminModelRegister.py | 30 +
 hash_thumbnailer_distributed/webproj/asgi.py | 16 +
 .../webproj/settings.py | 129 +++
 .../webproj/stackOverflowSnippets.py | 24 +
 .../webproj/thumbnailer/__init__.py | 0
 .../webproj/thumbnailer/admin.py | 6 +
 .../webproj/thumbnailer/apps.py | 5 +
 .../management/commands/dumpresults.py | 28 +
 .../management/commands/loadhashes.py | 33 +
 .../thumbnailer/migrations/0001_initial.py | 50 ++
 .../thumbnailer/migrations/__init__.py | 0
 .../webproj/thumbnailer/models.py | 57 ++
 .../webproj/thumbnailer/tests.py | 3 +
 .../webproj/thumbnailer/urls.py | 13 +
 .../webproj/thumbnailer/views.py | 66 ++
 hash_thumbnailer_distributed/webproj/urls.py | 24 +
 hash_thumbnailer_distributed/webproj/wsgi.py | 16 +
 hash_thumbnailer_distributed/worker.py | 275 +++++++
 .../worker_thumbnailer.py | 422 ++++++++++
 prunedownloads.py | 7 +
 reddit_imgs/condensate_hashes.py | 238 ++++++
 reddit_imgs/display_fetch_futures.py | 437 +++++++---
 reddit_imgs/download_pruner.py | 176 +++++
 reddit_imgs/fetch.py | 56 +-
 reddit_imgs/fetch2.py | 744 ++++++++++++++----
 reddit_imgs/get_firefox_cookies.sh | 41 +
 reddit_imgs/hashit2.py | 173 ++++
 reddit_imgs/runner.py | 138 ++--
 reddit_imgs/suggest_subreddits_from_links.py | 44 ++
 reddit_imgs/sync.py | 72 +-
 reddit_imgs/system/cmdline_parser.py | 83 ++
 reddit_imgs/system/downloader/cache.py | 23 +-
 reddit_imgs/system/flattener.py | 13 +
 reddit_imgs/system/format_file_size.py | 18 +
 reddit_imgs/system/hexhashof.py | 6 +-
 reddit_imgs/system/table_fmt.py | 6 +-
 reddit_imgs/system/urlmatcher.py | 17 +-
 42 files changed, 3187 insertions(+), 352 deletions(-)
 create mode 100644 hash_thumbnailer_distributed/.gitignore
 create mode 100644 hash_thumbnailer_distributed/Makefile
 create mode 100755 hash_thumbnailer_distributed/manage.py
 create mode 100644 hash_thumbnailer_distributed/webproj/__init__.py
 create mode 100644 hash_thumbnailer_distributed/webproj/adminModelRegister.py
 create mode 100644 hash_thumbnailer_distributed/webproj/asgi.py
 create mode 100644 hash_thumbnailer_distributed/webproj/settings.py
 create mode 100644 hash_thumbnailer_distributed/webproj/stackOverflowSnippets.py
 create mode 100644 hash_thumbnailer_distributed/webproj/thumbnailer/__init__.py
 create mode 100644 hash_thumbnailer_distributed/webproj/thumbnailer/admin.py
 create mode 100644 hash_thumbnailer_distributed/webproj/thumbnailer/apps.py
 create mode 100644 hash_thumbnailer_distributed/webproj/thumbnailer/management/commands/dumpresults.py
 create mode 100644 hash_thumbnailer_distributed/webproj/thumbnailer/management/commands/loadhashes.py
 create mode 100644 hash_thumbnailer_distributed/webproj/thumbnailer/migrations/0001_initial.py
 create mode 100644 hash_thumbnailer_distributed/webproj/thumbnailer/migrations/__init__.py
 create mode 100644 hash_thumbnailer_distributed/webproj/thumbnailer/models.py
 create mode 100644 hash_thumbnailer_distributed/webproj/thumbnailer/tests.py
 create mode 100644 hash_thumbnailer_distributed/webproj/thumbnailer/urls.py
 create mode 100644 hash_thumbnailer_distributed/webproj/thumbnailer/views.py
 create mode 100644 hash_thumbnailer_distributed/webproj/urls.py
 create mode 100644 hash_thumbnailer_distributed/webproj/wsgi.py
 create mode 100755 hash_thumbnailer_distributed/worker.py
 create mode 100755 hash_thumbnailer_distributed/worker_thumbnailer.py
 create mode 100755 prunedownloads.py
 create mode 100755 reddit_imgs/condensate_hashes.py
 create mode 100644 reddit_imgs/download_pruner.py
 create mode 100755 reddit_imgs/get_firefox_cookies.sh
 create mode 100644 reddit_imgs/hashit2.py
 create mode 100644 reddit_imgs/suggest_subreddits_from_links.py
 create mode 100644 reddit_imgs/system/cmdline_parser.py
 create mode 100644 reddit_imgs/system/flattener.py
 create mode 100644 reddit_imgs/system/format_file_size.py

diff --git a/.gitignore b/.gitignore
index f629e9b..ae5bf1f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,6 +11,8 @@ w
 w/**
 i_gdl*
 i_gdl*/**
+r_gdl*
+r_gdl*/**
 i_c
 i_c/**
 i_c.json
@@ -19,6 +21,7 @@ fetch_missing.json
 i_he.json
 i_c_h.json
 most_repeated_hashes.json
+display_fetch_futures.trace
 i_h
 i_h/**
 i_h.json
diff --git a/hash_thumbnailer_distributed/.gitignore b/hash_thumbnailer_distributed/.gitignore
new file mode 100644
index 0000000..c1af7c1
--- /dev/null
+++ b/hash_thumbnailer_distributed/.gitignore
@@ -0,0 +1,9 @@
+/.mypy_cache
+/.mypy_cache/**
+/.vscode
+/.vscode/**
+/static
+/static/**
+/db.sqlite3
+/dumped*.json
+/hashes*.txt
diff --git a/hash_thumbnailer_distributed/Makefile b/hash_thumbnailer_distributed/Makefile
new file mode 100644
index 0000000..ed73ed6
--- /dev/null
+++ b/hash_thumbnailer_distributed/Makefile
@@ -0,0 +1,17 @@
+devserver:
+	-@mkdir -p static
+	python manage.py makemigrations
+	python manage.py migrate
+	python manage.py createcachetable
+	yes yes | python manage.py collectstatic
+	python manage.py runserver 0.0.0.0:8000
+
+server:
+	python manage.py migrate
+	uvicorn webproj.asgi:application --lifespan off --host 0.0.0.0 --workers 12
+
+prepare:
+	python manage.py loadhashes
+
+dumpresults:
+	python manage.py dumpresults
diff --git a/hash_thumbnailer_distributed/manage.py b/hash_thumbnailer_distributed/manage.py
new file mode 100755
index 0000000..4b7c18e
--- /dev/null
+++ b/hash_thumbnailer_distributed/manage.py
@@ -0,0 +1,21 @@
+#!/usr/bin/env python
+"""Django's command-line utility for administrative tasks."""
+import os
+import sys
+
+
+def main():
+    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'webproj.settings')
+    try:
+        from django.core.management import execute_from_command_line
+    except ImportError as exc:
+        raise ImportError(
+            "Couldn't import Django. Are you sure it's installed and "
+            "available on your PYTHONPATH environment variable? Did you "
+            "forget to activate a virtual environment?"
+        ) from exc
+    execute_from_command_line(sys.argv)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/hash_thumbnailer_distributed/webproj/__init__.py b/hash_thumbnailer_distributed/webproj/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/hash_thumbnailer_distributed/webproj/adminModelRegister.py b/hash_thumbnailer_distributed/webproj/adminModelRegister.py
new file mode 100644
index 0000000..cfa6e43
--- /dev/null
+++ b/hash_thumbnailer_distributed/webproj/adminModelRegister.py
@@ -0,0 +1,30 @@
+from django.db import models
+from .stackOverflowSnippets import classesInModule
+from django.core import exceptions
+from sys import stderr
+
+
+def onlyModels(userMadeModels):
+    return [model for model in userMadeModels if models.Model in model.__mro__]
+
+
+def isAbstract(clazz):
+    return clazz._meta.abstract
+
+
+def discardAbstractModels(userMadeModels):
+    return [model for model in userMadeModels if not isAbstract(model)]
+
+
+def registrableModelsInModule(module):
+    return discardAbstractModels(onlyModels(classesInModule(module)))
+
+
+def registerForMe(admin, models_module):
+    for model in registrableModelsInModule(models_module):
+        try:
+            admin.site.register(model)
+        except exceptions.ImproperlyConfigured:
+            pass
+        except BaseException as e:
+            print(str(e.__class__)+': '+str(e), file=stderr)
diff --git a/hash_thumbnailer_distributed/webproj/asgi.py b/hash_thumbnailer_distributed/webproj/asgi.py
new file mode 100644
index 0000000..88e69a7
--- /dev/null
+++ b/hash_thumbnailer_distributed/webproj/asgi.py
@@ -0,0 +1,16 @@
+"""
+ASGI config for webproj project.
+
+It exposes the ASGI callable as a module-level variable named ``application``.
+
+For more information on this file, see
+https://docs.djangoproject.com/en/3.0/howto/deployment/asgi/
+"""
+
+import os
+
+from django.core.asgi import get_asgi_application
+
+os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'webproj.settings')
+
+application = get_asgi_application()
diff --git a/hash_thumbnailer_distributed/webproj/settings.py b/hash_thumbnailer_distributed/webproj/settings.py
new file mode 100644
index 0000000..60cee8e
--- /dev/null
+++ b/hash_thumbnailer_distributed/webproj/settings.py
@@ -0,0 +1,129 @@
+"""
+Django settings for webproj project.
+
+Generated by 'django-admin startproject' using Django 3.0.8.
+
+For more information on this file, see
+https://docs.djangoproject.com/en/3.0/topics/settings/
+
+For the full list of settings and their values, see
+https://docs.djangoproject.com/en/3.0/ref/settings/
+"""
+
+import os
+from pathlib import Path
+
+import psycopg2.extensions
+
+# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
+BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+
+
+# Quick-start development settings - unsuitable for production
+# See https://docs.djangoproject.com/en/3.0/howto/deployment/checklist/
+
+# SECURITY WARNING: keep the secret key used in production secret!
+SECRET_KEY = 'o$fohw1$^ber9jn^t2l@+$a5kb5ys%xm^^9)0&#n9--zs5zc@k'
+
+# SECURITY WARNING: don't run with debug turned on in production!
+DEBUG = True
+
+ALLOWED_HOSTS = []
+ALLOWED_HOSTS = ['*']
+
+
+# Application definition
+
+INSTALLED_APPS = [
+    'django.contrib.admin',
+    'django.contrib.auth',
+    'django.contrib.contenttypes',
+    'django.contrib.sessions',
+    'django.contrib.messages',
+    'django.contrib.staticfiles',
+    'webproj.thumbnailer',
+]
+
+MIDDLEWARE = [
+    'django.middleware.security.SecurityMiddleware',
+    'django.contrib.sessions.middleware.SessionMiddleware',
+    'django.middleware.common.CommonMiddleware',
+    'django.middleware.csrf.CsrfViewMiddleware',
+    'django.contrib.auth.middleware.AuthenticationMiddleware',
+    'django.contrib.messages.middleware.MessageMiddleware',
+    'django.middleware.clickjacking.XFrameOptionsMiddleware',
+]
+
+ROOT_URLCONF = 'webproj.urls'
+
+TEMPLATES = [
+    {
+        'BACKEND': 'django.template.backends.django.DjangoTemplates',
+        'DIRS': [],
+        'APP_DIRS': True,
+        'OPTIONS': {
+            'context_processors': [
+                'django.template.context_processors.debug',
+                'django.template.context_processors.request',
+                'django.contrib.auth.context_processors.auth',
+                'django.contrib.messages.context_processors.messages',
+            ],
+        },
+    },
+]
+
+WSGI_APPLICATION = 'webproj.wsgi.application'
+
+
+# Database
+# https://docs.djangoproject.com/en/3.0/ref/settings/#databases
+
+DATABASES = {
+    'default': {
+        'ENGINE': 'django.db.backends.postgresql_psycopg2',
+        'NAME': 'test',
+        'USER': 'test',
+        'PASSWORD': 'test',
+        'HOST': 'localhost',
+        'PORT': '5432',
+    }
+}
+
+# Password validation
+# https://docs.djangoproject.com/en/3.0/ref/settings/#auth-password-validators
+
+AUTH_PASSWORD_VALIDATORS = [
+    {
+        'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
+    },
+    {
+        'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
+    },
+    {
+        'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
+    },
+    {
+        'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
+    },
+]
+
+
+# Internationalization
+# https://docs.djangoproject.com/en/3.0/topics/i18n/
+
+LANGUAGE_CODE = 'en-us'
+
+TIME_ZONE = 'UTC'
+
+USE_I18N = True
+
+USE_L10N = True
+
+USE_TZ = True
+
+
+# Static files (CSS, JavaScript, Images)
+# https://docs.djangoproject.com/en/3.0/howto/static-files/
+
+STATIC_URL = '/static/'
+STATIC_ROOT = str(Path(__file__).absolute().parent.parent.joinpath('static'))
diff --git a/hash_thumbnailer_distributed/webproj/stackOverflowSnippets.py b/hash_thumbnailer_distributed/webproj/stackOverflowSnippets.py
new file mode 100644
index 0000000..16a8aaf
--- /dev/null
+++ b/hash_thumbnailer_distributed/webproj/stackOverflowSnippets.py
@@ -0,0 +1,24 @@
+class StackOverflowCopypaste:
+    __doc__ = None
+    __author__ = None
+    __license__ = 'CC BY-SA 3.0'
+
+    def __str__(self): return str(self.__call__)
+
+    def __call__(self, module): pass
+
+
+class stackoverflow_a_21563930(StackOverflowCopypaste):
+    __doc__ = 'https://stackoverflow.com/a/21563930'
+    __author__ = 'piRSquared'
+
+    def __call__(self, module):
+        moduleDict = module.__dict__
+        return [
+            definedClass for definedClass in moduleDict.values() if (
+                isinstance(definedClass, type) and definedClass.__module__ == module.__name__
+            )
+        ]
+
+
+classesInModule = stackoverflow_a_21563930()
diff --git a/hash_thumbnailer_distributed/webproj/thumbnailer/__init__.py b/hash_thumbnailer_distributed/webproj/thumbnailer/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/hash_thumbnailer_distributed/webproj/thumbnailer/admin.py b/hash_thumbnailer_distributed/webproj/thumbnailer/admin.py
new file mode 100644
index 0000000..021eee8
--- /dev/null
+++ b/hash_thumbnailer_distributed/webproj/thumbnailer/admin.py
@@ -0,0 +1,6 @@
+from django.contrib import admin
+
+import webproj.thumbnailer.models
+from webproj.adminModelRegister import registerForMe
+
+registerForMe(admin, webproj.thumbnailer.models)
diff --git a/hash_thumbnailer_distributed/webproj/thumbnailer/apps.py b/hash_thumbnailer_distributed/webproj/thumbnailer/apps.py
new file mode 100644
index 0000000..d078272
--- /dev/null
+++ b/hash_thumbnailer_distributed/webproj/thumbnailer/apps.py
@@ -0,0 +1,5 @@
+from django.apps import AppConfig
+
+
+class ThumbnailerConfig(AppConfig):
+    name = 'thumbnailer'
diff --git a/hash_thumbnailer_distributed/webproj/thumbnailer/management/commands/dumpresults.py b/hash_thumbnailer_distributed/webproj/thumbnailer/management/commands/dumpresults.py
new file mode 100644
index 0000000..9607021
--- /dev/null
+++ b/hash_thumbnailer_distributed/webproj/thumbnailer/management/commands/dumpresults.py
@@ -0,0 +1,28 @@
+from pathlib import Path
+
+from django.core.management.base import BaseCommand, no_translations
+
+from webproj.thumbnailer.models import ErrorLog, Job, Log, PerformanceLog, ToJsonableMixin
+
+from django.core.serializers.json import DjangoJSONEncoder
+
+
+class Command(BaseCommand):
+    def add_arguments(self, parser):
+        return
+
+    def handle(self, *args, **options):
+        # print(sorted(dir(self.style)))
+        self.stdout.write(self.style.HTTP_INFO(
+            'Reading table entries...'))
+        jobs = list(map(ToJsonableMixin.to_jsonable_nested, Job.objects.order_by('taken_at').all()))
+        err_log = list(map(ToJsonableMixin.to_jsonable_nested, ErrorLog.objects.order_by('id').all()))
+        perf_log = list(map(ToJsonableMixin.to_jsonable_nested, PerformanceLog.objects.order_by('id').all()))
+        outdata = dict(jobs=jobs, logs=dict(error=err_log, performance=perf_log))
+        self.stdout.write(self.style.HTTP_INFO(
+            'Serializing to JSON...'))
+        jsoned = DjangoJSONEncoder(indent=1).encode(outdata)
+        self.stdout.write(self.style.HTTP_INFO(
+            'Saving file...'))
+        Path('dumped.json').write_text(jsoned)
+        self.stdout.write(self.style.SUCCESS('Saved into "dumped.json".'))
diff --git a/hash_thumbnailer_distributed/webproj/thumbnailer/management/commands/loadhashes.py b/hash_thumbnailer_distributed/webproj/thumbnailer/management/commands/loadhashes.py
new file mode 100644
index 0000000..2861560
--- /dev/null
+++ b/hash_thumbnailer_distributed/webproj/thumbnailer/management/commands/loadhashes.py
@@ -0,0 +1,33 @@
+from pathlib import Path
+
+from django.core.management.base import BaseCommand, no_translations
+
+from webproj.thumbnailer.models import ErrorLog, Job, Log, PerformanceLog
+
+
+class Command(BaseCommand):
+    def add_arguments(self, parser):
+        return
+
+    def handle(self, *args, **options):
+        # print(sorted(dir(self.style)))
+        self.stdout.write(self.style.HTTP_INFO(
+            'Truncating all relevant tables...'))
+        Job.truncate_table()
+        ErrorLog.truncate_table()
+        PerformanceLog.truncate_table()
+        self.stdout.write(self.style.SUCCESS('Tables truncated...'))
+        self.stdout.write(self.style.HTTP_INFO('Reading "hashes.txt"...'))
+        hashes = dict(map(lambda a: a.split('|', 1), Path(
+            'hashes.txt').read_text().splitlines()))
+        hashes_len = len(hashes)
+        self.stdout.write(self.style.HTTP_INFO('Adding entries to list...'))
+        jobs = list()
+        for seq, (hsh, file) in enumerate(hashes.items()):
+            jobs.append(Job(hsh=hsh, file=file))
+            if seq % 100000 == 0:
+                print(f'Added {seq} of {hashes_len}')
+        self.stdout.write(self.style.HTTP_INFO(
+            'Bulk-creating jobs into database...'))
+        Job.objects.bulk_create(jobs)
+        self.stdout.write(self.style.SUCCESS('Hashes loaded.'))
diff --git a/hash_thumbnailer_distributed/webproj/thumbnailer/migrations/0001_initial.py b/hash_thumbnailer_distributed/webproj/thumbnailer/migrations/0001_initial.py
new file mode 100644
index 0000000..b1d18e8
--- /dev/null
+++ b/hash_thumbnailer_distributed/webproj/thumbnailer/migrations/0001_initial.py
@@ -0,0 +1,50 @@
+# Generated by Django 3.0.8 on 2020-07-18 13:16
+
+from django.db import migrations, models
+import webproj.thumbnailer.models
+
+
+class Migration(migrations.Migration):
+
+    initial = True
+
+    dependencies = [
+    ]
+
+    operations = [
+        migrations.CreateModel(
+            name='ErrorLog',
+            fields=[
+                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+                ('sender', models.TextField()),
+                ('content', models.TextField()),
+            ],
+            options={
+                'abstract': False,
+            },
+            bases=(models.Model, webproj.thumbnailer.models.TruncatableMixin, webproj.thumbnailer.models.ToJsonableMixin),
+        ),
+        migrations.CreateModel(
+            name='Job',
+            fields=[
+                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+                ('hsh', models.CharField(max_length=255)),
+                ('file', models.TextField()),
+                ('taken_at', models.DateTimeField(blank=True, default=None, null=True)),
+                ('result', models.TextField(blank=True, default=None, null=True)),
+            ],
+            bases=(models.Model, webproj.thumbnailer.models.TruncatableMixin, webproj.thumbnailer.models.ToJsonableMixin),
+        ),
+        migrations.CreateModel(
+            name='PerformanceLog',
+            fields=[
+                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+                ('sender', models.TextField()),
+                ('content', models.TextField()),
+            ],
+            options={
+                'abstract': False,
+            },
+            bases=(models.Model, webproj.thumbnailer.models.TruncatableMixin, webproj.thumbnailer.models.ToJsonableMixin),
+        ),
+    ]
diff --git a/hash_thumbnailer_distributed/webproj/thumbnailer/migrations/__init__.py b/hash_thumbnailer_distributed/webproj/thumbnailer/migrations/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/hash_thumbnailer_distributed/webproj/thumbnailer/models.py b/hash_thumbnailer_distributed/webproj/thumbnailer/models.py
new file mode 100644
index 0000000..d245826
--- /dev/null
+++ b/hash_thumbnailer_distributed/webproj/thumbnailer/models.py
@@ -0,0 +1,57 @@
+import json
+from typing import List
+
+from django.db import connection, models
+
+# Create your models here.
+ + +class TruncatableMixin: + @classmethod + def truncate_table(cls): + cursor = connection.cursor() + cursor.execute('TRUNCATE TABLE "{0}"'.format(cls._meta.db_table)) + cursor.execute('ALTER SEQUENCE {0}_id_seq RESTART WITH 1'.format(cls._meta.db_table.lower())) + + +class ToJsonableMixin: + def to_jsonable(self): + internal_dict = self.__dict__.copy() + if '_state' in internal_dict: + del internal_dict['_state'] + return internal_dict + + NESTED_JSON_FIELDS: List[str] = [] + + def to_jsonable_nested(self): + data = self.to_jsonable() + for nested_json_field in type(self).NESTED_JSON_FIELDS: + field = data[nested_json_field] + if field is not None: + data[nested_json_field] = json.loads(data[nested_json_field]) + return data + + +class Job(models.Model, TruncatableMixin, ToJsonableMixin): + NESTED_JSON_FIELDS = ['result'] + hsh = models.CharField(max_length=255, null=False, blank=False) + file = models.TextField(null=False, blank=False) + taken_at = models.DateTimeField(default=None, null=True, blank=True) + result = models.TextField(default=None, null=True, blank=True) + + +class Log(models.Model, TruncatableMixin, ToJsonableMixin): + NESTED_JSON_FIELDS = ['sender', 'content'] + sender = models.TextField(null=False, blank=False) + content = models.TextField(null=False, blank=False) + + class Meta: + abstract = True + + +class ErrorLog(Log): + pass + + +class PerformanceLog(Log): + pass diff --git a/hash_thumbnailer_distributed/webproj/thumbnailer/tests.py b/hash_thumbnailer_distributed/webproj/thumbnailer/tests.py new file mode 100644 index 0000000..7ce503c --- /dev/null +++ b/hash_thumbnailer_distributed/webproj/thumbnailer/tests.py @@ -0,0 +1,3 @@ +from django.test import TestCase + +# Create your tests here. diff --git a/hash_thumbnailer_distributed/webproj/thumbnailer/urls.py b/hash_thumbnailer_distributed/webproj/thumbnailer/urls.py new file mode 100644 index 0000000..24a19c6 --- /dev/null +++ b/hash_thumbnailer_distributed/webproj/thumbnailer/urls.py @@ -0,0 +1,13 @@ +from pathlib import Path + +from django.conf.urls.static import static +from django.urls import path + +from . import views + +urlpatterns = [ + path('job', views.JobsView.as_view(), name='jobs'), + path('job/', views.JobView.as_view(), name='job'), + path('log/error', views.LogErrorView.as_view(), name='log_error'), + path('log/performance', views.LogPerformanceView.as_view(), name='log_performance'), +]+static('i_gdl/', document_root=str(Path(__file__).absolute().parent.parent.parent.parent.joinpath('i_gdl')), show_indexes=True) diff --git a/hash_thumbnailer_distributed/webproj/thumbnailer/views.py b/hash_thumbnailer_distributed/webproj/thumbnailer/views.py new file mode 100644 index 0000000..fc90f89 --- /dev/null +++ b/hash_thumbnailer_distributed/webproj/thumbnailer/views.py @@ -0,0 +1,66 @@ +import json + +from django.http import HttpResponse, JsonResponse +from django.http.request import HttpRequest +from django.shortcuts import get_object_or_404, render +from django.utils import timezone +from django.utils.decorators import method_decorator +from django.views.decorators.csrf import csrf_exempt +from django.views.generic import View + +from webproj.thumbnailer.models import ErrorLog, Job, Log, PerformanceLog, ToJsonableMixin + +# Create your views here. 
+ + +@method_decorator(csrf_exempt, name='dispatch') +class JobsView(View): + def get(self, request: HttpRequest): + job: Job = Job.objects.filter(taken_at=None, result=None).first() + if job is None: + job = Job.objects.filter(result=None).order_by('taken_at').first() + if job is None: + return HttpResponse('done') + job.taken_at = timezone.now() + job.save() + return HttpResponse(f'{job.pk}') + + +@method_decorator(csrf_exempt, name='dispatch') +class JobView(View): + def get(self, request: HttpRequest, job_id: int): + job = get_object_or_404(Job, id=job_id) + job.taken_at = timezone.now() + job.save() + return JsonResponse(job.to_jsonable(), safe=False) + + def post(self, request, job_id): + job = get_object_or_404(Job, id=job_id) + job.result = json.dumps(json.loads(request.body)) + job.save() + return HttpResponse('ok') + + +class LogView(View): + model = Log + + def post(self, request: HttpRequest): + Logging = type(self).model + logging = Logging() + logging.sender = json.dumps(json.loads(request.POST['sender'])) + logging.content = json.dumps(json.loads(request.POST['content'])) + logging.save() + return JsonResponse(logging.to_jsonable(), safe=False) + + def get(self, request: HttpRequest): + return JsonResponse(list(map(ToJsonableMixin.to_jsonable, type(self).model.objects.all())), safe=False) + + +@method_decorator(csrf_exempt, name='dispatch') +class LogErrorView(LogView): + model = ErrorLog + + +@method_decorator(csrf_exempt, name='dispatch') +class LogPerformanceView(LogView): + model = PerformanceLog diff --git a/hash_thumbnailer_distributed/webproj/urls.py b/hash_thumbnailer_distributed/webproj/urls.py new file mode 100644 index 0000000..4eae7e9 --- /dev/null +++ b/hash_thumbnailer_distributed/webproj/urls.py @@ -0,0 +1,24 @@ +"""webproj URL Configuration + +The `urlpatterns` list routes URLs to views. For more information please see: + https://docs.djangoproject.com/en/3.0/topics/http/urls/ +Examples: +Function views + 1. Add an import: from my_app import views + 2. Add a URL to urlpatterns: path('', views.home, name='home') +Class-based views + 1. Add an import: from other_app.views import Home + 2. Add a URL to urlpatterns: path('', Home.as_view(), name='home') +Including another URLconf + 1. Import the include() function: from django.urls import include, path + 2. Add a URL to urlpatterns: path('blog/', include('blog.urls')) +""" +from django.contrib import admin +from django.urls import path +from django.urls import include +import webproj.thumbnailer.urls + +urlpatterns = [ + path('admin/', admin.site.urls), + path('', include(webproj.thumbnailer.urls)), +] diff --git a/hash_thumbnailer_distributed/webproj/wsgi.py b/hash_thumbnailer_distributed/webproj/wsgi.py new file mode 100644 index 0000000..b78c077 --- /dev/null +++ b/hash_thumbnailer_distributed/webproj/wsgi.py @@ -0,0 +1,16 @@ +""" +WSGI config for webproj project. + +It exposes the WSGI callable as a module-level variable named ``application``. 
+ +For more information on this file, see +https://docs.djangoproject.com/en/3.0/howto/deployment/wsgi/ +""" + +import os + +from django.core.wsgi import get_wsgi_application + +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'webproj.settings') + +application = get_wsgi_application() diff --git a/hash_thumbnailer_distributed/worker.py b/hash_thumbnailer_distributed/worker.py new file mode 100755 index 0000000..d50e3d7 --- /dev/null +++ b/hash_thumbnailer_distributed/worker.py @@ -0,0 +1,275 @@ +#!/usr/bin/env python3 +# -*- encoding: utf-8 -*- + +import bz2 +import hashlib +import json +import lzma +import multiprocessing +import socket +import sys +import time +import traceback +import zlib +from concurrent.futures import Future, ProcessPoolExecutor +from io import BytesIO, StringIO +from pathlib import Path +from typing import Any, Callable, Dict, List, Tuple, Type + +import requests +import zstandard as zstd + +COMPRESSORS: List[Tuple[str, Callable[[bytes], bytes]]] = [ + ('uncompressed', lambda uncompressed: uncompressed), + ('zlib', lambda uncompressed: zlib.compress(uncompressed, 9)), + ('bz2', lambda uncompressed: bz2.compress(uncompressed, 9)), + ('lzma', lzma.compress), + ('zstd', lambda uncompressed: zstd.ZstdCompressor( + level=22).compress(uncompressed)), +] + + +class HashMismatch(Exception): + pass + + +def check_best_compressions(uncompressed: bytes) -> Dict[str, Tuple[int, float]]: + algos = dict() + for name, callback in COMPRESSORS: + time_start = time.time() + compressed_size = len(callback(uncompressed)) + time_end = time.time() + algos[name] = (compressed_size, time_end - time_start) + return algos + + +def hexhashof(bts: bytes, using: Callable[[], Any]) -> str: + m = using() + m.update(bts) + return m.hexdigest() + + +def upload_log(url, sender, **content): + sent = False + while not sent: + try: + requests.post(url, data={ + 'sender': json.dumps(sender), + 'content': json.dumps(content), + }).raise_for_status() + sent = True + except: + traceback.print_exc() + + +def upload_job(url, **content): + sent = False + while not sent: + try: + requests.post(url, json=content).raise_for_status() + sent = True + except: + traceback.print_exc() + + +def do_work(base_address: str, worker_id: str): + while True: + try: + tick_set = time.time() + job_id = None + try: + with requests.get(f'{base_address}job') as response: + response.raise_for_status() + job_id = response.text + except KeyboardInterrupt: + raise + except: + pass + if job_id is None: + continue + elif job_id == 'done': + break + else: + tick_downloading_job_started = time.time() + tick_downloading_job_retry_started = tick_downloading_job_started + tick_downloading_job_retry_count = 0 + job = None + while job is None: + try: + tick_downloading_job_retry_started = time.time() + with requests.get(f'{base_address}job/{job_id}') as response: + response.raise_for_status() + job = response.json() + except KeyboardInterrupt: + raise + except: + tick_downloading_job_retry_count += 1 + sio = StringIO() + traceback.print_exc(file=sio) + formatted_exception = sio.getvalue() + print(formatted_exception, file=sys.stderr) + upload_log( + f'{base_address}log/error', + worker_id, + during='JobDownload', + tick_set=tick_set, + traceback=formatted_exception, + job_id=job_id, + tick_downloading_job_started=tick_downloading_job_started, + tick_downloading_job_retry_started=tick_downloading_job_retry_started, + tick_downloading_job_retry_count=tick_downloading_job_retry_count, + ) + tick_downloading_job_ended = time.time() + 
tick_downloading_image_started = time.time() + tick_downloading_image_retry_started = tick_downloading_image_started + tick_downloading_image_retry_count = 0 + tick_downloading_image_retry_mismatch = 0 + image_bytes = None + while image_bytes is None: + try: + tick_downloading_job_retry_started = time.time() + with requests.get(f'{base_address}{job["file"]}') as response: + if response.status_code == 404: + break + response.raise_for_status() + response.raw.decode_content = True + if hexhashof(response.content, hashlib.sha256) == job['hsh']: + image_bytes = response.content + else: + raise HashMismatch() + except KeyboardInterrupt: + raise + except BaseException as exception: + tick_downloading_image_retry_count += 1 + if isinstance(exception, HashMismatch): + tick_downloading_image_retry_mismatch += 1 + sio = StringIO() + traceback.print_exc(file=sio) + formatted_exception = sio.getvalue() + print(formatted_exception, file=sys.stderr) + upload_log( + f'{base_address}log/error', + worker_id, + during='ImageDownload', + tick_set=tick_set, + traceback=formatted_exception, + job_id=job_id, + file=job["file"], + hash=job["hsh"], + tick_downloading_job_started=tick_downloading_job_started, + tick_downloading_job_retry_started=tick_downloading_job_retry_started, + tick_downloading_job_retry_count=tick_downloading_job_retry_count, + tick_downloading_job_ended=tick_downloading_job_ended, + tick_downloading_image_started=tick_downloading_image_started, + tick_downloading_image_retry_started=tick_downloading_image_retry_started, + tick_downloading_image_retry_count=tick_downloading_image_retry_count, + tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch, + ) + if tick_downloading_image_retry_mismatch >= 10: + break + tick_downloading_image_ended = time.time() + if image_bytes is None: + upload_job( + f'{base_address}job/{job_id}', + status='NoValidImageData', + tick_set=tick_set, + job_id=job_id, + file=job["file"], + hash=job["hsh"], + tick_downloading_job_started=tick_downloading_job_started, + tick_downloading_job_retry_started=tick_downloading_job_retry_started, + tick_downloading_job_retry_count=tick_downloading_job_retry_count, + tick_downloading_job_ended=tick_downloading_job_ended, + tick_downloading_image_started=tick_downloading_image_started, + tick_downloading_image_retry_started=tick_downloading_image_retry_started, + tick_downloading_image_retry_count=tick_downloading_image_retry_count, + tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch, + tick_downloading_image_ended=tick_downloading_image_ended, + ) + else: + tick_image_compress_start = time.time() + compressions = check_best_compressions(image_bytes) + tick_image_compress_ended = time.time() + tick_uploading_started = time.time() + upload_job( + f'{base_address}job/{job_id}', + status='Complete', + worker=worker_id, + tick_set=tick_set, + job_id=job_id, + file=job["file"], + hash=job["hsh"], + compressions=compressions, + tick_downloading_job_started=tick_downloading_job_started, + tick_downloading_job_retry_started=tick_downloading_job_retry_started, + tick_downloading_job_retry_count=tick_downloading_job_retry_count, + tick_downloading_job_ended=tick_downloading_job_ended, + tick_downloading_image_started=tick_downloading_image_started, + tick_downloading_image_retry_started=tick_downloading_image_retry_started, + tick_downloading_image_retry_count=tick_downloading_image_retry_count, + tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch, + 
tick_downloading_image_ended=tick_downloading_image_ended, + tick_image_compress_start=tick_image_compress_start, + tick_image_compress_ended=tick_image_compress_ended, + ) + tick_uploading_ended = time.time() + upload_log( + f'{base_address}log/performance', + worker_id, + file=job["file"], + hash=job["hsh"], + tick_downloading_job_started=tick_downloading_job_started, + tick_downloading_job_retry_started=tick_downloading_job_retry_started, + tick_downloading_job_retry_count=tick_downloading_job_retry_count, + tick_downloading_job_ended=tick_downloading_job_ended, + tick_downloading_image_started=tick_downloading_image_started, + tick_downloading_image_retry_started=tick_downloading_image_retry_started, + tick_downloading_image_retry_count=tick_downloading_image_retry_count, + tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch, + tick_downloading_image_ended=tick_downloading_image_ended, + tick_image_compress_start=tick_image_compress_start, + tick_image_compress_ended=tick_image_compress_ended, + tick_uploading_started=tick_uploading_started, + tick_uploading_ended=tick_uploading_ended, + ) + tick_uploading_ended = time.time() + print(f"Done: {job['hsh']}") + except KeyboardInterrupt: + raise + except: + raise + + +def kickstart(base_address: str): + job_count: int = multiprocessing.cpu_count() * 2 + # job_count = 1 + hostname: str = socket.gethostname() + with ProcessPoolExecutor(job_count) as pe: + for job_seq in range(job_count): + job_id = f'{hostname}-{job_seq}' + + def on_completed(job: Future): + job.result() + + pe.submit( + do_work, + worker_id=job_id, + base_address=base_address, + ).add_done_callback(on_completed) + print('Ready') + + +def main(): + if len(sys.argv) == 2: + base_address = sys.argv[1] + if not base_address.startswith('http'): + base_address = 'http://'+base_address + if not base_address.endswith('/'): + base_address += '/' + kickstart(base_address) + else: + print(f'Usage:\n {sys.argv[0]} ') + + +if __name__ == "__main__": + main() diff --git a/hash_thumbnailer_distributed/worker_thumbnailer.py b/hash_thumbnailer_distributed/worker_thumbnailer.py new file mode 100755 index 0000000..93fc31a --- /dev/null +++ b/hash_thumbnailer_distributed/worker_thumbnailer.py @@ -0,0 +1,422 @@ +#!/usr/bin/env python3 +# -*- encoding: utf-8 -*- + +import hashlib +import json +import multiprocessing +import socket +import sys +import time +import traceback +from concurrent.futures import Future, ProcessPoolExecutor +from io import BytesIO, StringIO +from pathlib import Path +from typing import Any, Callable, Dict, List, Tuple, Type + +import requests +from PIL import Image, ImageDraw, ImageOps + + +class HashMismatch(Exception): + pass + + +START = -(sys.maxsize // 2) + 0 +END = -(sys.maxsize // 2) + 1 +CENTER = -(sys.maxsize // 2) + 2 + +POS_TOP_LEFT = (START, START) +POS_CENTER_LEFT = (START, CENTER) +POS_BOTTOM_LEFT = (START, END) + +POS_TOP_CENTER = (CENTER, START) +POS_CENTER_CENTER = (CENTER, CENTER) +POS_BOTTOM_CENTER = (CENTER, END) + +POS_TOP_RIGHT = (END, START) +POS_CENTER_RIGHT = (END, CENTER) +POS_BOTTOM_RIGHT = (END, END) + +P_TL = POS_TOP_LEFT +P_CL = POS_CENTER_LEFT +P_BL = POS_BOTTOM_LEFT + +P_TC = POS_TOP_CENTER +P_CC = POS_CENTER_CENTER +P_BC = POS_BOTTOM_CENTER + +P_TR = POS_TOP_RIGHT +P_CR = POS_CENTER_RIGHT +P_BR = POS_BOTTOM_RIGHT + + +def edge_propagation_scaling(image: Image.Image, + desired_size: Tuple[int, int], + paste_position: Tuple[int, int] = P_CC, + ) -> Image.Image: + image = image.copy() + scaled = Image.new(image.mode, 
desired_size) + new_placement_: List[int] = list() + for isz, ssz, pp in zip(image.size, scaled.size, paste_position,): + if pp == START: + new_placement_.append(0) + elif pp == END: + new_placement_.append(ssz-isz) + elif pp == CENTER: + new_placement_.append((ssz-isz)//2) + else: + new_placement_.append(pp) + new_placement: Tuple[int, int] = new_placement_[0], new_placement_[1] + del new_placement_ + scaled.paste(image, new_placement) + parts = dict( + # left, upper, right, lower + t=image.copy().crop( + (0, 0, image.size[0], 1)), + b=image.copy().crop( + (0, image.size[1]-1, image.size[0], image.size[1])), + l=image.copy().crop( + (0, 0, 1, image.size[1])), + r=image.copy().crop( + (image.size[0]-1, 0, image.size[0], image.size[1])), + ) + if (sz := new_placement[1]) > 0: + part = parts['t'].copy() + resized = part.resize((part.size[0], sz)) + scaled.paste(resized, (new_placement[0], 0)) + if (sz := scaled.size[1]-(dsp := new_placement[1]+image.size[1])) > 0: + part = parts['b'].copy() + resized = part.resize((part.size[0], sz)) + scaled.paste(resized, (new_placement[0], dsp)) + if (sz := new_placement[0]) > 0: + part = parts['l'].copy() + resized = part.resize((sz, part.size[1])) + scaled.paste(resized, (0, new_placement[1])) + if (sz := scaled.size[0]-(dsp := new_placement[0]+image.size[0])) > 0: + part = parts['r'].copy() + resized = part.resize((sz, part.size[1])) + scaled.paste(resized, (dsp, new_placement[1])) + del parts + corners = dict( + tl=image.getpixel((0, 0)), + tr=image.getpixel((image.size[0]-1, 0)), + bl=image.getpixel((0, image.size[1]-1)), + br=image.getpixel((image.size[0]-1, image.size[1]-1)), + ) + draw: ImageDraw.ImageDraw = ImageDraw.Draw(scaled) + szt = new_placement[1] + szb = scaled.size[1]-(dspb := new_placement[1]+image.size[1]) + szl = new_placement[0] + szr = scaled.size[0]-(dspr := new_placement[0]+image.size[0]) + if szt > 0 and szl > 0: + draw.rectangle(((0, 0), (szl-1, szt-1)), corners['tl']) + if szt > 0 and szr > 0: + draw.rectangle(((dspr, 0), (scaled.size[0], szt-1)), corners['tr']) + if szb > 0 and szl > 0: + draw.rectangle(((0, dspb), (szl-1, scaled.size[1])), corners['bl']) + if szb > 0 and szr > 0: + draw.rectangle(((dspr, dspb), scaled.size), corners['br']) + del dspr + del dspb + del szt + del szb + del szl + del szr + return scaled + + +def calculate_thumbnail_hashes(image: Image.Image) -> Dict[str, Dict[str, Dict[str, Dict[str, str]]]]: + out_dict: Dict[str, Dict[str, Dict[str, Dict[str, str]]]] = dict() + max_dimen = max(image.size) + for filling in [True, False]: + transparent_square = None + if filling: + transparent_square = edge_propagation_scaling(image, (max_dimen, max_dimen)) + else: + transparent_square = Image.new('RGBA', (max_dimen, max_dimen)) + transparent_square.paste(image, ( + (max_dimen - image.size[0]) // 2, + (max_dimen - image.size[1]) // 2, + )) + backgrounds: Dict[str, Dict[str, Dict[str, str]]] = dict() + for background in ['#000000', '#FFFFFF']: + backgrounded = Image.new('RGB', transparent_square.size, background) + backgrounded.paste(transparent_square) + sizes: Dict[str, Dict[str, str]] = dict() + for size in [4, 8, 16, 24, 32, 48, 64, 72, 96, 128]: + resized = backgrounded.copy() + resized = resized.resize((size, size)) + bit_depths: Dict[str, str] = dict() + for bit_depth in range(1, 9): + posterized: Image.Image = resized.copy() + posterized = ImageOps.posterize(posterized, bit_depth) + bio = BytesIO() + posterized.save(bio, format='BMP') + hashsum = hexhashof(bio.getvalue(), hashlib.md5) + 
bit_depths[str(bit_depth)] = hashsum + sizes[str(size)] = bit_depths + backgrounds[background] = sizes + out_dict['fill' if filling else 'center'] = backgrounds + return out_dict + + +def hexhashof(bts: bytes, using: Callable[[], Any]) -> str: + m = using() + m.update(bts) + return m.hexdigest() + + +def upload_log(url, sender, **content): + sent = False + while not sent: + try: + requests.post(url, data={ + 'sender': json.dumps(sender), + 'content': json.dumps(content), + }).raise_for_status() + sent = True + except: + traceback.print_exc() + + +def upload_job(url, **content): + sent = False + while not sent: + try: + requests.post(url, json=content).raise_for_status() + sent = True + except: + traceback.print_exc() + + +def do_work(base_address: str, worker_id: str): + while True: + try: + tick_set = time.time() + job_id = None + try: + job_id = requests.get(f'{base_address}job').text + except KeyboardInterrupt: + raise + except: + pass + if job_id is None: + continue + elif job_id == 'done': + break + else: + tick_downloading_job_started = time.time() + tick_downloading_job_retry_started = tick_downloading_job_started + tick_downloading_job_retry_count = 0 + job = None + while job is None: + try: + tick_downloading_job_retry_started = time.time() + with requests.get(f'{base_address}job/{job_id}') as response: + response.raise_for_status() + job = response.json() + except KeyboardInterrupt: + raise + except: + tick_downloading_job_retry_count += 1 + sio = StringIO() + traceback.print_exc(file=sio) + formatted_exception = sio.getvalue() + print(formatted_exception, file=sys.stderr) + upload_log( + f'{base_address}log/error', + worker_id, + during='JobDownload', + tick_set=tick_set, + traceback=formatted_exception, + job_id=job_id, + tick_downloading_job_started=tick_downloading_job_started, + tick_downloading_job_retry_started=tick_downloading_job_retry_started, + tick_downloading_job_retry_count=tick_downloading_job_retry_count, + ) + tick_downloading_job_ended = time.time() + tick_downloading_image_started = time.time() + tick_downloading_image_retry_started = tick_downloading_image_started + tick_downloading_image_retry_count = 0 + tick_downloading_image_retry_mismatch = 0 + image_bytes = None + while image_bytes is None: + try: + tick_downloading_job_retry_started = time.time() + with requests.get(f'{base_address}{job["file"]}') as response: + if response.status_code == 404: + break + response.raise_for_status() + response.raw.decode_content = True + if hexhashof(response.content, hashlib.sha256) == job['hsh']: + image_bytes = response.content + else: + raise HashMismatch() + except KeyboardInterrupt: + raise + except BaseException as exception: + tick_downloading_image_retry_count += 1 + if isinstance(exception, HashMismatch): + tick_downloading_image_retry_mismatch += 1 + sio = StringIO() + traceback.print_exc(file=sio) + formatted_exception = sio.getvalue() + print(formatted_exception, file=sys.stderr) + upload_log( + f'{base_address}log/error', + worker_id, + during='ImageDownload', + tick_set=tick_set, + traceback=formatted_exception, + job_id=job_id, + file=job["file"], + hash=job["hsh"], + tick_downloading_job_started=tick_downloading_job_started, + tick_downloading_job_retry_started=tick_downloading_job_retry_started, + tick_downloading_job_retry_count=tick_downloading_job_retry_count, + tick_downloading_job_ended=tick_downloading_job_ended, + tick_downloading_image_started=tick_downloading_image_started, + 
tick_downloading_image_retry_started=tick_downloading_image_retry_started, + tick_downloading_image_retry_count=tick_downloading_image_retry_count, + tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch, + ) + if tick_downloading_image_retry_mismatch >= 10: + break + tick_downloading_image_ended = time.time() + if image_bytes is None: + upload_job( + f'{base_address}job/{job_id}', + status='NoValidImageData', + tick_set=tick_set, + job_id=job_id, + file=job["file"], + hash=job["hsh"], + tick_downloading_job_started=tick_downloading_job_started, + tick_downloading_job_retry_started=tick_downloading_job_retry_started, + tick_downloading_job_retry_count=tick_downloading_job_retry_count, + tick_downloading_job_ended=tick_downloading_job_ended, + tick_downloading_image_started=tick_downloading_image_started, + tick_downloading_image_retry_started=tick_downloading_image_retry_started, + tick_downloading_image_retry_count=tick_downloading_image_retry_count, + tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch, + tick_downloading_image_ended=tick_downloading_image_ended, + ) + else: + tick_image_decoding_start = time.time() + image = None + try: + image = Image.open(BytesIO(image_bytes)).copy() + except KeyboardInterrupt: + raise + except: + pass + tick_image_decoding_ended = time.time() + if image is None: + upload_job( + f'{base_address}job/{job_id}', + status='ImageIsBroken', + tick_set=tick_set, + job_id=job_id, + file=job["file"], + hash=job["hsh"], + tick_downloading_job_started=tick_downloading_job_started, + tick_downloading_job_retry_started=tick_downloading_job_retry_started, + tick_downloading_job_retry_count=tick_downloading_job_retry_count, + tick_downloading_job_ended=tick_downloading_job_ended, + tick_downloading_image_started=tick_downloading_image_started, + tick_downloading_image_retry_started=tick_downloading_image_retry_started, + tick_downloading_image_retry_count=tick_downloading_image_retry_count, + tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch, + tick_downloading_image_ended=tick_downloading_image_ended, + tick_image_decoding_start=tick_image_decoding_start, + tick_image_decoding_ended=tick_image_decoding_ended, + ) + else: + tick_image_thumbnailing_start = time.time() + calculated_thumbnail_hashes = calculate_thumbnail_hashes(image) + tick_image_thumbnailing_ended = time.time() + tick_uploading_started = time.time() + upload_job( + f'{base_address}job/{job_id}', + status='Complete', + tick_set=tick_set, + job_id=job_id, + file=job["file"], + hash=job["hsh"], + calculated_thumbnail_hashes=calculated_thumbnail_hashes, + tick_downloading_job_started=tick_downloading_job_started, + tick_downloading_job_retry_started=tick_downloading_job_retry_started, + tick_downloading_job_retry_count=tick_downloading_job_retry_count, + tick_downloading_job_ended=tick_downloading_job_ended, + tick_downloading_image_started=tick_downloading_image_started, + tick_downloading_image_retry_started=tick_downloading_image_retry_started, + tick_downloading_image_retry_count=tick_downloading_image_retry_count, + tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch, + tick_downloading_image_ended=tick_downloading_image_ended, + tick_image_decoding_start=tick_image_decoding_start, + tick_image_decoding_ended=tick_image_decoding_ended, + tick_image_thumbnailing_start=tick_image_thumbnailing_start, + tick_image_thumbnailing_ended=tick_image_thumbnailing_ended, + ) + tick_uploading_ended = time.time() + 
upload_log( + f'{base_address}log/performance', + worker_id, + file=job["file"], + hash=job["hsh"], + tick_downloading_job_started=tick_downloading_job_started, + tick_downloading_job_retry_started=tick_downloading_job_retry_started, + tick_downloading_job_retry_count=tick_downloading_job_retry_count, + tick_downloading_job_ended=tick_downloading_job_ended, + tick_downloading_image_started=tick_downloading_image_started, + tick_downloading_image_retry_started=tick_downloading_image_retry_started, + tick_downloading_image_retry_count=tick_downloading_image_retry_count, + tick_downloading_image_retry_mismatch=tick_downloading_image_retry_mismatch, + tick_downloading_image_ended=tick_downloading_image_ended, + tick_image_decoding_start=tick_image_decoding_start, + tick_image_decoding_ended=tick_image_decoding_ended, + tick_image_thumbnailing_start=tick_image_thumbnailing_start, + tick_image_thumbnailing_ended=tick_image_thumbnailing_ended, + tick_uploading_started=tick_uploading_started, + tick_uploading_ended=tick_uploading_ended, + ) + tick_uploading_ended = time.time() + print(f"Done: {job['hsh']}") + except KeyboardInterrupt: + raise + except: + raise + + +def kickstart(base_address: str): + job_count: int = multiprocessing.cpu_count() * 2 + # job_count = 1 + hostname: str = socket.gethostname() + with ProcessPoolExecutor(job_count) as pe: + for job_seq in range(job_count): + job_id = f'{hostname}-{job_seq}' + + def on_completed(job: Future): + job.result() + + pe.submit( + do_work, + worker_id=job_id, + base_address=base_address, + ).add_done_callback(on_completed) + + +def main(): + if len(sys.argv) == 2: + base_address = sys.argv[1] + if not base_address.startswith('http'): + base_address = 'http://'+base_address + if not base_address.endswith('/'): + base_address += '/' + kickstart(base_address) + else: + print(f'Usage:\n {sys.argv[0]} ') + + +if __name__ == "__main__": + main() diff --git a/prunedownloads.py b/prunedownloads.py new file mode 100755 index 0000000..5dc8000 --- /dev/null +++ b/prunedownloads.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python3 +# -*- encoding: utf-8 -*- + +from reddit_imgs.download_pruner import main + +if __name__ == '__main__': + main() diff --git a/reddit_imgs/condensate_hashes.py b/reddit_imgs/condensate_hashes.py new file mode 100755 index 0000000..3ca64d4 --- /dev/null +++ b/reddit_imgs/condensate_hashes.py @@ -0,0 +1,238 @@ +#!/usr/bin/env python3 +# -*- encoding: utf-8 -*- + +import datetime +import json +import multiprocessing +from pathlib import Path +from typing import Any, Collection, Dict, FrozenSet, List, Optional, Tuple + +import colored as clrlib + +from .system.cmdline_parser import parse_cmdline +from .system.flattener import flatten_generator +from .system.format_file_size import format_power10 + +HISTORICAL_EXPORT = False +EXTENSION_FILTER = None +NSFW_NESS_FILTER = None +SUBREDDIT_FILTER = None + + +def cmdline(encoded_args: str = None): + if encoded_args is None: + return run_with_config() + else: + return parse_cmdline(run_with_config, encoded_args) + + +def run_with_config(historical: bool = False, + nsfw_ness_filter: bool = None, + extension_filter: list = None, + subreddit_filter: frozenset = None): + global HISTORICAL_EXPORT + global EXTENSION_FILTER + global NSFW_NESS_FILTER + global SUBREDDIT_FILTER + EXTENSION_FILTER = extension_filter + HISTORICAL_EXPORT = historical + NSFW_NESS_FILTER = nsfw_ness_filter + SUBREDDIT_FILTER = (None + if subreddit_filter is None else + frozenset(sr.lower() for sr in subreddit_filter)) + return main() 
+ + +class ExtensionFilter(multiprocessing.Process): + def __init__(self, + allowed_extensions: Collection[str], + input_queue: multiprocessing.Queue, + output_queue: multiprocessing.Queue): + multiprocessing.Process.__init__(self) + self.allowed_extensions = allowed_extensions + self.input_queue = input_queue + self.output_queue = output_queue + + def run(self): + while True: + next_item: str = self.input_queue.get() + if next_item is None: + self.output_queue.put(None) + break + if self.allowed_extensions is None: + self.output_queue.put(next_item) + elif Path(next_item).suffix in self.allowed_extensions: + self.output_queue.put(next_item) + + +class NsfwNessFilter(multiprocessing.Process): + def __init__(self, + nsfw_ness: Optional[bool], + file2link: Dict[str, List[str]], + link2post: Dict[str, List[str]], + post_info: Dict[str, Dict[str, Any]], + input_queue: multiprocessing.Queue, + output_queue: multiprocessing.Queue): + multiprocessing.Process.__init__(self) + self.nsfw_ness = nsfw_ness + self.file2link = file2link + self.link2post = link2post + self.post_info = post_info + self.input_queue = input_queue + self.output_queue = output_queue + + def run(self): + while True: + next_item: str = self.input_queue.get() + if next_item is None: + self.output_queue.put(None) + break + if ( + (self.nsfw_ness is None) + or + all(map( + lambda post: ( + self + .post_info + .get(post, {}) + .get('nsfw', None) + == + self.nsfw_ness + ), + flatten_generator(map( + lambda link: self.link2post.get(link, []), + self.file2link.get(next_item, []) + )) + )) + ): + self.output_queue.put(next_item) + + +class SubredditFilter(multiprocessing.Process): + def __init__(self, + subreddits: Optional[FrozenSet[str]], + file2link: Dict[str, List[str]], + link2post: Dict[str, List[str]], + post_info: Dict[str, Dict[str, Any]], + input_queue: multiprocessing.Queue, + output_queue: multiprocessing.Queue): + multiprocessing.Process.__init__(self) + self.subreddits = subreddits + self.file2link = file2link + self.link2post = link2post + self.post_info = post_info + self.input_queue = input_queue + self.output_queue = output_queue + + def run(self): + while True: + next_item: str = self.input_queue.get() + if next_item is None: + self.output_queue.put(None) + break + if ( + (self.subreddits is None) + or + all(map( + lambda subreddit: subreddit in self.subreddits, + flatten_generator(map( + lambda post: ( + self + .post_info + .get(post, {}) + .get('subreddits', []) + ), + flatten_generator(map( + lambda link: self.link2post.get(link, []), + self.file2link.get(next_item, []) + )) + )) + )) + ): + self.output_queue.put(next_item) + + +class FileExistsFilter(multiprocessing.Process): + def __init__(self, + input_queue: multiprocessing.Queue, + output_queue: multiprocessing.Queue): + multiprocessing.Process.__init__(self) + self.input_queue = input_queue + self.output_queue = output_queue + + def run(self): + while True: + next_item: str = self.input_queue.get() + if next_item is None: + self.output_queue.put(None) + break + if Path(next_item).exists(): + self.output_queue.put(next_item) + + +def main(): + exported_name = 'h_gdl.txt' if not HISTORICAL_EXPORT else f'h_gdl_{datetime.datetime.now()}.txt' + exported_path = Path(exported_name) + exported_path.write_text('') + hashes_list: List[Tuple[str, str]] = list(map( + lambda a: (a[1], a[0]), + map( + lambda a: a.split('|', 1), + Path('i_gdl_hashes.txt').read_text().splitlines()))) + hashes_dict: Dict[str, str] = dict(hashes_list) + r_gdl_p = 
json.loads(Path('r_gdl_p.json').read_text()) + r_gdl_lp = json.loads(Path('r_gdl_lp.json').read_text()) + i_gdl_lff = json.loads(Path('i_gdl_lff.json').read_text()) + file_sizes = json.loads(Path('i_gdl_fsz.json').read_text()) + general_size = 0 + general_count = 0 + existing_files_queue = multiprocessing.Queue() + intermediary_queue = multiprocessing.Queue() + intermediary_queue2 = multiprocessing.Queue() + remaining_queue = multiprocessing.Queue() + ffp = ExtensionFilter(EXTENSION_FILTER, existing_files_queue, + intermediary_queue) + nfp = NsfwNessFilter(NSFW_NESS_FILTER, i_gdl_lff, r_gdl_lp, r_gdl_p, + intermediary_queue, intermediary_queue2) + srp = SubredditFilter(SUBREDDIT_FILTER, i_gdl_lff, r_gdl_lp, r_gdl_p, + intermediary_queue2, remaining_queue) + # fep = FileExistsFilter(intermediary_queue, remaining_queue) + ffp.start() + nfp.start() + srp.start() + # fep.start() + for file, _ in hashes_list: + existing_files_queue.put(file) + existing_files_queue.put(None) + known_hashes = set() + with exported_path.open('at') as fd: + while True: + next_item: str = remaining_queue.get() + if next_item is None: + break + else: + hsh = hashes_dict[next_item] + known_hashes.add(hsh) + fd.write(f'{hsh}|{next_item}\n') + general_size += file_sizes.get(next_item, 0) + general_count += 1 + ffp.join() + nfp.join() + srp.join() + # fep.join() + existing_files_queue.close() + existing_files_queue.join_thread() + intermediary_queue.close() + intermediary_queue.join_thread() + intermediary_queue2.close() + intermediary_queue2.join_thread() + remaining_queue.close() + remaining_queue.join_thread() + print(f'Found {general_count} files') + print(f'Found {len(known_hashes)} unique hashes') + print(f'Size: {general_size} bytes ({format_power10(general_size)})') + + +if __name__ == "__main__": + HISTORICAL_EXPORT = True + main() diff --git a/reddit_imgs/display_fetch_futures.py b/reddit_imgs/display_fetch_futures.py index fcfc755..890cbfe 100644 --- a/reddit_imgs/display_fetch_futures.py +++ b/reddit_imgs/display_fetch_futures.py @@ -6,13 +6,20 @@ import importlib import shutil import time import traceback +from io import StringIO from pathlib import Path from time import sleep +from typing import Dict, List, Union import colored +from .system.format_file_size import Numeric, format_power2, format_power10 + +Numeric = Union[int, float] ESC = '\033' LINK_MEMORY_SIZE = 64 +JOB_BYTES_MEMORY_SIZE = 4 +JOB_LINKS_MEMORY_SIZE = 8 def reverse_mapping_list(d: dict) -> dict: @@ -44,12 +51,24 @@ def print_terminal(workers_state_path: Path, keep_to_next_cycle=None): f'Waiting for jobs @ {datetime.datetime.now()}', end='', flush=True) print(ESC+'[K', flush=True) return + jobs_sequenced_by_guess = sorted( + jobs, + key=lambda a: ( + 0 if ':' not in a else int(a.split(':', 1)[1]), + a, + ) + ) + jobs_state_description_content = dict() jobs_lines = dict() jobs_queues = dict() jobs_enqueueds = dict() jobs_states = dict() jobs_dates = dict() + jobs_latest_link = dict() + start_date = datetime.datetime.now() try: + start_date = datetime.datetime.fromtimestamp( + workers_state_path.stat().st_mtime) jobs_lines = { job: int( workers_state_path.joinpath(job+'=line') @@ -57,24 +76,34 @@ def print_terminal(workers_state_path: Path, keep_to_next_cycle=None): ) for job in jobs } + while len(jobs_sequenced_by_guess) > 0: + job, *jobs_sequenced_by_guess = jobs_sequenced_by_guess + if job not in jobs_lines: + jobs_lines[job] = max([-1] + list(jobs_lines.values())) + 1 + del job + del jobs_sequenced_by_guess + 
jobs_state_description_content = { + job: workers_state_path.joinpath(job).read_text() + for job in jobs + } jobs_queues = { job: int( - workers_state_path.joinpath(job) - .read_text().split(':')[1] + jobs_state_description_content[job] + .split(':')[1] ) for job in jobs } jobs_enqueueds = { job: int( - workers_state_path.joinpath(job) - .read_text().split(':')[2] + jobs_state_description_content[job] + .split(':')[2] ) for job in jobs } jobs_states = { job: ( - workers_state_path.joinpath(job) - .read_text().split(':')[0] + jobs_state_description_content[job] + .split(':')[0] ) for job in jobs } @@ -87,21 +116,30 @@ def print_terminal(workers_state_path: Path, keep_to_next_cycle=None): } jobs_files = { job: int( - workers_state_path.joinpath(job) - .read_text().split(':')[4] + jobs_state_description_content[job] + .split(':')[4] ) for job in jobs } jobs_bytes = { job: int( - workers_state_path.joinpath(job) - .read_text().split(':')[3] + jobs_state_description_content[job] + .split(':')[3] ) for job in jobs } + jobs_latest_link = { + job: (None + if len(parts := jobs_state_description_content[job].split(':', 6)) < 6 + else parts[5]) + for job in jobs + } except KeyboardInterrupt: raise except: + # print(ESC+'[2J', end='', flush=True) + # print(ESC+'[0;0H', end='', flush=True) + # traceback.print_exc() return keep_to_next_cycle # print(ESC+'[2J', end='', flush=True) print(ESC+'[0;0H', end='', flush=True) @@ -115,10 +153,18 @@ def print_terminal(workers_state_path: Path, keep_to_next_cycle=None): str(jobs_queues[job]) ) for job in jobs_lines.keys()} - max_job_size = max([*jobs_sizes.values(), 0]) + state_sequence = [ + ('finished',), + ('running', 'scrubbing'), + ('enqueued',), + ('waiting',), + ('failed',), + ] + block_sequence = ' \u2581\u2582\u2583\u2584\u2585\u2586\u2587\u2588' state_stats = frequency_dict(list(jobs_states.values())) links_stats = dict(waiting=0, enqueued=0, scrubbing=0, - running1=0, running2=0, finished=0) + running1=0, running2=0, finished=0, + failed=0) for job in jobs: state = jobs_states[job] jq1 = jobs_queues.get(job, 0) @@ -128,14 +174,12 @@ def print_terminal(workers_state_path: Path, keep_to_next_cycle=None): links_stats[state] = 0 links_stats[state] += jq1 elif state == 'scrubbing': - if state not in links_stats: - links_stats[state] = 0 - links_stats[state] += jq2 + links_stats['running1'] += jq2 + links_stats['running2'] += jq1-jq2 else: links_stats['running1'] += jq2 links_stats['running2'] += jq1-jq2 term_sz = shutil.get_terminal_size() - per_column = term_sz.columns//(max_job_size+1) jobs_sorted = list(map(lambda a: a[1], sorted( reverse_mapping(jobs_lines).items()))) jobsdates_list = list(reversed(sorted(jobs_dates.values()))) @@ -172,10 +216,68 @@ def print_terminal(workers_state_path: Path, keep_to_next_cycle=None): colored.bg(clr) for clr in bg_rank_color_names ] - bg_rank = bg_rank[-state_stats.get('running', 1):] + bg_rank = bg_rank[-(max( + 1, + state_stats.get('running', 0) + state_stats.get('scrubbing', 0) + )):] bg_rang_programmed_len = len(bg_rank) bg_rank += ['']*(len(jobs_dates)-len(bg_rank)) + # jobs_timestamps = keep_to_next_cycle.get( + # 'jobs_timestamps', dict()) + # keep_to_next_cycle['jobs_timestamps'] = jobs_timestamps + # for job, state in jobs_states.items(): + # if state in ('running', 'scrubbing'): + # if (db := (jobs_dates[job], jobs_enqueueds[job])) not in (jqh := jobs_timestamps.get(job, list())): + # jqh.append(db) + # jobs_timestamps[job] = jqh[-JOB_LINKS_MEMORY_SIZE:] + # else: + # if job in jobs_timestamps: + # del 
jobs_timestamps[job] + + # jobs_timestamps_transitions = dict() + # jobs_links_avgs = list() + # jobs_links_vars = list() + # for job, timestamps_history in jobs_timestamps.items(): + # job_timestamp_transitions = list(map( + # lambda b: tuple(map( + # lambda a: a[1] - a[0], + # zip(*tuple(map( + # lambda a: (a[0]/10**9, -a[1]), + # b + # ))))), + # zip( + # [timestamps_history[0]] + timestamps_history, + # timestamps_history + [timestamps_history[-1]], + # )))[1:-1] + # job_timestamp = list(map( + # lambda a: a[1]/a[0], + # filter(lambda a: a[0]!=0 and a[1]!=0, job_timestamp_transitions) + # )) + # if len(job_timestamp) > 0: + # job_timestamp_avg = sum(job_timestamp)/len(job_timestamp) + # jobs_links_avgs.append(job_timestamp_avg) + # diff = list(map( + # lambda lpd: (lpd - job_timestamp_avg), + # job_timestamp + # )) + # diffsqd = list(map( + # lambda d: d**2, + # diff + # )) + # job_timestamp_var = sum(diffsqd)/len(diffsqd) + # jobs_links_vars.append(job_timestamp_var) + # job_links_avg = sum(jobs_links_avgs) + # job_links_var = .0 + # if len(jobs_links_vars) > 0: + # jobs_links_var = sum(jobs_links_vars) + # links_per_sec = sum(map(lambda a: 1/max(2**-30, a), jobs_links_avgs)) + # print(jobs_links_avgs, ESC+'[K') + # print(jobs_links_vars, ESC+'[K') + + # print(jobs_timestamps_transitions) + # return keep_to_next_cycle + link_processing_timestamps = keep_to_next_cycle.get( 'link_processing_timestamps', list()) for link_processing_timestamp in jobs_dates.values(): @@ -193,7 +295,6 @@ def print_terminal(workers_state_path: Path, keep_to_next_cycle=None): )))[0:-1] link_processing_deltas_avg = sum( link_processing_deltas+[0])/max(1, len(link_processing_deltas)) - links_per_sec = 1/max(2**-30, link_processing_deltas_avg) link_processing_deltas_var = 0 if (l := len(link_processing_deltas)) > 0: @@ -207,13 +308,17 @@ def print_terminal(workers_state_path: Path, keep_to_next_cycle=None): )) link_processing_deltas_var = sum(diffsqd)/l + job_links_avg = link_processing_deltas_avg + job_links_var = link_processing_deltas_var + download_pending_count = ( links_stats.get("waiting", 0) + links_stats.get("enqueued", 0) + links_stats.get("running1", 0) ) - seconds_to_finish = download_pending_count*link_processing_deltas_avg + links_per_sec = 1/max(2**-30, job_links_avg) + seconds_to_finish = download_pending_count*job_links_avg datetime_when_finished = datetime.datetime.now( ) + datetime.timedelta(seconds=seconds_to_finish) @@ -224,13 +329,15 @@ def print_terminal(workers_state_path: Path, keep_to_next_cycle=None): seconds_to_finish % 60, ) - displayatbottom += f'Speed: {"%.3f" % links_per_sec} links/s | ' + displayatbottom += f'Speed: {"%6.3f" % links_per_sec} links/s | ' + displayatbottom += f'Speed: ##BYTESPERSEC## | ' displayatbottom += f'ETA: {time_to_finish} | ' displayatbottom += f'ETL: {datetime_when_finished} | ' - displayatbottom += f'Error: \u00b1{"%6.2f" % (100*(link_processing_deltas_var**.5)/link_processing_deltas_avg)}%' + displayatbottom += f'Error: \u00b1{"%6.2f" % (100*(job_links_var**.5)/job_links_avg)}%' + # displayatbottom += f'Error: \u00b1{"%6.2f" % (100*(job_links_var**.5)/job_links_avg)}%' # displayatbottom += str(link_processing_deltas) - number1colors = dict( + number1colors: Dict[str, List[str]] = dict( waiting=[ colored.fg('light_gray'), colored.attr('dim'), @@ -248,73 +355,152 @@ def print_terminal(workers_state_path: Path, keep_to_next_cycle=None): colored.fg('light_green') ], ) - number2colors = number1colors.copy() - number2colors['running1'] = 
number2colors['running'] - number2colors['running'] = [ - colored.fg('light_cyan'), - ] - number2colors['scrubbing'] = [ - colored.fg('light_magenta'), - ] + number1colors['failed'] = number1colors['waiting'] + number2colors: Dict[str, List[str]] = number1colors.copy() + number2colors['running'] = [colored.fg('light_cyan')] + number2colors['scrubbing'] = [colored.fg('light_magenta')] + number2colors['running1'] = number1colors['running'] number2colors['running2'] = number2colors['running'] + + workers_text_stats = ( + colored.stylize( + f'{state_stats.get("waiting", 0)} waiting', + number1colors['waiting'], + ) + + ' - ' + + colored.stylize( + f'{state_stats.get("enqueued", 0)} enqueued', + number1colors['enqueued'], + ) + + ' - ' + + colored.stylize( + f'{state_stats.get("running", 0)} running', + number1colors['running'], + ) + + ' \u00b7 ' + + colored.stylize( + f'{state_stats.get("scrubbing", 0)} scrubbing', + number1colors['scrubbing'], + ) + + ' - ' + + colored.stylize( + f'{state_stats.get("finished", 0)} finished', + number1colors['finished'], + ) + ) + + links_text_stats = ( + colored.stylize( + f'{links_stats.get("waiting", 0)} w.', + number2colors['waiting'], + ) + + ' - ' + + colored.stylize( + f'{links_stats.get("enqueued", 0)} e.', + number2colors['enqueued'], + ) + + ' - ' + + colored.stylize( + f'{links_stats.get("running1", 0)} staging', + number2colors['running1'], + ) + + ' \u00b7 ' + + colored.stylize( + f'{links_stats.get("running2", 0)} downloaded', + number2colors['running2'], + ) + + ' - ' + + colored.stylize( + f'{links_stats.get("finished", 0)} f.', + number2colors['finished'], + ) + ) + + text_stats_size = max(map(len, [workers_text_stats, links_text_stats])) + text_stats_fmt = '{0:^%d}' % text_stats_size + workers_text_stats = text_stats_fmt.format(workers_text_stats) + links_text_stats = text_stats_fmt.format(links_text_stats) + + dwldcount = links_stats.get("running2", 0) + links_stats.get("finished", 0) + filecount = sum(jobs_files.values()) + bytecount = sum(jobs_bytes.values()) + + worker_ratio = state_stats.get( + "finished", 0)/(max(1, sum(state_stats.values()))) + links_ratio = (links_stats.get("running2", 0) + + links_stats.get("finished", 0))/max(1, sum(links_stats.values())) + + jobs_bytes_history = keep_to_next_cycle.get( + 'jobs_bytes_history', dict()) + keep_to_next_cycle['jobs_bytes_history'] = jobs_bytes_history + for job, state in jobs_states.items(): + if state in ('running', 'scrubbing'): + if (db := (jobs_dates[job], jobs_bytes[job])) not in (jbh := jobs_bytes_history.get(job, list())): + jbh.append(db) + jobs_bytes_history[job] = jbh[-JOB_LINKS_MEMORY_SIZE:] + else: + if job in jobs_bytes_history: + del jobs_bytes_history[job] + + jobs_speeds_avg = list() + for job, bytes_history in jobs_bytes_history.items(): + job_byte_transitions = list(map( + lambda b: tuple(map( + lambda a: a[1] - a[0], + zip(*tuple(map( + lambda a: (a[0]/10**9, a[1]), + b + ))))), + zip( + [bytes_history[0]] + bytes_history, + bytes_history + [bytes_history[-1]], + )))[1:-1] + job_speed = list(map( + lambda a: a[1]/a[0], + filter(lambda a: a[0] != 0 and a[1] != 0, job_byte_transitions) + )) + if len(job_speed) > 0: + job_speed_avg = sum(job_speed)/len(job_speed) + jobs_speeds_avg.append(job_speed_avg) + jobs_speed_avg = sum(jobs_speeds_avg) + # return keep_to_next_cycle + + displayatbottom = displayatbottom.replace( + "##BYTESPERSEC##", + "%10s/s \u00b7 %10sps" % ( + str(format_power2(jobs_speed_avg)), + str(format_power10(jobs_speed_avg*8, suffix='b')), + ), + ) + 
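The bytes-per-second figure shown above comes from keeping a short history of (timestamp, byte-counter) samples per running job, averaging the deltas within each job, and summing the per-job averages. A minimal standalone sketch of that rolling estimate follows; ThroughputEstimator and its method names are illustrative only and are not part of this patch.

    import time
    from collections import deque
    from typing import Deque, Optional, Tuple

    class ThroughputEstimator:
        """Rolling bytes-per-second estimate from (timestamp_ns, total_bytes) samples."""

        def __init__(self, window: int = 8):
            # Keep only the most recent samples, analogous to JOB_LINKS_MEMORY_SIZE above.
            self.samples: Deque[Tuple[int, int]] = deque(maxlen=window)

        def add_sample(self, total_bytes: int, timestamp_ns: Optional[int] = None) -> None:
            now = time.time_ns() if timestamp_ns is None else timestamp_ns
            self.samples.append((now, total_bytes))

        def bytes_per_second(self) -> float:
            pairs = list(self.samples)
            rates = []
            for (t0, b0), (t1, b1) in zip(pairs, pairs[1:]):
                dt = (t1 - t0) / 10**9  # stored timestamps are nanoseconds
                db = b1 - b0
                if dt > 0 and db > 0:  # ignore stalled or reset counters
                    rates.append(db / dt)
            return sum(rates) / len(rates) if rates else 0.0
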
print( - f'# Monitoring {len(jobs)} jobs @ {datetime.datetime.now()}', end='', flush=True) - print(ESC+'[K', flush=True) + f'# Monitoring {len(jobs)} jobs ' + + f'@ {datetime.datetime.now()} ' + + f'@ {datetime.datetime.now() - start_date}' + + '', + end='', flush=True) print(ESC+'[K', flush=True) + # print(ESC+'[K', flush=True) print('Workers: ' + - '%.4f%% | ' % (100*state_stats.get("finished", 0)/(max(1, sum(state_stats.values())))) + - colored.stylize( - f'{state_stats.get("waiting", 0)} waiting', - number1colors['waiting'], - ) + - ' - ' + - colored.stylize( - f'{state_stats.get("enqueued", 0)} enqueued', - number1colors['enqueued'], - ) + - ' - ' + - colored.stylize( - f'{state_stats.get("running", 0)} running', - number1colors['running'], - ) + - ' - ' + - colored.stylize( - f'{state_stats.get("finished", 0)} finished', - number1colors['finished'], - ), + '%8.4f%% | ' % (100*worker_ratio) + + workers_text_stats + + ' | ' + + f'Current: ' + + f'{dwldcount} links, ' + + f'{filecount} files, ' + + f'{format_power10(bytecount)}' + + '', end='') print(ESC+'[K', flush=True) print('Links: ' + - '%.4f%% | ' % (100*(links_stats.get("running2", 0)+links_stats.get("finished", 0))/(max(1, sum(links_stats.values())))) + - colored.stylize( - f'{links_stats.get("waiting", 0)} w.', - number2colors['waiting'], - ) + - ' - ' + - colored.stylize( - f'{links_stats.get("enqueued", 0)} e.', - number2colors['enqueued'], - ) + - ' - ' + - colored.stylize( - f'{links_stats.get("running1", 0)} s.', - number2colors['running1'], - ) + - ' \u00b7 ' + - colored.stylize( - f'{links_stats.get("scrubbing", 0)} s.', - number2colors['scrubbing'], - ) + - ' \u00b7 ' + - colored.stylize( - f'{links_stats.get("running2", 0)} d.', - number2colors['running2'], - ) + - ' - ' + - colored.stylize( - f'{links_stats.get("finished", 0)} f.', - number2colors['finished'], - ), + '%8.4f%% | ' % (100*links_ratio) + + links_text_stats + + ' | ' + + f'Expected: ' + + '%.3f fl/lnk, ' % (filecount/max(1, dwldcount)) + + f'{int(filecount/max(2**-30, links_ratio))} files, ' + + f'{format_power10(int(bytecount/max(2**-30, links_ratio)))}' + + '', end='') print(ESC+'[K', flush=True) print('Latest updates gradient: ', end='') @@ -329,31 +515,71 @@ def print_terminal(workers_state_path: Path, keep_to_next_cycle=None): print(('%%0%dd' % bg_rang_programmed_len_digits) % (i+1), end='') print(' ', end='') print(colored.attr('reset'), end='') - + # print(ESC+'[K', flush=True) print(ESC+'[K', flush=True) + print('Overall progress: ', end='') + for current_job in jobs_sorted: + current_state = jobs_states[current_job] + number1color = number1colors.get(current_state, list()) + number2color = number2colors.get(current_state, list()) + current_block_no = ((len(block_sequence)-1)*( + jobs_queues[current_job]-jobs_enqueueds[current_job]))//max(1, jobs_queues[current_job]) + print( + colored.stylize( + block_sequence[current_block_no], + [color.replace("38;5;", "48;5;") for color in number1color] + + number2color + ), + end='' + ) print(ESC+'[K', flush=True) - current_jobs = jobs_sorted.copy() - while len(current_jobs) > 0: - for _ in range(per_column): - if len(current_jobs) > 0: - current_job, *current_jobs = current_jobs - current_state = jobs_states[current_job] - number1color = number1colors.get(current_state, '') - number2color = number2colors.get(current_state, '') - print(''.join(number1color), end='') - print(bg_rank[jobs_daterank[current_job]], end='') - print(' '*(max_job_size-jobs_sizes[current_job]), end='') - print(current_job, end='') - 
print('@', end='') - if current_state in ('running', 'scrubbing'): - print(''.join(number2color), end='') - print(str(jobs_enqueueds[current_job]), end='') + max_job_size = max([*jobs_sizes.values(), 0]) + per_column = term_sz.columns//(max_job_size+1) + for stateelems in state_sequence: + current_jobs = [job + for job in jobs_sorted + if jobs_states[job] in stateelems] + if len(current_jobs) <= 0: + continue + print('» ', end='') + print(' & '.join([ + colored.stylize(stateelem, number1colors[stateelem]) + for stateelem in stateelems + ]), + end='' + ) + print(' «', end='') + print(ESC+'[K', flush=True) + # print(stateelem) + max_job_size = max([0]+[jobs_sizes[job] for job in current_jobs]) + per_column = term_sz.columns//(max_job_size+1) + current_jobs = sorted( + current_jobs, + key=lambda j: ( + jobs_enqueueds[j]/max(0.00000001, jobs_queues[j]), + jobs_lines[j],) + ) + # current_jobs = jobs_sorted.copy() + while len(current_jobs) > 0: + for _ in range(per_column): + if len(current_jobs) > 0: + current_job, *current_jobs = current_jobs + current_state = jobs_states[current_job] + number1color = number1colors.get(current_state, list()) + number2color = number2colors.get(current_state, list()) print(''.join(number1color), end='') - print('/', end='') - print(str(jobs_queues[current_job]), end='') - print(colored.attr('reset'), end='') - print(' ', end='') - print(ESC+'[K', flush=False) + print(bg_rank[jobs_daterank[current_job]], end='') + print(' '*(max_job_size-jobs_sizes[current_job]), end='') + print(current_job, end='') + print('@', end='') + print(str(jobs_queues[current_job]), end='') + if current_state in ('running', 'scrubbing'): + print('/', end='') + print(''.join(number2color), end='') + print(str(jobs_enqueueds[current_job]), end='') + print(colored.attr('reset'), end='') + print(' ', end='') + print(ESC+'[K', flush=False) print(displayatbottom, end=ESC+'[K', flush=True) print(ESC+'[0J', end='', flush=True) print(ESC+'[0;0H', end='', flush=True) @@ -400,6 +626,9 @@ def main(): print(ESC+'[2J', end='', flush=True) print(ESC+'[0;0H', end='', flush=True) traceback.print_exc() + sio = StringIO() + traceback.print_exc(file=sio) + Path('display_fetch_futures.trace').write_text(sio.getvalue()) from_exc = True sleep(1) selfmodule.do_cycle_sleep() diff --git a/reddit_imgs/download_pruner.py b/reddit_imgs/download_pruner.py new file mode 100644 index 0000000..87e24dd --- /dev/null +++ b/reddit_imgs/download_pruner.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python3 +# -*- encoding: utf-8 -*- + +import json +from pathlib import Path +from .system.cmdline_parser import parse_cmdline +from typing import Tuple, List, FrozenSet +import colored as clrlib +import multiprocessing + + +def cmdline(encoded_args: str = None): + if encoded_args is None: + return run_with_config() + else: + return parse_cmdline(run_with_config, encoded_args) + + +def run_with_config(): + main() + + +def flatten(list_of_lists): + return [item for sublist in list_of_lists for item in sublist] + + +class FileDeleter(multiprocessing.Process): + def __init__(self, + file_queue: multiprocessing.Queue): + multiprocessing.Process.__init__(self) + self.file_queue = file_queue + + def run(self): + proc_name = self.name + while True: + next_file: Path = self.file_queue.get() + if next_file is None: + # Poison pill means shutdown + print(f'{proc_name}: Exiting') + break + print(clrlib.stylize(f'Deleted at {proc_name}: {next_file}', [clrlib.fg('light_magenta')])) + next_file.unlink() + return + + +class 
FileDeletionChecker(multiprocessing.Process): + def __init__(self, + allowed_files: FrozenSet[str], + check_queue: multiprocessing.Queue, + deletion_queue: multiprocessing.Queue): + multiprocessing.Process.__init__(self) + self.check_queue = check_queue + self.deletion_queue = deletion_queue + self.allowed_files = allowed_files + + def run(self): + proc_name = self.name + while True: + next_file: str = self.check_queue.get() + if next_file is None: + # Poison pill means shutdown + print(f'{proc_name}: Exiting') + # Forward poison pill + self.deletion_queue.put(None) + break + if next_file not in self.allowed_files: + print(clrlib.stylize(f'Segregating for deletion at {proc_name}: {next_file}', [ + clrlib.fg('red'), + ])) + self.deletion_queue.put(Path(next_file)) + # else: + # print(clrlib.stylize(f'File will be kept at {proc_name}: {next_file}', [ + # clrlib.fg('green'), + # ]), end='\r', flush=True) + + +def main(): + print(clrlib.stylize('Loading file list...', [clrlib.fg('light_cyan'), clrlib.attr('bold'), ])) + file_list_json_text = Path('i_gdl_ffl.json').read_text() + print(clrlib.stylize('Parsing file list...', [clrlib.fg('light_cyan'), ])) + file_list_json_obj = json.loads(file_list_json_text) + print(clrlib.stylize('Processing file list...', [clrlib.fg('light_cyan'), ])) + files: List[str] = list(map(Path, set(flatten(file_list_json_obj.values())))) + files: List[str] = sorted(files) + for file in files.copy(): + files.append(Path(str(file)+'.json')) + files.append(Path('i_gdl/.cookies')) + # print(clrlib.stylize('Making paths absolute...', [clrlib.fg('light_cyan'), ])) + files: List[str] = sorted(list(map(str, files))) + files_tuple: Tuple[str] = tuple(files) + files_frozenset: FrozenSet[str] = frozenset(files_tuple) + print(clrlib.stylize('Starting to prune files from filesystem...', [clrlib.fg('light_cyan'), clrlib.attr('bold'), ])) + file_checking_for_deletion_queue = multiprocessing.Queue() + file_deletion_queue = multiprocessing.Queue() + processFileDeleter = FileDeleter(file_deletion_queue) + processFileDeletionChecker = FileDeletionChecker(files_frozenset, file_checking_for_deletion_queue, file_deletion_queue) + processFileDeleter.start() + processFileDeletionChecker.start() + recursive_file_prunning(Path('i_gdl'), file_checking_for_deletion_queue) + file_checking_for_deletion_queue.put(None) + processFileDeleter.join() + processFileDeletionChecker.join() + file_checking_for_deletion_queue.close() + file_checking_for_deletion_queue.join_thread() + file_deletion_queue.close() + file_deletion_queue.join_thread() + print(clrlib.stylize('Starting to prune directories from filesystem...', [clrlib.fg('light_cyan'), clrlib.attr('bold'), ])) + recursive_empty_dir_prunning(Path('i_gdl')) + + +def recursive_file_prunning(start_path: Path, queue: multiprocessing.Queue): + print(clrlib.stylize(f'Checking directory: {start_path} ...', [clrlib.attr('dim'), ]), + end='\r', flush=True) + subpaths = list() + files = list() + sub_dirs = list() + for e, subpath in enumerate(start_path.iterdir()): + subpaths.append(subpath) + if subpath.is_file(): + files.append(subpath) + elif subpath.is_dir(): + sub_dirs.append(subpath) + if (e % 10) == 0: + print(clrlib.stylize( + f'Checking directory: {start_path} ... {len(files)} files + {len(sub_dirs)} folders', + [clrlib.attr('dim'), ]), end='\r', flush=True) + subpaths = sorted(subpaths) + files = sorted(files) + sub_dirs = sorted(sub_dirs) + print(clrlib.stylize( + f'Checking directory: {start_path} ... 
{len(files)} files + {len(sub_dirs)} folders', + [clrlib.attr('dim'), ])) + for file in files: + queue.put(str(file)) + for subdir in sub_dirs: + recursive_file_prunning(subdir, queue) + + +def recursive_empty_dir_prunning(start_path: Path): + print(clrlib.stylize(f'Checking directory: {start_path} ...', [clrlib.attr('dim'), ]), + end='\r', flush=True) + subpaths = list() + sub_dirs = list() + files = list() + for e, subpath in enumerate(start_path.iterdir()): + subpaths.append(subpath) + if subpath.is_dir(): + sub_dirs.append(subpath) + elif subpath.is_file(): + files.append(subpath) + if (e % 10) == 0: + print(clrlib.stylize( + f'Checking directory: {start_path} ... {len(files)} files + {len(sub_dirs)} folders', + [clrlib.attr('dim'), ]), end='\r', flush=True) + subpaths = sorted(subpaths) + sub_dirs = sorted(sub_dirs) + files = sorted(files) + print(clrlib.stylize( + f'Checking directory: {start_path} ... {len(files)} files + {len(sub_dirs)} folders', + [clrlib.attr('dim'), ])) + for subdir in sub_dirs: + recursive_empty_dir_prunning(subdir) + try: + next(start_path.iterdir()) + except StopIteration: + print(clrlib.stylize( + f'Removing folder: {start_path}', [ + clrlib.fg('red'), + clrlib.attr('bold'), + ] + )) + start_path.rmdir() + + +if __name__ == '__main__': + main() diff --git a/reddit_imgs/fetch.py b/reddit_imgs/fetch.py index 24f3257..181a85b 100755 --- a/reddit_imgs/fetch.py +++ b/reddit_imgs/fetch.py @@ -12,20 +12,24 @@ downloaderGetter = downloaderModule.getDownloader wdir = os.path.abspath('.') + def _isImageDirectLink(s): return s.endswith('.jpg') or s.endswith('.png') or s.endswith('.gif') or s.endswith('.webp') or s.endswith('.mp4') + def isImageDirectLink(s): return _isImageDirectLink(s) or _isImageDirectLink(s.split('?', 1)[0]) + def retry(): main(True) -def main(retryEmptyAlbums = False): + +def main(retryEmptyAlbums=False): links = list() - subreddits = sorted(filter(lambda sr: os.path.isdir(os.path.join(wdir,'r',sr)), os.listdir(os.path.join(wdir,'r')))) + subreddits = sorted(filter(lambda sr: os.path.isdir(os.path.join(wdir, 'r', sr)), os.listdir(os.path.join(wdir, 'r')))) for subreddit in subreddits: - srf = os.path.abspath(os.path.join(wdir,'r',subreddit,'subreddit.json')) + srf = os.path.abspath(os.path.join(wdir, 'r', subreddit, 'subreddit.json')) links2 = list() try: with open(srf) as f: @@ -33,18 +37,19 @@ def main(retryEmptyAlbums = False): for lnk in links2: lnk['subreddit'] = subreddit del lnk - except: pass - links+=links2 + except: + pass + links += links2 del links2 del srf del subreddit del subreddits - + links.sort(key=lambda link: link['timestamp']) links = list(filter(lambda l: '' in link: for linkcopy in search_urls(link): + linkcopy = get_normalized_link(linkcopy) if linkcopy not in post_links: post_links.append(linkcopy) - has_added_any_link = True - Path('r_gdl_p.json').write_text(json.dumps(posts, indent=1)) + has_changed_any_link = True + while link in post_links: + post_links.remove(link) + has_changed_any_link = True + else: + linkcopy = link + linkcopy = get_normalized_link(linkcopy) + if linkcopy not in post_links: + post_links.append(linkcopy) + has_changed_any_link = True + while linkcopy[-1:] in ('/', '#', '?'): + linkcopy = linkcopy[:-1] + linkcopy = get_normalized_link(linkcopy) + if linkcopy not in post_links: + post_links.append(linkcopy) + has_changed_any_link = True + if '?' 
in link: + linkcopy = link.split('?', 1)[0] + linkcopy = get_normalized_link(linkcopy) + if linkcopy not in post_links: + post_links.append(linkcopy) + has_changed_any_link = True + if '#' in link: + linkcopy = link.split('#', 1)[0] + linkcopy = get_normalized_link(linkcopy) + if linkcopy not in post_links: + post_links.append(linkcopy) + has_changed_any_link = True + if link == '': + while link in post_links: + post_links.remove(link) + has_changed_any_link = True + if link.startswith('/'): + while link in post_links: + post_links.remove(link) + has_changed_any_link = True + if link.startswith('#'): + while link in post_links: + post_links.remove(link) + has_changed_any_link = True + if link.startswith('mailto'): + while link in post_links: + post_links.remove(link) + has_changed_any_link = True + if (proto := (tpl := link.split(':', 1))[0]).lower() in ('http', 'https') and proto not in ('http', 'https'): + lst = list(tpl) + lst[0] = lst[0].lower() + linkcopy = ':'.join(lst) + post_links.remove(link) + if linkcopy not in post_links: + post_links.append(linkcopy) + has_changed_any_link = True + Path('r_gdl_p.json').write_text( + json.dumps(posts, indent=1, sort_keys=True)) print(f'Grouping links with the posts they show up in...') for dk, post in posts.items(): for link in post['links']: if link not in links: links[link] = list() links[link].append(dk) - Path('r_gdl_lp.json').write_text(json.dumps(links, indent=1)) + Path('r_gdl_lp.json').write_text( + json.dumps(links, indent=1, sort_keys=True)) print(f'{len(links)} links found') print(f'Checking if there is an extractor for each link...') r_gdl_le_path = Path('r_gdl_le.json') @@ -104,34 +270,92 @@ def main(): ext = gallery_dl.extractor.find(link) except gallery_dl.exception.NotFoundError: pass - link_extractors[link] = type( - ext).category if ext is not None else '' - r_gdl_le_path.write_text(json.dumps(link_extractors, indent=1)) + if ext is not None and type(ext).category == 'reddit' and type(ext).subcategory in ('subreddit', 'user'): + link_extractors[link] = f'{type(ext).category}_{type(ext).subcategory}' + else: + link_extractors[link] = (type(ext).category + if ext is not None else + '') + r_gdl_le_path.write_text(json.dumps( + link_extractors, indent=1, sort_keys=True)) links_by_extractor = { extractor: list() for extractor in list(set(link_extractors.values())) } for link, extractor in link_extractors.items(): links_by_extractor[extractor].append(link) - undownloadable_posts = links_by_extractor.get('', []) + not_downloadable_links = dict() + not_downloadable_links[''] = links_by_extractor.get('', []) + not_downloadable_links['reddit_user'] = links_by_extractor.get('reddit_user', []) + not_downloadable_links['reddit_subreddit'] = links_by_extractor.get('reddit_subreddit', []) Path('i_undownloadable.json').write_text( - json.dumps(undownloadable_posts, indent=1)) + json.dumps(not_downloadable_links, indent=1)) if '' in links_by_extractor: del links_by_extractor[''] + if 'reddit_user' in links_by_extractor: + del links_by_extractor['reddit_user'] + if 'reddit_subreddit' in links_by_extractor: + del links_by_extractor['reddit_subreddit'] + not_downloadable_link_set = frozenset(flatten_generator(not_downloadable_links.values())) - print(f'{len(links)-len(undownloadable_posts)} downloadable links found') - print(f'{len(undownloadable_posts)} undownloadable links found') + print(f'{len(links)-len(not_downloadable_link_set)} downloadable links found') + print(f'{len(not_downloadable_link_set)} undownloadable links found') 
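Link-to-extractor assignment above leans on gallery-dl's own resolver: each link is probed with gallery_dl.extractor.find(), the extractor's category (plus, for reddit links, its subcategory) becomes the bucket key, and links nobody claims land in the '' bucket that feeds i_undownloadable.json. A condensed sketch of that bucketing, assuming gallery_dl is importable as in this module; the function name is illustrative.

    import gallery_dl.exception
    import gallery_dl.extractor
    from typing import Dict, List

    def group_links_by_extractor(links: List[str]) -> Dict[str, List[str]]:
        """Bucket links by gallery-dl extractor category; '' collects unsupported links."""
        buckets: Dict[str, List[str]] = {}
        for link in links:
            category = ''
            try:
                ext = gallery_dl.extractor.find(link)
                if ext is not None:
                    category = type(ext).category
            except gallery_dl.exception.NotFoundError:
                pass
            buckets.setdefault(category, []).append(link)
        return buckets
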
print(f'{len(links_by_extractor)} extractors found') Path('r_gdl_lbe.json').write_text(json.dumps(links_by_extractor, indent=1)) + files_from_links = dict() + links_no_files = list() + files_sizes = dict() + link_statuses = dict() ignored_links = set() + + if (pth := Path('i_gdl_ffl.json')).exists(): + files_from_links = json.loads(pth.read_text()) + + if (pth := Path('i_gdl_lnf.json')).exists(): + links_no_files = json.loads(pth.read_text()) + + if (pth := Path('i_gdl_fsz.json')).exists(): + files_sizes = json.loads(pth.read_text()) + + if (pth := Path('i_gdl_spl.json')).exists(): + link_statuses = json.loads(pth.read_text()) + if (p := Path('i_gdl_ignored.txt')).exists(): ignored_links = set(list(filter(len, p.read_text().splitlines()))) + links_no_files = list(filter(lambda a: a not in ignored_links, + links_no_files)) + + link_statuses = dict(filter(lambda a: a[0] not in ignored_links, + link_statuses.items())) + + files_from_links = dict(filter(lambda a: a[0] not in ignored_links, + files_from_links.items())) + + checked_links = list(files_from_links.keys()) + links_no_files + + checked_links = frozenset(checked_links) + max_expected_jobs_for_extractor = 0 for extractor, links in links_by_extractor.items(): - links = [link for link in links if link not in ignored_links] - workers = math.ceil(len(links)/SPLIT_WORKER_AFTER_N_LINKS) + links = [link + for link in links + if + link not in ignored_links + and + ( + link not in checked_links + or + not SKIP_INDEXED_FILES + or + (link_statuses.get(link, 0xFF) & RETRY_ERROR_MASK) != 0 + )] + if len(links) <= 0: + continue + this_worker_split_after_n_links = CUSTOM_WORKER_SPLITS.get( + extractor, SPLIT_WORKER_AFTER_N_LINKS) + workers = math.ceil(len(links)/this_worker_split_after_n_links) if workers <= 1 or extractor in FORBIDDEN_WORKER_SPLITS: workers = 1 max_expected_jobs_for_extractor = max( @@ -142,37 +366,86 @@ def main(): links_to_worker = dict() for extractor, links in links_by_extractor.items(): - links = [link for link in links if link not in ignored_links] - workers = math.ceil(len(links)/SPLIT_WORKER_AFTER_N_LINKS) + links = [link + for link in links + if + link not in ignored_links + and + ( + link not in checked_links + or + not SKIP_INDEXED_FILES + or + (link_statuses.get(link, 0xFF) & RETRY_ERROR_MASK) != 0 + )] + if len(links) <= 0: + continue + this_worker_split_after_n_links = CUSTOM_WORKER_SPLITS.get( + extractor, SPLIT_WORKER_AFTER_N_LINKS) + workers = math.ceil(len(links)/this_worker_split_after_n_links) if workers <= 1 or extractor in FORBIDDEN_WORKER_SPLITS: + if extractor in IGNORE_WORKERS: + continue links_to_worker[extractor] = links worker_by_seq[0].append(extractor) else: digits = math.ceil(math.log10(max(1, workers+1))) fmt = "%%0%dd" % digits for worker_no in range(workers): - lowerlimit = (worker_no+0)*SPLIT_WORKER_AFTER_N_LINKS - upperlimit = (worker_no+1)*SPLIT_WORKER_AFTER_N_LINKS + lowerlimit = (worker_no+0)*this_worker_split_after_n_links + upperlimit = (worker_no+1)*this_worker_split_after_n_links thisrange = links[lowerlimit:upperlimit] worker_nm = extractor + ':' + (fmt % (worker_no)) + if worker_nm in IGNORE_WORKERS: + continue links_to_worker[worker_nm] = thisrange worker_by_seq[worker_no].append(worker_nm) for w in worker_by_seq: w.sort() workers_nicely_grouped = [ - worker + worker for workergroup in worker_by_seq for worker in workergroup if worker != '' ] print(f'{len(links_to_worker)} workers to be spawned') - configure_gdl() - gallery_dl.output.select = lambda: ColoredLineOutput(False) + def 
save_ending_files(): + nonlocal links_no_files + links_no_files2 = list(map(lambda a: a[0], + filter(lambda a: len(a[1]) <= 0 and a[0] not in links_no_files, + files_from_links.items()))) + links_no_files + files_from_links2 = dict( + filter(lambda a: len(a[1]) > 0, + files_from_links.items())) + links_no_files2_sorted = sorted(links_no_files2) + links_for_files = dict() + for link, files in files_from_links2.items(): + for file in files: + if file not in links_for_files: + links_for_files[file] = list() + links_for_files[file].append(link) + del file + del link + del files + os.sync() + Path('i_gdl_lnf.json').write_text( + json.dumps(links_no_files2_sorted, indent=1)) + Path('i_gdl_ffl.json').write_text(json.dumps( + files_from_links2, indent=1, sort_keys=True)) + Path('i_gdl_lff.json').write_text(json.dumps( + links_for_files, indent=1, sort_keys=True)) + Path('i_gdl_fsz.json').write_text( + json.dumps(files_sizes, indent=1, sort_keys=True)) + Path('i_gdl_spl.json').write_text(json.dumps( + link_statuses, indent=1, sort_keys=True)) + os.sync() - files_from_links = dict() + save_ending_files() + + gallery_dl.output.select = lambda: ColoredLineOutput(False) totalfiles = 0 @@ -180,82 +453,133 @@ def main(): for line, thread_id in enumerate(thread_ids): workers_state_path.joinpath(thread_id+'=line').write_text(str(line)) linkcount = len(links_to_worker[thread_id]) - workers_state_path.joinpath(thread_id).write_text(f'waiting:{linkcount}:{linkcount}:0:0') + workers_state_path.joinpath(thread_id).write_text( + f'waiting:{linkcount}:{linkcount}:0:0') do_fancy_multithreading_panel = False thread_id_count = len(thread_ids) - with PoolExecutor(min(MAX_WORKERS, thread_id_count)) as pe: - if do_fancy_multithreading_panel: - print(f'\033[2J', end='', flush=True) - print(f'\033[0;0H', end='', flush=True) - print('Downloading...', flush=True) - if do_fancy_multithreading_panel: - print(f'\033[0;0H', end='', flush=True) - largest_tid_size = max(map(len, thread_ids)) - line2tid = dict() + if DEBUG_WORKER is not None: + print(f'Will debug {repr(DEBUG_WORKER)}.') + thread_id = DEBUG_WORKER + links_list = links_to_worker[DEBUG_WORKER] + download_link_list( + links_list, + thread_id, + None, + f'Debugging {repr(DEBUG_WORKER)}...', + workers_state_path.joinpath(thread_id), + ) + return + if links_to_worker: + with PoolExecutor(min(MAX_WORKERS, thread_id_count)) as pe: + if do_fancy_multithreading_panel: + print(f'\033[2J', end='', flush=True) + print(f'\033[0;0H', end='', flush=True) + print('Downloading...', flush=True) + if do_fancy_multithreading_panel: + print(f'\033[0;0H', end='', flush=True) + largest_tid_size = max(map(len, thread_ids)) + line2tid = dict() - def done_callback_generator(line): - nonlocal totalfiles - def done_callback(job): + def done_callback_generator(line): nonlocal totalfiles - thread_id = line2tid[line] + + def terminate_process_pool(): + os.system('sync') + os.system("bash -c \"ps -aux | grep './redditgetter.py' | grep -v grep | sed -e 's/ */ /g' | cut -d' ' -f2 | xargs -r -- kill -15\"") + sys.exit(0xFF) + + def done_callback(job): + nonlocal totalfiles + thread_id = line2tid[line] + links_list = links_to_worker[thread_id] + try: + workers_state_path.joinpath(thread_id).write_text( + f'finished:{len(links_list)}:0:0:0') + print(clrlib.stylize( + f'Received job #{line}: {thread_id}', [ + clrlib.fg('white'), + clrlib.bg('green'), + clrlib.attr('bold'), + ] + )) + downloaded_links = list() + totalbytes = 0 + thisfiles = 0 + true = True + downloaded_links = job.result() + for link, 
files in downloaded_links: + if true: + statusdir = get_path_for_caching( + link, Path('i_gdl_s')) + statusdir.mkdir(parents=True, exist_ok=True) + statusfile = statusdir.joinpath('_gdl_status.json') + statuses = dict() + if statusfile.exists(): + statuses = json.loads(statusfile.read_text()) + link_statuses[link] = statuses.get(link, 0xFF) + if link not in files_from_links: + files_from_links[link] = list() + lenfiles = len(files) + totalfiles += lenfiles + for file in files: + filepath = Path(file) + thisfiles += 1 + if filepath.exists(): + files_from_links[link].append(file) + st_size = filepath.stat().st_size + files_sizes[file] = st_size + totalbytes += st_size + workers_state_path.joinpath(thread_id).write_text( + f'finished:{len(links_list)}:0:{totalbytes}:{thisfiles}') + save_ending_files() + except: + sio = StringIO() + traceback.print_exc(file=sio) + excTxt = sio.getvalue() + try: + workers_state_path.joinpath(thread_id).write_text( + f'failed:{len(links_list)}:0:0:0') + except: + pass + try: + workers_state_path.joinpath(thread_id+'=exc').write_text(excTxt) + except: + pass + try: + pe.shutdown(wait=False) + except: + pass + print(excTxt) + terminate_process_pool() + return + return done_callback + for line, thread_id in enumerate(thread_ids): + line2tid[line] = thread_id links_list = links_to_worker[thread_id] - workers_state_path.joinpath(thread_id).write_text(f'finished:{len(links_list)}:0:0:0') - print(clrlib.stylize( - f'Received job #{line}: {thread_id}', [ - clrlib.fg('white'), - clrlib.bg('green'), - clrlib.attr('bold'), - ] - )) - totalbytes = 0 - thisfiles = 0 - generator = list() - try: - generator = job.result() - except: - with workers_state_path.joinpath(thread_id+'=exc').open('wt') as f: - traceback.print_exc(file=f) - traceback.print_exc() - sys.exit(255) - for link, files in generator: - files_from_links[link] = files - lenfiles = len(files) - totalfiles += lenfiles - for file in files: - st = Path(file).stat() - totalbytes += st.st_size - thisfiles += lenfiles - workers_state_path.joinpath(thread_id).write_text(f'finished:{len(links_list)}:0:{totalbytes}:{thisfiles}') - - - return done_callback - for line, thread_id in enumerate(thread_ids): - line2tid[line] = thread_id - links_list = links_to_worker[thread_id] - workers_state_path.joinpath(thread_id).write_text(f'enqueued:{len(links_list)}:{len(links_list)}:0:0') - print(clrlib.stylize(f'Starting job #{line}: {thread_id}', [ - clrlib.fg('white'), - clrlib.bg('light_red'), - clrlib.attr('bold'), - ])) - jobstardedmsg = clrlib.stylize(f'Starting job #{line}: {thread_id}', [ - clrlib.fg('black'), - clrlib.bg('light_yellow'), - clrlib.attr('bold'), - ]) - thread_id_nmsz = len(thread_id) - thread_id_display = thread_id + ' ' * \ - (largest_tid_size - thread_id_nmsz) - job = pe.submit( - download_link_list, - links_list, - thread_id_display, - line+3 if do_fancy_multithreading_panel else None, - jobstardedmsg, - workers_state_path.joinpath(thread_id), - ) - job.add_done_callback(done_callback_generator(line)) - Path('i_gdl_ffl.json').write_text(json.dumps(files_from_links, indent=1)) + workers_state_path.joinpath(thread_id).write_text( + f'enqueued:{len(links_list)}:{len(links_list)}:0:0') + print(clrlib.stylize(f'Starting job #{line}: {thread_id}', [ + clrlib.fg('white'), + clrlib.bg('light_red'), + clrlib.attr('bold'), + ])) + jobstardedmsg = clrlib.stylize(f'Starting job #{line}: {thread_id}', [ + clrlib.fg('black'), + clrlib.bg('light_yellow'), + clrlib.attr('bold'), + ]) + thread_id_nmsz = len(thread_id) + 
thread_id_display = thread_id + ' ' * (largest_tid_size - thread_id_nmsz) + job = pe.submit( + download_link_list, + links_list, + thread_id_display, + line+3 if do_fancy_multithreading_panel else None, + jobstardedmsg, + workers_state_path.joinpath(thread_id), + ) + job.add_done_callback(done_callback_generator(line)) + save_ending_files() if (p := Path('latest_image_download.txt')).exists(): p.unlink() if workers_state_path.exists(): @@ -272,6 +596,8 @@ def download_link_list(links: List[str], thread_state_path: Optional[Path] = None, ) -> List[Tuple[str, List[str]]]: '''Downloads a link list inside a ProcessPoolExecutor''' + if STOP_JOBS_FLAG_PATH.exists(): + raise InterruptedError(STOP_JOBS_FLAG_PATH) if job_started_msg is not None: print(job_started_msg) has_its_own_line = line is not None @@ -279,18 +605,19 @@ def download_link_list(links: List[str], remaining_links = link_count configure_gdl() if thread_state_path is not None: - thread_state_path.write_text(f'running:{link_count}:{remaining_links}:0:0') + thread_state_path.write_text( + f'running:{link_count}:{remaining_links}:0:0') def get_printer(): return ColoredLineOutput( has_its_own_line, prefix=(f'\033[{line};0H' if has_its_own_line else '') + - clrlib.stylize('% 9d' % remaining_links, [clrlib.fg('light_cyan')]) + + clrlib.stylize('%9d' % remaining_links, [clrlib.fg('light_cyan')]) + clrlib.stylize('@', [clrlib.fg('light_red')]) + clrlib.stylize(thread_id, [clrlib.fg('yellow')]) + - clrlib.stylize('=', [clrlib.fg('dark_gray')]), + clrlib.stylize('= ', [clrlib.fg('dark_gray')]), suffix=('\033[K\033[0;0H' if has_its_own_line else ''), - prefixsz=len(('% 9d' % 0)+' '+thread_id), + prefixsz=len(('%9d' % 0)+' '+thread_id), suffixsz=0, write_successes_to=Path('latest_image_download.txt'), ) @@ -303,43 +630,104 @@ def download_link_list(links: List[str], for link in links: scrubbing = True cachedir = get_path_for_caching(link, Path('i_gdl_c')) + statusdir = get_path_for_caching(link, Path('i_gdl_s')) cachedir.mkdir(parents=True, exist_ok=True) + statusdir.mkdir(parents=True, exist_ok=True) metafile = cachedir.joinpath('_gdl_meta.json') + statusfile = statusdir.joinpath('_gdl_status.json') meta = dict() + statuses = dict() link_already_downloaded = False if metafile.exists(): - meta = json.loads(metafile.read_text()) - if link in meta: + try: + meta = json.loads(metafile.read_text()) + except json.JSONDecodeError: + pass + if statusfile.exists(): + try: + statuses = json.loads(statusfile.read_text()) + except json.JSONDecodeError: + pass + if link in meta and link in statuses: link_already_downloaded = True - for fl in meta[link]: - pth = Path(fl) - if not pth.exists(): - link_already_downloaded = False - break - if not link_already_downloaded: + rc = statuses.get(link, 0xFF) + if rc == 0: + for fl in meta[link]: + pth = Path(fl) + try: + if not pth.exists(): + link_already_downloaded = False + break + except OSError: + link_already_downloaded = False + break + if len(meta[link]) == 0 and REDOWNLOAD_EMPTIES: + link_already_downloaded = False + if (rc & RETRY_ERROR_MASK) != 0: + link_already_downloaded = False + if not link_already_downloaded or REDOWNLOAD: scrubbing = False if thread_state_path is not None: - thread_state_path.write_text(f'running:{link_count}:{remaining_links}:{totalbytes}:{totalfiles}') + thread_state_path.write_text( + f'running:{link_count}:{remaining_links}:{totalbytes}:{totalfiles}:{link}') job = DownloadJobWithCallSaverPostProcessor(link) job.out = get_printer() job.out.message(link, 
clrlib.fg('light_magenta')) - job.run() - files = list(map(lambda a: a[0], job.cspp.calls['run_final'])) - files = list(filter(lambda a: Path(a).exists(), files)) + rc = job.run() + os.sync() + # print('FINAL', job.cspp.calls) + # raise Exception(job.cspp.calls) + # files = job.cspp.calls['run_final'].copy() # Only brings the last element + files = job.cspp.calls['prepare'].copy() + files = list(filter(len, files)) + has_changed = True + while has_changed: + has_changed = False + for seq, fl in enumerate(files): + if not (pth := Path(fl)).exists(): + candidates = sorted(list(filter( + lambda p: (p.name.startswith(pth.name) + and + p.suffix != '.part' + and + p.suffix != '.json'), + pth.parent.iterdir())), + key=lambda p: len(p.name) + ) + if len(candidates) > 0: + files[seq] = str(candidates[0]) + has_changed = True + break + else: + rc |= 256 + # raise Exception(pth.name, candidates, files) + del has_changed meta[link] = files + statuses[link] = rc metafile.write_text(json.dumps(meta, indent=1)) + statusfile.write_text(json.dumps(statuses, indent=1)) + os.sync() for fl in meta[link]: + code = statuses[link] pth = Path(fl) if not pth.exists(): - raise FileNotFoundError((link, link_already_downloaded, meta[link])) - st = pth.stat() - totalbytes += st.st_size - totalfiles += 1 + if code != 0: + continue + else: + raise FileNotFoundError((link, + link_already_downloaded, + meta[link])) + else: + totalfiles += 1 + totalbytes += pth.stat().st_size result.append((link, meta[link])) remaining_links -= 1 if thread_state_path is not None: scrubbing_running = 'scrubbing' if scrubbing else 'running' - thread_state_path.write_text(f'{scrubbing_running}:{link_count}:{remaining_links}:{totalbytes}:{totalfiles}') + thread_state_path.write_text( + f'{scrubbing_running}:{link_count}:{remaining_links}:{totalbytes}:{totalfiles}:{link}') + if STOP_JOBS_FLAG_PATH.exists(): + raise InterruptedError(STOP_JOBS_FLAG_PATH) finally: print((f'\033[{line};0H' if has_its_own_line else '') + clrlib.stylize(thread_id.strip(), [clrlib.fg('yellow'), clrlib.attr('bold')]) + @@ -354,14 +742,16 @@ def configure_gdl(): '''Configures Gallery-DL for usage.''' parser = gallery_dl.option.build_parser() args = parser.parse_args([ - '--download-archive=i_gdl/archive.db', + *([] if USE_FIREFOX_COOKIES else ['--cookies=i_gdl/.cookies']), '--dest=i_gdl', '--write-metadata', - '--write-tags', + # '--write-tags', # '--write-log=i_gdl_log.txt', - # '--write-unsupported=i_gdl_unsupported.txt', - '--quiet', - '--retries=15', + '--write-unsupported=i_gdl_unsupported.txt', + # '--quiet', + *(['--verbose'] if DEBUG_WORKER else []), + '--retries=2', + # '--retries=7', # '--limit-rate=1500k', ]) gallery_dl.output.initialize_logging(args.loglevel) @@ -385,11 +775,15 @@ def configure_gdl(): gallery_dl.output.select = ColoredLineOutput + gallery_dl.util.PathFormatOriginal = gdl_pf + gallery_dl.util.PathFormat = OverriddenPathFormat + class DownloadJobWithCallSaverPostProcessor(gallery_dl.job.DownloadJob): def __init__(self, url, parent=None): super().__init__(url, parent) - self.cspp = CallSaverPostProcessor(self) + self.cspp = CallSaverPostProcessor( + self) if parent is None else parent.cspp def initialize(self, kwdict=None): super().initialize(kwdict) @@ -443,6 +837,12 @@ class ColoredLineOutput(gallery_dl.output.TerminalOutput): self.termsize = shutil.get_terminal_size().columns +class OverriddenPathFormat(gdl_pf): + def __init__(self, extractor): + super().__init__(extractor) + self.clean_path = FixFileNameFormatterWrapper(self.clean_path) + + 
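DownloadJobWithCallSaverPostProcessor above hands the same CallSaverPostProcessor to every child job it spawns, so file paths produced anywhere in a recursive gallery download end up in one collector owned by the root job. The pattern in isolation, with generic names rather than gallery-dl's API:

    from typing import List, Optional

    class CollectingJob:
        """Root job owns the collector; child jobs reuse their parent's collector."""

        def __init__(self, url: str, parent: Optional['CollectingJob'] = None):
            self.url = url
            self.collected: List[str] = [] if parent is None else parent.collected

        def record(self, path: str) -> None:
            self.collected.append(path)

    root = CollectingJob('https://example.com/gallery')
    child = CollectingJob('https://example.com/gallery/page/2', parent=root)
    child.record('i_gdl/example/file1.jpg')
    assert root.collected == ['i_gdl/example/file1.jpg']
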
class CallSaverPostProcessor(gallery_dl.postprocessor.common.PostProcessor): def __init__(self, job): super().__init__(job) @@ -454,26 +854,86 @@ class CallSaverPostProcessor(gallery_dl.postprocessor.common.PostProcessor): run_final=list(), ) - def prepare(self, pathfmt): + def prepare(self, pathfmt: gallery_dl.util.PathFormat): """Update file paths, etc.""" - self.calls['prepare'].append((pathfmt.path,)) + directory_formatters = pathfmt.directory_formatters + filename_formatter = pathfmt.filename_formatter + clean_segment = pathfmt.clean_segment + clean_path = pathfmt.clean_path - def run(self, pathfmt): + pathfmt.directory_formatters = None + pathfmt.filename_formatter = None + pathfmt.clean_segment = None + pathfmt.clean_path = None + + cloned_pathfmt: gallery_dl.util.PathFormat = pickle.loads(pickle.dumps(pathfmt)) + + pathfmt.directory_formatters = directory_formatters + pathfmt.filename_formatter = filename_formatter + pathfmt.clean_segment = clean_segment + pathfmt.clean_path = clean_path + + cloned_pathfmt.directory_formatters = directory_formatters + cloned_pathfmt.filename_formatter = filename_formatter + cloned_pathfmt.clean_segment = clean_segment + cloned_pathfmt.clean_path = clean_path + + cloned_pathfmt.build_path() + # print(cloned_pathfmt.path) + # print(cloned_pathfmt.filename) + # print(cloned_pathfmt.kwdict) + # print(cloned_pathfmt) + self.calls['prepare'].append(cloned_pathfmt.path) + + def run(self, pathfmt: gallery_dl.util.PathFormat): """Execute the postprocessor for a file""" - self.calls['run'].append((pathfmt.path,)) + self.calls['run'].append(pathfmt.path) - def run_metadata(self, pathfmt): + def run_metadata(self, pathfmt: gallery_dl.util.PathFormat): """Execute the postprocessor for a file""" - self.calls['run_metadata'].append((pathfmt.path,)) + self.calls['run_metadata'].append(pathfmt.path) - def run_after(self, pathfmt): + def run_after(self, pathfmt: gallery_dl.util.PathFormat): """Execute postprocessor after moving a file to its target location""" - self.calls['run_after'].append((pathfmt.path,)) + self.calls['run_after'].append(pathfmt.path) - def run_final(self, pathfmt, status): + def run_final(self, pathfmt: gallery_dl.util.PathFormat, status: int): """Postprocessor finalization after all files have been downloaded""" self.calls['run_final'].append((pathfmt.path, status)) +class FixFileNameFormatterWrapper: + """Wraps file name formatter for ensuring a valid file name length""" + + def __init__(self, formatter: gallery_dl.util.Formatter): + self.formatter = formatter + + def __call__(self, *args, **kwargs) -> str: + path = self.formatter(*args, **kwargs) + parts = list(map(fix_filename_ending_extension, + map(fix_filename_length, + map(fix_filename_ending_extension, + Path(path).parts)))) + return str(Path(*parts)) + + +def fix_filename_length(filename: str) -> str: + """Ensures a segment has a valid file name length""" + if len(filename.encode()) > 240: + extension = Path(filename).suffix + extension_bytes_length = len(extension.encode()) + stem_bytes = Path(filename).stem.encode() + fixed_stem_bytes = stem_bytes[:240-extension_bytes_length] + fixed_stem = fixed_stem_bytes.decode(errors="ignore") + return fixed_stem + extension + return filename + + +def fix_filename_ending_extension(filename: str) -> str: + if (fp := Path(filename)).stem[-1:] in ('.', ' '): + return str(fp.parent.joinpath(f"{fp.stem.rstrip('. 
')}{fp.suffix}")) + return filename + + if __name__ == "__main__": main() diff --git a/reddit_imgs/get_firefox_cookies.sh b/reddit_imgs/get_firefox_cookies.sh new file mode 100755 index 0000000..9328bc3 --- /dev/null +++ b/reddit_imgs/get_firefox_cookies.sh @@ -0,0 +1,41 @@ +#!/bin/bash + +export FIREFOX_VARIANT="default-release" + +list_firefox_cookies(){ + for COOKIEJAR in $HOME/.mozilla/firefox/*.$FIREFOX_VARIANT/cookies.sqlite; + do echo $COOKIEJAR; + done ; +} + +one_firefox_cookie(){ + list_firefox_cookies | head -1 ; +} + +echoerr() { echo "$@" 1>&2; } + +export FIREFOX_COOKIE_LOCATION=$(list_firefox_cookies) + +export_firefox_cookies(){ + export TMPDB=$(uuidgen -r | sed -e 's/-//g') + cp $FIREFOX_COOKIE_LOCATION /tmp/$TMPDB.db + echo "# Netscape HTTP Cookie File" > "$1~" + echo "# http://curl.haxx.se/rfc/cookie_spec.html" >> "$1~" + echo "# This is a generated file! Do not edit." >> "$1~" + echo -ne "\n" >> "$1~" + echo -e ".mode tabs\nselect host, case when host glob '.*' then 'TRUE' else 'FALSE' end, path, case when isSecure then 'TRUE' else 'FALSE' end, expiry, name, value from moz_cookies order by host;" | sqlite3 "/tmp/$TMPDB.db" >> "$1~" + rm /tmp/$TMPDB.db + mv "$1~" "$1" +} + +if [ -z "$FIREFOX_COOKIE_LOCATION" ]; then + echoerr "Firefox cookie database was not found." + exit 1 +else + if [ -z "$1" ]; then + echoerr "Cookie destination file unspecified." + exit 1 + else + export_firefox_cookies "$@" + fi; +fi; diff --git a/reddit_imgs/hashit2.py b/reddit_imgs/hashit2.py new file mode 100644 index 0000000..828d5ae --- /dev/null +++ b/reddit_imgs/hashit2.py @@ -0,0 +1,173 @@ +#!/usr/bin/env python3 +# -*- encoding: utf-8 -*- + +import hashlib +import json +import multiprocessing +import traceback +from pathlib import Path +from typing import Any, Dict, FrozenSet, Generator, List, Set, TypeVar + +import colored + +from .system.cmdline_parser import parse_cmdline +from .system.flattener import flatten_generator +from .system.format_file_size import format_power10 +from .system.hexhashof import hexhashof + +WORKERS = 4 + +T = TypeVar('T') + + +def cmdline(encoded_args: str = None): + if encoded_args is None: + return run_with_config() + else: + return parse_cmdline(run_with_config, encoded_args) + + +def run_with_config(): + main() + + +def get_first_elem(e): return e[0] +def get_second_elem(e): return e[1] +def reverse_key_value(e): return e[1], e[0] + + +class FileHash: + def __init__(self, file: Path, hash: str): + self.file = file + self.hash = hash + + +class FileHasher(multiprocessing.Process): + def __init__(self, file_queue, hash_queue): + multiprocessing.Process.__init__(self) + self.file_queue = file_queue + self.hash_queue = hash_queue + + def run(self): + proc_name = self.name + try: + while True: + seq, total, file = self.file_queue.get() + if file is not None: + print(colored.stylize( + f'{proc_name}:{colored.fg("cyan")} {seq}/{total}:{colored.attr("reset")}{colored.attr("dim")} {file}', [colored.fg('yellow')])) + self.hash_queue.put(FileHash( + str(file), + hexhashof(Path(file).read_bytes(), hashlib.sha256) + )) + else: + print(colored.stylize(f'{proc_name}: Exiting', [ + colored.fg('red'), colored.attr('bold')])) + self.hash_queue.put(None) + break + except: + traceback.print_exc() + try: + self.hash_queue.put(None) + except: + pass + return + + +def main(): + print(colored.stylize('Reading files...', [ + colored.fg('light_cyan'), colored.attr('bold')])) + hashes_path = Path('i_gdl_hashes.txt') + if not hashes_path.exists(): + hashes_path.write_text('') + 
hashes_text = list(filter(len, hashes_path.read_text().splitlines())) + hashes: Set[str, str] = frozenset(map(reverse_key_value, filter( + get_second_elem, map(lambda a: a.split('|', 1), hashes_text)))) + # hashes -> link ; hash + files_size: Dict[str, int] = json.loads(Path('i_gdl_fsz.json').read_text()) + hashes = frozenset(filter(lambda a: files_size.get(a[0], 0), hashes)) + non_empty_files_size_dict = dict( + filter(lambda a: a[1], files_size.items())) + non_empty_files = frozenset(non_empty_files_size_dict.keys()) + downloaded_files_to_be_hashed = non_empty_files.difference( + map(get_first_elem, hashes)) + total_file_size_bytes = sum(files_size.values()) + total_file_size = format_power10(total_file_size_bytes) + hashed_size_bytes = sum( + map(lambda l: files_size[l], map(get_first_elem, hashes))) + hashes = set(hashes) + total = len(downloaded_files_to_be_hashed) + files_queue = multiprocessing.Queue() + hashes_queue = multiprocessing.Queue() + print(colored.stylize('Filling queues...', [ + colored.fg('light_cyan'), colored.attr('bold')])) + for enumeration, downloaded_file_to_be_hashed in enumerate(sorted(downloaded_files_to_be_hashed)): + files_queue.put( + (enumeration+1, total, str(downloaded_file_to_be_hashed))) + del enumeration + del total + for _ in range(WORKERS): + files_queue.put((0, 0, None)) + print(colored.stylize('Starting processes...', [ + colored.fg('light_cyan'), colored.attr('bold')])) + workers = list() + for _ in range(WORKERS): + worker = FileHasher(files_queue, hashes_queue) + workers.append(worker) + for worker in workers: + worker.start() + # raise Exception('-'*50) + print(colored.stylize('Listening queues...', [ + colored.fg('light_cyan'), colored.attr('bold')])) + with hashes_path.open('at') as hashes_handler: + active_workers = WORKERS + while active_workers > 0: + file_hash: FileHash = hashes_queue.get() + if file_hash is not None: + hashed_size_bytes += files_size[file_hash.file] + print(colored.stylize( + '%11.6f%% - %s of %s' % ( + 100*hashed_size_bytes / max(1, total_file_size_bytes), + format_power10(hashed_size_bytes), + total_file_size), + [colored.fg('light_green'), colored.attr('bold')])) + hashes_handler.write(f'{file_hash.hash}|{file_hash.file}\n') + else: + active_workers -= 1 + del file_hash + del active_workers + print(colored.stylize('Stopping processes...', [ + colored.fg('light_cyan'), colored.attr('bold')])) + for worker in workers: + worker.join() + del worker + del workers + files_queue.close() + files_queue.join_thread() + hashes_queue.close() + hashes_queue.join_thread() + print(colored.stylize('Sorting output file...', [ + colored.fg('light_cyan'), colored.attr('bold')])) + hashes_path.write_text( + '\n'.join( + list(filter( + lambda a: files_size.get(a.split('|', 1)[1], 0), + sorted(hashes_path.read_text().splitlines()) + )) + ) + '\n') + print(colored.stylize('Pointing out repeated hashes...', [ + colored.fg('light_cyan'), colored.attr('bold')])) + repeated_hashes = dict() + for hashed, location in map(lambda a: a.split('|', 1), sorted(hashes_path.read_text().splitlines())): + if hashed not in repeated_hashes: + repeated_hashes[hashed] = list() + repeated_hashes[hashed].append(location) + Path('i_gdl_rh.json').write_text(json.dumps( + dict(sorted( + list(filter(lambda a: len(a[1]) > 1, repeated_hashes.items())), + key=lambda a: (-len(a[1]), a[0]) + )), + indent=1, + )) + print(colored.stylize( + 'Done', [colored.fg('light_cyan'), colored.attr('bold')])) diff --git a/reddit_imgs/runner.py b/reddit_imgs/runner.py index 
2903436..7342e74 100755 --- a/reddit_imgs/runner.py +++ b/reddit_imgs/runner.py @@ -8,27 +8,33 @@ import reddit_imgs.reorganize import reddit_imgs.wallpapers import reddit_imgs.thumbnailize import reddit_imgs.hashit +import reddit_imgs.hashit2 import reddit_imgs.normalizetobmp import reddit_imgs.cachedhash +import reddit_imgs.download_pruner +import reddit_imgs.suggest_subreddits_from_links +import reddit_imgs.condensate_hashes import os import sys import shutil wdir = os.path.abspath('.') + def ensureFolderAvailability(): - if not os.path.exists(os.path.join(wdir,'w')): - os.makedirs(os.path.join(wdir,'w')) - if not os.path.exists(os.path.join(wdir,'d')): - os.makedirs(os.path.join(wdir,'d')) - if not os.path.exists(os.path.join(wdir,'i')): - os.makedirs(os.path.join(wdir,'i')) - if not os.path.exists(os.path.join(wdir,'r')): - os.makedirs(os.path.join(wdir,'r')) + if not os.path.exists(os.path.join(wdir, 'w')): + os.makedirs(os.path.join(wdir, 'w')) + if not os.path.exists(os.path.join(wdir, 'd')): + os.makedirs(os.path.join(wdir, 'd')) + if not os.path.exists(os.path.join(wdir, 'i')): + os.makedirs(os.path.join(wdir, 'i')) + if not os.path.exists(os.path.join(wdir, 'r')): + os.makedirs(os.path.join(wdir, 'r')) + def managesubreddits(): i = '' - while i!='0': + while i != '0': print('\n'*100) print('----------------------------------------------') print(' Subreddit Manager ') @@ -47,74 +53,83 @@ def managesubreddits(): i = i.strip() print() print() - subreddits_dir = os.path.join(wdir,'r') - subreddits_isfolder = lambda sr: os.path.isdir(os.path.join(subreddits_dir,sr)) - subreddits = sorted(filter(subreddits_isfolder, os.listdir(subreddits_dir))) + subreddits_dir = os.path.join(wdir, 'r') + def subreddits_isfolder(sr): return os.path.isdir( + os.path.join(subreddits_dir, sr)) + subreddits = sorted( + filter(subreddits_isfolder, os.listdir(subreddits_dir))) if i in ['1', '3', '4', '5']: print('Subreddits monitored:') for sr in subreddits: - print('/r/{0}'.format(sr),end='') - if os.path.isfile(os.path.join(subreddits_dir,sr,'wallpaper.flag')): + print('/r/{0}'.format(sr), end='') + if os.path.isfile(os.path.join(subreddits_dir, sr, 'wallpaper.flag')): print('\t\t(wallpaper)') else: print() print() - if i=='1': + if i == '1': print('Press enter to continue') input() - if i=='3': + if i == '3': print('Enter the subreddit you want to get rid of:') rem = input('/r/') - try: shutil.rmtree(os.path.join(subreddits_dir,rem)) - except: pass + try: + shutil.rmtree(os.path.join(subreddits_dir, rem)) + except: + pass print() print('Done.') print('Press enter to continue') input() - elif i=='2': + elif i == '2': print('Enter the subreddit you want to add:') add = input('/r/') - try: os.makedirs(os.path.join(subreddits_dir,add)) - except: pass + try: + os.makedirs(os.path.join(subreddits_dir, add)) + except: + pass print() print('Done.') print('Press enter to continue') input() - elif i=='4': + elif i == '4': print('Enter the subreddit you want to set as wallpaper source:') add = input('/r/') try: - dd = os.path.join(subreddits_dir,add) + dd = os.path.join(subreddits_dir, add) if not os.path.exists(dd): os.makedirs(dd) - f = open(os.path.join(dd, 'wallpaper.flag'),'w') + f = open(os.path.join(dd, 'wallpaper.flag'), 'w') f.write('') f.close() - except: pass + except: + pass print() print('Done.') print('Press enter to continue') input() - elif i=='5': + elif i == '5': print('Enter the subreddit you want to unset as wallpaper source:') add = input('/r/') try: - dd = os.path.join(subreddits_dir,add) + dd 
= os.path.join(subreddits_dir, add) if not os.path.exists(dd): os.makedirs(dd) - f = open(os.path.join(dd, 'wallpaper.flag'),'w') + f = open(os.path.join(dd, 'wallpaper.flag'), 'w') f.write('') f.close() os.remove(os.path.join(dd, 'wallpaper.flag')) - except: pass + except: + pass print() print('Done.') print('Press enter to continue') input() + def mainmenu(): i = '' - while i!='0': + while i != '0': print('\n'*100) print('----------------------------------------------') print(' Reddit Image Downloader ') @@ -133,55 +148,68 @@ def mainmenu(): print('Enter your choice:') i = input() i = i.strip() - if i=='1': + if i == '1': managesubreddits() - elif i=='2': + elif i == '2': reddit_imgs.sync.main() - elif i=='3': + elif i == '3': reddit_imgs.fetch.main() - elif i=='4': + elif i == '4': reddit_imgs.hashit.main() - elif i=='5': + elif i == '5': reddit_imgs.thumbnailize.main() - elif i=='6': + elif i == '6': reddit_imgs.reorganize.main() - elif i=='7': + elif i == '7': reddit_imgs.wallpapers.main() + def main(): - ensureFolderAvailability() - if len(sys.argv)>1: + # ensureFolderAvailability() + if len(sys.argv) > 1: cmdline() else: mainmenu() + def cmdline(): cmds = sys.argv[1:] available_commands = (( - ('sync', reddit_imgs.sync.main), - ('fetch', reddit_imgs.fetch2.main), - ('fetch_old', reddit_imgs.fetch.main), - ('fetchretryingemptyalbuns', reddit_imgs.fetch.retry), - ('cachedhash', reddit_imgs.cachedhash.main), - ('hashit', reddit_imgs.hashit.main), - ('normalizetobmp', reddit_imgs.normalizetobmp.main), - ('thumbnailize', reddit_imgs.thumbnailize.main), - ('reorganize', reddit_imgs.reorganize.main), - ('wallpapers', reddit_imgs.wallpapers.main), + ('sync', reddit_imgs.sync.cmdline), + ('fetch', reddit_imgs.fetch2.cmdline), + ('suggest_subreddits_from_links', reddit_imgs.suggest_subreddits_from_links.cmdline), + ('prune_downloads', reddit_imgs.download_pruner.cmdline), + ('hashit', reddit_imgs.hashit2.cmdline), + ('condensate_hashes', reddit_imgs.condensate_hashes.cmdline), + # ('cachedhash', reddit_imgs.cachedhash.main), + # ('hashit', reddit_imgs.hashit.main), + # ('normalizetobmp', reddit_imgs.normalizetobmp.main), + # ('thumbnailize', reddit_imgs.thumbnailize.main), + # ('reorganize', reddit_imgs.reorganize.main), + # ('wallpapers', reddit_imgs.wallpapers.main), )) - available_commands_names = tuple(list(map(lambda a: a[0], available_commands))) + available_commands_names = tuple( + list(map(lambda a: a[0], available_commands))) for cmd in cmds: - if cmd not in available_commands_names: - print('Usage {0} [{1}]'.format(sys.argv[0], '/'.join(available_commands_names))) + if cmd.split(':', 1)[0] not in available_commands_names: + print('Usage {0} [{1}]'.format(sys.argv[0], + '/'.join(available_commands_names))) return for cmd in cmds: command_ran = False for acmd in available_commands: - if cmd == acmd[0]: + if cmd.split(':', 1)[0] == acmd[0]: + x = cmd.split(':', 1) command_ran = True - acmd[1]() + fcmd = acmd[1] + if len(x) == 1: + fcmd() + else: + fcmd(encoded_args=x[1]) if not command_ran: - print('Usage {0} [{1}]'.format(sys.argv[0], '/'.join(available_commands_names))) + print('Usage {0} [{1}]'.format(sys.argv[0], + '/'.join(available_commands_names))) + if __name__ == '__main__': main() diff --git a/reddit_imgs/suggest_subreddits_from_links.py b/reddit_imgs/suggest_subreddits_from_links.py new file mode 100644 index 0000000..a9d453e --- /dev/null +++ b/reddit_imgs/suggest_subreddits_from_links.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python3 +# -*- encoding: utf-8 -*- + +import json 
+from pathlib import Path +from typing import Dict, List + +import colored + +from .system.cmdline_parser import parse_cmdline + + +def cmdline(encoded_args: str = None): + if encoded_args is None: + return run_with_config() + else: + return parse_cmdline(run_with_config, encoded_args) + + +def run_with_config(): + main() + + +def main(): + not_downloadable_links: Dict[str, List[str]] = json.loads( + Path('i_undownloadable.json').read_text()) + subreddits_untreated = not_downloadable_links.get('reddit_subreddit', []) + subreddits_set = set() + subreddits = list() + for subreddit_untreated in subreddits_untreated: + subreddit = subreddit_untreated.split('/r/')[1].split('/')[0].split('?')[0].split('#')[0].lower() + if subreddit not in subreddits_set: + subreddits_set.add(subreddit) + subreddits.append((not Path('r', subreddit).exists(), subreddit)) + del subreddit_untreated + del subreddit + del subreddits_set + subreddits.sort() + for missing, subreddit in subreddits: + print(colored.stylize('https://www.reddit.com/', [colored.fg('cyan'), ]), end='') + print(colored.stylize('r/', [colored.fg('magenta'), ]), end='') + print(colored.stylize(subreddit, [ + colored.fg('red' if missing else 'light_green'), + ])) diff --git a/reddit_imgs/sync.py b/reddit_imgs/sync.py index a2c1db3..6c739ea 100755 --- a/reddit_imgs/sync.py +++ b/reddit_imgs/sync.py @@ -10,21 +10,41 @@ from urllib.error import ContentTooShortError, HTTPError, URLError from bs4 import BeautifulSoup as _BS from .system import simpleDownloader +from .system.cmdline_parser import parse_cmdline from .system.subredditTools import (GATEWAY_LINK_ARGS, build_gateway_link, getEmptySubredditData, getSubredditPageJsonInfo) +MAX_WORKERS = 16 + def BeautifulSoup(data): return _BS(data, 'html5lib') -simpleDownloader.setCookies({'over18':1}) + +def cmdline(encoded_args: str = None): + if encoded_args is None: + return run_with_config() + else: + return parse_cmdline(run_with_config, encoded_args) + + +def run_with_config(max_workers: int = None, + ): + global MAX_WORKERS + if max_workers is not None: + MAX_WORKERS = max_workers + return main() + + +simpleDownloader.setCookies({'over18': 1}) wdir = os.path.abspath('.') + def process_subreddit(subreddit): - simpleDownloader.setCookies({'over18':1}) + simpleDownloader.setCookies({'over18': 1}) srp = os.path.abspath(os.path.join(wdir, 'r', subreddit)) - #if subreddit!='yiff': continue + # if subreddit!='yiff': continue nextpage = build_gateway_link(subreddit) srdt = getEmptySubredditData(subreddit) try: @@ -37,22 +57,32 @@ def process_subreddit(subreddit): ygst = srdt['date_first'] jsonPageSr = None while nextpage: - pageno+=1 - print(('/r/{0:<20} loading page #%05d'%pageno).format(subreddit)) - print(' >> %s'%(nextpage.replace(GATEWAY_LINK_ARGS, '[...]'),)) + pageno += 1 + print(('/r/{0:<20} loading page #%05d' % pageno).format(subreddit)) + print(' >> %s' % (nextpage.replace(GATEWAY_LINK_ARGS, '[...]'),)) redditBytes = None try: redditBytes = simpleDownloader.getUrlBytes(nextpage) except (HTTPError, URLError, ContentTooShortError): + print(('/r/{0:<20} loading page #%05d' % pageno).format(subreddit)) print(" >> HTTP Error with code: Skipping...") break if redditBytes is None: + print(('/r/{0:<20} loading page #%05d' % pageno).format(subreddit)) print(" >> HTTP Error: Skipping...") break # bs = BeautifulSoup(redditBytes) jsonPage = json.loads(redditBytes) - first, last, nextpage, links = getSubredditPageJsonInfo(jsonPage, subreddit, pageno) - if ygst >= first: #if latest stored post is at same age or 
older than the latest downloaded post, then we are up-to-date + getSubredditPageJsonInfoResult = None + try: + getSubredditPageJsonInfoResult = ( + getSubredditPageJsonInfo(jsonPage, subreddit, pageno)) + except IndexError: + print(('/r/{0:<20} loading page #%05d' % pageno).format(subreddit)) + print(" >> Empty subreddit: Skipping...") + break + first, last, nextpage, links = getSubredditPageJsonInfoResult + if ygst >= first: # if latest stored post is at same age or older than the latest downloaded post, then we are up-to-date nextpage = None srdt['date_first'] = max(first, srdt['date_first']) srdt['date_last'] = min(last, srdt['date_last']) @@ -71,17 +101,18 @@ def process_subreddit(subreddit): about=jsonPage['subredditAboutInfo'][srid], flair=jsonPage['postFlair'][srid], ) - with open(os.path.join(srp,'subreddit.json'),'w') as f: + with open(os.path.join(srp, 'subreddit.json'), 'w') as f: f.write(json.dumps(srdt, sort_keys=True, indent=2)) if jsonPageSr is not None: - with open(os.path.join(srp,'meta.json'),'w') as f: + with open(os.path.join(srp, 'meta.json'), 'w') as f: f.write(json.dumps(jsonPageSr, sort_keys=True, indent=2)) def main(): build_summary() - subreddits = sorted(filter(lambda sr: os.path.isdir(os.path.join(wdir,'r',sr)), os.listdir(os.path.join(wdir,'r')))) - with PoolExecutor(16) as pe: + subreddits = sorted(filter(lambda sr: os.path.isdir( + os.path.join(wdir, 'r', sr)), os.listdir(os.path.join(wdir, 'r')))) + with PoolExecutor(MAX_WORKERS) as pe: q = list() for subreddit in subreddits: job = pe.submit(process_subreddit, subreddit) @@ -93,12 +124,18 @@ def main(): def build_summary(): rjpath = Path(wdir, 'r.json') + rijpath = Path(wdir, 'ri.json') oldsrs = dict() + oldsrsi = dict() if rjpath.exists(): oldsrs = json.loads(rjpath.read_text()) + if rijpath.exists(): + oldsrsi = json.loads(rijpath.read_text()) srs = dict() + srsi = dict() for srp in Path(wdir, 'r').glob('*/subreddit.json'): sr = srp.parent.name.lower() + srip = srp.parent.joinpath('meta.json') try: srs[sr] = json.loads(srp.read_text()) except json.decoder.JSONDecodeError: @@ -108,7 +145,18 @@ def build_summary(): print('Restoring old data for corrupted subrredit %r' % sr) srs[sr] = oldsrs[sr] srp.write_text(json.dumps(oldsrs[sr], indent=1)) + if srip.exists(): + try: + srsi[sr] = json.loads(srip.read_text()) + except json.decoder.JSONDecodeError: + if sr not in oldsrsi: + raise + else: + print('Restoring old data for corrupted subrredit %r' % sr) + srsi[sr] = oldsrsi[sr] + srip.write_text(json.dumps(oldsrsi[sr], indent=1)) rjpath.write_text(json.dumps(srs, indent=1)) + rijpath.write_text(json.dumps(srsi, indent=1)) if __name__ == '__main__': diff --git a/reddit_imgs/system/cmdline_parser.py b/reddit_imgs/system/cmdline_parser.py new file mode 100644 index 0000000..f453d81 --- /dev/null +++ b/reddit_imgs/system/cmdline_parser.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python3 +# -*- encoding: utf-8 -*- + +import re +from inspect import _empty, getfullargspec, signature +from typing import Callable, Dict, List, Optional, Set, Type, TypeVar + +from .table_fmt import table_fmt + +SPLITTER_RGX = re.compile(r'(?<!~):') + + +def unescape_after_splitter(txt: str) -> str: + return txt.replace('~:', ':') + + +T = TypeVar('T') + + +def parse_cmdline(func: Callable[..., T], encoded_args: str) -> Optional[T]: + ''' + Transforms colon-separated key-value pairs into callable arguments + + Type-annotated fields will be converted.
+ ''' + split_args = SPLITTER_RGX.split(encoded_args) + split_args = list(map(unescape_after_splitter, split_args)) + brute_dict = dict(zip(split_args[0::2], split_args[1::2])) + full_arg_spec = getfullargspec(func) + sig = signature(func) + func_args = full_arg_spec.args + func_annotations = full_arg_spec.annotations + str_args = {k: v + for k, v in + brute_dict.items() + if k in func_args} + unknown_args = {k: v + for k, v in + brute_dict.items() + if k not in func_args} + if encoded_args == 'help' or len(unknown_args) > 0: + if len(unknown_args) > 0: + print('Unknown arguments found:') + for k, v in unknown_args.items(): + print(f' {k}: {repr(v)}') + print() + print(f'Usage help for: {func.__module__}.{func.__name__}') + tbl = list() + for name, parameter in sig.parameters.items(): + annotation = parameter.annotation if parameter.annotation != _empty else str + tbl.append(( + str(name), + repr(annotation), + repr(parameter.default) if parameter.default != _empty else '-unset-', + )) + print(table_fmt( + 'name,type,default'.split(','), + tbl, + alignment='^'*3, + )) + return None + kwargs = dict() + for key in str_args: + kwargs[key] = convert_type( + func_annotations.get(key, str), + str_args[key] + ) + print(f'Calling {func.__module__}.{func.__name__} with arguments:') + if len(kwargs) <= 0: + print(' --- no arguments given ---') + else: + for k, v in kwargs.items(): + print(f' {k}: {repr(v)}') + return func(**kwargs) + + +K = TypeVar('K') + + +def convert_type(cls: Type[K], data: str) -> K: + if cls not in (str, int, float): + cls = eval + return cls(data) diff --git a/reddit_imgs/system/downloader/cache.py b/reddit_imgs/system/downloader/cache.py index 5d0b365..c9fd22f 100644 --- a/reddit_imgs/system/downloader/cache.py +++ b/reddit_imgs/system/downloader/cache.py @@ -6,11 +6,14 @@ import json import hashlib import shutil + def get_normalized_link(link: str) -> str: if link.startswith('/r/'): link = 'https://www.reddit.com'+link if link.startswith('/user/'): link = 'https://www.reddit.com'+link + if link.startswith('/u/'): + link = 'https://www.reddit.com'+link return link @@ -18,6 +21,13 @@ def limit_filename_lenght(target: Path) -> Path: return Path(*[part[:255] for part in target.parts]) +def limit_path_max(target: Path) -> Path: + bytes_to_remove = 2000 - len(str(target.absolute()).encode()) + if bytes_to_remove < 0: + target = Path((str(target).encode()[:bytes_to_remove]).decode(errors='ignore')) + return target + + def get_domain(link: str) -> str: return get_path_for_caching(link).parts[1] @@ -25,7 +35,8 @@ def get_domain(link: str) -> str: def get_path_for_caching(link: str, prefix: Path = Path('i_c')) -> Path: link = get_normalized_link(link) target = prefix.joinpath(link.split('://', 1)[1]) - return limit_filename_lenght(target) + return limit_path_max(limit_filename_lenght(target)) + def has_file_cache(cached: Path) -> bool: if not cached.exists(): @@ -70,7 +81,7 @@ def replicate_from_cache(download_target, link): return False else: target_metajson = json.loads(target_meta.read_text()) - if target_metajson['type']=='file': + if target_metajson['type'] == 'file': download_path.mkdir(parents=True, exist_ok=True) ext = target_metajson['ext'] ffl = f"0000.{ext}" @@ -84,7 +95,7 @@ def replicate_from_cache(download_target, link): 'ext': ext, 'link': link, }], sort_keys=True, indent=2)) - elif target_metajson['type']=='album': + elif target_metajson['type'] == 'album': download_path.mkdir(parents=True, exist_ok=True) files = list() for i, lnk in 
enumerate(target_metajson['link']): @@ -165,11 +176,11 @@ def fix_cache_relocate_single_file_from_download(download_path, download, target shutil.rmtree(download_path) break raise Exception("Specified cached file does not exist.\n" + - f"Download path: {repr(download_path)}\n" + - f"Target: {repr(target)}") + f"Download path: {repr(download_path)}\n" + + f"Target: {repr(target)}") if not target_hashfile.exists(): m = hashlib.sha256() m.update(target_file.read_bytes()) target_hashfile.write_text(m.hexdigest()) thumbs_dict['file'] = str(target_file) - return thumbs_dict \ No newline at end of file + return thumbs_dict diff --git a/reddit_imgs/system/flattener.py b/reddit_imgs/system/flattener.py new file mode 100644 index 0000000..bbea80b --- /dev/null +++ b/reddit_imgs/system/flattener.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python3 +# -*- encoding: utf-8 -*- + +from typing import Collection, Generator, TypeVar + +T = TypeVar('T') + + +def flatten_generator(list_of_lists: Collection[Collection[T]]) -> Generator[T, None, None]: + yield from [] + for sublist in list_of_lists: + for item in sublist: + yield item diff --git a/reddit_imgs/system/format_file_size.py b/reddit_imgs/system/format_file_size.py new file mode 100644 index 0000000..c793a39 --- /dev/null +++ b/reddit_imgs/system/format_file_size.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python3 +# -*- encoding: utf-8 -*- + +from typing import Union + +Numeric = Union[int, float] + + +def format_power10(count: Numeric, intervals=1000, cutoff=2000, labels=',k,M,G,T,P,E,Z,Y'.split(','), suffix='B') -> str: + times_cut = 0 + while count > cutoff: + count /= intervals + times_cut += 1 + return '%.1f %s%s' % (count, labels[times_cut], suffix) + + +def format_power2(count: Numeric, intervals=1024, cutoff=2048, labels=',ki,Mi,Gi,Ti,Pi,Ei,Zi,Yi'.split(','), suffix='B') -> str: + return format_power10(count, intervals, cutoff, labels, suffix) diff --git a/reddit_imgs/system/hexhashof.py b/reddit_imgs/system/hexhashof.py index 3d3ed8d..3876ff7 100644 --- a/reddit_imgs/system/hexhashof.py +++ b/reddit_imgs/system/hexhashof.py @@ -1,11 +1,9 @@ #!/usr/bin/env python3 # -*- encoding: utf-8 -*- -from hashlib import _DataType -from hashlib import _Hash -from typing import Type +from typing import Any, Callable, Type -def hexhashof(bts: _DataType, using: Type[_Hash]) -> str: +def hexhashof(bts: bytes, using: Callable[[], Any]) -> str: m = using() m.update(bts) return m.hexdigest() \ No newline at end of file diff --git a/reddit_imgs/system/table_fmt.py b/reddit_imgs/system/table_fmt.py index accdbef..a693faf 100644 --- a/reddit_imgs/system/table_fmt.py +++ b/reddit_imgs/system/table_fmt.py @@ -41,7 +41,7 @@ def table_fmt(labels: List[str], list(map(lambda l: list(map(len, l)), tbl)) ) row_widths: List[int] = list(map(max, zip(*tbl_szs))) - print(row_widths) + # print(row_widths) labels_tpl = ( '| ' + ' | '.join([f'{{{e}:^{l}}}' for e, l in enumerate(row_widths)]) + @@ -66,7 +66,7 @@ def table_fmt(labels: List[str], s += hugelinesep + '\n' if title: s += title_tpl.format(title) - s += hugelinesep + '\n' + s += hugelinesep + '\n' # row label section s += labels_tpl.format(*lbl) + '\n' s += linesep + '\n' @@ -79,8 +79,8 @@ def table_fmt(labels: List[str], s += linesep + '\n' s += labels_tpl.format(*lbl) + '\n' # title section - s += hugelinesep + '\n' if title: + s += hugelinesep + '\n' s += title_tpl.format(title) s += hugelinesep + '\n' return s diff --git a/reddit_imgs/system/urlmatcher.py b/reddit_imgs/system/urlmatcher.py index 6ee758f..d5ceaba 100644 --- 
a/reddit_imgs/system/urlmatcher.py +++ b/reddit_imgs/system/urlmatcher.py @@ -4,9 +4,24 @@ import re from typing import List +HREF_RGX1_PTTRN = r'href=\"([^"]*)\"' +HREF_RGX2_PTTRN = r"href=\'([^']*)\'" +SRC_RGX1_PTTRN = r'src=\"([^"]*)\"' +SRC_RGX2_PTTRN = r"src=\'([^']*)\'" URL_RGX_PTTRN = r'(http|ftp|https)://([\w_-]+(?:(?:\.[\w_-]+)+))([\w.,@?^=%&:/~+#-]*[\w@?^=%&/~+#-])?' +HREF_RGX1 = re.compile(HREF_RGX1_PTTRN) +HREF_RGX2 = re.compile(HREF_RGX2_PTTRN) +SRC_RGX1 = re.compile(SRC_RGX1_PTTRN) +SRC_RGX2 = re.compile(SRC_RGX2_PTTRN) URL_RGX = re.compile(URL_RGX_PTTRN) def search_urls(text: str) -> List[str]: - return list(map(lambda l: l.group(0), URL_RGX.finditer(text))) + return list(set( + list(map(lambda l: l.group(0), URL_RGX.finditer(text))) + + list(map(lambda l: l.group(1), SRC_RGX1.finditer(text))) + + list(map(lambda l: l.group(1), SRC_RGX2.finditer(text))) + + list(map(lambda l: l.group(1), HREF_RGX1.finditer(text))) + + list(map(lambda l: l.group(1), HREF_RGX2.finditer(text))) + + [] + ))
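
Usage sketch for the new command-line plumbing above (reddit_imgs/runner.py together with reddit_imgs/system/cmdline_parser.py): a command token such as sync:max_workers:8 is split on the first ':' by runner.cmdline(), and the remainder is decoded by parse_cmdline() into keyword arguments according to the target function's type annotations. This is a minimal sketch, not part of the patch: the demo() function and its parameters are invented, the invocation assumes the repository root is the working directory, and the escaped-colon behaviour assumes the SPLITTER_RGX literal (reconstructed above) treats '~:' as an escaped colon.

    # e.g. running:  python -m reddit_imgs.runner sync:max_workers:8
    # ends up calling reddit_imgs.sync.cmdline(encoded_args='max_workers:8').
    from reddit_imgs.system.cmdline_parser import parse_cmdline


    def demo(workers: int = 4, ratio: float = 1.0, tags: list = None, url: str = ''):
        # stand-in for a module's run_with_config(); not part of the patch
        return workers, ratio, tags, url


    parse_cmdline(demo, 'workers:8:ratio:0.5')       # calls demo(workers=8, ratio=0.5)
    parse_cmdline(demo, "tags:['a', 'b']")           # non-str/int/float annotations go through eval()
    parse_cmdline(demo, 'url:https~://example.com')  # '~:' stands for a literal colon inside a value
    parse_cmdline(demo, 'help')                      # prints a parameter table and returns None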
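The cache helpers in reddit_imgs/system/downloader/cache.py now also normalize '/u/...' links and clamp overly long cache paths. A small sketch of what the lookup functions return; the URLs are made up and the run is assumed to happen from the repository root:

    from reddit_imgs.system.downloader.cache import (
        get_domain, get_normalized_link, get_path_for_caching)

    print(get_normalized_link('/u/some_artist'))
    # https://www.reddit.com/u/some_artist   (the '/u/' form is newly handled)
    print(get_path_for_caching('https://i.imgur.com/AbCd123.jpg'))
    # i_c/i.imgur.com/AbCd123.jpg
    print(get_domain('/u/some_artist'))
    # www.reddit.com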
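flatten_generator and the loosened hexhashof signature are small utilities; their expected behaviour, shown here for reference with made-up inputs:

    import hashlib

    from reddit_imgs.system.flattener import flatten_generator
    from reddit_imgs.system.hexhashof import hexhashof

    print(list(flatten_generator([[1, 2], [3], []])))
    # [1, 2, 3]
    print(hexhashof(b'hello', hashlib.sha256))
    # 2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824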
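format_power10 and format_power2 in reddit_imgs/system/format_file_size.py only divide while the value stays above the cutoff (2000 decimal, 2048 binary), so the mantissa can exceed three digits. Sample values, derived directly from the code above:

    from reddit_imgs.system.format_file_size import format_power10, format_power2

    print(format_power10(999))      # 999.0 B
    print(format_power10(1234567))  # 1234.6 kB   (one division by 1000; 1234.6 < 2000, so it stops)
    print(format_power2(1234567))   # 1205.6 kiB  (intervals of 1024, cutoff 2048)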
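search_urls in reddit_imgs/system/urlmatcher.py now also collects href/src attribute values (including relative ones) and deduplicates through a set, so the result order is unspecified. A sketch with a made-up HTML fragment, sorted only to make the expected output deterministic:

    from reddit_imgs.system.urlmatcher import search_urls

    html = ('<a href="https://example.com/a">link</a> '
            "<img src='/thumbs/b.jpg'> "
            'plain text with http://example.org/c')
    print(sorted(search_urls(html)))
    # ['/thumbs/b.jpg', 'http://example.org/c', 'https://example.com/a']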