furmeet_events_self/timetable_parser/table.py

384 lines
16 KiB
Python

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import datetime
import re
from typing import Dict, List, Optional, Tuple
# Lower-case month names (Spanish, Portuguese and English, plus a few
# unaccented or commonly misspelt variants) mapped to their month number.
# Keys are looked up as substrings of lower-cased line labels.
MONTHS: Dict[str, int] = {
    'enero': 1,
    'janeiro': 1,
    'january': 1,
    'febrero': 2,
    'fevereiro': 2,
    'february': 2,
    # Misspelling kept as an alias: it was the only English key for
    # month 2 before the correct spelling was added.
    'feburary': 2,
    'marzo': 3,
    'março': 3,
    'marco': 3,
    'march': 3,
    'abril': 4,
    'april': 4,
    'mayo': 5,
    'maio': 5,
    'may': 5,
    'junio': 6,
    'juño': 6,
    'juno': 6,
    'junho': 6,
    'june': 6,
    'julio': 7,
    'julho': 7,
    'july': 7,
    'agosto': 8,
    'august': 8,
    'septiembre': 9,
    'setembro': 9,
    'september': 9,
    'octubre': 10,
    'outubro': 10,
    'october': 10,
    'noviembre': 11,
    'novembro': 11,
    'november': 11,
    'diciembre': 12,
    'dezembro': 12,
    'december': 12,
}
# Clock time: "H:MM" or "HH:MM", optionally followed by ":SS" and/or an
# AM/PM marker (case-insensitive).
# Groups: 1 = hours, 2 = minutes, 3 = seconds or None, 4 = AM/PM or None.
RGX_HOUR_AND_MINUTES = re.compile(
    r'(\d{1,2}):(\d{1,2})(?::(\d{1,2}))?(?:\s*(AM|PM))?',
    flags=re.I,
)

# Stand-alone day of month (1-31, leading zeros allowed) with whitespace on
# both sides. Group 1 is the number as written, leading zeros included.
_DAY_OF_MONTH_PATTERN = (
    r'\s+?(0*(1(?:\d|)|2(?:\d|)|3(?:0|1|)|4|5|6|7|8|9))\s+?'
)
RGX_DAY_OF_MONTH = re.compile(_DAY_OF_MONTH_PATTERN)
class Table:
    """Rectangular grid of ``(text, tooltip, lang)`` cells.

    Row 0 holds the column labels and, in every following row, column 0
    holds that row's line label; the remaining cells are data. Methods that
    modify the table return ``self`` so calls can be chained.
    """

    def __init__(self, matrix: List[List[Tuple[str, str, str]]]):
        """Validate *matrix*, strip every cell string and store the matrix.

        Args:
            matrix: List of rows, each a list of 3-tuples of ``str``. The
                row lists are kept by reference and modified in place:
                every cell is replaced by a whitespace-stripped copy.

        Raises:
            ValueError: If a container or element has the wrong type, a
                cell does not have exactly three elements, there are fewer
                than two rows or two columns, or rows differ in length.
        """
        if not isinstance(matrix, list):
            raise ValueError('Table root container must be list')
        for line in matrix:
            if not isinstance(line, list):
                raise ValueError('Table line container must be list')
            for i, cell in enumerate(line):
                if not isinstance(cell, tuple):
                    raise ValueError('Table cell container must be tuple')
                if len(cell) != 3:
                    raise ValueError(
                        'Table cell container must have 3 elements')
                text, tooltip, lang = cell
                if not isinstance(text, str):
                    raise ValueError(
                        'Table cell container 1st element must be str')
                if not isinstance(tooltip, str):
                    raise ValueError(
                        'Table cell container 2nd element must be str')
                if not isinstance(lang, str):
                    raise ValueError(
                        'Table cell container 3rd element must be str')
                line[i] = (text.strip(), tooltip.strip(), lang.strip())
        if len(matrix) < 2:
            raise ValueError('Table should have at least 2 lines')
        width = len(matrix[0])
        if width < 2:
            raise ValueError('Table should have at least 2 columns')
        if any(len(line) != width for line in matrix):
            raise ValueError(
                'Table should have all lines with the same size')
        self._matrix = matrix

    def clone(self) -> 'Table':
        """Return a new table of the same type with copied row lists."""
        return type(self)([list(row) for row in self._matrix])

    @classmethod
    def merge_from(cls, tables: List['Table']) -> 'Table':
        """Merge *tables* into a new table, leaving the inputs untouched.

        The result starts as a clone of the first table; every following
        table is merged into it with :meth:`merge_with`.

        Raises:
            ValueError: If *tables* is not a non-empty list of ``Table``.
        """
        if not isinstance(tables, list):
            raise ValueError('tables argument must be list')
        if not tables:
            raise ValueError('tables argument must not be empty')
        if not all(isinstance(table, Table) for table in tables):
            raise ValueError('tables argument must be list of Table objects')
        merged = cls.clone(tables[0])
        for other in tables[1:]:
            # merge_with only reads from `other`, so no defensive copy of
            # the remaining tables is needed.
            merged.merge_with(other)
        return merged

    def merge_with(self, other: 'Table') -> 'Table':
        """Add *other*'s columns and data lines to this table.

        Column labels of *other* that are missing here are added as new
        (empty) columns; then every data line of *other* is appended, with
        each cell placed under the column carrying the same label.

        Raises:
            ValueError: If *other* is not a ``Table``.
        """
        if not isinstance(other, Table):
            raise ValueError('other argument must be a Table object')
        for label in other.column_labels():
            self.add_column(label)
        for labeled_data_line in other.labeled_data_lines():
            self.append_labeled_data_line(labeled_data_line)
        return self

    def column_labels(self) -> List[str]:
        """Return the text of every cell in the header row."""
        return [cell[0] for cell in self._matrix[0]]

    def column_label_at(self, index: int) -> str:
        """Return the text of the header cell at *index*."""
        return self._matrix[0][index][0]

    def add_column(self, name: str) -> 'Table':
        """Append an empty column labelled *name* unless one already exists."""
        if name not in self.column_labels():
            self._matrix[0].append((name, '', ''))
            for row in self._matrix[1:]:
                row.append(('', '', ''))
        return self

    def labeled_data_lines(self) -> List[List[Tuple[str, Tuple[str, str, str]]]]:
        """Return every non-header row as a list of ``(column label, cell)``."""
        labels = self.column_labels()  # built once, not once per cell
        return [list(zip(labels, row)) for row in self._matrix[1:]]

    def append_labeled_data_line(self, labeled_data_line: List[Tuple[str, Tuple[str, str, str]]]) -> 'Table':
        """Append a row built from ``(column label, cell)`` pairs.

        Columns that are not mentioned get an empty cell.

        Raises:
            ValueError: If a label is not one of this table's column labels
                (raised by ``list.index``).
        """
        column_labels = self.column_labels()
        new_data_line = [('', '', '') for _ in column_labels]
        for label, data in labeled_data_line:
            new_data_line[column_labels.index(label)] = data
        self._matrix.append(new_data_line)
        return self

    def line_labels(self) -> List[str]:
        """Return the text of the first cell of every non-header row."""
        return [row[0][0] for row in self._matrix[1:]]

    def replace_line_labels(self, line_labels: List[str]) -> 'Table':
        """Overwrite the line labels; tooltip and lang of those cells are reset.

        Raises:
            ValueError: If the number of labels differs from the number of
                non-header rows.
        """
        if len(line_labels) != len(self._matrix) - 1:
            raise ValueError('line labels must not differ in size')
        for row, label in zip(self._matrix[1:], line_labels):
            row[0] = (label, '', '')
        return self

    def __repr__(self):
        return f'{type(self).__name__}({self._matrix})'

    def interpret_lang(self, langs: List[str], lang: str, index: Optional[int]) -> 'Table':
        """Fill the ``lang`` element of every data cell.

        Header cells (row 0 and column 0) keep their text but have tooltip
        and lang cleared. For data cells, the cell text is reduced to its
        lower-cased alphabetic words and compared case-insensitively
        against *langs*:

        * ``index is None``: the first entry of *langs* found among the
          words is used; if none is found and the cell has no lang yet,
          *lang* is used. The text is left unchanged.
        * ``index == 0``: *lang* is always used.
        * ``index == 1``: if the first word is a known language it is used
          and removed from the start of the text; otherwise *lang* is used.
        * ``index == -1``: same as ``1`` but for the last word, which is
          removed from the end of the text.

        Args:
            langs: Known language codes; the stored value is the entry of
                this list as given (original casing).
            lang: Default language; must be in *langs* (case-insensitive).
            index: Where the language code is expected, see above.

        Raises:
            ValueError: If *lang* is not in *langs* or *index* is not one
                of ``None``, ``0``, ``1``, ``-1``.
        """
        langs_lower: List[str] = [entry.lower() for entry in langs]
        default_pos = langs_lower.index(lang.lower())
        # Reject a bad index before any cell has been modified.
        if index not in (None, 0, 1, -1):
            raise ValueError(
                'Index for interpreting language is out of bounds')
        for i, row in enumerate(self._matrix):
            for j, (text, tooltip, cell_lang) in enumerate(row):
                if i == 0 or j == 0:
                    row[j] = (text, '', '')
                    continue
                # Same length as `text` in the common case, so positions
                # found here can be used to slice `text`.
                alpha = ''.join(
                    ch if ch.isalpha() else ' ' for ch in text
                ).lower()
                words = alpha.split()
                if index is None:
                    for k, code in enumerate(langs_lower):
                        if code in words:
                            cell_lang = langs[k]
                            break
                    if cell_lang == '':
                        cell_lang = langs[default_pos]
                elif index == 0:
                    cell_lang = langs[default_pos]
                elif index == 1:
                    code = words[0] if words else ''
                    pos = default_pos
                    if code in langs_lower:
                        pos = langs_lower.index(code)
                        # Drop the leading code and any punctuation or
                        # spacing that separated it from the text.
                        text = text[alpha.index(code) + len(code):].strip()
                        while text and not text[0].isalnum():
                            text = text[1:]
                    cell_lang = langs[pos]
                else:  # index == -1
                    code = words[-1] if words else ''
                    pos = default_pos
                    if code in langs_lower:
                        pos = langs_lower.index(code)
                        # Keep what precedes the trailing code, then drop
                        # the punctuation or spacing left at the end.
                        text = text[:alpha.rindex(code)].strip()
                        while text and not text[-1].isalnum():
                            text = text[:-1]
                    cell_lang = langs[pos]
                row[j] = (text, tooltip, cell_lang)
        return self

    def _interpret_line_labels_as_datetimes(self, start_date: Tuple[int, int, int]) -> List[datetime.datetime]:
        """Turn every line label into a datetime, one per data row.

        Labels are processed in order, each relative to the previous
        result (initially midnight of *start_date*):

        * Without a clock time the timestamp is the previous one plus the
          previous step (initially one hour).
        * With a clock time (``H:MM[:SS][ AM|PM]``) it is that time on the
          previous timestamp's date, moved forward by whole days until it
          is not earlier than the previous timestamp.
        * A month name from ``MONTHS`` and/or a stand-alone day number in
          the label then override the month and/or day of that result.

        Args:
            start_date: ``(year, month, day)`` of the first day.

        Raises:
            ValueError: If a parsed time or the resulting date is invalid
                (for example hour 24, or day 31 in a 30-day month).
        """
        last_timestamp = datetime.datetime(*start_date, 0, 0)
        last_pace = datetime.timedelta(hours=1)
        parsed_timestamps: List[datetime.datetime] = []
        for line_label in self.line_labels():
            # Keep only alphanumerics and ':'; pad with spaces so the
            # day-of-month pattern can match at either end of the label.
            remaining = ''.join(
                [' ']
                + [ch if ch.isalnum() or ch == ':' else ' '
                   for ch in line_label.lower()]
                + [' ']
            ).lower()

            time_match = RGX_HOUR_AND_MINUTES.search(remaining)
            # Remove clock times so their digits are not read as a day.
            remaining = RGX_HOUR_AND_MINUTES.sub(' ', remaining)

            this_timestamp = last_timestamp + last_pace
            if time_match is not None:
                hs, ms, ss, meridiem = time_match.groups()
                h = int(hs)
                m = int(ms)
                s = int(ss or 0)
                if meridiem is not None:
                    meridiem = meridiem.lower()
                    if meridiem == 'am' and h == 12:
                        h = 0
                    elif meridiem == 'pm' and h != 12:
                        h += 12
                this_timestamp = datetime.datetime(
                    last_timestamp.year,
                    last_timestamp.month,
                    last_timestamp.day,
                    h,
                    m,
                    s,
                )
                while this_timestamp < last_timestamp:
                    this_timestamp += datetime.timedelta(days=1)

            override_month: Optional[int] = None
            for month_name, month_number in MONTHS.items():
                if month_name in remaining:
                    override_month = month_number
                    remaining = remaining.replace(month_name, '')

            day_match = RGX_DAY_OF_MONTH.search(remaining)
            override_day: Optional[int] = (
                int(day_match.group(1)) if day_match is not None else None
            )

            if override_month is not None or override_day is not None:
                # Apply both overrides at once: changing the month first
                # could build an invalid intermediate date (e.g. Feb 31)
                # even though the final date is valid.
                this_timestamp = this_timestamp.replace(
                    month=(this_timestamp.month if override_month is None
                           else override_month),
                    day=(this_timestamp.day if override_day is None
                         else override_day),
                )

            parsed_timestamps.append(this_timestamp)
            last_pace = this_timestamp - last_timestamp
            last_timestamp = this_timestamp
        return parsed_timestamps

    def interpret_line_labels_as_time_series(self, start_date: Tuple[int, int, int]) -> 'Table':
        """Replace every line label with ``"<start ISO>#<end ISO>"``.

        Start times come from parsing the current line labels. The default
        duration is the most frequent gap between consecutive starts (ties
        go to the larger gap), or one hour when there is a single data
        line. A row ends where the next one starts, except for the last
        row and for gaps longer than four default durations, which get the
        default duration instead.

        Args:
            start_date: ``(year, month, day)`` of the first day.
        """
        parsed_timestamps = self._interpret_line_labels_as_datetimes(
            start_date
        )
        gap_counts: Dict[datetime.timedelta, int] = {}
        for earlier, later in zip(parsed_timestamps, parsed_timestamps[1:]):
            gap = later - earlier
            gap_counts[gap] = gap_counts.get(gap, 0) + 1
        if gap_counts:
            default_duration: datetime.timedelta = max(
                gap_counts.items(), key=lambda item: (item[1], item[0])
            )[0]
        else:
            # Single data line: no gap to learn from; use the same one-hour
            # step the label parser starts with.
            default_duration = datetime.timedelta(hours=1)
        interval_duration = default_duration * 4

        time_ranges: List[Tuple[datetime.datetime, datetime.datetime]] = []
        last_idx = len(parsed_timestamps) - 1
        for idx, timestamp_start in enumerate(parsed_timestamps):
            if idx == last_idx:
                timestamp_end = timestamp_start + default_duration
            else:
                timestamp_end = parsed_timestamps[idx + 1]
            if (timestamp_end - timestamp_start) > interval_duration:
                # Long pause (e.g. overnight): do not stretch the row
                # across it.
                timestamp_end = timestamp_start + default_duration
            time_ranges.append((timestamp_start, timestamp_end))

        self.replace_line_labels(
            [f'{s.isoformat().strip()}#{e.isoformat().strip()}'
             for s, e in time_ranges])
        return self

    def get_column_sequences(self) -> List[Tuple[Tuple[datetime.datetime, datetime.datetime], str, Tuple[str, str, str]]]:
        """Collapse each data column into runs of identical consecutive cells.

        Line labels must have the ``"<start ISO>#<end ISO>"`` form produced
        by :meth:`interpret_line_labels_as_time_series`. Runs whose cell
        text is empty are skipped. A run that is followed by a different
        cell ends at that next row's start; the last run of a column ends
        at the end of its last row.

        Returns:
            ``((start, end), column label, cell)`` tuples sorted by start,
            end, column label and cell contents.

        Raises:
            ValueError: If a line label part is not a valid ISO datetime.
            IndexError: If a line label has no ``'#'`` separator.
        """
        # Parse the row time ranges once instead of once per column.
        row_ranges: List[Tuple[datetime.datetime, datetime.datetime]] = []
        for row in self._matrix[1:]:
            parts = row[0][0].split('#')
            row_ranges.append((
                datetime.datetime.fromisoformat(parts[0]),
                datetime.datetime.fromisoformat(parts[1]),
            ))

        labels = self.column_labels()
        sequences: List[Tuple[Tuple[datetime.datetime,
                                    datetime.datetime],
                              str,
                              Tuple[str, str, str]]] = []
        for j in range(1, len(labels)):
            run_cell: Optional[Tuple[str, str, str]] = None
            run_start = run_end = row_ranges[0][0]
            for (row_start, row_end), row in zip(row_ranges, self._matrix[1:]):
                cell = row[j]
                if cell == run_cell:
                    run_end = row_end
                    continue
                if run_cell is not None and run_cell[0]:
                    sequences.append(
                        ((run_start, row_start), labels[j], run_cell)
                    )
                # A new run covers at least its own row; without setting
                # the end here a run starting on the last row would have
                # zero length.
                run_cell, run_start, run_end = cell, row_start, row_end
            if run_cell is not None and run_cell[0]:
                sequences.append(((run_start, run_end), labels[j], run_cell))
        return sorted(
            sequences,
            key=lambda e: (e[0][0].timestamp(), e[0][1].timestamp(),
                           e[1], e[2][0], e[2][1], e[2][2]),
        )