#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import os
import sys
import json
import random
import datetime
import threading
from sortundirectednodepair import _sort_pair
from graphtools import Dijkstra, find_all_paths, graph_from_topo
from time import sleep
from io import StringIO
from id2ip import id2ip, ip2id
from typing import Any, Union, Tuple, List, Dict
from configparser import ConfigParser
from ryu.lib import hub
from ryu.base import app_manager
from ryu.lib import ofctl_v1_3
from ryu.lib.packet import ether_types
from ryu.lib.packet import ethernet
from ryu.ofproto import ofproto_v1_3
from ryu.controller.handler import set_ev_cls
from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.controller import ofp_event
from ryu.lib.packet import packet
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
EMPTY_ITER = iter(list())
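# Speed constants are expressed in Mbps (SPEED_MEGABIT == 1), the unit used
# throughout this file; UNLIMITED_BANDWIDTH is just a very large placeholder.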
SPEED_KILOBIT = 0.001
SPEED_MEGABIT = 1
SPEED_GIGABIT = 1000
SPEED_TERABIT = 1000000
UNLIMITED_BANDWIDTH = 70*SPEED_TERABIT
print(f"loading network {sys.argv[2]}", file=sys.stderr)
network_topo = ''
with open(sys.argv[2]) as f:
network_topo = json.loads(f.read())
# ryu uses eventlet, which, on some versions, breaks pathlib's read_text
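# The topology JSON is consumed below as: network_topo[0] is the host list
# (e.g. 'h1', 'h2', ...) and network_topo[2] is the link list, where each
# entry is (endpoint1, endpoint2, bandwidth), bandwidth given in Mbps or
# None for an unlimited link.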
base_net_name = '.'.join(sys.argv[2].split('.')[:-1])
print(f"reading network configuration variables.ini", file=sys.stderr)
network_config = ConfigParser()
with open('variables.ini') as f:
network_config.read_string(f.read())
m1 = float(network_config['linearalgconst']['m1'])
m2 = float(network_config['linearalgconst']['m2'])
hop_delay = float(network_config['linearalgconst']['hop_delay'])
routing_algo = network_config['GENERAL']['algo']
network_graph = graph_from_topo(network_topo)
def jsonload_list2tuple(x):
    if x is None or isinstance(x, (int, float, str)):
        return x
    elif isinstance(x, (list, tuple)):
        return tuple([jsonload_list2tuple(k) for k in x])
    elif isinstance(x, dict):
        return {jsonload_list2tuple(k): jsonload_list2tuple(v) for k, v in x.items()}
    else:
        raise ValueError("Input didn't come from standard JSON deserialization")
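# Example: jsonload_list2tuple(json.loads('[["s1", "s2"], 10]')) returns
# (('s1', 's2'), 10) -- nested JSON arrays become hashable tuples, so cached
# paths can be compared and used as dictionary keys.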
def prepare_pop_pair_alternative_paths_for_availability(graph, hosts):
pop_apa = [[None for x in hosts] for y in hosts]
for i, h1 in enumerate(hosts):
for j, h2 in enumerate(hosts):
if pop_apa[i][j] is None:
                apa = tuple(find_all_paths(graph, h1, h2))
pop_apa[i][j] = apa
pop_apa[j][i] = apa
return pop_apa
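# "APA" = alternative paths for availability: for every pair of hosts, all
# simple paths between them in the topology graph. Enumerating these can be
# expensive on dense topologies, hence the JSON cache below.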
apacachefile = f"{base_net_name}.apa.json"
print(
f"checking if APA for {sys.argv[2]} is cached at {apacachefile}", file=sys.stderr)
pop_apa_candidates = None
if os.path.isfile(apacachefile):
print(f"loading APA from {apacachefile}", file=sys.stderr)
with open(apacachefile) as f:
pop_apa_candidates = jsonload_list2tuple(json.loads(f.read()))
else:
print(f"calculating APA for {sys.argv[2]}", file=sys.stderr)
pop_apa_candidates = prepare_pop_pair_alternative_paths_for_availability(
network_graph,
network_topo[0]
)
print(f"caching APA at {apacachefile}", file=sys.stderr)
with open(apacachefile, 'w') as f:
f.write(json.dumps(pop_apa_candidates))
class LatencyController:
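    """Per-switch controller state: the (link -> port) map, per-flow byte
    and speed accounting, and rule installation for one datapath."""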
def __init__(self, datapath):
super().__init__()
self._datapath = datapath
self._sw = f"s{self._datapath.id}"
l = [
(i, x) for i, x in enumerate(network_topo[2])
if x[0] == self._sw or x[1] == self._sw
]
self._l = l
self._links = portno_from_list(l)
self._flow_xfer = dict()
self._flow_speed = dict()
self._link_speed = UsageStoreProxyFromFlowDict(self._flow_speed)
self._link_rules = usage_store_from_list(l)
self._bandwidth = bandwidth_from_list(l)
self._ospf = None
def connected(self, la):
print(f"Switch connected: {self._sw}")
print(f"Switch OSPF-fallback loading: {self._sw}")
parser = self._datapath.ofproto_parser
self._ospf = la.ospf_dijkstra(self._sw)
for h in network_topo[0]:
ipv4_dst = id2ip(int(h[1:])-1)
match_ipv4 = parser.OFPMatch(
eth_type=0x0800,
ipv4_dst=ipv4_dst
)
match_arp = parser.OFPMatch(
eth_type=0x0806,
arp_tpa=ipv4_dst
)
next_hop = self._ospf(h)[0][1]
out_port = self._links[(self._sw, next_hop)]
actions = [parser.OFPActionOutput(out_port)]
print(f"{self._sw} --[{out_port}]--> {next_hop} ({ipv4_dst})")
self.add_flow(1, match_ipv4, actions)
self.add_flow(1, match_arp, actions)
print(f"Switch OSPF-fallback loaded: {self._sw}")
if routing_algo == 'ecmp':
print(f"Switch ECMP setting up: {self._sw}")
self._configureECMP(la)
print(f"Switch ECMP set up: {self._sw}")
def disconnected(self):
print(f"Switch disconnected: {self._sw}")
def add_flow(self, priority, match, actions, buffer_id=None, idle_timeout=0, hard_timeout=0):
ofproto = self._datapath.ofproto
parser = self._datapath.ofproto_parser
inst = [parser.OFPInstructionActions(
ofproto.OFPIT_APPLY_ACTIONS,
actions
)]
additionals = dict()
if buffer_id:
additionals['buffer_id'] = buffer_id
if idle_timeout:
additionals['idle_timeout'] = idle_timeout
if hard_timeout:
additionals['hard_timeout'] = hard_timeout
mod = parser.OFPFlowMod(datapath=self._datapath,
priority=priority, match=match,
instructions=inst,
**additionals)
self._datapath.send_msg(mod)
def _the_other_link_endpoint(self, linktuple):
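        # linktuple.index(self._sw) is 0 or 1; abs(i-1) flips it, yielding
        # the endpoint on the far side of the link.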
return linktuple[abs(linktuple.index(self._sw)-1)]
def _on_stats(self, ev, la):
self._link_rules = usage_store_from_list(self._l)
parser = self._datapath.ofproto_parser
stats = ev.msg.body
allkeys = []
for stat in stats:
# ltp -> link tuple
# ftp -> flow tuple
# bc -> byte count
# obc -> old byte count
action = stat.instructions[0].actions[0]
outs = list()
if isinstance(action, parser.OFPActionGroup):
for bucket in la.ecmp_group_id_buckets[action.group_id]:
outs.append((
bucket.actions[0].port,
bucket.weight/256
))
else:
outs.append((action.port, 1))
for out_port, weight in outs:
ltps = self._links.reverse_lookup(out_port)
                if not ltps:
                    continue
                ltp = ltps[0]
                nxt = self._the_other_link_endpoint(ltp)
# measuring link flows
self._link_rules[ltp] = 1+self._link_rules[ltp]
# measuring load
if stat.match['eth_type'] == 0x0800:
src = stat.match.get('ipv4_src', '10.0.0.0')
dst = stat.match['ipv4_dst']
src = f"h{ip2id(src)+1}"
dst = f"h{ip2id(dst)+1}"
if src == 'h0':
src = None
key = (src, self._sw, nxt, dst)
bc = stat.byte_count
obc = self._flow_xfer.get(key, 0)
tfd = max(0, bc-obc)*weight # bytes per cycle
tfr = ((8*tfd)/la.interval)/10**6 # mbps
self._flow_xfer[key] = bc
self._flow_speed[key] = tfr
allkeys.append(key)
for k in (set(self._flow_speed.keys())-set(allkeys)):
self._flow_speed[k] = 0
# with parts from <https://github.com/wildan2711/multipath/blob/master/ryu_multipath.py>
# commented by <https://wildanmsyah.wordpress.com/2018/01/13/multipath-routing-with-load-balancing-using-ryu-openflow-controller/>
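    # ECMP here is implemented with one OFPGT_SELECT group per
    # (src host, this switch, dst host) triple: each candidate output port
    # becomes a weighted bucket and the switch spreads flows across buckets.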
def _configureECMP(self, la):
ofproto = self._datapath.ofproto
parser = self._datapath.ofproto_parser
for h1p, h1 in enumerate(network_topo[0]):
for h2p, h2 in enumerate(network_topo[0]):
if h1p != h2p:
ips = id2ip(int(h1[1:])-1)
ipd = id2ip(int(h2[1:])-1)
match = parser.OFPMatch(
eth_type=0x0800,
ipv4_src=ips,
ipv4_dst=ipd
)
portouts = list()
for path in pop_apa_candidates[h1p][h2p]:
if self._sw not in path:
continue
if path[0] == h2 and path[-1] == h1:
path = list(reversed(path))
nexthop = path[path.index(self._sw)+1]
link = (self._sw, nexthop)
portout = self._links[link]
portouts.append(portout)
                    if len(portouts) <= 0:
                        # no output port reaches the destination via this switch
                        continue
                    elif len(portouts) == 1:
                        # single candidate path: install a plain output rule
                        actions = [parser.OFPActionOutput(portouts[0])]
                        self.add_flow(7, match, actions)
else:
all_bws = [x if x is not None else UNLIMITED_BANDWIDTH for x in [
network_topo[2][portout-1][2] for portout in portouts]]
sum_all_bws = sum(all_bws)
weighted_bws = [bw/sum_all_bws for bw in all_bws]
out_ports = list(zip(portouts, weighted_bws))
del weighted_bws
del sum_all_bws
del portouts
tpl = tuple([h1, self._sw, h2])
gnw = False
if tpl not in la.ecmp_group_ids:
la.ecmp_group_ids[tpl] = len(la.ecmp_group_ids)+1
gnw = True
gid = la.ecmp_group_ids[tpl]
buckets = []
for port, weight in out_ports:
                            # select-group buckets receive traffic in proportion to
                            # their weight, so use the bandwidth share directly; this
                            # also matches the bucket.weight/256 read-back in _on_stats
                            bucket_weight = int(round(weight * 255))
bucket_action = [parser.OFPActionOutput(port)]
buckets.append(
parser.OFPBucket(
weight=bucket_weight,
watch_port=port,
watch_group=ofproto.OFPG_ANY,
actions=bucket_action
)
)
la.ecmp_group_id_buckets[gid] = buckets
req = parser.OFPGroupMod(
self._datapath,
ofproto.OFPGC_ADD if gnw else ofproto.OFPGC_MODIFY,
ofproto.OFPGT_SELECT,
gid,
buckets
)
self._datapath.send_msg(req)
                        # point the flow at the select group
                        actions = [parser.OFPActionGroup(gid)]
                        self.add_flow(7, match, actions)
def add_latency_segment(self, ips, ipd, nxt, ignore_routing_algo=False):
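        # Priority 5 rules override the OSPF fallback (priority 1) installed
        # in connected(); only the ldr/minmax evaluators use this path.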
if (routing_algo not in ['ldr', 'minmax']) and not ignore_routing_algo:
return
link = (self._sw, nxt)
parser = self._datapath.ofproto_parser
actions = [parser.OFPActionOutput(self._links[link])]
match = parser.OFPMatch(
eth_type=0x0800,
ipv4_src=ips,
ipv4_dst=ipd
)
self.add_flow(5, match, actions)
# hs = f"h{ip2id(ips)+1}"
# hd = f"h{ip2id(ipd)+1}"
# self._flow_xfer[(hs, hd, nxt)] = 0
@property
def simulatable(self):
return SimulatableSwitch(
self._sw,
self._flow_speed.copy(),
self._bandwidth.copy()
)
class SimulatableSwitch:
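    """Snapshot of a LatencyController's flow speeds and bandwidths, cheap
    to copy, used for what-if simulation of alternative routings."""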
def __init__(self, sw: str, fs: dict, bw: 'UsageStore'):
self._sw = sw
self._flow_speed = fs
self._bandwidth = bw
self._link_speed = UsageStoreProxyFromFlowDict(self._flow_speed)
self._reinit()
def _reinit(self):
self._link_rules = self.get_link_rules()
def get_link_rules(self):
cnt = UsageStore()
for key, bw in self._flow_speed.items():
if bw > 0:
k = (key[1], key[2])
cnt[k] = 1+cnt[k]
return cnt
@property
def name(self):
return self._sw
def copy(self):
return type(self)(
self._sw,
self._flow_speed.copy(),
self._bandwidth.copy()
)
class SimulatableNetwork:
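    """Aggregates SimulatableSwitch snapshots plus the current routing
    table, so candidate paths can be evaluated without touching the real
    switches."""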
def __init__(self, sws: List[SimulatableSwitch], paths: 'UsageStore'):
self.switches: Dict[str, SimulatableSwitch] = {
sw._sw: sw
for sw in sws
}
self.max_speed = bandwidth_from_list(enumerate(network_topo[2]))
self.paths = paths
self._reinit()
def _reinit(self):
self._flow_speed = DictAggregator(
[sw._flow_speed for sw in self.switches.values()]
)
self.last_speed = UsageStoreProxyFromCallableIterable(
AttrCallableIterable(
AttrCallableIterable(
self.switches.values,
'_link_speed'
),
'as_regular_storage'
),
sum
).as_regular_storage
self.link_usage = self.max_speed.calculate(
self.last_speed.as_regular_storage,
lambda maxspeed, lastspeed: (
                lastspeed / max(maxspeed, 1e-13)  # guard against zero bandwidth
)
).as_regular_storage
self.link_flows = self.get_link_flows()
def get_link_flows(self):
cnt = UsageStore()
for switch in self.switches.values():
for (_, sw, nxt, __), speed in switch._flow_speed.items():
key = (sw, nxt)
if switch._bandwidth[key] > 0 and speed > 0:
cnt[key] = 1+cnt[key]
return cnt
def get_routes(self):
return self.paths.copy()
def get_path(self, h1, h2):
return self.paths[(h1, h2)]
def copy(self) -> 'SimulatableNetwork':
return type(self)(
[sw.copy() for sw in self.switches.values()],
self.paths.copy()
)
def sort_by_max_flow_load(self, seqs):
wl = list()
for seq in seqs:
ml = self.get_max_flow_load(seq)
wl.append((ml, seq))
return list(map(
lambda a: a[1],
reversed(sorted(wl))
))
def get_max_flow_load(self, seq):
h1, h2 = seq[0], seq[-1]
ml = 0
for i in range(len(seq)-1):
link = tuple(seq[i:i+2])
            revlink = tuple(reversed(link))
ml = max(
ml,
(
(
self._flow_speed.get(tuple([h1, *link, h2]), 0)
/
self.max_speed[link]
)+(
self._flow_speed.get(tuple([h2, *revlink, h1]), 0)
/
self.max_speed[revlink]
)
)
)
return ml
def get_max_flow_speed(self, seq):
h1, h2 = seq[0], seq[-1]
ml = 0
for i in range(len(seq)-1):
link = tuple(seq[i:i+2])
            revlink = tuple(reversed(link))
ml = max(
ml,
self._flow_speed.get(tuple([h1, *link, h2]), 0)
+
self._flow_speed.get(tuple([h2, *revlink, h1]), 0)
)
return ml
def get_max_path_load(self, seq):
ml = 0
for i in range(len(seq)-1):
link = tuple(seq[i:i+2])
ml = max(
ml,
self.link_usage[link]
)
return ml
def with_modified_path(self, newpath):
mod = self.copy()
oldpath = self.get_path(newpath[0], newpath[-1])
flowspeed = self.get_max_flow_speed(oldpath)
# removing old flows
for sw in mod.switches.values():
for k in sw._flow_speed.copy().keys():
if (
k[0] in [newpath[0], newpath[-1]]
and
k[3] in [newpath[0], newpath[-1]]
):
del sw._flow_speed[k]
# adding new flows
for i in range(1, len(newpath)-1):
ky = newpath[0], newpath[i], newpath[i+1], newpath[-1]
rk = newpath[-1], newpath[i-1], newpath[i], newpath[0]
if ky[1].startswith('s') and ky[1] in mod.switches:
mod.switches[ky[1]]._flow_speed[ky] = flowspeed/2
if rk[1].startswith('s') and rk[1] in mod.switches:
mod.switches[rk[1]]._flow_speed[rk] = flowspeed/2
for sw in mod.switches.values():
sw._reinit()
        # record the new route on the modified copy (updating self.paths here
        # would leak the tentative path into the network being compared against)
        mod.paths[tuple([newpath[0], newpath[-1]])] = newpath
mod._reinit()
return mod
def copy_normalized(self):
net = self.copy()
for path in net.get_routes()._pairs.values():
net = net.with_modified_path(path)
return net
ppe = ProcessPoolExecutor(1)
last_process = None
last_process_lock = threading.Lock()
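# A single-worker pool plus cancel() means at most one path evaluation is
# queued at a time: if stats arrive while an evaluation is still pending,
# the stale one is dropped (cancel() only succeeds before the job starts).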
class AbstractPathEvaluator:
def __init__(self, simulatable_network: SimulatableNetwork):
self._simnet: SimulatableNetwork = simulatable_network
    def __call__(self) -> 'UsageStore':
        raise NotImplementedError
class NullPathEvaluator(AbstractPathEvaluator):
def __call__(self) -> 'UsageStore':
return UsageStore()
class LDRPathEvaluator(AbstractPathEvaluator):
def __call__(self) -> 'UsageStore':
return self._figure13(self._simnet.copy())
# return self._figure12(
# self._simnet.link_usage,
# self._simnet.link_flows
# )
def _figure13(self, net: SimulatableNetwork) -> 'UsageStore':
net = net.copy_normalized()
_tmp = self._figure12(
net.link_usage,
net.link_flows
)
        target = len(_tmp._pairs)
changed = UsageStore()
newnet = None
while len(changed._pairs) < target:
aggregate_paths = self._figure12(
net.link_usage,
net.link_flows
)
loaded_paths = net.sort_by_max_flow_load([
*aggregate_paths._pairs.values()
])
most_loaded = None
for loaded in loaded_paths:
if not changed.contains((loaded[0], loaded[-1])):
most_loaded = loaded
break
if most_loaded is None:
break
newnet = net.with_modified_path(most_loaded)
old_path_for_most_loaded = net.get_path(
most_loaded[0], most_loaded[-1])
if (
newnet.get_max_path_load(most_loaded)
<=
net.get_max_path_load(old_path_for_most_loaded)
):
net = newnet
changed[(most_loaded[0], most_loaded[-1])] = most_loaded
return net.get_routes()
def _figure12(self, link_usage, link_flows) -> 'UsageStore':
processed = UsageStore()
currently_reserved = UsageStore()
ospf = Dijkstra(network_graph)
l1 = network_topo[0][:]
random.shuffle(l1)
for h1 in l1:
l2 = network_topo[0][:]
random.shuffle(l2)
for h2 in l2:
if h1 != h2 and not processed.contains((h1, h2)):
h1p = network_topo[0].index(h1)
h2p = network_topo[0].index(h2)
ospf_aggregate = ospf(h1)(h2)[0]
ospf_delay = sum([
link_usage[
tuple([*ospf_aggregate[i:i+2]])
] + hop_delay
for i in range(len(ospf_aggregate)-1)
])
aggregates = list(pop_apa_candidates[h1p][h2p])
random.shuffle(aggregates)
del h1p
del h2p
min_aggregate = None
min_aggregate_val = None
min_aggregate_links = list()
nalist = list()
na = 0 # flows in aggregate
omax = 0 # maximum overload
for aggregate in aggregates:
links = [
_sort_pair(*aggregate[i:i+2])
for i in range(len(aggregate)-1)
]
                    omax2 = 0
                    for link in links:
                        # accumulate the worst overload seen on this aggregate
                        omax2 = max(omax2, link_usage[link])
                        if link not in nalist:
                            # fraction of aggregate on path
                            na += link_flows[link]
                            na += currently_reserved[link]
                            nalist.append(link)
                    if omax2 > omax:
                        omax = omax2
# del nalist
# for aggregate in aggregates:
links = [
_sort_pair(*aggregate[i:i+2])
for i in range(len(aggregate)-1)
]
                    eps2 = 0
                    xap = 1
                    for link in links:
                        dp = link_usage[link]+hop_delay  # path delay of this link
                        eps2 += xap*(dp+((dp*m1)/ospf_delay))
                    exp0 = m2*omax
                    eps3 = sum([link_usage[link] for link in links])
                    aggregate_val = na*eps2+exp0+eps3
if (
min_aggregate_val is None
or
min_aggregate_val > aggregate_val
):
min_aggregate_val = aggregate_val
min_aggregate = aggregate
min_aggregate_links = links
processed[(h1, h2)] = min_aggregate
for link in min_aggregate_links:
currently_reserved[link] = 1+currently_reserved[link]
return processed
class MinMaxPathEvaluator(AbstractPathEvaluator):
def __call__(self) -> 'UsageStore':
return self.minmax(self._simnet.copy())
def minmax(self, net: SimulatableNetwork) -> 'UsageStore':
net = net.copy_normalized()
processed = UsageStore()
l1 = network_topo[0][:]
random.shuffle(l1)
for h1 in l1:
l2 = network_topo[0][:]
random.shuffle(l2)
for h2 in l2:
if h1 != h2 and not processed.contains((h1, h2)):
h1p = network_topo[0].index(h1)
h2p = network_topo[0].index(h2)
aggregates = list(pop_apa_candidates[h1p][h2p])
configurations = list()
for aggregate in aggregates:
                    newnet = net.with_modified_path(aggregate)
                    configurations.append((
                        newnet.get_max_path_load(aggregate),
                        len(aggregate),
                        aggregate,
                        newnet
                    ))
configurations.sort()
configuration = configurations[0]
processed[(h1, h2)] = configuration[2]
net = configuration[3]
return processed
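# ospf and ecmp install their rules once at connection time, so their
# periodic evaluator is a no-op; ldr and minmax recompute paths every cycle.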
path_evaluators = {
'ospf': NullPathEvaluator,
'ecmp': NullPathEvaluator,
'ldr': LDRPathEvaluator,
'minmax': MinMaxPathEvaluator,
}
PathEvaluator = path_evaluators[routing_algo]
class LatencyApp(app_manager.RyuApp):
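    """Main Ryu application: tracks connected switches, polls flow stats
    every `interval` seconds and re-routes according to the configured
    algorithm."""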
OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.mac_to_port = {}
self.controllers = {}
self._paths = host_path_from_list(network_topo[0])
self._loads = usage_store_from_list(enumerate([]))
self.max_speed = bandwidth_from_list(enumerate(network_topo[2]))
self.last_speed = UsageStoreProxyFromCallableIterable(
AttrCallableIterable(
AttrCallableIterable(
self.controllers.values,
'_link_speed'
),
'as_regular_storage'
),
sum
)
self.flow_count = UsageStoreProxyFromCallableIterable(
AttrCallableIterable(self.controllers.values, '_link_rules'),
sum
)
self.ospf_dijkstra = Dijkstra(network_graph)
self.ecmp_group_ids = dict()
self.ecmp_group_id_buckets = dict()
self._fill_paths_with_ospf()
self.interval = float(network_config['monitoring']['interval'])
hub.spawn(self._start_monitoring, self.interval)
def _fill_paths_with_ospf(self):
for h1 in network_topo[0]:
for h2 in network_topo[0]:
if h1 != h2 and len(self._paths[(h1, h2)]) <= 0:
self._paths[(h1, h2)] = self.ospf_dijkstra(h1)(h2)[0]
@set_ev_cls(ofp_event.EventOFPStateChange, [MAIN_DISPATCHER, DEAD_DISPATCHER])
def _event_ofp_state_change(self, ev):
datapath = ev.datapath
dpid = datapath.id
if ev.state == MAIN_DISPATCHER:
if dpid not in self.controllers:
self.controllers[dpid] = LatencyController(datapath)
self.controllers[dpid].connected(self)
self._request_stats(datapath)
elif ev.state == DEAD_DISPATCHER:
if dpid in self.controllers:
self.controllers[dpid].disconnected()
del self.controllers[dpid]
def _start_monitoring(self, repeat):
while True:
for ctrl in self.controllers.values():
self._request_stats(ctrl._datapath)
sleep(repeat)
def _request_stats(self, datapath):
datapath.send_msg(
datapath.ofproto_parser.OFPFlowStatsRequest(
datapath
)
)
@set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
def _flow_stats_reply_handler(self, ev):
global last_process
dpid = ev.msg.datapath.id
ctrl = self.controllers.get(dpid)
if ctrl is not None:
ctrl._on_stats(ev, self)
with last_process_lock:
if last_process is not None:
last_process.cancel()
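            # PathEvaluator instances are callable, so the evaluator object
            # itself is submitted and its __call__ runs in the worker process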
last_process = ppe.submit(PathEvaluator(*self._monitored_data()))
last_process.add_done_callback(self._update_topo_done)
def _monitored_data(self):
self._loads = self.max_speed.calculate(
self.last_speed.as_regular_storage,
lambda maxspeed, lastspeed: (
                lastspeed / max(maxspeed, 1e-13)  # guard against zero bandwidth
)
)
return (
SimulatableNetwork([
ctrl.simulatable
for ctrl in self.controllers.values()
], self._paths),
)
def _update_topo_done(self, future):
        if not future.cancelled():
res = future.result()
self._update_topo_done_successfully(res)
self._update_performance_state()
def _update_topo_done_successfully(self, processed):
print(f'-> Topo updated {datetime.datetime.now()}')
for (h1, h2), path in processed._pairs.items():
self._paths[(h1, h2)] = path
ip1 = id2ip(int(h1[1:])-1)
ip2 = id2ip(int(h2[1:])-1)
for i in range(len(path)-1):
segment = path[i:i+2]
if segment[0].startswith('s'):
ctrl = self.controllers.get(int(segment[0][1:]))
if ctrl is not None:
ctrl.add_latency_segment(ip1, ip2, segment[1])
# ctrl.add_latency_segment(ip2, ip1, segment[1])
if segment[1].startswith('s'):
ctrl = self.controllers.get(int(segment[1][1:]))
if ctrl is not None:
# ctrl.add_latency_segment(ip1, ip2, segment[0])
ctrl.add_latency_segment(ip2, ip1, segment[0])
def _update_performance_state(self):
sio = StringIO()
        print('@start.path', file=sio)
        for _, path in sorted(self._paths._pairs.items()):
            print(repr(path), file=sio)
        print('@end.path', file=sio)
        print('@start.load', file=sio)
        for (ne1, ne2), load in sorted(self._loads._pairs.items()):
            print(repr((ne1, ne2, load)), file=sio)
        print('@end.load', file=sio)
print(sio.getvalue(), end='')
        open(f'{base_net_name}.state', 'a+').close()  # ensure the file exists
with open(f'{base_net_name}.state', 'r+') as f:
v = sio.getvalue()
f.seek(0)
f.write(v)
f.truncate()
def portno_from_list(l):
return UsageStore(dict([
(tuple(x[:2]), i+1)
for i, x in l
]))
def usage_store_from_list(l):
return UsageStore(dict([
(tuple(x[:2]), 0)
for _, x in l
]))
def host_path_from_list(l):
return UsageStore(dict([
((x, y), list())
for x in l
for y in l
if x != y
]))
def bandwidth_from_list(l):
return UsageStore(dict([
(tuple(x[:2]), x[2])
for _, x in l
]))
class PairSorterMixin:
def _sort_pair(self, a, b):
return _sort_pair(a, b)
class UsageStore(PairSorterMixin):
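    """Maps undirected node pairs to values. Keys are canonicalized with
    _sort_pair, so store[(a, b)] and store[(b, a)] address the same entry,
    and missing pairs read as 0.
    Example (illustrative):
        us = UsageStore()
        us[('s1', 's2')] = 3.5
        us[('s2', 's1')]   # -> 3.5
        us[('s1', 's9')]   # -> 0
    """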
    def __init__(self, initial=None):
        self._pairs = dict()
        if initial:
            for k, v in initial.items():
                self[k] = v
def __getitem__(self, idx) -> float:
srtd = self._sort_pair(*idx)
pairs = self._pairs
if srtd not in pairs:
return 0
else:
return pairs[srtd]
def reverse_lookup(self, val):
return [k for k, v in self._pairs.items() if v == val]
def calculate(self, other, operation):
return UsageStore({
k: operation(v, other[k])
for k, v in self._pairs.items()
})
def contains(self, idx) -> bool:
return self._sort_pair(*idx) in self._pairs.keys()
def __setitem__(self, idx, val: float):
self._pairs[self._sort_pair(*idx)] = val
def __repr__(self):
return f"{type(self).__name__}({repr(self._pairs)})"
def copy(self):
return type(self)(self._pairs.copy())
def to_regular_storage(self):
return self.copy()
@property
def as_regular_storage(self):
return self.to_regular_storage()
class ValueGetterCallable:
def __init__(self, value):
self._val = value
def __call__(self, *args, **kwargs):
return self._val
class UsageStoreProxyFromCallableIterable(PairSorterMixin):
def __init__(self, clb, get_behaviour=None):
self._callable = clb
self._behaviour = get_behaviour
self.is_frozen = False
def __getitem__(self, idx) -> float:
r = list()
srtd = self._sort_pair(*idx)
for us in self._callable():
v = us._pairs.get(srtd)
if v is not None:
r.append(v)
if self._behaviour is None:
return 0
else:
return self._behaviour(r)
def __setitem__(self, idx, val: float):
alo = False
srtd = self._sort_pair(*idx)
for us in self._callable():
v = us._pairs.get(srtd)
if v is not None:
us._pairs[srtd] = val
alo = True
        if not alo:
            raise ValueError(
                "There's no existing connection between "
                f"{idx[0]} and {idx[1]} to update"
            )
def to_regular_storage(self):
keys = set([
key
for us in self._callable()
for key in us._pairs.keys()
])
return UsageStore(dict([(key, self[key]) for key in keys]))
@property
def as_regular_storage(self):
return self.to_regular_storage()
@property
def frozen(self):
frz = type(self)(
ValueGetterCallable(self._callable()),
self._behaviour
)
frz.is_frozen = True
return frz
class UsageStoreProxyFromFlowDict(UsageStore):
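    """Read-only UsageStore view over a flow dict keyed by
    (src, sw, nxt, dst): flows are grouped by their (sw, nxt) link and
    aggregated (summed by default) into per-link values."""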
def __init__(self, dct, aggregator=sum):
self._flows = dct
self._agg = aggregator
@property
    def _pairs(self):
        d = dict()
        for k, v in self._flows.items():
            # group flow values by their undirected (sw, nxt) link
            d.setdefault(self._sort_pair(k[1], k[2]), list()).append(v)
        return {key: self._agg(vals) for key, vals in d.items()}
def to_regular_storage(self):
return UsageStore(self._pairs)
@property
def as_regular_storage(self):
return self.to_regular_storage()
class AttrCallableIterable:
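    """Wraps a callable returning a collection; calling the wrapper yields
    the given attribute of each element (lazy attribute projection)."""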
def __init__(self, callablecollection, attr):
self._cc = callablecollection
self._attr = attr
def __call__(self):
for i in self._cc():
yield getattr(i, self._attr)
class DictAggregator:
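    """Read-only view over several dicts; lookups return the value from the
    first dict that contains the key."""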
def __init__(self, ds):
self._ds = ds
def __getitem__(self, q):
for d in self._ds:
if q in d:
return d[q]
        raise KeyError(q)
def get(self, q, e=None):
for d in self._ds:
if q in d:
return d[q]
return e
def keys(self):
return {
k
for d in self._ds
for k in d.keys()
}
    def values(self):
        # first-wins per key, mirroring __getitem__
        return {self[k] for k in self.keys()}
    def items(self):
        return {(k, self[k]) for k in self.keys()}
def to_regular_dict(self):
return dict(self.items())
def copy(self):
return type(self)(self._ds.copy())