from dataclasses import dataclass, field import threading import serial import devices import time import os import re @dataclass class MikroTikSerialDevice: """This is a horrible, horrible way of doing this pretty much anything else would be better, for example connecting over SSH instead of serial Even using a serial connection like this is an abomination Please seriously do not do this, this is some necromancy, like it doesn't log out of the serial connection properly so make sure nothing else is plugged into the switch serial port I am doing it this way because I do not understand mikrotik scripting """ device: str = os.environ["MIKROTIK_DEVICE"] user: str = os.environ["MIKROTIK_USER"] passwd: str = os.environ["MIKROTIK_PASS"] def __post_init__(self): self.interfaces = {} self.last_return = {} for i in os.environ["MIKROTIK_INTERFACES"].split(";"): self.interfaces.__setitem__(*i.split(",")) self.is_being_polled = threading.Event() self.poe_cache = {interface: {} for interface in self.interfaces} def get_poe_info(self, interface): # fetch from cache so that multiple processes don't try to access serial at the same time # this means that the same MikroTikSerialDevice object must be used for multiple threads # if another thread is accessing the critical region, return from cache if self.is_being_polled.is_set(): fetched_cache = self.poe_cache[interface] fetched_cache["cached"] = True return fetched_cache self.is_being_polled.set() self.ser = serial.Serial(self.device, int(os.environ["MIKROTIK_BAUD"]), timeout=0.25) if self.last_return == {}: self._push_serial("") self._push_serial(self.user) self._push_serial(self.passwd) self._push_serial("/interface/ethernet/poe/monitor %s" % interface) time.sleep(0.05) self.ser.write(bytes("q", 'ISO-8859-1')) out = self._read() self.ser.close() self.is_being_polled.clear() return self._post_out(out, interface) def _push_serial(self, text): time.sleep(0.05) self.ser.write(bytes(text + "\r\n", 'ISO-8859-1')) time.sleep(0.05) def _read(self): return self.ser.readlines() def _post_out(self, out, interface, was_cached = False): d = {} for line in out: line = line.decode().strip() # print("line:", line) if line.startswith("poe"): d.__setitem__(*line.split(": ")) # also fetch from cache if it returned nothing if d == {}: fetched_cache = self.poe_cache[interface] fetched_cache["cached"] = True return fetched_cache self.last_return = d self.poe_cache[interface] = d d["cached"] = was_cached return d if __name__ == "__main__": if not os.path.exists(os.path.join("/app", ".docker")): import dotenv dotenv.load_dotenv(dotenv_path = "power.env") mikrotik = MikroTikSerialDevice() for i in range(10): for interface in mikrotik.interfaces: print(interface, mikrotik.get_poe_info(interface))