316 lines
12 KiB
Python
316 lines
12 KiB
Python
import json
|
||
import logging
|
||
import shutil
|
||
import subprocess
|
||
import time
|
||
import unicodedata
|
||
import requests
|
||
from concurrent.futures import ProcessPoolExecutor as PoolExecutor
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional, Tuple
|
||
from uuid import uuid4
|
||
|
||
from . import models
|
||
|
||
# Map textual severity labels to the numeric levels of the stdlib logging
# module so report lines can be ordered/filtered numerically.  Used for the
# vnu report's 'subType'/'type' labels (see the vnu loop in assess()).
SEVERITY_TABLE = {
    'critical': logging.CRITICAL,
    'fatal': logging.FATAL,
    'error': logging.ERROR,
    'warning': logging.WARNING,
    'warn': logging.WARN,
    'info': logging.INFO,
    'debug': logging.DEBUG,
    'notset': logging.NOTSET,
}

# Map achecker verdict labels onto the same logging scale, so both report
# sources share one severity axis ('violation' is treated as most severe).
ACHECKER_TABLE = {
    'violation': logging.FATAL,
    'potentialviolation': logging.ERROR,
    'recommendation': logging.WARNING,
    'potentialrecommendation': logging.INFO,
    'manual': logging.DEBUG,
    'pass': logging.NOTSET
}

# Sentinel severity strictly greater than every known level in either table;
# used as the .get() default so unrecognized labels sort as the most severe.
OVER_THE_TOP_SEVERITY = max(max(SEVERITY_TABLE.values()),
                            max(ACHECKER_TABLE.values())) + 10
||
def remove_accents(input_str: str) -> str:
    """Strip diacritics from *input_str* and normalize curly quotes to ASCII.

    The string is NFKD-decomposed so accented characters split into a base
    character plus combining marks; typographic quotes are mapped to their
    plain ASCII forms, and then every combining mark is dropped.
    """
    decomposed = unicodedata.normalize('NFKD', input_str)
    for curly, plain in (('“', '"'), ('”', '"'), ("‘", "'"), ("’", "'")):
        decomposed = decomposed.replace(curly, plain)
    return ''.join(ch for ch in decomposed if not unicodedata.combining(ch))
|
||
|
||
def remove_stuff_inside_quotes(s):
    """Return a lower-cased, accent-free copy of *s* with any text between
    quote characters removed (the quote characters themselves are kept).

    Single and double quotes share one open/closed toggle, so mixed or
    nested quoting is not tracked separately.
    """
    # Accent/curly-quote normalization, inlined (same transform as
    # remove_accents): NFKD-decompose, straighten quotes, drop combining marks.
    normalized = unicodedata.normalize('NFKD', s)
    normalized = normalized.replace('“', '"').replace('”', '"')
    normalized = normalized.replace("‘", "'").replace("’", "'")
    normalized = ''.join(
        ch for ch in normalized if not unicodedata.combining(ch))

    pieces = []
    outside = True  # are we currently outside any quoted span?
    for ch in normalized.lower():
        if ch == '"' or ch == "'":
            outside = not outside
            pieces.append(ch)
        elif outside:
            pieces.append(ch)
    return ''.join(pieces)
|
||
|
||
class FakeFilePath:
    """Minimal stand-in for a pathlib.Path-like object that serves a
    pre-baked payload.

    Only the two read accessors are provided; both hand back the stored
    payload unchanged, so the caller decides whether it is text or bytes.
    """

    def __init__(self, contents):
        # Payload returned verbatim by both readers.
        self._contents = contents

    def read_text(self):
        """Return the stored payload (mirrors Path.read_text)."""
        return self._contents

    def read_bytes(self):
        """Return the stored payload (mirrors Path.read_bytes)."""
        return self._contents
|
||
|
||
def run_check_links(url: str) -> Tuple[int, List[str], List[dict], List[dict]]:
    """Ask the local checker service to analyze *url* and return its findings.

    Returns a tuple ``(element_count, links, achecker_results, vnu_report)``
    extracted from the service's JSON response.

    Raises ValueError with the service-reported stack traces (joined by CRLF)
    when the service answers with a non-200 status.  NOTE: the body is parsed
    as JSON before the status check, so a non-JSON error body raises a JSON
    decode error instead.
    """
    response = requests.get('http://localhost:64161/', params=dict(url=url))
    payload = response.json()
    if response.status_code != 200:
        stacks = [err['stack'] for err in payload['errors']]
        raise ValueError('\r\n'.join(stacks))
    return (payload['element_count'], payload['links'],
            payload['results'], payload['vnu_report'])
||
|
||
|
||
class AccessibilityCheckerServiceBackgroundChecks:
    """Stateless callable shipped to worker processes by the pool.

    Wraps run_check_links so a ProcessPoolExecutor can run the network
    check in a child process; carrying no state keeps it trivially
    picklable.
    """

    def __call__(self, url):
        print(f'Reaching {url=}')
        counted, found_links, wcag_rows, vnu_rows = run_check_links(url)
        return counted, found_links, wcag_rows, vnu_rows
||
|
||
|
||
class AccessibilityCheckerService:
    """Crawls URLs through a process pool and persists accessibility reports.

    ``_pec`` ("pending execution count") tracks in-flight background checks
    so assess_many() knows when the recursive crawl has drained; ``_c1``,
    ``_c2`` and ``_c3`` memoize get_or_create lookups for report-line
    patterns, generators and subjects of concern respectively.
    """

    def __init__(self):
        self._pec = 0      # number of submitted checks not yet accounted done
        self._c1 = dict()  # name -> models.ReportLineIssuePattern (cache)
        self._c2 = dict()  # name -> models.ReportLineGenerator (cache)
        self._c3 = dict()  # name -> models.ReportLineSubjectOfConcern (cache)

    def assess_many(self, urls: List[str], recursion_depth: int) -> models.RequestedReport:
        """Create a RequestedReport and assess every URL in *urls*, following
        same-host links down to *recursion_depth* levels.

        Blocks (polling every 0.1s) until every background check — including
        ones queued recursively from done-callbacks — has finished.
        """
        requested_report = models.RequestedReport(depth=recursion_depth)
        requested_report.save()
        # Shared across the whole crawl so the same URL is reported once.
        visited: Dict[str, models.PageReport] = dict()
        print('ENTERING POOL EXECUTOR')
        with PoolExecutor(8) as pe:
            print('ENTERED POOL EXECUTOR')
            for url in urls:
                self.assess(
                    url,
                    recursion_depth,
                    requested_report,
                    visited,
                    None,
                    pe
                )
            # Busy-wait until the pending-execution counter drains; callbacks
            # run in this (parent) process, so _pec is mutated here only.
            while self._pec > 0:
                time.sleep(0.1)
            print('EXITING POOL EXECUTOR')
        print('EXITED POOL EXECUTOR')
        return requested_report

    def assess(
        self,
        url: str,
        recursion_depth: int,
        requested_report: models.RequestedReport,
        visited_urls_reports: Dict[str, models.PageReport],
        parent_page_report: Optional[models.PageReport],
        pe: PoolExecutor,
    ):
        """Queue one background check of *url*; the done-callback persists the
        results and recurses into same-host links.

        Skips URLs already visited or when the depth budget is exhausted.
        """
        if recursion_depth < 0:
            return
        if url in visited_urls_reports:
            return

        class DoAfterBackground:
            """Done-callback for one background check.

            Runs in the parent process (add_done_callback), writes the page
            report plus its achecker/vnu report lines, then recurses via
            acs.assess() for every link on the same host.
            """

            def __init__(self,
                         acs: AccessibilityCheckerService,
                         url: str,
                         recursion_depth: int,
                         requested_report: models.RequestedReport,
                         visited_urls_reports: Dict[str, models.PageReport],
                         parent_page_report: Optional[models.PageReport],
                         pe: PoolExecutor,
                         retry: int
                         ):
                self.acs = acs
                self.url = url
                self.recursion_depth = recursion_depth
                self.requested_report = requested_report
                self.visited_urls_reports = visited_urls_reports
                self.parent_page_report = parent_page_report
                self.pe = pe
                self.retry = retry  # remaining attempts, decremented per call

            def __call__(self, fut):
                self.retry -= 1
                # Unpack instance state into locals for the long body below.
                pe = self.pe
                url = self.url
                recursion_depth = self.recursion_depth
                requested_report = self.requested_report
                visited_urls_reports = self.visited_urls_reports
                parent_page_report = self.parent_page_report
                element_count, links, achecker_results, ran_vnu = [None] * 4
                try:
                    element_count, links, achecker_results, ran_vnu = fut.result()
                except subprocess.CalledProcessError as e:
                    # NOTE(review): the background task uses requests /
                    # run_check_links, which does not raise CalledProcessError
                    # — confirm this is the intended exception type; any other
                    # exception escapes the callback and _pec never drains.
                    retry = self.retry
                    if retry > 0:
                        print(e)
                        print(f'Retrying {url=} ({retry=})')
                        self.submit()
                    else:
                        print(f'Ignoring {url=} ({retry=})')
                    # This attempt is finished either way; release its pending
                    # slot (a retry registered its own slot via submit()).
                    self.acs._pec -= 1
                    return
                print(f'Reached {url=}')
                web_url = models.WebURL.get_or_create(url)
                page_report = models.PageReport(
                    url=web_url,
                    request=requested_report,
                    parent=parent_page_report,
                    found_urls='\r\n'.join(links),
                    element_count=element_count,
                    remaining_depth=recursion_depth,
                )
                page_report.save()
                page_report_lines = list()
                # --- achecker (WCAG) findings -> PageReportLine rows ---
                if True:
                    rlg_iac = self.acs.get_report_line_generator(
                        'achecker')
                    rlsoc_a = self.acs.get_report_line_subject_of_concern(
                        'wcag')
                    for wcag_consideration in achecker_results:
                        # First element of 'value' is the severity label.
                        severity = wcag_consideration['value'][0].lower()
                        severity_int = ACHECKER_TABLE.get(severity,
                                                          OVER_THE_TOP_SEVERITY)
                        line = models.PageReportLine(
                            report=page_report,
                            generator=rlg_iac,
                            pattern=self.acs.get_report_line_issue_pattern(
                                wcag_consideration['ruleId']),
                            message=wcag_consideration['message'],
                            snippet=wcag_consideration['snippet'],
                            location_aria=wcag_consideration['path']['aria'],
                            location_xpath=wcag_consideration['path']['dom'],
                            location_css=None,
                            line_start=None,
                            line_end=None,
                            column_start=None,
                            column_end=None,
                            highlight_start=None,
                            highlight_end=None,
                            passed=wcag_consideration['level'] == 'pass',
                            severity=severity,
                            severity_int=severity_int,
                            subject_of_concern=rlsoc_a
                        )
                        page_report_lines.append(line)
                    # NOTE(review): raises NameError if achecker_results was
                    # empty — confirm that case cannot occur.
                    del wcag_consideration
                # --- vnu (HTML syntax) findings -> PageReportLine rows ---
                if True:
                    rlg_vnu = self.acs.get_report_line_generator('vnu')
                    rlsoc_s = self.acs.get_report_line_subject_of_concern(
                        'syntax')
                    for syntax_consideration in ran_vnu:
                        # Prefer the finer-grained 'subType' when present.
                        severity = syntax_consideration.get(
                            'subType', syntax_consideration['type']).lower()
                        severity_int = SEVERITY_TABLE.get(severity,
                                                          OVER_THE_TOP_SEVERITY)
                        line = models.PageReportLine(
                            report=page_report,
                            generator=rlg_vnu,
                            # Quoted fragments vary per page; strip them so
                            # equal issues share one pattern row.
                            pattern=self.acs.get_report_line_issue_pattern(
                                remove_stuff_inside_quotes(
                                    syntax_consideration['message']
                                )
                            ),
                            message=syntax_consideration.get('message', ''),
                            snippet=syntax_consideration.get('extract', ''),
                            location_aria=None,
                            location_xpath=None,
                            location_css=None,
                            # vnu reports the *last* line of the extract;
                            # back-compute the first line from its height.
                            line_start=(
                                syntax_consideration.get('line') if
                                'lastLine' not in syntax_consideration
                                else syntax_consideration['lastLine'] + 1 -
                                len(syntax_consideration.get(
                                    'extract', '').splitlines())
                            ),
                            line_end=syntax_consideration.get(
                                'lastLine',
                                syntax_consideration.get('line')
                            ),
                            column_start=syntax_consideration.get(
                                'firstColumn'),
                            column_end=syntax_consideration.get('lastColumn'),
                            highlight_start=syntax_consideration.get(
                                'hiliteStart'),
                            # End of highlight = start + length, when both
                            # pieces are present; otherwise unknown.
                            highlight_end=None if
                            ('hiliteStart' not in syntax_consideration or
                             'hiliteLength' not in syntax_consideration) else
                            syntax_consideration['hiliteStart'] +
                            syntax_consideration['hiliteLength'],
                            passed=severity != 'error',
                            severity=severity,
                            severity_int=severity_int,
                            subject_of_concern=rlsoc_s
                        )
                        page_report_lines.append(line)
                    # NOTE(review): same NameError risk as above for empty
                    # ran_vnu.
                    del syntax_consideration
                models.PageReportLine.objects.bulk_create(
                    page_report_lines,
                    batch_size=1 << 15
                )
                visited_urls_reports[url] = page_report
                # Host part of an absolute URL — assumes 'scheme://host/...'
                # shape (TODO confirm; urlsplit would be safer).
                url_part = url.split('/')[2]
                for link in links:
                    link_part = link.split('/')[2]
                    # Only recurse into links on the same host.
                    if url_part == link_part:
                        self.acs.assess(
                            link,
                            recursion_depth - 1,
                            requested_report,
                            visited_urls_reports,
                            page_report,
                            pe,
                        )
                self.acs._pec -= 1

            def submit(self):
                """Register one pending slot and queue the background check.

                ``pe`` and ``url`` resolve through the closure of the
                enclosing assess() call (they equal self.pe / self.url).
                """
                self.acs._pec += 1
                pe.submit(
                    AccessibilityCheckerServiceBackgroundChecks(),
                    url
                ).add_done_callback(
                    self
                )

        # Kick off the check with a budget of 5 attempts.
        DoAfterBackground(
            self,
            url,
            recursion_depth,
            requested_report,
            visited_urls_reports,
            parent_page_report,
            pe,
            5,
        ).submit()

    def get_report_line_issue_pattern(self, name: str) -> models.ReportLineIssuePattern:
        """Memoized get_or_create for ReportLineIssuePattern rows."""
        if name not in self._c1:
            self._c1[name] = models.ReportLineIssuePattern.get_or_create(name)
        return self._c1[name]

    def get_report_line_generator(self, name: str) -> models.ReportLineGenerator:
        """Memoized get_or_create for ReportLineGenerator rows."""
        if name not in self._c2:
            self._c2[name] = models.ReportLineGenerator.get_or_create(name)
        return self._c2[name]

    def get_report_line_subject_of_concern(self, name: str) -> models.ReportLineSubjectOfConcern:
        """Memoized get_or_create for ReportLineSubjectOfConcern rows."""
        if name not in self._c3:
            self._c3[name] = models.ReportLineSubjectOfConcern.get_or_create(
                name)
        return self._c3[name]