"""Recursive accessibility / HTML-validity crawler backed by Django models."""

import json
import logging
import shutil
import subprocess
import time
import unicodedata

import requests

from concurrent.futures import ProcessPoolExecutor as PoolExecutor
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from uuid import uuid4

from . import models

# Generic log-style severities (as used by the vnu report) mapped to logging levels.
SEVERITY_TABLE = {
    'critical': logging.CRITICAL,
    'fatal': logging.FATAL,
    'error': logging.ERROR,
    'warning': logging.WARNING,
    'warn': logging.WARN,
    'info': logging.INFO,
    'debug': logging.DEBUG,
    'notset': logging.NOTSET,
}

# Accessibility-checker result levels mapped to logging levels.
ACHECKER_TABLE = {
    'violation': logging.FATAL,
    'potentialviolation': logging.ERROR,
    'recommendation': logging.WARNING,
    'potentialrecommendation': logging.INFO,
    'manual': logging.DEBUG,
    'pass': logging.NOTSET,
}

# Fallback for severities missing from both tables; sorts above everything else.
OVER_THE_TOP_SEVERITY = max(max(SEVERITY_TABLE.values()),
                            max(ACHECKER_TABLE.values())) + 10


def remove_accents(input_str: str) -> str:
    """NFKD-normalize, straighten curly quotes, and drop combining marks."""
    nfkd_form = (
        unicodedata.normalize('NFKD', input_str)
        .replace('“', '"').replace('”', '"')
        .replace("‘", "'").replace("’", "'")
    )
    return "".join([c for c in nfkd_form if not unicodedata.combining(c)])


def remove_stuff_inside_quotes(s):
    """Drop everything between quote characters so that messages differing only
    in quoted identifiers collapse to the same issue pattern."""
    l: List[str] = list()
    m = True  # True while we are outside a quoted run
    for c in remove_accents(s).lower():
        if c in ('"', "'"):
            m = not m
            l.append(c)
        elif m:
            l.append(c)
    return ''.join(l)


class FakeFilePath:
    """Minimal stand-in for a pathlib.Path backed by an in-memory value."""

    def __init__(self, contents):
        self._contents = contents

    def read_text(self):
        return self._contents

    def read_bytes(self):
        return self._contents


def run_check_links(url: str) -> Tuple[int, List[str], List[dict], List[dict]]:
    """Ask the local checker service to analyse ``url``.

    Returns (element_count, links, achecker results, vnu report)."""
    rq = requests.get('http://localhost:64161/', params=dict(url=url))
    j = rq.json()
    if rq.status_code != 200:
        raise ValueError('\r\n'.join(
            list(map(lambda x: x['stack'], j['errors']))
        ))
    return (j['element_count'], j['links'], j['results'], j['vnu_report'])


class AccessibilityCheckerServiceBackgroundChecks:
    """Picklable callable executed in a worker process for a single page."""

    def __call__(self, url):
        print(f'Reaching {url=}')
        element_count, links, achecker_results, ran_vnu = run_check_links(url)
        return element_count, links, achecker_results, ran_vnu


class AccessibilityCheckerService:

    def __init__(self):
        self._pec = 0  # number of background checks still pending
        # Per-name caches for the get_or_create lookups below.
        self._c1 = dict()  # ReportLineIssuePattern
        self._c2 = dict()  # ReportLineGenerator
        self._c3 = dict()  # ReportLineSubjectOfConcern

    def assess_many(self, urls: List[str],
                    recursion_depth: int) -> models.RequestedReport:
        """Crawl every URL (and same-host links up to ``recursion_depth``) and
        return the RequestedReport the results are attached to."""
        requested_report = models.RequestedReport(depth=recursion_depth)
        requested_report.save()
        visited: Dict[str, models.PageReport] = dict()
        print('ENTERING POOL EXECUTOR')
        with PoolExecutor(8) as pe:
            print('ENTERED POOL EXECUTOR')
            for url in urls:
                self.assess(
                    url,
                    recursion_depth,
                    requested_report,
                    visited,
                    None,
                    pe
                )
            # Wait until every submitted check (including the ones scheduled
            # recursively from callbacks) has completed.
            while self._pec > 0:
                time.sleep(0.1)
            print('EXITING POOL EXECUTOR')
        print('EXITED POOL EXECUTOR')
        return requested_report

    def assess(
        self,
        url: str,
        recursion_depth: int,
        requested_report: models.RequestedReport,
        visited_urls_reports: Dict[str, models.PageReport],
        parent_page_report: Optional[models.PageReport],
        pe: PoolExecutor,
    ):
        if recursion_depth < 0:
            return
        # A URL is recorded in visited_urls_reports once its check finishes,
        # so it is skipped here on later visits.
        if url in visited_urls_reports:
            return

        class DoAfterBackground:
            """Done-callback: persists the results of one background check and
            schedules the same-host links it discovered."""

            def __init__(self,
                         acs: AccessibilityCheckerService,
                         url: str,
                         recursion_depth: int,
                         requested_report: models.RequestedReport,
                         visited_urls_reports: Dict[str, models.PageReport],
                         parent_page_report: Optional[models.PageReport],
                         pe: PoolExecutor,
                         retry: int):
                self.acs = acs
                self.url = url
                self.recursion_depth = recursion_depth
                self.requested_report = requested_report
                self.visited_urls_reports = visited_urls_reports
                self.parent_page_report = parent_page_report
                self.pe = pe
                self.retry = retry

            def __call__(self, fut):
                self.retry -= 1
                pe = self.pe
                url = self.url
                recursion_depth = self.recursion_depth
                requested_report = self.requested_report
                visited_urls_reports = self.visited_urls_reports
                parent_page_report = self.parent_page_report
                element_count, links, achecker_results, ran_vnu = [None] * 4
                try:
                    element_count, links, achecker_results, ran_vnu = fut.result()
                except (subprocess.CalledProcessError,
                        requests.RequestException,
                        ValueError) as e:
                    # run_check_links() raises ValueError or requests errors
                    # rather than CalledProcessError; catch them all so the
                    # check is retried (or dropped) and _pec is always
                    # decremented, otherwise assess_many() would wait forever.
                    retry = self.retry
                    if retry > 0:
                        print(e)
                        print(f'Retrying {url=} ({retry=})')
                        self.submit()
                    else:
                        print(f'Ignoring {url=} ({retry=})')
                        self.acs._pec -= 1
                    return
                print(f'Reached {url=}')
                web_url = models.WebURL.get_or_create(url)
                page_report = models.PageReport(
                    url=web_url,
                    request=requested_report,
                    parent=parent_page_report,
                    found_urls='\r\n'.join(links),
                    element_count=element_count,
                    remaining_depth=recursion_depth,
                )
                page_report.save()
                page_report_lines = list()

                # Accessibility-checker (WCAG) findings.
                rlg_iac = self.acs.get_report_line_generator('achecker')
                rlsoc_a = self.acs.get_report_line_subject_of_concern('wcag')
                for wcag_consideration in achecker_results:
                    severity = wcag_consideration['value'][0].lower()
                    severity_int = ACHECKER_TABLE.get(severity,
                                                      OVER_THE_TOP_SEVERITY)
                    line = models.PageReportLine(
                        report=page_report,
                        generator=rlg_iac,
                        pattern=self.acs.get_report_line_issue_pattern(
                            wcag_consideration['ruleId']),
                        message=wcag_consideration['message'],
                        snippet=wcag_consideration['snippet'],
                        location_aria=wcag_consideration['path']['aria'],
                        location_xpath=wcag_consideration['path']['dom'],
                        location_css=None,
                        line_start=None,
                        line_end=None,
                        column_start=None,
                        column_end=None,
                        highlight_start=None,
                        highlight_end=None,
                        passed=wcag_consideration['level'] == 'pass',
                        severity=severity,
                        severity_int=severity_int,
                        subject_of_concern=rlsoc_a,
                    )
                    page_report_lines.append(line)

                # vnu (HTML syntax) findings.
                rlg_vnu = self.acs.get_report_line_generator('vnu')
                rlsoc_s = self.acs.get_report_line_subject_of_concern('syntax')
                for syntax_consideration in ran_vnu:
                    severity = syntax_consideration.get(
                        'subType', syntax_consideration['type']).lower()
                    severity_int = SEVERITY_TABLE.get(severity,
                                                      OVER_THE_TOP_SEVERITY)
                    line = models.PageReportLine(
                        report=page_report,
                        generator=rlg_vnu,
                        pattern=self.acs.get_report_line_issue_pattern(
                            remove_stuff_inside_quotes(
                                syntax_consideration['message']
                            )
                        ),
                        message=syntax_consideration.get('message', ''),
                        snippet=syntax_consideration.get('extract', ''),
                        location_aria=None,
                        location_xpath=None,
                        location_css=None,
                        line_start=(
                            syntax_consideration.get('line')
                            if 'lastLine' not in syntax_consideration
                            else syntax_consideration['lastLine'] + 1
                            - len(syntax_consideration.get(
                                'extract', '').splitlines())
                        ),
                        line_end=syntax_consideration.get(
                            'lastLine', syntax_consideration.get('line')
                        ),
                        column_start=syntax_consideration.get('firstColumn'),
                        column_end=syntax_consideration.get('lastColumn'),
                        highlight_start=syntax_consideration.get('hiliteStart'),
                        highlight_end=(
                            None
                            if ('hiliteStart' not in syntax_consideration
                                or 'hiliteLength' not in syntax_consideration)
                            else syntax_consideration['hiliteStart']
                            + syntax_consideration['hiliteLength']
                        ),
                        passed=severity != 'error',
                        severity=severity,
                        severity_int=severity_int,
                        subject_of_concern=rlsoc_s,
                    )
                    page_report_lines.append(line)

                models.PageReportLine.objects.bulk_create(
                    page_report_lines, batch_size=1 << 15
                )
                visited_urls_reports[url] = page_report

                # Recurse into links that share this page's host.
                url_part = url.split('/')[2]
                for link in links:
                    link_part = link.split('/')[2]
                    if url_part == link_part:
                        self.acs.assess(
                            link,
                            recursion_depth - 1,
                            requested_report,
                            visited_urls_reports,
                            page_report,
                            pe,
                        )
                self.acs._pec -= 1

            def submit(self):
                self.acs._pec += 1
                self.pe.submit(
                    AccessibilityCheckerServiceBackgroundChecks(), self.url
                ).add_done_callback(self)

        DoAfterBackground(
            self,
            url,
            recursion_depth,
            requested_report,
            visited_urls_reports,
            parent_page_report,
            pe,
            5,
        ).submit()

    def get_report_line_issue_pattern(self, name: str) -> models.ReportLineIssuePattern:
        if name not in self._c1:
            self._c1[name] = models.ReportLineIssuePattern.get_or_create(name)
        return self._c1[name]

    def get_report_line_generator(self, name: str) -> models.ReportLineGenerator:
        if name not in self._c2:
            self._c2[name] = models.ReportLineGenerator.get_or_create(name)
        return self._c2[name]

    def get_report_line_subject_of_concern(self, name: str) -> models.ReportLineSubjectOfConcern:
        if name not in self._c3:
            self._c3[name] = models.ReportLineSubjectOfConcern.get_or_create(
                name)
        return self._c3[name]
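

# --- Usage sketch (illustrative only; not invoked by this module) ----------
# Assumes the surrounding Django project is configured (so ``models`` can
# reach the database) and that the checker HTTP service queried by
# ``run_check_links`` is listening on http://localhost:64161/:
#
#     service = AccessibilityCheckerService()
#     report = service.assess_many(['https://example.org/'], recursion_depth=1)
#     print(report.pk)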