mqtt-to-opentsdb/main.py
Frank Villaro-Dixon 4d2559cec6 init
Signed-off-by: Frank Villaro-Dixon <frank@villaro-dixon.eu>
2023-07-11 22:04:01 +02:00

59 lines
1.4 KiB
Python

import time
import json
import os
import logging
from opentsdb import TSDBClient
import paho.mqtt.client as mqtt
logging.basicConfig(level=logging.INFO)
MQTT_PREFIX = 'zigbee2mqtt/'
WANTED_PAYLOAD_KEYS = ['temperature', 'humidity', 'linkquality', 'temperature', 'pressure']
tsdb = TSDBClient(uri=os.environ['OPENTSDB_URL'])
def on_connect(client, userdata, flags, rc):
client.subscribe(MQTT_PREFIX + "#")
def on_message(client, userdata, msg):
name = msg.topic.removeprefix(MQTT_PREFIX)
try:
payload = json.loads(msg.payload.decode())
except Exception as e:
print(e)
return
save_message(name, payload)
def save_message(name, payload):
now = int(time.time())
for key in WANTED_PAYLOAD_KEYS:
if key in payload:
value = payload[key]
print(f'SAVE {name}/{key} -> {value}')
send_tsdb(key, value, {'device': name, 'timestamp': now})
def send_tsdb(key: str, value: float, tags: dict[str, str]):
metric = 'fvd-temp.'+key
tsdb.send(metric, value, **tags)
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(os.environ['MQTT_HOST'], 1883, 60)
# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()