wolfSSL / wolfMQTT

wolfMQTT is a small, fast, portable MQTT client implementation, including support for TLS 1.3.
https://www.wolfssl.com
GNU General Public License v2.0

publish and subscribe at the same time in the same client #338

Closed nitol-saha closed 1 year ago

nitol-saha commented 1 year ago

I am having difficulty publishing and subscribing at the same time in the same client. I publish data to topic A every second. While publishing, I want to receive incoming messages on another topic, B. I subscribed to topic B, but once I start publishing to topic A, I do not receive any data on topic B. I am publishing data to topic B from another client. I modified the code from this example: https://github.com/wolfSSL/wolfMQTT/tree/master/examples/mqttsimple. I publish the data using this code:

for (int i = 0; i < 10; i++) {  
    sleep(1); // wait for 1 second  
    time_t now = time(NULL);  
    struct tm *local_time = localtime(&now);  
    char timestamp[20];  
    strftime(timestamp, 20, "%Y-%m-%d %H:%M:%S", local_time);  
    mqttObj.publish.buffer = (byte*)timestamp;  
    mqttObj.publish.total_len = XSTRLEN(timestamp);  
    rc = MqttClient_Publish(&mClient, &mqttObj.publish);  
    if (rc != MQTT_CODE_SUCCESS) {  
        break;  
    }  
    PRINTF("MQTT Publish: Topic %s, Qos %d, Message %s",  
        mqttObj.publish.topic_name, mqttObj.publish.qos, mqttObj.publish.buffer);  
}

If I call MqttClient_WaitMessage_ex(&mClient, &mqttObj, MQTT_CMD_TIMEOUT_MS); inside the loop above to receive the subscribed messages, publishing stops.

Do I have to use threading to achieve this, or am I missing something? With other libraries such as mosquitto and paho, I do not have to create threads to subscribe to one topic while publishing to another. Does this library have the same feature?

embhorn commented 1 year ago

Hi @nitol-saha

Messages on the subscribed topics are only received while the application is inside a call to the MqttClient_WaitMessage_ex API, so your application should stay in a loop calling that API. You can set the timeout to 1s, then call the publish API when the wait returns. After the publish completes, the loop goes back to the wait API.
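The loop described above might be sketched as follows. This is a minimal, library-independent sketch, not code from wolfMQTT: the hook types, the DEMO_ return codes, and the counting stubs are all hypothetical names introduced for illustration. In a real client, the wait hook would wrap MqttClient_WaitMessage_ex with a 1000 ms timeout and the publish hook would wrap MqttClient_Publish, comparing against the library's own return codes (MQTT_CODE_SUCCESS, MQTT_CODE_ERROR_TIMEOUT) instead of the stand-ins here.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for the library's success and timeout codes. */
enum { DEMO_OK = 0, DEMO_TIMEOUT = -1 };

/* Hypothetical hooks: wait_fn would wrap MqttClient_WaitMessage_ex,
 * publish_fn would wrap MqttClient_Publish. */
typedef int (*demo_wait_fn)(void *ctx, int timeout_ms);
typedef int (*demo_publish_fn)(void *ctx);

/* Alternate between waiting for incoming messages and publishing.
 * A timeout from the wait step only means nothing arrived, so it is not
 * treated as an error. Returns DEMO_OK after `count` publishes, or the
 * first real error code. */
static int demo_pubsub_loop(void *ctx, demo_wait_fn wait_fn,
                            demo_publish_fn publish_fn, int count)
{
    int i;
    for (i = 0; i < count; i++) {
        /* Blocks for up to 1 s, returning early if a message arrives. */
        int rc = wait_fn(ctx, 1000);
        if (rc != DEMO_OK && rc != DEMO_TIMEOUT) {
            return rc;
        }
        rc = publish_fn(ctx);
        if (rc != DEMO_OK) {
            return rc;
        }
    }
    return DEMO_OK;
}

/* Counting stubs used only to exercise the loop without a broker. */
struct demo_counts { int waits; int publishes; };

static int demo_wait_stub(void *ctx, int timeout_ms)
{
    (void)timeout_ms;
    ((struct demo_counts *)ctx)->waits++;
    return DEMO_TIMEOUT; /* pretend no message arrived */
}

static int demo_publish_stub(void *ctx)
{
    ((struct demo_counts *)ctx)->publishes++;
    return DEMO_OK;
}
```

Because the wait returns early when a message arrives, the publish interval in this arrangement is at most one second rather than exactly one second. An application that needs a strict interval would have to track elapsed time separately.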

Here is some similar code that waits for incoming messages, and also is able to publish if a message is typed into STDIN: https://github.com/wolfSSL/wolfMQTT/blob/dfe99964890a9bed07c334a2a827f3dc90751def/examples/mqttclient/mqttclient.c#L532-L599

Alternatively, you could write a multithreaded application, with a publish thread and a receive thread. Here is a good starting point: https://github.com/wolfSSL/wolfMQTT/tree/master/examples/multithread

Is this project related to a commercial application? Please feel free to email our support team if you'd rather keep this information private. support@wolfssl.com

Thanks, Eric - wolfSSL Support

nitol-saha commented 1 year ago

No, this is not a commercial application, just a personal project. I am able to run the client using pthread. It works for a few seconds, then I get this error while publishing: MQTT Error -4: Error (Packet Type Mismatch). How should I handle this error?

embhorn commented 1 year ago

Please enable debug logging with --enable-debug=trace and share the log.

nitol-saha commented 1 year ago

I am now able to run the multithread code you provided without any error. I missed this documentation previously: https://www.wolfssl.com/documentation/manuals/wolfmqtt/index.html. One question: how can I check whether a message has been published to the broker in the multithread code: https://github.com/wolfSSL/wolfMQTT/tree/master/examples/multithread? I have seen that there is an MqttPublishCb pointer, but I am struggling to work out how to implement it.

embhorn commented 1 year ago

Excellent! For checking if the message is published, you could enable verbose mode in the broker. For example, with mosquitto you can start the broker with -v. You could also use another client to subscribe to the same topic.

The publish callback is used to send payloads larger than the transmit buffer size:

/*! \brief      Mqtt Publish Callback.
 *  If the publish payload is larger than the maximum TX buffer
    then this callback is called multiple times. This callback is executed from
    within a call to MqttPublish. It is expected to provide a buffer and it's
    size and return >=0 for success.
    Each callback populates the payload in MqttPublish.buffer.
    The MqttPublish.buffer_len is the size of the buffer payload.
    The MqttPublish.total_len is the length of the complete payload message.
 *  \param      publish     Pointer to MqttPublish structure
 *  \return     >= 0        Indicates success
 */
typedef int (*MqttPublishCb)(MqttPublish* publish);
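
As a rough illustration of the chunking behaviour that comment describes, here is a minimal sketch. It does not use the wolfMQTT headers: DemoPublish is a hypothetical stand-in reduced to the fields named in the comment, and the ctx pointer and DemoSource type are added only for this sketch. It also assumes the callback returns the number of bytes it placed in the buffer, which the comment above does not spell out.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical stand-in for MqttPublish, reduced to the fields named in
 * the comment above, plus a ctx pointer added for this sketch. */
typedef struct DemoPublish {
    unsigned char *buffer;    /* chunk buffer the callback fills */
    unsigned int buffer_len;  /* size of that chunk buffer */
    unsigned int total_len;   /* length of the complete payload */
    void *ctx;                /* user context (assumption for this sketch) */
} DemoPublish;

/* Hypothetical payload source that remembers how far it has been read. */
typedef struct DemoSource {
    const unsigned char *data;
    unsigned int len;
    unsigned int pos;
} DemoSource;

/* Fill publish->buffer with the next chunk of the payload.
 * Returns the number of bytes written (assumed convention), or -1 on
 * bad arguments. */
static int demo_publish_cb(DemoPublish *publish)
{
    DemoSource *src;
    unsigned int remaining, chunk;

    if (publish == NULL || publish->buffer == NULL || publish->ctx == NULL) {
        return -1;
    }
    src = (DemoSource *)publish->ctx;
    remaining = src->len - src->pos;
    chunk = (remaining < publish->buffer_len) ? remaining
                                              : publish->buffer_len;
    memcpy(publish->buffer, src->data + src->pos, chunk);
    src->pos += chunk;
    return (int)chunk;
}
```

With a 10-byte payload and a 4-byte chunk buffer, successive calls would hand back 4, 4, and 2 bytes, then 0 once the source is exhausted.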

nitol-saha commented 1 year ago

In paho there is an on_publish callback for checking whether the message has been published:

On_publish Callback
When the message has been published to the Broker an acknowledgement is sent that results in the on_publish callback being called.

Is there anything similar in the wolfMQTT library?

embhorn commented 1 year ago

That is interesting. No, there is only the return value from the publish call in wolfMQTT. Sounds like a neat feature, though. You can open a feature request by emailing support@wolfssl.com

nitol-saha commented 1 year ago

Thank you, will do that. I am facing another problem. I want to run this code in a Docker environment. I am using the following Dockerfile to build the image:

FROM ubuntu:latest    
ENV TZ=America/New_York   
RUN apt-get update && apt-get install -y gcc make libtool autoconf git sudo libbsd-dev    
RUN apt-get install -y openssl libssl-dev    
RUN apt-get install -y    

RUN git clone https://github.com/wolfssl/wolfssl.git  
WORKDIR /wolfssl  
RUN ./autogen.sh  
RUN ./configure --enable-ssh  
RUN make  
RUN sudo make install  
WORKDIR /
RUN git clone https://github.com/wolfSSL/wolfMQTT.git  
WORKDIR /wolfMQTT 
# RUN rm /wolfMQTT/examples/mqttexample.h
# COPY mqttexample.h /wolfMQTT/examples/
# RUN rm /wolfMQTT/examples/multithread/multithread.c
# COPY multithread.c /wolfMQTT/examples/multithread/   
RUN ./autogen.sh    
RUN ldconfig 
RUN ./configure --enable-mt --disable-tls --enable-debug=trace

RUN make && sudo make install    

CMD ["./examples/multithread/multithread"]  

But when I run the container, I get the following error:

NetConnect: Host test.mosquitto.org, Port 1883, Timeout 5000 ms, Use TLS 0
MqttSocket_Connect: Rc=0
MqttClient_EncodePacket: Len 28, Type Connect (1), ID 0, QoS 0
PendResp Add: 0x560117ceb200, Type Connect Ack (2), ID 0
MqttSocket_Write: Len=28, Rc=28
MqttClient_WaitType: Type Connect Ack (2), ID 0, State 0-0
PendResp Find: Type Connect Ack (2), ID 0
PendResp Found: 0x560117ceb200, Type Connect Ack (2), ID 0, InProc 0, Done 0
lockRecv: (MqttClient_WaitType:960)
MqttSocket_ReadDo: Len=2, Rc=-102
MqttClient_WaitType: rc -102, state 1-0
unlockRecv: (MqttClient_WaitType:1278)
MqttClient_WaitType: Failure: STDIN Wake (-102)
PendResp Remove: 0x560117ceb200
Cancel Msg: 0x560117ceb1f0
MqttClient_EncodePacket: Len 2, Type Disconnect (14), ID 0, QoS 0
MqttSocket_Write: Len=2, Rc=2
MqttSocket_Disconnect: Rc=0

Can you help me to find any problem with my code?

embhorn commented 1 year ago

Hi @nitol-saha

Looks like an issue with STDIN. The examples can be stopped using <ctrl>c. If the environment doesn't support input, you can use the configure option --disable-stdincap. You can also run the test with the -T option to set "test mode", which will force an exit after successful execution.

nitol-saha commented 1 year ago

Thanks for the suggestions! Now everything is working properly.

nitol-saha commented 1 year ago

I have some general questions about the library to assess its compatibility with my project. If you could answer the following questions, that would be really helpful:

embhorn commented 1 year ago

Hi @nitol-saha

Happy to help answer these questions:

Is the library compatible or can be made compatible with ACE (Adaptive Communication Environment) (http://www.dre.vanderbilt.edu/~schmidt/ACE-overview.html)?

wolfMQTT is very flexible, and you can define the IO interface using the network layer definition: https://github.com/wolfSSL/wolfMQTT/blob/master/examples/mqttnet.c

Is there any dynamic memory allocation (new, delete)?

wolfMQTT does not use malloc and free in the library. IO buffers are allocated by the application, and pointers to them are passed into the initialization.
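
The caller-owned buffer pattern described here could look roughly like the sketch below. It is library-independent: DemoClient, demo_client_init, and the buffer sizes are hypothetical names chosen for illustration and are not the wolfMQTT API. In wolfMQTT itself, the transmit and receive buffers and their lengths are handed to MqttClient_Init.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_TX_BUF_SIZE 1024 /* hypothetical sizes for illustration */
#define DEMO_RX_BUF_SIZE 1024

/* Hypothetical client that only borrows buffers owned by the caller. */
typedef struct DemoClient {
    unsigned char *tx_buf;
    int tx_buf_len;
    unsigned char *rx_buf;
    int rx_buf_len;
} DemoClient;

/* Store the caller's buffer pointers; nothing is allocated or copied.
 * Returns 0 on success, -1 on invalid arguments. */
static int demo_client_init(DemoClient *client,
                            unsigned char *tx_buf, int tx_buf_len,
                            unsigned char *rx_buf, int rx_buf_len)
{
    if (client == NULL || tx_buf == NULL || rx_buf == NULL ||
        tx_buf_len <= 0 || rx_buf_len <= 0) {
        return -1;
    }
    client->tx_buf = tx_buf;
    client->tx_buf_len = tx_buf_len;
    client->rx_buf = rx_buf;
    client->rx_buf_len = rx_buf_len;
    return 0;
}

/* Application-owned storage with static lifetime: no malloc or free. */
static unsigned char g_tx_buf[DEMO_TX_BUF_SIZE];
static unsigned char g_rx_buf[DEMO_RX_BUF_SIZE];
```

Since the client only borrows the buffers, they must outlive it. Static or global storage, as in this sketch, is one simple way to ensure that.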

Does the library have threading model or single thread execution?

wolfMQTT works with both single-threaded and multithreaded applications. We support non-blocking applications as well.

How are the exception handled? (Exception handling should not throw and end execution)

All APIs are C style and return an error code to indicate failure.

Thanks,

embhorn commented 1 year ago

Hi @nitol-saha

I'll go ahead and close this issue. Please feel free to reopen or ask more questions in a new issue.

Thanks, @embhorn