59 lines
1.4 KiB
Python
59 lines
1.4 KiB
Python
|
import time
|
||
|
import json
|
||
|
import os
|
||
|
import logging
|
||
|
|
||
|
from opentsdb import TSDBClient
|
||
|
import paho.mqtt.client as mqtt
|
||
|
|
||
|
|
||
|
logging.basicConfig(level=logging.INFO)
|
||
|
|
||
|
MQTT_PREFIX = 'zigbee2mqtt/'
|
||
|
WANTED_PAYLOAD_KEYS = ['temperature', 'humidity', 'linkquality', 'temperature', 'pressure']
|
||
|
|
||
|
tsdb = TSDBClient(uri=os.environ['OPENTSDB_URL'])
|
||
|
|
||
|
def on_connect(client, userdata, flags, rc):
|
||
|
client.subscribe(MQTT_PREFIX + "#")
|
||
|
|
||
|
def on_message(client, userdata, msg):
|
||
|
name = msg.topic.removeprefix(MQTT_PREFIX)
|
||
|
try:
|
||
|
payload = json.loads(msg.payload.decode())
|
||
|
except Exception as e:
|
||
|
print(e)
|
||
|
return
|
||
|
|
||
|
save_message(name, payload)
|
||
|
|
||
|
def save_message(name, payload):
|
||
|
now = int(time.time())
|
||
|
|
||
|
for key in WANTED_PAYLOAD_KEYS:
|
||
|
if key in payload:
|
||
|
value = payload[key]
|
||
|
print(f'SAVE {name}/{key} -> {value}')
|
||
|
|
||
|
send_tsdb(key, value, {'device': name, 'timestamp': now})
|
||
|
|
||
|
|
||
|
|
||
|
def send_tsdb(key: str, value: float, tags: dict[str, str]):
|
||
|
metric = 'fvd-temp.'+key
|
||
|
tsdb.send(metric, value, **tags)
|
||
|
|
||
|
|
||
|
client = mqtt.Client()
|
||
|
client.on_connect = on_connect
|
||
|
client.on_message = on_message
|
||
|
|
||
|
|
||
|
client.connect(os.environ['MQTT_HOST'], 1883, 60)
|
||
|
|
||
|
# Blocking call that processes network traffic, dispatches callbacks and
|
||
|
# handles reconnecting.
|
||
|
# Other loop*() functions are available that give a threaded interface and a
|
||
|
# manual interface.
|
||
|
client.loop_forever()
|