ufes-20191-redes-mininet/proj-impl/topoautostandalonetest.py

112 lines
3.4 KiB
Python
Raw Normal View History

2019-06-28 08:33:02 +00:00
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import sys
import json
import importlib
import subprocess
import configparser
from pathlib import Path
from topoautotest import main as run_single_test
# Values written, one at a time, to the ``algo`` option of the ``GENERAL``
# section of CONTROLLER_VARIABLES before each batch of tests is run.
ALGOS = ['ospf', 'ecmp', 'ldr', 'minmax']

# INI file (relative to the current working directory) that is rewritten for
# every entry of ALGOS and restored to its original bytes afterwards.
CONTROLLER_VARIABLES = Path('variables.ini')
def run_many_tests(topo, module, results, test_count, returns_result=False):
    """Repeat a single test until ``test_count`` attempts have succeeded.

    Every attempt starts a fresh ``ryu-manager`` process, calls
    ``run_single_test``, stops the controller and removes the
    ``<name>.state`` file. An attempt whose result is ``None`` is discarded
    and retried, so there is no upper bound on the number of attempts.

    Args:
        topo: Forwarded unchanged to ``run_single_test``.
        module: Topology module object, or its name as a string. The name is
            used to build the ``<name>.json`` and ``<name>.state`` file
            names; the object itself is forwarded to ``run_single_test``.
        results: Forwarded to ``run_single_test``; when ``returns_result`` is
            false it must also provide ``write_text`` (e.g. a ``Path``).
        test_count: Number of non-``None`` results to collect.
        returns_result: When true, return the collected results instead of
            writing them to ``results``.

    Returns:
        The list of collected results when ``returns_result`` is true,
        otherwise ``None`` (the list is written to ``results`` as JSON).
    """
    # Callers in this file pass the imported module object. Formatting that
    # directly would put its repr ("<module 'x' from ...>") into file names,
    # so use the module name; plain strings are used as they are.
    topo_name = getattr(module, '__name__', module)
    successful_tests = []
    while len(successful_tests) < test_count:
        # The controller output is never consumed. Discard it instead of
        # piping it: an unread pipe blocks the child once its buffer fills.
        controller = subprocess.Popen(
            ['ryu-manager', 'latencycontroller.py', f'{topo_name}.json'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        try:
            result = run_single_test(topo, module, results, True)
        finally:
            # Always stop the controller, even if the test raised, and reap
            # it so no defunct process is left behind between attempts.
            controller.kill()
            controller.wait()
            # Best-effort removal, as before, but without a shell so the
            # file name is never interpreted as shell syntax.
            subprocess.run(
                ['rm', '-f', f'{topo_name}.state'],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        if result is not None:
            successful_tests.append(result)
    if returns_result:
        return successful_tests
    results.write_text(json.dumps(successful_tests, indent=4))
def run_many_tests_bulk(out_path, call_args, returns_result=False):
    """Run ``run_many_tests`` once per entry of ``call_args``.

    Args:
        out_path: Object providing ``write_text`` (e.g. a ``Path``); only
            used when ``returns_result`` is false.
        call_args: Iterable of positional-argument sequences for
            ``run_many_tests``. Element 1 of each sequence is the topology
            module (or its name) and identifies the entry in the result.
        returns_result: When true, return the results instead of writing
            them to ``out_path``.

    Returns:
        A dict mapping each topology name to the list returned by
        ``run_many_tests`` when ``returns_result`` is true, otherwise
        ``None`` (the dict is written to ``out_path`` as JSON).
    """
    tests = {}
    for call_arg in call_args:
        module = call_arg[1]
        # Key by module name: module objects are not valid JSON object keys
        # and would make json.dumps raise TypeError. Strings are kept as-is.
        key = getattr(module, '__name__', module)
        tests[key] = run_many_tests(*call_arg, True)
    if returns_result:
        return tests
    out_path.write_text(json.dumps(tests, indent=4))
def run_many_tests_bulk_all_algos(out_path, call_args, returns_result=False):
    """Run the bulk tests once for every algorithm listed in ``ALGOS``.

    Before each run the ``algo`` option of the ``GENERAL`` section in
    ``CONTROLLER_VARIABLES`` is set to the current algorithm and the file is
    rewritten. The file's original bytes are restored on exit, whether the
    runs finished or an exception was raised.

    Args:
        out_path: Forwarded to ``run_many_tests_bulk``; when
            ``returns_result`` is false it also receives the JSON output.
        call_args: Forwarded to ``run_many_tests_bulk``.
        returns_result: When true, return the results instead of writing
            them to ``out_path``.

    Returns:
        A dict mapping each algorithm name to the value returned by
        ``run_many_tests_bulk`` when ``returns_result`` is true, otherwise
        ``None``.
    """
    # Snapshot first, so the finally clause can put the file back exactly.
    pristine = CONTROLLER_VARIABLES.read_bytes()
    try:
        per_algo = {}
        parser = configparser.ConfigParser()
        parser.read_string(CONTROLLER_VARIABLES.read_text())
        for algo_name in ALGOS:
            parser['GENERAL']['algo'] = algo_name
            with CONTROLLER_VARIABLES.open('w') as handle:
                parser.write(handle)
            per_algo[algo_name] = run_many_tests_bulk(
                out_path, call_args, True)
        if not returns_result:
            serialized = json.dumps(per_algo, indent=4)
            out_path.write_text(serialized)
            return None
        return per_algo
    finally:
        CONTROLLER_VARIABLES.write_bytes(pristine)
def main(out_path, call_args, returns_result=False):
    """Run every entry of ``call_args`` under every algorithm in ``ALGOS``.

    Delegates to ``run_many_tests_bulk_all_algos`` with the same arguments
    and returns whatever it returns.
    """
    outcome = run_many_tests_bulk_all_algos(
        out_path,
        call_args,
        returns_result,
    )
    return outcome
if __name__ == "__main__":
    # Command line: <how_many_tests> <toponame1> [<toponame2> ...]
    if len(sys.argv) < 3:
        print('Usage:')
        print(
            f' {sys.argv[0]} <how_many_tests> <toponame1> [<toponame2> [... [<toponameN>]]]')
        print()
        print(' Where toponame is will be resolved to')
        print(' toponame.json')
    else:
        _prog, tests, *modnames = sys.argv
        tests = int(tests)
        call_args = []
        # Each name given on the command line is handled on its own; the
        # loop variable must not be replaced by sys.argv[2], otherwise only
        # the first topology would ever be tested.
        for modname in modnames:
            modfile = Path(f'{modname}.py')
            topopath = Path(f'{modname}.json')
            resultspath = Path(f'{modname}.autotests.json')
            # Both the JSON description and the importable module are
            # needed; report each missing file by its actual path.
            missing = [
                path for path in (topopath, modfile) if not path.exists()
            ]
            if missing:
                for path in missing:
                    print(f'Topology {path} does not exist.')
                print('You might want to use toporender.py to generate required files.')
            else:
                topo = json.loads(topopath.read_text())
                mod = importlib.import_module(modname)
                call_args.append((topo, mod, resultspath, tests))
        main(Path('__all__.autotests.json'), call_args)