diff options
Diffstat (limited to 'mikrotik.py')
-rw-r--r-- | mikrotik.py | 99 |
1 files changed, 99 insertions, 0 deletions
diff --git a/mikrotik.py b/mikrotik.py new file mode 100644 index 0000000..42a6eeb --- /dev/null +++ b/mikrotik.py @@ -0,0 +1,99 @@ +from dataclasses import dataclass +import threading +import fabric +import os +import re + +@dataclass +class MikroTikSSHDevice: + + def __post_init__(self): + self.interfaces = {} + for i in os.environ["MIKROTIK_INTERFACES"].split(";"): + self.interfaces.__setitem__(*i.split(",")) + self.is_being_polled = threading.Event() + self.interface_groups_cache = {} + + self.interface_groups = [] + temp = [] + for i, interface_name in enumerate(self.interfaces.keys(), 1): + temp.append(interface_name) + if i % 4 == 0: + self.interface_groups.append(tuple(temp)) + temp = [] + + # make sure we have some cache + # also use as sanity-test + for interface_group in self.interface_groups: + self._poll_interface_group(interface_group) + + def _get_conn(self): + return fabric.Connection( + user = os.environ["MIKROTIK_USER"], + host = os.environ["MIKROTIK_DEVICE"], + connect_kwargs = {"key_filename": os.environ["MIKROTIK_KEY_PATH"]} + ) + + def _get_interfacegroup_containing(self, interface_name): + for interface_group in self.interface_groups: + if interface_name in interface_group: + return interface_group + + def _poll_interface_group(self, interface_group): + self.is_being_polled.set() + result = self._get_conn().run("/interface/ethernet/poe/monitor %s once" % ",".join(interface_group), hide = True) + self.is_being_polled.clear() + parsed_result = self._parse_result(result) + self.interface_groups_cache[interface_group] = parsed_result + # print("Cached group:", interface_group) + return parsed_result + + def _parse_result(self, result): + r = result.stdout + # print(r) + s = [re.split(r" +", row.rstrip())[1:] for row in r.split("\r\n")][:-2] + out = {i: {} for i in s[0][1:]} + off_interfaces = set() + for row in s[1:]: + column_decrimator = 0 + output_name = row[0][:-1] + # print(output_name) + + for i, interface_name in enumerate(out.keys(), 0): + # print("off_interfaces:", off_interfaces) + # print(i, interface_name, row[1:][i]) + if interface_name in off_interfaces: + # print("Skipping '%s' for %s..." % (output_name, interface_name)) + column_decrimator += 1 + else: + out[interface_name][output_name] = row[1:][i - column_decrimator] + + if output_name == "poe-out-status": + if row[1:][i] != "powered-on": + # print("Adding %s to off interfaces" % interface_name) + off_interfaces.add(interface_name) + return out + + # i refuse to use async programming + def get_interface_poe(self, interface_name): + interface_group = self._get_interfacegroup_containing(interface_name) + if self.is_being_polled.is_set(): + result = self.interface_groups_cache[interface_group][interface_name] + result["cached"] = True + else: + result = self._poll_interface_group(interface_group)[interface_name] + result["cached"] = False + + return result + +if __name__ == "__main__": + if not os.path.exists(os.path.join("/app", ".docker")): + import dotenv + dotenv.load_dotenv(dotenv_path = "power.env") + + import time + mikrotik = MikroTikSSHDevice() + print("Ready.") + for interface_name in mikrotik.interfaces.keys(): + threading.Thread(target = lambda i: print(i, mikrotik.get_interface_poe(i)), args = (interface_name, )).start() + time.sleep(1)
\ No newline at end of file |