Table of Contents

An MQTT Server for IoT Telemetry Sensors

Goal

The question is why IoT telemetry sensors are not simply operated via a client-server structure like other services. A number of sensors are only supplied via a battery. A permanent connection is not possible here. But even NFC sensors that are only active during use cannot make their information usable via classic network protocols. The asynchronous MQTT protocol has become well established here. Sensors only have a connection when they pass on new information to a broker. The clients receive information from the broker if it is available.

Prerequisites

1. Concept

An MQTT broker can be used to collect data from IoT telemetry sensors. These can be published via SDK scripts of the Router API, the program tool Node-RED or directly via the MQTT protocol to the broker.

The data is stored in an InfluxDB database so that it can be used for analysis. Python scripts are used to re-aggregate the data and store it as a Json format.

Grafana is used for display and analysis. Here short term and long term evaluations are well visualized for the user.



2. An NB2800 with MQTT server and IoT devices for data collection

2.1 Collect telemetry data with the help of SDK scripts

Query measurement data from an IoT device (in this example a Shelly Plus H&T) with an SDK script via the MQTT interface.The SDK-API for the MQTT commands can be found under this link.

link SDK API

What data is required:

The information on the IoT devices is requested via the SDK function “nb_mqtt_publish”. We can use the “MESSAGE” option to specify which devices are to be addressed. But also which information block we need.

MESSAGE=sprintf('{"id":123, "src":"shellies/shelly-<ID>", "method":"Temperature.GetStatus", "params":{"id":0}');

This SDK script will publish information from the IoT device.

shellyplusht publish_SDK.are
HOST = "<IP MQTT Broker>";
PORT = 1883;
USERNAME = "";
PASSWORD = "";
TOPIC = "shellyplusht-<ID>/rpc";
QOS = 1;
RETAIN = 0;
MESSAGE = "";
 
DEBUG=false;  # Debug aktivieren mit true oder deaktivieren mit false.
 
while (true){
 MESSAGE=sprintf('{"id":123, "src":"shellies/shelly-<ID>", "method":"Temperature.GetStatus", "params":{"id":0}');
 if(DEBUG) printf("%s\n", MESSAGE);
    /* publish the message to mqtt broker Temperature */
 ret = nb_mqtt_publish(HOST, PORT, USERNAME, PASSWORD, TOPIC, QOS, RETAIN, MESSAGE);;
 if (ret<0){
   nb_syslog("Failed to publish mqtt message");
 }
 usleep(250000);
 MESSAGE2=sprintf('{"id":123, "src":"shellies/shelly-<ID>", "method":"Humidity.GetStatus", "params":{"id":0}');
 if(DEBUG) printf("%s\n", MESSAGE2);
    /* publish the message to mqtt broker Humidity */
 ret = nb_mqtt_publish(HOST, PORT, USERNAME, PASSWORD, TOPIC, QOS, RETAIN, MESSAGE2);
 if (ret<0){
   nb_syslog("Failed to publish mqtt message");
 }
 usleep(250000);
 MESSAGE3=sprintf('{"id":123, "src":"shellies/shelly-<ID>", "method":"DevicePower.GetStatus", "params":{"id":0}');
 if(DEBUG) printf("%s\n", MESSAGE3);
    /* publish the message to mqtt broker DevicePower */
 ret = nb_mqtt_publish(HOST, PORT, USERNAME, PASSWORD, TOPIC, QOS, RETAIN, MESSAGE3);
 if (ret<0){
   nb_syslog("Failed to publish mqtt message");
 }
 usleep(250000);
 MESSAGE4=sprintf('{"id":123, "src":"shellies/shelly-<ID>", "method":"Sys.GetStatus"}');
 if(DEBUG) printf("%s\n", MESSAGE4);
    /* publish the message to mqtt broker Sys */
 ret = nb_mqtt_publish(HOST, PORT, USERNAME, PASSWORD, TOPIC, QOS, RETAIN, MESSAGE4); 
 if (ret<0){
   nb_syslog("Failed to publish mqtt message");
 }
 sleep(1);
}
exit(0);

It is important here that the request is not sent directly to the device. The MESSAGE goes to the MQTT broker. This IoT device then retrieves the information from the broker. You can see the publish to the broker under the console.

USER
{“id”:123, “src”:“shellies/shelly-ID”, “method”:“Temperature.GetStatus”, “params”:{“id”:0} {“id”:123, “src”:“shellies/shelly-ID”, “method”:“Humidity.GetStatus”, “params”:{“id”:0} {“id”:123, “src”:“shellies/shelly-ID”, “method”:“DevicePower.GetStatus”, “params”:{“id”:0} {“id”:123, “src”:“shellies/shelly-ID”, “method”:“Sys.GetStatus”}

Through the SDK script “shellyplusht_publish_SDK” the data will be stored in the MQTT broker. In the second step we will subscribe the data with a SDK and write these memory cells. So we are then able to use the information values for other SDK script.

2.2 Make telemetry data usable via SDK scripts Subscribe

The sensor then retrieves the requests from the broker and passes the information block back to the broker with a publish. This would then be displayed in the console.

USER
{“id”:123,“src”:“shellyplusht-ID”,“dst”:“shellies/shelly-ID”, “result”:{“id”: 0,“tC”:22.5, “tF”:72.4}}


If you want to be able to easily use the information values for other SDK scripts. The data must be retrieved from the broker using the “nb_mqttlib_subscribe” function. This function is used to subscribe to MQTT messages for specific topics.

However, only the required information must first be extracted from the data block. The SDK script “shellyplusht subscribe_SDK.are” for this and then only stores the value in a memory cell. The advantage is that other scripts can use the value for other functions without having to query it again.

In this script, the values from the memory cells are also used to calculate the absolute final humidity.

shellyplusht subscribe_SDK.are
HOST = "IP MQTT Broker";
PORT = 1883;
KEEPALIVE = 60;
PROTOCOL = "V31";
USERNAME = "";
PASSWORD = "";
CLIENT_ID = "shellyplusht-<ID>/rpc";
TOPIC = "shellies/shelly-<ID>/rpc";
QOS = 1;
RETAIN = 0;
MESSAGE = "";
TIMEOUT = 1000;
CLEAN_SESSION = true;
MQTT_HANDLE = 0;
 
nb_config_set("custom.var0=0");
nb_config_set("custom.var1=0");
nb_config_set("custom.var2=0");
nb_config_set("custom.var3=0");
nb_config_set("custom.var4=0");
 
DEBUG=false;  # Debug aktivieren mit true oder deaktivieren mit false.
 
template shellyplusht_<ID>{
    temperatur_zahl = "";
    luftfeuchte_zahl = "";
    percent_zahl = "";
    battery_zahl = "";
    letzteaenderung = "";
}
 
/*create new template instance*/
shelly_data = new shellyplusht_<ID>();
 
/*create new mqtt instance*/
MQTT_HANDLE = nb_mqttlib_new(CLIENT_ID, CLEAN_SESSION);
 
if (nb_mqttlib_set_protocol_version(MQTT_HANDLE, PROTOCOL)  < 0 ){
        printf("Unable to set Protocol version\n");
        exit(1);
}
 
if (nb_mqttlib_set_user_pw(MQTT_HANDLE, USERNAME, PASSWORD) < 0 ){
	printf("Unable to set Username and Passsword\n");
	exit(1);
}
 
if (nb_mqttlib_connect(MQTT_HANDLE, HOST, PORT, KEEPALIVE) < 0 ){
	printf("Unable to connect\n");
	exit(1);
}
 
if (nb_mqttlib_subscribe(MQTT_HANDLE, TOPIC, QOS) < 0 ){
	printf("Unable to subscribe\n");
	exit(1);
}
 
while (true){
 
ret = nb_mqttlib_get_callback_message(MQTT_HANDLE, TIMEOUT);
 
buffer=(string) ret.msg.msg;
 
if(DEBUG) printf("Ausgabe unforamtierten Daten\n%s\n",buffer);
temperatur_finden = strstr( buffer, '"tC"');  // Hier wird tC verarbeitet
if(temperatur_finden != NULL){
 if(DEBUG) printf("Temperatur Wert tC ist entalten\n");
 laenger_vom_string = strlen(buffer);
 if(DEBUG) printf("Hier wird die laenge des Daten String bestimmt: %s Zeichen \n",laenger_vom_string);
 neuer_daten_string = substr(buffer, temperatur_finden+1,laenger_vom_string);
 if(DEBUG) printf("Neuer Daten String verkleinert:\n%s\n",neuer_daten_string);
 temperatur_block = strstr( neuer_daten_string, ',');
 if(DEBUG) printf("Bestimmen des Temperaturblock: %s Zeichen\n",temperatur_block);
 neuerblock = substr(neuer_daten_string, 0,temperatur_block);
 if(DEBUG) printf("Ausgabe des Tempaturblock\n%s\n",neuerblock);
 temperatur = strstr( neuerblock, ':');
 if(DEBUG) printf("Nur der Tempaturwert %s Zeichen\n",temperatur);
 temperatur_zahl = substr(neuerblock, temperatur+1,laenger_vom_string);
 printf("Der Zahlenwert Temperatur: %s\n",temperatur_zahl);
 shelly_data.temperatur_zahl = temperatur_zahl;
 nb_config_set(sprintf("custom.var0=%s",shelly_data.temperatur_zahl));
 datum_zeit = strftime("%d/%m/%Y__%H:%M:%S",localtime(time()));
 if(DEBUG) printf("Ausgabe des Datum und der Zeit %s\n", datum_zeit);
 shelly_data.letzteaenderung = datum_zeit;
 nb_config_set(sprintf("custom.var1=%s",shelly_data.letzteaenderung));
}else {
  printf("Der String tC ist nicht vorhanden\n");
 }
if(DEBUG) printf("Ausgabe unforamtierten Daten\n%s\n",buffer);
luftfeuchte_finden = strstr( buffer, '"rh"');  // Hier wird rh verarbeitet
 if(luftfeuchte_finden != NULL){
  if(DEBUG) printf("Luftfeuchte Wert rh ist entalten\n");
  laenger_vom_string = strlen(buffer);
  if(DEBUG) printf("Hier wird die laenge des Daten String bestimmt: %s Zeichen \n",laenger_vom_string);
  neuer_daten_string = substr(buffer, luftfeuchte_finden+1,laenger_vom_string);
  if(DEBUG) printf("Neuer Daten String verkleinert:\n%s\n",neuer_daten_string);
  luftfeuchte_block = strstr(neuer_daten_string, '}');
  if(DEBUG) printf("Bestimmen des Luftfeuchteblock: %s Zeichen\n",luftfeuchte_block);
  neuerblock = substr(neuer_daten_string, 0,luftfeuchte_block);
  if(DEBUG) printf("Ausgabe des Luftfeuchteblock\n%s\n",neuerblock);
  luftfeuchte = strstr( neuerblock, ':');
  if(DEBUG) printf("Nur der Luftfeuchtewert %s Zeichen\n",luftfeuchte);
  luftfeuchte_zahl = substr(neuerblock, luftfeuchte+1,laenger_vom_string);
  printf("Der Zahlenwert Luftfeuchtigkeit: %s\n",luftfeuchte_zahl);
  shelly_data.luftfeuchte_zahl = luftfeuchte_zahl;
  nb_config_set(sprintf("custom.var2=%s",shelly_data.luftfeuchte_zahl));
 }else {
   printf("Der String rh ist nicht vorhanden\n");
  }
if(DEBUG) printf("Ausgabe unforamtierten Daten\n%s\n",buffer);
battery_finden = strstr( buffer, '"battery"');  // Hier wird battery verarbeitet
if(battery_finden != NULL){
 if(DEBUG) printf("Batteryspannungswert battery ist entalten\n");
 laenger_vom_string = strlen(buffer);
 if(DEBUG) printf("Hier wird die laenge des Daten String bestimmt: %s Zeichen \n",laenger_vom_string);
 neuer_daten_string = substr(buffer, battery_finden+1,laenger_vom_string);
 if(DEBUG) printf("Neuer Daten String verkleinert:\n%s\n",neuer_daten_string);
 battery_block = strstr( neuer_daten_string, ',');
 if(DEBUG) printf("Bestimmen des batteryblock: %s Zeichen\n",battery_block);
 neuerblock = substr(neuer_daten_string, 0,battery_block);
 if(DEBUG) printf("Ausgabe des batteryblock\n%s\n",neuerblock);
 battery = strstr( neuerblock, ':');
 if(DEBUG) printf("Nur der Battery Wert %s Zeichen\n",battery);
 battery_zahl = substr(neuerblock, battery+6,laenger_vom_string);
 if(battery_zahl >= 4){
 printf("Die Batterie Spannung : %s Volt\n",battery_zahl);
 shelly_data.battery_zahl = battery_zahl;
 nb_config_set(sprintf("custom.var3=%s",shelly_data.battery_zahl));
if(DEBUG) printf("Ausgabe unforamtierten Daten\n%s\n",buffer);
percent_finden = strstr( buffer, '"percent"');  // Hier wird percent verarbeitet
if(percent_finden != NULL){
 if(DEBUG) printf("Der String percent ist entalten\n");
 laenger_vom_string = strlen(buffer);
 if(DEBUG) printf("Hier wird die laenge des Daten String bestimmt: %s Zeichen \n",laenger_vom_string);
 neuer_daten_string = substr(buffer, percent_finden+1,laenger_vom_string);
 if(DEBUG) printf("Neuer Daten String verkleinert:\n%s\n",neuer_daten_string);
 percent_block = strstr( neuer_daten_string, '}');
 if(DEBUG) printf("Bestimmen des percentblock: %s Zeichen\n",percent_block);
 neuerblock = substr(neuer_daten_string, 0,percent_block);
 if(DEBUG) printf("Ausgabe des percentblock\n%s\n",neuerblock);
 percent = strstr( neuerblock, ':');
 if(DEBUG) printf("Nur der percent Wert %s Zeichen\n",percent);
 percent_zahl = substr(neuerblock, percent+1,laenger_vom_string);
 printf("Der Prozentsatz der Batterie Spannung : %s Prozent\n",percent_zahl);
 shelly_data.percent_zahl = percent_zahl;
 nb_config_set(sprintf("custom.var4=%s",shelly_data.percent_zahl));
} else {
   printf("Der String percent ist nicht vorhanden\n");
 }
 } else {
    printf("Der Shelly wird über USB-Netzteil versorgt: %s Volt nur.\n",battery_zahl);
shelly_data.percent_zahl = "n/a";
shelly_data.battery_zahl = battery_zahl;
nb_config_set(sprintf("custom.var3=%s",shelly_data.battery_zahl));
nb_config_set(sprintf("custom.var4=%s",shelly_data.percent_zahl));
  }
} else {
   printf("Der String battery ist nicht vorhanden\n");
  }
speicherzelle_1 = nb_config_get("custom.var0");
speicherzelle_2 = nb_config_get("custom.var1");
speicherzelle_3 = nb_config_get("custom.var2");
speicherzelle_4 = nb_config_get("custom.var3");
speicherzelle_5 = nb_config_get("custom.var4");
if(DEBUG) printf("Ausgabe von custom Speicherzelle_1 Temperatur: %s\n",speicherzelle_1);
if(DEBUG) printf("Ausgabe von custom Speicherzelle_2 Datum Zeit: %s\n",speicherzelle_2);
if(DEBUG) printf("Ausgabe von custom Speicherzelle_3 Luftfeuchtewert: %s\n",speicherzelle_3);
if(DEBUG) printf("Ausgabe von custom Speicherzelle_4 Batteryspannungswert: %s\n",speicherzelle_4);
if(DEBUG) printf("Ausgabe von custom Speicherzelle_5 Prozentsatz: %s\n",speicherzelle_5);
 
# Bestimmung der Absolutendluftfeuchtigkeit in der Luft Gramm pro Kubikmeter
I1 = (float) nb_config_get("custom.var0");   # Der Wert Temperatur
I2 = (float) nb_config_get("custom.var2");   # Der Wert Luftfeuchtigkeit
if(DEBUG) printf("Ausgabe von I1: und I2:  \n");
if(DEBUG) dump(I1, I2);
if(DEBUG) printf("\n");
# Berechnung der Absolutendluftfeuchtigkeit
result =  (10 ** 5 * 18.016/8314.3 * I2/100 * 6.1078 * (10 ** ((7.5*I1)/(237.3+I1))/(I1 + 273.15)));
if(DEBUG) printf("\n");
if(DEBUG) print(result);
if(DEBUG) printf("\n");
result = (string) result;
if(DEBUG) printf("Absolutendluftfeuchtigkeit Berechnung: %s g/m³\n",result);
nb_config_set(sprintf("custom.table2.0.var0=%s",result));
speicherzelle_result = nb_config_get("custom.table2.0.var0");
if(DEBUG) printf("Ausgabe von custom Speicherzelle_result Absolutendluftfeuchtigkeit: %s g/m³\n",speicherzelle_result);
 
//For debugging
//printf("Alle buffer Ausgaben:\n%s\n",buffer);
 if(ret<0){
			nb_syslog("Failed to publish mqtt message");
	}
 
sleep(1);
}
 exit(0);

Below this example you can see how the script takes the temperature value from the data block.

USER
Ausgabe unforamtierten Daten {“id”:123,“src”:“shellyplusht-ID”,“dst”:“shellies/shelly-ID”, “result”:{“id”: 0,“tC”:22.5, “tF”:72.4}} Temperatur Wert tC ist entalten Hier wird die laenge des Daten String bestimmt: 121 Zeichen Neuer Daten String verkleinert: tC“:22.5, “tF”:72.4}} Bestimmen des Temperaturblock: 8 Zeichen Ausgabe des Tempaturblock tC”:22.5 Nur der Tempaturwert 3 Zeichen Der Zahlenwert Temperatur: 22.5 Ausgabe des Datum und der Zeit 30/10/2023_21:19:00


3. Bring data from the MQTT broker structured into a database

An Influx database is created via the console. Only the access and the name of the thank you bank need to be stored. A Python script retrieves the data from the MQTT broker and also builds the structure based on the data.


shellyplusht subscribe_datenbank.py
#! /usr/bin/python3
import re
import json
from typing import NamedTuple
 
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
 
from datetime import datetime
 
INFLUXDB_ADDRESS = 'IP Datenbank'
INFLUXDB_USER = 'benutzer'
INFLUXDB_PASSWORD = 'passwort'
INFLUXDB_DATABASE = 'Name Datenbank'
 
MQTT_ADDRESS = 'IP MQTT Broker'
MQTT_USER = ''
MQTT_PASSWORD = ''
MQTT_TOPIC = 'SH-plus-HT/#'
MQTT_REGEX = 'SH-plus-HT/([^/]+)/([^/]+)'
MQTT_CLIENT_ID = 'MQTT_InfluxDB_Bridge'
 
MQTT_FIELDS_1 = [
    'absolute-luftfeuchtigkeit-keller'
    # hier weitere relevante auflisten
]
MQTT_FIELDS_2 = [
    'luftfeuchtigkeitgesamt-keller'
    # hier weitere relevante auflisten
]
 
influxdb_client = InfluxDBClient(INFLUXDB_ADDRESS, 8086, INFLUXDB_USER, INFLUXDB_PASSWORD, None)
 
class SensorData(NamedTuple):
    location: str
    measurement: str
    value: float
 
def on_connect(client, userdata, flags, rc):
    """ The callback for when the client receives a CONNACK response from the server."""
   # print('Connected with result code ' + str(rc))
    client.subscribe(MQTT_TOPIC)
    client.subscribe('SH-plus-HT/#', qos=1)
 
def _parse_mqtt_message(topic, payload):
#    print(topic)
    match = re.match(MQTT_REGEX, topic)
    if match:
        location = match.group(1)
        measurement = match.group(2)
        if measurement == 'status':
            return None
        #return SensorData(location, measurement, float(payload))
#    print(" Wert von Payload:", payload)
    system1 = re.search('shelly-ID/absolute-luftfeuchtigkeit-keller', str(topic))
    if system1:
#      print("Es wurde der shelly gefunden")
#      print("Welcher Wert wird uebergeben vom Sensor jetzt: ", payload)
      system10 = re.search('absolute-luftfeuchtigkeit-keller', str(topic))
      if system10:
#        print("######## Absolute-Luftfeuchtigkeit ist enthalten /########")
        for field in MQTT_FIELDS_1:
#           print("Was haben wir hier: ",location)
#           print("was fuer ein feld: ",field)
#           print("Welche daten werden hier alles ausgeben: ",payload)
           yield SensorData(location, field, float(payload))
#    print(" Wert von Payload:", payload)
    system2 = re.search('shelly-ID/luftfeuchtigkeitgesamt-keller', str(topic))
    if system2:
#      print("Es wurde der shelly gefunden")
#      print("Welcher Wert wird uebergeben vom Sensor jetzt: ", payload)
      system20 = re.search('luftfeuchtigkeitgesamt-keller', str(topic))
      if system20:
#        print("######## Absolute-Luftfeuchtigkeit ist enthalten /########")
        for field in MQTT_FIELDS_2:
#           print("Was haben wir hier: ",location)
#           print("was fuer ein feld: ",field)
#           print("Welche daten werden hier alles ausgeben: ",payload)
           yield SensorData(location, field, float(payload))
    else:
        return None
 
def _send_sensor_data_to_influxdb(sensor_data):
#    print("datenbank: ",sensor_data)
    now = datetime.now()
#    print("#####################################")
#    print("    Zeit: ", now)
#    print("#####################################")
 
    json_body = [
        {
            'measurement': sensor_data.measurement,
            'tags': {
                'location': sensor_data.location
            },
            'fields': {
                'value': sensor_data.value
            }
        }
    ]
    influxdb_client.write_points(json_body)
 
def on_message(client, userdata, msg):
    """The callback for when a PUBLISH message is received from the server."""
#    print(msg.topic + ' ' + str(msg.payload))
    sensor_data_sets = _parse_mqtt_message(msg.topic, msg.payload.decode('utf-8'))
    if sensor_data_sets is None:
        print("Couldn't parse sensor data!")
        return
    for sensor_data in sensor_data_sets:
        _send_sensor_data_to_influxdb(sensor_data)
 
def _init_influxdb_database():
    databases = influxdb_client.get_list_database()
    if len(list(filter(lambda x: x['name'] == INFLUXDB_DATABASE, databases))) == 0:
        influxdb_client.create_database(INFLUXDB_DATABASE)
    influxdb_client.switch_database(INFLUXDB_DATABASE)
 
def main():
    _init_influxdb_database()
    mqtt_client = mqtt.Client(MQTT_CLIENT_ID)
    mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
    mqtt_client.on_connect = on_connect
    mqtt_client.on_message = on_message
    mqtt_client.connect(MQTT_ADDRESS, 1883)
    mqtt_client.loop_forever()
 
if __name__ == '__main__':
    print('MQTT to InfluxDB bridge')
    main()

A cronjob starts the Python script and monitors whether it is still running.

USER
#!/bin/bash # abfrage ob der tcp server prozess vorhanden ist if pgrep sh_plus_ht.py > /dev/null; then # Der Prozess ist vorhanden das skript laeuft echo Das Skript ist gestartet else # Der Prozes ist nicht vorhanden das skript muss gestartet werden echo Das Skript ist nicht gestartet /home/korris/sh_plus_ht.py & sleep 5s echo Das Skript wurde gestartet fi exit 0


4. The visualization of the telemetry data with the help of Grafana

With the program Grafana we want to evaluate and display the data that we have stored structured in the InfluxDB for the user.

First, integrate the InfluxDB into Grafana. You can execute this under option “Connections”.


The Query Language: InfluxQl can be used if the database version is lower than 2.0. Many prefabricated Grafana interfaces are not yet converted to this version, which you could use.

From now on you have access to the data that is already in the database and that will be added over time.

4.1 The first visualization under Grafana

For the first attempts with Grafana it is not necessary to start with the setup of a dashboard. The option Discover can be used to play with the program and to get to know its function. But also for the first test whether the connection with the database works, you can test here very nicely.


The data can be accessed with SQL commands. Under Grafana, however, you have the possibility to operate the whole thing via an interface without having deep knowledge of SQL tools.

Under the option WHERE you can select the IoT device which you want to display. Under the option select measurement you can select the measurement value that is offered by this device and is also available in the database.


If everything works we get here in the example temperature value for a period of one hour displayed.


With this function Grafana Discover I would try more function under the programs. One comes here quickly on solutions that are helpful in the implementation in the dashboard.