1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
from dataclasses import dataclass, field
import threading
import serial
import devices
import time
import os
import re
@dataclass
class MikroTikSerialDevice:
    """Poll PoE status from a MikroTik switch through its serial console.

    WARNING: this is a fragile way of talking to the switch; almost anything
    else (for example SSH) would be better. The session is never logged out
    (the port is simply closed after each poll), so make sure nothing else is
    plugged into the switch's serial port.

    One instance must be shared by every thread that polls the switch: only
    one thread at a time talks to the serial port, and any thread that arrives
    while a poll is in progress is answered from the per-interface cache.

    Configuration is read from the environment:

    * ``MIKROTIK_DEVICE``, ``MIKROTIK_USER``, ``MIKROTIK_PASS`` - defaults for
      the three fields below, looked up when an instance is created.
    * ``MIKROTIK_INTERFACES`` - ``;``-separated entries of the form
      ``first,second``; the first item becomes a key of ``interfaces`` (the
      name passed to the monitor command) and the second its value.
    * ``MIKROTIK_BAUD`` - baud rate, read on every poll.
    """

    # default_factory defers the environment lookup to instantiation time, so
    # merely importing this module does not require the variables to be set
    # (e.g. when a .env file is loaded after import).
    device: str = field(default_factory=lambda: os.environ["MIKROTIK_DEVICE"])
    user: str = field(default_factory=lambda: os.environ["MIKROTIK_USER"])
    # repr=False keeps the password out of the generated __repr__ (and logs).
    passwd: str = field(
        default_factory=lambda: os.environ["MIKROTIK_PASS"], repr=False
    )

    def __post_init__(self):
        """Parse ``MIKROTIK_INTERFACES`` and set up the polling state.

        Raises:
            KeyError: if ``MIKROTIK_INTERFACES`` is not set.
            ValueError: if a non-blank entry does not contain a comma.
        """
        self.interfaces = {}
        # Last successfully parsed result; empty means "never logged in yet".
        self.last_return = {}
        for entry in os.environ["MIKROTIK_INTERFACES"].split(";"):
            if not entry.strip():
                # Tolerate a trailing ';' or an empty variable.
                continue
            name, sep, value = entry.partition(",")
            if not sep:
                raise ValueError(
                    "MIKROTIK_INTERFACES entry %r is not of the form "
                    "'name,value'" % entry
                )
            self.interfaces[name] = value
        # Kept as a public "poll in progress" indicator; mutual exclusion
        # itself is provided by _poll_lock, because checking and then setting
        # an Event is not atomic.
        self.is_being_polled = threading.Event()
        self._poll_lock = threading.Lock()
        self.poe_cache = {interface: {} for interface in self.interfaces}

    def get_poe_info(self, interface):
        """Return the PoE monitor values for ``interface`` as a dict.

        The dict maps each ``poe...`` key printed by the switch to its value
        and carries an extra ``"cached"`` entry: ``False`` for a fresh reading,
        ``True`` when the values come from the cache (another thread is
        polling, or the switch returned nothing usable).

        Raises:
            KeyError: if the cache is needed and ``interface`` is unknown.
        """
        # Only one thread may use the serial port; everybody else gets the
        # cached values instead of waiting.
        if not self._poll_lock.acquire(blocking=False):
            return self._cached_result(interface)
        self.is_being_polled.set()
        try:
            self.ser = serial.Serial(
                self.device, int(os.environ["MIKROTIK_BAUD"]), timeout=0.25
            )
            try:
                if not self.last_return:
                    # Nothing has ever been read successfully: wake the
                    # console with an empty line, then log in.
                    self._push_serial("")
                    self._push_serial(self.user)
                    self._push_serial(self.passwd)
                self._push_serial(
                    "/interface/ethernet/poe/monitor %s" % interface
                )
                time.sleep(0.05)
                # Presumably quits the continuously refreshing monitor view.
                self.ser.write(bytes("q", 'ISO-8859-1'))
                out = self._read()
            finally:
                # Always release the port, even if a write/read failed.
                self.ser.close()
            return self._post_out(out, interface)
        finally:
            # Always leave the critical region, otherwise every later call
            # would be served from the cache forever.
            self.is_being_polled.clear()
            self._poll_lock.release()

    def _cached_result(self, interface):
        """Return a copy of the cached values for ``interface``, flagged cached."""
        # A copy, so that dicts already handed to callers are never mutated.
        return {**self.poe_cache[interface], "cached": True}

    def _push_serial(self, text):
        """Send ``text`` followed by CRLF, pausing briefly before and after."""
        time.sleep(0.05)
        self.ser.write(bytes(text + "\r\n", 'ISO-8859-1'))
        time.sleep(0.05)

    def _read(self):
        """Read all pending lines; the port's read timeout bounds the wait."""
        return self.ser.readlines()

    def _post_out(self, out, interface, was_cached = False):
        """Parse raw console lines into a result dict and refresh the cache.

        Args:
            out: iterable of ``bytes`` lines as read from the serial port.
            interface: interface the lines belong to (cache key).
            was_cached: value stored under ``"cached"`` in a fresh result.

        Returns:
            A new dict of the ``poe...`` key/value pairs plus ``"cached"``.
            If no such pair was found, the cached values for ``interface``
            are returned instead with ``"cached": True``.
        """
        parsed = {}
        for raw in out:
            # ISO-8859-1 matches the encoding used for writes and can decode
            # any byte, so line noise cannot raise UnicodeDecodeError.
            line = raw.decode('ISO-8859-1').strip()
            if not line.startswith("poe"):
                continue
            # Split on the first ": " only; ignore lines without a separator
            # instead of crashing on them.
            key, sep, value = line.partition(": ")
            if sep:
                parsed[key] = value
        # also fetch from cache if it returned nothing
        if not parsed:
            return self._cached_result(interface)
        self.last_return = parsed
        self.poe_cache[interface] = parsed
        return {**parsed, "cached": was_cached}
if __name__ == "__main__":
    # Manual smoke test: poll every configured interface ten times and print
    # whatever comes back.
    marker_path = os.path.join("/app", ".docker")
    if not os.path.exists(marker_path):
        # No marker file (presumably: not running inside the container), so
        # take the settings from the local env file instead.
        import dotenv
        dotenv.load_dotenv(dotenv_path = "power.env")

    mikrotik = MikroTikSerialDevice()
    for _ in range(10):
        for port_name in mikrotik.interfaces:
            poe_info = mikrotik.get_poe_info(port_name)
            print(port_name, poe_info)
|