1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
from dataclasses import dataclass, field
import threading
import serial
import devices
import time
import os
import re
@dataclass
class MikroTikSerialDevice:
    """Poll PoE status from a MikroTik switch over its serial console.

    WARNING: this is a deliberately crude approach and almost anything else
    would be better -- for example connecting over SSH instead of serial.
    The serial session is never logged out properly, so make sure nothing
    else is plugged into the switch's serial port. It exists only because
    the author was not familiar with MikroTik scripting.

    Configuration comes from environment variables:

    * ``MIKROTIK_DEVICE``, ``MIKROTIK_USER``, ``MIKROTIK_PASS`` -- defaults
      for the constructor arguments. They are read when an instance is
      created (not when the class is defined), so a ``.env`` file loaded
      after import is still honoured.
    * ``MIKROTIK_INTERFACES`` -- comma-separated ``key:value`` pairs, read in
      ``__post_init__``.
    * ``MIKROTIK_BAUD`` -- baud rate, read on every poll.
    """

    device: str = field(default_factory=lambda: os.environ["MIKROTIK_DEVICE"])
    user: str = field(default_factory=lambda: os.environ["MIKROTIK_USER"])
    passwd: str = field(default_factory=lambda: os.environ["MIKROTIK_PASS"])

    def __post_init__(self):
        """Build the interface map, the busy flag and the per-interface cache.

        Raises:
            KeyError: if ``MIKROTIK_INTERFACES`` is not set.
            ValueError: if an entry is not of the form ``key:value``.
        """
        self.interfaces = {}
        self.last_return = {}
        for entry in os.environ["MIKROTIK_INTERFACES"].split(","):
            if not entry.strip():
                # Tolerate a trailing comma or an empty variable.
                continue
            key, sep, value = entry.partition(":")
            if not sep:
                raise ValueError(
                    f"MIKROTIK_INTERFACES entry {entry!r} is not 'key:value'"
                )
            self.interfaces[key] = value
        self.is_being_polled = threading.Event()
        self.poe_cache = {interface: {} for interface in self.interfaces}

    def get_poe_info(self, interface: str) -> dict:
        """Return the PoE fields reported for *interface*.

        If another poll is already in progress the last cached result for
        the interface is returned instead, with ``"cached"`` set to ``True``.
        Otherwise the serial port is opened, the monitor command is issued,
        and the parsed reply is returned with ``"cached"`` set to ``False``.

        Raises:
            KeyError: if a poll is in progress and *interface* has no cache
                entry, or if ``MIKROTIK_BAUD`` is not set.
        """
        print(self.poe_cache)
        # NOTE(review): is_set() followed by set() is not atomic, so two
        # threads can still both start polling -- confirm whether callers
        # need a real lock here.
        if self.is_being_polled.is_set():
            # Return a copy so a dict already handed to an earlier caller is
            # not modified behind its back.
            return {**self.poe_cache[interface], "cached": True}

        self.is_being_polled.set()
        try:
            self.ser = serial.Serial(
                self.device, int(os.environ["MIKROTIK_BAUD"]), timeout=0.25
            )
            try:
                if not self.last_return:
                    # First poll on this instance: send a blank line and
                    # then the credentials to log in.
                    self._push_serial("")
                    self._push_serial(self.user)
                    self._push_serial(self.passwd)
                self._push_serial(f"/interface/ethernet/poe/monitor {interface}")
                time.sleep(0.05)
                # Presumably "q" stops the interactive monitor display.
                self.ser.write(bytes("q", 'ISO-8859-1'))
                out = self._read()
            finally:
                # Always release the port, even if the exchange failed.
                self.ser.close()
        finally:
            # Always drop the busy flag; otherwise one failure would make
            # every later call return stale cache forever.
            self.is_being_polled.clear()
        return self._post_out(out, interface)

    def _push_serial(self, text: str) -> None:
        """Write *text* followed by CRLF, pausing 50 ms before and after."""
        time.sleep(0.05)
        self.ser.write(bytes(text + "\r\n", 'ISO-8859-1'))
        time.sleep(0.05)

    def _read(self):
        """Return every line currently readable from the serial port."""
        return self.ser.readlines()

    def _post_out(self, out, interface, was_cached=False) -> dict:
        """Parse raw serial lines into a dict and store it in the cache.

        Only lines that start with ``poe`` and contain ``": "`` are kept;
        the text before the first ``": "`` is the key, the rest the value.
        The result also becomes ``last_return`` and the cache entry for
        *interface*.

        Args:
            out: iterable of ``bytes`` lines as read from the port.
            interface: cache key to store the result under.
            was_cached: value stored under the ``"cached"`` key.
        """
        parsed = {}
        for raw in out:
            # Decode with the same codec used for writing; ISO-8859-1 maps
            # every byte, so line noise cannot raise UnicodeDecodeError.
            line = raw.decode('ISO-8859-1').strip()
            # print("line:", line)
            if not line.startswith("poe"):
                continue
            key, sep, value = line.partition(": ")
            if not sep:
                # Not a "key: value" line; skip it rather than crash.
                continue
            parsed[key] = value
        self.last_return = parsed
        self.poe_cache[interface] = parsed
        parsed["cached"] = was_cached
        return parsed
if __name__ == "__main__":
    # Manual smoke test: print the PoE info of every configured interface.
    # When the /app/.docker marker file is absent (presumably meaning we are
    # not running inside the container), load settings from power.env.
    docker_marker = os.path.join("/app", ".docker")
    if not os.path.exists(docker_marker):
        import dotenv

        dotenv.load_dotenv(dotenv_path="power.env")

    mikrotik = MikroTikSerialDevice()
    for port_name in mikrotik.interfaces:
        poe_info = mikrotik.get_poe_info(port_name)
        print(port_name, poe_info)
|