aboutsummaryrefslogtreecommitdiffstats
path: root/mikrotik.py
diff options
context:
space:
mode:
Diffstat (limited to 'mikrotik.py')
-rw-r--r--mikrotik.py159
1 files changed, 80 insertions, 79 deletions
diff --git a/mikrotik.py b/mikrotik.py
index b739132..42a6eeb 100644
--- a/mikrotik.py
+++ b/mikrotik.py
@@ -1,98 +1,99 @@
-from dataclasses import dataclass, field
+from dataclasses import dataclass
import threading
-import serial
-import devices
-import time
+import fabric
import os
import re
@dataclass
-class MikroTikSerialDevice:
- """This is a horrible, horrible way of doing this
- pretty much anything else would be better, for example connecting
- over SSH instead of serial
-
- Even using a serial connection like this is an abomination
- Please seriously do not do this, this is some necromancy, like it doesn't
- log out of the serial connection properly so make sure nothing else is plugged
- into the switch serial port
-
- I am doing it this way because I do not understand mikrotik scripting
- """
- device: str = os.environ["MIKROTIK_DEVICE"]
- user: str = os.environ["MIKROTIK_USER"]
- passwd: str = os.environ["MIKROTIK_PASS"]
+class MikroTikSSHDevice:
def __post_init__(self):
self.interfaces = {}
- self.last_return = {}
for i in os.environ["MIKROTIK_INTERFACES"].split(";"):
self.interfaces.__setitem__(*i.split(","))
self.is_being_polled = threading.Event()
- self.poe_cache = {interface: {} for interface in self.interfaces}
-
- def get_poe_info(self, interface):
- # fetch from cache so that multiple processes don't try to access serial at the same time
- # this means that the same MikroTikSerialDevice object must be used for multiple threads
- # if another thread is accessing the critical region, return from cache
- if self.is_being_polled.is_set():
- fetched_cache = self.poe_cache[interface]
- fetched_cache["cached"] = True
- return fetched_cache
-
+ self.interface_groups_cache = {}
+
+ self.interface_groups = []
+ temp = []
+ for i, interface_name in enumerate(self.interfaces.keys(), 1):
+ temp.append(interface_name)
+ if i % 4 == 0:
+ self.interface_groups.append(tuple(temp))
+ temp = []
+
+ # make sure we have some cache
+ # also use as sanity-test
+ for interface_group in self.interface_groups:
+ self._poll_interface_group(interface_group)
+
+ def _get_conn(self):
+ return fabric.Connection(
+ user = os.environ["MIKROTIK_USER"],
+ host = os.environ["MIKROTIK_DEVICE"],
+ connect_kwargs = {"key_filename": os.environ["MIKROTIK_KEY_PATH"]}
+ )
+
+ def _get_interfacegroup_containing(self, interface_name):
+ for interface_group in self.interface_groups:
+ if interface_name in interface_group:
+ return interface_group
+
+ def _poll_interface_group(self, interface_group):
self.is_being_polled.set()
- self.ser = serial.Serial(self.device, int(os.environ["MIKROTIK_BAUD"]), timeout=0.25)
-
- if self.last_return == {}:
- self._push_serial("")
- self._push_serial(self.user)
- self._push_serial(self.passwd)
- self._push_serial("/interface/ethernet/poe/monitor %s" % interface)
- time.sleep(0.05)
- self.ser.write(bytes("q", 'ISO-8859-1'))
- out = self._read()
- self.ser.close()
+ result = self._get_conn().run("/interface/ethernet/poe/monitor %s once" % ",".join(interface_group), hide = True)
self.is_being_polled.clear()
-
- return self._post_out(out, interface)
-
- def _push_serial(self, text):
- time.sleep(0.05)
- self.ser.write(bytes(text + "\r\n", 'ISO-8859-1'))
- time.sleep(0.05)
-
- def _read(self):
- return self.ser.readlines()
-
- def _post_out(self, out, interface, was_cached = False):
- d = {}
- for line in out:
- line = line.decode().strip()
- # print("line:", line)
- if line.startswith("poe"):
- d.__setitem__(*line.split(": "))
-
- # also fetch from cache if it returned nothing
- if d == {}:
- fetched_cache = self.poe_cache[interface]
- fetched_cache["cached"] = True
- return fetched_cache
-
- self.last_return = d
- self.poe_cache[interface] = d
- d["cached"] = was_cached
- return d
-
-
-
+ parsed_result = self._parse_result(result)
+ self.interface_groups_cache[interface_group] = parsed_result
+ # print("Cached group:", interface_group)
+ return parsed_result
+
+ def _parse_result(self, result):
+ r = result.stdout
+ # print(r)
+ s = [re.split(r" +", row.rstrip())[1:] for row in r.split("\r\n")][:-2]
+ out = {i: {} for i in s[0][1:]}
+ off_interfaces = set()
+ for row in s[1:]:
+ column_decrimator = 0
+ output_name = row[0][:-1]
+ # print(output_name)
+
+ for i, interface_name in enumerate(out.keys(), 0):
+ # print("off_interfaces:", off_interfaces)
+ # print(i, interface_name, row[1:][i])
+ if interface_name in off_interfaces:
+ # print("Skipping '%s' for %s..." % (output_name, interface_name))
+ column_decrimator += 1
+ else:
+ out[interface_name][output_name] = row[1:][i - column_decrimator]
+
+ if output_name == "poe-out-status":
+ if row[1:][i] != "powered-on":
+ # print("Adding %s to off interfaces" % interface_name)
+ off_interfaces.add(interface_name)
+ return out
+
+ # i refuse to use async programming
+ def get_interface_poe(self, interface_name):
+ interface_group = self._get_interfacegroup_containing(interface_name)
+ if self.is_being_polled.is_set():
+ result = self.interface_groups_cache[interface_group][interface_name]
+ result["cached"] = True
+ else:
+ result = self._poll_interface_group(interface_group)[interface_name]
+ result["cached"] = False
+
+ return result
if __name__ == "__main__":
if not os.path.exists(os.path.join("/app", ".docker")):
import dotenv
dotenv.load_dotenv(dotenv_path = "power.env")
- mikrotik = MikroTikSerialDevice()
- for i in range(10):
- for interface in mikrotik.interfaces:
- print(interface, mikrotik.get_poe_info(interface))
-
+ import time
+ mikrotik = MikroTikSSHDevice()
+ print("Ready.")
+ for interface_name in mikrotik.interfaces.keys():
+ threading.Thread(target = lambda i: print(i, mikrotik.get_interface_poe(i)), args = (interface_name, )).start()
+ time.sleep(1) \ No newline at end of file