#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
"""Text table whose cells are ``(text, tooltip, lang)`` triples.

The first line of a table holds the column labels and the first cell of
every following line holds that line's label.  Helpers are provided to
merge tables, tag cells with a language and interpret the line labels as
a time series.
"""

import collections
import datetime
import re
from typing import Dict, List, Optional, Tuple

# A table cell: (text, tooltip, lang).
_Cell = Tuple[str, str, str]
# A cell paired with the label of the column it belongs to.
_LabeledCell = Tuple[str, _Cell]
# ((start, end), column label, cell) as produced by get_column_sequences.
_Sequence = Tuple[Tuple[datetime.datetime, datetime.datetime], str, _Cell]

# Lower-case month names (Spanish, Portuguese, English) -> month number.
# Names are searched as substrings in insertion order, so a longer name
# must precede any shorter name it contains ('mayo' before 'may').
MONTHS: Dict[str, int] = {
    'enero': 1,
    'janeiro': 1,
    'january': 1,
    'febrero': 2,
    'fevereiro': 2,
    'feburary': 2,  # historical misspelling, kept for compatibility
    'february': 2,
    'marzo': 3,
    'março': 3,
    'marco': 3,
    'march': 3,
    'abril': 4,
    'april': 4,
    'mayo': 5,
    'maio': 5,
    'may': 5,
    'junio': 6,
    'juño': 6,
    'juno': 6,
    'junho': 6,
    'june': 6,
    'julio': 7,
    'julho': 7,
    'july': 7,
    'agosto': 8,
    'august': 8,
    'septiembre': 9,
    'setembro': 9,
    'september': 9,
    'octubre': 10,
    'outubro': 10,
    'october': 10,
    'noviembre': 11,
    'novembro': 11,
    'november': 11,
    'diciembre': 12,
    'dezembro': 12,
    'december': 12,
}

# "H:MM", optionally followed by ":SS" and an AM/PM marker.
RGX_HOUR_AND_MINUTES = re.compile(
    r'(\d{1,2}):(\d{1,2})(?::(\d{1,2}))?(?:\s*(AM|PM))?',
    re.IGNORECASE
)
# A whitespace-delimited day of month (1-31, leading zeros allowed).
RGX_DAY_OF_MONTH = re.compile(
    r'\s+?(0*(1(?:\d|)|2(?:\d|)|3(?:0|1|)|4|5|6|7|8|9))\s+?'
)


class Table:
    """Rectangular matrix of ``(text, tooltip, lang)`` cells.

    ``matrix[0]`` is the header line (column labels) and ``matrix[i][0]``
    is the label of data line ``i``.
    """

    def __init__(self, matrix: List[List[Tuple[str, str, str]]]):
        """Validate *matrix* and adopt it as the table storage.

        The list objects passed in are kept (not copied) and every cell
        is replaced in place by its whitespace-stripped version.

        Raises:
            ValueError: if the containers are not list/list/3-tuple of
                str, if there are fewer than 2 lines or 2 columns, or if
                the lines differ in length.
        """
        if not isinstance(matrix, list):
            raise ValueError('Table root container must be list')
        for line in matrix:
            if not isinstance(line, list):
                raise ValueError('Table line container must be list')
            for position, cell in enumerate(line):
                line[position] = self._normalized_cell(cell)
        if len(matrix) < 2:
            raise ValueError('Table should have at least 2 lines')
        width = len(matrix[0])
        if width < 2:
            raise ValueError('Table should have at least 2 columns')
        if any(len(line) != width for line in matrix):
            raise ValueError(
                'Table should have all lines with the same size')
        self._matrix = matrix

    @staticmethod
    def _normalized_cell(cell: Tuple[str, str, str]) -> Tuple[str, str, str]:
        """Return *cell* with every element stripped, validating its shape."""
        if not isinstance(cell, tuple):
            raise ValueError('Table cell container must be tuple')
        if len(cell) != 3:
            raise ValueError('Table cell container must have 3 elements')
        text, tooltip, lang = cell
        if not isinstance(text, str):
            raise ValueError(
                'Table cell container 1st element must be str')
        if not isinstance(tooltip, str):
            raise ValueError(
                'Table cell container 2nd element must be str')
        if not isinstance(lang, str):
            raise ValueError(
                'Table cell container 3rd element must be str')
        return (text.strip(), tooltip.strip(), lang.strip())

    def clone(self) -> 'Table':
        """Return a new table of the same type with copied line lists."""
        return type(self)([list(line) for line in self._matrix])

    @classmethod
    def merge_from(cls, tables: List['Table']) -> 'Table':
        """Merge *tables* into a fresh table; the inputs are not modified.

        Raises:
            ValueError: if *tables* is not a non-empty list of Table.
        """
        if not isinstance(tables, list):
            raise ValueError('tables argument must be list')
        if not tables:
            raise ValueError('tables argument must not be empty')
        if not all(isinstance(table, Table) for table in tables):
            raise ValueError('tables argument must be list of Table objects')
        merged, *others = [cls.clone(table) for table in tables]
        for other in others:
            merged.merge_with(other)
        return merged

    def merge_with(self, other: 'Table') -> 'Table':
        """Add *other*'s missing columns and append its data lines.

        Returns:
            This table, to allow chaining.

        Raises:
            ValueError: if *other* is not a Table.
        """
        if not isinstance(other, Table):
            raise ValueError('other argument must be a Table object')
        for label in other.column_labels():
            self.add_column(label)
        for labeled_data_line in other.labeled_data_lines():
            self.append_labeled_data_line(labeled_data_line)
        return self

    def column_labels(self) -> List[str]:
        """Return the text of every header cell, in column order."""
        return [cell[0] for cell in self._matrix[0]]

    def column_label_at(self, index: int) -> str:
        """Return the header text of column *index*."""
        return self.column_labels()[index]

    def add_column(self, name: str) -> 'Table':
        """Append an empty column labelled *name* unless one exists."""
        if name not in self.column_labels():
            self._matrix[0].append((name, '', ''))
            for line in self._matrix[1:]:
                line.append(('', '', ''))
        return self

    def labeled_data_lines(self) -> List[List[Tuple[str,
                                                    Tuple[str, str, str]]]]:
        """Return every data line as ``(column label, cell)`` pairs."""
        labels = self.column_labels()
        return [list(zip(labels, line)) for line in self._matrix[1:]]

    def append_labeled_data_line(self,
                                 labeled_data_line: List[Tuple[str,
                                                               Tuple[str,
                                                                     str,
                                                                     str]]]
                                 ) -> 'Table':
        """Append a data line built from ``(column label, cell)`` pairs.

        Columns not mentioned are filled with empty cells.  When several
        columns share a label, the first one receives the data.

        Raises:
            ValueError: if a label does not name an existing column.
        """
        labels = self.column_labels()
        first_index: Dict[str, int] = {}
        for position, label in enumerate(labels):
            first_index.setdefault(label, position)
        new_line: List[_Cell] = [('', '', '') for _ in labels]
        for label, data in labeled_data_line:
            if label not in first_index:
                raise ValueError(f'{label!r} is not a column label')
            new_line[first_index[label]] = data
        self._matrix.append(new_line)
        return self

    def line_labels(self) -> List[str]:
        """Return the text of the first cell of every data line."""
        return [line[0][0] for line in self._matrix[1:]]

    def replace_line_labels(self, line_labels: List[str]) -> 'Table':
        """Replace every data line's label cell with ``(label, '', '')``.

        Raises:
            ValueError: if the number of labels differs from the number
                of data lines.
        """
        if len(line_labels) != len(self.line_labels()):
            raise ValueError('line labels must not differ in size')
        for offset, label in enumerate(line_labels, start=1):
            self._matrix[offset][0] = (label, '', '')
        return self

    def __repr__(self):
        return f'{type(self).__name__}({self._matrix})'

    @staticmethod
    def _lstrip_non_alnum(text: str) -> str:
        """Drop leading characters of *text* that are not alphanumeric."""
        for position, char in enumerate(text):
            if char.isalnum():
                return text[position:]
        return ''

    @staticmethod
    def _rstrip_non_alnum(text: str) -> str:
        """Drop trailing characters of *text* that are not alphanumeric."""
        for end in range(len(text), 0, -1):
            if text[end - 1].isalnum():
                return text[:end]
        return ''

    def interpret_lang(self,
                       langs: List[str],
                       lang: str,
                       index: Optional[int]) -> 'Table':
        """Tag every data cell with a language taken from *langs*.

        Header cells and line-label cells get their tooltip and language
        cleared.  For the remaining cells the behaviour depends on
        *index*:

        * ``None``: the first entry of *langs* appearing as a word in the
          cell text is used; a cell without a match keeps its current
          language, or gets *lang* when it has none.
        * ``0``: every cell gets *lang*.
        * ``1``: if the first word of the cell is a known language it is
          removed from the text and used, otherwise *lang* is used.
        * ``-1``: same as ``1`` but looking at the last word.

        Matching is case-insensitive; the spelling stored in the cell is
        the one found in *langs*.

        Raises:
            ValueError: if *lang* is not in *langs* or *index* is not one
                of ``None``, ``0``, ``1``, ``-1``.  The table is left
                untouched in both cases.
        """
        langs_lower: List[str] = [entry.lower() for entry in langs]
        default_position = langs_lower.index(lang.lower())
        # Reject a bad index before any cell has been rewritten.
        if index not in (None, 0, 1, -1):
            raise ValueError(
                'Index for interpreting language is out of bounds')
        for i, line in enumerate(self._matrix):
            for j, (text, tooltip, cell_lang) in enumerate(line):
                if i == 0 or j == 0:
                    line[j] = (text, '', '')
                    continue
                # Same text with every non-letter blanked out, so that
                # words can be located by position in the original text
                # (assumes lower-casing does not change the length).
                alpha_only = ''.join(
                    char if char.isalpha() else ' ' for char in text
                ).lower()
                words = alpha_only.split()
                if index is None:
                    found = next(
                        (langs[k] for k, candidate in enumerate(langs_lower)
                         if candidate in words),
                        cell_lang
                    )
                    line[j] = (text, tooltip, found or langs[default_position])
                elif index == 0:
                    line[j] = (text, tooltip, langs[default_position])
                elif index == 1:
                    marker = words[0] if words else ''
                    position = default_position
                    if marker in langs_lower:
                        position = langs_lower.index(marker)
                        after = alpha_only.index(marker) + len(marker)
                        text = self._lstrip_non_alnum(text[after:].strip())
                    line[j] = (text, tooltip, langs[position])
                else:  # index == -1
                    marker = words[-1] if words else ''
                    position = default_position
                    if marker in langs_lower:
                        position = langs_lower.index(marker)
                        # Keep what precedes the trailing language tag.
                        before = alpha_only.rindex(marker)
                        text = self._rstrip_non_alnum(text[:before].strip())
                    line[j] = (text, tooltip, langs[position])
        return self

    def _interpret_line_labels_as_datetimes(self,
                                            start_date: Tuple[int, int, int]
                                            ) -> List[datetime.datetime]:
        """Turn every line label into a datetime, in line order.

        Parsing starts at midnight of *start_date* (year, month, day).
        Each label may carry a time of day, a month name from ``MONTHS``
        and a day of month; missing parts are carried over from the
        previous line.  A label without a time of day is placed one
        "pace" (the previous step, initially one hour) after the previous
        line.  A time of day earlier than the previous line rolls over to
        the next day.  Years in labels are ignored.

        Raises:
            ValueError: if a label yields an invalid time or date.
        """
        previous = datetime.datetime(*start_date, 0, 0)
        pace = datetime.timedelta(hours=1)
        parsed: List[datetime.datetime] = []
        for raw_label in self.line_labels():
            # Keep only letters, digits and ':'; pad with spaces so the
            # whitespace-delimited day regex can match at either end.
            label = ' ' + ''.join(
                char if char.isalnum() or char == ':' else ' '
                for char in raw_label.lower()
            ) + ' '
            time_match = RGX_HOUR_AND_MINUTES.search(label)
            label = RGX_HOUR_AND_MINUTES.sub(' ', label)
            current = previous + pace
            if time_match is not None:
                hour_text, minute_text, second_text, meridiem = (
                    time_match.groups())
                hour = int(hour_text)
                if meridiem is not None:
                    meridiem = meridiem.lower()
                    if meridiem == 'am' and hour == 12:
                        hour = 0
                    elif meridiem == 'pm' and hour != 12:
                        hour += 12
                current = datetime.datetime(
                    previous.year, previous.month, previous.day,
                    hour, int(minute_text), int(second_text or 0)
                )
                if current < previous:
                    # The clock wrapped past midnight.
                    current += datetime.timedelta(days=1)
            overrides: Dict[str, int] = {}
            for month_name, month_number in MONTHS.items():
                if month_name in label:
                    overrides['month'] = month_number
                    label = label.replace(month_name, '')
            day_match = RGX_DAY_OF_MONTH.search(label)
            if day_match is not None:
                overrides['day'] = int(day_match.group(1))
            if overrides:
                # Month and day are applied together: applying the month
                # first would reject e.g. Jan 31 -> "1 febrero" because
                # the intermediate Feb 31 does not exist.
                current = current.replace(**overrides)
            parsed.append(current)
            pace = current - previous
            previous = current
        return parsed

    def interpret_line_labels_as_time_series(self,
                                             start_date: Tuple[int, int, int]
                                             ) -> 'Table':
        """Rewrite the line labels as ``<start ISO>#<end ISO>`` ranges.

        Each line ends where the next one starts.  The last line, and any
        line followed by a gap larger than four times the usual step,
        lasts for the usual step instead.  The usual step is the most
        frequent difference between consecutive lines (the largest one on
        a tie), or one hour when there is a single data line.

        Returns:
            This table, to allow chaining.
        """
        parsed = self._interpret_line_labels_as_datetimes(start_date)
        step_counts = collections.Counter(
            later - earlier for earlier, later in zip(parsed, parsed[1:])
        )
        if step_counts:
            default_duration: datetime.timedelta = max(
                step_counts.items(), key=lambda item: (item[1], item[0])
            )[0]
        else:
            # A single data line gives no step to measure.
            default_duration = datetime.timedelta(hours=1)
        longest_interval = default_duration * 4
        following = parsed[1:] + [None]
        new_labels: List[str] = []
        for begin, finish in zip(parsed, following):
            if finish is None or (finish - begin) > longest_interval:
                finish = begin + default_duration
            new_labels.append(
                f'{begin.isoformat().strip()}#{finish.isoformat().strip()}')
        self.replace_line_labels(new_labels)
        return self

    def get_column_sequences(self) -> List[Tuple[Tuple[datetime.datetime,
                                                       datetime.datetime],
                                                 str,
                                                 Tuple[str, str, str]]]:
        """Collapse each data column into runs of identical cells.

        Line labels must be ``<start ISO>#<end ISO>`` ranges, as written
        by ``interpret_line_labels_as_time_series``.  Consecutive lines
        holding the same cell form one run; runs with empty text are
        dropped.  A run ends where the next run starts, and the last run
        of a column ends at the end of its last line.

        Returns:
            ``((start, end), column label, cell)`` tuples sorted by
            start, end, column label and cell.
        """
        line_ranges: List[Tuple[datetime.datetime, datetime.datetime]] = []
        for line in self._matrix[1:]:
            parts = line[0][0].split('#')
            line_ranges.append((
                datetime.datetime.fromisoformat(parts[0]),
                datetime.datetime.fromisoformat(parts[1]),
            ))
        labels = self.column_labels()
        sequences: List[_Sequence] = []
        for j in range(1, len(labels)):
            run_start, run_end = line_ranges[0]
            run_cell = self._matrix[1][j]
            for (line_start, line_end), line in zip(line_ranges[1:],
                                                    self._matrix[2:]):
                cell = line[j]
                if cell == run_cell:
                    run_end = line_end
                    continue
                if run_cell[0]:
                    sequences.append(
                        ((run_start, line_start), labels[j], run_cell))
                run_start, run_end, run_cell = line_start, line_end, cell
            if run_cell[0]:
                sequences.append(((run_start, run_end), labels[j], run_cell))
        # Tuples already order by (start, end), label, cell.  Comparing
        # the naive datetimes directly avoids the local-timezone
        # dependence of datetime.timestamp().
        return sorted(sequences)