X-Ryl669 / eMQTT5

An embedded MQTTv5 client in C++ with minimal footprint, maximal performance
MIT License
65 stars 14 forks source link

Publisher and Subscriber in the same client #12

Closed nitolsaha-ns closed 8 months ago

nitolsaha-ns commented 1 year ago

I want to implement publisher and subscriber in the same client. Publisher will send data in every 1 second and subscriber will listen for any incoming message in another topic. Can you provide me with any example?

X-Ryl669 commented 1 year ago

Sure:

// Usual programs
#include <stdio.h>
#include <stdlib.h>

// We need MQTT client 
#include "Network/Clients/MQTT.hpp"

struct MessageReceiver : public Network::Client::MessageReceived
{
    void messageReceived(const Network::Client::MQTTv5::DynamicStringView & topic, const Network::Client::MQTTv5::DynamicBinDataView & payload, 
                         const uint16 packetIdentifier, const Network::Client::MQTTv5::PropertiesView & properties)
    {
        fprintf(stdout, "Msg received: (%04X)\n", packetIdentifier);
        fprintf(stdout, "  Topic: %.*s\n", topic.length, topic.data);
        fprintf(stdout, "  Payload: %.*s\n", payload.length, payload.data);
    }

};

Network::Client::MQTTv5::QoSDelivery QoS = Network::Client::MQTTv5::QoSDelivery::AtMostOne;

volatile bool cont = true;
void ctrlc(int sig)
{
    if (sig == SIGINT) cont = false;
}

int main(int argc, const char ** argv)
{

    const char * server = "yourserver.com";
    const uint16 port = 1883;
    const char * username = "bob";
    const char * password = "sponge";
    const char * subscribe = "/topic/to/subscribe/to";
    const char * publishTopic = "/topic/to/publish/to";
    const char * publishMessage = "I'm alive... again";
    unsigned keepAlive = 300; 

    MessageReceiver receiver;

    Network::Client::MQTTv5 client(clientID, &receiver);
    Network::Client::MQTTv5::DynamicBinDataView pw(strlen(password), (const uint8*)password);

    if (Network::Client::MQTTv5::ErrorType ret = client.connectTo(server, port, false, 
                                                                  (uint16)min(65535U, keepAlive), true, username, &pw))
    {
        return fprintf(stderr, "Failed connection to %s with error: %d\n", (const char*)serverURL.asText(), (int)ret); 
    }
    printf("Connected to %s\n", (const char*)serverURL.asText());

    // Check if we have some subscription
        if (Network::Client::MQTTv5::ErrorType ret = client.subscribe(subscribe, Protocol::MQTT::V5::GetRetainedMessageAtSubscriptionTime, true, QoS, false))
        {
            return fprintf(stderr, "Failed subscribing to %s with error: %d\n", (const char*)subscribe, (int)ret);
        }
        printf("Subscribed to %s\nWaiting for messages...\n", (const char*)subscribe);

        // Then enter the event loop here
        signal(SIGINT, ctrlc);
        time_t last = time(NULL);
        while (cont)
        {
            if (Network::Client::MQTTv5::ErrorType ret = client.eventLoop())
                return fprintf(stderr, "Event loop failed with error: %d\n", (int)ret);

            // Is it time to publish ?
            if (time(NULL) - last >= 1) {
                if (Network::Client::MQTTv5::ErrorType ret = client.publish(publishTopic, publishMessage, strlen(publishMessage), false, QoS)) {
                  return fprintf(stderr, "Failed publishing %s to %s with error: %d\n", publishMessage, publishTopic, (int)ret);
              }
              printf("Published %s to %s\n", publishMessage, publishTopic);
              last = time(NULL);
           }
       }

        return 0;
}

This is more or less the same as in the test client file MQTTc.cpp. I've removed all utility code so it's as simple as it can be. I haven't tested the code, but you get the idea...

nitol-saha commented 1 year ago

Thanks! I will run the code today and will let you know if I face any issues.

nitol-saha commented 1 year ago

Hello, I was able to run the code. I modified the code a little bit. I have one question. In the modified code below whenever I am getting a data in subscriber topic, I am publishing the timestamp. I am checking the echoCheck variable and if I receive any message, I am publishing timestamp in while (cont) loop. Is this the correct way of doing it? or can I publish in the messageReceivedfunction? If possible can you guide me how to implement it? My concern is as I am running a infinte while loop and checking the echoCheckvariable continuously it will increase the CPU usage.

#include <stdio.h>
#include <stdlib.h>
#include <ctime>
#include <signal.h>

// We need MQTT client
#include "Network/Clients/MQTT.hpp"
int echoCheck=0;
char* data; 
struct MessageReceiver : public Network::Client::MessageReceived
{
    void messageReceived(const Network::Client::MQTTv5::DynamicStringView& topic,
                         const Network::Client::MQTTv5::DynamicBinDataView& payload,
                         const uint16_t packetIdentifier,
                         const Network::Client::MQTTv5::PropertiesView& properties)
    {
        fprintf(stdout, "Msg received: (%04X)\n", packetIdentifier);
        fprintf(stdout, "  Topic: %.*s\n", topic.length, topic.data);
        fprintf(stdout, "  Payload: %.*s\n", payload.length, payload.data);
        echoCheck=1;
    }
};

Network::Client::MQTTv5::QoSDelivery QoS = Network::Client::MQTTv5::QoSDelivery::AtMostOne;

volatile bool cont = true;

void ctrlc(int sig)
{
    if (sig == SIGINT)
        cont = false;
}

int main(int argc, const char** argv)
{
    const char* server = "localhost";
    const uint16_t port = 1883;
    const char* username = "bob1";
    const char* password = "sponge1";
    const char* subscribe = "topic1";
    const char* publishTopic = "topic2";
    const char* clientID = "Client2";
    unsigned keepAlive = 300;

    MessageReceiver receiver;

    Network::Client::MQTTv5 client(clientID, &receiver);
    Network::Client::MQTTv5::DynamicBinDataView pw(strlen(password), (const uint8_t*)password);

    if (Network::Client::MQTTv5::ErrorType ret = client.connectTo(server, port, false,
                                                                  (uint16_t)std::min(65535U, keepAlive), true,
                                                                  username, &pw))
    {
        return fprintf(stderr, "Failed connection to %s with error: %d\n", server, (int)ret);
    }
    printf("Connected to %s\n", server);

    // Check if we have some subscription
    if (Network::Client::MQTTv5::ErrorType ret =
            client.subscribe(subscribe, Protocol::MQTT::V5::GetRetainedMessageAtSubscriptionTime, true, QoS, false))
    {
        return fprintf(stderr, "Failed subscribing to %s with error: %d\n", subscribe, (int)ret);
    }
    printf("Subscribed to %s\nWaiting for messages...\n", subscribe);

    // Then enter the event loop here
    signal(SIGINT, ctrlc);
    time_t lastPublishTime = time(NULL);

    while (cont)
    {
        if (Network::Client::MQTTv5::ErrorType ret = client.eventLoop())
            return fprintf(stderr, "Event loop failed with error: %d\n", (int)ret);

        // Check if it's time to publish (every second)
        time_t currentTime = time(NULL);
        if (echoCheck == 1)
        {
            // Convert current time to string format
            char timestamp[20];
            strftime(timestamp, sizeof(timestamp), "%Y-%m-%d %H:%M:%S", localtime(&currentTime));

            // Publish the timestamp
            if (Network::Client::MQTTv5::ErrorType ret =
                    client.publish(publishTopic, (const uint8_t*)timestamp, strlen(timestamp), false, QoS))
            {
                return fprintf(stderr, "Failed publishing %s to %s with error: %d\n", timestamp, publishTopic,
                               (int)ret);
            }

            printf("Published %s to %s\n", timestamp, publishTopic);
            lastPublishTime = currentTime;
            echoCheck=0;
        }
    }

    return 0;
}
X-Ryl669 commented 1 year ago

There's no notion of threads in eMQTT5. This means that MessageReceiver::messageReceived is called inside the eventLoop method. So yes, you can publish in the messageReceived method. The while loop isn't an infinite loop either because inside the eventLoop there's a call to select (or epoll or kevent depending on the platform) for the MQTT socket. This gives the control to the kernel, and the kernel will sleep the CPU for the default timeout period (you can change with client.setDefaultTimeout) until there's an event on the socket, not wasting CPU resources.

On a microcontroller like the ESP32, this also works this way, the application CPU is waiting on a FreeRTOS' semaphore that will only be signaled when a WIFI packet is received, not burning CPU cycles. Yet, on such micro-controller, because the WIFI hardware device must listen to messages, it can't be put to sleep, so you'll get the usual WIFI consumption of 160mA.

nitol-saha commented 1 year ago

Thank you for the information. I have another question. On paho there is on_publish callback to check the message has been published or not.

On_publish Callback
When the message has been published to the Broker an acknowledgement is sent that results in the on_publish callback being called.

Is there any similar thing in the eMQTT5 library? I have only seen messageReceived method.

X-Ryl669 commented 1 year ago

Depends on what you expect. If you want to make sure the message is received by the broker, you can set the QoS to AtLeastOne that's QoS=1 or ExactlyOne that's QoS=2. Any missing ACK will return an error from publish method. If you are paranoid that the broker is acting as expected, you can also subscribe to the same topic you're publishing to and set the withAutoFeedback parameter to true when subscribing so you'll receive your own message.

If you want to make sure the message is just sent, then you'll check the return code of the publish method. If it's 0 (or Success) than you are sure the message was sent. If it's anything other than that, there was a failure sending or creating the message.

When you call publish, the whole publish cycle is performed so it either result in a successful publishing or a failed publishing, with no intermediate or transcient state

nitol-saha commented 1 year ago

Thanks for the information! I am getting CPU usage very low for my use case.

nitol-saha commented 1 year ago

I have some general questions about the library to find compatibility for my project. If you can answer the following questions that would be really helpful:

X-Ryl669 commented 1 year ago
* Is the library compatible or can be made compatible with ACE (Adaptive Communication Environment) (http://www.dre.vanderbilt.edu/~schmidt/ACE-overview.html)?

There is no such thing as ACE in the library. It can be made compatible with it, but you'll need to write a wrapper. Since the library use the minimum possible dependencies, there's almost nothing to wrap (no thread, no filesystem, no OS, etc...), only the socket code, so it shouldn't be hard I think. Please have a look at esp-eMQTT5 port for an example wrapping for FreeRTOS/Lwip, it's ~100 lines of code.

* Is there any dynamic memory allocation (new, delete)?

No, see here. There's only one heap allocation for the socket and the single network buffer when creating the MQTTClient instance, but you can substitute it for a stack based buffer instead if you need so (change the line 695 of MQTTClient.cpp to look like this:

        /** The receiving data buffer */
        uint8               recvBuffer[YOUR MAX EXPECTED PACKET SIZE HERE IN BYTE];
        /** The receiving VBInt size for the packet header */
        uint8               packetExpectedVBSize;
        BaseSocket    _socket; 

        uint16 allocatePacketID()
        {
            return ++publishCurrentId;
        }

        Impl(const char * clientID, MessageReceived * callback, const DynamicBinDataView * brokerCert)
             : _socket(YOUR TIMEOUT HERE IN MS), socket(0), brokerCert(brokerCert), clientID(clientID), cb(callback), timeoutMs({3, 0}), lastCommunication(0), publishCurrentId(0), keepAlive(300),
#if MQTTUseUnsubscribe == 1        
               unsubscribeId(0), lastUnsubscribeError(ErrorType::WaitingForResult),
#endif
               recvState(Ready), recvBufferSize(max(callback->maxPacketSize(), 8U)), maxPacketSize(65535), available(0), packetExpectedVBSize(Protocol::MQTT::Common::VBInt(recvBufferSize).getSize())
        {}
        ~Impl() { }

[...And around line 858...]
        void close()
        {
            _socket.close();
            socket = 0;
        }

        bool isOpen()
        {
            return socket;
        }

        int send(const char * buffer, const int size) { return socket ? socket->send(buffer, size) : -1; }

        int connectWith(const char * host, const uint16 port, const bool withTLS)
        {
            if (isOpen()) return -1;
            socket = &_socket;
            return socket ? socket->connect(host, port, brokerCert) : -1;
        }
* Does the library have threading model or single thread execution?

There's no thread used in the library. Your main thread/task call the library and everything is done in the eventLoop function. There's no mutex, no lock, whatsoever. When used in a single thread execution model, there's nothing to do. The library isn't re-entrant and because of this, if used in a multithreaded environment you need to protect access to it with your OS's mutex and primitives to avoid multiple thread from corrupting the internal state of the library.

* How are the exception handled? (Exception handling should not throw and end execution)

There is no exception thrown or managed in the library. Exceptions are not used in eMQTT5 since it bloats the application. Every error is returned by the function return type. Either it's an ErrorType (an enum) or a bool (with obvious true = success, false = failure) or a pointer (with nullptr = failure, IIRC).

Even more, there is no RTTI used either. It was made to shrink the application binary size to the absolute minimum possible, so any optional feature of C++ was disabled.

nitol-saha commented 1 year ago

Thank you so much for the information. It really helped me a lot. As this library supports v5, does this mean than it can also connect to MQTT broker running on v3.1.1?

X-Ryl669 commented 1 year ago

No, the protocol is different and it's not retro-compatible. Most brokers support v5.0 so it's not a big deal IRL. In fact, it's easier to write a MQTTv5 broker than it is for a MQTTv3.0/1.1, since you don't have to deal with so many failure. For those who don't, they will refuse to connect because the client only claims V5 on first connection and not V3.1.1 and it won't retry, so it's safe and won't break the old network's packet parsers. Supporting both would require adding 60% of binary size and I don't think it's worth it since MQTTv3 is now deprecated.

nitol-saha commented 1 year ago

Thank you so much for the information.