aboutsummaryrefslogtreecommitdiffstats
path: root/mikrotik.py
blob: b7391320bcb1d51af2d03cbb33b5e8ecc9d160a7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from dataclasses import dataclass, field
import threading
import serial
import devices
import time
import os
import re

@dataclass
class MikroTikSerialDevice:
    """This is a horrible, horrible way of doing this
    pretty much anything else would be better, for example connecting
    over SSH instead of serial

    Even using a serial connection like this is an abomination
    Please seriously do not do this, this is some necromancy, like it doesn't
    log out of the serial connection properly so make sure nothing else is plugged
    into the switch serial port

    I am doing it this way because I do not understand mikrotik scripting
    """
    device: str = os.environ["MIKROTIK_DEVICE"]
    user: str = os.environ["MIKROTIK_USER"]
    passwd: str = os.environ["MIKROTIK_PASS"]

    def __post_init__(self):
        self.interfaces = {}
        self.last_return = {}
        for i in os.environ["MIKROTIK_INTERFACES"].split(";"):
            self.interfaces.__setitem__(*i.split(","))
        self.is_being_polled = threading.Event()
        self.poe_cache = {interface: {} for interface in self.interfaces}

    def get_poe_info(self, interface):
        # fetch from cache so that multiple processes don't try to access serial at the same time
        # this means that the same MikroTikSerialDevice object must be used for multiple threads
        # if another thread is accessing the critical region, return from cache
        if self.is_being_polled.is_set():
            fetched_cache = self.poe_cache[interface]
            fetched_cache["cached"] = True
            return fetched_cache

        self.is_being_polled.set()
        self.ser = serial.Serial(self.device, int(os.environ["MIKROTIK_BAUD"]), timeout=0.25)

        if self.last_return == {}:
            self._push_serial("")
            self._push_serial(self.user)
            self._push_serial(self.passwd)
        self._push_serial("/interface/ethernet/poe/monitor %s" % interface)
        time.sleep(0.05)
        self.ser.write(bytes("q", 'ISO-8859-1'))
        out = self._read()
        self.ser.close()
        self.is_being_polled.clear()

        return self._post_out(out, interface)

    def _push_serial(self, text):
        time.sleep(0.05)
        self.ser.write(bytes(text + "\r\n", 'ISO-8859-1'))
        time.sleep(0.05)

    def _read(self):
        return self.ser.readlines()

    def _post_out(self, out, interface, was_cached = False):
        d = {}
        for line in out:
            line = line.decode().strip()
            # print("line:", line)
            if line.startswith("poe"):
                d.__setitem__(*line.split(": "))

        # also fetch from cache if it returned nothing
        if d == {}:
            fetched_cache = self.poe_cache[interface]
            fetched_cache["cached"] = True
            return fetched_cache 

        self.last_return = d        
        self.poe_cache[interface] = d
        d["cached"] = was_cached
        return d




if __name__ == "__main__":
    if not os.path.exists(os.path.join("/app", ".docker")):
        import dotenv
        dotenv.load_dotenv(dotenv_path = "power.env")

    mikrotik = MikroTikSerialDevice()
    for i in range(10):
        for interface in mikrotik.interfaces:
            print(interface, mikrotik.get_poe_info(interface))