1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
from dataclasses import dataclass
import pymysql
import os
@dataclass
class PowerDatabase:
host: str = None
user: str = "root"
passwd: str = None
db: str = "power"
port: int = 3306
def __enter__(self):
if self.passwd is None:
self.passwd = os.environ["MYSQL_ROOT_PASSWORD"]
if self.host is None:
self.host = os.environ["MYSQL_HOST"]
try:
self.__connection = self.__get_connection()
except Exception as e:
print(e)
if e.args[0] == 1049:
self.__connection = self.__build_db()
with self.__connection.cursor() as cursor:
if "TASMOTA_DEVICES" in os.environ.keys():
for host, username, password in self.get_tasmota_devices():
cursor.execute("""
INSERT INTO tasmota_devices (host, username, password)
VALUES (%s, %s, %s)
ON DUPLICATE KEY UPDATE username = %s, password = %s;
""", (host, username, password, username, password))
return self
def __exit__(self, type, value, traceback):
self.__connection.commit()
self.__connection.close()
def __get_connection(self):
return pymysql.connect(
host = self.host,
port = self.port,
user = self.user,
passwd = self.passwd,
charset = "utf8mb4",
database = self.db
)
def __build_db(self):
print("Building database...")
self.__connection = pymysql.connect(
host = self.host,
port = self.port,
user = self.user,
passwd = self.passwd,
charset = "utf8mb4",
)
with self.__connection.cursor() as cursor:
# unsafe:
cursor.execute("CREATE DATABASE %s" % self.db)
cursor.execute("USE %s" % self.db)
cursor.execute("""
CREATE TABLE tasmota_devices (
host VARCHAR(25) NOT NULL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
password VARCHAR(50) NOT NULL
);
""")
cursor.execute("""
CREATE TABLE watt_readings (
host VARCHAR(25) NOT NULL,
`datetime` DATETIME DEFAULT NOW(),
reading FLOAT(24) NOT NULL,
FOREIGN KEY (host) REFERENCES tasmota_devices (host),
PRIMARY KEY (host, `datetime`)
);
""")
cursor.execute("""
CREATE TABLE kwh_readings (
host VARCHAR(25) NOT NULL,
`datetime` DATETIME DEFAULT NOW(),
reading FLOAT(24) NOT NULL,
FOREIGN KEY (host) REFERENCES tasmota_devices (host),
PRIMARY KEY (host, `datetime`)
);
""")
self.__connection.commit()
return self.__connection
def get_tasmota_devices(self):
o = []
for d in os.environ["TASMOTA_DEVICES"].split(","):
o.append(d.split(":"))
return o
def append_watt_readings(self, host, reading):
with self.__connection.cursor() as cursor:
cursor.execute("DELETE FROM watt_readings WHERE `datetime` < DATE_SUB(NOW(), INTERVAL 1 DAY);")
cursor.execute("INSERT INTO watt_readings (host, reading) VALUES (%s, %s);", (host, reading))
def append_kwh_readings(self, host, reading):
with self.__connection.cursor() as cursor:
cursor.execute("INSERT INTO kwh_readings (host, reading) VALUES (%s, %s);", (host, reading))
def get_last_plug_readings(self):
plugs = [i[0] for i in self.get_tasmota_devices()]
with self.__connection.cursor() as cursor:
cursor.execute("SELECT host, MAX(datetime) FROM watt_readings WHERE host IN %s GROUP BY host;", (plugs, ))
plugtimes = cursor.fetchall()
readings = []
for host, datetime in plugtimes:
cursor.execute("SELECT host, datetime, reading FROM watt_readings WHERE host = %s AND datetime = %s;", (host, datetime))
readings.append(cursor.fetchone())
return readings
def get_watt_chart(self):
with self.__connection.cursor() as cursor:
cursor.execute("SELECT DISTINCT host FROM watt_readings;")
hosts = [i[0] for i in cursor.fetchall()]
out = {}
for host in hosts:
cursor.execute("SELECT datetime, reading FROM watt_readings WHERE host = %s ORDER BY datetime;", (host, ))
out[host] = cursor.fetchall()
return out
if __name__ == "__main__":
if not os.path.exists(".docker"):
import dotenv
dotenv.load_dotenv(dotenv_path = "power.env")
host = "srv.athome"
else:
host = None
with PowerDatabase(host = host) as db:
print(db.get_watt_chart())
|