177 lines
5.4 KiB
Python
177 lines
5.4 KiB
Python
#!/usr/bin/env python3
|
|
# -*- encoding: utf-8 -*-
|
|
|
|
import sys
|
|
import json
|
|
from time import sleep
|
|
from configparser import ConfigParser
|
|
from ryu.lib import hub
|
|
from ryu.base import app_manager
|
|
from ryu.lib import ofctl_v1_3
|
|
from ryu.lib.packet import ether_types
|
|
from ryu.lib.packet import ethernet
|
|
from ryu.ofproto import ofproto_v1_3
|
|
from ryu.controller.handler import set_ev_cls
|
|
from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER, DEAD_DISPATCHER
|
|
from ryu.controller import ofp_event
|
|
from ryu.lib.packet import packet
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from concurrent.futures import ProcessPoolExecutor
|
|
|
|
print(f"loading network {sys.argv[2]}")
|
|
network_topo = ''
|
|
with open(sys.argv[2]) as f:
|
|
network_topo = json.loads(f.read())
|
|
# ryu uses eventlet, which, on some versions, breaks pathlib's read_text
|
|
|
|
print(f"reading network configuration variables.ini")
|
|
network_config = ConfigParser()
|
|
with open('variables.ini') as f:
|
|
network_config.read_string(f.read())
|
|
|
|
|
|
class LatencyController:
|
|
def __init__(self, datapath):
|
|
super().__init__()
|
|
self._datapath = datapath
|
|
self._sw = f"s{self._datapath.id}"
|
|
l = [
|
|
x for x in network_topo[2]
|
|
if x[0] == self._sw or x[1] == self._sw
|
|
]
|
|
self._links = usage_store_from_list(l)
|
|
self._bandwidth = bandwidth_from_list(l)
|
|
|
|
def connected(self):
|
|
print(f"Switch connected: {self._sw}")
|
|
|
|
def disconnected(self):
|
|
print(f"Switch disconnected: {self._sw}")
|
|
|
|
def add_flow(self, datapath, priority, match, actions, buffer_id=None, idle_timeout=0, hard_timeout=0):
|
|
ofproto = datapath.ofproto
|
|
parser = datapath.ofproto_parser
|
|
|
|
inst = [parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,
|
|
actions)]
|
|
if buffer_id:
|
|
mod = parser.OFPFlowMod(datapath=datapath, buffer_id=buffer_id,
|
|
priority=priority, match=match,
|
|
instructions=inst, idle_timeout=idle_timeout, hard_timeout=hard_timeout)
|
|
else:
|
|
mod = parser.OFPFlowMod(datapath=datapath, priority=priority,
|
|
match=match, instructions=inst, idle_timeout=idle_timeout, hard_timeout=hard_timeout)
|
|
datapath.send_msg(mod)
|
|
|
|
def _on_stats(self, ev, la):
|
|
pass
|
|
|
|
|
|
ppe = ProcessPoolExecutor(1)
|
|
last_process = None
|
|
|
|
|
|
class LatencyApp(app_manager.RyuApp):
|
|
OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.mac_to_port = {}
|
|
self.controllers = {}
|
|
self.last_speed = UsageStore()
|
|
self.last_xfer = UsageStore()
|
|
hub.spawn(
|
|
self._start_monitoring,
|
|
float(network_config['monitoring']['interval'])
|
|
)
|
|
|
|
@set_ev_cls(ofp_event.EventOFPStateChange, [MAIN_DISPATCHER, DEAD_DISPATCHER])
|
|
def _event_ofp_state_change(self, ev):
|
|
datapath = ev.datapath
|
|
dpid = datapath.id
|
|
if ev.state == MAIN_DISPATCHER:
|
|
if dpid not in self.controllers:
|
|
self.controllers[dpid] = LatencyController(datapath)
|
|
self.controllers[dpid].connected()
|
|
elif ev.state == DEAD_DISPATCHER:
|
|
if dpid in self.controllers:
|
|
self.controllers[dpid].disconnected()
|
|
del self.controllers[dpid]
|
|
|
|
def _start_monitoring(self, repeat):
|
|
while True:
|
|
for ctrl in self.controllers.values():
|
|
self._request_stats(ctrl._datapath)
|
|
sleep(repeat)
|
|
|
|
def _request_stats(self, datapath):
|
|
datapath.send_msg(
|
|
datapath.ofproto_parser.OFPFlowStatsRequest(datapath))
|
|
|
|
@set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
|
|
def _flow_stats_reply_handler(self, ev):
|
|
global last_process
|
|
dpid = ev.msg.datapath.id
|
|
ctrl = self.controllers.get(dpid)
|
|
if ctrl is not None:
|
|
ctrl._on_stats(ev, self)
|
|
if last_process is not None:
|
|
last_process.cancel()
|
|
last_process = ppe.submit(
|
|
self._update_topo,
|
|
self._monitoring_data(ev)
|
|
)
|
|
last_process.add_done_callback(self._update_topo_done)
|
|
|
|
def _monitoring_data(self, ev):
|
|
print(ev)
|
|
|
|
def _update_topo(self, ev):
|
|
print(ev)
|
|
|
|
def _update_topo_done(self, future):
|
|
if not future.cancelled():
|
|
print('_update_topo_done')
|
|
print(future.result())
|
|
|
|
|
|
def usage_store_from_list(l):
|
|
return UsageStore(dict([
|
|
(tuple(x[:2]), i+1)
|
|
for i, x in enumerate(l)
|
|
]))
|
|
|
|
|
|
def bandwidth_from_list(l):
|
|
return UsageStore(dict([
|
|
(tuple(x[:2]), x[2])
|
|
for x in l
|
|
]))
|
|
|
|
|
|
class UsageStore:
|
|
def __init__(self, initial=dict()):
|
|
self._pairs = dict()
|
|
for k, v in initial.items():
|
|
self[k] = v
|
|
|
|
def _sort_pair(self, a, b):
|
|
if a[0] == 'h' and b[0] == 's':
|
|
return b, a # h5,s7 => s7,h5
|
|
elif a[0] != b[0]:
|
|
return a, b # s7,h5 => s7,h5
|
|
elif int(a[1:]) > int(b[1:]):
|
|
return b, a # s7,s5 => s5,s7
|
|
else:
|
|
return a, b # s5,s7 => s5,s7
|
|
|
|
def __getitem__(self, idx) -> float:
|
|
srtd = self._sort_pair(*idx)
|
|
if srtd not in self._pairs:
|
|
return 0
|
|
else:
|
|
return self._pairs[srtd]
|
|
|
|
def __setitem__(self, idx, val: float):
|
|
self._pairs[self._sort_pair(*idx)] = val
|