diff options
Diffstat (limited to 'mikrotik.py')
-rw-r--r-- | mikrotik.py | 159 |
1 files changed, 80 insertions, 79 deletions
diff --git a/mikrotik.py b/mikrotik.py index b739132..42a6eeb 100644 --- a/mikrotik.py +++ b/mikrotik.py @@ -1,98 +1,99 @@ -from dataclasses import dataclass, field +from dataclasses import dataclass import threading -import serial -import devices -import time +import fabric import os import re @dataclass -class MikroTikSerialDevice: - """This is a horrible, horrible way of doing this - pretty much anything else would be better, for example connecting - over SSH instead of serial - - Even using a serial connection like this is an abomination - Please seriously do not do this, this is some necromancy, like it doesn't - log out of the serial connection properly so make sure nothing else is plugged - into the switch serial port - - I am doing it this way because I do not understand mikrotik scripting - """ - device: str = os.environ["MIKROTIK_DEVICE"] - user: str = os.environ["MIKROTIK_USER"] - passwd: str = os.environ["MIKROTIK_PASS"] +class MikroTikSSHDevice: def __post_init__(self): self.interfaces = {} - self.last_return = {} for i in os.environ["MIKROTIK_INTERFACES"].split(";"): self.interfaces.__setitem__(*i.split(",")) self.is_being_polled = threading.Event() - self.poe_cache = {interface: {} for interface in self.interfaces} - - def get_poe_info(self, interface): - # fetch from cache so that multiple processes don't try to access serial at the same time - # this means that the same MikroTikSerialDevice object must be used for multiple threads - # if another thread is accessing the critical region, return from cache - if self.is_being_polled.is_set(): - fetched_cache = self.poe_cache[interface] - fetched_cache["cached"] = True - return fetched_cache - + self.interface_groups_cache = {} + + self.interface_groups = [] + temp = [] + for i, interface_name in enumerate(self.interfaces.keys(), 1): + temp.append(interface_name) + if i % 4 == 0: + self.interface_groups.append(tuple(temp)) + temp = [] + + # make sure we have some cache + # also use as sanity-test + for interface_group in self.interface_groups: + self._poll_interface_group(interface_group) + + def _get_conn(self): + return fabric.Connection( + user = os.environ["MIKROTIK_USER"], + host = os.environ["MIKROTIK_DEVICE"], + connect_kwargs = {"key_filename": os.environ["MIKROTIK_KEY_PATH"]} + ) + + def _get_interfacegroup_containing(self, interface_name): + for interface_group in self.interface_groups: + if interface_name in interface_group: + return interface_group + + def _poll_interface_group(self, interface_group): self.is_being_polled.set() - self.ser = serial.Serial(self.device, int(os.environ["MIKROTIK_BAUD"]), timeout=0.25) - - if self.last_return == {}: - self._push_serial("") - self._push_serial(self.user) - self._push_serial(self.passwd) - self._push_serial("/interface/ethernet/poe/monitor %s" % interface) - time.sleep(0.05) - self.ser.write(bytes("q", 'ISO-8859-1')) - out = self._read() - self.ser.close() + result = self._get_conn().run("/interface/ethernet/poe/monitor %s once" % ",".join(interface_group), hide = True) self.is_being_polled.clear() - - return self._post_out(out, interface) - - def _push_serial(self, text): - time.sleep(0.05) - self.ser.write(bytes(text + "\r\n", 'ISO-8859-1')) - time.sleep(0.05) - - def _read(self): - return self.ser.readlines() - - def _post_out(self, out, interface, was_cached = False): - d = {} - for line in out: - line = line.decode().strip() - # print("line:", line) - if line.startswith("poe"): - d.__setitem__(*line.split(": ")) - - # also fetch from cache if it returned nothing - if d == {}: - fetched_cache = self.poe_cache[interface] - fetched_cache["cached"] = True - return fetched_cache - - self.last_return = d - self.poe_cache[interface] = d - d["cached"] = was_cached - return d - - - + parsed_result = self._parse_result(result) + self.interface_groups_cache[interface_group] = parsed_result + # print("Cached group:", interface_group) + return parsed_result + + def _parse_result(self, result): + r = result.stdout + # print(r) + s = [re.split(r" +", row.rstrip())[1:] for row in r.split("\r\n")][:-2] + out = {i: {} for i in s[0][1:]} + off_interfaces = set() + for row in s[1:]: + column_decrimator = 0 + output_name = row[0][:-1] + # print(output_name) + + for i, interface_name in enumerate(out.keys(), 0): + # print("off_interfaces:", off_interfaces) + # print(i, interface_name, row[1:][i]) + if interface_name in off_interfaces: + # print("Skipping '%s' for %s..." % (output_name, interface_name)) + column_decrimator += 1 + else: + out[interface_name][output_name] = row[1:][i - column_decrimator] + + if output_name == "poe-out-status": + if row[1:][i] != "powered-on": + # print("Adding %s to off interfaces" % interface_name) + off_interfaces.add(interface_name) + return out + + # i refuse to use async programming + def get_interface_poe(self, interface_name): + interface_group = self._get_interfacegroup_containing(interface_name) + if self.is_being_polled.is_set(): + result = self.interface_groups_cache[interface_group][interface_name] + result["cached"] = True + else: + result = self._poll_interface_group(interface_group)[interface_name] + result["cached"] = False + + return result if __name__ == "__main__": if not os.path.exists(os.path.join("/app", ".docker")): import dotenv dotenv.load_dotenv(dotenv_path = "power.env") - mikrotik = MikroTikSerialDevice() - for i in range(10): - for interface in mikrotik.interfaces: - print(interface, mikrotik.get_poe_info(interface)) - + import time + mikrotik = MikroTikSSHDevice() + print("Ready.") + for interface_name in mikrotik.interfaces.keys(): + threading.Thread(target = lambda i: print(i, mikrotik.get_interface_poe(i)), args = (interface_name, )).start() + time.sleep(1)
\ No newline at end of file |