
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
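"""Latency-aware Ryu OpenFlow 1.3 controller (latencycontroller.py).

Loads a Mininet topology (JSON file given as sys.argv[2]) and tuning
constants (variables.ini), installs hop-count shortest-path ("OSPF
fallback") rules on every switch, then periodically polls flow statistics
and re-routes each host pair over the candidate path that minimises the
latency/overload cost model implemented in PathEvaluator below.
"""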
import os
import sys
import json
import datetime
from time import sleep
from id2ip import id2ip, ip2id
from typing import Any, Union, Tuple, List, Dict
from configparser import ConfigParser
from ryu.lib import hub
from ryu.base import app_manager
from ryu.lib import ofctl_v1_3
from ryu.lib.packet import ether_types
from ryu.lib.packet import ethernet
from ryu.ofproto import ofproto_v1_3
from ryu.controller.handler import set_ev_cls
from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.controller import ofp_event
from ryu.lib.packet import packet
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
EMPTY_ITER = iter(list())
print(f"loading network {sys.argv[2]}", file=sys.stderr)
with open(sys.argv[2]) as f:
    network_topo = json.loads(f.read())
# ryu uses eventlet, which, on some versions, breaks pathlib's read_text
print(f"reading network configuration variables.ini", file=sys.stderr)
network_config = ConfigParser()
with open('variables.ini') as f:
network_config.read_string(f.read())
m1 = float(network_config['linearalgconst']['m1'])
m2 = float(network_config['linearalgconst']['m2'])
hop_delay = float(network_config['linearalgconst']['hop_delay'])
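# network_topo layout, as consumed below: [0] host names, [1] switch names,
# [2] links as [endpoint_a, endpoint_b, bandwidth] triples (the third field
# is read as the link bandwidth by bandwidth_from_list further down).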
network_graph = dict()
for h in network_topo[0]:
network_graph[h] = dict()
for s in network_topo[1]:
network_graph[s] = dict()
for l in network_topo[2]:
network_graph[l[0]][l[1]] = l[2]
network_graph[l[1]][l[0]] = l[2]
def jsonload_list2tuple(x):
    # bool is an int subclass, so JSON booleans pass through the first branch
    if isinstance(x, (type(None), int, float, str)):
        return x
    elif isinstance(x, (list, tuple)):
        return tuple(jsonload_list2tuple(k) for k in x)
elif isinstance(x, dict):
return {jsonload_list2tuple(k): jsonload_list2tuple(v) for k, v in x.items()}
else:
raise ValueError("Input didn't come from a standard JSON")
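# Round-trip example: json.dumps flattens nested tuples into lists, and this
# undoes it, e.g. jsonload_list2tuple([["s1", "s2"], 10]) == (("s1", "s2"), 10).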
def _get_transition_map(graph):
transitions = [(source, target) for source, lnks in graph.items()
for target in lnks.keys()]
transmap = dict()
for s, t in transitions:
if t not in transmap:
transmap[t] = list()
transmap[t].append(s)
return {k: tuple(v) for k, v in transmap.items()}
def _find_all_paths(tm, initial, target, accumulator=None):
if accumulator is None:
accumulator = list()
accumulator = [*accumulator, initial]
if initial == target:
yield tuple(accumulator)
else:
for intermediate in tm[initial]:
if intermediate not in accumulator:
yield from _find_all_paths(tm, intermediate, target, accumulator)
yield from EMPTY_ITER
def find_all_paths(graph, initial, target):
    if initial == target:
        yield tuple()
else:
tm = _get_transition_map(graph)
yield from _find_all_paths(tm, initial, target)
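# Example (hypothetical triangle graph):
#   g = {"a": {"b": 1, "c": 1}, "b": {"a": 1, "c": 1}, "c": {"a": 1, "b": 1}}
#   sorted(find_all_paths(g, "a", "c")) == [("a", "b", "c"), ("a", "c")]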
def prepare_pop_pair_alternative_paths_for_availability(graph, hosts):
pop_apa = [[None for x in hosts] for y in hosts]
for i, h1 in enumerate(hosts):
for j, h2 in enumerate(hosts):
if pop_apa[i][j] is None:
apa = tuple(list(find_all_paths(graph, h1, h2)))
pop_apa[i][j] = apa
pop_apa[j][i] = apa
return pop_apa
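# Note: the cache stores the same (forward-direction) path tuples under both
# [i][j] and [j][i]; downstream code evaluates each unordered host pair once
# and installs rules for both directions, so this sharing appears intentional.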
def dijkstra(graph, initial):
visited = {initial: 0}
path = dict()
nodes = set(graph.keys())
mentions = {node: list(graph[node].keys()) for node in nodes}
while len(nodes) > 0:
min_node = None
for node in nodes:
if node in visited:
if min_node is None:
min_node = node
elif visited[node] < visited[min_node]:
min_node = node
if min_node is None:
break
nodes.remove(min_node)
current_weight = visited[min_node]
for edge in mentions[min_node]:
weight = current_weight + 1
if edge not in visited or weight < visited[edge]:
visited[edge] = weight
path[edge] = min_node
return visited, path
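# Note: the loop above uses weight = current_weight + 1 for every edge, so
# "shortest" means fewest hops; the latency values stored on network_graph
# edges are not consulted by this Dijkstra run.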
def dijkstra_min_path(dijkstra_tuple, initial, target):
visited, path = dijkstra_tuple
min_path = list()
current = target
if current in path or current == initial:
while current is not None:
min_path.append(current)
current = path.get(current)
return (list(reversed(min_path)), visited[target])
return ([], None)
class Dijkstra:
def __init__(self, graph):
self._graph = graph
self._cache = dict()
def __call__(self, initial):
if initial not in self._cache:
self._cache[initial] = dijkstra(self._graph, initial)
return DijkstraResults(initial, self._cache[initial])
class DijkstraResults:
def __init__(self, initial, dijkstra_tuple):
self._i = initial
self._dt = dijkstra_tuple
def __call__(self, target):
return dijkstra_min_path(self._dt, self._i, target)
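# Usage sketch: Dijkstra memoises one single-source run per origin, e.g.
#   ospf = Dijkstra(network_graph)
#   path, hops = ospf("h1")("h2")  # (["h1", "s1", ..., "h2"], hop count)
# ("h1"/"h2" here are hypothetical node names.)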
apacachefile = f"{sys.argv[2]}.apa.json"
print(
f"checking if APA for {sys.argv[2]} is cached at {apacachefile}", file=sys.stderr)
pop_apa_candidates = None
if os.path.isfile(apacachefile):
print(f"loading APA from {apacachefile}", file=sys.stderr)
with open(apacachefile) as f:
pop_apa_candidates = jsonload_list2tuple(json.loads(f.read()))
else:
print(f"calculating APA for {sys.argv[2]}", file=sys.stderr)
pop_apa_candidates = prepare_pop_pair_alternative_paths_for_availability(
network_graph,
network_topo[0]
)
print(f"caching APA at {apacachefile}", file=sys.stderr)
with open(apacachefile, 'w') as f:
f.write(json.dumps(pop_apa_candidates))
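# Per-switch helper: LatencyApp creates one LatencyController per datapath
# that reaches MAIN_DISPATCHER. It installs the hop-count ("OSPF fallback")
# rules on connect and keeps the per-link transfer/speed/flow-rule counters
# that the monitoring loop feeds into PathEvaluator.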
class LatencyController:
def __init__(self, datapath):
super().__init__()
self._datapath = datapath
self._sw = f"s{self._datapath.id}"
        # (global link index, link) pairs for every link touching this switch
        l = [
            (i, x) for i, x in enumerate(network_topo[2])
            if x[0] == self._sw or x[1] == self._sw
        ]
        self._l = l
self._links = portno_from_list(l)
self._link_speed = usage_store_from_list(l)
self._link_xfer = usage_store_from_list(l)
self._link_rules = usage_store_from_list(l)
self._bandwidth = bandwidth_from_list(l)
self._host2mac = dict()
self._ospf = None
def connected(self, la):
print(f"Switch connected: {self._sw}")
print(f"Switch OSPF-fallback loading: {self._sw}")
ofproto = self._datapath.ofproto
parser = self._datapath.ofproto_parser
self._ospf = la.ospf_dijkstra(self._sw)
for h in network_topo[0]:
ipv4_dst = id2ip(int(h[1:])-1)
match_ipv4 = parser.OFPMatch(
eth_type=0x0800,
ipv4_dst=ipv4_dst
)
match_arp = parser.OFPMatch(
eth_type=0x0806,
arp_tpa=ipv4_dst
)
next_hop = self._ospf(h)[0][1]
out_port = self._links[(self._sw, next_hop)]
actions = [parser.OFPActionOutput(out_port)]
print(f"{self._sw} --[{out_port}]--> {next_hop} ({ipv4_dst})")
self.add_flow(1, match_ipv4, actions)
self.add_flow(1, match_arp, actions)
print(f"Switch OSPF-fallback loaded: {self._sw}")
def disconnected(self):
print(f"Switch disconnected: {self._sw}")
def add_flow(self, priority, match, actions, buffer_id=None, idle_timeout=0, hard_timeout=0):
ofproto = self._datapath.ofproto
parser = self._datapath.ofproto_parser
inst = [parser.OFPInstructionActions(
ofproto.OFPIT_APPLY_ACTIONS,
actions
)]
additionals = dict()
if buffer_id:
additionals['buffer_id'] = buffer_id
if idle_timeout:
additionals['idle_timeout'] = idle_timeout
if hard_timeout:
additionals['hard_timeout'] = hard_timeout
mod = parser.OFPFlowMod(datapath=self._datapath,
priority=priority, match=match,
instructions=inst,
**additionals)
self._datapath.send_msg(mod)
    def _the_other_link_endpoint(self, linktuple):
        # linktuple is (a, b); self._sw sits at index 0 or 1, abs(idx-1) flips it
        return linktuple[abs(linktuple.index(self._sw)-1)]
def _on_stats(self, ev, la):
        # reset per-link flow-rule counts; they are recounted from this reply
        self._link_rules = usage_store_from_list(self._l)
        stats = ev.msg.body
        for stat in stats:
            # measure load: byte-count delta since the last poll over the
            # polling interval, in MB/s
            bc = stat.byte_count
            tp = self._links.reverse_lookup(
                stat.instructions[0].actions[0].port)[0]
            tf = max(0, bc - self._link_xfer[tp])
            self._link_xfer[tp] = bc
            self._link_speed[tp] = tf/(la.interval*10**6)
            # count flow rules per link
            self._link_rules[tp] += 1
    def add_latency_segment(self, ips, ipd, nxt):
        parser = self._datapath.ofproto_parser
        match = parser.OFPMatch(
            eth_type=0x0800,
            ipv4_src=ips,
            ipv4_dst=ipd
        )
        out_port = self._links[(self._sw, nxt)]
        actions = [parser.OFPActionOutput(out_port)]
        # priority 5 so latency routes override the priority-1 OSPF fallback
        self.add_flow(5, match, actions)
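# Path evaluation runs in a single background worker process; every stats
# round submits a fresh PathEvaluator and cancels the previous future if it
# has not started yet (see _flow_stats_reply_handler below).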
ppe = ProcessPoolExecutor(1)
last_process = None
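# Cost model, as read from the code below: for each candidate aggregate path
# p between a host pair, with na the flows already on its links, omax the
# worst per-link utilisation, dp a per-link delay estimate and d_ospf the
# estimated delay of the hop-count shortest path, the score is
#   cost(p) = na * sum_links(xap * (dp + dp*m1/d_ospf)) + m2*omax + sum_links(usage)
# and the candidate with the lowest score is kept. m1, m2 and hop_delay come
# from variables.ini.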
class PathEvaluator:
def __init__(self, link_usage, link_flows):
self._link_usage = link_usage
self._link_flows = link_flows
def __call__(self):
processed = UsageStore()
ospf = Dijkstra(network_graph)
for h1 in network_topo[0]:
for h2 in network_topo[0]:
if h1 != h2 and not processed.contains((h1, h2)):
h1p = network_topo[0].index(h1)
h2p = network_topo[0].index(h2)
                    ospf_delay = 0
                    # estimated delay of the hop-count shortest path; assumes
                    # hop_delay > 0 so the division below never hits zero
                    ospf_aggregate = ospf(h1)(h2)[0]
                    for i in range(len(ospf_aggregate)-1):
                        ospf_delay += self._link_usage[
                            tuple(ospf_aggregate[i:i+2])
                        ] + hop_delay
aggregates = pop_apa_candidates[h1p][h2p]
min_aggregate = None
min_aggregate_val = None
                    for aggregate in aggregates:
                        na = 0    # flows already routed over this aggregate's links
                        omax = 0  # maximum link overload along the aggregate
                        for i in range(len(aggregate)-1):
                            link = _sort_pair(*aggregate[i:i+2])
                            omax = max(omax, self._link_usage[link])
                            na += self._link_flows[link]
                        xap = 1  # fraction of the aggregate on this path
                        eps2 = 0
                        for i in range(len(aggregate)-1):
                            link = _sort_pair(*aggregate[i:i+2])
                            dp = self._link_usage[link]+hop_delay  # per-link path delay
                            eps2 += xap*(dp+((dp*m1)/ospf_delay))
                        exp0 = m2*omax
                        eps3 = 0
                        for i in range(len(aggregate)-1):
                            link = _sort_pair(*aggregate[i:i+2])
                            eps3 += self._link_usage[link]
                        aggregate_val = na*eps2+exp0+eps3
if min_aggregate_val is None or aggregate_val < min_aggregate_val:
min_aggregate_val = aggregate_val
min_aggregate = aggregate
processed[(h1, h2)] = min_aggregate
return processed
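# Ryu entry point. Presumably launched along the lines of
#   ryu-manager latencycontroller.py <topology>.json
# so that sys.argv[2] (read at import time above) names the topology file;
# the exact invocation depends on the project's run scripts.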
class LatencyApp(app_manager.RyuApp):
OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.mac_to_port = {}
self.controllers = {}
        self.max_speed = bandwidth_from_list(enumerate(network_topo[2]))
        # live aggregated views over every connected switch's per-link counters
        self.last_speed = UsageStoreProxyFromCallableIterable(
            AttrCallableIterable(self.controllers.values, '_link_speed'),
            sum
        )
        self.flow_count = UsageStoreProxyFromCallableIterable(
            AttrCallableIterable(self.controllers.values, '_link_rules'),
            sum
        )
self.ospf_dijkstra = Dijkstra(network_graph)
self.interval = float(network_config['monitoring']['interval'])
hub.spawn(self._start_monitoring, self.interval)
@set_ev_cls(ofp_event.EventOFPStateChange, [MAIN_DISPATCHER, DEAD_DISPATCHER])
def _event_ofp_state_change(self, ev):
datapath = ev.datapath
dpid = datapath.id
if ev.state == MAIN_DISPATCHER:
if dpid not in self.controllers:
self.controllers[dpid] = LatencyController(datapath)
self.controllers[dpid].connected(self)
self._request_stats(datapath)
elif ev.state == DEAD_DISPATCHER:
if dpid in self.controllers:
self.controllers[dpid].disconnected()
del self.controllers[dpid]
def _start_monitoring(self, repeat):
while True:
for ctrl in self.controllers.values():
self._request_stats(ctrl._datapath)
sleep(repeat)
def _request_stats(self, datapath):
datapath.send_msg(
datapath.ofproto_parser.OFPFlowStatsRequest(
datapath
)
)
@set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
def _flow_stats_reply_handler(self, ev):
global last_process
dpid = ev.msg.datapath.id
ctrl = self.controllers.get(dpid)
if ctrl is not None:
ctrl._on_stats(ev, self)
        if last_process is not None:
            last_process.cancel()  # no-op if the evaluation already started
        last_process = ppe.submit(
            PathEvaluator(*self._monitored_data())
        )
        last_process.add_done_callback(self._update_topo_done)
    def _monitored_data(self):
        return (
            # per-link utilisation: measured speed over nominal bandwidth
            self.max_speed.calculate(
                self.last_speed,
                lambda maxspeed, lastspeed: (
                    lastspeed / max(maxspeed, 1e-13)  # guard against zero bandwidth
                )
            ),
            self.flow_count.to_regular_storage()
        )
def _update_topo_done(self, future):
if not future.cancelled():
res = future.result()
self._update_topo_done_successfully(res)
def _update_topo_done_successfully(self, processed):
print(f'-> Topo updated {datetime.datetime.now()}')
        print('@start.routes')
for (h1, h2), path in processed._pairs.items():
ip1 = id2ip(int(h1[1:])-1)
ip2 = id2ip(int(h2[1:])-1)
for i in range(len(path)-1):
segment = path[i:i+2]
if segment[0].startswith('s'):
ctrl = self.controllers[int(segment[0][1:])]
ctrl.add_latency_segment(ip1, ip2, segment[1])
# ctrl.add_latency_segment(ip2, ip1, segment[1])
if segment[1].startswith('s'):
ctrl = self.controllers[int(segment[1][1:])]
# ctrl.add_latency_segment(ip1, ip2, segment[0])
ctrl.add_latency_segment(ip2, ip1, segment[0])
print(path)
        print('@end.routes')
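# The helpers below build UsageStore instances from (index, link) pairs of
# network_topo[2]. portno_from_list maps each link to global-index + 1, which
# apparently assumes the topology script wires switch ports so that this
# matches the actual OpenFlow port numbers.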
def portno_from_list(l):
return UsageStore(dict([
(tuple(x[:2]), i+1)
for i, x in l
]))
def usage_store_from_list(l):
return UsageStore(dict([
(tuple(x[:2]), 0)
for _, x in l
]))
def bandwidth_from_list(l):
return UsageStore(dict([
(tuple(x[:2]), x[2])
for _, x in l
]))
def _sort_pair(a: str, b: str) -> Tuple[str, str]:
if a[0] == 'h' and b[0] == 's':
return b, a # h5,s7 => s7,h5
elif a[0] != b[0]:
return a, b # s7,h5 => s7,h5
elif int(a[1:]) > int(b[1:]):
return b, a # s7,s5 => s5,s7
else:
return a, b # s5,s7 => s5,s7
class PairSorterMixin:
def _sort_pair(self, a, b):
return _sort_pair(a, b)
class UsageStore(PairSorterMixin):
    def __init__(self, initial=None):
        # None default avoids the shared-mutable-default-argument pitfall
        self._pairs = dict()
        for k, v in (initial or {}).items():
            self[k] = v
def __getitem__(self, idx) -> float:
srtd = self._sort_pair(*idx)
if srtd not in self._pairs:
return 0
else:
return self._pairs[srtd]
def reverse_lookup(self, val):
return [k for k, v in self._pairs.items() if v == val]
def calculate(self, other, operation):
return UsageStore({
k: operation(v, other[k])
for k, v in self._pairs.items()
})
    def contains(self, idx) -> bool:
        return self._sort_pair(*idx) in self._pairs
def __setitem__(self, idx, val: float):
self._pairs[self._sort_pair(*idx)] = val
def __repr__(self):
return f"{type(self).__name__}({repr(self._pairs)})"
class UsageStoreProxyFromCallableIterable(PairSorterMixin):
def __init__(self, clb, get_behaviour=None):
self._callable = clb
self._behaviour = get_behaviour
    def __getitem__(self, idx) -> float:
        # without an aggregation behaviour every lookup reads as 0
        if self._behaviour is None:
            return 0
        srtd = self._sort_pair(*idx)
        r = list()
        for us in self._callable():
            v = us._pairs.get(srtd)
            if v is not None:
                r.append(v)
        return self._behaviour(r)
    def __setitem__(self, idx, val: float):
        updated = False
        srtd = self._sort_pair(*idx)
        for us in self._callable():
            if srtd in us._pairs:
                us._pairs[srtd] = val
                updated = True
        if not updated:
            raise ValueError(
                f"There's no existing connection between "
                f"{idx[0]} and {idx[1]} to update"
            )
def to_regular_storage(self):
keys = set([
key
for us in self._callable()
for key in us._pairs.keys()
])
return UsageStore(dict([(key, self[key]) for key in keys]))
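# Yields one named attribute from every element of a callable collection;
# used above to pull the per-switch _link_speed/_link_rules stores out of
# LatencyApp.controllers on demand.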
class AttrCallableIterable:
def __init__(self, callablecollection, attr):
self._cc = callablecollection
self._attr = attr
def __call__(self):
for i in self._cc():
yield getattr(i, self._attr)
yield from EMPTY_ITER