#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

import os
import sys
import json
import pulp
import numpy
import random
import datetime
import threading
from sortundirectednodepair import _sort_pair
from graphtools import Dijkstra, find_all_paths, graph_from_topo
from time import sleep
from io import StringIO
from id2ip import id2ip, ip2id
from typing import Any
from typing import Union
from typing import Tuple
from typing import List
from typing import Dict
from typing import Optional
from configparser import ConfigParser
from ryu.lib import hub
from ryu.base import app_manager
from ryu.lib import ofctl_v1_3
from ryu.lib.packet import ether_types
from ryu.lib.packet import ethernet
from ryu.ofproto import ofproto_v1_3
from ryu.controller.handler import set_ev_cls
from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.controller import ofp_event
from ryu.lib.packet import packet
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor

NEG_INF = float('-inf')
MIN_POSITIVE_FLOAT = numpy.nextafter(0, 1)

EMPTY_ITER = iter(list())

SPEED_KILOBIT = 0.001
SPEED_MEGABIT = 1
SPEED_GIGABIT = 1000
SPEED_TERABIT = 1000000

UNLIMITED_BANDWIDTH = 70*SPEED_TERABIT

NoneType = type(None)
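
# Unit note: speeds above are in megabits per second (SPEED_MEGABIT == 1);
# UNLIMITED_BANDWIDTH is a stand-in capacity (70 Tb/s) used when a link's
# bandwidth is unspecified, large enough to never be the bottleneck.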

print(f"loading network {sys.argv[2]}", file=sys.stderr)
network_topo = ''
with open(sys.argv[2]) as f:
    network_topo = json.loads(f.read())
# ryu uses eventlet, which, on some versions, breaks pathlib's read_text

base_net_name = '.'.join(sys.argv[2].split('.')[:-1])

with open('~current.state', 'w') as f:
    f.write(base_net_name)

with open('~current.sws.state', 'w') as f:
    f.write("")

if os.path.exists(f'{base_net_name}.state'):
    os.unlink(f'{base_net_name}.state')

print("reading network configuration variables.ini", file=sys.stderr)
network_config = ConfigParser()
with open('variables.ini') as f:
    network_config.read_string(f.read())

m1 = float(network_config['linearalgconst']['m1'])
m2 = float(network_config['linearalgconst']['m2'])
hop_delay = float(network_config['linearalgconst']['hop_delay'])
apa_path_stretch = float(network_config['APA']['path_stretch'])
routing_algo = network_config['GENERAL']['algo']

network_graph = graph_from_topo(network_topo)
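
# network_topo layout, as used throughout this file: index 0 holds the host
# names ("h1", "h2", ...) and index 2 holds the links as
# (endpoint1, endpoint2, bandwidth) triples, bandwidth in Mb/s or None for
# unlimited; index 1 is not read here.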


def jsonload_list2tuple(x):
    if isinstance(x, (type(None), int, float, str)):
        return x
    elif isinstance(x, (list, tuple)):
        return tuple([jsonload_list2tuple(k) for k in x])
    elif isinstance(x, dict):
        return {jsonload_list2tuple(k): jsonload_list2tuple(v) for k, v in x.items()}
    else:
        raise ValueError("Input didn't come from a standard JSON")
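
# Cached APA paths come back from JSON as nested lists, but the code below keys
# dictionaries by paths, so they must be hashable tuples again. A minimal
# round-trip sketch (hypothetical data, not the real topology):
#
#   >>> jsonload_list2tuple(json.loads('[["h1", "s1", "h2"], ["h1", "s2", "h2"]]'))
#   (('h1', 's1', 'h2'), ('h1', 's2', 'h2'))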


def prepare_pop_pair_alternative_paths_for_availability(graph, hosts):
    pop_apa = [[None for x in hosts] for y in hosts]
    for i, h1 in enumerate(hosts):
        for j, h2 in enumerate(hosts):
            if pop_apa[i][j] is None:
                apa = tuple(find_all_paths(graph, h1, h2))
                pop_apa[i][j] = apa
                pop_apa[j][i] = apa
    return pop_apa
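
# The matrix is filled symmetrically: find_all_paths runs once per unordered
# host pair and the same tuple of candidate paths serves both directions.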


apacachefile = f"{base_net_name}.apa.json"
print(
    f"checking if APA for {sys.argv[2]} is cached at {apacachefile}", file=sys.stderr)

pop_apa_candidates = None
if os.path.isfile(apacachefile):
    print(f"loading APA from {apacachefile}", file=sys.stderr)
    with open(apacachefile) as f:
        pop_apa_candidates = jsonload_list2tuple(json.loads(f.read()))
else:
    print(f"calculating APA for {sys.argv[2]}", file=sys.stderr)
    pop_apa_candidates = prepare_pop_pair_alternative_paths_for_availability(
        network_graph,
        network_topo[0]
    )
    print(f"caching APA at {apacachefile}", file=sys.stderr)
    with open(apacachefile, 'w') as f:
        f.write(json.dumps(pop_apa_candidates))


def filter_out_invalid_paths_from_multiple_paths(h1, h2, candidates, sw=None):
    valid_paths = list()
    valid_paths_bits_rev = set()
    for path in sorted(
        candidates,
        key=lambda a: (len(a), a)
    ):
        if sw is not None and sw not in path:
            continue
        if path[0] == h2 and path[-1] == h1:
            path = list(reversed(path))
        path_bits = set([
            tuple(path[i:i+2])
            for i in range(len(path)-1)
        ])
        if len(path_bits.intersection(valid_paths_bits_rev)) <= 0:
            valid_paths.append(path)
            valid_paths_bits_rev = valid_paths_bits_rev.union(set([
                tuple(reversed(i))
                for i in path_bits
            ]))
    return valid_paths
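
# Selection sketch: candidates are visited shortest-first and normalized to run
# h1 -> h2; a candidate is dropped when any of its directed links is the
# reverse of a link already used by an accepted path, which keeps one pair's
# traffic from crossing the same link in both directions. With hypothetical
# candidates:
#
#   >>> filter_out_invalid_paths_from_multiple_paths(
#   ...     'h1', 'h2',
#   ...     [('h1', 's1', 's2', 'h2'), ('h1', 's2', 's1', 'h2')])
#   [('h1', 's1', 's2', 'h2')]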


class LatencyController:
    def __init__(self, datapath):
        super().__init__()
        self._datapath = datapath
        self._sw = f"s{self._datapath.id}"
        l = [
            (i, x) for i, x in enumerate(network_topo[2])
            if x[0] == self._sw or x[1] == self._sw
        ]
        self._l = l
        self._links = portno_from_list(l)
        self._flow_xfer = dict()
        self._flow_speed = dict()
        self._link_speed = UsageStoreProxyFromFlowDict(self._flow_speed)
        self._link_rules = usage_store_from_list(l)
        self._bandwidth = bandwidth_from_list(l)
        self._ospf = None

    def connected(self, la: 'LatencyApp'):
        print(f"Switch connected: {self._sw}")
        print(f"Switch OSPF-fallback loading: {self._sw}")
        parser = self._datapath.ofproto_parser
        self._ospf = la.ospf_dijkstra(self._sw)
        for h in network_topo[0]:
            ipv4_dst = id2ip(int(h[1:])-1)
            match_ipv4 = parser.OFPMatch(
                eth_type=0x0800,
                ipv4_dst=ipv4_dst
            )
            match_arp = parser.OFPMatch(
                eth_type=0x0806,
                arp_tpa=ipv4_dst
            )
            next_hop = self._ospf(h)[0][1]
            out_port = self._links[(self._sw, next_hop)]
            actions = [parser.OFPActionOutput(out_port)]
            print(f"{self._sw} --[{out_port}]--> {next_hop} ({ipv4_dst})")
            self.add_flow(1, match_ipv4, actions)
            self.add_flow(1, match_arp, actions)
        print(f"Switch OSPF-fallback loaded: {self._sw}")
        if routing_algo == 'ecmp':
            print(f"Switch ECMP setting up: {self._sw}")
            self._configureECMP(la)
            print(f"Switch ECMP set up: {self._sw}")
        hub.spawn(self._write_in_initialized_file)
        # self._write_in_initialized_file()
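
    # Rule priorities used by this controller: 1 for the OSPF fallback above,
    # 5 for latency-driven per-pair rules, 7 for static ECMP rules, so the
    # more specific strategies always shadow the fallback.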

    def _write_in_initialized_file(self, retries=5):
        with initialized_switch_writing_lock:
            with open('~current.sws.state', 'a') as f:
                f.write(f"{self._sw}{os.linesep}")
        # retry outside the lock: threading.Lock is not reentrant, so recursing
        # while holding it would deadlock
        if retries > 0:
            sleep(random.uniform(0.25, 1.5))
            self._write_in_initialized_file(retries-1)

    def disconnected(self):
        print(f"Switch disconnected: {self._sw}")

    def add_flow(self, priority, match, actions, buffer_id=None, idle_timeout=0, hard_timeout=0):
        ofproto = self._datapath.ofproto
        parser = self._datapath.ofproto_parser
        inst = [parser.OFPInstructionActions(
            ofproto.OFPIT_APPLY_ACTIONS,
            actions
        )]
        additionals = dict()
        if buffer_id:
            additionals['buffer_id'] = buffer_id
        if idle_timeout:
            additionals['idle_timeout'] = idle_timeout
        if hard_timeout:
            additionals['hard_timeout'] = hard_timeout
        mod = parser.OFPFlowMod(datapath=self._datapath,
                                priority=priority, match=match,
                                instructions=inst,
                                **additionals)
        self._datapath.send_msg(mod)

    def _the_other_link_endpoint(self, linktuple):
        # maps index 0 -> 1 and 1 -> 0 via abs(i-1): the far end of the link
        return linktuple[abs(linktuple.index(self._sw)-1)]

    def _on_stats(self, ev, la: 'LatencyApp'):
        self._link_rules = usage_store_from_list(self._l)
        parser = self._datapath.ofproto_parser
        stats = ev.msg.body
        allkeys = []
        for k in self._link_rules._pairs.keys():
            self._link_rules._pairs[k] = 0
        for stat in stats:
            # ltp -> link tuple
            # ftp -> flow tuple
            # bc -> byte count
            # obc -> old byte count
            action = stat.instructions[0].actions[0]
            outs = list()
            if isinstance(action, parser.OFPActionGroup):
                weight_sum = sum([
                    bucket.weight
                    for bucket in la.ecmp_group_id_buckets[action.group_id]
                ])
                for bucket in la.ecmp_group_id_buckets[action.group_id]:
                    outs.append((
                        bucket.actions[0].port,
                        bucket.weight/max(1, weight_sum)
                    ))
            else:
                outs.append((action.port, 1))
            for out_port, weight in outs:
                ltps = self._links.reverse_lookup(out_port)
                if len(ltps) <= 0:
                    continue
                ltp = ltps[0]
                nxt = ltp[abs(ltp.index(self._sw)-1)]
                # measuring link flows
                self._link_rules[ltp] = 1+self._link_rules[ltp]
                # measuring load
                if stat.match['eth_type'] == 0x0800:
                    src = stat.match.get('ipv4_src', '10.0.0.0')
                    dst = stat.match['ipv4_dst']
                    src = f"h{ip2id(src)+1}"
                    dst = f"h{ip2id(dst)+1}"
                    if src == 'h0':
                        src = None
                    key = (src, self._sw, nxt, dst)
                    bc = stat.byte_count
                    obc = self._flow_xfer.get(key, 0)
                    tfd = max(0, bc-obc)*weight  # bytes per cycle
                    tfr = ((8*tfd)/la.interval)/10**6  # mbps
                    self._flow_xfer[key] = bc
                    self._flow_speed[key] = tfr
                    allkeys.append(key)
        for k in (set(self._flow_speed.keys())-set(allkeys)):
            self._flow_speed[k] = 0
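
    # Rate accounting above: per-cycle byte-count deltas become megabits per
    # second; e.g. 1_250_000 new bytes over a 10 s polling interval give
    # (8 * 1_250_000 / 10) / 10**6 = 1.0 Mb/s.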

    # with parts from <https://github.com/wildan2711/multipath/blob/master/ryu_multipath.py>
    # commented by <https://wildanmsyah.wordpress.com/2018/01/13/multipath-routing-with-load-balancing-using-ryu-openflow-controller/>
    def _configureECMP(self, la: 'LatencyApp'):
        ofproto = self._datapath.ofproto
        parser = self._datapath.ofproto_parser
        prints = list()
        for h1p, h1 in enumerate(network_topo[0]):
            for h2p, h2 in enumerate(network_topo[0]):
                if h1p != h2p:
                    ips = id2ip(int(h1[1:])-1)
                    ipd = id2ip(int(h2[1:])-1)
                    match = parser.OFPMatch(
                        eth_type=0x0800,
                        ipv4_src=ips,
                        ipv4_dst=ipd
                    )
                    valid_paths = filter_out_invalid_paths_from_multiple_paths(
                        h1,
                        h2,
                        pop_apa_candidates[h1p][h2p],
                        self._sw
                    )
                    portouts = list()
                    nexthops = list()
                    for path in valid_paths:
                        nexthop = (path[path.index(self._sw)+1], 1.0)
                        link = (self._sw, nexthop[0])
                        portout = self._links[link]
                        portouts.append(portout)
                        nexthops.append(nexthop[0])
                    portouts = list(set(portouts))
                    nexthops = list(set(nexthops))
                    prints.append(
                        f"({ips}) {self._sw} --{portouts}-> {set(nexthops)} ({ipd})"
                    )
                    if len(portouts) <= 0:
                        # no output
                        continue
                    elif len(portouts) == 1:
                        # add simple rule
                        actions = [parser.OFPActionOutput(portouts[0])]
                        self.add_flow(7, match, actions)
                    else:
                        all_bws = [x if x is not None else UNLIMITED_BANDWIDTH for x in [
                            network_topo[2][portout-1][2] for portout in portouts]]
                        sum_all_bws = sum(all_bws)
                        weighted_bws = [bw/sum_all_bws for bw in all_bws]
                        out_ports = list(zip(portouts, weighted_bws))
                        del weighted_bws
                        del sum_all_bws
                        del portouts
                        tpl = tuple([h1, self._sw, h2])
                        gnw = False
                        if tpl not in la.ecmp_group_ids:
                            la.ecmp_group_ids[tpl] = len(la.ecmp_group_ids)+1
                            gnw = True
                        gid = la.ecmp_group_ids[tpl]
                        buckets = []

                        for port, weight in out_ports:
                            bucket_weight = int(
                                round(
                                    65535 * min(
                                        1.0,
                                        max(
                                            0.0,
                                            (1 - weight)
                                        )
                                    )
                                )
                            )
                            bucket_action = [parser.OFPActionOutput(port)]
                            buckets.append(
                                parser.OFPBucket(
                                    weight=bucket_weight,
                                    watch_port=port,
                                    watch_group=ofproto.OFPG_ANY,
                                    actions=bucket_action
                                )
                            )

                        la.ecmp_group_id_buckets[gid] = buckets

                        req = parser.OFPGroupMod(
                            self._datapath,
                            ofproto.OFPGC_ADD if gnw else ofproto.OFPGC_MODIFY,
                            ofproto.OFPGT_SELECT,
                            gid,
                            buckets
                        )
                        self._datapath.send_msg(req)
                        # set group
                        actions = [parser.OFPActionGroup(gid)]
                        self.add_flow(7, match, actions)
                        # add rule
        print(os.linesep.join(prints))
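
    # Note: the bucket weight above is 65535 * (1 - bandwidth_share), i.e. the
    # complement of each port's share of the summed link bandwidth; with
    # equally sized links all buckets still come out equal. The weighted
    # variant in add_weighted_latency below uses the share directly.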

    def add_latency_segment(self, ips: str, ipd: str, nxt: str, ignore_routing_algo: bool = False):
        if (routing_algo not in ['ldr', 'minmax']) and not ignore_routing_algo:
            return
        link = (self._sw, nxt)
        parser = self._datapath.ofproto_parser
        actions = [parser.OFPActionOutput(self._links[link])]
        match = parser.OFPMatch(
            eth_type=0x0800,
            ipv4_src=ips,
            ipv4_dst=ipd
        )
        self.add_flow(5, match, actions)
        # hs = f"h{ip2id(ips)+1}"
        # hd = f"h{ip2id(ipd)+1}"
        # self._flow_xfer[(hs, hd, nxt)] = 0

    def add_weighted_latency(self, la: 'LatencyApp', ips: str, ipd: str, nxts: Dict[str, float]):
        if len(nxts) <= 0:
            return
        elif len(nxts) == 1:
            self.add_latency_segment(ips, ipd, list(nxts.keys())[0])
        else:
            ofproto = self._datapath.ofproto
            parser = self._datapath.ofproto_parser
            match = parser.OFPMatch(
                eth_type=0x0800,
                ipv4_src=ips,
                ipv4_dst=ipd
            )
            h1 = f"h{ip2id(ips)+1}"
            h2 = f"h{ip2id(ipd)+1}"

            out_ports = [
                (self._links[(self._sw, nexthop)], weight)
                for nexthop, weight in nxts.items()
            ]
            tpl = tuple([h1, self._sw, h2])
            gnw = False
            if tpl not in la.ecmp_group_ids:
                la.ecmp_group_ids[tpl] = len(la.ecmp_group_ids)+1
                gnw = True
            gid = la.ecmp_group_ids[tpl]
            buckets = []

            for port, weight in out_ports:
                bucket_weight = int(round(weight * 65535))
                bucket_action = [parser.OFPActionOutput(port)]
                buckets.append(
                    parser.OFPBucket(
                        weight=bucket_weight,
                        watch_port=port,
                        watch_group=ofproto.OFPG_ANY,
                        actions=bucket_action
                    )
                )

            la.ecmp_group_id_buckets[gid] = buckets

            req = parser.OFPGroupMod(
                self._datapath,
                ofproto.OFPGC_ADD if gnw else ofproto.OFPGC_MODIFY,
                ofproto.OFPGT_SELECT,
                gid,
                buckets
            )
            self._datapath.send_msg(req)
            # set group
            actions = [parser.OFPActionGroup(gid)]
            self.add_flow(5, match, actions)
            # add rule

    @property
    def simulatable(self):
        return SimulatableSwitch(
            self._sw,
            self._flow_speed.copy(),
            self._bandwidth.copy()
        )


class SimulatableSwitch:
    def __init__(self, sw: str, fs: dict, bw: 'UsageStore'):
        self._sw = sw
        self._flow_speed = fs
        self._bandwidth = bw
        self._link_speed = UsageStoreProxyFromFlowDict(self._flow_speed)
        self._reinit()

    def _reinit(self):
        self._link_rules = self.get_link_rules()

    def get_link_rules(self):
        cnt = UsageStore()
        for key, bw in self._flow_speed.items():
            if bw > 0:
                k = (key[1], key[2])
                cnt[k] = 1+cnt[k]
        return cnt

    @property
    def name(self):
        return self._sw

    def copy(self):
        return type(self)(
            self._sw,
            self._flow_speed.copy(),
            self._bandwidth.copy()
        )


class SimulatableNetwork:
    def __init__(self, sws: List[SimulatableSwitch], paths: 'UsageStore'):
        self.switches: Dict[str, SimulatableSwitch] = {
            sw._sw: sw
            for sw in sws
        }
        self.max_speed = bandwidth_from_list(enumerate(network_topo[2]))
        self.paths = paths
        self._reinit()

    def _reinit(self):
        self._flow_speed = DictAggregator(
            [sw._flow_speed for sw in self.switches.values()]
        )
        self.last_speed = UsageStoreProxyFromCallableIterable(
            AttrCallableIterable(
                AttrCallableIterable(
                    self.switches.values,
                    '_link_speed'
                ),
                'as_regular_storage'
            ),
            sum
        ).as_regular_storage
        self.link_usage = self.max_speed.calculate(
            self.last_speed.as_regular_storage,
            lambda maxspeed, lastspeed: (
                lastspeed / max(maxspeed, 0.0000000000001)
            )
        ).as_regular_storage
        self.link_flows = self.get_link_flows()

    def get_link_flows(self):
        cnt = UsageStore()
        for switch in self.switches.values():
            for (_, sw, nxt, __), speed in switch._flow_speed.items():
                key = (sw, nxt)
                if switch._bandwidth[key] > 0 and speed > 0:
                    cnt[key] = 1+cnt[key]
        return cnt

    def get_routes(self):
        return self.paths.copy()

    def get_path(self, h1, h2):
        return self.paths[(h1, h2)]

    def copy(self) -> 'SimulatableNetwork':
        return type(self)(
            [sw.copy() for sw in self.switches.values()],
            self.paths.copy()
        )

    def sort_by_max_flow_load(self, seqs):
        wl = list()
        for seq in seqs:
            if not isinstance(seq, WeightedPathAggregate):
                seq = WeightedPathAggregate({tuple(seq): 1.0})
            ml = self.get_max_flow_load(seq)
            wl.append((
                ml,
                sorted(seq.keys(), key=lambda a: len(a))[0],
                seq
            ))
        return list(map(
            lambda a: a[2],
            reversed(sorted(wl))
        ))

    def get_max_flow_load(self, wpa):
        h1, h2 = _sort_pair(wpa[0], wpa[-1])
        ml = 0
        if not isinstance(wpa, WeightedPathAggregate):
            wpa = WeightedPathAggregate({tuple(wpa): 1.0})
        for seq, weight in wpa.items():
            for i in range(len(seq)-1):
                link = tuple(seq[i:i+2])
                revlink = tuple(reversed(link))
                ml = max(
                    ml,
                    ((
                        (
                            self._flow_speed.get(tuple([h1, *link, h2]), 0)
                            /
                            self.max_speed[link]
                        )+(
                            self._flow_speed.get(tuple([h2, *revlink, h1]), 0)
                            /
                            self.max_speed[revlink]
                        )
                    ))*weight
                )
        return ml
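
    # get_max_flow_load adds the two directions of each link (h1->h2 traffic
    # on (a, b) plus h2->h1 traffic on (b, a)), each normalized by the link's
    # capacity, and keeps the worst link across the weighted paths.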

    def get_max_flow_speed(self, wpa):
        h1, h2 = _sort_pair(wpa[0], wpa[-1])
        if not isinstance(wpa, WeightedPathAggregate):
            wpa = WeightedPathAggregate({tuple(wpa): 1.0})
        ml = 0
        for seq, weight in wpa.items():
            for i in range(len(seq)-1):
                link = tuple(seq[i:i+2])
                revlink = tuple(reversed(link))
                ml = max(
                    ml,
                    ((
                        self._flow_speed.get(tuple([h1, *link, h2]), 0)
                        +
                        self._flow_speed.get(tuple([h2, *revlink, h1]), 0)
                    ))*weight
                )
        return ml

    def get_max_path_load(self, wpa):
        ml = 0
        if not isinstance(wpa, WeightedPathAggregate):
            wpa = WeightedPathAggregate({tuple(wpa): 1.0})
        for seq in wpa.keys():
            for i in range(len(seq)-1):
                link = tuple(seq[i:i+2])
                ml = max(
                    ml,
                    self.link_usage[link]
                )
        return ml

    def with_modified_path(self, newpath):
        mod = self.copy()
        oldpath = self.get_path(newpath[0], newpath[-1])
        flowspeed = self.get_max_flow_speed(oldpath)
        # removing old flows
        for sw in mod.switches.values():
            for k in sw._flow_speed.copy().keys():
                if (
                    k[0] in [newpath[0], newpath[-1]]
                    and
                    k[3] in [newpath[0], newpath[-1]]
                ):
                    del sw._flow_speed[k]
        # adding new flows
        if not isinstance(newpath, WeightedPathAggregate):
            newpath = WeightedPathAggregate({tuple(newpath): 1.0})
        for newpath2, weight in newpath.items():
            for i in range(1, len(newpath2)-1):
                ky = newpath2[0], newpath2[i], newpath2[i+1], newpath2[-1]
                rk = newpath2[-1], newpath2[i-1], newpath2[i], newpath2[0]
                if ky[1].startswith('s') and ky[1] in mod.switches:
                    mod.switches[ky[1]]._flow_speed[ky] = (weight*flowspeed)/2
                if rk[1].startswith('s') and rk[1] in mod.switches:
                    mod.switches[rk[1]]._flow_speed[rk] = (weight*flowspeed)/2
        for sw in mod.switches.values():
            sw._reinit()
        self.paths[tuple([newpath[0], newpath[-1]])] = newpath
        mod._reinit()
        return mod

    def copy_normalized(self):
        net = self.copy()
        for path in net.get_routes()._pairs.values():
            net = net.with_modified_path(path)
        return net

    # Gvozdiev et al @ SIGCOMM2018, p. 94 => Algorithm 1
    def copy_scaling(self, prev_prediction: Dict[Tuple[str], float], fixed_hedge: float, decay_multiplier: float) -> Tuple['SimulatableNetwork', Dict[Tuple[str], float]]:
        if prev_prediction is None:
            prev_prediction = dict()
        net = self.copy()
        next_prediction = dict()
        for sw in net.switches.values():
            for idtf, prev_value in sw._flow_speed.items():
                scaled_est = prev_value*fixed_hedge
                if scaled_est > prev_prediction.get(idtf, 0):
                    next_prediction[idtf] = scaled_est
                else:
                    decay_prediction = prev_prediction.get(
                        idtf, 0) * decay_multiplier
                    next_prediction[idtf] = max(decay_prediction, scaled_est)
        for sw in net.switches.values():
            for idtf in sw._flow_speed.keys():
                sw._flow_speed[idtf] = next_prediction[idtf]
            sw._reinit()
        net._reinit()
        return net, next_prediction
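
    # copy_scaling follows the cited Algorithm 1: each flow estimate is scaled
    # up by fixed_hedge; when the scaled value no longer exceeds the previous
    # prediction, the prediction decays by decay_multiplier instead, so
    # estimates rise quickly and fall off gradually. E.g. with fixed_hedge=1.1
    # and decay_multiplier=0.98 (the values LDRPathEvaluator passes below), a
    # flow measured at 10 Mb/s is predicted at 11 Mb/s, and a flow that
    # vanishes decays by 2% per monitoring cycle.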


class AbstractPathEvaluator:
    def __init__(self, simulatable_network: SimulatableNetwork, keepdata: Optional[Any]):
        self._simnet: SimulatableNetwork = simulatable_network
        self._keepdata = keepdata

    def __call__(self) -> Tuple['UsageStore', NoneType]: pass


class NullPathEvaluator(AbstractPathEvaluator):
    def __call__(self) -> Tuple['UsageStore', NoneType]:
        return UsageStore(), None


class LDRSinglePathEvaluator(AbstractPathEvaluator):
    def __call__(self) -> Tuple['UsageStore', NoneType]:
        return self._figure13(self._simnet.copy()), None
        # return self._figure12(
        #     self._simnet.link_usage,
        #     self._simnet.link_flows
        # )

    def _figure13(self, net: SimulatableNetwork) -> 'UsageStore':
        net = net.copy_normalized()
        _tmp = self._figure12(
            net.link_usage,
            net.link_flows
        )
        target = len(_tmp._pairs)
        changed = UsageStore()
        newnet = None
        while len(changed._pairs) < target:
            aggregate_paths = self._figure12(
                net.link_usage,
                net.link_flows
            )
            loaded_paths = net.sort_by_max_flow_load([
                *aggregate_paths._pairs.values()
            ])
            most_loaded = None
            for loaded in loaded_paths:
                if not changed.contains((loaded[0], loaded[-1])):
                    most_loaded = loaded
                    break
            if most_loaded is None:
                break
            newnet = net.with_modified_path(most_loaded)
            old_path_for_most_loaded = net.get_path(
                most_loaded[0], most_loaded[-1])
            if (
                newnet.get_max_path_load(most_loaded)
                <=
                net.get_max_path_load(old_path_for_most_loaded)
            ):
                net = newnet
            changed[(most_loaded[0], most_loaded[-1])] = most_loaded
        return net.get_routes()

    def _figure12(self, link_usage, link_flows) -> 'UsageStore':
        processed = UsageStore()
        currently_reserved = UsageStore()
        ospf = Dijkstra(network_graph)
        l1 = network_topo[0][:]
        random.shuffle(l1)
        for h1 in l1:
            l2 = network_topo[0][:]
            random.shuffle(l2)
            for h2 in l2:
                if h1 != h2 and not processed.contains((h1, h2)):
                    h1p = network_topo[0].index(h1)
                    h2p = network_topo[0].index(h2)
                    ospf_aggregate = ospf(h1)(h2)[0]
                    ospf_delay = sum([
                        link_usage[
                            tuple([*ospf_aggregate[i:i+2]])
                        ] + hop_delay
                        for i in range(len(ospf_aggregate)-1)
                    ])
                    aggregates = list(pop_apa_candidates[h1p][h2p])
                    random.shuffle(aggregates)
                    del h1p
                    del h2p
                    min_aggregate = None
                    min_aggregate_val = None
                    min_aggregate_links = list()
                    nalist = list()
                    na = 0  # flows in aggregate
                    omax = 0  # maximum overload
                    for aggregate in aggregates:
                        links = [
                            _sort_pair(*aggregate[i:i+2])
                            for i in range(len(aggregate)-1)
                        ]
                        omax2 = 0
                        for link in links:
                            omax2 = max(omax2, link_usage[link])
                            if link not in nalist:
                                # fraction of aggregate on path
                                na += link_flows[link]
                                na += currently_reserved[link]
                                nalist.append(link)
                        if omax2 > omax:
                            omax = omax2
                        eps2 = 0
                        xap = 1
                        for link in links:
                            # path delay
                            dp = link_usage[link]+hop_delay
                            eps2 += xap*(dp+((dp*m1)/ospf_delay))
                        exp0 = m2*omax
                        eps3 = sum([link_usage[link] for link in links])
                        aggregate_val = na*eps2+exp0+eps3
                        if (
                            min_aggregate_val is None
                            or
                            min_aggregate_val > aggregate_val
                        ):
                            min_aggregate_val = aggregate_val
                            min_aggregate = aggregate
                            min_aggregate_links = links
                    processed[(h1, h2)] = min_aggregate
                    for link in min_aggregate_links:
                        currently_reserved[link] = 1+currently_reserved[link]
        return processed
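
    # The heuristic above scores each candidate aggregate as
    #     na * sum_links(dp + dp*m1/ospf_delay) + m2*omax + sum_links(usage):
    # flow count times stretch-penalized path delay, plus the worst-link load
    # weighted by m2 -- the same cost shape the LP variant below hands to pulp.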


class LDRPathEvaluator(AbstractPathEvaluator):
    def __call__(self) -> Tuple['UsageStore', NoneType]:
        net, kd = self._simnet.copy_scaling(self._keepdata, 1.1, 0.98)
        return self._figure12(net.copy()), kd

    def _figure13(self, net: SimulatableNetwork) -> 'UsageStore':
        net = net.copy_normalized()
        _tmp = self._figure12(net.copy())
        target = len(_tmp._pairs)
        changed = UsageStore()
        newnet = None
        while len(changed._pairs) < target:
            aggregate_paths = self._figure12(net.copy())
            loaded_paths = net.sort_by_max_flow_load([
                *aggregate_paths._pairs.values()
            ])
            most_loaded = None
            for loaded in loaded_paths:
                if not changed.contains((loaded[0], loaded[-1])):
                    most_loaded = loaded
                    break
            if most_loaded is None:
                break
            newnet = net.with_modified_path(most_loaded)
            old_path_for_most_loaded = net.get_path(
                most_loaded[0], most_loaded[-1])
            if (
                newnet.get_max_path_load(most_loaded)
                <=
                net.get_max_path_load(old_path_for_most_loaded)
            ):
                net = newnet
            changed[(most_loaded[0], most_loaded[-1])] = most_loaded
        return net.get_routes()

    def _figure12(self, net: SimulatableNetwork) -> 'UsageStore':
        link_usage = net.link_usage
        link_flows = net.link_flows
        ospf = Dijkstra(network_graph)
        opt_model = pulp.LpProblem("LDR", pulp.LpMinimize)
        processed = UsageStore(default=dict())
        l1 = network_topo[0][:]
        random.shuffle(l1)
        sumaggregates = 0
        for h1 in l1:
            l2 = network_topo[0][:]
            random.shuffle(l2)
            for h2 in l2:
                if h1 != h2 and not processed.contains((h1, h2)):
                    h1, h2 = _sort_pair(h1, h2)
                    opt_vars = dict()
                    processed[(h1, h2)] = opt_vars
                    h1p = network_topo[0].index(h1)
                    h2p = network_topo[0].index(h2)
                    ospf_aggregate = ospf(h1)(h2)[0]
                    ospf_delay = sum([
                        link_usage[
                            tuple([*ospf_aggregate[i:i+2]])
                        ] + hop_delay
                        for i in range(len(ospf_aggregate)-1)
                    ])
                    aggregates = list(pop_apa_candidates[h1p][h2p])[:]
                    # drop candidates stretched beyond apa_path_stretch
                    for aggregate in aggregates[:]:
                        links = [
                            _sort_pair(*aggregate[i:i+2])
                            for i in range(len(aggregate)-1)
                        ]
                        dp = sum(
                            [link_usage[link]+hop_delay for link in links])
                        if (ospf_delay == 0) or ((dp/ospf_delay) > apa_path_stretch):
                            del aggregates[aggregates.index(aggregate)]
                    random.shuffle(aggregates)
                    sumpinpa = 0
                    sumxappinpa = 0
                    na = sum([
                        link_flows[link]
                        for aggregate in aggregates
                        for link in [
                            _sort_pair(*aggregate[i:i+2])
                            for i in range(len(aggregate)-1)
                        ]
                    ])
                    na = max(na, MIN_POSITIVE_FLOAT)
                    for aggregate in aggregates:
                        links = [
                            _sort_pair(*aggregate[i:i+2])
                            for i in range(len(aggregate)-1)
                        ]
                        xap = pulp.LpVariable(
                            f"xap_{h1}_{h2}__{'_'.join(aggregate)}", 0, 1)
                        opt_vars[tuple(aggregate)] = xap
                        omax = max(0, *[link_usage[link] for link in links])
                        olinks = sum([link_usage[link] for link in links])
                        dp = sum(
                            [link_usage[link]+hop_delay for link in links])
                        tiebreaker = (dp+((dp*m1)/ospf_delay))

                        sumxappinpa += xap
                        sumpinpa += xap*(tiebreaker+(m2*omax)+olinks)
                        opt_model += 0 <= xap <= 1
                    opt_model += sumxappinpa == 1
                    sumaggregates += na*sumpinpa
        opt_model += sumaggregates
        opt_model.solve()
        solution = {
            (tuple(k[4:].split('__')[0].split('_')), tuple(k[4:].split('__')[1].split('_'))):
            v.varValue
            for k, v in opt_model.variablesDict().items()
            if k.startswith('xap_')
        }
        # print(opt_model)
        d = dict()
        for (tpl, path), weight in solution.items():
            if tpl not in d:
                d[tpl] = dict()
            d[tpl][path] = weight
        weighted_result = UsageStore(initial={
            k: WeightedPathAggregate(v)
            for k, v in d.items()
        })
        # print(weighted_result)
        # opt_model.solve()  # solving with CBC
        # opt_model.solve(solver=pulp.GLPK_CMD())  # solving with Glpk
        return weighted_result
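
# Note on decoding: the LP variables are named f"xap_{h1}_{h2}__{'_'.join(path)}"
# and parsed back by splitting on '__' then '_'; e.g. "xap_h1_h2__h1_s1_h2"
# becomes the key (('h1', 'h2'), ('h1', 's1', 'h2')) in `solution` above.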


class MinMaxSinglePathEvaluator(AbstractPathEvaluator):
    def __call__(self) -> Tuple['UsageStore', NoneType]:
        return self.minmax(self._simnet.copy()), None

    def minmax(self, net: SimulatableNetwork) -> 'UsageStore':
        net = net.copy_normalized()
        processed = UsageStore()
        l1 = network_topo[0][:]
        random.shuffle(l1)
        for h1 in l1:
            l2 = network_topo[0][:]
            random.shuffle(l2)
            for h2 in l2:
                if h1 != h2 and not processed.contains((h1, h2)):
                    h1p = network_topo[0].index(h1)
                    h2p = network_topo[0].index(h2)
                    aggregates = list(pop_apa_candidates[h1p][h2p])
                    configurations = list()
                    for aggregate in aggregates:
                        newnet = net.with_modified_path(aggregate)
                        configurations.append((
                            newnet.get_max_path_load(aggregate),
                            len(aggregate),
                            aggregate,
                            newnet
                        ))
                    # pick the configuration with the lowest max path load,
                    # tie-broken by shorter aggregates
                    configurations.sort()
                    configuration = configurations[0]
                    processed[(h1, h2)] = configuration[2]
                    net = configuration[3]
        return processed


path_evaluators = {
    'ospf': NullPathEvaluator,
    'ldr-single': LDRSinglePathEvaluator,
    'minmax-single': MinMaxSinglePathEvaluator,
    'ecmp': NullPathEvaluator,
    'ldr': LDRPathEvaluator,
}

PathEvaluator = path_evaluators[routing_algo]

ppe = ProcessPoolExecutor(1)
last_process = None
last_process_lock = threading.Lock()
initialized_switch_writing_lock = threading.Lock()
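
# A single-worker ProcessPoolExecutor runs the CPU-heavy path evaluator outside
# Ryu's eventlet loop; last_process_lock serializes submissions coming from the
# monitoring green-threads.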


class LatencyApp(app_manager.RyuApp):
    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.mac_to_port = {}
        self.controllers = {}
        self._paths = host_path_from_list(network_topo[0])
        self._loads = usage_store_from_list(enumerate([]))
        self.max_speed = bandwidth_from_list(enumerate(network_topo[2]))
        self.last_speed = UsageStoreProxyFromCallableIterable(
            AttrCallableIterable(
                AttrCallableIterable(
                    self.controllers.values,
                    '_link_speed'
                ),
                'as_regular_storage'
            ),
            sum
        )
        self.flow_count = UsageStoreProxyFromCallableIterable(
            AttrCallableIterable(self.controllers.values, '_link_rules'),
            sum
        )
        self.ospf_dijkstra = Dijkstra(network_graph)
        self.ecmp_group_ids = dict()
        self.ecmp_group_id_buckets = dict()
        self._last_process_data = None
        self.interval = float(network_config['monitoring']['interval'])
        hub.spawn(self._start_monitoring, self.interval)

    @set_ev_cls(ofp_event.EventOFPStateChange, [MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _event_ofp_state_change(self, ev):
        datapath = ev.datapath
        dpid = datapath.id
        if ev.state == MAIN_DISPATCHER:
            if dpid not in self.controllers:
                self.controllers[dpid] = LatencyController(datapath)
                self.controllers[dpid].connected(self)
                self._request_stats(datapath)
        elif ev.state == DEAD_DISPATCHER:
            if dpid in self.controllers:
                self.controllers[dpid].disconnected()
                del self.controllers[dpid]

    def _start_monitoring(self, repeat):
        while True:
            print("Monitoring loop called")
            for ctrl in self.controllers.values():
                hub.spawn(self._request_stats, ctrl._datapath)
            hub.spawn(self._update_performance_state)
            hub.spawn(self._trigger_path_refresh)
            sleep(repeat)

    def _trigger_path_refresh(self):
        # without the global statement the assignment below would only bind a
        # local name and the submitted future would be lost
        global last_process
        with last_process_lock:
            # if last_process is not None:
            #     last_process.cancel()
            last_process = ppe.submit(
                PathEvaluator(
                    *self._monitored_data(),
                    self._last_process_data
                )
            )
            last_process.add_done_callback(self._update_topo_done)

    def _request_stats(self, datapath):
        datapath.send_msg(
            datapath.ofproto_parser.OFPFlowStatsRequest(
                datapath
            )
        )

    @set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
    def _flow_stats_reply_handler(self, ev):
        dpid = ev.msg.datapath.id
        ctrl = self.controllers.get(dpid)
        if ctrl is not None:
            ctrl._on_stats(ev, self)

    def _monitored_data(self):
        self._loads = self.max_speed.calculate(
            self.last_speed.as_regular_storage,
            lambda maxspeed, lastspeed: (
                lastspeed / max(maxspeed, 0.0000000000001)
            )
        )
        return (
            SimulatableNetwork([
                ctrl.simulatable
                for ctrl in self.controllers.values()
            ], self._paths),
        )

    def _update_topo_done(self, future):
        if not future.cancelled():
            res, keepdata = future.result()
            self._update_topo_done_successfully(res)
            self._last_process_data = keepdata
            self._update_performance_state()
        else:
            # a cancelled future has no result to read; future.exception()
            # would raise CancelledError, so just report the cancellation
            print("path evaluation future was cancelled", file=sys.stderr)

    def _update_topo_done_successfully(self, processed):
        print(f'-> Topo updated {datetime.datetime.now()}')
        for (h1, h2), path in processed._pairs.items():
            self._paths[(h1, h2)] = path
            ip1 = id2ip(int(h1[1:])-1)
            ip2 = id2ip(int(h2[1:])-1)
            if isinstance(path, WeightedPathAggregate):
                for src, weighted_destinations in path.transitions(h1).items():
                    if src.startswith('s'):
                        ctrl = self.controllers.get(int(src[1:]))
                        if ctrl is not None:
                            ctrl.add_weighted_latency(
                                self, ip1, ip2, weighted_destinations)
                for src, weighted_destinations in path.transitions(h2).items():
                    if src.startswith('s'):
                        ctrl = self.controllers.get(int(src[1:]))
                        if ctrl is not None:
                            ctrl.add_weighted_latency(
                                self, ip2, ip1, weighted_destinations)
            else:
                for i in range(len(path)-1):
                    segment = path[i:i+2]
                    if segment[0].startswith('s'):
                        ctrl = self.controllers.get(int(segment[0][1:]))
                        if ctrl is not None:
                            ctrl.add_latency_segment(ip1, ip2, segment[1])
                    if segment[1].startswith('s'):
                        ctrl = self.controllers.get(int(segment[1][1:]))
                        if ctrl is not None:
                            ctrl.add_latency_segment(ip2, ip1, segment[0])

    def _update_performance_state(self):
        sio = StringIO()
        print('@start.path', file=sio)
        for _, path in sorted(self._paths._pairs.items()):
            if isinstance(path, WeightedPathAggregate):
                print(repr(dict(sorted(path.items()))), file=sio)
            else:
                print(repr({tuple(path): 1.0}), file=sio)
        print('@end.path', file=sio)
        print('@start.load', file=sio)
        for (ne1, ne2), load in sorted(self._loads._pairs.items()):
            print(repr((ne1, ne2, load)), file=sio)
        print('@end.load', file=sio)
        # print(sio.getvalue(), end='')
        with open(f'{base_net_name}.state', 'w') as f:
            v = sio.getvalue()
            f.write(v)
        # os.rename(f'{base_net_name}.state2', f'{base_net_name}.state')


def portno_from_list(l):
    return UsageStore(dict([
        (tuple(x[:2]), i+1)
        for i, x in l
    ]))
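
# portno_from_list maps a link to OpenFlow port (global link index + 1); this
# matches _configureECMP, which recovers a link's bandwidth from
# network_topo[2][portout-1], and assumes the topology builder numbers ports
# the same way on both endpoints.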


def usage_store_from_list(l):
    return UsageStore(dict([
        (tuple(x[:2]), 0)
        for _, x in l
    ]))


def host_path_from_list(l):
    return UsageStore(dict([
        ((x, y), list())
        for x in l
        for y in l
        if x != y
    ]), default=list())


def bandwidth_from_list(l):
    return UsageStore(dict([
        (tuple(x[:2]), x[2])
        for _, x in l
    ]))


class PairSorterMixin:
    def _sort_pair(self, a, b):
        return _sort_pair(a, b)


class UsageStore(PairSorterMixin):
    def __init__(self, initial=dict(), default=0):
        self._pairs = dict()
        for k, v in initial.items():
            self[k] = v
        self._default = default

    def __getitem__(self, idx) -> float:
        srtd = self._sort_pair(*idx)
        pairs = self._pairs
        if srtd not in pairs:
            return self._default
        else:
            return pairs[srtd]

    def reverse_lookup(self, val):
        return [k for k, v in self._pairs.items() if v == val]

    def calculate(self, other, operation):
        return UsageStore({
            k: operation(v, other[k])
            for k, v in self._pairs.items()
        })

    def contains(self, idx) -> bool:
        return self._sort_pair(*idx) in self._pairs.keys()

    def __setitem__(self, idx, val: float):
        self._pairs[self._sort_pair(*idx)] = val

    def __str__(self):
        return repr(self)

    def __repr__(self):
        return f"{type(self).__name__}({repr(self._pairs)})"

    def copy(self):
        return type(self)(self._pairs.copy(), self._default)

    def to_regular_storage(self):
        return self.copy()

    @property
    def as_regular_storage(self):
        return self.to_regular_storage()
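
# UsageStore keys are unordered node pairs: ('s1', 's2') and ('s2', 's1')
# address the same entry, and lookups of unknown pairs return the configured
# default instead of raising. A tiny sketch:
#
#   >>> us = UsageStore()
#   >>> us[('s2', 's1')] = 3
#   >>> us[('s1', 's2')]
#   3
#   >>> us[('s1', 's9')]
#   0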


class ValueGetterCallable:
    def __init__(self, value):
        self._val = value

    def __call__(self, *args, **kwargs):
        return self._val


class UsageStoreProxyFromCallableIterable(PairSorterMixin):
    def __init__(self, clb, get_behaviour=None, default=0):
        self._callable = clb
        self._behaviour = get_behaviour
        self.is_frozen = False
        self._default = default

    def __getitem__(self, idx) -> float:
        r = list()
        srtd = self._sort_pair(*idx)
        for us in self._callable():
            v = us._pairs.get(srtd)
            if v is not None:
                r.append(v)
        if self._behaviour is None:
            return self._default
        else:
            return self._behaviour(r)

    def __setitem__(self, idx, val: float):
        alo = False
        srtd = self._sort_pair(*idx)
        for us in self._callable():
            v = us._pairs.get(srtd)
            if v is not None:
                us._pairs[srtd] = val
                alo = True
        if not alo:
            raise ValueError(
                "There's no existing connection between"
                f" {idx[0]} and {idx[1]} "
                "to update"
            )

    def to_regular_storage(self):
        keys = set([
            key
            for us in self._callable()
            for key in us._pairs.keys()
        ])
        return UsageStore(dict([(key, self[key]) for key in keys]))

    @property
    def as_regular_storage(self):
        return self.to_regular_storage()

    @property
    def frozen(self):
        # materialize the iterable so the frozen proxy can be read repeatedly;
        # a raw generator would be exhausted after the first pass
        frz = type(self)(
            ValueGetterCallable(list(self._callable())),
            self._behaviour
        )
        frz.is_frozen = True
        return frz


class UsageStoreProxyFromFlowDict(UsageStore):
    def __init__(self, dct, aggregator=sum):
        self._flows = dct
        self._agg = aggregator
        # inherited __getitem__ reads self._default on misses; UsageStore's
        # __init__ is not called here, so set it explicitly
        self._default = 0

    @property
    def _pairs(self):
        d = dict()
        l = [(self._sort_pair(k[1], k[2]), v) for k, v in self._flows.items()]
        for k, _ in l:
            if k not in d:
                d[k] = list()
        for k, v in l:
            d[k].append(v)
        for k in d.keys():
            d[k] = self._agg(d[k])
        return d

    def to_regular_storage(self):
        return UsageStore(self._pairs)

    @property
    def as_regular_storage(self):
        return self.to_regular_storage()


class AttrCallableIterable:
    def __init__(self, callablecollection, attr):
        self._cc = callablecollection
        self._attr = attr

    def __call__(self):
        for i in self._cc():
            yield getattr(i, self._attr)
        yield from EMPTY_ITER


class DictAggregator:
    def __init__(self, ds):
        self._ds = ds

    def __getitem__(self, q):
        for d in self._ds:
            if q in d:
                return d[q]
        # none of the aggregated dicts has the key
        raise KeyError(q)

    def get(self, q, e=None):
        for d in self._ds:
            if q in d:
                return d[q]
        return e

    def keys(self):
        return {
            k
            for d in self._ds
            for k in d.keys()
        }

    def values(self):
        return {
            v
            for d in self._ds
            for v in d.values()
        }

    def items(self):
        return {
            i
            for d in self._ds
            for i in d.items()
        }

    def to_regular_dict(self):
        return dict(self.items())

    def copy(self):
        return type(self)(self._ds.copy())


class WeightedPathAggregate:
    def __init__(self, weighted: Dict[Tuple[str], float] = dict()):
        self._subscriptable = list(weighted.keys())[0]
        self._weighted = weighted

    def items(self):
        return self._weighted.items()

    def keys(self):
        return self._weighted.keys()

    def values(self):
        return self._weighted.values()

    def __getitem__(self, val: int) -> str:
        return self._subscriptable[val]

    def transitions(self, in_src):
        transitions_algo_weighted = dict()
        for path, weight in self.items():
            if path[0] != in_src:
                path = tuple(reversed(list(path)))
            links = [
                path[i:i+2]
                for i in range(len(path)-1)
            ]
            for ne1, ne2 in links:
                if ne1.startswith('s'):
                    if ne1 not in transitions_algo_weighted:
                        transitions_algo_weighted[ne1] = dict()
                    if weight > transitions_algo_weighted[ne1].get(ne2, NEG_INF):
                        transitions_algo_weighted[ne1][ne2] = weight
        transitions_reweighted = dict()
        for src, d in transitions_algo_weighted.items():
            if src not in transitions_reweighted:
                transitions_reweighted[src] = dict()
            sw = sum(d.values())
            if sw <= 0:
                for dst, w in d.items():
                    transitions_reweighted[src][dst] = 1.0/len(d)
            else:
                for dst, w in d.items():
                    transitions_reweighted[src][dst] = w/sw
        return transitions_reweighted

    def __str__(self):
        return f"{type(self).__name__}({repr(self._weighted)})"

    def __repr__(self):
        return str(self)
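
# transitions() flattens a weighted path set into per-switch next-hop weights,
# renormalized so each switch's choices sum to 1. A hypothetical sketch:
#
#   >>> wpa = WeightedPathAggregate({('h1', 's1', 'h2'): 0.75,
#   ...                              ('h1', 's1', 's2', 'h2'): 0.25})
#   >>> wpa.transitions('h1')
#   {'s1': {'h2': 0.75, 's2': 0.25}, 's2': {'h2': 1.0}}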