stefandreyer / CODESYS-MQTT

MQTT client library for CODESYS, supporting all QoS
MIT License
109 stars 24 forks source link

Subscriber Issue #67

Open GSNord opened 4 months ago

GSNord commented 4 months ago

Hallo, ich habe mir die Funktionen aus dem InteractionHowto.project in mein Projekt übernommen. Als Gegenstelle habe ich lokal den Server von mosquitto.org unter Windows laufen. Was soweit geht ist:

Beim Empfang von Nachrichten habe ich aber irgenwie ein grundsätzliches Verständnisproblem. Ich habe eine Hauptfunktion gemacht, aus der gemäß Demoprojekt zyklisch der Client FB, der Publisher und der Subscriber aufgerufen werden:

MQTT_IN_OUT.clientFB(
    MQTT_IN_OUT := MQTT_IN_OUT      // das muß direkt beim Aufruf übergeben werden
);

FirstPublish();         // Aufruf des Publishers
FirstSubscription();        // Aufruf des Subscribers

In FirstSubscription dann im Header:

FUNCTION_BLOCK FirstSubscription
VAR
    Collector : MQTT.CallbackCollector;             // Instanz eines ein FB aus der Library
    Subscriber : MQTT.MQTTSubscribe;                // Instanz eines ein FB aus der Library

    init : BOOL;
    Receiver : MQTT.ReceiveString;                  // Instanz eines ein FB aus der Library
    Receiver2 : MQTT.ReceiveString;             // Instanz eines ein FB aus der Library
END_VAR

und im Deklarationsteil:

// einmalige Initialisierung
IF NOT init THEN
    init := TRUE;
    Subscriber.SetMqttInOut(MQTT_IN_OUT:= MqttMain.MQTT_IN_OUT);
    Collector.put(instance:= Receiver);
    Receiver.initAsFindTopic(compString:= ADR('TestTopic/FromCodesys'), returnHit:= TRUE);

    Collector.put(instance:= Receiver2);
    Receiver2.initAsFindTopic(compString:= ADR('Room2/Temp'), returnHit:= TRUE);
END_IF

// Einstellen der Werte und Aufruf des Subscriber
Subscriber.Subscribe:= TRUE;
Subscriber.Topic:= ADR('TestTopic/FromCodesys');    // wenn hier nichts eingetragen wird, gibt es einen Laufzeitfehler
Subscriber.QoSSubscribe:= MQTT.QoS.ExactlyOnce;
Subscriber.ExpectingString:= TRUE;
Subscriber.Callback:= Collector;

Subscriber();

Der FB Subscriber ist also irgendwie die Hauptfunktion für den Empfang? Wird ja auch zyklisch aufgerufen.

Der FB Collector ist dann so eine Art Sammelfunktion, die dem Subscriber über .Callback gemeldet wird?

Und schließlich der FB Receiver, hier vom Typ ReceiveString, der beim Collector über den put-Befehl angemeldet wird. So wie ich das verstanden habe, brauche ich für jeden Topic, den ich abbonieren möchte, einen eigenen Receiver, den ich per put-Befehl beim Collector anmelde?

Ich habe mal einen zweiten Receiver2 erstellt, der einen zweiten Topic empfangen soll. Dem gebe ich "Room2/Temp" als Suchstring mit. Und wieso muß das überhaupt beim Subscriber nochmal angeben werden? Siehe Zeile Subscriber.Topic:= ADR('TestTopic/FromCodesys'); Da geht ja dann irgendwie doch nur ein Topic.

Was ich nun beim mosquitto Server sehe, ist daß sich der Client anmeldet und "TestTopic/FromCodesys" abonniert, jedoch nicht "Room2/Temp". Alles was ich mit "Room2/Temp" publishe, wird demnach auch niemals beim Client ankommen.

Was ankommt, ist "TestTopic/FromCodesys", das sehe ich im Debugger als Text in MQTTReceived des ersten Receivers.

Was dann noch fehlt, ist irgenwie eine Callbackfunktion, die im Falle eines Empfangs dann automatisch aufgerufen wird? Bzw. deren Methode .PublishReceived? Wo schreibe ich das hin? Der Receiver ist ist ja schon deklariert, muß also noch mit der PublishReceived Methode erweitert werden?

GSNord commented 4 months ago

Ich habe nir jetzt einen FB "MyReceiver" wie folgt definiert:

FUNCTION_BLOCK MyReceiver EXTENDS MQTT.ReceiveString

Diesen verwende ich im FB "FirstSubscription" anstelle des Receivers aus der Library. Die Vorgaben kamen automatisch vom Codesys, auch die Methode "PublishReceived". Diese wird nun bei jedem Empfang automatisch aufgerufen, und somit kann ich mir die empfangenen Daten in einen Buffer schreiben:

METHOD PublishReceived : BOOL
VAR_INPUT
    (*Collection of received Data*)
    Data    : MQTT.CALLBACK_DATA;
END_VAR

----

IF Data.PayloadIsString THEN
    sTopic := Data.TopicOut^;
    sMessage := Data.PayloadString^;
    MqttMain.AddMessage(Topic := sTopic, Message := sMessage);      // Dem Empfangs-Buffer hinzufügen   
END_IF

Nach wie vor habe ich nur einen einzigen Receiver und einen einzigen Topic, der empfangen wird, da muß ich mit Wildcards arbeiten. Geht auch irgendwie.

Sorry, mit dem ganzen objektorientierten Kram kenne ich mich wenig aus, seit ich vor 15 Jahren mit Codesys 2.3 angefangen habe und dann irgendwann auf die 3.5 umgestiegen bin.

stefandreyer commented 4 months ago

Hi,

okay, das ist vielleicht alles nicht ganz Intuitiv, aber es macht Spaß ;)

  1. der Subscriber macht nur die Subscription zum Broker, wenn diese Angemeldet ist, ist der raus und hat mit dem Empfangen nichts mehr zu tun. 2- wir ein Paket das zu dem Subscription Topic passt empfangen wir die PublishReceived methode des an den Subscription FB übergebenen FBs aufgerufen.
  2. Der Collector FB ist eine Dynamische liste. An diesem können verschiedene FBs übergeben werden. Der Collector FB wird dem Subscription FB Übergeben. Wird die PublishReceived Methode des Collector FBs aufgerufen, so ruft die PublishReceived methode des Collector FBs alle PublishReceived methoden auf, welche in der Liste des Collector FBs aufgenommen wurden.
  3. der Collector FB kann mehrere subscription FBs übergeben werden um auf mehrere Topics zu reagieren. z.B. hier um auf die set Topics von IOBroker in verschiedenen Ebenen zu reagieren: image Dafür müssen dann die dem Collector FB übergebene FBs in ihren PublishReceived entscheiden, ob die beim Aufruf der Methode Übergeben Daten, z.B. Topic, relevant für diese sind und entsprechend reagieren.
  4. Vorteil, man kann die FBs so implementieren, das sich diese selber beim Collector FB registrieren. für den Universellen Empfang gibt es FB ReceiveString und FB ReceiveValue diese können für die Reaktion auf die Passende Topic oder Payload initialisiert werden.

Reicht das erst mal als Ansatz oder soll ich noch was erklären?

Grüße Stefan

GSNord commented 4 months ago

Danke für die Erklärung mit den Subscribern, die haben ja dann mit dem Receiver nichts zu tun. Ich hab mir 2 Subscriber gemacht, die jeweils einen Topic mit nur 1 Wildcard abonnieren. Dazu einen Receiver mit etwas breiterer Wildcard, die alles nötige abdeckt. Das kommt dann sowieso erst mal alles in einen Ringbuffer rein und wird dann entsprechend ausgewertet. Da brauche ich keine 2 Receiver, die sich möglicherweise noch überschneiden. Läuft soweit!