aboutsummaryrefslogtreecommitdiffstats
path: root/mikrotik.py
diff options
context:
space:
mode:
Diffstat (limited to 'mikrotik.py')
-rw-r--r--mikrotik.py99
1 files changed, 0 insertions, 99 deletions
diff --git a/mikrotik.py b/mikrotik.py
deleted file mode 100644
index 42a6eeb..0000000
--- a/mikrotik.py
+++ /dev/null
@@ -1,99 +0,0 @@
-from dataclasses import dataclass
-import threading
-import fabric
-import os
-import re
-
-@dataclass
-class MikroTikSSHDevice:
-
- def __post_init__(self):
- self.interfaces = {}
- for i in os.environ["MIKROTIK_INTERFACES"].split(";"):
- self.interfaces.__setitem__(*i.split(","))
- self.is_being_polled = threading.Event()
- self.interface_groups_cache = {}
-
- self.interface_groups = []
- temp = []
- for i, interface_name in enumerate(self.interfaces.keys(), 1):
- temp.append(interface_name)
- if i % 4 == 0:
- self.interface_groups.append(tuple(temp))
- temp = []
-
- # make sure we have some cache
- # also use as sanity-test
- for interface_group in self.interface_groups:
- self._poll_interface_group(interface_group)
-
- def _get_conn(self):
- return fabric.Connection(
- user = os.environ["MIKROTIK_USER"],
- host = os.environ["MIKROTIK_DEVICE"],
- connect_kwargs = {"key_filename": os.environ["MIKROTIK_KEY_PATH"]}
- )
-
- def _get_interfacegroup_containing(self, interface_name):
- for interface_group in self.interface_groups:
- if interface_name in interface_group:
- return interface_group
-
- def _poll_interface_group(self, interface_group):
- self.is_being_polled.set()
- result = self._get_conn().run("/interface/ethernet/poe/monitor %s once" % ",".join(interface_group), hide = True)
- self.is_being_polled.clear()
- parsed_result = self._parse_result(result)
- self.interface_groups_cache[interface_group] = parsed_result
- # print("Cached group:", interface_group)
- return parsed_result
-
- def _parse_result(self, result):
- r = result.stdout
- # print(r)
- s = [re.split(r" +", row.rstrip())[1:] for row in r.split("\r\n")][:-2]
- out = {i: {} for i in s[0][1:]}
- off_interfaces = set()
- for row in s[1:]:
- column_decrimator = 0
- output_name = row[0][:-1]
- # print(output_name)
-
- for i, interface_name in enumerate(out.keys(), 0):
- # print("off_interfaces:", off_interfaces)
- # print(i, interface_name, row[1:][i])
- if interface_name in off_interfaces:
- # print("Skipping '%s' for %s..." % (output_name, interface_name))
- column_decrimator += 1
- else:
- out[interface_name][output_name] = row[1:][i - column_decrimator]
-
- if output_name == "poe-out-status":
- if row[1:][i] != "powered-on":
- # print("Adding %s to off interfaces" % interface_name)
- off_interfaces.add(interface_name)
- return out
-
- # i refuse to use async programming
- def get_interface_poe(self, interface_name):
- interface_group = self._get_interfacegroup_containing(interface_name)
- if self.is_being_polled.is_set():
- result = self.interface_groups_cache[interface_group][interface_name]
- result["cached"] = True
- else:
- result = self._poll_interface_group(interface_group)[interface_name]
- result["cached"] = False
-
- return result
-
-if __name__ == "__main__":
- if not os.path.exists(os.path.join("/app", ".docker")):
- import dotenv
- dotenv.load_dotenv(dotenv_path = "power.env")
-
- import time
- mikrotik = MikroTikSSHDevice()
- print("Ready.")
- for interface_name in mikrotik.interfaces.keys():
- threading.Thread(target = lambda i: print(i, mikrotik.get_interface_poe(i)), args = (interface_name, )).start()
- time.sleep(1) \ No newline at end of file