accessibility-checker-api/webproj/accessibility_checker/services.py

316 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
import shutil
import subprocess
import time
import unicodedata
import requests
from concurrent.futures import ProcessPoolExecutor as PoolExecutor
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from uuid import uuid4
from . import models
# Lower-case severity names mapped onto the numeric levels of the stdlib
# ``logging`` module. Each name is simply the lower-cased constant name,
# including the aliases ``fatal`` and ``warn``.
SEVERITY_TABLE = {
    name: getattr(logging, name.upper())
    for name in (
        'critical',
        'fatal',
        'error',
        'warning',
        'warn',
        'info',
        'debug',
        'notset',
    )
}

# Accessibility-checker result categories mapped onto the same numeric scale.
ACHECKER_TABLE = dict(
    violation=logging.FATAL,
    potentialviolation=logging.ERROR,
    recommendation=logging.WARNING,
    potentialrecommendation=logging.INFO,
    manual=logging.DEBUG,
)
ACHECKER_TABLE['pass'] = logging.NOTSET  # 'pass' is a keyword, so set by key

# A level strictly above every level in either table; used in this file as
# the fallback for severity names that appear in neither table.
OVER_THE_TOP_SEVERITY = 10 + max(
    {*SEVERITY_TABLE.values(), *ACHECKER_TABLE.values()}
)
def remove_accents(input_str: str) -> str:
    """Return *input_str* with accents stripped and curly quotes made ASCII.

    The text is NFKD-normalised so that accented letters split into a base
    character plus combining marks; the combining marks are then dropped.
    Typographic double and single quotation marks are replaced by ``"`` and
    ``'`` respectively.

    Args:
        input_str: Text to simplify.

    Returns:
        The simplified text; an empty string stays empty.
    """
    # The quote characters are given as explicit code points so they cannot
    # be lost or confused with look-alike characters by editors and viewers.
    # An empty search string here would make ``str.replace`` insert a quote
    # between every character of the text.
    typographic_quotes = {
        0x201C: '"',  # LEFT DOUBLE QUOTATION MARK
        0x201D: '"',  # RIGHT DOUBLE QUOTATION MARK
        0x2018: "'",  # LEFT SINGLE QUOTATION MARK
        0x2019: "'",  # RIGHT SINGLE QUOTATION MARK
    }
    nfkd_form = unicodedata.normalize('NFKD', input_str).translate(
        typographic_quotes)
    return "".join(c for c in nfkd_form if not unicodedata.combining(c))
def remove_stuff_inside_quotes(s):
    """Return a lower-cased, accent-free copy of *s* without quoted content.

    The text is first passed through ``remove_accents`` and lower-cased.
    Every ``"`` or ``'`` character is kept and flips an inside/outside
    state; all other characters are kept only while outside. Both quote
    characters flip the same state, so they are not matched by kind.
    """
    kept = []
    outside_quotes = True
    for ch in remove_accents(s).lower():
        is_quote = ch == '"' or ch == "'"
        if is_quote:
            outside_quotes = not outside_quotes
        if is_quote or outside_quotes:
            kept.append(ch)
    return ''.join(kept)
class FakeFilePath:
    """In-memory stand-in exposing ``read_text`` and ``read_bytes``.

    Both readers return the object given to the constructor unchanged; no
    encoding or decoding takes place.
    """

    def __init__(self, contents):
        # Stored as-is and handed back by both readers.
        self._contents = contents

    def read_text(self):
        """Return the stored contents."""
        stored = self._contents
        return stored

    def read_bytes(self):
        """Return the stored contents (the same object as ``read_text``)."""
        stored = self._contents
        return stored
def run_check_links(
    url: str,
    endpoint: str = 'http://localhost:64161/',
    timeout: Optional[float] = None,
) -> Tuple[int, List[str], List[dict], List[dict]]:
    """Ask the checker HTTP service to analyse *url* and return its findings.

    Args:
        url: Page to analyse; sent to the service as the ``url`` query
            parameter.
        endpoint: Base URL of the checker service.
        timeout: Seconds passed to ``requests.get``. ``None`` (the default)
            waits indefinitely, as before.

    Returns:
        A tuple of the ``element_count``, ``links``, ``results`` and
        ``vnu_report`` fields of the service's JSON reply.

    Raises:
        ValueError: If the service does not answer with HTTP 200. The
            message holds the reported error stacks joined by CRLF, or the
            status code and raw body when no stacks can be extracted.
    """
    rq = requests.get(endpoint, params=dict(url=url), timeout=timeout)
    if rq.status_code != 200:
        # The error body is not guaranteed to be JSON or to have the
        # expected shape; never let that hide the real failure.
        try:
            stacks = [str(err['stack']) for err in rq.json()['errors']]
        except (ValueError, KeyError, TypeError):
            stacks = []
        raise ValueError(
            '\r\n'.join(stacks)
            or f'checker service returned HTTP {rq.status_code}: {rq.text}'
        )
    j = rq.json()
    return (j['element_count'], j['links'], j['results'], j['vnu_report'])
class AccessibilityCheckerServiceBackgroundChecks:
    """Callable that runs the link/accessibility check for a single URL.

    Instances carry no state; calling one announces the URL on stdout and
    delegates to ``run_check_links``.
    """

    def __call__(self, url):
        """Return ``(element_count, links, achecker_results, vnu_report)``."""
        print(f'Reaching {url=}')
        count, found_links, wcag_findings, vnu_findings = run_check_links(url)
        return count, found_links, wcag_findings, vnu_findings
class AccessibilityCheckerService:
    """Crawl pages through a process pool and store the findings as reports.

    ``assess_many`` is the entry point. Every page is checked in a worker
    process; when a check finishes, a done-callback stores the report and
    submits same-host links with a reduced depth. ``_pec`` counts checks
    that were submitted but whose callback has not finished yet, and
    ``assess_many`` waits until it drops back to zero.

    NOTE(review): done-callbacks run in a thread of this process, so
    ``_pec`` and the visited mapping may be touched from more than one
    thread - confirm whether that needs a lock.
    """

    # How many times one page is attempted before it is skipped.
    _ATTEMPTS_PER_URL = 5

    def __init__(self):
        # Pending executor count: submitted checks not yet fully handled.
        self._pec = 0
        # Per-instance caches of get_or_create results, keyed by name:
        # issue patterns, generators and subjects of concern respectively.
        self._c1 = dict()
        self._c2 = dict()
        self._c3 = dict()

    def assess_many(self, urls: List[str],
                    recursion_depth: int) -> "models.RequestedReport":
        """Check every URL in *urls*, following links *recursion_depth* deep.

        Creates and saves a ``RequestedReport``, runs all checks in a pool
        of 8 worker processes and blocks until no check is pending.

        Returns:
            The saved ``RequestedReport`` the page reports are attached to.
        """
        requested_report = models.RequestedReport(depth=recursion_depth)
        requested_report.save()
        visited: Dict[str, "models.PageReport"] = dict()
        print('ENTERING POOL EXECUTOR')
        with PoolExecutor(8) as pe:
            print('ENTERED POOL EXECUTOR')
            for url in urls:
                self.assess(
                    url,
                    recursion_depth,
                    requested_report,
                    visited,
                    None,
                    pe,
                )
            # Callbacks submit further work, so the pool must stay open
            # until every outstanding check has been handled.
            while self._pec > 0:
                time.sleep(0.1)
            print('EXITING POOL EXECUTOR')
        print('EXITED POOL EXECUTOR')
        return requested_report

    def assess(
        self,
        url: str,
        recursion_depth: int,
        requested_report: "models.RequestedReport",
        visited_urls_reports: Dict[str, "models.PageReport"],
        parent_page_report: Optional["models.PageReport"],
        pe: PoolExecutor,
    ):
        """Schedule the check of *url* unless it is too deep or already known.

        Returns immediately; the result is processed by a done-callback.

        Args:
            url: Page to check.
            recursion_depth: Remaining link depth; negative means "skip".
            requested_report: Report the page report will belong to.
            visited_urls_reports: URL -> page report mapping shared by the
                whole crawl. While a check is in flight the URL maps to
                ``None``; the entry is removed again if the page is given up.
            parent_page_report: Report of the page that linked here, if any.
            pe: Executor the check is submitted to.
        """
        if recursion_depth < 0:
            return
        if url in visited_urls_reports:
            return
        # Claim the URL right away so that other pages linking to it while
        # the check is still running do not submit it a second time.
        visited_urls_reports[url] = None
        try:
            self._submit(
                url,
                recursion_depth,
                requested_report,
                visited_urls_reports,
                parent_page_report,
                pe,
                self._ATTEMPTS_PER_URL,
            )
        except Exception:
            visited_urls_reports.pop(url, None)
            raise

    def _submit(self, url, recursion_depth, requested_report,
                visited_urls_reports, parent_page_report, pe, retry):
        """Submit one background check and register its done-callback."""
        self._pec += 1
        try:
            future = pe.submit(
                AccessibilityCheckerServiceBackgroundChecks(),
                url
            )
        except Exception:
            # Nothing was scheduled, so nothing will ever decrement.
            self._pec -= 1
            raise
        future.add_done_callback(
            lambda fut: self._after_background(
                fut,
                url,
                recursion_depth,
                requested_report,
                visited_urls_reports,
                parent_page_report,
                pe,
                retry,
            )
        )

    def _after_background(self, fut, url, recursion_depth, requested_report,
                          visited_urls_reports, parent_page_report, pe,
                          retry):
        """Handle a finished check: store it, follow links, or retry."""
        retry -= 1
        keep_claim = False
        try:
            try:
                element_count, links, achecker_results, ran_vnu = fut.result()
            except Exception as e:
                # Any worker failure must end up here: an exception that
                # escapes a done-callback is swallowed by the executor and
                # would leave ``_pec`` above zero forever.
                print(e)
                if retry > 0:
                    print(f'Retrying {url=} ({retry=})')
                    self._submit(
                        url,
                        recursion_depth,
                        requested_report,
                        visited_urls_reports,
                        parent_page_report,
                        pe,
                        retry,
                    )
                    keep_claim = True
                else:
                    print(f'Ignoring {url=} ({retry=})')
                return
            print(f'Reached {url=}')
            page_report = self._store_page_report(
                url,
                recursion_depth,
                requested_report,
                parent_page_report,
                element_count,
                links,
                achecker_results,
                ran_vnu,
            )
            visited_urls_reports[url] = page_report
            keep_claim = True
            self._follow_links(
                url,
                links,
                recursion_depth,
                requested_report,
                visited_urls_reports,
                page_report,
                pe,
            )
        finally:
            if not keep_claim:
                # Given up (or failed before storing): release the claim.
                visited_urls_reports.pop(url, None)
            self._pec -= 1

    def _store_page_report(self, url, recursion_depth, requested_report,
                           parent_page_report, element_count, links,
                           achecker_results, ran_vnu):
        """Save the page report and all of its lines; return the report."""
        web_url = models.WebURL.get_or_create(url)
        page_report = models.PageReport(
            url=web_url,
            request=requested_report,
            parent=parent_page_report,
            found_urls='\r\n'.join(links),
            element_count=element_count,
            remaining_depth=recursion_depth,
        )
        page_report.save()
        page_report_lines = self._achecker_lines(page_report, achecker_results)
        page_report_lines.extend(self._vnu_lines(page_report, ran_vnu))
        models.PageReportLine.objects.bulk_create(
            page_report_lines,
            batch_size=1 << 15
        )
        return page_report

    def _achecker_lines(self, page_report, achecker_results):
        """Build (unsaved) report lines from the accessibility findings."""
        rlg_iac = self.get_report_line_generator('achecker')
        rlsoc_a = self.get_report_line_subject_of_concern('wcag')
        lines = []
        for wcag_consideration in achecker_results:
            severity = wcag_consideration['value'][0].lower()
            severity_int = ACHECKER_TABLE.get(severity,
                                              OVER_THE_TOP_SEVERITY)
            lines.append(models.PageReportLine(
                report=page_report,
                generator=rlg_iac,
                pattern=self.get_report_line_issue_pattern(
                    wcag_consideration['ruleId']),
                message=wcag_consideration['message'],
                snippet=wcag_consideration['snippet'],
                location_aria=wcag_consideration['path']['aria'],
                location_xpath=wcag_consideration['path']['dom'],
                location_css=None,
                line_start=None,
                line_end=None,
                column_start=None,
                column_end=None,
                highlight_start=None,
                highlight_end=None,
                passed=wcag_consideration['level'] == 'pass',
                severity=severity,
                severity_int=severity_int,
                subject_of_concern=rlsoc_a
            ))
        return lines

    def _vnu_lines(self, page_report, ran_vnu):
        """Build (unsaved) report lines from the markup-validator messages."""
        rlg_vnu = self.get_report_line_generator('vnu')
        rlsoc_s = self.get_report_line_subject_of_concern('syntax')
        lines = []
        for syntax_consideration in ran_vnu:
            severity = syntax_consideration.get(
                'subType', syntax_consideration['type']).lower()
            severity_int = SEVERITY_TABLE.get(severity,
                                              OVER_THE_TOP_SEVERITY)
            message = syntax_consideration.get('message', '')
            extract = syntax_consideration.get('extract', '')
            if 'lastLine' in syntax_consideration:
                # Only the last line is known: count back over the lines
                # of the extract to find where it starts.
                line_start = (syntax_consideration['lastLine'] + 1 -
                              len(extract.splitlines()))
            else:
                line_start = syntax_consideration.get('line')
            if ('hiliteStart' in syntax_consideration and
                    'hiliteLength' in syntax_consideration):
                highlight_end = (syntax_consideration['hiliteStart'] +
                                 syntax_consideration['hiliteLength'])
            else:
                highlight_end = None
            lines.append(models.PageReportLine(
                report=page_report,
                generator=rlg_vnu,
                # Quoted parts differ per occurrence; dropping them lets
                # equal kinds of message share one pattern.
                pattern=self.get_report_line_issue_pattern(
                    remove_stuff_inside_quotes(message)
                ),
                message=message,
                snippet=extract,
                location_aria=None,
                location_xpath=None,
                location_css=None,
                line_start=line_start,
                line_end=syntax_consideration.get(
                    'lastLine',
                    syntax_consideration.get('line')
                ),
                column_start=syntax_consideration.get('firstColumn'),
                column_end=syntax_consideration.get('lastColumn'),
                highlight_start=syntax_consideration.get('hiliteStart'),
                highlight_end=highlight_end,
                passed=severity != 'error',
                severity=severity,
                severity_int=severity_int,
                subject_of_concern=rlsoc_s
            ))
        return lines

    @staticmethod
    def _authority(url):
        """Return the ``host[:port]`` part of *url*, or ``None`` if absent.

        This is the third ``/``-separated piece of the URL, i.e. what
        follows ``scheme://``.
        """
        pieces = url.split('/')
        return pieces[2] if len(pieces) > 2 else None

    def _follow_links(self, url, links, recursion_depth, requested_report,
                      visited_urls_reports, page_report, pe):
        """Schedule every link that points at the same host as *url*."""
        url_part = self._authority(url)
        for link in links:
            link_part = self._authority(link)
            # Links without a host part (e.g. ``mailto:``) are skipped.
            if link_part is not None and url_part == link_part:
                self.assess(
                    link,
                    recursion_depth - 1,
                    requested_report,
                    visited_urls_reports,
                    page_report,
                    pe,
                )

    def get_report_line_issue_pattern(
            self, name: str) -> "models.ReportLineIssuePattern":
        """Return the issue pattern called *name*, cached per instance."""
        if name not in self._c1:
            self._c1[name] = models.ReportLineIssuePattern.get_or_create(name)
        return self._c1[name]

    def get_report_line_generator(
            self, name: str) -> "models.ReportLineGenerator":
        """Return the line generator called *name*, cached per instance."""
        if name not in self._c2:
            self._c2[name] = models.ReportLineGenerator.get_or_create(name)
        return self._c2[name]

    def get_report_line_subject_of_concern(
            self, name: str) -> "models.ReportLineSubjectOfConcern":
        """Return the subject of concern called *name*, cached per instance."""
        if name not in self._c3:
            self._c3[name] = models.ReportLineSubjectOfConcern.get_or_create(
                name)
        return self._c3[name]