#! /usr/bin/python3 import re import json from typing import NamedTuple import paho.mqtt.client as mqtt from influxdb import InfluxDBClient from datetime import datetime INFLUXDB_ADDRESS = 'IP Datenbank' INFLUXDB_USER = 'benutzer' INFLUXDB_PASSWORD = 'passwort' INFLUXDB_DATABASE = 'Name Datenbank' MQTT_ADDRESS = 'IP MQTT Broker' MQTT_USER = '' MQTT_PASSWORD = '' MQTT_TOPIC = 'SH-plus-HT/#' MQTT_REGEX = 'SH-plus-HT/([^/]+)/([^/]+)' MQTT_CLIENT_ID = 'MQTT_InfluxDB_Bridge' MQTT_FIELDS_1 = [ 'absolute-luftfeuchtigkeit-keller' # hier weitere relevante auflisten ] MQTT_FIELDS_2 = [ 'luftfeuchtigkeitgesamt-keller' # hier weitere relevante auflisten ] influxdb_client = InfluxDBClient(INFLUXDB_ADDRESS, 8086, INFLUXDB_USER, INFLUXDB_PASSWORD, None) class SensorData(NamedTuple): location: str measurement: str value: float def on_connect(client, userdata, flags, rc): """ The callback for when the client receives a CONNACK response from the server.""" # print('Connected with result code ' + str(rc)) client.subscribe(MQTT_TOPIC) client.subscribe('SH-plus-HT/#', qos=1) def _parse_mqtt_message(topic, payload): # print(topic) match = re.match(MQTT_REGEX, topic) if match: location = match.group(1) measurement = match.group(2) if measurement == 'status': return None #return SensorData(location, measurement, float(payload)) # print(" Wert von Payload:", payload) system1 = re.search('shelly-ID/absolute-luftfeuchtigkeit-keller', str(topic)) if system1: # print("Es wurde der shelly gefunden") # print("Welcher Wert wird uebergeben vom Sensor jetzt: ", payload) system10 = re.search('absolute-luftfeuchtigkeit-keller', str(topic)) if system10: # print("######## Absolute-Luftfeuchtigkeit ist enthalten /########") for field in MQTT_FIELDS_1: # print("Was haben wir hier: ",location) # print("was fuer ein feld: ",field) # print("Welche daten werden hier alles ausgeben: ",payload) yield SensorData(location, field, float(payload)) # print(" Wert von Payload:", payload) system2 = re.search('shelly-ID/luftfeuchtigkeitgesamt-keller', str(topic)) if system2: # print("Es wurde der shelly gefunden") # print("Welcher Wert wird uebergeben vom Sensor jetzt: ", payload) system20 = re.search('luftfeuchtigkeitgesamt-keller', str(topic)) if system20: # print("######## Absolute-Luftfeuchtigkeit ist enthalten /########") for field in MQTT_FIELDS_2: # print("Was haben wir hier: ",location) # print("was fuer ein feld: ",field) # print("Welche daten werden hier alles ausgeben: ",payload) yield SensorData(location, field, float(payload)) else: return None def _send_sensor_data_to_influxdb(sensor_data): # print("datenbank: ",sensor_data) now = datetime.now() # print("#####################################") # print(" Zeit: ", now) # print("#####################################") json_body = [ { 'measurement': sensor_data.measurement, 'tags': { 'location': sensor_data.location }, 'fields': { 'value': sensor_data.value } } ] influxdb_client.write_points(json_body) def on_message(client, userdata, msg): """The callback for when a PUBLISH message is received from the server.""" # print(msg.topic + ' ' + str(msg.payload)) sensor_data_sets = _parse_mqtt_message(msg.topic, msg.payload.decode('utf-8')) if sensor_data_sets is None: print("Couldn't parse sensor data!") return for sensor_data in sensor_data_sets: _send_sensor_data_to_influxdb(sensor_data) def _init_influxdb_database(): databases = influxdb_client.get_list_database() if len(list(filter(lambda x: x['name'] == INFLUXDB_DATABASE, databases))) == 0: influxdb_client.create_database(INFLUXDB_DATABASE) influxdb_client.switch_database(INFLUXDB_DATABASE) def main(): _init_influxdb_database() mqtt_client = mqtt.Client(MQTT_CLIENT_ID) mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD) mqtt_client.on_connect = on_connect mqtt_client.on_message = on_message mqtt_client.connect(MQTT_ADDRESS, 1883) mqtt_client.loop_forever() if __name__ == '__main__': print('MQTT to InfluxDB bridge') main()