582 lines
22 KiB
Python
Executable File
582 lines
22 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# -*- encoding: utf-8 -*-
|
|
|
|
import configparser
|
|
import datetime
|
|
import json
|
|
import math
|
|
import os
|
|
import re
|
|
import smtplib
|
|
import time
|
|
import traceback
|
|
import uuid
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
from typing import Dict, FrozenSet, Generator, List, Tuple
|
|
import telegram
|
|
|
|
import PIL.Image
|
|
import PIL.ImageDraw
|
|
import PIL.ImageFont
|
|
import pytz
|
|
import requests
|
|
import tweepy
|
|
from bs4 import BeautifulSoup
|
|
from bs4.element import Tag
|
|
|
|
# Fonts used to render the progress-bar image (label size 20, big "NN%" size 32).
# Loaded at import time; the files must exist relative to the working directory.
IMAGE_FONT = PIL.ImageFont.truetype(
    r'fonts/Montserrat/Montserrat-Medium.ttf', 20)
IMAGE_FONT2 = PIL.ImageFont.truetype(
    r'fonts/Montserrat/Montserrat-SemiBold.ttf', 32)

# Any non-empty DEV environment variable switches on debug behaviour
# (write tweet.png/tweet.txt locally instead of posting, run only once).
DEBUG = bool(os.environ.get('DEV'))

# 'Month D', 'Month D-D', 'Month D-Month D', each with an optional year,
# e.g. 'Jul 7-9', 'Dec 28-Jan 2 2024'.
RGX_DATE_CERTAIN = re.compile(
    r'''([A-Za-z]+) (\d{1,2})(?:-(?:([A-Za-z]+) )?(\d{1,2})(?: (\d{4}))?)?''')
# Vague month or month range with optional year, e.g. 'Mar', 'Mar-Apr 2025'.
RGX_DATE_TBA_MONTH_RANGE = re.compile(
    r'''([A-Za-z]+)(?:-([A-Za-z]+))?(?: (\d{4}))?''')
# A bare 4-digit year.
RGX_DATE_TBA_YEAR = re.compile(r'''(\d{4})''')

# Assumed length of a season in months (used to turn 'Spring' etc. into a month).
SEASON_DURATION = 3

# Month/season name -> month number.  Seasons resolve to the month in which the
# season *ends* (start month + SEASON_DURATION, wrapped into 1..12), e.g.
# Spring (starts in March) -> 6, Winter (starts in December) -> 3.
MONTHS_COMMON_NA = {
    'Jan': 1,
    'Feb': 2,
    'Mar': 3,
    'Apr': 4,
    'May': 5,
    'Jun': 6,
    'Jul': 7,
    'Aug': 8,
    'Sep': 9,
    'Oct': 10,
    'Nov': 11,
    'Dec': 12,
    'Spring': (((3+SEASON_DURATION) - 1) % 12) + 1,
    'Summer': (((6+SEASON_DURATION) - 1) % 12) + 1,
    'Autumn': (((9+SEASON_DURATION) - 1) % 12) + 1,
    'Winter': (((12+SEASON_DURATION) - 1) % 12) + 1,
}

# Last day of each month (index 0 = January).  The alternating [31, 30] pattern
# yields [31,30,31,30,31,30,31,31,30,31,30,31]; February is then corrected from
# 30 to 28.  Leap years are deliberately ignored.
MONTHS_LAST_DAY = [*([31, 30]*3), 31, *([31, 30]*2), 31]
MONTHS_LAST_DAY[1] -= 2
|
|
|
|
|
|
class TempFile(object):
    """Context manager yielding a throwaway ``Path`` under ``/tmp``.

    The file is created (touched) on construction with a random hex name and
    removed on exit.  Extra ``*args``/``**kwargs`` are forwarded to ``Path``.

    Bug fixed: ``__exit__`` previously returned ``self``, which is truthy and
    therefore told the interpreter to *suppress* any exception raised inside
    the ``with`` block.  It now returns ``None`` so exceptions propagate.
    """

    def __init__(self, *args, **kwargs):
        # uuid4().hex gives a collision-safe name for concurrent runs.
        self.path = Path(f'/tmp/{uuid.uuid4().hex}', *args, **kwargs)
        self.path.touch()

    def __enter__(self):
        return self.path

    def __exit__(self, exc_type, exc_value, traceback):
        # Best-effort cleanup; returning None (falsy) lets exceptions propagate.
        if self.path.is_file():
            self.path.unlink()
        return None
|
|
|
|
|
|
class PlannedFurCon:
    """An upcoming convention scraped from WikiFur.

    Instances are value objects: equality, ordering and hashing all delegate
    to ``_tuple_sort_repr`` (start date first, so lists sort chronologically).
    ``name`` doubles as the unique identifier used for diffing.
    """

    def __init__(self, country_iso3166_a3: str, date_start: datetime.date, date_end: datetime.date, tba_status: bool, name: str):
        self.country_iso3166_a3: str = country_iso3166_a3
        self.date_start: datetime.date = date_start
        self.date_end: datetime.date = date_end
        self.tba_status: bool = tba_status
        self.name: str = name

    def to_json(self) -> dict:
        """Serialise to a plain-JSON-compatible dict (dates as [y, m, d])."""
        return {
            'country_iso3166_a3': self.country_iso3166_a3,
            'date_start': [self.date_start.year,
                           self.date_start.month,
                           self.date_start.day],
            'date_end': [self.date_end.year,
                         self.date_end.month,
                         self.date_end.day],
            'tba_status': self.tba_status,
            'name': self.name,
        }

    @classmethod
    def from_json(cls, d) -> 'PlannedFurCon':
        """Inverse of ``to_json``; the argument dict is not mutated."""
        fields = dict(d)
        fields['date_start'] = datetime.date(*fields['date_start'])
        fields['date_end'] = datetime.date(*fields['date_end'])
        return cls(**fields)

    def __repr__(self) -> str:
        parts = (self.country_iso3166_a3, self.date_start,
                 self.date_end, self.tba_status, self.name,)
        return f'{type(self).__name__}{parts}'

    def _tuple_sort_repr(self) -> tuple:
        # Chronological first, then stable tie-breakers.
        return (self.date_start, self.date_end, self.tba_status,
                self.country_iso3166_a3, self.name)

    def _cmp(self, o, op) -> bool:
        # Objects lacking _tuple_sort_repr compare False for EVERY operator
        # (including __ne__) -- deliberate quirk preserved here.
        if not hasattr(o, '_tuple_sort_repr'):
            return False
        return getattr(self._tuple_sort_repr(), op)(o._tuple_sort_repr())

    def __eq__(self, o) -> bool:
        return self._cmp(o, '__eq__')

    def __ne__(self, o) -> bool:
        return self._cmp(o, '__ne__')

    def __lt__(self, o) -> bool:
        return self._cmp(o, '__lt__')

    def __le__(self, o) -> bool:
        return self._cmp(o, '__le__')

    def __gt__(self, o) -> bool:
        return self._cmp(o, '__gt__')

    def __ge__(self, o) -> bool:
        return self._cmp(o, '__ge__')

    def __hash__(self):
        return hash(self._tuple_sort_repr())

    def _identifier(self):
        # The convention name is the diffing key.
        return self.name
|
|
|
|
|
|
def derive_date_en(date_str: str, year_hint: int, tba: bool) -> Tuple[datetime.date, datetime.date, bool]:
    """Parse an English WikiFur date string into (date_start, date_end, tba).

    date_str:  e.g. 'Jul 7-9', 'Dec 28-Jan 2 2024', 'Mar-Apr', '2025'
               (formats inferred from the module regexes -- verify against the
               live page if extending)
    year_hint: year assumed when the string carries none
    tba:       caller's current to-be-announced flag; forced True by the two
               vaguer formats below

    Raises NotImplementedError when no regex matches.
    """
    date_start = None
    date_end = None
    # Format 1: concrete 'Month D[-[Month ]D[ YYYY]]'.
    if (match := RGX_DATE_CERTAIN.match(date_str)) is not None:
        groups = match.groups()
        ms = MONTHS_COMMON_NA[groups[0]]
        ds = int(groups[1])
        # An explicit trailing year wins over the hint; start/end years begin equal.
        ys = ye = int(groups[4]) if groups[4] else year_hint
        # Missing end month/day default to the start values (single day / same month).
        me = MONTHS_COMMON_NA[groups[2]] if groups[2] else ms
        de = int(groups[3]) if groups[3] else ds
        # Roll the end date forward until it is not before the start date.
        while ye < ys:
            ye += 1  # NOTE(review): unreachable -- ye is initialised equal to ys
        while (ye, me) < (ys, ms):
            # e.g. 'Dec 28-Jan 2': the end month wrapped into the next year.
            ye += 1
        while (ye, me, de) < (ys, ms, ds):
            # e.g. 'Jan 30-1': the end day wrapped into the next month.
            me += 1
            if de > MONTHS_LAST_DAY[me-1]:
                de = 1
                me += 1
            if me > 12:
                ye += 1
                me = 1
        date_start = datetime.date(ys, ms, ds)
        date_end = datetime.date(ye, me, de)
    # Format 2: vague month or month range -> treated as TBA.
    elif (match := RGX_DATE_TBA_MONTH_RANGE.match(date_str)) is not None:
        tba = True
        groups = match.groups()
        me = ms = MONTHS_COMMON_NA[groups[0]]
        if groups[1] is not None:
            me = MONTHS_COMMON_NA[groups[1]]
        yrs = yre = int(groups[2]) if groups[2] else year_hint
        if ms > me:
            # A range like 'Dec-Feb' wraps into the following year.
            yre += 1
        # Both bounds are pinned to the last day of their month (pessimistic TBA).
        date_start = datetime.date(yrs, ms, MONTHS_LAST_DAY[ms-1])
        date_end = datetime.date(yre, me, MONTHS_LAST_DAY[me-1])
    # Format 3: bare year -> TBA, pinned to Dec 31 of that year.
    elif (match := RGX_DATE_TBA_YEAR.match(date_str)) is not None:
        tba = True
        date_start = date_end = datetime.date(int(match.group(1)), 12, 31)
    else:
        raise NotImplementedError(f'There is no regex for parsing: {date_str}')
    return date_start, date_end, tba
|
|
|
|
|
|
def _get_future_cons() -> Generator[PlannedFurCon, None, None]:
    """Scrape WikiFur's 'Upcoming events' template and yield PlannedFurCons.

    Network side effect: fetches the page and saves a prettified copy to
    ``_last.html`` for post-mortem debugging.  Raises for HTTP errors and
    raises NotImplementedError on any page structure it cannot interpret
    (fail-loud, so format drift is noticed).
    """
    # No-op yield; presumably kept so the function is a generator even while
    # the body below is being edited -- TODO confirm it is still wanted.
    yield from []
    http_response = requests.get(
        'https://en.wikifur.com/wiki/Template:Upcoming_events')
    http_response.raise_for_status()
    bs_page: BeautifulSoup = BeautifulSoup(http_response.text, 'html5lib')
    # Keep the last fetched page on disk for debugging parse failures.
    Path('_last.html').write_text(bs_page.prettify())
    bs_table: Tag = bs_page.find('table')
    # Defaults used until the page provides an explicit year header.
    year_hint = datetime.datetime.now().year
    tba_hint = False
    for bs_td in bs_table.findAll('td'):
        # bs_td: Tag = bs_td
        for bs_item in bs_td.findChildren(recursive=False):
            # bs_item: Tag = bs_item
            if bs_item.name == 'dl':
                # A <dl> acts as a section header carrying the year (and
                # possibly a TBA marker) for the entries that follow.
                # First non-empty string, stripped of ellipses/dots; falls back
                # to the current hints when the header is empty.
                year_hint_candidate = next(
                    iter(list(filter(
                        len,
                        map(
                            lambda x: x.strip().strip('…').strip('.').strip(),
                            bs_item.strings
                        )
                    ))),
                    str(year_hint) + (' TBA' if tba_hint else '')
                )
                # Peel decorations off the header text: trailing dot, 'cont',
                # 'TBA' / 'to be announced' suffixes.
                if year_hint_candidate.endswith('.'):
                    year_hint_candidate = year_hint_candidate[:-1]
                    year_hint_candidate = year_hint_candidate.strip()
                if year_hint_candidate.lower().endswith('cont'):
                    year_hint_candidate = year_hint_candidate[:-4]
                    year_hint_candidate = year_hint_candidate.strip()
                if year_hint_candidate.endswith(' TBA'):
                    tba_hint = True
                    year_hint_candidate = year_hint_candidate[:-4]
                if year_hint_candidate.strip().lower().endswith(' to be announced'):
                    tba_hint = True
                    year_hint_candidate = year_hint_candidate.strip()[:-16]
                # NOTE(review): tba_hint is never reset to False by a later
                # plain-year header -- confirm that matches the page layout.
                if len(year_hint_candidate) == 4:
                    year_hint = int(year_hint_candidate)
                else:
                    raise NotImplementedError(
                        f'Doesn\'t know how to convert {year_hint_candidate} to a year')
            elif bs_item.name == 'div':
                # A <div> is one convention entry: [date, name(, residue)].
                strings = list(bs_item.stripped_strings)
                if strings[-1].startswith(', '):
                    del strings[-1]
                if len(strings) != 2:
                    raise NotImplementedError(
                        f'Cannot split {strings} into (date, convention)')
                # The div's CSS classes carry the country code; 'f' is skipped
                # (presumably a flag-styling class -- verify), 'unk' if absent.
                country = next(
                    filter(lambda a: a != 'f', bs_item['class']), 'unk')
                date, convention = strings
                date = date.strip(':')
                tba = tba_hint
                # A trailing '?' on the date marks the entry as tentative.
                if date.endswith('?'):
                    tba = True
                    date = date.strip('?')
                date_start, date_end, tba = derive_date_en(
                    date, year_hint, tba)
                yield PlannedFurCon(country, date_start, date_end, tba, convention)
            elif bs_item.name == 'p' and ''.join(bs_item.stripped_strings).strip() == '':
                pass  # visual whitespace for wiki
            else:
                raise NotImplementedError(
                    f'Cannot handle tag name {bs_item.name}')
|
|
|
|
|
|
def get_future_cons() -> List[PlannedFurCon]:
    """Scrape upcoming conventions and return them sorted chronologically."""
    cons = list(_get_future_cons())
    cons.sort()
    return cons
|
|
|
|
|
|
def lpad(data, pad: int, char=' ') -> str:
    """Left-pad ``str(data)`` with ``missing`` repetitions of ``char``.

    Note: ``char`` may be multi-character; it is repeated once per missing
    character, so the result can exceed ``pad`` in that case.
    """
    text = str(data)
    missing = pad - len(text)
    if missing <= 0:
        return text
    return str(char) * missing + text
|
|
|
|
|
|
def mkjsonfn(seq: int) -> str:
    """Return the zero-padded 13-digit snapshot filename for sequence ``seq``."""
    return str(seq).rjust(13, '0') + '.json'
|
|
|
|
|
|
def in_order_unique(identifiables):
    """Drop later duplicates (same ``_identifier()``), keeping first-seen order."""
    kept = {}
    for item in identifiables:
        # setdefault keeps the FIRST item seen for each identifier.
        kept.setdefault(item._identifier(), item)
    return list(kept.values())
|
|
|
|
|
|
def dictify_identifiables(identifiables):
    """Map each item's ``_identifier()`` to the item (later duplicates win)."""
    return {item._identifier(): item for item in identifiables}
|
|
|
|
|
|
class FurConProgress:
    """One 'percentage awaited' datapoint for a single convention.

    Fields (terse by design):
      end -- countdown target (convention start as a datetime)
      stt -- timestamp the convention was first recorded
      pct -- integer progress 0..100 between stt and end
      ctr -- country code string
      enm -- event name; doubles as the unique identifier

    Equality, ordering and hashing delegate to ``_tuple_sort_repr`` so lists
    sort by soonest end first.
    """

    def __init__(self,
                 end: datetime.datetime,
                 stt: datetime.datetime,
                 pct: int,
                 ctr: str,
                 enm: str
                 ):
        self.end = end
        self.stt = stt
        self.pct = pct
        self.ctr = ctr
        self.enm = enm

    def __str__(self):
        return f'{type(self).__name__}{(self.end, self.stt, self.pct, self.ctr, self.enm)}'

    def _tuple_sort_repr(self):
        # Sort key: soonest end first, then recording time, progress, country, name.
        return (self.end, self.stt, self.pct, self.ctr, self.enm)

    def _cmp(self, o, op) -> bool:
        # Objects without _tuple_sort_repr compare False for every operator
        # (including __ne__) -- same quirk as PlannedFurCon.
        if hasattr(o, '_tuple_sort_repr'):
            return getattr(self._tuple_sort_repr(), op)(o._tuple_sort_repr())
        return False

    def __eq__(self, o) -> bool:
        return self._cmp(o, '__eq__')

    def __ne__(self, o) -> bool:
        return self._cmp(o, '__ne__')

    def __lt__(self, o) -> bool:
        return self._cmp(o, '__lt__')

    def __le__(self, o) -> bool:
        return self._cmp(o, '__le__')

    def __gt__(self, o) -> bool:
        return self._cmp(o, '__gt__')

    def __ge__(self, o) -> bool:
        return self._cmp(o, '__ge__')

    def __hash__(self):
        return hash(self._tuple_sort_repr())

    def _identifier(self):
        # BUG FIX: previously returned self.name, an attribute this class does
        # not have (apparently copied from PlannedFurCon), which raised
        # AttributeError.  The event name lives in ``enm``.
        return self.enm
|
|
|
|
|
|
def main_non_sleeper() -> float:
    """Run one scrape/diff/publish cycle; return seconds to sleep before the next.

    Side effects (paths relative to the working directory):
      * reads keys.ini for Twitter and Telegram credentials
      * maintains bg_state.ini (run timings + snapshot sequence counters)
      * appends numbered JSON snapshots under bg_data_evo/ (scraped state
        evolution) and bg_data_snt/ (percentages already published)
      * unless DEBUG, uploads a progress image + text to Twitter and Telegram;
        under DEBUG writes tweet.png / tweet.txt locally instead
    """
    midnight = datetime.time(0, 0, 0, tzinfo=pytz.UTC)
    keys = configparser.ConfigParser()
    keys.read(Path('keys.ini'))
    auth = tweepy.OAuthHandler(
        keys['twitter']['api_key_public'],
        keys['twitter']['api_key_secret'])
    auth.set_access_token(
        keys['twitter_furconprogress']['oauth_token'],
        keys['twitter_furconprogress']['oauth_secret'])
    api = tweepy.API(auth)
    telebot = telegram.Bot(keys['telegram_furconprogress']['token'])
    bg_state_path = Path('bg_state.ini')
    bg_data_snt_path = Path('bg_data_snt')
    bg_data_evo_path = Path('bg_data_evo')
    bg_data_snt_path.mkdir(exist_ok=True, parents=True)
    bg_data_evo_path.mkdir(exist_ok=True, parents=True)
    now = datetime.datetime.now(tz=pytz.UTC)
    now_iso = now.isoformat()
    # Seed snapshot 0 of each series with an empty, epoch-dated document so
    # the diffing below always has a predecessor to read.
    if not (seed := bg_data_evo_path.joinpath(mkjsonfn(0))).exists():
        seed.write_text(json.dumps(
            dict(added=[], changed=[], deleted=[], dump={},
                 timestamp=datetime.datetime(
                     1970, 1, 1, tzinfo=pytz.UTC).isoformat(),),
            indent=1, sort_keys=True))
    if not (seed := bg_data_snt_path.joinpath(mkjsonfn(0))).exists():
        seed.write_text(json.dumps(
            dict(percentages={},
                 timestamp=datetime.datetime(
                     1970, 1, 1, tzinfo=pytz.UTC).isoformat(),),
            indent=1, sort_keys=True))
    # Load (or create) persistent state: run timestamps and sequence counters.
    bg_state_cfg = configparser.ConfigParser()
    if bg_state_path.is_file():
        bg_state_cfg.read(bg_state_path)
    if not bg_state_cfg.has_section('timings'):
        bg_state_cfg.add_section('timings')
    last_ran = datetime.datetime.fromisoformat(
        bg_state_cfg['timings'].get('last_ran') or now_iso)
    bg_state_cfg['timings']['last_last_ran'] = last_ran.isoformat()
    bg_state_cfg['timings']['last_ran'] = now_iso
    # Default cooldown between cycles: 3 hours (operator-overridable in the ini).
    if bg_state_cfg['timings'].get('minimum_cooldown') is None:
        bg_state_cfg['timings']['minimum_cooldown'] = f'{3*3600}'
    minimum_cooldown = int(bg_state_cfg['timings']['minimum_cooldown'])
    if not bg_state_cfg.has_section('sync'):
        bg_state_cfg.add_section('sync')
    # 'evolution' / 'sent' are the highest snapshot numbers written so far.
    old_evo_id = int(bg_state_cfg['sync'].get('evolution') or '0')
    old_snt_id = int(bg_state_cfg['sync'].get('sent') or '0')
    bg_state_cfg['sync']['evolution'] = str(old_evo_id)
    bg_state_cfg['sync']['sent'] = str(old_snt_id)
    new_evo_id = old_evo_id
    new_snt_id = old_snt_id
    # Persist timings immediately so a later crash still records this run.
    with bg_state_path.open('wt') as bg_state_fh:
        bg_state_cfg.write(bg_state_fh)
    if True:  # NOTE(review): no-op guard, presumably kept to preserve indentation
        # Diff the freshly scraped convention list against the last snapshot.
        old_evo = json.loads(Path('bg_data_evo').joinpath(
            mkjsonfn(old_evo_id)).read_bytes())
        old_cons: List[PlannedFurCon] = in_order_unique(sorted([
            PlannedFurCon.from_json(x['data']) for x in old_evo['dump'].values()]))
        old_cons_dict: Dict[str,
                            PlannedFurCon] = dictify_identifiables(old_cons)
        new_cons: List[PlannedFurCon] = in_order_unique(get_future_cons())
        new_cons_dict: Dict[str,
                            PlannedFurCon] = dictify_identifiables(new_cons)
        old_cons_frzset: FrozenSet[str] = frozenset(
            [x._identifier() for x in old_cons])
        new_cons_frzset: FrozenSet[str] = frozenset(
            [x._identifier() for x in new_cons])
        # 'Protected' cons: non-TBA events starting within 5 days and not past
        # one cooldown after their start -- excluded from add/change/delete so
        # imminent events aren't churned by page edits.
        protected_old_cons: List[PlannedFurCon] = [x for x in old_cons if (not x.tba_status) and (
            (datetime.datetime.combine(x.date_start, midnight) - now) < datetime.timedelta(days=5)) and (
            (now - datetime.datetime.combine(x.date_start, midnight)) < datetime.timedelta(seconds=minimum_cooldown))]
        protected_old_cons_frzset: FrozenSet[str] = frozenset(
            [x._identifier() for x in protected_old_cons])
        dropped_cons_frzset = old_cons_frzset.difference(
            new_cons_frzset).difference(protected_old_cons_frzset)
        created_cons_frzset = new_cons_frzset.difference(
            old_cons_frzset).difference(protected_old_cons_frzset)
        possibly_changed_frzset = old_cons_frzset.intersection(
            new_cons_frzset).difference(protected_old_cons_frzset)
        changed_frzset = frozenset(
            [x for x in possibly_changed_frzset if old_cons_dict[x] != new_cons_dict[x]])
        # Any difference produces a new evolution snapshot on disk.
        if any(map(len, [changed_frzset, dropped_cons_frzset, created_cons_frzset])):
            new_evo_id += 1
            bg_state_cfg['sync']['evolution'] = str(new_evo_id)
            new_dump = old_evo['dump'].copy()
            for dropped_con in [*dropped_cons_frzset, *changed_frzset]:
                new_dump.pop(dropped_con)
            for added_con in [*changed_frzset, *created_cons_frzset]:
                new_dump[added_con] = dict(data=new_cons_dict[added_con].to_json(),
                                           timestamp=now_iso)
            update = dict(added=sorted(list(created_cons_frzset)), changed=sorted(list(changed_frzset)),
                          deleted=sorted(list(dropped_cons_frzset)), dump=new_dump, timestamp=now_iso,)
            bg_data_evo_path.joinpath(mkjsonfn(new_evo_id)).write_text(
                json.dumps(update, indent=1, sort_keys=True))
            with bg_state_path.open('wt') as bg_state_fh:
                bg_state_cfg.write(bg_state_fh)
    # Re-read the (possibly just-written) latest snapshot as (con, recorded-at).
    latest_evo_values: List[Tuple[PlannedFurCon, datetime.datetime]] = [
        (PlannedFurCon.from_json(x['data']),
         datetime.datetime.fromisoformat(x['timestamp'])) for x in (
            json.loads(Path('bg_data_evo').joinpath(
                mkjsonfn(new_evo_id)).read_bytes())['dump'].values()
        )
    ]
    old_snt_percentages: Dict[str, int]
    old_snt = json.loads(bg_data_snt_path.joinpath(
        mkjsonfn(old_snt_id)).read_text())
    old_snt_percentages = old_snt['percentages']
    del old_snt
    # Build progress datapoints for every non-TBA con whose percentage moved
    # since the last publication; pct is elapsed (now - recorded) over
    # (start - recorded), clamped to 0..100.
    valid_evos: List[FurConProgress] = sorted([x for x in [
        FurConProgress(
            end=end,
            stt=start,
            pct=100 if start >= end or now >= end else math.floor(
                int(100*min(1, max(0, (now - start)/(end - start))))),
            ctr=pfc2.country_iso3166_a3,
            enm=pfc2._identifier()
        )
        for (pfc2, end, start) in [
            (
                pfc,
                datetime.datetime.combine(pfc.date_start, midnight),
                ts
            )
            for (pfc, ts) in latest_evo_values
            if not pfc.tba_status
        ]
    ] if x.pct != old_snt_percentages.get(x.enm)])
    latest_evo_values_len = len(latest_evo_values)
    del latest_evo_values
    # Don't publish until at least 5 bars changed (or all of them, if fewer
    # than 5 conventions are tracked).
    if len(valid_evos) < max(0, min(5, latest_evo_values_len)):
        return minimum_cooldown
    selected_evos = valid_evos[:5]
    if (sevoslen := len(selected_evos)):
        new_snt_id = old_snt_id+1
        bg_state_cfg['sync']['sent'] = str(new_snt_id)
        new_snt_percentages = old_snt_percentages.copy()
        # 1024x512 canvas, light blue-grey background; one ~102px row per bar.
        im = PIL.Image.new('RGB', (1024, 512),
                           color=(236, 239, 241))
        draw = PIL.ImageDraw.Draw(im)
        tweet = ''
        # Compose the tweet text, trimming bars that would push it past
        # Twitter's 280-byte limit.
        for i, valid_evo in enumerate(selected_evos):
            ct = valid_evo.ctr
            nm = valid_evo.enm
            pc = valid_evo.pct
            subtweet = f'FurCon [{ct.upper()}] {nm} has been {pc}% awaited\n'
            if len((tweet+subtweet).encode('utf-8')) > 280:
                selected_evos = selected_evos[:i]
                sevoslen = i
                break
            else:
                tweet += subtweet
        for i in range(5):
            if i < sevoslen:
                valid_evo = selected_evos[i]
                # Shift the row index down by half the missing rows so fewer
                # than 5 bars render vertically centred (safe: reassigning the
                # loop variable does not affect range iteration).
                i += (5 - sevoslen) / 2
                ct = valid_evo.ctr
                nm = valid_evo.enm
                pc = valid_evo.pct
                st = f'{str(valid_evo.end.date())}'
                # Right-hand label: countdown text for unfinished bars.
                if pc < 100:
                    dtt = ''
                    dt = valid_evo.end - now
                    dts = dt.total_seconds()
                    if (dd := math.floor(dts / (24*3600))) > 0:
                        dtt = f'{dd+1} days'
                    elif (hh := math.ceil(dts / 3600)) > 0:
                        dtt = f'{hh} hour' + ('s' if hh > 1 else '')
                    if dtt:
                        st = f'{dtt} until {str(valid_evo.end.date())}'
                new_snt_percentages[nm] = pc
                # Left label: country + name; right label: date/countdown.
                draw.text((8, round(102.4*i)+4), f'[{ct.upper()}] {nm}', font=IMAGE_FONT,
                          fill=(38, 50, 56), align='left')
                ssx = IMAGE_FONT.getsize(st)[0]
                draw.text((1024-8-ssx, round(102.4*i)+4), st, font=IMAGE_FONT,
                          fill=(38, 50, 56), align='left')
                # Bar: dark border, grey trough, green fill scaled by pct.
                draw.rectangle(((6, round(102.4*i)+32), (1018, round(102.4*(i+1))-6)),
                               fill=(55, 71, 79))
                draw.rectangle(((8, round(102.4*i)+34), (1016, round(102.4*(i+1))-8)),
                               fill=(176, 190, 197))
                # NOTE(review): the trough rectangle is drawn twice (harmless).
                draw.rectangle(((8, round(102.4*i)+34), (1016, round(102.4*(i+1))-8)),
                               fill=(176, 190, 197))
                draw.rectangle(((8, round(102.4*i)+34), (int(8+1008*(pc/100)), round(102.4*(i+1))-8)),
                               fill=(85, 139, 47))
                # Centred outlined percentage text.
                sx, sy = IMAGE_FONT2.getsize(f'{pc}%', stroke_width=2)
                draw.text((512-(sx//2), round(102.4*i)+62-(sy//2)), f'{pc}%', font=IMAGE_FONT2,
                          fill=(38, 50, 56), align='left', stroke_width=2,
                          stroke_fill=(236, 239, 241))
        bio = BytesIO()
        im.save(bio, format='png')
        bim = bio.getvalue()
        if DEBUG:
            # Debug: dump the artefacts locally instead of posting.
            Path('tweet.png').write_bytes(bim)
            Path('tweet.txt').write_text(tweet)
        else:
            # Twitter needs the image as a file on disk for media_upload.
            with TempFile() as path:
                path.write_bytes(bim)
                media = api.media_upload(filename=str(path))
            status = api.update_status(
                status=tweet, media_ids=[media.media_id])
            telebot.send_photo(
                chat_id=keys['telegram_furconprogress']['chat_id'],
                photo=bim,
                caption=tweet,
            )
        # Record what was published so unchanged percentages are skipped next time.
        update = dict(percentages=new_snt_percentages, timestamp=now_iso)
        bg_data_snt_path.joinpath(mkjsonfn(new_snt_id)).write_text(
            json.dumps(update, indent=1, sort_keys=True))
        if DEBUG:
            return 1.0
    with bg_state_path.open('wt') as bg_state_fh:
        bg_state_cfg.write(bg_state_fh)
    return minimum_cooldown
|
|
|
|
|
|
def main():
    """Run a single cycle under DEBUG; otherwise loop forever with cooldowns."""
    if DEBUG:
        main_non_sleeper()
        return
    while True:
        # Each cycle reports how long to wait before the next one.
        time.sleep(main_non_sleeper())
|
|
|
|
|
|
def safe_main():
    """Run main() and, on any crash, email the traceback to root via local SMTP.

    If no SMTP server answers on localhost:25 the original exception is
    re-raised unchanged.  After a successful mail the process sleeps three
    hours so a supervisor restart loop does not hammer the failing scraper.
    """
    try:
        main()
    except Exception as e:
        trace = ''.join(traceback.format_exception(None, e, e.__traceback__))
        smtp_server = None
        smtp_working = False
        try:
            smtp_server = smtplib.SMTP('localhost', 25)
            smtp_server.noop()
            smtp_working = True
        except Exception:
            # No mailer available: surface the original failure instead.
            raise e
        if smtp_working:
            # BUG FIX: RFC 5322 requires an empty line between the header
            # section and the body; previously the traceback was appended
            # directly after the Subject line and parsed as malformed headers.
            smtp_server.sendmail(
                'http@localhost', 'root@localhost',
                'Subject:[Fix me] Python backtrace\n\n' +
                trace
            )
        time.sleep(3*3600)
|
|
|
|
|
|
if __name__ == '__main__':
    # Entry point: loop forever (or run once under DEBUG), mailing tracebacks on crash.
    safe_main()
|