aboutsummaryrefslogtreecommitdiffstats
path: root/mikrotik.py
blob: 42a6eeb777cd46d93db3e18fa7e0f7859aad629b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
from dataclasses import dataclass
import threading
import fabric
import os
import re

@dataclass
class MikroTikSSHDevice:

    def __post_init__(self):
        self.interfaces = {}
        for i in os.environ["MIKROTIK_INTERFACES"].split(";"):
            self.interfaces.__setitem__(*i.split(","))
        self.is_being_polled = threading.Event()
        self.interface_groups_cache = {}

        self.interface_groups = []
        temp = []
        for i, interface_name in enumerate(self.interfaces.keys(), 1):
            temp.append(interface_name)
            if i % 4 == 0:
                self.interface_groups.append(tuple(temp))
                temp = []
        
        # make sure we have some cache
        # also use as sanity-test
        for interface_group in self.interface_groups:
            self._poll_interface_group(interface_group)

    def _get_conn(self):
        return fabric.Connection(
            user = os.environ["MIKROTIK_USER"],
            host = os.environ["MIKROTIK_DEVICE"],
            connect_kwargs = {"key_filename": os.environ["MIKROTIK_KEY_PATH"]}
        )

    def _get_interfacegroup_containing(self, interface_name):
        for interface_group in self.interface_groups:
            if interface_name in interface_group:
                return interface_group

    def _poll_interface_group(self, interface_group):
        self.is_being_polled.set()
        result = self._get_conn().run("/interface/ethernet/poe/monitor %s once" % ",".join(interface_group), hide = True)
        self.is_being_polled.clear()
        parsed_result = self._parse_result(result)
        self.interface_groups_cache[interface_group] = parsed_result
        # print("Cached group:", interface_group)
        return parsed_result

    def _parse_result(self, result):
        r = result.stdout
        # print(r)
        s = [re.split(r" +", row.rstrip())[1:] for row in r.split("\r\n")][:-2]
        out = {i: {} for i in s[0][1:]}
        off_interfaces = set()
        for row in s[1:]:
            column_decrimator = 0
            output_name = row[0][:-1]
            # print(output_name)

            for i, interface_name in enumerate(out.keys(), 0):
                # print("off_interfaces:", off_interfaces)
                # print(i, interface_name, row[1:][i])
                if interface_name in off_interfaces:
                    # print("Skipping '%s' for %s..." % (output_name, interface_name))
                    column_decrimator += 1
                else:
                    out[interface_name][output_name] = row[1:][i - column_decrimator]

                if output_name == "poe-out-status":
                    if row[1:][i] != "powered-on":
                        # print("Adding %s to off interfaces" % interface_name)
                        off_interfaces.add(interface_name)
        return out

    # i refuse to use async programming
    def get_interface_poe(self, interface_name):
        interface_group = self._get_interfacegroup_containing(interface_name)
        if self.is_being_polled.is_set():
            result = self.interface_groups_cache[interface_group][interface_name]
            result["cached"] = True
        else:
            result = self._poll_interface_group(interface_group)[interface_name]
            result["cached"] = False
        
        return result

if __name__ == "__main__":
    if not os.path.exists(os.path.join("/app", ".docker")):
        import dotenv
        dotenv.load_dotenv(dotenv_path = "power.env")

    import time
    mikrotik = MikroTikSSHDevice()
    print("Ready.")
    for interface_name in mikrotik.interfaces.keys():
        threading.Thread(target = lambda i: print(i, mikrotik.get_interface_poe(i)), args = (interface_name, )).start()
        time.sleep(1)