diff options
Diffstat (limited to 'mikrotik.py')
-rw-r--r-- | mikrotik.py | 99 |
1 files changed, 0 insertions, 99 deletions
diff --git a/mikrotik.py b/mikrotik.py deleted file mode 100644 index 42a6eeb..0000000 --- a/mikrotik.py +++ /dev/null @@ -1,99 +0,0 @@ -from dataclasses import dataclass -import threading -import fabric -import os -import re - -@dataclass -class MikroTikSSHDevice: - - def __post_init__(self): - self.interfaces = {} - for i in os.environ["MIKROTIK_INTERFACES"].split(";"): - self.interfaces.__setitem__(*i.split(",")) - self.is_being_polled = threading.Event() - self.interface_groups_cache = {} - - self.interface_groups = [] - temp = [] - for i, interface_name in enumerate(self.interfaces.keys(), 1): - temp.append(interface_name) - if i % 4 == 0: - self.interface_groups.append(tuple(temp)) - temp = [] - - # make sure we have some cache - # also use as sanity-test - for interface_group in self.interface_groups: - self._poll_interface_group(interface_group) - - def _get_conn(self): - return fabric.Connection( - user = os.environ["MIKROTIK_USER"], - host = os.environ["MIKROTIK_DEVICE"], - connect_kwargs = {"key_filename": os.environ["MIKROTIK_KEY_PATH"]} - ) - - def _get_interfacegroup_containing(self, interface_name): - for interface_group in self.interface_groups: - if interface_name in interface_group: - return interface_group - - def _poll_interface_group(self, interface_group): - self.is_being_polled.set() - result = self._get_conn().run("/interface/ethernet/poe/monitor %s once" % ",".join(interface_group), hide = True) - self.is_being_polled.clear() - parsed_result = self._parse_result(result) - self.interface_groups_cache[interface_group] = parsed_result - # print("Cached group:", interface_group) - return parsed_result - - def _parse_result(self, result): - r = result.stdout - # print(r) - s = [re.split(r" +", row.rstrip())[1:] for row in r.split("\r\n")][:-2] - out = {i: {} for i in s[0][1:]} - off_interfaces = set() - for row in s[1:]: - column_decrimator = 0 - output_name = row[0][:-1] - # print(output_name) - - for i, interface_name in enumerate(out.keys(), 0): - # print("off_interfaces:", off_interfaces) - # print(i, interface_name, row[1:][i]) - if interface_name in off_interfaces: - # print("Skipping '%s' for %s..." % (output_name, interface_name)) - column_decrimator += 1 - else: - out[interface_name][output_name] = row[1:][i - column_decrimator] - - if output_name == "poe-out-status": - if row[1:][i] != "powered-on": - # print("Adding %s to off interfaces" % interface_name) - off_interfaces.add(interface_name) - return out - - # i refuse to use async programming - def get_interface_poe(self, interface_name): - interface_group = self._get_interfacegroup_containing(interface_name) - if self.is_being_polled.is_set(): - result = self.interface_groups_cache[interface_group][interface_name] - result["cached"] = True - else: - result = self._poll_interface_group(interface_group)[interface_name] - result["cached"] = False - - return result - -if __name__ == "__main__": - if not os.path.exists(os.path.join("/app", ".docker")): - import dotenv - dotenv.load_dotenv(dotenv_path = "power.env") - - import time - mikrotik = MikroTikSSHDevice() - print("Ready.") - for interface_name in mikrotik.interfaces.keys(): - threading.Thread(target = lambda i: print(i, mikrotik.get_interface_poe(i)), args = (interface_name, )).start() - time.sleep(1)
\ No newline at end of file |