configuration:ein-mqtt-server-fuer-iot-telemetry-sensoren [2023/10/31 14:51] – ↷ Page moved from configuration:ein-mqtt-server-fuer-iot-telemetry-sensoren to app-notes:ein-mqtt-server-fuer-iot-telemetry-sensoren dodenhoeft | app-notes:ein-mqtt-server-fuer-iot-telemetry-sensoren [2024/06/14 09:28] (current) – [Prerequisites] gray | ||
+ | ====== An MQTT Server for IoT Telemetry Sensors ====== | ||
+ | |||
+ | |||
+ | ===== Goal ===== | ||
+ | Why are IoT telemetry sensors not simply operated in a client-server structure like other services? Many sensors are powered only by a battery, so a permanent connection is not possible. Even NFC sensors, which are only active during use, cannot make their information available via classic network protocols. The asynchronous '' | ||
+ | |||
+ | |||
+ | ===== Prerequisites ===== | ||
+ | |||
+ | * NetModule Wireless Router with wireless connection | ||
+ | * NetModule SW [[https:// | ||
+ | * A license is required for the LXC container | ||
+ | * Not available for legacy products NB1600, NB2700, NB3700, NB3710 | ||
+ | ===== - Concept ===== | ||
+ | |||
+ | An MQTT broker can be used to collect data from IoT telemetry sensors. The data can be published to the broker via SDK scripts of the router API, via the Node-RED tool, or directly via the MQTT protocol. | ||
+ | |||
+ | The data is stored in an InfluxDB database so that it can be used for analysis. Python scripts re-aggregate the data and store it in JSON format. | ||
+ | |||
+ | Grafana is used for display and analysis. It visualizes both short-term and long-term evaluations for the user. | ||
+ | |||
+ | ---- | ||
+ | |||
+ | {{: | ||
+ | |||
+ | ---- | ||
+ | |||
+ | ===== - An NB2800 with MQTT server and IoT devices for data collection ===== | ||
+ | |||
+ | ==== - Collect telemetry data with the help of SDK scripts ==== | ||
+ | |||
+ | Query measurement data from an IoT device (in this example a Shelly Plus H&T) with an SDK script via the MQTT interface. The '' | ||
+ | |||
+ | [[https:// | ||
+ | |||
+ | What data is required: | ||
+ | * Temperature | ||
+ | * Humidity | ||
+ | * DevicePower | ||
+ | * SysTime | ||
+ | |||
+ | The information on the IoT devices is requested via the SDK function ''" | ||
+ | |||
+ | < | ||
+ | MESSAGE=sprintf(' | ||
+ | </ | ||
+ | |||
+ | ---- | ||
+ | |||
+ | This SDK script will publish information from the IoT device. | ||
+ | |||
+ | <code c shellyplusht publish_SDK.are> | ||
+ | HOST = "< | ||
+ | PORT = 1883; | ||
+ | USERNAME = ""; | ||
+ | PASSWORD = ""; | ||
+ | TOPIC = " | ||
+ | QOS = 1; | ||
+ | RETAIN = 0; | ||
+ | MESSAGE = ""; | ||
+ | |||
+ | DEBUG=false; | ||
+ | |||
+ | while (true){ | ||
+ | | ||
+ | | ||
+ | /* publish the message to mqtt broker Temperature */ | ||
+ | ret = nb_mqtt_publish(HOST, | ||
+ | if (ret<0){ | ||
+ | | ||
+ | } | ||
+ | | ||
+ | | ||
+ | | ||
+ | /* publish the message to mqtt broker Humidity */ | ||
+ | ret = nb_mqtt_publish(HOST, | ||
+ | if (ret<0){ | ||
+ | | ||
+ | } | ||
+ | | ||
+ | | ||
+ | | ||
+ | /* publish the message to mqtt broker DevicePower */ | ||
+ | ret = nb_mqtt_publish(HOST, | ||
+ | if (ret<0){ | ||
+ | | ||
+ | } | ||
+ | | ||
+ | | ||
+ | | ||
+ | /* publish the message to mqtt broker Sys */ | ||
+ | ret = nb_mqtt_publish(HOST, | ||
+ | if (ret<0){ | ||
+ | | ||
+ | } | ||
+ | | ||
+ | } | ||
+ | exit(0); | ||
+ | </ | ||
+ | |||
+ | ---- | ||
+ | |||
+ | It is important here that the request is not sent directly to the device. The '' | ||
+ | |||
+ | < | ||
+ | {" | ||
+ | {" | ||
+ | {" | ||
+ | {" | ||
+ | </ | ||
+ | |||
+ | Through the SDK script "'' | ||
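
The request messages shown above are truncated in this copy of the page. As a rough illustration only, the following Python sketch builds four request frames under the assumption that the device speaks the Shelly Gen2 RPC format (a JSON object with ''id'', ''src'', ''method'' and optional ''params''). The method names, the ''reply_prefix'' value and the helper names are assumptions for illustration, not taken from the original script.

```python
import json

# Assumed RPC method names for the four values listed above; verify them
# against the API documentation and firmware of your device.
METHODS = {
    "Temperature": "Temperature.GetStatus",
    "Humidity": "Humidity.GetStatus",
    "DevicePower": "DevicePower.GetStatus",
    "Sys": "Sys.GetStatus",
}


def build_request(request_id, reply_prefix, method, component_id=0):
    """Return one JSON-encoded request frame (hypothetical helper)."""
    frame = {"id": request_id, "src": reply_prefix, "method": method}
    # Assumption: Sys is a singleton component and takes no id parameter.
    if not method.startswith("Sys."):
        frame["params"] = {"id": component_id}
    return json.dumps(frame)


def build_all_requests(reply_prefix):
    """Build one request per value, keyed by the value name."""
    return {
        name: build_request(number, reply_prefix, method)
        for number, (name, method) in enumerate(METHODS.items(), start=1)
    }


if __name__ == "__main__":
    # "user_1" is a placeholder for the prefix the answers are sent back to.
    for name, message in build_all_requests("user_1").items():
        print(name, message)
```

Each resulting string would be used as the message of one publish call. The broker queues it until the battery-powered sensor wakes up and fetches it.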
+ | |||
+ | |||
+ | ==== - Make telemetry data usable via an SDK subscribe script ==== | ||
+ | |||
+ | The sensor then retrieves the requests from the broker and publishes the information block back to the broker. The block is then displayed in the console. | ||
+ | |||
+ | < | ||
+ | {"id": | ||
+ | </ | ||
+ | |||
+ | ---- | ||
+ | |||
+ | To use the values easily in other SDK scripts, the data must be retrieved from the broker using the "'' | ||
+ | |||
+ | First, however, the required information must be extracted from the data block. The SDK script "'' | ||
+ | |||
+ | |||
+ | This script also uses the values from the memory cells to calculate the absolute humidity in grams per cubic meter. | ||
+ | |||
+ | <code c shellyplusht subscribe_SDK.are> | ||
+ | HOST = "IP MQTT Broker"; | ||
+ | PORT = 1883; | ||
+ | KEEPALIVE = 60; | ||
+ | PROTOCOL = " | ||
+ | USERNAME = ""; | ||
+ | PASSWORD = ""; | ||
+ | CLIENT_ID = " | ||
+ | TOPIC = " | ||
+ | QOS = 1; | ||
+ | RETAIN = 0; | ||
+ | MESSAGE = ""; | ||
+ | TIMEOUT = 1000; | ||
+ | CLEAN_SESSION = true; | ||
+ | MQTT_HANDLE = 0; | ||
+ | |||
+ | nb_config_set(" | ||
+ | nb_config_set(" | ||
+ | nb_config_set(" | ||
+ | nb_config_set(" | ||
+ | nb_config_set(" | ||
+ | |||
+ | DEBUG=false; | ||
+ | |||
+ | template shellyplusht_< | ||
+ | temperatur_zahl = ""; | ||
+ | luftfeuchte_zahl = ""; | ||
+ | percent_zahl = ""; | ||
+ | battery_zahl = ""; | ||
+ | letzteaenderung = ""; | ||
+ | } | ||
+ | |||
+ | /*create new template instance*/ | ||
+ | shelly_data = new shellyplusht_< | ||
+ | |||
+ | /*create new mqtt instance*/ | ||
+ | MQTT_HANDLE = nb_mqttlib_new(CLIENT_ID, | ||
+ | |||
+ | if (nb_mqttlib_set_protocol_version(MQTT_HANDLE, | ||
+ | printf(" | ||
+ | exit(1); | ||
+ | } | ||
+ | |||
+ | if (nb_mqttlib_set_user_pw(MQTT_HANDLE, | ||
+ | printf(" | ||
+ | exit(1); | ||
+ | } | ||
+ | |||
+ | if (nb_mqttlib_connect(MQTT_HANDLE, | ||
+ | printf(" | ||
+ | exit(1); | ||
+ | } | ||
+ | |||
+ | if (nb_mqttlib_subscribe(MQTT_HANDLE, | ||
+ | printf(" | ||
+ | exit(1); | ||
+ | } | ||
+ | |||
+ | while (true){ | ||
+ | |||
+ | ret = nb_mqttlib_get_callback_message(MQTT_HANDLE, | ||
+ | |||
+ | buffer=(string) ret.msg.msg; | ||
+ | |||
+ | if(DEBUG) printf(" | ||
+ | temperatur_finden = strstr( buffer, '" | ||
+ | if(temperatur_finden != NULL){ | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | }else { | ||
+ | printf(" | ||
+ | } | ||
+ | if(DEBUG) printf(" | ||
+ | luftfeuchte_finden = strstr( buffer, '" | ||
+ | | ||
+ | if(DEBUG) printf(" | ||
+ | laenger_vom_string = strlen(buffer); | ||
+ | if(DEBUG) printf(" | ||
+ | neuer_daten_string = substr(buffer, | ||
+ | if(DEBUG) printf(" | ||
+ | luftfeuchte_block = strstr(neuer_daten_string, | ||
+ | if(DEBUG) printf(" | ||
+ | neuerblock = substr(neuer_daten_string, | ||
+ | if(DEBUG) printf(" | ||
+ | luftfeuchte = strstr( neuerblock, ':' | ||
+ | if(DEBUG) printf(" | ||
+ | luftfeuchte_zahl = substr(neuerblock, | ||
+ | printf(" | ||
+ | shelly_data.luftfeuchte_zahl = luftfeuchte_zahl; | ||
+ | nb_config_set(sprintf(" | ||
+ | }else { | ||
+ | | ||
+ | } | ||
+ | if(DEBUG) printf(" | ||
+ | battery_finden = strstr( buffer, '" | ||
+ | if(battery_finden != NULL){ | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | if(DEBUG) printf(" | ||
+ | percent_finden = strstr( buffer, '" | ||
+ | if(percent_finden != NULL){ | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | } else { | ||
+ | | ||
+ | } | ||
+ | } else { | ||
+ | printf(" | ||
+ | shelly_data.percent_zahl = " | ||
+ | shelly_data.battery_zahl = battery_zahl; | ||
+ | nb_config_set(sprintf(" | ||
+ | nb_config_set(sprintf(" | ||
+ | } | ||
+ | } else { | ||
+ | | ||
+ | } | ||
+ | speicherzelle_1 = nb_config_get(" | ||
+ | speicherzelle_2 = nb_config_get(" | ||
+ | speicherzelle_3 = nb_config_get(" | ||
+ | speicherzelle_4 = nb_config_get(" | ||
+ | speicherzelle_5 = nb_config_get(" | ||
+ | if(DEBUG) printf(" | ||
+ | if(DEBUG) printf(" | ||
+ | if(DEBUG) printf(" | ||
+ | if(DEBUG) printf(" | ||
+ | if(DEBUG) printf(" | ||
+ | |||
+ | # Determine the absolute humidity of the air in grams per cubic meter | ||
+ | I1 = (float) nb_config_get(" | ||
+ | I2 = (float) nb_config_get(" | ||
+ | if(DEBUG) printf(" | ||
+ | if(DEBUG) dump(I1, I2); | ||
+ | if(DEBUG) printf(" | ||
+ | # Calculate the absolute humidity | ||
+ | result = (10 ** 5 * 18.016/ | ||
+ | if(DEBUG) printf(" | ||
+ | if(DEBUG) print(result); | ||
+ | if(DEBUG) printf(" | ||
+ | result = (string) result; | ||
+ | if(DEBUG) printf(" | ||
+ | nb_config_set(sprintf(" | ||
+ | speicherzelle_result = nb_config_get(" | ||
+ | if(DEBUG) printf(" | ||
+ | |||
+ | //For debugging | ||
+ | // | ||
+ | | ||
+ | nb_syslog(" | ||
+ | } | ||
+ | |||
+ | sleep(1); | ||
+ | } | ||
+ | | ||
+ | </ | ||
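
The expression that computes the absolute humidity is cut off in the script above, so it cannot be reproduced exactly. The following Python sketch shows one common way to do this calculation, assuming the Magnus approximation for the saturation vapour pressure. The coefficients 6.1078, 7.5 and 237.3 and the gas constant 8314.3 are assumptions that happen to fit the visible fragment ''10 ** 5 * 18.016/''; the function name is hypothetical.

```python
def absolute_humidity(temp_c, rel_humidity_pct):
    """Absolute humidity in g/m^3 from temperature (deg C) and relative humidity (%)."""
    # Magnus approximation of the saturation vapour pressure over water in hPa
    # (assumed coefficients, intended for temperatures at or above 0 deg C).
    saturation_hpa = 6.1078 * 10 ** (7.5 * temp_c / (237.3 + temp_c))
    # Actual vapour pressure, scaled by the relative humidity.
    vapour_hpa = rel_humidity_pct / 100.0 * saturation_hpa
    temp_k = temp_c + 273.15
    # 18.016 kg/kmol is the molar mass of water,
    # 8314.3 J/(kmol*K) is the universal gas constant.
    return 10 ** 5 * 18.016 / 8314.3 * vapour_hpa / temp_k


if __name__ == "__main__":
    print(round(absolute_humidity(22.5, 50.0), 2))
```

For 22.5 °C and 50 % relative humidity this yields about 10 g/m³. In the SDK script the two inputs correspond to the temperature and humidity values read back from the memory cells.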
+ | |||
+ | ---- | ||
+ | |||
+ | The following debug output shows how the script extracts the temperature value from the data block. | ||
+ | |||
+ | < | ||
+ | Output of the unformatted data | ||
+ | {" | ||
+ | " | ||
+ | Temperature value tC is present | ||
+ | The length of the data string is determined here: 121 characters | ||
+ | New data string, shortened: | ||
+ | tC": | ||
+ | Determining the temperature block: | ||
+ | Output of the temperature block | ||
+ | tC": | ||
+ | Only the temperature value 3 characters | ||
+ | The numeric temperature value: 22.5 | ||
+ | Output of the date and time 30/ | ||
+ | </ | ||
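
The substring search traced in the output above can be mirrored in a few lines of Python. This is only a sketch of the same idea (find the key, cut the string behind it, stop at the next separator), not a translation of the SDK script. The helper name and the sample payload are hypothetical.

```python
def extract_number(payload, key):
    """Return the number that follows "key": in payload, or None if absent."""
    marker = '"' + key + '":'
    start = payload.find(marker)
    if start == -1:
        return None  # key is not contained in the data block
    start += len(marker)
    end = start
    # The value ends at the next comma or closing brace.
    while end < len(payload) and payload[end] not in ",}":
        end += 1
    try:
        return float(payload[start:end])
    except ValueError:
        return None  # the value behind the key is not numeric


if __name__ == "__main__":
    # Hypothetical data block, shaped like a temperature status answer.
    sample = '{"id":0,"tC":22.5,"tF":72.5}'
    print(extract_number(sample, "tC"))
```

Where a full JSON parser is available, ''json.loads'' is the more robust choice. The manual search is shown because it matches the step-by-step approach of the script.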
+ | |||
+ | |||
+ | |||
+ | ---- | ||
+ | |||
+ | |||
+ | ===== - Store data from the MQTT broker in a structured database ===== | ||
+ | |||
+ | An Influx database is created via the console. Only the access credentials and the name of the database need to be stored. A Python script retrieves the data from the MQTT broker and also builds the structure based on the data. | ||
+ | |||
+ | ---- | ||
+ | |||
+ | <code python shellyplusht subscribe_datenbank.py> | ||
+ | #! / | ||
+ | import re | ||
+ | import json | ||
+ | from typing import NamedTuple | ||
+ | |||
+ | import paho.mqtt.client as mqtt | ||
+ | from influxdb import InfluxDBClient | ||
+ | |||
+ | from datetime import datetime | ||
+ | |||
+ | INFLUXDB_ADDRESS = 'Database IP' | ||
+ | INFLUXDB_USER = ' | ||
+ | INFLUXDB_PASSWORD = ' | ||
+ | INFLUXDB_DATABASE = 'Database name' | ||
+ | |||
+ | MQTT_ADDRESS = 'IP MQTT Broker' | ||
+ | MQTT_USER = '' | ||
+ | MQTT_PASSWORD = '' | ||
+ | MQTT_TOPIC = ' | ||
+ | MQTT_REGEX = ' | ||
+ | MQTT_CLIENT_ID = ' | ||
+ | |||
+ | MQTT_FIELDS_1 = [ | ||
+ | ' | ||
+ | # list further relevant fields here | ||
+ | ] | ||
+ | MQTT_FIELDS_2 = [ | ||
+ | ' | ||
+ | # list further relevant fields here | ||
+ | ] | ||
+ | |||
+ | influxdb_client = InfluxDBClient(INFLUXDB_ADDRESS, | ||
+ | |||
+ | class SensorData(NamedTuple): | ||
+ | location: str | ||
+ | measurement: | ||
+ | value: float | ||
+ | |||
+ | def on_connect(client, | ||
+ | """ | ||
+ | # print(' | ||
+ | client.subscribe(MQTT_TOPIC) | ||
+ | client.subscribe(' | ||
+ | |||
+ | def _parse_mqtt_message(topic, | ||
+ | # print(topic) | ||
+ | match = re.match(MQTT_REGEX, | ||
+ | if match: | ||
+ | location = match.group(1) | ||
+ | measurement = match.group(2) | ||
+ | if measurement == ' | ||
+ | return None | ||
+ | #return SensorData(location, | ||
+ | # print(" | ||
+ | system1 = re.search(' | ||
+ | if system1: | ||
+ | # print(" | ||
+ | # print(" | ||
+ | system10 = re.search(' | ||
+ | if system10: | ||
+ | # print("######## | ||
+ | for field in MQTT_FIELDS_1: | ||
+ | # | ||
+ | # | ||
+ | # | ||
+ | yield SensorData(location, | ||
+ | # print(" | ||
+ | system2 = re.search(' | ||
+ | if system2: | ||
+ | # print(" | ||
+ | # print(" | ||
+ | system20 = re.search(' | ||
+ | if system20: | ||
+ | # print("######## | ||
+ | for field in MQTT_FIELDS_2: | ||
+ | # | ||
+ | # | ||
+ | # | ||
+ | yield SensorData(location, | ||
+ | else: | ||
+ | return None | ||
+ | |||
+ | def _send_sensor_data_to_influxdb(sensor_data): | ||
+ | # print(" | ||
+ | now = datetime.now() | ||
+ | # print("#####################################" | ||
+ | # print(" | ||
+ | # print("#####################################" | ||
+ | |||
+ | json_body = [ | ||
+ | { | ||
+ | ' | ||
+ | ' | ||
+ | ' | ||
+ | }, | ||
+ | ' | ||
+ | ' | ||
+ | } | ||
+ | } | ||
+ | ] | ||
+ | influxdb_client.write_points(json_body) | ||
+ | |||
+ | def on_message(client, | ||
+ | """ | ||
+ | # print(msg.topic + ' ' + str(msg.payload)) | ||
+ | sensor_data_sets = _parse_mqtt_message(msg.topic, | ||
+ | if sensor_data_sets is None: | ||
+ | print(" | ||
+ | return | ||
+ | for sensor_data in sensor_data_sets: | ||
+ | _send_sensor_data_to_influxdb(sensor_data) | ||
+ | |||
+ | def _init_influxdb_database(): | ||
+ | databases = influxdb_client.get_list_database() | ||
+ | if len(list(filter(lambda x: x[' | ||
+ | influxdb_client.create_database(INFLUXDB_DATABASE) | ||
+ | influxdb_client.switch_database(INFLUXDB_DATABASE) | ||
+ | |||
+ | def main(): | ||
+ | _init_influxdb_database() | ||
+ | mqtt_client = mqtt.Client(MQTT_CLIENT_ID) | ||
+ | mqtt_client.username_pw_set(MQTT_USER, | ||
+ | mqtt_client.on_connect = on_connect | ||
+ | mqtt_client.on_message = on_message | ||
+ | mqtt_client.connect(MQTT_ADDRESS, | ||
+ | mqtt_client.loop_forever() | ||
+ | |||
+ | if __name__ == ' | ||
+ | print(' | ||
+ | main() | ||
+ | </ | ||
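
Several details of the script above, including the topic pattern and the keys of ''json_body'', are cut off in this copy. The following sketch illustrates the two central steps under stated assumptions: a hypothetical topic layout ''<prefix>/<location>/<measurement>'' and the point layout that the ''influxdb'' Python client accepts in ''write_points'' (''measurement'', ''tags'', ''time'', ''fields''). All names not present in the original are illustrative.

```python
import re
from datetime import datetime, timezone
from typing import NamedTuple, Optional

# Hypothetical topic layout: <prefix>/<location>/<measurement>
TOPIC_REGEX = r"^[^/]+/([^/]+)/([^/]+)$"


class SensorData(NamedTuple):
    location: str
    measurement: str
    value: float


def parse_topic(topic: str, payload: str) -> Optional[SensorData]:
    """Turn one MQTT message into SensorData, or None if it does not fit."""
    match = re.match(TOPIC_REGEX, topic)
    if match is None:
        return None
    try:
        value = float(payload)
    except ValueError:
        return None
    return SensorData(match.group(1), match.group(2), value)


def to_influx_point(data: SensorData, now: Optional[datetime] = None) -> dict:
    """Build one point dictionary in the layout expected by write_points."""
    now = now or datetime.now(timezone.utc)
    return {
        "measurement": data.measurement,
        "tags": {"location": data.location},
        "time": now.isoformat(),
        "fields": {"value": data.value},
    }


if __name__ == "__main__":
    reading = parse_topic("home/kitchen/temperature", "22.5")
    if reading is not None:
        # A list of such points would be passed to write_points.
        print([to_influx_point(reading)])
```

Unlike the parser in the script, this helper returns a value instead of using ''yield''. A function that contains ''yield'' is a generator and never evaluates to ''None'', so a check such as ''if sensor_data_sets is None'' does not catch unparsable messages.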
+ | |||
+ | ---- | ||
+ | |||
+ | A cron job regularly runs the following shell script, which checks whether the Python script is still running and starts it if it is not. | ||
+ | |||
+ | < | ||
+ | #!/bin/bash | ||
+ | # check whether the process exists | ||
+ | if pgrep sh_plus_ht.py > /dev/null; then | ||
+ | # The process exists, so the script is running | ||
+ | echo The script is running | ||
+ | else | ||
+ | # The process does not exist, so the script must be started | ||
+ | echo The script is not running | ||
+ | / | ||
+ | sleep 5s | ||
+ | echo The script has been started | ||
+ | fi | ||
+ | exit 0 | ||
+ | </ | ||
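
The same check can also be written in Python, for example when the watchdog should live next to the collector script. The sketch below assumes a Linux system with a ''/proc'' file system. The function names and the example path are hypothetical.

```python
import os


def script_is_running(script_name, cmdlines):
    """Return True if any process has script_name as one of its arguments."""
    for args in cmdlines:
        if any(os.path.basename(arg) == script_name for arg in args):
            return True
    return False


def list_cmdlines(proc_root="/proc"):
    """Collect the argument lists of all visible processes (Linux only)."""
    result = []
    for entry in os.listdir(proc_root):
        if not entry.isdigit():
            continue  # not a process directory
        path = os.path.join(proc_root, entry, "cmdline")
        try:
            with open(path, "rb") as handle:
                raw = handle.read()
        except OSError:
            continue  # process ended or is not readable
        args = [part.decode(errors="replace") for part in raw.split(b"\0") if part]
        if args:
            result.append(args)
    return result


if __name__ == "__main__":
    if script_is_running("sh_plus_ht.py", list_cmdlines()):
        print("The script is running")
    else:
        print("The script is not running")
```

Comparing the arguments rather than only the process name also finds the script when it is started through the interpreter, for instance as ''python3 /opt/sh_plus_ht.py'' (path chosen for illustration). If the check fails, the script could be started with ''subprocess.Popen''.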
+ | |||
+ | |||
+ | ---- | ||
+ | |||
+ | ===== - The visualization of the telemetry data with the help of Grafana ===== | ||
+ | |||
+ | |||
+ | Grafana is used to evaluate and display for the user the structured data that is stored in InfluxDB. | ||
+ | |||
+ | First, integrate InfluxDB into Grafana. You can do this under the option "'' | ||
+ | |||
+ | {{: | ||
+ | |||
+ | ---- | ||
+ | The Query Language: '' | ||
+ | |||
+ | From now on you have access to the data that is already in the database and that will be added over time. | ||
+ | |||
+ | |||
+ | ==== - The first visualization under Grafana ==== | ||
+ | |||
+ | |||
+ | For first experiments with Grafana it is not necessary to set up a dashboard. The Discover option can be used to experiment with the program and get to know its functions. It is also a convenient place to test whether the connection to the database works. | ||
+ | |||
+ | |||
+ | {{: | ||
+ | |||
+ | ---- | ||
+ | |||
+ | The data can be accessed with SQL commands. In Grafana, however, you can also build the queries through a graphical interface without deep knowledge of SQL. | ||
+ | |||
+ | Under the option '' | ||
+ | |||
+ | ---- | ||
+ | If everything works, the temperature values for a period of one hour are displayed, as in this example. | ||
+ | |||
+ | {{: | ||
+ | |||
+ | ---- | ||
+ | |||
+ | It is worth trying out further functions with Grafana Discover. You quickly arrive at solutions here that are helpful when implementing the dashboard. | ||
+ | |||
+ | ---- | ||
+ | |||