558 lines
18 KiB
Python
558 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
# -*- encoding: utf-8 -*-
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import datetime
|
|
from time import sleep
|
|
from id2ip import id2ip, ip2id
|
|
from typing import Any
|
|
from typing import Union
|
|
from typing import Tuple
|
|
from typing import List
|
|
from typing import Dict
|
|
from configparser import ConfigParser
|
|
from ryu.lib import hub
|
|
from ryu.base import app_manager
|
|
from ryu.lib import ofctl_v1_3
|
|
from ryu.lib.packet import ether_types
|
|
from ryu.lib.packet import ethernet
|
|
from ryu.ofproto import ofproto_v1_3
|
|
from ryu.controller.handler import set_ev_cls
|
|
from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER, DEAD_DISPATCHER
|
|
from ryu.controller import ofp_event
|
|
from ryu.lib.packet import packet
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from concurrent.futures import ProcessPoolExecutor
|
|
|
|
# Module-wide iterator that never produces an item; generator functions in
# this file `yield from` it as a no-op.
EMPTY_ITER = iter([])
|
|
|
|
# Load the JSON topology whose path is given as sys.argv[2].
# The file is read through a plain file object on purpose: ryu uses
# eventlet, which, on some versions, breaks pathlib's read_text.
print(f"loading network {sys.argv[2]}", file=sys.stderr)
with open(sys.argv[2]) as topo_file:
    network_topo = json.load(topo_file)
|
|
|
|
# Parse variables.ini from the current working directory.
print("reading network configuration variables.ini", file=sys.stderr)
network_config = ConfigParser()
with open('variables.ini') as config_file:
    network_config.read_string(config_file.read())

# Numeric constants of the [linearalgconst] section, converted to float.
m1, m2, hop_delay = (
    float(network_config['linearalgconst'][option])
    for option in ('m1', 'm2', 'hop_delay')
)
|
|
|
|
# Adjacency map: every name listed in network_topo[0] and network_topo[1]
# becomes a node; every entry of network_topo[2] is stored in both
# directions with its third field as the edge value.
network_graph = {
    node: dict()
    for group in (network_topo[0], network_topo[1])
    for node in group
}
for link in network_topo[2]:
    end_a, end_b, link_value = link[0], link[1], link[2]
    network_graph[end_a][end_b] = link_value
    network_graph[end_b][end_a] = link_value
|
|
|
|
|
|
def jsonload_list2tuple(x):
    """Return a copy of JSON-decoded data with every sequence as a tuple.

    Scalars (``None``, ``int``, ``float``, ``str``) are returned unchanged,
    lists and tuples become tuples of converted items, and dicts are rebuilt
    with converted keys and values.

    :raises ValueError: if ``x`` (or anything nested in it) is of any other
        type.
    """
    if x is None or isinstance(x, (int, float, str)):
        return x
    if isinstance(x, (list, tuple)):
        return tuple(jsonload_list2tuple(item) for item in x)
    if isinstance(x, dict):
        return {
            jsonload_list2tuple(key): jsonload_list2tuple(value)
            for key, value in x.items()
        }
    raise ValueError("Input didn't come from a standard JSON")
|
|
|
|
|
|
def _get_transition_map(graph):
|
|
transitions = [(source, target) for source, lnks in graph.items()
|
|
for target in lnks.keys()]
|
|
transmap = dict()
|
|
for s, t in transitions:
|
|
if t not in transmap:
|
|
transmap[t] = list()
|
|
transmap[t].append(s)
|
|
return {k: tuple(v) for k, v in transmap.items()}
|
|
|
|
|
|
def _find_all_paths(tm, initial, target, accumulator=None):
|
|
if accumulator is None:
|
|
accumulator = list()
|
|
accumulator = [*accumulator, initial]
|
|
if initial == target:
|
|
yield tuple(accumulator)
|
|
else:
|
|
for intermediate in tm[initial]:
|
|
if intermediate not in accumulator:
|
|
yield from _find_all_paths(tm, intermediate, target, accumulator)
|
|
yield from EMPTY_ITER
|
|
|
|
|
|
def find_all_paths(graph, initial, target, accumulator=None):
    """Yield every loop-free path between two nodes of ``graph``.

    :param graph: adjacency mapping ``node -> {neighbour: ...}``.
    :param initial: start node.
    :param target: end node.
    :param accumulator: accepted but not used by this function.
    :returns: generator of node tuples; when ``initial == target`` the only
        item produced is the empty tuple.
    """
    if initial != target:
        yield from _find_all_paths(
            _get_transition_map(graph), initial, target
        )
        return
    yield ()
|
|
|
|
|
|
def prepare_pop_pair_alternative_paths_for_availability(graph, hosts):
    """Build the table of all alternative paths between every host pair.

    :param graph: adjacency mapping passed on to ``find_all_paths``.
    :param hosts: re-iterable collection of host names.
    :returns: square list of lists; cell ``[i][j]`` holds the tuple of paths
        found from ``hosts[i]`` to ``hosts[j]``. Each pair is searched once,
        for the first ``(i, j)`` order met, and the same tuple object is
        stored in the mirrored cell ``[j][i]`` as well.
    """
    table = [[None for _ in hosts] for _ in hosts]
    for row, source in enumerate(hosts):
        for column, destination in enumerate(hosts):
            if table[row][column] is not None:
                # Already filled in as the mirror of an earlier pair.
                continue
            paths = tuple(find_all_paths(graph, source, destination))
            table[row][column] = paths
            table[column][row] = paths
    return table
|
|
|
|
|
|
def dijkstra(graph, initial):
    """Compute hop-count shortest paths from ``initial``.

    Every edge costs 1; the values stored in ``graph`` are not used.

    :param graph: adjacency mapping ``node -> {neighbour: ...}``.
    :param initial: source node.
    :returns: ``(distance, previous)`` where ``distance`` maps each reached
        node to its hop count and ``previous`` maps each reached node other
        than ``initial`` to the node it was reached from.
    """
    distance = {initial: 0}
    previous = dict()

    pending = set(graph.keys())
    neighbours = {node: list(graph[node].keys()) for node in pending}

    while pending:
        # Closest node that has been reached but not yet expanded.
        closest = min(
            (node for node in pending if node in distance),
            key=distance.__getitem__,
            default=None,
        )
        if closest is None:
            # Whatever is still pending cannot be reached from `initial`.
            break

        pending.remove(closest)
        cost_via_closest = distance[closest] + 1

        for neighbour in neighbours[closest]:
            if (neighbour not in distance
                    or cost_via_closest < distance[neighbour]):
                distance[neighbour] = cost_via_closest
                previous[neighbour] = closest
    return distance, previous
|
|
|
|
|
|
def dijkstra_min_path(dijkstra_tuple, initial, target):
    """Rebuild the shortest path to ``target`` from a ``dijkstra`` result.

    :param dijkstra_tuple: ``(distance, previous)`` pair as returned by
        ``dijkstra``.
    :param initial: source node the result was computed for.
    :param target: destination node.
    :returns: ``(path, cost)`` with ``path`` a list running from the source
        to ``target``; ``([], None)`` when ``target`` has no recorded
        predecessor and is not ``initial`` itself.
    """
    visited, path = dijkstra_tuple
    if target not in path and target != initial:
        return ([], None)
    backwards = []
    node = target
    # Follow predecessor links until a node without one is reached.
    while node is not None:
        backwards.append(node)
        node = path.get(node)
    backwards.reverse()
    return (backwards, visited[target])
|
|
|
|
|
|
class Dijkstra:
    """Callable that runs ``dijkstra`` on one graph and memoises per source."""

    def __init__(self, graph):
        """Remember ``graph`` and start with an empty result cache."""
        self._graph = graph
        self._cache = dict()

    def __call__(self, initial):
        """Return a ``DijkstraResults`` for source node ``initial``.

        The underlying search runs only the first time a given source is
        requested; later calls reuse the stored result.
        """
        try:
            solved = self._cache[initial]
        except KeyError:
            solved = self._cache[initial] = dijkstra(self._graph, initial)
        return DijkstraResults(initial, solved)
|
|
|
|
|
|
class DijkstraResults:
    """Shortest-path results for one fixed source node."""

    def __init__(self, initial, dijkstra_tuple):
        """Store the source node and the tuple returned by ``dijkstra``."""
        self._i = initial
        self._dt = dijkstra_tuple

    def __call__(self, target):
        """Return ``dijkstra_min_path`` from the stored source to ``target``."""
        path_and_cost = dijkstra_min_path(self._dt, self._i, target)
        return path_and_cost
|
|
|
|
|
|
# All-paths table between hosts, cached on disk next to the topology file
# because it is computed by exhaustive path enumeration.
apacachefile = f"{sys.argv[2]}.apa.json"
print(
    f"checking if APA for {sys.argv[2]} is cached at {apacachefile}",
    file=sys.stderr,
)

if not os.path.isfile(apacachefile):
    print(f"calculating APA for {sys.argv[2]}", file=sys.stderr)
    pop_apa_candidates = prepare_pop_pair_alternative_paths_for_availability(
        network_graph,
        network_topo[0],
    )
    print(f"caching APA at {apacachefile}", file=sys.stderr)
    with open(apacachefile, 'w') as cache_out:
        cache_out.write(json.dumps(pop_apa_candidates))
else:
    print(f"loading APA from {apacachefile}", file=sys.stderr)
    with open(apacachefile) as cache_in:
        # JSON has no tuples, so they are restored after decoding.
        pop_apa_candidates = jsonload_list2tuple(json.loads(cache_in.read()))
|
|
|
|
|
|
class LatencyController:
    """Per-switch state and flow installation for one connected datapath.

    For every link of ``network_topo[2]`` that touches this switch the
    instance keeps the output port number, the last observed byte total,
    the speed derived from it and the number of flow rules using the link.
    """

    def __init__(self, datapath):
        """Index the links of ``network_topo[2]`` that touch this switch.

        :param datapath: datapath object of the switch; its ``id`` gives the
            switch name ``s<id>``.
        """
        super().__init__()
        self._datapath = datapath
        self._sw = f"s{self._datapath.id}"
        # The position in the global link list is kept alongside each link
        # because portno_from_list derives the port number from it.
        own_links = [
            (index, link) for index, link in enumerate(network_topo[2])
            if link[0] == self._sw or link[1] == self._sw
        ]
        self._l = own_links
        self._links = portno_from_list(own_links)
        self._link_speed = usage_store_from_list(own_links)
        self._link_xfer = usage_store_from_list(own_links)
        self._link_rules = usage_store_from_list(own_links)
        self._bandwidth = bandwidth_from_list(own_links)
        self._host2mac = dict()
        self._ospf = None

    def connected(self, la):
        """Install the shortest-path fallback flows for every host.

        For each host of ``network_topo[0]`` two priority-1 flows (IPv4 and
        ARP towards the host address) are sent, both outputting on the port
        of the first hop of the shortest path.

        :param la: object exposing ``ospf_dijkstra``, called with this
            switch's name to obtain the shortest-path lookup.
        """
        print(f"Switch connected: {self._sw}")
        print(f"Switch OSPF-fallback loading: {self._sw}")
        parser = self._datapath.ofproto_parser
        self._ospf = la.ospf_dijkstra(self._sw)
        for h in network_topo[0]:
            route = self._ospf(h)[0]
            if len(route) < 2:
                # No path to this host (the lookup returned an empty
                # route): there is no next hop to program, so skip it
                # instead of failing the whole switch set-up.
                continue
            next_hop = route[1]
            ipv4_dst = id2ip(int(h[1:])-1)
            match_ipv4 = parser.OFPMatch(
                eth_type=0x0800,
                ipv4_dst=ipv4_dst
            )
            match_arp = parser.OFPMatch(
                eth_type=0x0806,
                arp_tpa=ipv4_dst
            )
            out_port = self._links[(self._sw, next_hop)]
            actions = [parser.OFPActionOutput(out_port)]
            print(f"{self._sw} --[{out_port}]--> {next_hop} ({ipv4_dst})")
            self.add_flow(1, match_ipv4, actions)
            self.add_flow(1, match_arp, actions)
        print(f"Switch OSPF-fallback loaded: {self._sw}")

    def disconnected(self):
        """Log that the switch went away."""
        print(f"Switch disconnected: {self._sw}")

    def add_flow(self, priority, match, actions, buffer_id=None, idle_timeout=0, hard_timeout=0):
        """Send a flow-mod that applies ``actions`` to packets matching ``match``.

        :param priority: flow priority.
        :param match: match object built with the datapath's parser.
        :param actions: list of action objects.
        :param buffer_id: forwarded only when not ``None`` (so a buffer id of
            0 is passed through).
        :param idle_timeout: forwarded only when non-zero.
        :param hard_timeout: forwarded only when non-zero.
        """
        ofproto = self._datapath.ofproto
        parser = self._datapath.ofproto_parser

        inst = [parser.OFPInstructionActions(
            ofproto.OFPIT_APPLY_ACTIONS,
            actions
        )]
        additionals = dict()
        if buffer_id is not None:
            additionals['buffer_id'] = buffer_id
        if idle_timeout:
            additionals['idle_timeout'] = idle_timeout
        if hard_timeout:
            additionals['hard_timeout'] = hard_timeout
        mod = parser.OFPFlowMod(datapath=self._datapath,
                                priority=priority, match=match,
                                instructions=inst,
                                **additionals)
        self._datapath.send_msg(mod)

    def _the_other_link_endpoint(self, linktuple):
        """Return the endpoint of ``linktuple`` that is not this switch."""
        return linktuple[1 - linktuple.index(self._sw)]

    def _on_stats(self, ev, la):
        """Refresh per-link speed and rule counts from a flow-stats reply.

        Byte counters of all flows that output on the same link are summed
        first and only then compared with the total seen in the previous
        reply; comparing flow by flow would subtract one flow's counter from
        another's whenever several flows share a port.

        :param ev: event whose ``msg.body`` is the list of flow stats.
        :param la: object exposing ``interval``, the polling period used to
            turn the byte delta into a speed.
        """
        rules = usage_store_from_list(self._l)
        bytes_per_link = dict()
        for stat in ev.msg.body:
            try:
                port = stat.instructions[0].actions[0].port
            except (IndexError, AttributeError):
                # Flow without an output action as its first action.
                continue
            links = self._links.reverse_lookup(port)
            if not links:
                # Port that does not belong to any known link.
                continue
            tp = links[0]
            # measuring load
            bytes_per_link[tp] = bytes_per_link.get(tp, 0) + stat.byte_count
            # measuring link flows
            rules[tp] = 1+rules[tp]
        self._link_rules = rules
        for tp, total in bytes_per_link.items():
            # Counters can restart from zero; never report a negative delta.
            transferred = max(0, total - self._link_xfer[tp])
            self._link_xfer[tp] = total
            # Bytes since the previous reply divided by interval * 10**6.
            self._link_speed[tp] = transferred/(la.interval*10**6)

    def add_latency_segment(self, ips, ipd, nxt):
        """Install a priority-5 IPv4 flow for one hop of a chosen path.

        :param ips: source IPv4 address to match.
        :param ipd: destination IPv4 address to match.
        :param nxt: name of the neighbour the packets are output towards.
        """
        parser = self._datapath.ofproto_parser
        match = parser.OFPMatch(
            eth_type=0x0800,
            ipv4_src=ips,
            ipv4_dst=ipd
        )
        out_port = self._links[(self._sw, nxt)]
        actions = [parser.OFPActionOutput(out_port)]
        self.add_flow(5, match, actions)
|
|
|
|
|
|
# Single-worker process pool used for the path evaluation, plus the handle
# of the most recently submitted evaluation (None until one is submitted).
ppe = ProcessPoolExecutor(max_workers=1)
last_process = None
|
|
|
|
|
|
class PathEvaluator:
    """Callable that picks, per host pair, the cheapest candidate path."""

    def __init__(self, link_usage, link_flows):
        """Keep the two per-link lookups the cost formula reads.

        :param link_usage: per-link usage values, indexed by node pair.
        :param link_flows: per-link flow counts, indexed by node pair.
        """
        self._link_usage = link_usage
        self._link_flows = link_flows

    def __call__(self):
        """Evaluate every candidate path of every unordered host pair.

        :returns: ``UsageStore`` mapping each host pair to the candidate of
            ``pop_apa_candidates`` with the lowest cost (``None`` when the
            pair has no candidates).
        """
        usage = self._link_usage
        flows = self._link_flows
        hosts = network_topo[0]
        chosen = UsageStore()
        shortest = Dijkstra(network_graph)
        for source in hosts:
            for destination in hosts:
                if source == destination or chosen.contains((source, destination)):
                    continue
                source_pos = hosts.index(source)
                destination_pos = hosts.index(destination)

                # Delay along the hop-count shortest route; used below to
                # scale each candidate's delay.
                reference_route = shortest(source)(destination)[0]
                ospf_delay = 0
                for hop in zip(reference_route, reference_route[1:]):
                    ospf_delay += usage[hop] + hop_delay
                # NOTE(review): ospf_delay stays 0 when every link on that
                # route reports zero usage and hop_delay is 0; the division
                # below then raises ZeroDivisionError.

                best_path = None
                best_cost = None
                for candidate in pop_apa_candidates[source_pos][destination_pos]:
                    links = [
                        _sort_pair(near, far)
                        for near, far in zip(candidate, candidate[1:])
                    ]
                    flow_total = 0      # flows in aggregate
                    worst_usage = 0     # maximum overload
                    path_fraction = 1   # fraction of aggregate on path
                    delay_term = 0
                    usage_term = 0
                    for link in links:
                        load = usage[link]
                        worst_usage = max(worst_usage, load)
                        flow_total += flows[link]
                        link_delay = load + hop_delay   # path delay
                        delay_term += path_fraction * (
                            link_delay + ((link_delay * m1) / ospf_delay)
                        )
                        usage_term += load
                    cost = flow_total * delay_term + m2 * worst_usage + usage_term
                    if best_cost is None or cost < best_cost:
                        best_cost = cost
                        best_path = candidate
                chosen[(source, destination)] = best_path
        return chosen
|
|
|
|
|
|
class LatencyApp(app_manager.RyuApp):
    """Ryu application that polls flow statistics and re-routes host pairs.

    Every flow-stats reply updates the per-switch counters and submits a
    fresh ``PathEvaluator`` run to the process pool; when a run finishes,
    the selected paths are installed hop by hop on the connected switches.
    """

    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    def __init__(self, *args, **kwargs):
        """Set up the shared lookups and start the polling green thread."""
        super().__init__(*args, **kwargs)
        self.mac_to_port = {}
        # datapath id -> LatencyController of each connected switch
        self.controllers = {}
        self.max_speed = bandwidth_from_list(enumerate(network_topo[2]))
        # Live views summing the per-switch values of every controller.
        self.last_speed = UsageStoreProxyFromCallableIterable(
            AttrCallableIterable(self.controllers.values, '_link_speed'),
            sum
        )
        self.flow_count = UsageStoreProxyFromCallableIterable(
            AttrCallableIterable(self.controllers.values, '_link_rules'),
            sum
        )
        self.ospf_dijkstra = Dijkstra(network_graph)
        self.interval = float(network_config['monitoring']['interval'])
        hub.spawn(self._start_monitoring, self.interval)

    @set_ev_cls(ofp_event.EventOFPStateChange, [MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _event_ofp_state_change(self, ev):
        """Register a switch when it becomes ready and drop it when it dies."""
        datapath = ev.datapath
        dpid = datapath.id
        if ev.state == MAIN_DISPATCHER:
            if dpid not in self.controllers:
                self.controllers[dpid] = LatencyController(datapath)
                self.controllers[dpid].connected(self)
                self._request_stats(datapath)
        elif ev.state == DEAD_DISPATCHER:
            if dpid in self.controllers:
                self.controllers[dpid].disconnected()
                del self.controllers[dpid]

    def _start_monitoring(self, repeat):
        """Request flow statistics from every switch, forever.

        :param repeat: pause between two polling rounds, passed to
            ``hub.sleep``.
        """
        while True:
            # Iterate over a snapshot: switches may connect or disconnect
            # (mutating self.controllers) while requests are being sent.
            for ctrl in list(self.controllers.values()):
                self._request_stats(ctrl._datapath)
            # hub.sleep yields to the other green threads of the hub.
            hub.sleep(repeat)

    def _request_stats(self, datapath):
        """Send a flow-stats request to ``datapath``."""
        datapath.send_msg(
            datapath.ofproto_parser.OFPFlowStatsRequest(
                datapath
            )
        )

    @set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
    def _flow_stats_reply_handler(self, ev):
        """Feed a stats reply to its switch and restart the path evaluation.

        A previously submitted evaluation is cancelled (which only has an
        effect while it is still queued) before the new one is submitted.
        """
        global last_process
        dpid = ev.msg.datapath.id
        ctrl = self.controllers.get(dpid)
        if ctrl is not None:
            ctrl._on_stats(ev, self)
            if last_process is not None:
                last_process.cancel()
            last_process = ppe.submit(
                PathEvaluator(*self._monitored_data())
            )
            # NOTE(review): done-callbacks are invoked by concurrent.futures
            # outside the hub's green threads; confirm that sending OpenFlow
            # messages from there is safe.
            last_process.add_done_callback(self._update_topo_done)

    def _monitored_data(self):
        """Return ``(link utilisation, link flow counts)`` as plain stores.

        Utilisation is the last measured speed divided by the link's
        configured value, the divisor being floored at a tiny positive
        number to avoid dividing by zero.
        """
        return (
            self.max_speed.calculate(
                self.last_speed,
                lambda maxspeed, lastspeed: (
                    lastspeed / max(maxspeed, 0.0000000000001)
                )
            ),
            self.flow_count.to_regular_storage()
        )

    def _update_topo_done(self, future):
        """Done-callback: install the result unless the run was cancelled."""
        if not future.cancelled():
            res = future.result()
            self._update_topo_done_successfully(res)

    def _update_topo_done_successfully(self, processed):
        """Install the selected path of every host pair and log it.

        For each hop of a path the forward direction is programmed on the
        hop's first node and the reverse direction on its second node,
        whenever that node is a switch.

        Switches that are not currently connected are skipped, as are pairs
        for which no path was selected, so one missing switch does not
        abort the installation of all remaining routes.

        :param processed: ``UsageStore`` mapping host pairs to node paths.
        """
        print(f'-> Topo updated {datetime.datetime.now()}')
        print('@start.routes')
        for (h1, h2), path in processed._pairs.items():
            if not path:
                continue
            ip1 = id2ip(int(h1[1:])-1)
            ip2 = id2ip(int(h2[1:])-1)
            for near, far in zip(path, path[1:]):
                if near.startswith('s'):
                    ctrl = self.controllers.get(int(near[1:]))
                    if ctrl is not None:
                        ctrl.add_latency_segment(ip1, ip2, far)
                if far.startswith('s'):
                    ctrl = self.controllers.get(int(far[1:]))
                    if ctrl is not None:
                        ctrl.add_latency_segment(ip2, ip1, near)
            print(path)
        print('@end.routes')
|
|
|
|
|
|
def portno_from_list(l):
    """Build a ``UsageStore`` of port numbers from ``(index, link)`` pairs.

    Each link's first two fields form the key; the stored value is the
    pair's index plus one.
    """
    port_by_link = dict()
    for index, link in l:
        port_by_link[tuple(link[:2])] = index + 1
    return UsageStore(port_by_link)
|
|
|
|
|
|
def usage_store_from_list(l):
    """Build a ``UsageStore`` holding 0 for every link in ``l``.

    ``l`` yields ``(index, link)`` pairs; only the first two fields of each
    link are used, as the key.
    """
    zeroed = dict()
    for _, link in l:
        zeroed[tuple(link[:2])] = 0
    return UsageStore(zeroed)
|
|
|
|
|
|
def bandwidth_from_list(l):
    """Build a ``UsageStore`` mapping each link to its third field.

    ``l`` yields ``(index, link)`` pairs; the first two fields of a link are
    the key and the third is the stored value.
    """
    third_field_by_link = dict()
    for _, link in l:
        third_field_by_link[tuple(link[:2])] = link[2]
    return UsageStore(third_field_by_link)
|
|
|
|
|
|
def _sort_pair(a: str, b: str) -> Tuple[str, str]:
|
|
if a[0] == 'h' and b[0] == 's':
|
|
return b, a # h5,s7 => s7,h5
|
|
elif a[0] != b[0]:
|
|
return a, b # s7,h5 => s7,h5
|
|
elif int(a[1:]) > int(b[1:]):
|
|
return b, a # s7,s5 => s5,s7
|
|
else:
|
|
return a, b # s5,s7 => s5,s7
|
|
|
|
|
|
class PairSorterMixin:
    """Mixin exposing the module-level pair ordering as a method."""

    def _sort_pair(self, a, b):
        """Return ``(a, b)`` in the order defined by module-level ``_sort_pair``."""
        ordered = _sort_pair(a, b)
        return ordered
|
|
|
|
|
|
class UsageStore(PairSorterMixin):
    """Dictionary-like store keyed by unordered node pairs.

    Every key is normalised with ``_sort_pair`` before it is used, so
    ``(a, b)`` and ``(b, a)`` address the same entry.
    """

    def __init__(self, initial=None):
        """Create the store, optionally pre-filled.

        :param initial: mapping of node pairs to values; ``None`` (the
            default) creates an empty store. A ``None`` default is used
            instead of a shared mutable ``dict()`` default.
        """
        self._pairs = dict()
        if initial is not None:
            for k, v in initial.items():
                self[k] = v

    def __getitem__(self, idx) -> float:
        """Return the value stored for pair ``idx``, or 0 when absent."""
        return self._pairs.get(self._sort_pair(*idx), 0)

    def reverse_lookup(self, val):
        """Return the list of (normalised) pairs whose value equals ``val``."""
        return [k for k, v in self._pairs.items() if v == val]

    def calculate(self, other, operation):
        """Combine this store with ``other`` entry by entry.

        :param other: object indexable by node pair.
        :param operation: called as ``operation(own_value, other[pair])``.
        :returns: new ``UsageStore`` with one result per pair of this store.
        """
        return UsageStore({
            k: operation(v, other[k])
            for k, v in self._pairs.items()
        })

    def contains(self, idx) -> bool:
        """Tell whether a value has been stored for pair ``idx``."""
        return self._sort_pair(*idx) in self._pairs

    def __setitem__(self, idx, val: float):
        """Store ``val`` for pair ``idx``."""
        self._pairs[self._sort_pair(*idx)] = val

    def __repr__(self):
        return f"{type(self).__name__}({repr(self._pairs)})"
|
|
|
|
|
|
class UsageStoreProxyFromCallableIterable(PairSorterMixin):
    """Pair-indexed view over several stores produced on demand.

    The wrapped callable is invoked on every access and must return an
    iterable of objects exposing a ``_pairs`` dict.
    """

    def __init__(self, clb, get_behaviour=None):
        """
        :param clb: zero-argument callable returning the stores to consult.
        :param get_behaviour: function reducing the list of values found for
            a pair to the value returned by indexing; when ``None``,
            indexing always returns 0.
        """
        self._callable = clb
        self._behaviour = get_behaviour

    def __getitem__(self, idx) -> float:
        """Reduce the non-``None`` values stored for pair ``idx``."""
        ordered = self._sort_pair(*idx)
        found = [
            value
            for value in (
                store._pairs.get(ordered) for store in self._callable()
            )
            if value is not None
        ]
        if self._behaviour is None:
            return 0
        return self._behaviour(found)

    def __setitem__(self, idx, val: float):
        """Overwrite pair ``idx`` in every store that already holds it.

        :raises ValueError: if no store has a non-``None`` value for the pair.
        """
        ordered = self._sort_pair(*idx)
        updated = False
        for store in self._callable():
            if store._pairs.get(ordered) is not None:
                store._pairs[ordered] = val
                updated = True
        if updated:
            return
        raise ValueError(
            f"There's no existing connection between {idx[0]} and {idx[1]} to update"
        )

    def to_regular_storage(self):
        """Return a ``UsageStore`` snapshot of every pair known to any store."""
        keys = {
            key
            for store in self._callable()
            for key in store._pairs.keys()
        }
        return UsageStore({key: self[key] for key in keys})
|
|
|
|
|
|
class AttrCallableIterable:
    """Callable yielding one attribute of every item of a collection.

    The collection is obtained by calling ``callablecollection`` each time
    the instance is iterated, so the result always reflects its current
    content.
    """

    def __init__(self, callablecollection, attr):
        """
        :param callablecollection: zero-argument callable returning an
            iterable of objects.
        :param attr: name of the attribute to read from each object.
        """
        self._cc = callablecollection
        self._attr = attr

    def __call__(self):
        """Lazily yield ``getattr(item, attr)`` for each current item."""
        # The loop's own `yield` already makes this a generator, so no
        # additional empty `yield from` is required.
        for item in self._cc():
            yield getattr(item, self._attr)
|