Ein MQTT Server für IoT Telemetry Sensoren

Ziel

Die Frage ist wieso man IoT Telemetry Sensoren nicht einfach über eine Client Server Struktur, wie andere Dienste betreibt. Eine reihe von Sensoren werden nur über eine Batterie versorgt. Eine permanent Verbindung ist hier nicht möglich. Aber auch NFC Sensoren die nur bei der Benutzung aktive sind, können ihre Information über die Klassische Netzwerk Protokolle nicht nutzbar machen. Hier hat sich das asynchrones Protokoll MQTT stark etabliert. Sensoren haben nur eine Verbindung wenn Sie neue Information weitergeben an einen Broker. Die Clients bekommen Information wenn welche vorhanden sind vom Broker.

1. Konzept

Mit Hilfe eines MQTT Broker können Daten von IoT Telemetry Sensoren gesammelt werden. Diese können über SDK Skripte der Router API, dem Programmtool Node-RED oder direkt über das MQTT Protokoll an den Broker Publish werden.

Das die Daten für eine Auswertung verwendet werden können, werden die Daten in eine Influx DB Datenbank abgelegt. Python Skripte werden dafür verwendet die Daten neu aggregieren und als ein Json Format abzulegen.

Für die Anzeige und Auswertung wird Grafana genutzt. Hier sind Kurzzeitige und Langzeit Auswertungen für den Benutzer gut zu Visualisieren.

Für das Signalisieren von Information wird ein XMPP Server genutzt. Über diesen werden Nachrichten gesendet an einen Handy oder Computer.



2. IoT Geräte und MQTT Server in Verbindung mit LXC

2.1 Telemetriedaten über SDK Skripte Publish

Über die SDK API vom Router eine Nachricht veröffentlichen(published) von einem IoT Gerät, dass über MQTT angesprochen wird.

link SDK API

Für dieses IoT Device wird die Funktion „nb_mqtt_publish“ genutzt. Diese kann verwendet werden, um MQTT- Nachrichten für bestimmt Inhalte zu veröffentlichen.

Die Nachricht, die veröffentlicht werden soll wurde durch Parameter „message“ vorgeben.

Hier werden Daten der Temperatur veröffentliche:

MESSAGE=sprintf('{"id":123, "src":"shellies/shelly-ID", 
"method":"Temperature.GetStatus", "params":{"id":0}');

Mit diesem SDK Skript werden Information von dem IoT Gerät publish.

shellyplusht publish_SDK.are
HOST = "<IP MQTT Broker>";
PORT = 1883;
USERNAME = "";
PASSWORD = "";
TOPIC = "shellyplusht-<ID>/rpc";
QOS = 1;
RETAIN = 0;
MESSAGE = "";
 
DEBUG=false;  # Debug aktivieren mit true oder deaktivieren mit false.
 
while (true){
 MESSAGE=sprintf('{"id":123, "src":"shellies/shelly-<ID>", "method":"Temperature.GetStatus", "params":{"id":0}');
 if(DEBUG) printf("%s\n", MESSAGE);
    /* publish the message to mqtt broker Temperature */
 ret = nb_mqtt_publish(HOST, PORT, USERNAME, PASSWORD, TOPIC, QOS, RETAIN, MESSAGE);
 if (ret<0){
   nb_syslog("Failed to publish mqtt message");
 }
 usleep(250000);
 MESSAGE2=sprintf('{"id":123, "src":"shellies/shelly-<ID>", "method":"Humidity.GetStatus", "params":{"id":0}');
 if(DEBUG) printf("%s\n", MESSAGE2);
    /* publish the message to mqtt broker Humidity */
 ret = nb_mqtt_publish(HOST, PORT, USERNAME, PASSWORD, TOPIC, QOS, RETAIN, MESSAGE2);
 if (ret<0){
   nb_syslog("Failed to publish mqtt message");
 }
 usleep(250000);
 MESSAGE3=sprintf('{"id":123, "src":"shellies/shelly-<ID>", "method":"DevicePower.GetStatus", "params":{"id":0}');
 if(DEBUG) printf("%s\n", MESSAGE3);
    /* publish the message to mqtt broker DevicePower */
 ret = nb_mqtt_publish(HOST, PORT, USERNAME, PASSWORD, TOPIC, QOS, RETAIN, MESSAGE3);
 if (ret<0){
   nb_syslog("Failed to publish mqtt message");
 }
 usleep(250000);
 MESSAGE4=sprintf('{"id":123, "src":"shellies/shelly-<ID>", "method":"Sys.GetStatus"}');
 if(DEBUG) printf("%s\n", MESSAGE4);
    /* publish the message to mqtt broker Sys */
 ret = nb_mqtt_publish(HOST, PORT, USERNAME, PASSWORD, TOPIC, QOS, RETAIN, MESSAGE4);
 if (ret<0){
   nb_syslog("Failed to publish mqtt message");
 }
 sleep(1);
}
exit(0);

In der MESSAGE ist dann die Information enthalten die man benötigt.

USER
{“id”:123, “src”:“shellies/shelly-ID”, “method”:“Temperature.GetStatus”, “params”:{“id”:0} {“id”:123, “src”:“shellies/shelly-ID”, “method”:“Humidity.GetStatus”, “params”:{“id”:0} {“id”:123, “src”:“shellies/shelly-ID”, “method”:“DevicePower.GetStatus”, “params”:{“id”:0} {“id”:123, “src”:“shellies/shelly-ID”, “method”:“Sys.GetStatus”} {“id”:123, “src”:“shellies/shelly-ID”, “method”:“Temperature.GetStatus”, “params”:{“id”:0} {“id”:123, “src”:“shellies/shelly-ID”, “method”:“Humidity.GetStatus”, “params”:{“id”:0} {“id”:123, “src”:“shellies/shelly-ID”, “method”:“DevicePower.GetStatus”, “params”:{“id”:0} {“id”:123, “src”:“shellies/shelly-ID”, “method”:“Sys.GetStatus”}

Durch das SDK Skript „shellyplusht_publish_SDK“ werden die Daten im MQTT Broker hinterlegt. Im zweiten Schritte werden wir mit einem SDK die Daten abonnieren(subscribe) und diese Speicherzellen schreiben. So sind wir dann in der Lage die Informationswerte für andere SDK Skript zu nutzen.

2.2 Telemetriedaten über SDK Skripte Subscribe

Für diese Funktion nutzen wir „nb_mqttlib_subscribe“. Diese Funktion kann verwendet werden, um MQTT- Nachrichten für bestimmt Themen(Topic) zu abonnieren.

Das SDK Skript holt zu erste einmal die unformatierten Werte aus den Daten String und Speichert diese Wert in Speicherzellen dann ab.

Unter diesem Skript werden Werte aus den Speicherzellen für die Berechnung der Absolutendluftfeuchtigkeit genutzt.

shellyplusht subscribe_SDK.are
HOST = "IP MQTT Broker";
PORT = 1883;
KEEPALIVE = 60;
PROTOCOL = "V31";
USERNAME = "";
PASSWORD = "";
CLIENT_ID = "shellyplusht-<ID>/rpc";
TOPIC = "shellies/shelly-<ID>/rpc";
QOS = 1;
RETAIN = 0;
MESSAGE = "";
TIMEOUT = 1000;
CLEAN_SESSION = true;
MQTT_HANDLE = 0;
 
nb_config_set("custom.var0=0");
nb_config_set("custom.var1=0");
nb_config_set("custom.var2=0");
nb_config_set("custom.var3=0");
nb_config_set("custom.var4=0");
 
DEBUG=false;  # Debug aktivieren mit true oder deaktivieren mit false.
 
template shellyplusht_<ID>{
    temperatur_zahl = "";
    luftfeuchte_zahl = "";
    percent_zahl = "";
    battery_zahl = "";
    letzteaenderung = "";
}
 
/*create new template instance*/
shelly_data = new shellyplusht_<ID>();
 
/*create new mqtt instance*/
MQTT_HANDLE = nb_mqttlib_new(CLIENT_ID, CLEAN_SESSION);
 
if (nb_mqttlib_set_protocol_version(MQTT_HANDLE, PROTOCOL)  < 0 ) {
  printf("Unable to set Protocol version\n");
    exit(1);
}
 
if (nb_mqttlib_set_user_pw(MQTT_HANDLE, USERNAME, PASSWORD) < 0 ) {
	printf("Unable to set Username and Passsword\n");
 
	exit(1);
}
 
if (nb_mqttlib_connect(MQTT_HANDLE, HOST, PORT, KEEPALIVE) < 0 ) {
	printf("Unable to connect\n");
	exit(1);
}
 
if (nb_mqttlib_subscribe(MQTT_HANDLE, TOPIC, QOS) < 0 ) {
	printf("Unable to subscribe\n");
	exit(1);
}
 
while (true){
 
ret = nb_mqttlib_get_callback_message(MQTT_HANDLE, TIMEOUT);
 
buffer=(string) ret.msg.msg;
 
if(DEBUG) printf("Ausgabe unforamtierten Daten\n%s\n",buffer);
temperatur_finden = strstr( buffer, '"tC"');  // Hier wird tC verarbeitet
if(temperatur_finden != NULL){
 if(DEBUG) printf("Temperatur Wert tC ist entalten\n");
 laenger_vom_string = strlen(buffer);
 if(DEBUG) printf("Hier wird die laenge des Daten String bestimmt: %s Zeichen \n",laenger_vom_string);
 neuer_daten_string = substr(buffer, temperatur_finden+1,laenger_vom_string);
 if(DEBUG) printf("Neuer Daten String verkleinert:\n%s\n",neuer_daten_string);
 temperatur_block = strstr( neuer_daten_string, ',');
 if(DEBUG) printf("Bestimmen des Temperaturblock: %s Zeichen\n",temperatur_block);
 neuerblock = substr(neuer_daten_string, 0,temperatur_block);
 if(DEBUG) printf("Ausgabe des Tempaturblock\n%s\n",neuerblock);
 temperatur = strstr( neuerblock, ':');
 if(DEBUG) printf("Nur der Tempaturwert %s Zeichen\n",temperatur);
 temperatur_zahl = substr(neuerblock, temperatur+1,laenger_vom_string);
 printf("Der Zahlenwert Temperatur: %s\n",temperatur_zahl);
 shelly_data.temperatur_zahl = temperatur_zahl;
 nb_config_set(sprintf("custom.var0=%s",shelly_data.temperatur_zahl));
 datum_zeit = strftime("%d/%m/%Y__%H:%M:%S",localtime(time()));
 if(DEBUG) printf("Ausgabe des Datum und der Zeit %s\n", datum_zeit);
 shelly_data.letzteaenderung = datum_zeit;
 nb_config_set(sprintf("custom.var1=%s",shelly_data.letzteaenderung));
}else {
  printf("Der String tC ist nicht vorhanden\n");
 }
if(DEBUG) printf("Ausgabe unforamtierten Daten\n%s\n",buffer);
luftfeuchte_finden = strstr( buffer, '"rh"');  // Hier wird rh verarbeitet
 if(luftfeuchte_finden != NULL){
  if(DEBUG) printf("Luftfeuchte Wert rh ist entalten\n");
  laenger_vom_string = strlen(buffer);
  if(DEBUG) printf("Hier wird die laenge des Daten String bestimmt: %s Zeichen \n",laenger_vom_string);
  neuer_daten_string = substr(buffer, luftfeuchte_finden+1,laenger_vom_string);
  if(DEBUG) printf("Neuer Daten String verkleinert:\n%s\n",neuer_daten_string);
  luftfeuchte_block = strstr(neuer_daten_string, '}');
  if(DEBUG) printf("Bestimmen des Luftfeuchteblock: %s Zeichen\n",luftfeuchte_block);
  neuerblock = substr(neuer_daten_string, 0,luftfeuchte_block);
  if(DEBUG) printf("Ausgabe des Luftfeuchteblock\n%s\n",neuerblock);
  luftfeuchte = strstr( neuerblock, ':');
  if(DEBUG) printf("Nur der Luftfeuchtewert %s Zeichen\n",luftfeuchte);
  luftfeuchte_zahl = substr(neuerblock, luftfeuchte+1,laenger_vom_string);
  printf("Der Zahlenwert Luftfeuchtigkeit: %s\n",luftfeuchte_zahl);
  shelly_data.luftfeuchte_zahl = luftfeuchte_zahl;
  nb_config_set(sprintf("custom.var2=%s",shelly_data.luftfeuchte_zahl));
 }else {
   printf("Der String rh ist nicht vorhanden\n");
  }
if(DEBUG) printf("Ausgabe unforamtierten Daten\n%s\n",buffer);
battery_finden = strstr( buffer, '"battery"');  // Hier wird battery verarbeitet
if(battery_finden != NULL){
 if(DEBUG) printf("Batteryspannungswert battery ist entalten\n");
 laenger_vom_string = strlen(buffer);
 if(DEBUG) printf("Hier wird die laenge des Daten String bestimmt: %s Zeichen \n",laenger_vom_string);
 neuer_daten_string = substr(buffer, battery_finden+1,laenger_vom_string);
 if(DEBUG) printf("Neuer Daten String verkleinert:\n%s\n",neuer_daten_string);
 battery_block = strstr( neuer_daten_string, ',');
 if(DEBUG) printf("Bestimmen des batteryblock: %s Zeichen\n",battery_block);
 neuerblock = substr(neuer_daten_string, 0,battery_block);
 if(DEBUG) printf("Ausgabe des batteryblock\n%s\n",neuerblock);
 battery = strstr( neuerblock, ':');
 if(DEBUG) printf("Nur der Battery Wert %s Zeichen\n",battery);
 battery_zahl = substr(neuerblock, battery+6,laenger_vom_string);
 if(battery_zahl >= 4){
 printf("Die Batterie Spannung : %s Volt\n",battery_zahl);
 shelly_data.battery_zahl = battery_zahl;
 nb_config_set(sprintf("custom.var3=%s",shelly_data.battery_zahl));
if(DEBUG) printf("Ausgabe unforamtierten Daten\n%s\n",buffer);
percent_finden = strstr( buffer, '"percent"');  // Hier wird percent verarbeitet
if(percent_finden != NULL){
 if(DEBUG) printf("Der String percent ist entalten\n");
 laenger_vom_string = strlen(buffer);
 if(DEBUG) printf("Hier wird die laenge des Daten String bestimmt: %s Zeichen \n",laenger_vom_string);
 neuer_daten_string = substr(buffer, percent_finden+1,laenger_vom_string);
 if(DEBUG) printf("Neuer Daten String verkleinert:\n%s\n",neuer_daten_string);
 percent_block = strstr( neuer_daten_string, '}');
 if(DEBUG) printf("Bestimmen des percentblock: %s Zeichen\n",percent_block);
 neuerblock = substr(neuer_daten_string, 0,percent_block);
 if(DEBUG) printf("Ausgabe des percentblock\n%s\n",neuerblock);
 percent = strstr( neuerblock, ':');
 if(DEBUG) printf("Nur der percent Wert %s Zeichen\n",percent);
 percent_zahl = substr(neuerblock, percent+1,laenger_vom_string);
 printf("Der Prozentsatz der Batterie Spannung : %s Prozent\n",percent_zahl);
 shelly_data.percent_zahl = percent_zahl;
 nb_config_set(sprintf("custom.var4=%s",shelly_data.percent_zahl));
} else {
  printf("Der String percent ist nicht vorhanden\n");
 }
 } else {
	printf("Der Shelly wird über USB-Netzteil versorgt: %s Volt nur.\n",battery_zahl);
shelly_data.percent_zahl = "n/a";
shelly_data.battery_zahl = battery_zahl;
nb_config_set(sprintf("custom.var3=%s",shelly_data.battery_zahl));
nb_config_set(sprintf("custom.var4=%s",shelly_data.percent_zahl));
  }
} else {
  printf("Der String battery ist nicht vorhanden\n");
  }
speicherzelle_1 = nb_config_get("custom.var0");
speicherzelle_2 = nb_config_get("custom.var1");
speicherzelle_3 = nb_config_get("custom.var2");
speicherzelle_4 = nb_config_get("custom.var3");
speicherzelle_5 = nb_config_get("custom.var4");
if(DEBUG) printf("Ausgabe von custom Speicherzelle_1 Temperatur: %s\n",speicherzelle_1);
if(DEBUG) printf("Ausgabe von custom Speicherzelle_2 Datum Zeit: %s\n",speicherzelle_2);
if(DEBUG) printf("Ausgabe von custom Speicherzelle_3 Luftfeuchtewert: %s\n",speicherzelle_3);
if(DEBUG) printf("Ausgabe von custom Speicherzelle_4 Batteryspannungswert: %s\n",speicherzelle_4);
if(DEBUG) printf("Ausgabe von custom Speicherzelle_5 Prozentsatz: %s\n",speicherzelle_5);
 
# Bestimmung der Absolutendluftfeuchtigkeit in der Luft Gramm pro Kubikmeter
I1 = (float) nb_config_get("custom.var0");   # Der Wert Temperatur
I2 = (float) nb_config_get("custom.var2");   # Der Wert Luftfeuchtigkeit
if(DEBUG) printf("Ausgabe von I1: und I2:  \n");
if(DEBUG) dump(I1, I2);
if(DEBUG) printf("\n");
# Berechnung der Absolutendluftfeuchtigkeit
result =  (10 ** 5 * 18.016/8314.3 * I2/100 * 6.1078 * (10 ** ((7.5*I1)/(237.3+I1))/(I1 + 273.15)));
if(DEBUG) printf("\n");
if(DEBUG) print(result);
if(DEBUG) printf("\n");
result = (string) result;
if(DEBUG) printf("Absolutendluftfeuchtigkeit Berechnung: %s g/m³\n",result);
nb_config_set(sprintf("custom.table2.0.var0=%s",result));
speicherzelle_result = nb_config_get("custom.table2.0.var0");
if(DEBUG) printf("Ausgabe von custom Speicherzelle_result Absolutendluftfeuchtigkeit: %s g/m³\n",speicherzelle_result);
 
//For debugging
//printf("Alle buffer Ausgaben:\n%s\n",buffer);
 if(ret<0){
			nb_syslog("Failed to publish mqtt message");
	}
 
sleep(1);
}
 exit(0);

Ausgabe der unformatierten Daten erhalten den Temperaturwert. Dieser wird aus dem string heraus geholt und dann in einer Speicherzelle hinterlegt. Zusätzlich wird die Information gespeichert der Änderung als Datun und Zeit.

USER
Ausgabe unforamtierten Daten {“id”:123,“src”:“shellyplusht-ID”,“dst”:“shellies/shelly-ID”, “result”:{“id”: 0,“tC”:22.5, “tF”:72.4}} Temperatur Wert tC ist entalten Hier wird die laenge des Daten String bestimmt: 121 Zeichen Neuer Daten String verkleinert: tC“:22.5, “tF”:72.4}} Bestimmen des Temperaturblock: 8 Zeichen Ausgabe des Tempaturblock tC”:22.5 Nur der Tempaturwert 3 Zeichen Der Zahlenwert Temperatur: 22.5 Ausgabe des Datum und der Zeit 30/10/2023_21:19:00

Wenn alle Information aus den unformatierten Daten heraus geholt wurden, bekommt man einen Datenblock von Speicherzellen. Zusätzlich wird noch die Absolutendluftfeuchtigkeit berechnet aus den Daten und auch in Speicherzelle gespeichert

USER
Ausgabe unforamtierten Daten {“id”:123,“src”:“shellyplusht-ID”,“dst”:“shellies/shelly-ID”,“result”:{“id”: 0,“battery”:{“V”:0.43, “percent”:0},“external”:{“present”:true}}} Batteryspannungswert battery ist entalten Hier wird die laenge des Daten String bestimmt: 162 Zeichen Neuer Daten String verkleinert: battery“:{“V”:0.43, “percent”:0},”external“:{“present”:true}}} Bestimmen des batteryblock: 18 Zeichen Ausgabe des batteryblock battery”:{“V”:0.43 Nur der Battery Wert 8 Zeichen Der Shelly wird über USB-Netzteil versorgt: 0.43 Volt nur. Ausgabe von custom Speicherzelle_1 Temperatur: 22.5 Ausgabe von custom Speicherzelle_2 Datum Zeit: 30/10/2023_21:19:00 Ausgabe von custom Speicherzelle_3 Luftfeuchtewert: 49.3 Ausgabe von custom Speicherzelle_4 Batteryspannungswert: 0.43 Ausgabe von custom Speicherzelle_5 Prozentsatz: n/a Ausgabe von I1: und I2: float: 22.5 float: 49.3 9.847469223 Absolutendluftfeuchtigkeit Berechnung: 9.847469223 g/m³ Ausgabe von custom Speicherzelle_result Absolutendluftfeuchtigkeit: 9.847469223 g/m³


3. Weitere Telemetrie Daten über andere Wege

Hier sind weiterführen Beschreibungen hinterlegt wie Telemetrie Daten direkt an den MQTT Broker übergeben werden können.


Ein weiterführen Beschreibungen wie ein Smartmeter über das Programme Tool Node-RED sein Daten an ein MQTT Broker übergeben. Das ganze wurde unter einem LXC Container realisiert.


4. Daten aus dem MQTT Broker strukturiert in eine Datenbank bringen

Über die Konsole eine influx Datenbank erstellen und zusätlich einen Benutzer für den Zugriff hinterlegen.

Die Struktur in der Datenbank wird dann mit einem Python Skript, über ein Json Format erstellt.

   json_body = [
        {
            'measurement': sensor_data.measurement,
            'tags': {
                'location': sensor_data.location
            },
            'fields': {
                'value': sensor_data.value
            }
        }
    ]
    influxdb_client.write_points(json_body)

Mit Hilfe des Python Skripts kann man Daten gezielt vom Broker abfragen und in die Datenbank strukturiert ablegen.

shellyplusht subscribe_datenbank.py
#! /usr/bin/python3
import re
import json
from typing import NamedTuple
 
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
 
from datetime import datetime
 
INFLUXDB_ADDRESS = 'IP Datenbank'
INFLUXDB_USER = 'benutzer'
INFLUXDB_PASSWORD = 'passwort'
INFLUXDB_DATABASE = 'Name Datenbank'
 
MQTT_ADDRESS = 'IP MQTT Broker'
MQTT_USER = ''
MQTT_PASSWORD = ''
MQTT_TOPIC = 'SH-plus-HT/#'
MQTT_REGEX = 'SH-plus-HT/([^/]+)/([^/]+)'
MQTT_CLIENT_ID = 'MQTT_InfluxDB_Bridge'
 
MQTT_FIELDS_1 = [
    'absolute-luftfeuchtigkeit-keller'
    # hier weitere relevante auflisten
]
MQTT_FIELDS_2 = [
    'luftfeuchtigkeitgesamt-keller'
    # hier weitere relevante auflisten
]
 
influxdb_client = InfluxDBClient(INFLUXDB_ADDRESS, 8086, INFLUXDB_USER, INFLUXDB_PASSWORD, None)
 
class SensorData(NamedTuple):
    location: str
    measurement: str
    value: float
 
def on_connect(client, userdata, flags, rc):
    """ The callback for when the client receives a CONNACK response from the server."""
   # print('Connected with result code ' + str(rc))
    client.subscribe(MQTT_TOPIC)
    client.subscribe('SH-plus-HT/#', qos=1)
 
def _parse_mqtt_message(topic, payload):
#    print(topic)
    match = re.match(MQTT_REGEX, topic)
    if match:
        location = match.group(1)
        measurement = match.group(2)
        if measurement == 'status':
            return None
        #return SensorData(location, measurement, float(payload))
#    print(" Wert von Payload:", payload)
    system1 = re.search('shelly-ID/absolute-luftfeuchtigkeit-keller', str(topic))
    if system1:
#      print("Es wurde der shelly gefunden")
#      print("Welcher Wert wird uebergeben vom Sensor jetzt: ", payload)
      system10 = re.search('absolute-luftfeuchtigkeit-keller', str(topic))
      if system10:
#        print("######## Absolute-Luftfeuchtigkeit ist enthalten /########")
        for field in MQTT_FIELDS_1:
#           print("Was haben wir hier: ",location)
#           print("was fuer ein feld: ",field)
#           print("Welche daten werden hier alles ausgeben: ",payload)
           yield SensorData(location, field, float(payload))
#    print(" Wert von Payload:", payload)
    system2 = re.search('shelly-ID/luftfeuchtigkeitgesamt-keller', str(topic))
    if system2:
#      print("Es wurde der shelly gefunden")
#      print("Welcher Wert wird uebergeben vom Sensor jetzt: ", payload)
      system20 = re.search('luftfeuchtigkeitgesamt-keller', str(topic))
      if system20:
#        print("######## Absolute-Luftfeuchtigkeit ist enthalten /########")
        for field in MQTT_FIELDS_2:
#           print("Was haben wir hier: ",location)
#           print("was fuer ein feld: ",field)
#           print("Welche daten werden hier alles ausgeben: ",payload)
           yield SensorData(location, field, float(payload))
    else:
        return None
 
def _send_sensor_data_to_influxdb(sensor_data):
#    print("datenbank: ",sensor_data)
    now = datetime.now()
#    print("#####################################")
#    print("    Zeit: ", now)
#    print("#####################################")
 
    json_body = [
        {
            'measurement': sensor_data.measurement,
            'tags': {
                'location': sensor_data.location
            },
            'fields': {
                'value': sensor_data.value
            }
        }
    ]
    influxdb_client.write_points(json_body)
 
def on_message(client, userdata, msg):
    """The callback for when a PUBLISH message is received from the server."""
#    print(msg.topic + ' ' + str(msg.payload))
    sensor_data_sets = _parse_mqtt_message(msg.topic, msg.payload.decode('utf-8'))
    if sensor_data_sets is None:
        print("Couldn't parse sensor data!")
        return
    for sensor_data in sensor_data_sets:
        _send_sensor_data_to_influxdb(sensor_data)
 
def _init_influxdb_database():
    databases = influxdb_client.get_list_database()
    if len(list(filter(lambda x: x['name'] == INFLUXDB_DATABASE, databases))) == 0:
        influxdb_client.create_database(INFLUXDB_DATABASE)
    influxdb_client.switch_database(INFLUXDB_DATABASE)
 
def main():
    _init_influxdb_database()
    mqtt_client = mqtt.Client(MQTT_CLIENT_ID)
    mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
    mqtt_client.on_connect = on_connect
    mqtt_client.on_message = on_message
    mqtt_client.connect(MQTT_ADDRESS, 1883)
    mqtt_client.loop_forever()
 
if __name__ == '__main__':
    print('MQTT to InfluxDB bridge')
    main()

Die Python Skripte werden über einen Cronjob gestartet und überwacht.

USER

# Das Skript wird alle 1 Minuten plus 20 sekunden gestartet * * * * * sleep 20; /home/korris/plus_ht.sh

Das Skript dazu prüft ob der Prozess noch läuft. Wenn ja macht er nichts. Wenn der Prozess beende wurde wird er wieder gestartet. Die Python Skript laufen endlos. Der Cronjob prüft nur ob sie noch laufen.

USER
#!/bin/bash # abfrage ob der tcp server prozess vorhanden ist if pgrep sh_plus_ht.py > /dev/null; then # Der Prozess ist vorhanden das skript laeuft echo Das Skript ist gestartet else # Der Prozes ist nicht vorhanden das skript muss gestartet werden echo Das Skript ist nicht gestartet /home/korris/sh_plus_ht.py & sleep 5s echo Das Skript wurde gestartet fi exit 0


Eine Übersicht der genutzten Telemetry Sensoren im Netzwerk.


5. Die Visualisierung der Telemetry Daten mit Hilfe von Grafana

Mit dem Programm Grafana wollen wir die Daten die wir Strukturiert in in der InfluxDB hinterlegt haben für den Benutzer Auswerten und Anzeigen lassen.

Als erstes binden Sie die InfluxDB in Grafana ein. Diese können Sie unter Option „Connections“ ausführen.


Die Query Language: InfluxQl können Sie nutzen wenn die Datenbank Version kleiner als 2.0 ist. Viele vorgefertigte Grafana Oberflächen sind noch nicht auf diese Version umgesetzt, die Sie nutzen könnten.

Ab jetzt hat man Zugriff auf die Daten, die zum einem schon in der Datenbank enthalten sind und die mit der Zeit noch hinterlegt werden.

Welche Messungen in Datenbank hinterlegt wurden:

USER
:~ $ influx Connected to http://localhost:8086 version 1.8.10 InfluxDB shell version: 1.8.10 :> use Shelly Using database Shelly :> show series key — power,location=shellyplug-s-C8C9A3B8E8C6 rh,location=shelly-08b61fcf3380 tC,location=shelly-08b61fcf3380 tC,location=shellydw2-73BBC6 tC,location=shellyflood-244CAB430495 temperatur,location=shellyplug-s-80646F81A273 temperatur,location=shellyplug-s-80646F81AD26 temperatur,location=shellyplug-s-80646F81F06B temperatur,location=shellyplug-s-C8C9A3A4DE17 value,location=shellydw2-73BBC6 value,location=shellyflood-244CAB430495 voltage,location=shellydw2-73BBC6 voltage,location=shellyflood-244CAB430495

5.1 Die erste Visualisierung unter Grafana

Für die ersten versuche mit Grafana muss man nicht gleich mit er Einrichtung eines Dashboards anfangen. Über die Option Entdecken kann man mit dem Programm erst mal Spielen und Funktion näher bringen. Aber auch für den ersten Test ob die Anbindung mit Datenbank funktioniert kann man hier sehr schön testen.


An die Daten kommt mit SQL Befehlen ran. Unter Grafana hat man aber die Möglichkeit, dass ganze über eine Oberfläche zu bedienen ohne tiefe Kenntnisse in SQL Behelfen zu haben.

Unter der Option WHERE kann das IoT Gerät ausgewählt werden welchen man sich anzeigen lassen will. Unter der Option select measurement kann man den Messwert auswählen der von diesen Gerät angeboten wird und auch in der Datenbank vorhanden ist.


Wenn alles Funktioniert bekommen wir hier im Beispiel Temperaturwert für einen Zeitraum von einer Stunden angezeigt.


Mit dieser Funktion Grafana Entdecken würde ich weitere Funktion unter dem Programme ausprobieren. Man kommt hier schnell auf Lösungsansätze die bei der Umsetzung im Dashboard hilfreich sind.

Wenn man dann bei der Umsetzung für ein Dashboard ist, kann man auf Information hier zurück Greifen.


6. Information aus dem MQTT Netzwerk an einen Freien Messenger weitergeben

Als Messenger wird ein XMPP Server genutzt. Dadurch das ein Standardisiertes offenes Protokoll genutzt wird. Gibt es für fast alles einen XMPP Client.

Auf dem LXC Container wird ein XMPP Cleint unter der Kommandozeile genutzt. Mit ihm kann man Nachrichten über das XMPP Protokoll verschicken.

link Manpage sendxmpp

Wenn das Kommandozeilen Programme auf dem Container installiert wurde kann man mit diesen Befehle das ganze Testen. Wichtig ist das der Benutzer der Sende auch auf dem XMPP Server angelegt wurde mit Passwort. Das Programme sendxmpp ist kein IM-Client. Es kann nur Nachrichten verschicken aber nicht empfangen.

USER
echo 'Hello World' | sendxmpp -n -t -u “XMPP_Benutzer_Sender” -j Server_Adresse -p Passwort_vom_Benutzer_Sender Benutzer_Empfänger@xmpp_Server_Adresse

Auf dem LXC Container läuft ein TCP Server unter einem Python Skript.

tcp_server tcp-server.py
# a simple tcp server
import socket,os
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('192.168.143.2', 60100))
sock.listen(5)
while True:
    connection,address = sock.accept()
    buf = connection.recv(1024)
    if (str(buf)) != "b''":
#        print("ja")
        file = open('xmpp_text.txt','w')
        temp =  str(buf, 'utf-8')
        file.write(str(temp))
#        print(file.write(str(buf)))
        file.close()
  #  else:
 #    print("ist gleich")
    connection.send(buf)
    connection.close()

Dieser warte auf Nachrichten die von SDK Skript auf dem Router erzeugt werden. Neue Nachrichten werden dann in einer Datei „xmpp_text.txt“ hinterlegt.

Über einen cronjob wird alle 10 Sekunden ein Skript gestartet. Dieses prüft ob eine neu Nachricht vorhanden ist. Wenn ja wird diese Nachricht über xmpp verschickt.

USER
:~# crontab -e

# Das Skript wird alle 10 Sekunden gestartet * * * * * /root/xmpp_skript.sh * * * * * sleep 10; /root/xmpp_skript.sh * * * * * sleep 20; /root/xmpp_skript.sh * * * * * sleep 30; /root/xmpp_skript.sh * * * * * sleep 40; /root/xmpp_skript.sh * * * * * sleep 50; /root/xmpp_skript.sh

Das cronjob Skript:

xmpp-nachricht xmpp_skript.sh
#!/bin/bash
if [ -r /root/xmpp_text.txt ]
then
sendxmpp -m /root/xmpp_text.txt -n -t -u "benutzer" -j server -p passwort empfänger@server
sleep 1s
rm /root/xmpp_text.txt
fi