#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import os
import sys
import json
import random
import datetime
import threading
from graphtools import Dijkstra, find_all_paths
from time import sleep
from io import StringIO
from id2ip import id2ip, ip2id
from typing import Any, Dict, List, Tuple, Union
from configparser import ConfigParser
from ryu.lib import hub
from ryu.base import app_manager
from ryu.lib import ofctl_v1_3
from ryu.lib.packet import ether_types
from ryu.lib.packet import ethernet
from ryu.ofproto import ofproto_v1_3
from ryu.controller.handler import set_ev_cls
from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.controller import ofp_event
from ryu.lib.packet import packet
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor

EMPTY_ITER = iter(list())

print(f"loading network {sys.argv[2]}", file=sys.stderr)
network_topo = ''
with open(sys.argv[2]) as f:
    # ryu uses eventlet, which, on some versions, breaks pathlib's read_text
    network_topo = json.loads(f.read())
base_net_name = '.'.join(sys.argv[2].split('.')[:-1])

print("reading network configuration variables.ini", file=sys.stderr)
network_config = ConfigParser()
with open('variables.ini') as f:
    network_config.read_string(f.read())

m1 = float(network_config['linearalgconst']['m1'])
m2 = float(network_config['linearalgconst']['m2'])
hop_delay = float(network_config['linearalgconst']['hop_delay'])
# getboolean() parses "true"/"false"/"1"/"0" safely, unlike the eval() it replaces
ospf_only = network_config['GENERAL'].getboolean('ospf_only')

# Build an adjacency-dict graph: hosts and switches are nodes, the third
# element of each link triple is the edge weight.
network_graph = dict()
for h in network_topo[0]:
    network_graph[h] = dict()
for s in network_topo[1]:
    network_graph[s] = dict()
for l in network_topo[2]:
    network_graph[l[0]][l[1]] = l[2]
    network_graph[l[1]][l[0]] = l[2]


def jsonload_list2tuple(x):
    """Recursively turn JSON lists into tuples so paths become hashable."""
    if isinstance(x, (type(None), int, float, str)):
        return x
    elif isinstance(x, (list, tuple)):
        return tuple(jsonload_list2tuple(k) for k in x)
    elif isinstance(x, dict):
        return {jsonload_list2tuple(k): jsonload_list2tuple(v)
                for k, v in x.items()}
    else:
        raise ValueError("Input didn't come from a standard JSON")


def prepare_pop_pair_alternative_paths_for_availability(graph, hosts):
    # APA: all alternative paths between every PoP (host) pair; the matrix is
    # symmetric, so each pair is computed once and mirrored.
    pop_apa = [[None for x in hosts] for y in hosts]
    for i, h1 in enumerate(hosts):
        for j, h2 in enumerate(hosts):
            if pop_apa[i][j] is None:
                apa = tuple(find_all_paths(graph, h1, h2))
                pop_apa[i][j] = apa
                pop_apa[j][i] = apa
    return pop_apa


apacachefile = f"{base_net_name}.apa.json"
print(
    f"checking if APA for {sys.argv[2]} is cached at {apacachefile}",
    file=sys.stderr)
pop_apa_candidates = None
if os.path.isfile(apacachefile):
    print(f"loading APA from {apacachefile}", file=sys.stderr)
    with open(apacachefile) as f:
        pop_apa_candidates = jsonload_list2tuple(json.loads(f.read()))
else:
    print(f"calculating APA for {sys.argv[2]}", file=sys.stderr)
    pop_apa_candidates = prepare_pop_pair_alternative_paths_for_availability(
        network_graph,
        network_topo[0]
    )
    print(f"caching APA at {apacachefile}", file=sys.stderr)
    with open(apacachefile, 'w') as f:
        f.write(json.dumps(pop_apa_candidates))
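
# Illustrative sketch, added for clarity and not consumed anywhere: judging by
# the indexing above, the topology JSON is assumed to be a three-element array
# [hosts, switches, links], each link an [endpoint_a, endpoint_b, weight]
# triple (the weight also doubles as the link bandwidth in
# bandwidth_from_list). A hypothetical two-host line network:
#
#   [
#     ["h1", "h2"],
#     ["s1", "s2"],
#     [["h1", "s1", 100], ["s1", "s2", 10], ["s2", "h2", 100]]
#   ]
#
# which the loops above turn into the adjacency dict
#   {"h1": {"s1": 100}, "s1": {"h1": 100, "s2": 10}, ...}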
class LatencyController:
    """Per-switch state and OpenFlow rule management."""

    def __init__(self, datapath):
        super().__init__()
        self._datapath = datapath
        self._sw = f"s{self._datapath.id}"
        # Keep only the links incident to this switch, together with their
        # index in the global link list (the index determines the port number).
        l = [
            (i, x)
            for i, x in enumerate(network_topo[2])
            if x[0] == self._sw or x[1] == self._sw
        ]
        self._l = l
        self._links = portno_from_list(l)
        self._flow_xfer = dict()    # (src, sw, next, dst) -> last byte count
        self._flow_speed = dict()   # (src, sw, next, dst) -> Mbps
        self._link_speed = UsageStoreProxyFromFlowDict(self._flow_speed)
        self._link_rules = usage_store_from_list(l)
        self._bandwidth = bandwidth_from_list(l)
        self._host2mac = dict()
        self._ospf = None

    def connected(self, la):
        print(f"Switch connected: {self._sw}")
        print(f"Switch OSPF-fallback loading: {self._sw}")
        parser = self._datapath.ofproto_parser
        self._ospf = la.ospf_dijkstra(self._sw)
        # Install low-priority OSPF (shortest-path) fallback rules for IPv4
        # and ARP towards every host.
        for h in network_topo[0]:
            ipv4_dst = id2ip(int(h[1:])-1)
            match_ipv4 = parser.OFPMatch(
                eth_type=0x0800,
                ipv4_dst=ipv4_dst
            )
            match_arp = parser.OFPMatch(
                eth_type=0x0806,
                arp_tpa=ipv4_dst
            )
            next_hop = self._ospf(h)[0][1]
            out_port = self._links[(self._sw, next_hop)]
            actions = [parser.OFPActionOutput(out_port)]
            print(f"{self._sw} --[{out_port}]--> {next_hop} ({ipv4_dst})")
            self.add_flow(1, match_ipv4, actions)
            self.add_flow(1, match_arp, actions)
        print(f"Switch OSPF-fallback loaded: {self._sw}")

    def disconnected(self):
        print(f"Switch disconnected: {self._sw}")

    def add_flow(self, priority, match, actions, buffer_id=None,
                 idle_timeout=0, hard_timeout=0):
        ofproto = self._datapath.ofproto
        parser = self._datapath.ofproto_parser
        inst = [parser.OFPInstructionActions(
            ofproto.OFPIT_APPLY_ACTIONS,
            actions
        )]
        additionals = dict()
        if buffer_id:
            additionals['buffer_id'] = buffer_id
        if idle_timeout:
            additionals['idle_timeout'] = idle_timeout
        if hard_timeout:
            additionals['hard_timeout'] = hard_timeout
        mod = parser.OFPFlowMod(datapath=self._datapath, priority=priority,
                                match=match, instructions=inst,
                                **additionals)
        self._datapath.send_msg(mod)

    def _the_other_link_endpoint(self, linktuple):
        return linktuple[abs(linktuple.index(self._sw)-1)]

    def _on_stats(self, ev, la):
        self._link_rules = usage_store_from_list(self._l)
        stats = ev.msg.body
        allkeys = []
        for k in self._link_rules._pairs.keys():
            self._link_rules._pairs[k] = 0
        for stat in stats:
            # ltp -> link tuple
            # ftp -> flow tuple
            # bc  -> byte count
            # obc -> old byte count
            out_port = stat.instructions[0].actions[0].port
            ltps = self._links.reverse_lookup(out_port)
            if len(ltps) <= 0:
                continue
            ltp = ltps[0]
            nxt = self._the_other_link_endpoint(ltp)
            # measuring link flows
            self._link_rules[ltp] = 1+self._link_rules[ltp]
            # measuring load
            if stat.match['eth_type'] == 0x0800:
                src = stat.match.get('ipv4_src', '10.0.0.0')
                dst = stat.match['ipv4_dst']
                src = f"h{ip2id(src)+1}"
                dst = f"h{ip2id(dst)+1}"
                if src == 'h0':
                    # 10.0.0.0 is the placeholder for "no ipv4_src in match"
                    src = None
                key = (src, self._sw, nxt, dst)
                bc = stat.byte_count
                obc = self._flow_xfer.get(key, 0)
                tfd = max(0, bc-obc)                # bytes per cycle
                tfr = ((8*tfd)/la.interval)/10**6   # Mbps
                self._flow_xfer[key] = bc
                self._flow_speed[key] = tfr
                allkeys.append(key)
        # Flows that produced no stats this cycle are considered idle.
        for k in (set(self._flow_speed.keys())-set(allkeys)):
            self._flow_speed[k] = 0

    def add_latency_segment(self, ips, ipd, nxt):
        if ospf_only:
            return
        link = (self._sw, nxt)
        parser = self._datapath.ofproto_parser
        actions = [parser.OFPActionOutput(self._links[link])]
        match = parser.OFPMatch(
            eth_type=0x0800,
            ipv4_src=ips,
            ipv4_dst=ipd
        )
        # Priority 5 overrides the priority-1 OSPF fallback for this pair.
        self.add_flow(5, match, actions)
        # hs = f"h{ip2id(ips)+1}"
        # hd = f"h{ip2id(ipd)+1}"
        # self._flow_xfer[(hs, hd, nxt)] = 0


ppe = ProcessPoolExecutor(1)
last_process = None
last_process_lock = threading.Lock()
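
# Reading of the scoring in PathEvaluator below (inferred from the code, not
# from an external spec): for each host pair, every candidate path
# ("aggregate") from the APA table gets a score
#
#   score = na*eps2 + m2*omax + eps3
#
# where na counts flows already present on (or just reserved for) the path's
# links, eps2 accumulates per-hop delay plus a penalty proportional to the
# ratio against the OSPF baseline delay (weighted by m1), omax is the load of
# the most loaded link, and eps3 is the plain sum of link loads. The lowest
# score wins; the candidate list is shuffled so ties break randomly.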
class PathEvaluator:
    """Picks the best alternative path for every host pair.

    Instances are pickled into the ProcessPoolExecutor's worker process, so
    the usage/flow stores they carry are snapshots taken at submit time.
    """

    def __init__(self, link_usage, link_flows):
        self._link_usage = link_usage
        self._link_flows = link_flows

    def __call__(self):
        processed = UsageStore()
        currently_reserved = UsageStore()
        ospf = Dijkstra(network_graph)
        for h1 in network_topo[0]:
            for h2 in network_topo[0]:
                if h1 != h2 and not processed.contains((h1, h2)):
                    h1p = network_topo[0].index(h1)
                    h2p = network_topo[0].index(h2)
                    # Baseline: accumulated delay along the OSPF path.
                    ospf_delay = 0
                    ospf_aggregate = ospf(h1)(h2)[0]
                    for i in range(len(ospf_aggregate)-1):
                        ospf_delay += self._link_usage[
                            tuple(ospf_aggregate[i:i+2])
                        ] + hop_delay
                    aggregates = list(pop_apa_candidates[h1p][h2p])
                    random.shuffle(aggregates)
                    min_aggregate = None
                    min_aggregate_val = None
                    for aggregate in aggregates:
                        na = 0    # flows in aggregate
                        omax = 0  # maximum overload
                        for i in range(len(aggregate)-1):
                            link = _sort_pair(*aggregate[i:i+2])
                            omax = max(omax, self._link_usage[link])
                            na += self._link_flows[link]
                            na += currently_reserved[link]
                        # fraction of aggregate on path
                        eps2 = 0
                        xap = 1
                        for i in range(len(aggregate)-1):
                            link = _sort_pair(*aggregate[i:i+2])
                            dp = self._link_usage[link]+hop_delay
                            # path delay
                            eps2 += xap*(dp+((dp*m1)/ospf_delay))
                        exp0 = m2*omax
                        eps3 = 0
                        for i in range(len(aggregate)-1):
                            link = _sort_pair(*aggregate[i:i+2])
                            eps3 += self._link_usage[link]
                        aggregate_val = na*eps2+exp0+eps3
                        if min_aggregate_val is None \
                                or aggregate_val < min_aggregate_val:
                            min_aggregate_val = aggregate_val
                            min_aggregate = aggregate
                    processed[(h1, h2)] = min_aggregate
                    # Reserve the chosen path's links so later pairs see
                    # them as more loaded.
                    for i in range(len(min_aggregate)-1):
                        link = _sort_pair(*min_aggregate[i:i+2])
                        currently_reserved[link] = 1+currently_reserved[link]
        return processed
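
# Note on the executor handoff used in LatencyApp below: each stats round
# cancels any still-queued PathEvaluator before submitting a fresh one.
# Future.cancel() only prevents work that has not started; an evaluation that
# is already running in the single worker process completes normally and its
# done-callback still fires.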
class LatencyApp(app_manager.RyuApp):
    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.mac_to_port = {}
        self.controllers = {}
        self._paths = host_path_from_list(network_topo[0])
        self._loads = usage_store_from_list(enumerate([]))
        self.max_speed = bandwidth_from_list(enumerate(network_topo[2]))
        # Aggregate per-link transfer speed across all connected switches.
        self.last_speed = UsageStoreProxyFromCallableIterable(
            AttrCallableIterable(
                AttrCallableIterable(
                    self.controllers.values,
                    '_link_speed'
                ),
                'as_regular_storage'
            ),
            sum
        )
        # Aggregate per-link flow-rule counts across all connected switches.
        self.flow_count = UsageStoreProxyFromCallableIterable(
            AttrCallableIterable(self.controllers.values, '_link_rules'),
            sum
        )
        self.ospf_dijkstra = Dijkstra(network_graph)
        self._fill_paths_with_ospf()
        self.interval = float(network_config['monitoring']['interval'])
        hub.spawn(self._start_monitoring, self.interval)

    def _fill_paths_with_ospf(self):
        for h1 in network_topo[0]:
            for h2 in network_topo[0]:
                if h1 != h2 and len(self._paths[(h1, h2)]) <= 0:
                    self._paths[(h1, h2)] = self.ospf_dijkstra(h1)(h2)[0]

    @set_ev_cls(ofp_event.EventOFPStateChange,
                [MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _event_ofp_state_change(self, ev):
        datapath = ev.datapath
        dpid = datapath.id
        if ev.state == MAIN_DISPATCHER:
            if dpid not in self.controllers:
                self.controllers[dpid] = LatencyController(datapath)
                self.controllers[dpid].connected(self)
                self._request_stats(datapath)
        elif ev.state == DEAD_DISPATCHER:
            if dpid in self.controllers:
                self.controllers[dpid].disconnected()
                del self.controllers[dpid]

    def _start_monitoring(self, repeat):
        while True:
            for ctrl in self.controllers.values():
                self._request_stats(ctrl._datapath)
            sleep(repeat)

    def _request_stats(self, datapath):
        datapath.send_msg(
            datapath.ofproto_parser.OFPFlowStatsRequest(
                datapath
            )
        )

    @set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
    def _flow_stats_reply_handler(self, ev):
        global last_process
        dpid = ev.msg.datapath.id
        ctrl = self.controllers.get(dpid)
        if ctrl is not None:
            ctrl._on_stats(ev, self)
        with last_process_lock:
            if last_process is not None:
                last_process.cancel()
            last_process = ppe.submit(PathEvaluator(*self._monitored_data()))
            last_process.add_done_callback(self._update_topo_done)

    def _monitored_data(self):
        loads = self.max_speed.calculate(
            self.last_speed.as_regular_storage,
            lambda maxspeed, lastspeed: (
                # tiny epsilon guards against division by zero
                lastspeed / max(maxspeed, 1e-13)
            )
        )
        flows = self.flow_count.to_regular_storage()
        self._loads = loads
        return loads, flows

    def _update_topo_done(self, future):
        # `future` is never None here, so checking cancellation suffices.
        if not future.cancelled():
            res = future.result()
            self._update_topo_done_successfully(res)
            self._update_performance_state()

    def _update_topo_done_successfully(self, processed):
        print(f'-> Topo updated {datetime.datetime.now()}')
        for (h1, h2), path in processed._pairs.items():
            self._paths[(h1, h2)] = path
            ip1 = id2ip(int(h1[1:])-1)
            ip2 = id2ip(int(h2[1:])-1)
            for i in range(len(path)-1):
                segment = path[i:i+2]
                if segment[0].startswith('s'):
                    ctrl = self.controllers.get(int(segment[0][1:]))
                    if ctrl is not None:
                        ctrl.add_latency_segment(ip1, ip2, segment[1])
                        # ctrl.add_latency_segment(ip2, ip1, segment[1])
                if segment[1].startswith('s'):
                    ctrl = self.controllers.get(int(segment[1][1:]))
                    if ctrl is not None:
                        # ctrl.add_latency_segment(ip1, ip2, segment[0])
                        ctrl.add_latency_segment(ip2, ip1, segment[0])

    def _update_performance_state(self):
        sio = StringIO()
        print('@start.path', file=sio)
        for _, path in sorted(self._paths._pairs.items()):
            print(repr(path), file=sio)
        print('@end.path', file=sio)
        print('@start.load', file=sio)
        for (ne1, ne2), load in sorted(self._loads._pairs.items()):
            print(repr((ne1, ne2, load)), file=sio)
        print('@end.load', file=sio)
        # print(sio.getvalue(), end='')
        # Create the state file if missing, then overwrite it in place.
        open(f'{base_net_name}.state', 'a+').close()
        with open(f'{base_net_name}.state', 'r+') as f:
            v = sio.getvalue()
            f.seek(0)
            f.write(v)
            f.truncate()


def portno_from_list(l):
    # Port numbers follow the link's position in the global link list (1-based).
    return UsageStore(dict([
        (tuple(x[:2]), i+1)
        for i, x in l
    ]))


def usage_store_from_list(l):
    return UsageStore(dict([
        (tuple(x[:2]), 0)
        for _, x in l
    ]))


def host_path_from_list(l):
    return UsageStore(dict([
        ((x, y), list())
        for x in l
        for y in l
        if x != y
    ]))


def bandwidth_from_list(l):
    return UsageStore(dict([
        (tuple(x[:2]), x[2])
        for _, x in l
    ]))


def _sort_pair(a: str, b: str) -> Tuple[str, str]:
    if a[0] == 'h' and b[0] == 's':
        return b, a  # h5,s7 => s7,h5
    elif a[0] != b[0]:
        return a, b  # s7,h5 => s7,h5
    elif int(a[1:]) > int(b[1:]):
        return b, a  # s7,s5 => s5,s7
    else:
        return a, b  # s5,s7 => s5,s7


class PairSorterMixin:
    def _sort_pair(self, a, b):
        return _sort_pair(a, b)


class UsageStore(PairSorterMixin):
    def __init__(self, initial=None):
        # Avoid a shared mutable default; copy items through __setitem__ so
        # every key is normalized.
        self._pairs = dict()
        for k, v in (initial or {}).items():
            self[k] = v

    def __getitem__(self, idx) -> float:
        srtd = self._sort_pair(*idx)
        pairs = self._pairs
        if srtd not in pairs:
            return 0
        else:
            return pairs[srtd]

    def reverse_lookup(self, val):
        return [k for k, v in self._pairs.items() if v == val]

    def calculate(self, other, operation):
        return UsageStore({
            k: operation(v, other[k])
            for k, v in self._pairs.items()
        })

    def contains(self, idx) -> bool:
        return self._sort_pair(*idx) in self._pairs.keys()

    def __setitem__(self, idx, val: float):
        self._pairs[self._sort_pair(*idx)] = val

    def __repr__(self):
        return f"{type(self).__name__}({repr(self._pairs)})"
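
# Usage sketch for UsageStore (doctest-style comment, inert at import time;
# behaviour follows from _sort_pair above): keys are unordered endpoint pairs,
# normalized on every access, and missing pairs read as 0.
#
#   >>> us = UsageStore()
#   >>> us[('s2', 's1')] = 5       # stored under ('s1', 's2')
#   >>> us[('s1', 's2')]
#   5
#   >>> us[('h3', 's1')]           # ('h3', 's1') normalizes to ('s1', 'h3')
#   0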
class UsageStoreProxyFromCallableIterable(PairSorterMixin):
    """Read-through view over the stores yielded by a callable iterable."""

    def __init__(self, clb, get_behaviour=None):
        self._callable = clb
        self._behaviour = get_behaviour

    def __getitem__(self, idx) -> float:
        # Collect the value every underlying store holds for this pair, then
        # reduce with the configured behaviour (both call sites pass sum).
        r = list()
        srtd = self._sort_pair(*idx)
        for us in self._callable():
            v = us._pairs.get(srtd)
            if v is not None:
                r.append(v)
        if self._behaviour is None:
            return 0
        else:
            return self._behaviour(r)

    def __setitem__(self, idx, val: float):
        alo = False  # "at least one" store updated
        srtd = self._sort_pair(*idx)
        for us in self._callable():
            v = us._pairs.get(srtd)
            if v is not None:
                us._pairs[srtd] = val
                alo = True
        if not alo:
            raise ValueError(
                f"There's no existing connection between"
                f" {idx[0]} and {idx[1]} to update"
            )

    def to_regular_storage(self):
        keys = set([
            key
            for us in self._callable()
            for key in us._pairs.keys()
        ])
        return UsageStore(dict([(key, self[key]) for key in keys]))

    @property
    def as_regular_storage(self):
        return self.to_regular_storage()


class UsageStoreProxyFromFlowDict(UsageStore):
    """UsageStore view over a (src, sw, next, dst) -> value flow dict.

    Deliberately skips UsageStore.__init__: _pairs is a read-only property
    rebuilt from the flow dict on every access.
    """

    def __init__(self, dct, aggregator=sum):
        self._flows = dct
        self._agg = aggregator

    @property
    def _pairs(self):
        d = dict()
        # Key each flow by its (switch, next-hop) link, then aggregate.
        l = [(self._sort_pair(k[1], k[2]), v)
             for k, v in self._flows.items()]
        for k, _ in l:
            if k not in d:
                d[k] = list()
        for k, v in l:
            d[k].append(v)
        for k in d.keys():
            d[k] = self._agg(d[k])
        return d

    def to_regular_storage(self):
        return UsageStore(self._pairs)

    @property
    def as_regular_storage(self):
        return self.to_regular_storage()


class AttrCallableIterable:
    """Calls a collection factory and yields the given attribute of each item."""

    def __init__(self, callablecollection, attr):
        self._cc = callablecollection
        self._attr = attr

    def __call__(self):
        for i in self._cc():
            yield getattr(i, self._attr)
        yield from EMPTY_ITER  # always exhausted; a no-op kept from the original
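
# Composition sketch (inferred reading of LatencyApp.__init__ above): the
# last_speed proxy chains two AttrCallableIterable layers,
#
#   controllers.values() -> each LatencyController._link_speed
#                        -> that store's .as_regular_storage snapshot,
#
# yielding one UsageStore per connected switch; the proxy then sum()s the
# values found for a link key across switches, since an inter-switch link is
# reported by both of its endpoints.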