eclipse-paho / paho.mqtt.embedded-c

Paho MQTT C client library for embedded systems. Paho is an Eclipse IoT project (https://iot.eclipse.org/)
https://eclipse.org/paho

Publish data is getting corrupted and changes into Publish ack/ Publish Release/ Publish complete #262

Open haneul49 opened 2 months ago

haneul49 commented 2 months ago

Hi all, I have a weird issue. I'm building a test application with two MQTT clients in my embedded application: one subscribes to topics and the other publishes data. They run in two separate tasks. When I send or receive data at a high rate (for example, 1000 bytes every 20 ms), my client is sometimes seen sending PUBCOMP and PUBREL packets, even though my code publishes with QoS 0. The same thing happens when I receive data at that rate; for that test I sent the data from an external Python client. When these random PUBREL packets are received frequently, my connection is lost.
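For context, here is a minimal sketch (not taken from the Paho sources; the enum and function names are hypothetical) of how the first byte of an MQTT 3.1.1 control packet encodes the packet type in its upper four bits. It shows why a packet dissector can label plain payload text as PUBACK, PUBREL, or PUBCOMP if the byte stream loses its framing. Whether that is what happens in this case is an assumption, not something the captures confirm.

```c
#include <stdint.h>

/* MQTT 3.1.1 control packet types (upper nibble of the first header byte). */
enum mqtt_packet_type {
    MQTT_CONNECT = 1, MQTT_CONNACK = 2, MQTT_PUBLISH = 3, MQTT_PUBACK = 4,
    MQTT_PUBREC  = 5, MQTT_PUBREL  = 6, MQTT_PUBCOMP = 7
};

/* Packet type as a dissector would read it from the first byte of a packet. */
static int mqtt_type_of(uint8_t first_byte)
{
    return first_byte >> 4;
}

/* QoS level (bits 2..1); only meaningful when the type is PUBLISH. */
static int mqtt_publish_qos(uint8_t first_byte)
{
    return (first_byte >> 1) & 0x03;
}
```

A QoS 0 PUBLISH without DUP or RETAIN starts with 0x30. ASCII lowercase letters fall in 0x61..0x7A, so a payload byte such as 'b' (0x62) reads as PUBREL and 'p' (0x70) reads as PUBCOMP when it is parsed as the start of a packet, and an uppercase 'L' (0x4C) reads as PUBACK.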

I'm attaching screenshots from Wireshark as well as my code snippets.

[Three Wireshark screenshots]

const char* messages[1] = {
    "Lorem Ipsum is simply dummy text of the printing and typesetting industry. Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, when an unknown printer took a galley of type and scrambled it to make a type specimen book. It has survived not only five centuries, but also the leap into electronic typesetting, remaining essentially unchanged. It was popularised in the 1960s with the release of Letraset sheets containing Lorem Ipsum passages, and more recently with desktop publishing software like Aldus PageMaker including versions of Lorem Ipsum.It is a long established fact that a reader will be distracted by the readable content of a page when looking at its layout. The point of using Lorem Ipsum is that it has a more-or-less normal distribution of letters, as opposed to using 'Content here, content here', making it look like readable English. Many desktop publishing packages and web page editors now use Lorem Ipsum as their default model text, and a search.",
};
static void clientSubscribeTask(void* param)
{
    int ret;
    for(;;)
    {
        if(!g_mqttClientRx.isconnected)  // Use g_mqttClientRx for reception
        {
            MQTTDisconnect(&g_mqttClientRx);  // Tear down the stale Rx session before reconnecting
            connectBroker(&g_mqttClientRx, &g_netRx);  // Reconnect the Rx client to the broker
            vTaskDelay(1000);
        }
        else
        {
            ret = MQTTYield(&g_mqttClientRx, 2000);  // Yield to handle incoming messages
            vTaskDelay(20);
        }
    }
}

static void clientPublishTask(void* param)
{
    TxTopicMessage data;
    int ret = 0;
    for(;;)
    {
        if(!g_mqttClientTx.isconnected)
        {
            MQTTDisconnect(&g_mqttClientTx);  // Tear down the stale Tx session before reconnecting
            connectBroker(&g_mqttClientTx, &g_netTx);  // Connect the Tx client
            vTaskDelay(1000);  // Delay between reconnection attempts
        }

        else
        {

            if (xQueueReceive(xPubDataQueue, &data, portMAX_DELAY) == pdTRUE)
            {
                NOTIFICATION("\nPublish Task:\n");
                ret = MQTTPublish(&g_mqttClientTx, data.topic, &data.message); // check return to see if the connection is active
    //              message.payload = (void*)str;
    //              message.payloadlen = strlen(str);

    //              ret = MQTTPublish(&g_mqttClient, "topic5", &message);

                if(ret != 0)
                {
                    switch (ret) {
                        case BUFFER_OVERFLOW:
                            NOTIFICATION("BUFFER_OVERFLOW\n");
                            break;
                        case FAILURE:
                            NOTIFICATION("\nData Publish failed\n");
                            break;
                        // Add other cases based on your MQTT library's error codes
                        default:
                            NOTIFICATION("Unknown error occurred\n");
                    }
                }
            }
        }
        vTaskDelay(100);
    }
}

I'm using the lwIP stack. It would be great if anyone could help me with this.
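One thing that may be worth ruling out with a socket-based network layer is partial writes: a send call can accept fewer bytes than requested when the TCP send buffer is nearly full, which becomes more likely at high publish rates. The snippets above do not show how the network write function is implemented here, so this is only an assumption. The sketch below uses hypothetical names (`raw_send_fn`, `write_all`, `chunked_send`), not Paho or lwIP APIs, and shows a loop that keeps sending until the whole packet is out.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical low-level send: returns bytes accepted (may be < len), or < 0 on error. */
typedef int (*raw_send_fn)(void *ctx, const unsigned char *buf, size_t len);

/* Call the low-level send repeatedly until the whole buffer is written. */
static int write_all(raw_send_fn send_fn, void *ctx,
                     const unsigned char *buf, size_t len)
{
    size_t sent = 0;
    while (sent < len) {
        int rc = send_fn(ctx, buf + sent, len - sent);
        if (rc < 0)
            return rc;   /* propagate the error */
        if (rc == 0)
            return -1;   /* no progress; real code might retry until a timeout */
        sent += (size_t)rc;
    }
    return (int)sent;
}

/* Test double: accepts at most max_chunk bytes per call, like a nearly full send buffer. */
struct chunked_sink {
    unsigned char data[64];
    size_t used;
    size_t max_chunk;
};

static int chunked_send(void *ctx, const unsigned char *buf, size_t len)
{
    struct chunked_sink *s = ctx;
    size_t n = len < s->max_chunk ? len : s->max_chunk;
    if (s->used + n > sizeof s->data)
        return -1;       /* sink is full */
    memcpy(s->data + s->used, buf, n);
    s->used += n;
    return (int)n;
}
```

If a write function returns after a partial send and the caller treats the packet as fully sent, the next packet starts in the middle of the previous payload from the receiver's point of view, and every following byte is then parsed with the wrong framing.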

haneul49 commented 2 months ago

@icraggs, can you please help me with this?