tuanpmt / esp32-mqtt

ESP32 MQTT sample project for
https://github.com/tuanpmt/espmqtt
Apache License 2.0
178 stars 51 forks source link

Only one retained message received #10

Open julianschill opened 7 years ago

julianschill commented 7 years ago

When the client subscribes to multiple topics with a wild card and receives multiple retained messages, the callback is only called once. The data is somehow in the data buffer, but I expect that the callback function is called for every message.

I think the problem is in line 376 in mqtt.c: if (client->mqtt_state.message_length_read >= client->mqtt_state.message_length)

this if statement is probably always true and so the loop stops after one run even when there are more messages left in the buffer

julianschill commented 7 years ago

I tried to fix the problem, but I was very confused about the construction of the deliver_publish function. After some debugging and guesswork I found a working solution (at least for me):

void deliver_publish(mqtt_client *client, uint8_t *message, int length)
{
    mqtt_event_data_t event_data;
    int len_read = 0, total_mqtt_len = 0, mqtt_len = 0, mqtt_offset = 0;
    do
    {
        do{
            event_data.topic_length = length;
            event_data.topic = mqtt_get_publish_topic(message+mqtt_offset, &event_data.topic_length);
            event_data.data_length = length;
            event_data.data = mqtt_get_publish_data(message+mqtt_offset, &event_data.data_length);

            //mqtt_info("message_length: %d bytes message_length_read: %d bytes, data_length: %d bytes ", client->mqtt_state.message_length, client->mqtt_state.message_length_read, event_data.data_length);
            total_mqtt_len = client->mqtt_state.message_length_read;
            mqtt_len = event_data.data_length + event_data.topic_length + 4;

            event_data.data_total_length = total_mqtt_len;
            event_data.data_offset = mqtt_offset;
            if(client->settings->data_cb) {
                client->settings->data_cb(client, &event_data);
            }
            mqtt_offset += mqtt_len;
        }while (mqtt_offset < total_mqtt_len);

        if (client->mqtt_state.message_length_read >= client->mqtt_state.message_length)
            break;

        len_read = client->settings->read_cb(client, client->mqtt_state.in_buffer, CONFIG_MQTT_BUFFER_SIZE_BYTE, 0);
        if(len_read < 0) {
            mqtt_info("Read error: %d", errno);
            break;
        }
        client->mqtt_state.message_length_read += len_read;
    } while (1);
}

It works for me now, except if the messages are getting larger than the buffer size. But as I use only small messages, this is not a problem for me.

I'm not sure if the function now works as intended, perhaps someone can have a look at it.

DavidAntliff commented 6 years ago

I think this is due to a fundamental misunderstanding of TCP in the design. The code treats the TCP connection as a packet source, making an assumption that each "read" will return one and only one message. However in reality TCP is a stream and there may be multiple packets waiting, including fragmented ones. My own testing found that this library often drops messages in both directions, especially when many are sent at once (e.g. reconnection followed by receipt of retained messages), because only the first message is processed. The rest are thrown away.

I spent some time looking over the code and I think it can be fixed, I just don't have the time or motivation at the moment to do so myself, unfortunately.