#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
"""Track upcoming furry conventions and publish "awaited" progress bars.

The upcoming-events template on WikiFur is scraped, differences against the
previously stored snapshot are recorded as numbered JSON files, and a progress
image plus text is posted to Twitter and Telegram.
"""

import calendar
import configparser
import datetime
import json
import math
import os
import re
import smtplib
import time
import traceback
import uuid
from io import BytesIO
from pathlib import Path
from typing import Dict, FrozenSet, Generator, List, Tuple

import telegram
import PIL.Image
import PIL.ImageDraw
import PIL.ImageFont
import pytz
import requests
import tweepy
from bs4 import BeautifulSoup
from bs4.element import Tag

IMAGE_FONT = PIL.ImageFont.truetype(
    r'fonts/Montserrat/Montserrat-Medium.ttf', 20)
IMAGE_FONT2 = PIL.ImageFont.truetype(
    r'fonts/Montserrat/Montserrat-SemiBold.ttf', 32)

# Any non-empty DEV environment variable switches to dry-run mode: output is
# written to local files instead of being posted.
DEBUG = bool(os.environ.get('DEV'))

# "Mon D", "Mon D-D", "Mon D-Mon D", optionally followed by " YYYY" when a
# range is given.  The (?!\d) guards keep "Mar 2025" from being read as
# "Mar 20": such strings must fall through to RGX_DATE_TBA_MONTH_RANGE.
RGX_DATE_CERTAIN = re.compile(
    r'''([A-Za-z]+) (\d{1,2})(?!\d)(?:-(?:([A-Za-z]+) )?(\d{1,2})(?!\d)(?: (\d{4}))?)?''')
# "Mon", "Mon-Mon", optionally followed by " YYYY".
RGX_DATE_TBA_MONTH_RANGE = re.compile(
    r'''([A-Za-z]+)(?:-([A-Za-z]+))?(?: (\d{4}))?''')
RGX_DATE_TBA_YEAR = re.compile(r'''(\d{4})''')

SEASON_DURATION = 3
# Month abbreviations map to their month number; seasons map to the month
# obtained by adding SEASON_DURATION to the listed base month (wrapping at 12).
MONTHS_COMMON_NA = {
    'Jan': 1,
    'Feb': 2,
    'Mar': 3,
    'Apr': 4,
    'May': 5,
    'Jun': 6,
    'Jul': 7,
    'Aug': 8,
    'Sep': 9,
    'Oct': 10,
    'Nov': 11,
    'Dec': 12,
    'Spring': (((3+SEASON_DURATION) - 1) % 12) + 1,
    'Summer': (((6+SEASON_DURATION) - 1) % 12) + 1,
    'Autumn': (((9+SEASON_DURATION) - 1) % 12) + 1,
    'Winter': (((12+SEASON_DURATION) - 1) % 12) + 1,
}
# Last day of each month, January first; February is fixed at 28.
MONTHS_LAST_DAY = [*([31, 30]*3), 31, *([31, 30]*2), 31]
MONTHS_LAST_DAY[1] -= 2


class TempFile(object):
    """Context manager yielding a freshly created file path under /tmp.

    The file is removed again when the ``with`` block exits.
    """

    def __init__(self, *args, **kwargs):
        self.path = Path(f'/tmp/{uuid.uuid4().hex}', *args, **kwargs)
        self.path.touch()

    def __enter__(self):
        return self.path

    def __exit__(self, exc_type, exc_value, traceback):
        if self.path.is_file():
            self.path.unlink()
        # A truthy return value would swallow any exception raised inside the
        # ``with`` body; cleanup must never hide a failure.
        return False


class _TupleOrdered:
    """Mixin providing equality, ordering and hashing from a tuple.

    Subclasses implement ``_tuple_sort_repr()``; any object exposing that
    method is treated as comparable.
    """

    def _tuple_sort_repr(self) -> tuple:
        raise NotImplementedError

    def _cmp(self, o, op):
        if hasattr(o, '_tuple_sort_repr'):
            return getattr(self._tuple_sort_repr(), op)(o._tuple_sort_repr())
        # Let Python try the reflected operation / default identity rules
        # instead of claiming that every comparison is False.
        return NotImplemented

    def __eq__(self, o):
        return self._cmp(o, '__eq__')

    def __ne__(self, o):
        return self._cmp(o, '__ne__')

    def __lt__(self, o):
        return self._cmp(o, '__lt__')

    def __le__(self, o):
        return self._cmp(o, '__le__')

    def __gt__(self, o):
        return self._cmp(o, '__gt__')

    def __ge__(self, o):
        return self._cmp(o, '__ge__')

    def __hash__(self):
        return hash(self._tuple_sort_repr())


class PlannedFurCon(_TupleOrdered):
    """A convention entry scraped from the upcoming-events page."""

    def __init__(self,
                 country_iso3166_a3: str,
                 date_start: datetime.date,
                 date_end: datetime.date,
                 tba_status: bool,
                 name: str):
        self.country_iso3166_a3: str = country_iso3166_a3
        self.date_start: datetime.date = date_start
        self.date_end: datetime.date = date_end
        self.tba_status: bool = tba_status
        self.name: str = name

    def to_json(self) -> dict:
        """Return a JSON-serialisable dict; dates become [year, month, day]."""
        return dict(
            country_iso3166_a3=self.country_iso3166_a3,
            date_start=[self.date_start.year,
                        self.date_start.month,
                        self.date_start.day],
            date_end=[self.date_end.year,
                      self.date_end.month,
                      self.date_end.day],
            tba_status=self.tba_status,
            name=self.name,
        )

    @classmethod
    def from_json(cls, d) -> 'PlannedFurCon':
        """Rebuild an instance from the dict produced by ``to_json``."""
        d = d.copy()  # do not mutate the caller's dict
        d['date_start'] = datetime.date(*d['date_start'])
        d['date_end'] = datetime.date(*d['date_end'])
        return cls(**d)

    def __repr__(self) -> str:
        return str(
            f'{type(self).__name__}{(self.country_iso3166_a3, self.date_start, self.date_end, self.tba_status, self.name,)}')

    def _tuple_sort_repr(self) -> tuple:
        return (self.date_start,
                self.date_end,
                self.tba_status,
                self.country_iso3166_a3,
                self.name)

    def _identifier(self):
        return self.name


def _last_day_of_month(year: int, month: int) -> int:
    """Return the number of days in ``month`` of ``year`` (leap-year aware)."""
    return calendar.monthrange(year, month)[1]


def derive_date_en(date_str: str, year_hint: int, tba: bool) -> Tuple[datetime.date, datetime.date, bool]:
    """Parse an English date description into ``(start, end, tba)``.

    Three shapes are tried in order:

    * a certain date or range ("Jan 5", "Jan 5-7", "Dec 30-Jan 2 2025");
    * a month or month range ("Mar", "Mar-Apr 2025"), which forces ``tba`` to
      True and uses the last day of each month;
    * a bare year, which forces ``tba`` to True and uses December 31st.

    ``year_hint`` supplies the year when the string carries none.

    Raises:
        NotImplementedError: when no pattern matches ``date_str``.
        KeyError: when a matched word is not a key of ``MONTHS_COMMON_NA``.
        ValueError: when the parsed start day does not exist in its month.
    """
    if (match := RGX_DATE_CERTAIN.match(date_str)) is not None:
        month_s, day_s, month_e, day_e, year = match.groups()
        ms = MONTHS_COMMON_NA[month_s]
        ds = int(day_s)
        ys = ye = int(year) if year else year_hint
        me = MONTHS_COMMON_NA[month_e] if month_e else ms
        de = int(day_e) if day_e else ds
        # End month earlier than start month: the range crosses New Year.
        if (ye, me) < (ys, ms):
            ye += 1
        # Same month but end day earlier than start day: the range crosses
        # into the following month.
        if (ye, me, de) < (ys, ms, ds):
            me += 1
            if me > 12:  # wrap before the month is used for a lookup
                ye += 1
                me = 1
        # End day beyond the end of its month: roll to the 1st of the next.
        if de > _last_day_of_month(ye, me):
            de = 1
            me += 1
            if me > 12:
                ye += 1
                me = 1
        date_start = datetime.date(ys, ms, ds)
        date_end = datetime.date(ye, me, de)
    elif (match := RGX_DATE_TBA_MONTH_RANGE.match(date_str)) is not None:
        tba = True
        month_s, month_e, year = match.groups()
        me = ms = MONTHS_COMMON_NA[month_s]
        if month_e is not None:
            me = MONTHS_COMMON_NA[month_e]
        yrs = yre = int(year) if year else year_hint
        if ms > me:
            yre += 1
        date_start = datetime.date(yrs, ms, MONTHS_LAST_DAY[ms-1])
        date_end = datetime.date(yre, me, MONTHS_LAST_DAY[me-1])
    elif (match := RGX_DATE_TBA_YEAR.match(date_str)) is not None:
        tba = True
        date_start = date_end = datetime.date(int(match.group(1)), 12, 31)
    else:
        raise NotImplementedError(f'There is no regex for parsing: {date_str}')
    return date_start, date_end, tba


def _parse_year_header(bs_item: Tag, year_hint: int, tba_hint: bool) -> Tuple[int, bool]:
    """Read a ``<dl>`` section header and return ``(year, tba_hint)``.

    The first non-empty string of the tag is used; a trailing '.', 'cont',
    ' TBA' or ' to be announced' is removed, the latter two switching the TBA
    hint on.  What remains must be exactly four characters long.
    """
    cleaned = (
        text.strip().strip('…').strip('.').strip()
        for text in bs_item.strings
    )
    year_hint_candidate = next(
        (text for text in cleaned if text),
        str(year_hint) + (' TBA' if tba_hint else '')
    )
    if year_hint_candidate.endswith('.'):
        year_hint_candidate = year_hint_candidate[:-1]
    year_hint_candidate = year_hint_candidate.strip()
    if year_hint_candidate.lower().endswith('cont'):
        year_hint_candidate = year_hint_candidate[:-4]
    year_hint_candidate = year_hint_candidate.strip()
    if year_hint_candidate.endswith(' TBA'):
        tba_hint = True
        year_hint_candidate = year_hint_candidate[:-4]
    if year_hint_candidate.strip().lower().endswith(' to be announced'):
        tba_hint = True
        year_hint_candidate = year_hint_candidate.strip()[:-16]
    if len(year_hint_candidate) != 4:
        raise NotImplementedError(
            f'Doesn\'t know how to convert {year_hint_candidate} to a year')
    return int(year_hint_candidate), tba_hint


def _get_future_cons() -> Generator[PlannedFurCon, None, None]:
    """Scrape the WikiFur upcoming-events template and yield its entries.

    The prettified page is also saved to ``_last.html``.

    Raises:
        NotImplementedError: when the page contains markup this parser does
            not know how to interpret.
    """
    # Without a timeout a stalled connection would block the bot forever.
    http_response = requests.get(
        'https://en.wikifur.com/wiki/Template:Upcoming_events', timeout=60)
    http_response.raise_for_status()
    bs_page: BeautifulSoup = BeautifulSoup(http_response.text, 'html5lib')
    # Explicit encoding: the page contains non-ASCII characters and the
    # locale default is not guaranteed to be able to encode them.
    Path('_last.html').write_text(bs_page.prettify(), encoding='utf-8')
    bs_table: Tag = bs_page.find('table')
    year_hint = datetime.datetime.now().year
    tba_hint = False
    for bs_td in bs_table.findAll('td'):
        for bs_item in bs_td.findChildren(recursive=False):
            if bs_item.name == 'dl':
                year_hint, tba_hint = _parse_year_header(
                    bs_item, year_hint, tba_hint)
            elif bs_item.name == 'div':
                strings = list(bs_item.stripped_strings)
                if strings and strings[-1].startswith(', '):
                    del strings[-1]
                if len(strings) != 2:
                    raise NotImplementedError(
                        f'Cannot split {strings} into (date, convention)')
                # The first CSS class other than 'f' is used as the country.
                country = next(
                    filter(lambda a: a != 'f', bs_item['class']), 'unk')
                date, convention = strings
                date = date.strip(':')
                tba = tba_hint
                if date.endswith('?'):
                    tba = True
                    date = date.strip('?')
                date_start, date_end, tba = derive_date_en(
                    date, year_hint, tba)
                yield PlannedFurCon(country,
                                    date_start,
                                    date_end,
                                    tba,
                                    convention)
            elif bs_item.name == 'p' and ''.join(bs_item.stripped_strings).strip() == '':
                pass  # visual whitespace for wiki
            else:
                raise NotImplementedError(
                    f'Cannot handle tag name {bs_item.name}')


def get_future_cons() -> List[PlannedFurCon]:
    """Return every scraped convention, sorted."""
    return sorted(_get_future_cons())


def lpad(data, pad: int, char=' ') -> str:
    """Left-pad ``str(data)`` with ``char`` up to ``pad`` characters."""
    text = str(data)
    missing = pad - len(text)
    if missing > 0:
        text = (str(char)*missing) + text
    return text


def mkjsonfn(seq: int) -> str:
    """Return the file name of snapshot ``seq``: 13 zero-padded digits + .json."""
    return lpad(seq, 13, 0)+'.json'


def in_order_unique(identifiables):
    """Return the items in order, keeping the first one per ``_identifier()``."""
    seen = set()
    kept = []
    for item in identifiables:
        key = item._identifier()
        if key in seen:
            continue
        seen.add(key)
        kept.append(item)
    return kept


def dictify_identifiables(identifiables):
    """Map ``_identifier()`` to item; later duplicates overwrite earlier ones."""
    return {item._identifier(): item for item in identifiables}


class FurConProgress(_TupleOrdered):
    """Progress of the wait for one convention.

    Attributes are set from the constructor arguments: ``end`` (moment the
    wait ends), ``stt`` (moment the wait started), ``pct`` (integer
    percentage), ``ctr`` (country code) and ``enm`` (event name).
    """

    def __init__(self,
                 end: datetime.datetime,
                 stt: datetime.datetime,
                 pct: int,
                 ctr: str,
                 enm: str
                 ):
        self.end = end
        self.stt = stt
        self.pct = pct
        self.ctr = ctr
        self.enm = enm

    def __str__(self):
        return f'{type(self).__name__}{(self.end, self.stt, self.pct, self.ctr, self.enm)}'

    def _tuple_sort_repr(self):
        return (self.end, self.stt, self.pct, self.ctr, self.enm)

    def _identifier(self):
        # The event name lives in ``enm``; this class has no ``name``.
        return self.enm


def _write_state(cfg: configparser.ConfigParser, path: Path) -> None:
    """Persist the bot state file."""
    with path.open('wt') as state_fh:
        cfg.write(state_fh)


def _dump_json(path: Path, payload: dict) -> None:
    """Write ``payload`` as indented JSON with sorted keys."""
    path.write_text(json.dumps(payload, indent=1, sort_keys=True))


def _sync_evolution(evo_dir: Path,
                    old_evo_id: int,
                    now: datetime.datetime,
                    midnight: datetime.time,
                    minimum_cooldown: int) -> int:
    """Diff the scraped conventions against snapshot ``old_evo_id``.

    When anything was added, changed or removed, a new snapshot file is
    written and its id returned; otherwise ``old_evo_id`` is returned.
    """
    now_iso = now.isoformat()
    old_evo = json.loads(evo_dir.joinpath(mkjsonfn(old_evo_id)).read_bytes())
    old_cons: List[PlannedFurCon] = in_order_unique(sorted(
        PlannedFurCon.from_json(x['data'])
        for x in old_evo['dump'].values()))
    old_cons_dict: Dict[str, PlannedFurCon] = dictify_identifiables(old_cons)
    new_cons: List[PlannedFurCon] = in_order_unique(get_future_cons())
    new_cons_dict: Dict[str, PlannedFurCon] = dictify_identifiables(new_cons)
    old_names: FrozenSet[str] = frozenset(old_cons_dict)
    new_names: FrozenSet[str] = frozenset(new_cons_dict)

    def is_protected(con: PlannedFurCon) -> bool:
        # Stored conventions with a confirmed date that start in less than
        # five days (or started less than one cooldown ago) are frozen: the
        # scrape may neither drop nor alter them.
        if con.tba_status:
            return False
        starts_at = datetime.datetime.combine(con.date_start, midnight)
        return ((starts_at - now) < datetime.timedelta(days=5)
                and (now - starts_at) < datetime.timedelta(seconds=minimum_cooldown))

    protected: FrozenSet[str] = frozenset(
        con._identifier() for con in old_cons if is_protected(con))
    dropped = (old_names - new_names) - protected
    created = (new_names - old_names) - protected
    changed = frozenset(
        name for name in (old_names & new_names) - protected
        if old_cons_dict[name] != new_cons_dict[name])
    if not (changed or dropped or created):
        return old_evo_id

    new_evo_id = old_evo_id + 1
    new_dump = old_evo['dump'].copy()
    for name in [*dropped, *changed]:
        new_dump.pop(name)
    for name in [*changed, *created]:
        new_dump[name] = dict(data=new_cons_dict[name].to_json(),
                              timestamp=now_iso)
    _dump_json(evo_dir.joinpath(mkjsonfn(new_evo_id)),
               dict(added=sorted(created),
                    changed=sorted(changed),
                    deleted=sorted(dropped),
                    dump=new_dump,
                    timestamp=now_iso,))
    return new_evo_id


def _awaited_percentage(start: datetime.datetime,
                        end: datetime.datetime,
                        now: datetime.datetime) -> int:
    """Return how much of the span ``start``..``end`` has elapsed, 0 to 100."""
    if start >= end or now >= end:
        return 100
    return math.floor(
        int(100*min(1, max(0, (now - start)/(end - start)))))


def _collect_progress(latest_evo_values: List[Tuple[PlannedFurCon, datetime.datetime]],
                      sent_percentages: Dict[str, int],
                      now: datetime.datetime,
                      midnight: datetime.time) -> List[FurConProgress]:
    """Return sorted progress entries whose percentage was not yet sent.

    Conventions flagged as TBA are skipped.  The wait runs from the timestamp
    stored with the entry until midnight of the convention's start date.
    """
    pending = []
    for pfc, first_seen in latest_evo_values:
        if pfc.tba_status:
            continue
        end = datetime.datetime.combine(pfc.date_start, midnight)
        pct = _awaited_percentage(first_seen, end, now)
        if pct != sent_percentages.get(pfc._identifier()):
            pending.append(FurConProgress(
                end=end,
                stt=first_seen,
                pct=pct,
                ctr=pfc.country_iso3166_a3,
                enm=pfc._identifier()))
    return sorted(pending)


def _compose_tweet(selected_evos: List[FurConProgress]) -> Tuple[str, List[FurConProgress]]:
    """Build the status text, one line per entry, within 280 UTF-8 bytes.

    Returns the text and the prefix of ``selected_evos`` that fitted.
    """
    tweet = ''
    for i, valid_evo in enumerate(selected_evos):
        subtweet = f'FurCon [{valid_evo.ctr.upper()}] {valid_evo.enm} has been {valid_evo.pct}% awaited\n'
        if len((tweet+subtweet).encode('utf-8')) > 280:
            return tweet, selected_evos[:i]
        tweet += subtweet
    return tweet, selected_evos


def _countdown_label(valid_evo: FurConProgress, now: datetime.datetime) -> str:
    """Return the right-hand caption: the end date, prefixed by time left."""
    end_date = str(valid_evo.end.date())
    if valid_evo.pct >= 100:
        return end_date
    dtt = ''
    dts = (valid_evo.end - now).total_seconds()
    if (dd := math.floor(dts / (24*3600))) > 0:
        dtt = f'{dd+1} days'
    elif (hh := math.ceil(dts / 3600)) > 0:
        dtt = f'{hh} hour' + ('s' if hh > 1 else '')
    if dtt:
        return f'{dtt} until {end_date}'
    return end_date


def _render_progress_image(selected_evos: List[FurConProgress],
                           now: datetime.datetime) -> bytes:
    """Draw up to five labelled progress bars and return the PNG bytes.

    The 1024x512 canvas holds five rows; fewer entries are centred
    vertically.
    """
    # NOTE(review): ImageFont.getsize is, to my knowledge, removed in
    # Pillow 10 - confirm the Pillow version this bot is deployed with.
    im = PIL.Image.new('RGB', (1024, 512), color=(236, 239, 241))
    draw = PIL.ImageDraw.Draw(im)
    row_offset = (5 - len(selected_evos)) / 2
    for slot, valid_evo in enumerate(selected_evos):
        row = slot + row_offset
        top = round(102.4*row)
        bottom = round(102.4*(row+1))
        pc = valid_evo.pct
        st = _countdown_label(valid_evo, now)
        draw.text((8, top+4),
                  f'[{valid_evo.ctr.upper()}] {valid_evo.enm}',
                  font=IMAGE_FONT,
                  fill=(38, 50, 56),
                  align='left')
        ssx = IMAGE_FONT.getsize(st)[0]
        draw.text((1024-8-ssx, top+4),
                  st,
                  font=IMAGE_FONT,
                  fill=(38, 50, 56),
                  align='left')
        # Frame, empty track, then the filled part of the bar.
        draw.rectangle(((6, top+32), (1018, bottom-6)),
                       fill=(55, 71, 79))
        draw.rectangle(((8, top+34), (1016, bottom-8)),
                       fill=(176, 190, 197))
        draw.rectangle(((8, top+34), (int(8+1008*(pc/100)), bottom-8)),
                       fill=(85, 139, 47))
        sx, sy = IMAGE_FONT2.getsize(f'{pc}%', stroke_width=2)
        draw.text((512-(sx//2), top+62-(sy//2)),
                  f'{pc}%',
                  font=IMAGE_FONT2,
                  fill=(38, 50, 56),
                  align='left',
                  stroke_width=2,
                  stroke_fill=(236, 239, 241))
    bio = BytesIO()
    im.save(bio, format='png')
    return bio.getvalue()


def main_non_sleeper() -> float:
    """Run one scrape / diff / publish cycle.

    Returns the number of seconds the caller should wait before the next
    cycle (the configured ``minimum_cooldown``, or 1.0 after publishing in
    DEBUG mode).
    """
    midnight = datetime.time(0, 0, 0, tzinfo=pytz.UTC)
    keys = configparser.ConfigParser()
    keys.read(Path('keys.ini'))
    auth = tweepy.OAuthHandler(
        keys['twitter']['api_key_public'],
        keys['twitter']['api_key_secret'])
    auth.set_access_token(
        keys['twitter_furconprogress']['oauth_token'],
        keys['twitter_furconprogress']['oauth_secret'])
    api = tweepy.API(auth)
    telebot = telegram.Bot(keys['telegram_furconprogress']['token'])

    bg_state_path = Path('bg_state.ini')
    bg_data_snt_path = Path('bg_data_snt')
    bg_data_evo_path = Path('bg_data_evo')
    bg_data_snt_path.mkdir(exist_ok=True, parents=True)
    bg_data_evo_path.mkdir(exist_ok=True, parents=True)
    now = datetime.datetime.now(tz=pytz.UTC)
    now_iso = now.isoformat()

    # Snapshot 0 of each series is an empty seed dated at the Unix epoch.
    epoch_iso = datetime.datetime(1970, 1, 1, tzinfo=pytz.UTC).isoformat()
    if not (seed := bg_data_evo_path.joinpath(mkjsonfn(0))).exists():
        _dump_json(seed, dict(added=[],
                              changed=[],
                              deleted=[],
                              dump={},
                              timestamp=epoch_iso,))
    if not (seed := bg_data_snt_path.joinpath(mkjsonfn(0))).exists():
        _dump_json(seed, dict(percentages={},
                              timestamp=epoch_iso,))

    bg_state_cfg = configparser.ConfigParser()
    if bg_state_path.is_file():
        bg_state_cfg.read(bg_state_path)
    if not bg_state_cfg.has_section('timings'):
        bg_state_cfg.add_section('timings')
    last_ran = datetime.datetime.fromisoformat(
        bg_state_cfg['timings'].get('last_ran') or now_iso)
    bg_state_cfg['timings']['last_last_ran'] = last_ran.isoformat()
    bg_state_cfg['timings']['last_ran'] = now_iso
    if bg_state_cfg['timings'].get('minimum_cooldown') is None:
        bg_state_cfg['timings']['minimum_cooldown'] = f'{3*3600}'
    minimum_cooldown = int(bg_state_cfg['timings']['minimum_cooldown'])
    if not bg_state_cfg.has_section('sync'):
        bg_state_cfg.add_section('sync')
    old_evo_id = int(bg_state_cfg['sync'].get('evolution') or '0')
    old_snt_id = int(bg_state_cfg['sync'].get('sent') or '0')
    bg_state_cfg['sync']['evolution'] = str(old_evo_id)
    bg_state_cfg['sync']['sent'] = str(old_snt_id)
    _write_state(bg_state_cfg, bg_state_path)

    new_evo_id = _sync_evolution(
        bg_data_evo_path, old_evo_id, now, midnight, minimum_cooldown)
    if new_evo_id != old_evo_id:
        bg_state_cfg['sync']['evolution'] = str(new_evo_id)
        _write_state(bg_state_cfg, bg_state_path)

    latest_dump = json.loads(bg_data_evo_path.joinpath(
        mkjsonfn(new_evo_id)).read_bytes())['dump']
    latest_evo_values: List[Tuple[PlannedFurCon, datetime.datetime]] = [
        (PlannedFurCon.from_json(x['data']),
         datetime.datetime.fromisoformat(x['timestamp']))
        for x in latest_dump.values()
    ]
    old_snt_percentages: Dict[str, int] = json.loads(
        bg_data_snt_path.joinpath(mkjsonfn(old_snt_id)).read_text()
    )['percentages']
    valid_evos = _collect_progress(
        latest_evo_values, old_snt_percentages, now, midnight)

    # Wait until a full batch (five entries, or every known convention when
    # there are fewer) has a new percentage to report.
    if len(valid_evos) < max(0, min(5, len(latest_evo_values))):
        return minimum_cooldown

    selected_evos = valid_evos[:5]
    if selected_evos:
        new_snt_id = old_snt_id+1
        bg_state_cfg['sync']['sent'] = str(new_snt_id)
        tweet, selected_evos = _compose_tweet(selected_evos)
        new_snt_percentages = old_snt_percentages.copy()
        for valid_evo in selected_evos:
            new_snt_percentages[valid_evo.enm] = valid_evo.pct
        bim = _render_progress_image(selected_evos, now)
        if DEBUG:
            Path('tweet.png').write_bytes(bim)
            Path('tweet.txt').write_text(tweet)
        else:
            with TempFile() as path:
                path.write_bytes(bim)
                media = api.media_upload(filename=str(path))
                api.update_status(status=tweet, media_ids=[media.media_id])
            telebot.send_photo(
                chat_id=keys['telegram_furconprogress']['chat_id'],
                photo=bim,
                caption=tweet,
            )
        _dump_json(bg_data_snt_path.joinpath(mkjsonfn(new_snt_id)),
                   dict(percentages=new_snt_percentages,
                        timestamp=now_iso))
        if DEBUG:
            # Dry run: leave the persisted 'sent' counter untouched.
            return 1.0
    _write_state(bg_state_cfg, bg_state_path)
    return minimum_cooldown


def main():
    """Run a single cycle in DEBUG mode, otherwise loop forever."""
    if DEBUG:
        main_non_sleeper()
    else:
        while True:
            time.sleep(main_non_sleeper())


def safe_main():
    """Run ``main`` and e-mail the traceback to root if it fails.

    When the local SMTP server cannot be reached the original exception is
    re-raised; otherwise the report is sent and the process sleeps for three
    hours before returning.
    """
    try:
        main()
    except Exception as e:
        trace = ''.join(traceback.format_exception(None, e, e.__traceback__))
        try:
            smtp_server = smtplib.SMTP('localhost', 25)
            smtp_server.noop()
        except Exception:
            raise e
        with smtp_server:  # closes the connection once the mail is handed over
            smtp_server.sendmail(
                'http@localhost',
                'root@localhost',
                'Subject:[Fix me] Python backtrace\n' + trace
            )
        time.sleep(3*3600)


if __name__ == '__main__':
    safe_main()