reddit-image-wall-getter/reddit_imgs/condensate_hashes.py

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
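"""Condense the downloaded-image hash listing into a filtered export.

Reads ``i_gdl_hashes.txt`` and pushes each listed file through a pipeline
of multiprocessing filter stages (extension, NSFW-ness, subreddit), then
writes the surviving ``hash|path`` pairs to ``h_gdl.txt`` (or to a
timestamped file when a historical export is requested).
"""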
import datetime
import json
import multiprocessing
from pathlib import Path
from typing import Any, Collection, Dict, FrozenSet, List, Optional, Tuple

import colored as clrlib

from .system.cmdline_parser import parse_cmdline
from .system.flattener import flatten_generator
from .system.format_file_size import format_power10
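
# Module-level configuration, normally set by run_with_config() before main() runs.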
HISTORICAL_EXPORT = False
EXTENSION_FILTER = None
NSFW_NESS_FILTER = None
SUBREDDIT_FILTER = None


def cmdline(encoded_args: Optional[str] = None):
    if encoded_args is None:
        return run_with_config()
    else:
        return parse_cmdline(run_with_config, encoded_args)


def run_with_config(historical: bool = False,
                    nsfw_ness_filter: Optional[bool] = None,
                    extension_filter: Optional[list] = None,
                    subreddit_filter: Optional[frozenset] = None):
    global HISTORICAL_EXPORT
    global EXTENSION_FILTER
    global NSFW_NESS_FILTER
    global SUBREDDIT_FILTER
    EXTENSION_FILTER = extension_filter
    HISTORICAL_EXPORT = historical
    NSFW_NESS_FILTER = nsfw_ness_filter
    SUBREDDIT_FILTER = (None
                        if subreddit_filter is None else
                        frozenset(sr.lower() for sr in subreddit_filter))
    return main()
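
# A minimal invocation sketch (the filter values below are illustrative only):
#     run_with_config(historical=True,
#                     nsfw_ness_filter=False,
#                     extension_filter=['.jpg', '.png'],
#                     subreddit_filter=frozenset({'wallpapers'}))
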
class ExtensionFilter(multiprocessing.Process):
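    """Forward only files whose suffix is in the allowed extension set.

    When ``allowed_extensions`` is None every file passes through.
    """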
    def __init__(self,
                 allowed_extensions: Optional[Collection[str]],
                 input_queue: multiprocessing.Queue,
                 output_queue: multiprocessing.Queue):
        multiprocessing.Process.__init__(self)
        self.allowed_extensions = allowed_extensions
        self.input_queue = input_queue
        self.output_queue = output_queue

    def run(self):
        while True:
            next_item: str = self.input_queue.get()
            if next_item is None:
                self.output_queue.put(None)
                break
            if self.allowed_extensions is None:
                self.output_queue.put(next_item)
            elif Path(next_item).suffix in self.allowed_extensions:
                self.output_queue.put(next_item)

class NsfwNessFilter(multiprocessing.Process):
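    """Forward files whose every linked post matches the requested NSFW flag.

    When ``nsfw_ness`` is None every file passes through.
    """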
    def __init__(self,
                 nsfw_ness: Optional[bool],
                 file2link: Dict[str, List[str]],
                 link2post: Dict[str, List[str]],
                 post_info: Dict[str, Dict[str, Any]],
                 input_queue: multiprocessing.Queue,
                 output_queue: multiprocessing.Queue):
        multiprocessing.Process.__init__(self)
        self.nsfw_ness = nsfw_ness
        self.file2link = file2link
        self.link2post = link2post
        self.post_info = post_info
        self.input_queue = input_queue
        self.output_queue = output_queue

    def run(self):
        while True:
            next_item: str = self.input_queue.get()
            if next_item is None:
                self.output_queue.put(None)
                break
            posts = flatten_generator(
                self.link2post.get(link, [])
                for link in self.file2link.get(next_item, []))
            if self.nsfw_ness is None or all(
                    self.post_info.get(post, {}).get('nsfw', None)
                    == self.nsfw_ness
                    for post in posts):
                self.output_queue.put(next_item)

class SubredditFilter(multiprocessing.Process):
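    """Forward files whose linked posts all belong to the allowed subreddits.

    When ``subreddits`` is None every file passes through.
    """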
    def __init__(self,
                 subreddits: Optional[FrozenSet[str]],
                 file2link: Dict[str, List[str]],
                 link2post: Dict[str, List[str]],
                 post_info: Dict[str, Dict[str, Any]],
                 input_queue: multiprocessing.Queue,
                 output_queue: multiprocessing.Queue):
        multiprocessing.Process.__init__(self)
        self.subreddits = subreddits
        self.file2link = file2link
        self.link2post = link2post
        self.post_info = post_info
        self.input_queue = input_queue
        self.output_queue = output_queue

    def run(self):
        while True:
            next_item: str = self.input_queue.get()
            if next_item is None:
                self.output_queue.put(None)
                break
            posts = flatten_generator(
                self.link2post.get(link, [])
                for link in self.file2link.get(next_item, []))
            subreddits = flatten_generator(
                self.post_info.get(post, {}).get('subreddits', [])
                for post in posts)
            if self.subreddits is None or all(
                    subreddit in self.subreddits
                    for subreddit in subreddits):
                self.output_queue.put(next_item)

class FileExistsFilter(multiprocessing.Process):
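    """Forward only files that still exist on disk.

    Not wired into the pipeline; see the commented-out stage in main().
    """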
    def __init__(self,
                 input_queue: multiprocessing.Queue,
                 output_queue: multiprocessing.Queue):
        multiprocessing.Process.__init__(self)
        self.input_queue = input_queue
        self.output_queue = output_queue

    def run(self):
        while True:
            next_item: str = self.input_queue.get()
            if next_item is None:
                self.output_queue.put(None)
                break
            if Path(next_item).exists():
                self.output_queue.put(next_item)

def main():
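    """Run the filter pipeline and write the condensed hash export."""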
    exported_name = ('h_gdl.txt' if not HISTORICAL_EXPORT
                     else f'h_gdl_{datetime.datetime.now()}.txt')
    exported_path = Path(exported_name)
    exported_path.write_text('')
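    # Each line of i_gdl_hashes.txt reads "hash|path"; swap into (path, hash) pairs.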
    hashes_list: List[Tuple[str, str]] = [
        (path, hsh)
        for hsh, path in (
            line.split('|', 1)
            for line in Path('i_gdl_hashes.txt').read_text().splitlines())]
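    # Usage below implies: r_gdl_p maps post id -> metadata ('nsfw', 'subreddits'),
    # r_gdl_lp maps link -> post ids, i_gdl_lff maps file -> links,
    # and i_gdl_fsz maps file -> size in bytes.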
    hashes_dict: Dict[str, str] = dict(hashes_list)
    r_gdl_p = json.loads(Path('r_gdl_p.json').read_text())
    r_gdl_lp = json.loads(Path('r_gdl_lp.json').read_text())
    i_gdl_lff = json.loads(Path('i_gdl_lff.json').read_text())
    file_sizes = json.loads(Path('i_gdl_fsz.json').read_text())
    general_size = 0
    general_count = 0
    existing_files_queue = multiprocessing.Queue()
    intermediary_queue = multiprocessing.Queue()
    intermediary_queue2 = multiprocessing.Queue()
    remaining_queue = multiprocessing.Queue()
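    # Pipeline: existing_files_queue -> ExtensionFilter -> NsfwNessFilter
    #           -> SubredditFilter -> remaining_queue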
    ffp = ExtensionFilter(EXTENSION_FILTER, existing_files_queue,
                          intermediary_queue)
    nfp = NsfwNessFilter(NSFW_NESS_FILTER, i_gdl_lff, r_gdl_lp, r_gdl_p,
                         intermediary_queue, intermediary_queue2)
    srp = SubredditFilter(SUBREDDIT_FILTER, i_gdl_lff, r_gdl_lp, r_gdl_p,
                          intermediary_queue2, remaining_queue)
    # fep = FileExistsFilter(intermediary_queue, remaining_queue)
    ffp.start()
    nfp.start()
    srp.start()
    # fep.start()
    for file, _ in hashes_list:
        existing_files_queue.put(file)
    existing_files_queue.put(None)
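    # The None sentinel propagates through every stage and ends the loop below.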
    known_hashes = set()
    with exported_path.open('at') as fd:
        while True:
            next_item: str = remaining_queue.get()
            if next_item is None:
                break
            else:
                hsh = hashes_dict[next_item]
                known_hashes.add(hsh)
                fd.write(f'{hsh}|{next_item}\n')
                general_size += file_sizes.get(next_item, 0)
                general_count += 1
    ffp.join()
    nfp.join()
    srp.join()
    # fep.join()
    existing_files_queue.close()
    existing_files_queue.join_thread()
    intermediary_queue.close()
    intermediary_queue.join_thread()
    intermediary_queue2.close()
    intermediary_queue2.join_thread()
    remaining_queue.close()
    remaining_queue.join_thread()
    print(f'Found {general_count} files')
    print(f'Found {len(known_hashes)} unique hashes')
    print(f'Size: {general_size} bytes ({format_power10(general_size)})')

if __name__ == "__main__":
    HISTORICAL_EXPORT = True
    main()