site-socialmedia-progress-bars/bg_tweeter.py

582 lines
22 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import configparser
import datetime
import json
import math
import os
import re
import smtplib
import time
import traceback
import uuid
from io import BytesIO
from pathlib import Path
from typing import Dict, FrozenSet, Generator, List, Tuple
import telegram
import PIL.Image
import PIL.ImageDraw
import PIL.ImageFont
import pytz
import requests
import tweepy
from bs4 import BeautifulSoup
from bs4.element import Tag
IMAGE_FONT = PIL.ImageFont.truetype(
r'fonts/Montserrat/Montserrat-Medium.ttf', 20)
IMAGE_FONT2 = PIL.ImageFont.truetype(
r'fonts/Montserrat/Montserrat-SemiBold.ttf', 32)
DEBUG = bool(os.environ.get('DEV'))
RGX_DATE_CERTAIN = re.compile(
r'''([A-Za-z]+) (\d{1,2})(?:-(?:([A-Za-z]+) )?(\d{1,2})(?: (\d{4}))?)?''')
RGX_DATE_TBA_MONTH_RANGE = re.compile(
r'''([A-Za-z]+)(?:-([A-Za-z]+))?(?: (\d{4}))?''')
RGX_DATE_TBA_YEAR = re.compile(r'''(\d{4})''')
SEASON_DURATION = 3
MONTHS_COMMON_NA = {
'Jan': 1,
'Feb': 2,
'Mar': 3,
'Apr': 4,
'May': 5,
'Jun': 6,
'Jul': 7,
'Aug': 8,
'Sep': 9,
'Oct': 10,
'Nov': 11,
'Dec': 12,
'Spring': (((3+SEASON_DURATION) - 1) % 12) + 1,
'Summer': (((6+SEASON_DURATION) - 1) % 12) + 1,
'Autumn': (((9+SEASON_DURATION) - 1) % 12) + 1,
'Winter': (((12+SEASON_DURATION) - 1) % 12) + 1,
}
MONTHS_LAST_DAY = [*([31, 30]*3), 31, *([31, 30]*2), 31]
MONTHS_LAST_DAY[1] -= 2
class TempFile(object):
def __init__(self, *args, **kwargs):
self.path = Path(f'/tmp/{uuid.uuid4().hex}', *args, **kwargs)
self.path.touch()
def __enter__(self):
return self.path
def __exit__(self, exc_type, exc_value, traceback):
if self.path.is_file():
self.path.unlink()
return self
class PlannedFurCon:
def __init__(self, country_iso3166_a3: str, date_start: datetime.date, date_end: datetime.date, tba_status: bool, name: str):
self.country_iso3166_a3: str = country_iso3166_a3
self.date_start: datetime.date = date_start
self.date_end: datetime.date = date_end
self.tba_status: bool = tba_status
self.name: str = name
def to_json(self) -> dict:
return dict(
country_iso3166_a3=self.country_iso3166_a3,
date_start=[self.date_start.year,
self.date_start.month,
self.date_start.day],
date_end=[self.date_end.year,
self.date_end.month,
self.date_end.day],
tba_status=self.tba_status,
name=self.name,
)
@ classmethod
def from_json(cls, d) -> 'PlannedFurCon':
d = d.copy()
d['date_start'] = datetime.date(*d['date_start'])
d['date_end'] = datetime.date(*d['date_end'])
return cls(**d)
def __repr__(self) -> str:
return str(f'{type(self).__name__}{(self.country_iso3166_a3, self.date_start, self.date_end, self.tba_status, self.name,)}')
def _tuple_sort_repr(self) -> tuple:
return (self.date_start, self.date_end, self.tba_status,
self.country_iso3166_a3, self.name)
def _cmp(self, o, op) -> bool:
if hasattr(o, '_tuple_sort_repr'):
return getattr(self._tuple_sort_repr(), op)(o._tuple_sort_repr())
return False
def __eq__(self, o) -> bool:
return self._cmp(o, '__eq__')
def __ne__(self, o) -> bool:
return self._cmp(o, '__ne__')
def __lt__(self, o) -> bool:
return self._cmp(o, '__lt__')
def __le__(self, o) -> bool:
return self._cmp(o, '__le__')
def __gt__(self, o) -> bool:
return self._cmp(o, '__gt__')
def __ge__(self, o) -> bool:
return self._cmp(o, '__ge__')
def __hash__(self):
return hash(self._tuple_sort_repr())
def _identifier(self):
return self.name
def derive_date_en(date_str: str, year_hint: int, tba: bool) -> Tuple[datetime.date, datetime.date, bool]:
date_start = None
date_end = None
if (match := RGX_DATE_CERTAIN.match(date_str)) is not None:
groups = match.groups()
ms = MONTHS_COMMON_NA[groups[0]]
ds = int(groups[1])
ys = ye = int(groups[4]) if groups[4] else year_hint
me = MONTHS_COMMON_NA[groups[2]] if groups[2] else ms
de = int(groups[3]) if groups[3] else ds
while ye < ys:
ye += 1
while (ye, me) < (ys, ms):
ye += 1
while (ye, me, de) < (ys, ms, ds):
me += 1
if de > MONTHS_LAST_DAY[me-1]:
de = 1
me += 1
if me > 12:
ye += 1
me = 1
date_start = datetime.date(ys, ms, ds)
date_end = datetime.date(ye, me, de)
elif (match := RGX_DATE_TBA_MONTH_RANGE.match(date_str)) is not None:
tba = True
groups = match.groups()
me = ms = MONTHS_COMMON_NA[groups[0]]
if groups[1] is not None:
me = MONTHS_COMMON_NA[groups[1]]
yrs = yre = int(groups[2]) if groups[2] else year_hint
if ms > me:
yre += 1
date_start = datetime.date(yrs, ms, MONTHS_LAST_DAY[ms-1])
date_end = datetime.date(yre, me, MONTHS_LAST_DAY[me-1])
elif (match := RGX_DATE_TBA_YEAR.match(date_str)) is not None:
tba = True
date_start = date_end = datetime.date(int(match.group(1)), 12, 31)
else:
raise NotImplementedError(f'There is no regex for parsing: {date_str}')
return date_start, date_end, tba
def _get_future_cons() -> Generator[PlannedFurCon, None, None]:
yield from []
http_response = requests.get(
'https://en.wikifur.com/wiki/Template:Upcoming_events')
http_response.raise_for_status()
bs_page: BeautifulSoup = BeautifulSoup(http_response.text, 'html5lib')
Path('_last.html').write_text(bs_page.prettify())
bs_table: Tag = bs_page.find('table')
year_hint = datetime.datetime.now().year
tba_hint = False
for bs_td in bs_table.findAll('td'):
# bs_td: Tag = bs_td
for bs_item in bs_td.findChildren(recursive=False):
# bs_item: Tag = bs_item
if bs_item.name == 'dl':
year_hint_candidate = next(
iter(list(filter(
len,
map(
lambda x: x.strip().strip('').strip('.').strip(),
bs_item.strings
)
))),
str(year_hint) + (' TBA' if tba_hint else '')
)
if year_hint_candidate.endswith('.'):
year_hint_candidate = year_hint_candidate[:-1]
year_hint_candidate = year_hint_candidate.strip()
if year_hint_candidate.lower().endswith('cont'):
year_hint_candidate = year_hint_candidate[:-4]
year_hint_candidate = year_hint_candidate.strip()
if year_hint_candidate.endswith(' TBA'):
tba_hint = True
year_hint_candidate = year_hint_candidate[:-4]
if year_hint_candidate.strip().lower().endswith(' to be announced'):
tba_hint = True
year_hint_candidate = year_hint_candidate.strip()[:-16]
if len(year_hint_candidate) == 4:
year_hint = int(year_hint_candidate)
else:
raise NotImplementedError(
f'Doesn\'t know how to convert {year_hint_candidate} to a year')
elif bs_item.name == 'div':
strings = list(bs_item.stripped_strings)
if strings[-1].startswith(', '):
del strings[-1]
if len(strings) != 2:
raise NotImplementedError(
f'Cannot split {strings} into (date, convention)')
country = next(
filter(lambda a: a != 'f', bs_item['class']), 'unk')
date, convention = strings
date = date.strip(':')
tba = tba_hint
if date.endswith('?'):
tba = True
date = date.strip('?')
date_start, date_end, tba = derive_date_en(
date, year_hint, tba)
yield PlannedFurCon(country, date_start, date_end, tba, convention)
elif bs_item.name == 'p' and ''.join(bs_item.stripped_strings).strip() == '':
pass # visual whitespace for wiki
else:
raise NotImplementedError(
f'Cannot handle tag name {bs_item.name}')
def get_future_cons() -> List[PlannedFurCon]:
return sorted([*_get_future_cons()])
def lpad(data, pad: int, char=' ') -> str:
data = str(data)
if (missing := pad - len(data)) > 0:
data = (str(char)*missing) + data
return data
def mkjsonfn(seq: int) -> str:
return lpad(seq, 13, 0)+'.json'
def in_order_unique(identifiables):
added = set()
filtered = list()
for identifiable in identifiables:
identifier = identifiable._identifier()
if identifier not in added:
added.add(identifier)
filtered.append(identifiable)
return filtered
def dictify_identifiables(identifiables):
d = dict()
for identifiable in identifiables:
d[identifiable._identifier()] = identifiable
return d
class FurConProgress:
def __init__(self,
end: datetime.datetime,
stt: datetime.datetime,
pct: int,
ctr: str,
enm: str
):
self.end = end
self.stt = stt
self.pct = pct
self.ctr = ctr
self.enm = enm
def __str__(self):
return f'{type(self).__name__}{(self.end, self.stt, self.pct, self.ctr, self.enm)}'
def _tuple_sort_repr(self):
return (self.end, self.stt, self.pct, self.ctr, self.enm)
def _cmp(self, o, op) -> bool:
if hasattr(o, '_tuple_sort_repr'):
return getattr(self._tuple_sort_repr(), op)(o._tuple_sort_repr())
return False
def __eq__(self, o) -> bool:
return self._cmp(o, '__eq__')
def __ne__(self, o) -> bool:
return self._cmp(o, '__ne__')
def __lt__(self, o) -> bool:
return self._cmp(o, '__lt__')
def __le__(self, o) -> bool:
return self._cmp(o, '__le__')
def __gt__(self, o) -> bool:
return self._cmp(o, '__gt__')
def __ge__(self, o) -> bool:
return self._cmp(o, '__ge__')
def __hash__(self):
return hash(self._tuple_sort_repr())
def _identifier(self):
return self.name
def main_non_sleeper() -> float:
midnight = datetime.time(0, 0, 0, tzinfo=pytz.UTC)
keys = configparser.ConfigParser()
keys.read(Path('keys.ini'))
auth = tweepy.OAuthHandler(
keys['twitter']['api_key_public'],
keys['twitter']['api_key_secret'])
auth.set_access_token(
keys['twitter_furconprogress']['oauth_token'],
keys['twitter_furconprogress']['oauth_secret'])
api = tweepy.API(auth)
telebot = telegram.Bot(keys['telegram_furconprogress']['token'])
bg_state_path = Path('bg_state.ini')
bg_data_snt_path = Path('bg_data_snt')
bg_data_evo_path = Path('bg_data_evo')
bg_data_snt_path.mkdir(exist_ok=True, parents=True)
bg_data_evo_path.mkdir(exist_ok=True, parents=True)
now = datetime.datetime.now(tz=pytz.UTC)
now_iso = now.isoformat()
if not (seed := bg_data_evo_path.joinpath(mkjsonfn(0))).exists():
seed.write_text(json.dumps(
dict(added=[], changed=[], deleted=[], dump={},
timestamp=datetime.datetime(
1970, 1, 1, tzinfo=pytz.UTC).isoformat(),),
indent=1, sort_keys=True))
if not (seed := bg_data_snt_path.joinpath(mkjsonfn(0))).exists():
seed.write_text(json.dumps(
dict(percentages={},
timestamp=datetime.datetime(
1970, 1, 1, tzinfo=pytz.UTC).isoformat(),),
indent=1, sort_keys=True))
bg_state_cfg = configparser.ConfigParser()
if bg_state_path.is_file():
bg_state_cfg.read(bg_state_path)
if not bg_state_cfg.has_section('timings'):
bg_state_cfg.add_section('timings')
last_ran = datetime.datetime.fromisoformat(
bg_state_cfg['timings'].get('last_ran') or now_iso)
bg_state_cfg['timings']['last_last_ran'] = last_ran.isoformat()
bg_state_cfg['timings']['last_ran'] = now_iso
if bg_state_cfg['timings'].get('minimum_cooldown') is None:
bg_state_cfg['timings']['minimum_cooldown'] = f'{3*3600}'
minimum_cooldown = int(bg_state_cfg['timings']['minimum_cooldown'])
if not bg_state_cfg.has_section('sync'):
bg_state_cfg.add_section('sync')
old_evo_id = int(bg_state_cfg['sync'].get('evolution') or '0')
old_snt_id = int(bg_state_cfg['sync'].get('sent') or '0')
bg_state_cfg['sync']['evolution'] = str(old_evo_id)
bg_state_cfg['sync']['sent'] = str(old_snt_id)
new_evo_id = old_evo_id
new_snt_id = old_snt_id
with bg_state_path.open('wt') as bg_state_fh:
bg_state_cfg.write(bg_state_fh)
if True:
old_evo = json.loads(Path('bg_data_evo').joinpath(
mkjsonfn(old_evo_id)).read_bytes())
old_cons: List[PlannedFurCon] = in_order_unique(sorted([
PlannedFurCon.from_json(x['data']) for x in old_evo['dump'].values()]))
old_cons_dict: Dict[str,
PlannedFurCon] = dictify_identifiables(old_cons)
new_cons: List[PlannedFurCon] = in_order_unique(get_future_cons())
new_cons_dict: Dict[str,
PlannedFurCon] = dictify_identifiables(new_cons)
old_cons_frzset: FrozenSet[str] = frozenset(
[x._identifier() for x in old_cons])
new_cons_frzset: FrozenSet[str] = frozenset(
[x._identifier() for x in new_cons])
protected_old_cons: List[PlannedFurCon] = [x for x in old_cons if (not x.tba_status) and (
(datetime.datetime.combine(x.date_start, midnight) - now) < datetime.timedelta(days=5)) and (
(now - datetime.datetime.combine(x.date_start, midnight)) < datetime.timedelta(seconds=minimum_cooldown))]
protected_old_cons_frzset: FrozenSet[str] = frozenset(
[x._identifier() for x in protected_old_cons])
dropped_cons_frzset = old_cons_frzset.difference(
new_cons_frzset).difference(protected_old_cons_frzset)
created_cons_frzset = new_cons_frzset.difference(
old_cons_frzset).difference(protected_old_cons_frzset)
possibly_changed_frzset = old_cons_frzset.intersection(
new_cons_frzset).difference(protected_old_cons_frzset)
changed_frzset = frozenset(
[x for x in possibly_changed_frzset if old_cons_dict[x] != new_cons_dict[x]])
if any(map(len, [changed_frzset, dropped_cons_frzset, created_cons_frzset])):
new_evo_id += 1
bg_state_cfg['sync']['evolution'] = str(new_evo_id)
new_dump = old_evo['dump'].copy()
for dropped_con in [*dropped_cons_frzset, *changed_frzset]:
new_dump.pop(dropped_con)
for added_con in [*changed_frzset, *created_cons_frzset]:
new_dump[added_con] = dict(data=new_cons_dict[added_con].to_json(),
timestamp=now_iso)
update = dict(added=sorted(list(created_cons_frzset)), changed=sorted(list(changed_frzset)),
deleted=sorted(list(dropped_cons_frzset)), dump=new_dump, timestamp=now_iso,)
bg_data_evo_path.joinpath(mkjsonfn(new_evo_id)).write_text(
json.dumps(update, indent=1, sort_keys=True))
with bg_state_path.open('wt') as bg_state_fh:
bg_state_cfg.write(bg_state_fh)
latest_evo_values: List[Tuple[PlannedFurCon, datetime.datetime]] = [
(PlannedFurCon.from_json(x['data']),
datetime.datetime.fromisoformat(x['timestamp'])) for x in (
json.loads(Path('bg_data_evo').joinpath(
mkjsonfn(new_evo_id)).read_bytes())['dump'].values()
)
]
old_snt = json.loads(bg_data_snt_path.joinpath(
mkjsonfn(old_snt_id)).read_text())
old_snt_percentages: Dict[str, int] = old_snt['percentages']
del old_snt
valid_evos: List[FurConProgress] = sorted([x for x in [
FurConProgress(
end=end,
stt=start,
pct=100 if start >= end or now >= end else math.floor(
int(100*min(1, max(0, (now - start)/(end - start))))),
ctr=pfc2.country_iso3166_a3,
enm=pfc2._identifier()
)
for (pfc2, end, start) in [
(
pfc,
datetime.datetime.combine(pfc.date_start, midnight),
ts
)
for (pfc, ts) in latest_evo_values
if not pfc.tba_status
]
] if x.pct != old_snt_percentages.get(x.enm)])
latest_evo_values_len = len(latest_evo_values)
del latest_evo_values
if len(valid_evos) < max(0, min(5, latest_evo_values_len)):
return minimum_cooldown
selected_evos = valid_evos[:5]
if (sevoslen := len(selected_evos)):
new_snt_id = old_snt_id+1
bg_state_cfg['sync']['sent'] = str(new_snt_id)
new_snt_percentages = old_snt_percentages.copy()
im = PIL.Image.new('RGB', (1024, 512),
color=(236, 239, 241))
draw = PIL.ImageDraw.Draw(im)
tweet = ''
for i, valid_evo in enumerate(selected_evos):
ct = valid_evo.ctr
nm = valid_evo.enm
pc = valid_evo.pct
subtweet = f'FurCon [{ct.upper()}] {nm} has been {pc}% awaited\n'
if len((tweet+subtweet).encode('utf-8')) > 280:
selected_evos = selected_evos[:i]
sevoslen = i
break
else:
tweet += subtweet
for i in range(5):
if i < sevoslen:
valid_evo = selected_evos[i]
i += (5 - sevoslen) / 2
ct = valid_evo.ctr
nm = valid_evo.enm
pc = valid_evo.pct
st = f'{str(valid_evo.end.date())}'
if pc < 100:
dtt = ''
dt = valid_evo.end - now
dts = dt.total_seconds()
if (dd := math.floor(dts / (24*3600))) > 0:
dtt = f'{dd+1} days'
elif (hh := math.ceil(dts / 3600)) > 0:
dtt = f'{hh} hour' + ('s' if hh > 1 else '')
if dtt:
st = f'{dtt} until {str(valid_evo.end.date())}'
new_snt_percentages[nm] = pc
draw.text((8, round(102.4*i)+4), f'[{ct.upper()}] {nm}', font=IMAGE_FONT,
fill=(38, 50, 56), align='left')
ssx = IMAGE_FONT.getsize(st)[0]
draw.text((1024-8-ssx, round(102.4*i)+4), st, font=IMAGE_FONT,
fill=(38, 50, 56), align='left')
draw.rectangle(((6, round(102.4*i)+32), (1018, round(102.4*(i+1))-6)),
fill=(55, 71, 79))
draw.rectangle(((8, round(102.4*i)+34), (1016, round(102.4*(i+1))-8)),
fill=(176, 190, 197))
draw.rectangle(((8, round(102.4*i)+34), (1016, round(102.4*(i+1))-8)),
fill=(176, 190, 197))
draw.rectangle(((8, round(102.4*i)+34), (int(8+1008*(pc/100)), round(102.4*(i+1))-8)),
fill=(85, 139, 47))
sx, sy = IMAGE_FONT2.getsize(f'{pc}%', stroke_width=2)
draw.text((512-(sx//2), round(102.4*i)+62-(sy//2)), f'{pc}%', font=IMAGE_FONT2,
fill=(38, 50, 56), align='left', stroke_width=2,
stroke_fill=(236, 239, 241))
bio = BytesIO()
im.save(bio, format='png')
bim = bio.getvalue()
if DEBUG:
Path('tweet.png').write_bytes(bim)
Path('tweet.txt').write_text(tweet)
else:
with TempFile() as path:
path.write_bytes(bim)
media = api.media_upload(filename=str(path))
status = api.update_status(
status=tweet, media_ids=[media.media_id])
telebot.send_photo(
chat_id=keys['telegram_furconprogress']['chat_id'],
photo=bim,
caption=tweet,
)
update = dict(percentages=new_snt_percentages, timestamp=now_iso)
bg_data_snt_path.joinpath(mkjsonfn(new_snt_id)).write_text(
json.dumps(update, indent=1, sort_keys=True))
if DEBUG:
return 1.0
with bg_state_path.open('wt') as bg_state_fh:
bg_state_cfg.write(bg_state_fh)
return minimum_cooldown
def main():
if DEBUG:
main_non_sleeper()
else:
while True:
time.sleep(main_non_sleeper())
def safe_main():
try:
main()
except Exception as e:
trace = ''.join(traceback.format_exception(None, e, e.__traceback__))
smtp_server = None
smtp_working = False
try:
smtp_server = smtplib.SMTP('localhost', 25)
smtp_server.noop()
smtp_working = True
except Exception:
raise e
if smtp_working:
smtp_server.sendmail(
'http@localhost', 'root@localhost',
'Subject:[Fix me] Python backtrace\n' +
trace
)
time.sleep(3*3600)
if __name__ == '__main__':
safe_main()