ufes-20191-redes-mininet/proj-impl/latencycontroller.py

1423 lines
50 KiB
Python
Raw Normal View History

2019-06-05 07:17:36 +00:00
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
2019-06-11 01:19:33 +00:00
import os
2019-06-09 17:11:04 +00:00
import sys
import json
2019-07-01 23:42:25 +00:00
import pulp
import numpy
2019-06-14 07:51:57 +00:00
import random
2019-06-11 01:19:33 +00:00
import datetime
2019-06-14 07:51:57 +00:00
import threading
2019-06-15 06:31:09 +00:00
from sortundirectednodepair import _sort_pair
from graphtools import Dijkstra, find_all_paths, graph_from_topo
2019-06-09 17:11:04 +00:00
from time import sleep
2019-06-14 07:51:57 +00:00
from io import StringIO
2019-06-11 01:19:33 +00:00
from id2ip import id2ip, ip2id
from typing import Any
from typing import Union
from typing import Tuple
from typing import List
from typing import Dict
2019-06-28 21:35:39 +00:00
from typing import Optional
2019-06-09 17:11:04 +00:00
from configparser import ConfigParser
from ryu.lib import hub
2019-06-05 07:17:36 +00:00
from ryu.base import app_manager
from ryu.lib import ofctl_v1_3
from ryu.lib.packet import ether_types
from ryu.lib.packet import ethernet
from ryu.ofproto import ofproto_v1_3
from ryu.controller.handler import set_ev_cls
2019-06-09 17:11:04 +00:00
from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER, DEAD_DISPATCHER
2019-06-05 07:17:36 +00:00
from ryu.controller import ofp_event
from ryu.lib.packet import packet
2019-06-09 17:11:04 +00:00
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
2019-07-01 23:42:25 +00:00
NEG_INF = float('-inf')
MIN_POSITIVE_FLOAT = numpy.nextafter(0, 1)
2019-06-11 01:19:33 +00:00
EMPTY_ITER = iter(list())
2019-06-28 08:33:02 +00:00
SPEED_KILOBIT = 0.001
SPEED_MEGABIT = 1
SPEED_GIGABIT = 1000
SPEED_TERABIT = 1000000
UNLIMITED_BANDWIDTH = 70*SPEED_TERABIT
2019-06-28 21:35:39 +00:00
NoneType = type(None)
2019-06-11 01:19:33 +00:00
print(f"loading network {sys.argv[2]}", file=sys.stderr)
2019-06-09 17:11:04 +00:00
network_topo = ''
with open(sys.argv[2]) as f:
network_topo = json.loads(f.read())
# ryu uses eventlet, which, on some versions, breaks pathlib's read_text
2019-06-14 07:51:57 +00:00
base_net_name = '.'.join(sys.argv[2].split('.')[:-1])
2019-07-01 23:42:25 +00:00
with open('~current.state', 'w') as f:
f.write(base_net_name)
2019-07-03 20:52:11 +00:00
with open('~current.sws.state', 'w') as f:
f.write("")
2019-07-05 08:52:59 +00:00
if os.path.exists(f'{base_net_name}.state'):
os.unlink(f'{base_net_name}.state')
2019-06-11 01:19:33 +00:00
print(f"reading network configuration variables.ini", file=sys.stderr)
2019-06-09 17:11:04 +00:00
network_config = ConfigParser()
with open('variables.ini') as f:
network_config.read_string(f.read())
2019-06-11 01:19:33 +00:00
m1 = float(network_config['linearalgconst']['m1'])
m2 = float(network_config['linearalgconst']['m2'])
hop_delay = float(network_config['linearalgconst']['hop_delay'])
2019-07-01 23:42:25 +00:00
apa_path_stretch = float(network_config['APA']['path_stretch'])
2019-06-28 08:33:02 +00:00
routing_algo = network_config['GENERAL']['algo']
2019-06-11 01:19:33 +00:00
2019-06-15 06:31:09 +00:00
network_graph = graph_from_topo(network_topo)
2019-06-11 01:19:33 +00:00
def jsonload_list2tuple(x):
if isinstance(x, type(None)) or isinstance(x, int) or isinstance(x, float) or isinstance(x, str):
return x
elif isinstance(x, list) or isinstance(x, tuple):
return tuple([jsonload_list2tuple(k) for k in x])
elif isinstance(x, dict):
return {jsonload_list2tuple(k): jsonload_list2tuple(v) for k, v in x.items()}
else:
raise ValueError("Input didn't come from a standard JSON")
def prepare_pop_pair_alternative_paths_for_availability(graph, hosts):
pop_apa = [[None for x in hosts] for y in hosts]
for i, h1 in enumerate(hosts):
for j, h2 in enumerate(hosts):
if pop_apa[i][j] is None:
apa = tuple(list(find_all_paths(graph, h1, h2)))
pop_apa[i][j] = apa
pop_apa[j][i] = apa
return pop_apa
2019-06-14 07:51:57 +00:00
apacachefile = f"{base_net_name}.apa.json"
2019-06-11 01:19:33 +00:00
print(
f"checking if APA for {sys.argv[2]} is cached at {apacachefile}", file=sys.stderr)
pop_apa_candidates = None
if os.path.isfile(apacachefile):
print(f"loading APA from {apacachefile}", file=sys.stderr)
with open(apacachefile) as f:
pop_apa_candidates = jsonload_list2tuple(json.loads(f.read()))
else:
print(f"calculating APA for {sys.argv[2]}", file=sys.stderr)
pop_apa_candidates = prepare_pop_pair_alternative_paths_for_availability(
network_graph,
network_topo[0]
)
print(f"caching APA at {apacachefile}", file=sys.stderr)
with open(apacachefile, 'w') as f:
f.write(json.dumps(pop_apa_candidates))
2019-06-09 17:11:04 +00:00
2019-07-05 08:52:59 +00:00
def filter_out_invalid_paths_from_multiple_paths(h1, h2, candidates, sw=None):
valid_paths = list()
valid_paths_bits_rev = set()
for path in sorted(
candidates,
key=lambda a: (len(a), a)
):
if sw is not None and sw not in path:
continue
if path[0] == h2 and path[-1] == h1:
path = list(reversed(path))
path_bits = set([
tuple(path[i:i+2])
for i in range(len(path)-1)
])
if len(path_bits.intersection(valid_paths_bits_rev)) <= 0:
valid_paths.append(path)
valid_paths_bits_rev = valid_paths_bits_rev.union(set([
tuple(reversed(i))
for i in path_bits
]))
return valid_paths
2019-06-09 17:11:04 +00:00
class LatencyController:
def __init__(self, datapath):
super().__init__()
self._datapath = datapath
self._sw = f"s{self._datapath.id}"
l = [
2019-06-11 01:19:33 +00:00
(i, x) for i, x in enumerate(network_topo[2])
2019-06-09 17:11:04 +00:00
if x[0] == self._sw or x[1] == self._sw
]
2019-06-11 01:19:33 +00:00
self._l = l
self._links = portno_from_list(l)
2019-06-14 07:51:57 +00:00
self._flow_xfer = dict()
self._flow_speed = dict()
self._link_speed = UsageStoreProxyFromFlowDict(self._flow_speed)
2019-06-11 01:19:33 +00:00
self._link_rules = usage_store_from_list(l)
2019-06-09 17:11:04 +00:00
self._bandwidth = bandwidth_from_list(l)
2019-06-11 01:19:33 +00:00
self._ospf = None
2019-06-05 07:17:36 +00:00
2019-07-05 08:52:59 +00:00
def connected(self, la: 'LatencyApp'):
2019-06-09 17:11:04 +00:00
print(f"Switch connected: {self._sw}")
2019-06-11 01:19:33 +00:00
print(f"Switch OSPF-fallback loading: {self._sw}")
parser = self._datapath.ofproto_parser
self._ospf = la.ospf_dijkstra(self._sw)
for h in network_topo[0]:
ipv4_dst = id2ip(int(h[1:])-1)
match_ipv4 = parser.OFPMatch(
eth_type=0x0800,
ipv4_dst=ipv4_dst
)
match_arp = parser.OFPMatch(
eth_type=0x0806,
arp_tpa=ipv4_dst
)
next_hop = self._ospf(h)[0][1]
out_port = self._links[(self._sw, next_hop)]
actions = [parser.OFPActionOutput(out_port)]
print(f"{self._sw} --[{out_port}]--> {next_hop} ({ipv4_dst})")
self.add_flow(1, match_ipv4, actions)
self.add_flow(1, match_arp, actions)
print(f"Switch OSPF-fallback loaded: {self._sw}")
2019-06-28 08:33:02 +00:00
if routing_algo == 'ecmp':
print(f"Switch ECMP setting up: {self._sw}")
self._configureECMP(la)
print(f"Switch ECMP set up: {self._sw}")
2019-07-05 08:52:59 +00:00
hub.spawn(self._write_in_initialized_file)
# self._write_in_initialized_file()
def _write_in_initialized_file(self, retries=5):
with initialized_switch_writing_lock:
with open('~current.sws.state', 'a') as f:
f.write(f"{self._sw}{os.linesep}")
if retries>0:
2019-07-07 14:48:16 +00:00
sleep(random.uniform(0.5, 2.5))
2019-07-05 08:52:59 +00:00
self._write_in_initialized_file(retries-1)
2019-06-05 07:17:36 +00:00
2019-06-09 17:11:04 +00:00
def disconnected(self):
print(f"Switch disconnected: {self._sw}")
2019-06-05 07:17:36 +00:00
2019-06-11 01:19:33 +00:00
def add_flow(self, priority, match, actions, buffer_id=None, idle_timeout=0, hard_timeout=0):
ofproto = self._datapath.ofproto
parser = self._datapath.ofproto_parser
2019-06-05 07:17:36 +00:00
2019-06-11 01:19:33 +00:00
inst = [parser.OFPInstructionActions(
ofproto.OFPIT_APPLY_ACTIONS,
actions
)]
additionals = dict()
2019-06-05 07:17:36 +00:00
if buffer_id:
2019-06-11 01:19:33 +00:00
additionals['buffer_id'] = buffer_id
if idle_timeout:
additionals['idle_timeout'] = idle_timeout
if hard_timeout:
additionals['hard_timeout'] = hard_timeout
mod = parser.OFPFlowMod(datapath=self._datapath,
priority=priority, match=match,
instructions=inst,
**additionals)
self._datapath.send_msg(mod)
def _the_other_link_endpoint(self, linktuple):
return linktuple[abs(linktuple.index(self._sw)-1)]
2019-06-05 07:17:36 +00:00
2019-07-01 23:42:25 +00:00
def _on_stats(self, ev, la: 'LatencyApp'):
2019-06-11 01:19:33 +00:00
self._link_rules = usage_store_from_list(self._l)
2019-06-28 08:33:02 +00:00
parser = self._datapath.ofproto_parser
2019-06-11 01:19:33 +00:00
stats = ev.msg.body
2019-06-14 07:51:57 +00:00
allkeys = []
for k in self._link_rules._pairs.keys():
self._link_rules._pairs[k] = 0
2019-06-11 01:19:33 +00:00
for stat in stats:
2019-06-14 07:51:57 +00:00
# ltp -> link tuple
# ftp -> flow tuple
# bc -> byte count
# obc -> old byte count
2019-06-28 08:33:02 +00:00
action = stat.instructions[0].actions[0]
outs = list()
if isinstance(action, parser.OFPActionGroup):
2019-07-01 23:42:25 +00:00
weight_sum = sum([
bucket.weight
for bucket in la.ecmp_group_id_buckets[action.group_id]
])
2019-06-28 08:33:02 +00:00
for bucket in la.ecmp_group_id_buckets[action.group_id]:
outs.append((
bucket.actions[0].port,
2019-07-01 23:42:25 +00:00
bucket.weight/max(1, weight_sum)
2019-06-28 08:33:02 +00:00
))
else:
outs.append((action.port, 1))
for out_port, weight in outs:
ltps = self._links.reverse_lookup(out_port)
if len(ltps) <= 0:
continue
ltp = ltps[0]
nxt = ltp[abs(ltp.index(self._sw)-1)]
# measuring link flows
self._link_rules[ltp] = 1+self._link_rules[ltp]
# measuring load
if stat.match['eth_type'] == 0x0800:
src = stat.match.get('ipv4_src', '10.0.0.0')
dst = stat.match['ipv4_dst']
src = f"h{ip2id(src)+1}"
dst = f"h{ip2id(dst)+1}"
if src == 'h0':
src = None
key = (src, self._sw, nxt, dst)
bc = stat.byte_count
obc = self._flow_xfer.get(key, 0)
tfd = max(0, bc-obc)*weight # bytes per cycle
tfr = ((8*tfd)/la.interval)/10**6 # mbps
self._flow_xfer[key] = bc
self._flow_speed[key] = tfr
allkeys.append(key)
2019-06-14 07:51:57 +00:00
for k in (set(self._flow_speed.keys())-set(allkeys)):
self._flow_speed[k] = 0
2019-06-11 01:19:33 +00:00
2019-06-28 08:33:02 +00:00
# with parts from <https://github.com/wildan2711/multipath/blob/master/ryu_multipath.py>
# commented by <https://wildanmsyah.wordpress.com/2018/01/13/multipath-routing-with-load-balancing-using-ryu-openflow-controller/>
2019-07-01 23:42:25 +00:00
def _configureECMP(self, la: 'LatencyApp'):
2019-06-28 08:33:02 +00:00
ofproto = self._datapath.ofproto
parser = self._datapath.ofproto_parser
2019-07-05 08:52:59 +00:00
prints = list()
2019-06-28 08:33:02 +00:00
for h1p, h1 in enumerate(network_topo[0]):
for h2p, h2 in enumerate(network_topo[0]):
if h1p != h2p:
ips = id2ip(int(h1[1:])-1)
ipd = id2ip(int(h2[1:])-1)
match = parser.OFPMatch(
eth_type=0x0800,
ipv4_src=ips,
ipv4_dst=ipd
)
2019-07-05 08:52:59 +00:00
valid_paths = filter_out_invalid_paths_from_multiple_paths(
h1,
h2,
pop_apa_candidates[h1p][h2p],
self._sw
)
2019-06-28 08:33:02 +00:00
portouts = list()
2019-07-02 03:14:03 +00:00
nexthops = list()
2019-07-05 08:52:59 +00:00
for path in valid_paths:
2019-07-01 23:42:25 +00:00
nexthop = (path[path.index(self._sw)+1], 1.0)
2019-07-02 03:14:03 +00:00
link = (self._sw, nexthop[0])
2019-06-28 08:33:02 +00:00
portout = self._links[link]
portouts.append(portout)
2019-07-02 03:14:03 +00:00
nexthops.append(nexthop[0])
2019-07-05 08:52:59 +00:00
portouts = list(set(portouts))
nexthops = list(set(nexthops))
prints.append(
f"({ips}) {self._sw} --{portouts}-> {set(nexthops)} ({ipd})"
)
2019-06-28 08:33:02 +00:00
if len(portouts) <= 0:
continue
# no output
elif len(portouts) == 1:
actions = [parser.OFPActionOutput(portouts[0])]
2019-07-02 03:14:03 +00:00
self.add_flow(7, match, actions)
2019-06-28 08:33:02 +00:00
# add simple rule
else:
all_bws = [x if x is not None else UNLIMITED_BANDWIDTH for x in [
network_topo[2][portout-1][2] for portout in portouts]]
sum_all_bws = sum(all_bws)
weighted_bws = [bw/sum_all_bws for bw in all_bws]
out_ports = list(zip(portouts, weighted_bws))
del weighted_bws
del sum_all_bws
del portouts
tpl = tuple([h1, self._sw, h2])
gnw = False
if tpl not in la.ecmp_group_ids:
la.ecmp_group_ids[tpl] = len(la.ecmp_group_ids)+1
gnw = True
gid = la.ecmp_group_ids[tpl]
buckets = []
for port, weight in out_ports:
2019-07-05 08:52:59 +00:00
bucket_weight = int(
round(
65535 * min(
1.0,
max(
0.0,
(1 - weight)
)
)
)
)
2019-06-28 08:33:02 +00:00
bucket_action = [parser.OFPActionOutput(port)]
buckets.append(
parser.OFPBucket(
weight=bucket_weight,
watch_port=port,
watch_group=ofproto.OFPG_ANY,
actions=bucket_action
)
)
2019-07-01 23:42:25 +00:00
2019-06-28 08:33:02 +00:00
la.ecmp_group_id_buckets[gid] = buckets
req = parser.OFPGroupMod(
self._datapath,
ofproto.OFPGC_ADD if gnw else ofproto.OFPGC_MODIFY,
ofproto.OFPGT_SELECT,
gid,
buckets
)
self._datapath.send_msg(req)
# set group
actions = [parser.OFPActionGroup(gid)]
2019-07-02 03:14:03 +00:00
self.add_flow(7, match, actions)
2019-06-28 08:33:02 +00:00
# add rule
2019-07-05 08:52:59 +00:00
print(os.linesep.join(prints))
2019-06-28 08:33:02 +00:00
pass
2019-07-01 23:42:25 +00:00
def add_latency_segment(self, ips: str, ipd: str, nxt: str, ignore_routing_algo: bool = False):
2019-06-28 08:33:02 +00:00
if (routing_algo not in ['ldr', 'minmax']) and not ignore_routing_algo:
2019-06-14 07:51:57 +00:00
return
link = (self._sw, nxt)
2019-06-11 01:19:33 +00:00
parser = self._datapath.ofproto_parser
2019-06-14 07:51:57 +00:00
actions = [parser.OFPActionOutput(self._links[link])]
2019-06-11 01:19:33 +00:00
match = parser.OFPMatch(
eth_type=0x0800,
ipv4_src=ips,
ipv4_dst=ipd
)
self.add_flow(5, match, actions)
2019-06-14 07:51:57 +00:00
# hs = f"h{ip2id(ips)+1}"
# hd = f"h{ip2id(ipd)+1}"
# self._flow_xfer[(hs, hd, nxt)] = 0
2019-07-03 20:52:11 +00:00
2019-07-01 23:42:25 +00:00
def add_weighted_latency(self, la: 'LatencyApp', ips: str, ipd: str, nxts: Dict[str, float]):
2019-07-03 20:52:11 +00:00
if len(nxts) <= 0:
2019-07-01 23:42:25 +00:00
return
2019-07-03 20:52:11 +00:00
elif len(nxts) == 1:
2019-07-01 23:42:25 +00:00
self.add_latency_segment(ips, ipd, list(nxts.keys())[0])
else:
ofproto = self._datapath.ofproto
parser = self._datapath.ofproto_parser
match = parser.OFPMatch(
eth_type=0x0800,
ipv4_src=ips,
ipv4_dst=ipd
)
h1 = f"h{ip2id(ips)+1}"
h2 = f"h{ip2id(ipd)+1}"
out_ports = [
(self._links[(self._sw, nexthop)], weight)
for nexthop, weight in nxts.items()
]
tpl = tuple([h1, self._sw, h2])
gnw = False
if tpl not in la.ecmp_group_ids:
la.ecmp_group_ids[tpl] = len(la.ecmp_group_ids)+1
gnw = True
gid = la.ecmp_group_ids[tpl]
buckets = []
for port, weight in out_ports:
bucket_weight = int(round(weight * 65535))
bucket_action = [parser.OFPActionOutput(port)]
buckets.append(
parser.OFPBucket(
weight=bucket_weight,
watch_port=port,
watch_group=ofproto.OFPG_ANY,
actions=bucket_action
)
)
la.ecmp_group_id_buckets[gid] = buckets
req = parser.OFPGroupMod(
self._datapath,
ofproto.OFPGC_ADD if gnw else ofproto.OFPGC_MODIFY,
ofproto.OFPGT_SELECT,
gid,
buckets
)
self._datapath.send_msg(req)
# set group
actions = [parser.OFPActionGroup(gid)]
self.add_flow(5, match, actions)
# add rule
2019-06-05 07:17:36 +00:00
2019-06-18 06:50:25 +00:00
@property
def simulatable(self):
return SimulatableSwitch(
self._sw,
self._flow_speed.copy(),
self._bandwidth.copy()
)
class SimulatableSwitch:
def __init__(self, sw: str, fs: dict, bw: 'UsageStore'):
self._sw = sw
self._flow_speed = fs
self._bandwidth = bw
self._link_speed = UsageStoreProxyFromFlowDict(self._flow_speed)
self._reinit()
def _reinit(self):
self._link_rules = self.get_link_rules()
def get_link_rules(self):
cnt = UsageStore()
for key, bw in self._flow_speed.items():
if bw > 0:
k = (key[1], key[2])
cnt[k] = 1+cnt[k]
return cnt
@property
def name(self):
return self._sw
def copy(self):
return type(self)(
self._sw,
self._flow_speed.copy(),
self._bandwidth.copy()
)
class SimulatableNetwork:
def __init__(self, sws: List[SimulatableSwitch], paths: 'UsageStore'):
self.switches: Dict[str, SimulatableSwitch] = {
sw._sw: sw
for sw in sws
}
self.max_speed = bandwidth_from_list(enumerate(network_topo[2]))
self.paths = paths
self._reinit()
def _reinit(self):
self._flow_speed = DictAggregator(
[sw._flow_speed for sw in self.switches.values()]
)
self.last_speed = UsageStoreProxyFromCallableIterable(
AttrCallableIterable(
AttrCallableIterable(
self.switches.values,
'_link_speed'
),
'as_regular_storage'
),
sum
).as_regular_storage
self.link_usage = self.max_speed.calculate(
self.last_speed.as_regular_storage,
lambda maxspeed, lastspeed: (
lastspeed / max(maxspeed, 0.0000000000001)
)
).as_regular_storage
self.link_flows = self.get_link_flows()
def get_link_flows(self):
cnt = UsageStore()
for switch in self.switches.values():
for (_, sw, nxt, __), speed in switch._flow_speed.items():
key = (sw, nxt)
if switch._bandwidth[key] > 0 and speed > 0:
cnt[key] = 1+cnt[key]
return cnt
def get_routes(self):
return self.paths.copy()
def get_path(self, h1, h2):
return self.paths[(h1, h2)]
def copy(self) -> 'SimulatableNetwork':
return type(self)(
[sw.copy() for sw in self.switches.values()],
self.paths.copy()
)
def sort_by_max_flow_load(self, seqs):
wl = list()
for seq in seqs:
2019-07-01 23:42:25 +00:00
if not isinstance(seq, WeightedPathAggregate):
seq = WeightedPathAggregate({tuple(seq): 1.0})
2019-06-18 06:50:25 +00:00
ml = self.get_max_flow_load(seq)
2019-07-01 23:42:25 +00:00
wl.append((
ml,
sorted(seq.keys(), key=lambda a: len(a))[0],
seq
))
2019-06-18 06:50:25 +00:00
return list(map(
2019-07-01 23:42:25 +00:00
lambda a: a[2],
2019-06-18 06:50:25 +00:00
reversed(sorted(wl))
))
2019-07-01 23:42:25 +00:00
def get_max_flow_load(self, wpa):
h1, h2 = _sort_pair(wpa[0], wpa[-1])
2019-06-18 06:50:25 +00:00
ml = 0
2019-07-01 23:42:25 +00:00
if not isinstance(wpa, WeightedPathAggregate):
wpa = WeightedPathAggregate({tuple(wpa): 1.0})
for seq, weight in wpa.items():
for i in range(len(seq)-1):
link = tuple(seq[i:i+2])
revlink = tuple(list(reversed(link)))
ml = max(
ml,
((
(
self._flow_speed.get(tuple([h1, *link, h2]), 0)
/
self.max_speed[link]
)+(
self._flow_speed.get(tuple([h2, *revlink, h1]), 0)
/
self.max_speed[revlink]
)
))*weight
2019-06-18 06:50:25 +00:00
)
return ml
2019-07-01 23:42:25 +00:00
def get_max_flow_speed(self, wpa):
h1, h2 = _sort_pair(wpa[0], wpa[-1])
if not isinstance(wpa, WeightedPathAggregate):
wpa = WeightedPathAggregate({tuple(wpa): 1.0})
2019-06-18 06:50:25 +00:00
ml = 0
2019-07-01 23:42:25 +00:00
for seq, weight in wpa.items():
for i in range(len(seq)-1):
link = tuple(seq[i:i+2])
revlink = tuple(list(reversed(link)))
ml = max(
ml,
((
2019-07-03 20:52:11 +00:00
self._flow_speed.get(tuple([h1, *link, h2]), 0)
+
self._flow_speed.get(tuple([h2, *revlink, h1]), 0)
2019-07-01 23:42:25 +00:00
))*weight
)
2019-06-18 06:50:25 +00:00
return ml
2019-07-01 23:42:25 +00:00
def get_max_path_load(self, wpa):
2019-06-18 06:50:25 +00:00
ml = 0
2019-07-01 23:42:25 +00:00
if not isinstance(wpa, WeightedPathAggregate):
wpa = WeightedPathAggregate({tuple(wpa): 1.0})
for seq in wpa.keys():
for i in range(len(seq)-1):
link = tuple(seq[i:i+2])
ml = max(
ml,
self.link_usage[link]
)
2019-06-18 06:50:25 +00:00
return ml
def with_modified_path(self, newpath):
mod = self.copy()
oldpath = self.get_path(newpath[0], newpath[-1])
flowspeed = self.get_max_flow_speed(oldpath)
# removing old flows
for sw in mod.switches.values():
for k in sw._flow_speed.copy().keys():
if (
k[0] in [newpath[0], newpath[-1]]
and
k[3] in [newpath[0], newpath[-1]]
):
del sw._flow_speed[k]
# adding new flows
2019-07-01 23:42:25 +00:00
if not isinstance(newpath, WeightedPathAggregate):
newpath = WeightedPathAggregate({tuple(newpath): 1.0})
for newpath2, weight in newpath.items():
for i in range(1, len(newpath2)-1):
ky = newpath2[0], newpath2[i], newpath2[i+1], newpath2[-1]
rk = newpath2[-1], newpath2[i-1], newpath2[i], newpath2[0]
if ky[1].startswith('s') and ky[1] in mod.switches:
mod.switches[ky[1]]._flow_speed[ky] = (weight*flowspeed)/2
if rk[1].startswith('s') and rk[1] in mod.switches:
mod.switches[rk[1]]._flow_speed[rk] = (weight*flowspeed)/2
2019-06-18 06:50:25 +00:00
for sw in mod.switches.values():
sw._reinit()
self.paths[tuple([newpath[0], newpath[-1]])] = newpath
mod._reinit()
return mod
2019-06-28 08:33:02 +00:00
def copy_normalized(self):
net = self.copy()
for path in net.get_routes()._pairs.values():
net = net.with_modified_path(path)
return net
2019-07-03 20:52:11 +00:00
2019-07-01 23:42:25 +00:00
# Gvozdiev et al @ SIGCOMM2018, p. 94 => Algorithm 1
def copy_scaling(self, prev_prediction: Dict[Tuple[str], float], fixed_hedge: float, decay_multiplier: float) -> Tuple['SimulatableNetwork', Dict[Tuple[str], float]]:
if prev_prediction is None:
prev_prediction = dict()
net = self.copy()
next_prediction = dict()
for sw in net.switches.values():
for idtf, prev_value in sw._flow_speed.items():
scaled_est = prev_value*fixed_hedge
if scaled_est > prev_prediction.get(idtf, 0):
next_prediction[idtf] = scaled_est
else:
2019-07-03 20:52:11 +00:00
decay_prediction = prev_prediction.get(
idtf, 0) * decay_multiplier
2019-07-01 23:42:25 +00:00
next_prediction[idtf] = max(decay_prediction, scaled_est)
for sw in net.switches.values():
for idtf in sw._flow_speed.keys():
sw._flow_speed[idtf] = next_prediction[idtf]
sw._reinit()
net._reinit()
return net, next_prediction
2019-06-28 08:33:02 +00:00
2019-06-05 07:17:36 +00:00
2019-06-28 08:33:02 +00:00
class AbstractPathEvaluator:
2019-06-28 21:35:39 +00:00
def __init__(self, simulatable_network: SimulatableNetwork, keepdata: Optional[Any]):
2019-06-28 08:33:02 +00:00
self._simnet: SimulatableNetwork = simulatable_network
2019-06-28 21:35:39 +00:00
self._keepdata = keepdata
2019-06-28 08:33:02 +00:00
2019-06-28 21:35:39 +00:00
def __call__(self) -> Tuple['UsageStore', NoneType]: pass
2019-06-11 01:19:33 +00:00
2019-06-28 08:33:02 +00:00
class NullPathEvaluator(AbstractPathEvaluator):
2019-06-28 21:35:39 +00:00
def __call__(self) -> Tuple['UsageStore', NoneType]:
return UsageStore(), None
2019-06-28 08:33:02 +00:00
2019-07-01 23:42:25 +00:00
class LDRSinglePathEvaluator(AbstractPathEvaluator):
2019-06-28 21:35:39 +00:00
def __call__(self) -> Tuple['UsageStore', NoneType]:
return self._figure13(self._simnet.copy()), None
2019-06-18 06:50:25 +00:00
# return self._figure12(
# self._simnet.link_usage,
# self._simnet.link_flows
# )
2019-06-28 08:33:02 +00:00
def _figure13(self, net: SimulatableNetwork) -> 'UsageStore':
net = net.copy_normalized()
2019-06-18 06:50:25 +00:00
_tmp = self._figure12(
net.link_usage,
net.link_flows
)
target = len(_tmp._pairs)
changed = UsageStore()
newnet = None
while len(changed._pairs) < target:
aggregate_paths = self._figure12(
net.link_usage,
net.link_flows
)
loaded_paths = net.sort_by_max_flow_load([
2019-06-28 08:33:02 +00:00
*aggregate_paths._pairs.values()
2019-06-18 06:50:25 +00:00
])
most_loaded = None
for loaded in loaded_paths:
if not changed.contains((loaded[0], loaded[-1])):
most_loaded = loaded
break
if most_loaded is None:
break
newnet = net.with_modified_path(most_loaded)
2019-06-28 08:33:02 +00:00
old_path_for_most_loaded = net.get_path(
most_loaded[0], most_loaded[-1])
2019-06-18 06:50:25 +00:00
if (
newnet.get_max_path_load(most_loaded)
<=
net.get_max_path_load(old_path_for_most_loaded)
):
net = newnet
changed[(most_loaded[0], most_loaded[-1])] = most_loaded
2019-06-19 16:35:09 +00:00
return net.get_routes()
2019-06-18 06:50:25 +00:00
2019-06-28 08:33:02 +00:00
def _figure12(self, link_usage, link_flows) -> 'UsageStore':
2019-06-11 01:19:33 +00:00
processed = UsageStore()
2019-06-14 07:51:57 +00:00
currently_reserved = UsageStore()
2019-06-11 01:19:33 +00:00
ospf = Dijkstra(network_graph)
2019-06-15 06:31:09 +00:00
l1 = network_topo[0][:]
random.shuffle(l1)
for h1 in l1:
l2 = network_topo[0][:]
random.shuffle(l2)
for h2 in l2:
2019-06-11 01:19:33 +00:00
if h1 != h2 and not processed.contains((h1, h2)):
h1p = network_topo[0].index(h1)
h2p = network_topo[0].index(h2)
ospf_aggregate = ospf(h1)(h2)[0]
2019-06-18 06:50:25 +00:00
ospf_delay = sum([
link_usage[
2019-06-11 01:19:33 +00:00
tuple([*ospf_aggregate[i:i+2]])
] + hop_delay
2019-06-18 06:50:25 +00:00
for i in range(len(ospf_aggregate)-1)
])
2019-06-14 07:51:57 +00:00
aggregates = list(pop_apa_candidates[h1p][h2p])
random.shuffle(aggregates)
2019-06-18 06:50:25 +00:00
del h1p
del h2p
2019-06-11 01:19:33 +00:00
min_aggregate = None
min_aggregate_val = None
2019-06-18 06:50:25 +00:00
min_aggregate_links = list()
nalist = list()
na = 0 # flows in aggregate
omax = 0 # maximum overload
2019-06-11 01:19:33 +00:00
for aggregate in aggregates:
2019-06-18 06:50:25 +00:00
links = [
_sort_pair(*aggregate[i:i+2])
for i in range(len(aggregate)-1)
]
omax2 = 0
for link in links:
2019-07-01 23:42:25 +00:00
omax2 = max(omax2, link_usage[link])
2019-06-18 06:50:25 +00:00
if link not in nalist:
na += link_flows[link]
na += currently_reserved[link]
nalist.append(link)
2019-06-11 01:19:33 +00:00
pass # fraction of aggregate on path
2019-06-18 06:50:25 +00:00
if omax2 > omax:
omax = omax2
# del nalist
# for aggregate in aggregates:
links = [
_sort_pair(*aggregate[i:i+2])
for i in range(len(aggregate)-1)
]
2019-06-11 01:19:33 +00:00
eps2 = 0
xap = 1
2019-06-18 06:50:25 +00:00
for link in links:
dp = link_usage[link]+hop_delay
2019-06-11 01:19:33 +00:00
pass # path delay
eps2 += xap*(dp+((dp*m1)/ospf_delay))
exp0 = m2*omax
eps3 = 0
2019-06-18 06:50:25 +00:00
eps3 += sum([link_usage[link] for link in links])
2019-06-11 01:19:33 +00:00
aggregate_val = na*eps2+exp0+eps3
2019-06-18 06:50:25 +00:00
if (
min_aggregate_val is None
or
min_aggregate_val > aggregate_val
):
2019-06-11 01:19:33 +00:00
min_aggregate_val = aggregate_val
min_aggregate = aggregate
2019-06-18 06:50:25 +00:00
min_aggregate_links = links
2019-06-11 01:19:33 +00:00
processed[(h1, h2)] = min_aggregate
2019-06-18 06:50:25 +00:00
for link in min_aggregate_links:
2019-06-14 07:51:57 +00:00
currently_reserved[link] = 1+currently_reserved[link]
2019-06-11 01:19:33 +00:00
return processed
2019-07-01 23:42:25 +00:00
class LDRPathEvaluator(AbstractPathEvaluator):
def __call__(self) -> Tuple['UsageStore', NoneType]:
net, kd = self._simnet.copy_scaling(self._keepdata, 1.1, 0.98)
return self._figure12(net.copy()), kd
def _figure13(self, net: SimulatableNetwork) -> 'UsageStore':
net = net.copy_normalized()
_tmp = self._figure12(net.copy())
target = len(_tmp._pairs)
changed = UsageStore()
newnet = None
while len(changed._pairs) < target:
aggregate_paths = self._figure12(net.copy())
loaded_paths = net.sort_by_max_flow_load([
*aggregate_paths._pairs.values()
])
most_loaded = None
for loaded in loaded_paths:
if not changed.contains((loaded[0], loaded[-1])):
most_loaded = loaded
break
if most_loaded is None:
break
newnet = net.with_modified_path(most_loaded)
old_path_for_most_loaded = net.get_path(
most_loaded[0], most_loaded[-1])
if (
newnet.get_max_path_load(most_loaded)
<=
net.get_max_path_load(old_path_for_most_loaded)
):
net = newnet
changed[(most_loaded[0], most_loaded[-1])] = most_loaded
return net.get_routes()
def _figure12(self, net: SimulatableNetwork) -> 'UsageStore':
link_usage = net.link_usage
link_flows = net.link_flows
ospf = Dijkstra(network_graph)
opt_model = pulp.LpProblem("LDR", pulp.LpMinimize)
xap = pulp.LpVariable("xa", 0, 1)
processed = UsageStore(default=dict())
l1 = network_topo[0][:]
random.shuffle(l1)
sumaggregates = 0
for h1 in l1:
l2 = network_topo[0][:]
random.shuffle(l2)
for h2 in l2:
if h1 != h2 and not processed.contains((h1, h2)):
h1, h2 = _sort_pair(h1, h2)
opt_vars = dict()
processed[(h1, h2)] = opt_vars
h1p = network_topo[0].index(h1)
h2p = network_topo[0].index(h2)
ospf_aggregate = ospf(h1)(h2)[0]
ospf_delay = sum([
link_usage[
tuple([*ospf_aggregate[i:i+2]])
] + hop_delay
for i in range(len(ospf_aggregate)-1)
])
aggregates = list(pop_apa_candidates[h1p][h2p])[:]
for aggregate in aggregates[:]:
links = [
_sort_pair(*aggregate[i:i+2])
for i in range(len(aggregate)-1)
]
dp = sum(
[link_usage[link]+hop_delay for link in links])
if (ospf_delay == 0) or ((dp/ospf_delay) > apa_path_stretch):
del aggregates[aggregates.index(aggregate)]
random.shuffle(aggregates)
sumpinpa = 0
sumxappinpa = 0
na = sum([
link_flows[link]
for aggregate in aggregates
for link in [
_sort_pair(*aggregate[i:i+2])
for i in range(len(aggregate)-1)
]
])
na = max(na, MIN_POSITIVE_FLOAT)
for aggregate in aggregates:
links = [
_sort_pair(*aggregate[i:i+2])
for i in range(len(aggregate)-1)
]
xap = pulp.LpVariable(
f"xap_{h1}_{h2}__{'_'.join(aggregate)}", 0, 1)
opt_vars[tuple(aggregate)] = xap
omax = max(0, *[link_usage[link] for link in links])
olinks = sum([link_usage[link] for link in links])
dp = sum(
[link_usage[link]+hop_delay for link in links])
tiebreaker = (dp+((dp*m1)/ospf_delay))
sumxappinpa += xap
sumpinpa += xap*(tiebreaker+(m2*omax)+olinks)
opt_model += 0 <= xap <= 1
opt_model += sumxappinpa == 1
sumaggregates += na*sumpinpa
opt_model += sumaggregates
opt_model.solve()
solution = {
(tuple(k[4:].split('__')[0].split('_')), tuple(k[4:].split('__')[1].split('_'))):
v.varValue
for k, v in opt_model.variablesDict().items()
if k.startswith('xap_')
}
# print(opt_model)
d = dict()
for (tpl, path), weight in solution.items():
if tpl not in d:
d[tpl] = dict()
d[tpl][path] = weight
weighted_result = UsageStore(initial={
k: WeightedPathAggregate(v)
for k, v in d.items()
})
# print(weighted_result)
# opt_model.solve() # solving with CBC
# opt_model.solve(solver=pulp.GLPK_CMD()) # solving with Glpk
return weighted_result
class MinMaxSinglePathEvaluator(AbstractPathEvaluator):
2019-06-28 21:35:39 +00:00
def __call__(self) -> Tuple['UsageStore', NoneType]:
return self.minmax(self._simnet.copy()), None
2019-06-28 08:33:02 +00:00
def minmax(self, net: SimulatableNetwork) -> 'UsageStore':
net = net.copy_normalized()
processed = UsageStore()
l1 = network_topo[0][:]
random.shuffle(l1)
for h1 in l1:
l2 = network_topo[0][:]
random.shuffle(l2)
for h2 in l2:
if h1 != h2 and not processed.contains((h1, h2)):
h1p = network_topo[0].index(h1)
h2p = network_topo[0].index(h2)
aggregates = list(pop_apa_candidates[h1p][h2p])
configurations = list()
for aggregate in aggregates:
links = [
_sort_pair(*aggregate[i:i+2])
for i in range(len(aggregate)-1)
]
newnet = net.with_modified_path(aggregate)
2019-07-05 08:52:59 +00:00
configurations.append((
newnet.get_max_path_load(aggregate),
len(aggregate),
aggregate,
newnet
))
2019-06-28 08:33:02 +00:00
configurations.sort()
configuration = configurations[0]
processed[(h1, h2)] = configuration[2]
net = configuration[3]
return processed
path_evaluators = {
'ospf': NullPathEvaluator,
2019-07-01 23:42:25 +00:00
'ldr-single': LDRSinglePathEvaluator,
'minmax-single': MinMaxSinglePathEvaluator,
2019-06-28 08:33:02 +00:00
'ecmp': NullPathEvaluator,
'ldr': LDRPathEvaluator,
}
PathEvaluator = path_evaluators[routing_algo]
2019-07-05 08:52:59 +00:00
ppe = ProcessPoolExecutor(1)
last_process = None
last_process_lock = threading.Lock()
initialized_switch_writing_lock = threading.Lock()
2019-06-28 08:33:02 +00:00
2019-06-09 17:11:04 +00:00
class LatencyApp(app_manager.RyuApp):
OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]
2019-06-05 07:17:36 +00:00
2019-06-09 17:11:04 +00:00
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.mac_to_port = {}
self.controllers = {}
2019-06-14 07:51:57 +00:00
self._paths = host_path_from_list(network_topo[0])
self._loads = usage_store_from_list(enumerate([]))
2019-06-11 01:19:33 +00:00
self.max_speed = bandwidth_from_list(enumerate(network_topo[2]))
self.last_speed = UsageStoreProxyFromCallableIterable(
2019-06-14 07:51:57 +00:00
AttrCallableIterable(
AttrCallableIterable(
self.controllers.values,
'_link_speed'
),
'as_regular_storage'
),
2019-06-11 01:19:33 +00:00
sum
2019-06-09 17:11:04 +00:00
)
2019-06-11 01:19:33 +00:00
self.flow_count = UsageStoreProxyFromCallableIterable(
AttrCallableIterable(self.controllers.values, '_link_rules'),
sum
)
self.ospf_dijkstra = Dijkstra(network_graph)
2019-06-28 08:33:02 +00:00
self.ecmp_group_ids = dict()
self.ecmp_group_id_buckets = dict()
2019-06-28 21:35:39 +00:00
self._last_process_data = None
2019-06-11 01:19:33 +00:00
self.interval = float(network_config['monitoring']['interval'])
hub.spawn(self._start_monitoring, self.interval)
2019-06-09 17:11:04 +00:00
@set_ev_cls(ofp_event.EventOFPStateChange, [MAIN_DISPATCHER, DEAD_DISPATCHER])
def _event_ofp_state_change(self, ev):
datapath = ev.datapath
dpid = datapath.id
if ev.state == MAIN_DISPATCHER:
if dpid not in self.controllers:
self.controllers[dpid] = LatencyController(datapath)
2019-06-11 01:19:33 +00:00
self.controllers[dpid].connected(self)
self._request_stats(datapath)
2019-06-09 17:11:04 +00:00
elif ev.state == DEAD_DISPATCHER:
if dpid in self.controllers:
self.controllers[dpid].disconnected()
del self.controllers[dpid]
def _start_monitoring(self, repeat):
2019-07-05 08:52:59 +00:00
global last_process
2019-06-09 17:11:04 +00:00
while True:
2019-07-05 08:52:59 +00:00
print("Monitoring loop called")
2019-06-09 17:11:04 +00:00
for ctrl in self.controllers.values():
2019-07-01 23:42:25 +00:00
hub.spawn(self._request_stats, ctrl._datapath)
hub.spawn(self._update_performance_state)
2019-07-05 08:52:59 +00:00
hub.spawn(self._trigger_path_refresh)
sleep(repeat)
def _trigger_path_refresh(self):
with last_process_lock:
# if last_process is not None:
# last_process.cancel()
last_process = ppe.submit(
PathEvaluator(
*self._monitored_data(),
self._last_process_data
)
)
last_process.add_done_callback(self._update_topo_done)
2019-06-09 17:11:04 +00:00
def _request_stats(self, datapath):
datapath.send_msg(
2019-06-11 01:19:33 +00:00
datapath.ofproto_parser.OFPFlowStatsRequest(
datapath
)
)
2019-06-09 17:11:04 +00:00
@set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
def _flow_stats_reply_handler(self, ev):
global last_process
dpid = ev.msg.datapath.id
ctrl = self.controllers.get(dpid)
if ctrl is not None:
ctrl._on_stats(ev, self)
2019-06-11 01:19:33 +00:00
def _monitored_data(self):
2019-06-18 06:50:25 +00:00
self._loads = self.max_speed.calculate(
2019-06-14 07:51:57 +00:00
self.last_speed.as_regular_storage,
lambda maxspeed, lastspeed: (
lastspeed / max(maxspeed, 0.0000000000001)
)
2019-06-11 01:19:33 +00:00
)
2019-06-18 06:50:25 +00:00
return (
SimulatableNetwork([
ctrl.simulatable
for ctrl in self.controllers.values()
], self._paths),
)
2019-06-09 17:11:04 +00:00
def _update_topo_done(self, future):
2019-07-05 08:52:59 +00:00
if not future.cancelled():
2019-06-28 21:35:39 +00:00
res, keepdata = future.result()
2019-06-11 01:19:33 +00:00
self._update_topo_done_successfully(res)
2019-06-28 21:35:39 +00:00
self._last_process_data = keepdata
2019-07-05 08:52:59 +00:00
self._update_performance_state()
else:
print(future.exception)
2019-06-09 17:11:04 +00:00
2019-06-11 01:19:33 +00:00
def _update_topo_done_successfully(self, processed):
print(f'-> Topo updated {datetime.datetime.now()}')
for (h1, h2), path in processed._pairs.items():
2019-06-14 07:51:57 +00:00
self._paths[(h1, h2)] = path
2019-06-11 01:19:33 +00:00
ip1 = id2ip(int(h1[1:])-1)
ip2 = id2ip(int(h2[1:])-1)
2019-07-01 23:42:25 +00:00
if isinstance(path, WeightedPathAggregate):
for src, weighted_destinations in path.transitions(h1).items():
if src.startswith('s'):
ctrl = self.controllers.get(int(src[1:]))
if ctrl is not None:
2019-07-03 20:52:11 +00:00
ctrl.add_weighted_latency(
self, ip1, ip2, weighted_destinations)
2019-07-01 23:42:25 +00:00
for src, weighted_destinations in path.transitions(h2).items():
if src.startswith('s'):
ctrl = self.controllers.get(int(src[1:]))
if ctrl is not None:
2019-07-03 20:52:11 +00:00
ctrl.add_weighted_latency(
self, ip2, ip1, weighted_destinations)
2019-07-01 23:42:25 +00:00
else:
for i in range(len(path)-1):
segment = path[i:i+2]
if segment[0].startswith('s'):
ctrl = self.controllers.get(int(segment[0][1:]))
if ctrl is not None:
ctrl.add_latency_segment(ip1, ip2, segment[1])
if segment[1].startswith('s'):
ctrl = self.controllers.get(int(segment[1][1:]))
if ctrl is not None:
ctrl.add_latency_segment(ip2, ip1, segment[0])
2019-06-14 07:51:57 +00:00
def _update_performance_state(self):
sio = StringIO()
print(f'@start.path', file=sio)
for _, path in sorted(self._paths._pairs.items()):
2019-07-01 23:42:25 +00:00
if isinstance(path, WeightedPathAggregate):
print(repr(dict(sorted(path.items()))), file=sio)
else:
print(repr({tuple(path): 1.0}), file=sio)
2019-06-14 07:51:57 +00:00
print(f'@end.path', file=sio)
print(f'@start.load', file=sio)
for (ne1, ne2), load in sorted(self._loads._pairs.items()):
print(repr((ne1, ne2, load)), file=sio)
print(f'@end.load', file=sio)
2019-07-02 03:14:03 +00:00
# print(sio.getvalue(), end='')
2019-07-01 23:42:25 +00:00
with open(f'{base_net_name}.state', 'w') as f:
2019-06-14 07:51:57 +00:00
v = sio.getvalue()
f.write(v)
2019-07-01 23:42:25 +00:00
# os.rename(f'{base_net_name}.state2', f'{base_net_name}.state')
2019-06-09 17:11:04 +00:00
2019-06-11 01:19:33 +00:00
def portno_from_list(l):
2019-06-09 17:11:04 +00:00
return UsageStore(dict([
(tuple(x[:2]), i+1)
2019-06-11 01:19:33 +00:00
for i, x in l
]))
def usage_store_from_list(l):
return UsageStore(dict([
(tuple(x[:2]), 0)
for _, x in l
2019-06-09 17:11:04 +00:00
]))
2019-06-14 07:51:57 +00:00
def host_path_from_list(l):
return UsageStore(dict([
((x, y), list())
for x in l
for y in l
if x != y
2019-07-05 08:52:59 +00:00
]), default=list())
2019-06-14 07:51:57 +00:00
2019-06-09 17:11:04 +00:00
def bandwidth_from_list(l):
return UsageStore(dict([
(tuple(x[:2]), x[2])
2019-06-11 01:19:33 +00:00
for _, x in l
2019-06-09 17:11:04 +00:00
]))
2019-06-11 01:19:33 +00:00
class PairSorterMixin:
def _sort_pair(self, a, b):
return _sort_pair(a, b)
class UsageStore(PairSorterMixin):
2019-06-28 21:35:39 +00:00
def __init__(self, initial=dict(), default=0):
2019-06-09 17:11:04 +00:00
self._pairs = dict()
for k, v in initial.items():
self[k] = v
2019-06-28 21:35:39 +00:00
self._default = default
2019-06-09 17:11:04 +00:00
def __getitem__(self, idx) -> float:
srtd = self._sort_pair(*idx)
2019-06-14 07:51:57 +00:00
pairs = self._pairs
if srtd not in pairs:
2019-06-28 21:35:39 +00:00
return self._default
2019-06-05 07:17:36 +00:00
else:
2019-06-14 07:51:57 +00:00
return pairs[srtd]
2019-06-05 07:17:36 +00:00
2019-06-11 01:19:33 +00:00
def reverse_lookup(self, val):
return [k for k, v in self._pairs.items() if v == val]
def calculate(self, other, operation):
return UsageStore({
k: operation(v, other[k])
for k, v in self._pairs.items()
})
def contains(self, idx) -> bool:
return self._sort_pair(*idx) in self._pairs.keys()
2019-06-09 17:11:04 +00:00
def __setitem__(self, idx, val: float):
self._pairs[self._sort_pair(*idx)] = val
2019-06-11 01:19:33 +00:00
2019-07-01 23:42:25 +00:00
def __str__(self):
return repr(self)
2019-06-11 01:19:33 +00:00
def __repr__(self):
return f"{type(self).__name__}({repr(self._pairs)})"
2019-06-18 06:50:25 +00:00
def copy(self):
2019-07-01 23:42:25 +00:00
return type(self)(self._pairs.copy(), self._default)
2019-06-18 06:50:25 +00:00
def to_regular_storage(self):
return self.copy()
@property
def as_regular_storage(self):
return self.to_regular_storage()
class ValueGetterCallable:
def __init__(self, value):
self._val = value
def __call__(self, *args, **kwargs):
return self._val
2019-06-11 01:19:33 +00:00
class UsageStoreProxyFromCallableIterable(PairSorterMixin):
2019-06-28 21:35:39 +00:00
def __init__(self, clb, get_behaviour=None, default=0):
2019-06-11 01:19:33 +00:00
self._callable = clb
self._behaviour = get_behaviour
2019-06-18 06:50:25 +00:00
self.is_frozen = False
2019-06-28 21:35:39 +00:00
self._default = default
2019-06-11 01:19:33 +00:00
def __getitem__(self, idx) -> float:
r = list()
srtd = self._sort_pair(*idx)
for us in self._callable():
v = us._pairs.get(srtd)
if v is not None:
r.append(v)
if self._behaviour is None:
2019-06-28 21:35:39 +00:00
return self._default
2019-06-11 01:19:33 +00:00
else:
return self._behaviour(r)
def __setitem__(self, idx, val: float):
alo = False
srtd = self._sort_pair(*idx)
for us in self._callable():
v = us._pairs.get(srtd)
if v is not None:
us._pairs[srtd] = val
alo = True
if not alo:
raise ValueError(
f"There's no existing connection between" +
f" {idx[0]} and {idx[1]} " +
f"to update"
)
def to_regular_storage(self):
keys = set([
key
for us in self._callable()
for key in us._pairs.keys()
])
return UsageStore(dict([(key, self[key]) for key in keys]))
2019-06-14 07:51:57 +00:00
@property
def as_regular_storage(self):
return self.to_regular_storage()
2019-06-18 06:50:25 +00:00
@property
def frozen(self):
frz = type(self)(
ValueGetterCallable(self._callable()),
self._behaviour
)
frz.is_frozen = True
return frz
2019-06-14 07:51:57 +00:00
class UsageStoreProxyFromFlowDict(UsageStore):
def __init__(self, dct, aggregator=sum):
self._flows = dct
self._agg = aggregator
@property
def _pairs(self):
d = dict()
l = [(self._sort_pair(k[1], k[2]), v) for k, v in self._flows.items()]
for k, _ in l:
if k not in d:
d[k] = list()
for k, v in l:
d[k].append(v)
for k in d.keys():
d[k] = self._agg(d[k])
return d
def to_regular_storage(self):
return UsageStore(self._pairs)
@property
def as_regular_storage(self):
return self.to_regular_storage()
2019-06-11 01:19:33 +00:00
class AttrCallableIterable:
def __init__(self, callablecollection, attr):
self._cc = callablecollection
self._attr = attr
def __call__(self):
for i in self._cc():
yield getattr(i, self._attr)
yield from EMPTY_ITER
2019-06-18 06:50:25 +00:00
class DictAggregator:
def __init__(self, ds):
self._ds = ds
def __getitem__(self, q):
for d in self._ds:
if q in d:
return d[q]
return self._ds[q]
def get(self, q, e=None):
for d in self._ds:
if q in d:
return d[q]
return e
def keys(self):
return {
k
for d in self._ds
for k in d.keys()
}
def values(self):
return {
v
for d in self._ds
for v in d.values()
}
def items(self):
return {
i
for d in self._ds
for i in d.items()
}
def to_regular_dict(self):
return dict(self.items())
def copy(self):
return type(self)(self._ds.copy())
2019-07-01 23:42:25 +00:00
class WeightedPathAggregate:
def __init__(self, weighted: Dict[Tuple[str], float] = dict()):
self._subscriptable = list(weighted.keys())[0]
self._weighted = weighted
def items(self):
return self._weighted.items()
def keys(self):
return self._weighted.keys()
def values(self):
return self._weighted.values()
2019-07-03 20:52:11 +00:00
2019-07-01 23:42:25 +00:00
def __getitem__(self, val: int) -> str:
return self._subscriptable[val]
def transitions(self, in_src):
transitions_algo_weighted = dict()
for path, weight in self.items():
2019-07-03 20:52:11 +00:00
if path[0] != in_src:
2019-07-01 23:42:25 +00:00
path = tuple(list(reversed(list(path))))
links = [
path[i:i+2]
for i in range(len(path)-1)
]
for ne1, ne2 in links:
if ne1.startswith('s'):
if ne1 not in transitions_algo_weighted:
transitions_algo_weighted[ne1] = dict()
if weight > transitions_algo_weighted[ne1].get(ne2, NEG_INF):
transitions_algo_weighted[ne1][ne2] = weight
transitions_reweighted = dict()
for src, d in transitions_algo_weighted.items():
if src not in transitions_reweighted:
transitions_reweighted[src] = dict()
sw = sum(d.values())
2019-07-03 20:52:11 +00:00
if sw <= 0:
2019-07-01 23:42:25 +00:00
for dst, w in d.items():
transitions_reweighted[src][dst] = 1.0/len(d)
else:
for dst, w in d.items():
transitions_reweighted[src][dst] = w/sw
return transitions_reweighted
def __str__(self):
return f"{type(self).__name__}({repr(self._weighted)})"
def __repr__(self):
return str(self)