diff options
Diffstat (limited to 'mqtt-client/mqtt-client.py')
-rw-r--r-- | mqtt-client/mqtt-client.py | 59 |
1 files changed, 59 insertions, 0 deletions
diff --git a/mqtt-client/mqtt-client.py b/mqtt-client/mqtt-client.py new file mode 100644 index 0000000..9b0df83 --- /dev/null +++ b/mqtt-client/mqtt-client.py @@ -0,0 +1,59 @@ +import paho.mqtt.client as paho +from influxdb_client import InfluxDBClient, Point, WritePrecision +from influxdb_client.client.write_api import SYNCHRONOUS +import json +import os + +class MQTTClient: + def __init__(self): + self.influxc = InfluxDBClient( + url = "http://%s:8086" % INFLUXDB_HOST, + token = os.environ["DOCKER_INFLUXDB_INIT_ADMIN_TOKEN"], + org = os.environ["DOCKER_INFLUXDB_INIT_ORG"] + ) + self.influxc.ping() + + self.mqttc = paho.Client('power-listener', clean_session = True) + self.mqttc.on_connect = self._on_connect_cb + self.mqttc.on_message = self._on_message_cb + + self.mqttc.username_pw_set(os.environ["MQTT_USER"], password = os.environ["MQTT_PASSWD"]) + self.mqttc.connect(MQTT_HOST, 1883, 60) + self.mqttc.loop_forever() + + def _on_connect_cb(self, mqtt, userdata, flags, rc): + print("Connected to broker") + self.mqttc.subscribe("tele/+/SENSOR") + + def _on_message_cb(self, mqtt, userdata, msg): + print('Topic: {0} | Message: {1}'.format(msg.topic, msg.payload)) + + if "Tasmota" in msg.topic: + self.handle_tasmota(msg) + + def handle_tasmota(self, msg): + from_ = msg.topic.split("/")[1] + msg_j = json.loads(msg.payload.decode()) + #print(from_) + fields = {k: v for k, v in msg_j["ENERGY"].items() if k not in {"TotalStartTime"}} + points = [{"measurement": "tasmota_power", "tags": {"plug": from_}, "fields": fields}] + write_api = self.influxc.write_api(write_options = SYNCHRONOUS) + write_api.write( + os.environ["DOCKER_INFLUXDB_INIT_BUCKET"], + os.environ["DOCKER_INFLUXDB_INIT_ORG"], + points, + write_precision = WritePrecision.S + ) + +if __name__ == "__main__": + env_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "config.env") + if os.path.exists(env_path): + import dotenv + dotenv.load_dotenv(dotenv_path = env_path) + INFLUXDB_HOST = "localhost" + MQTT_HOST = "localhost" + else: + INFLUXDB_HOST = "influxdb" + MQTT_HOST = "mqtt" + + mqtt_client = MQTTClient() |