tuanpmt / esp32-mqtt

ESP32 MQTT sample project for
https://github.com/tuanpmt/espmqtt
Apache License 2.0

Pending Subscription message is overwritten by Publish => subscribe callback is missed #18

Open DavidAntliff opened 7 years ago

DavidAntliff commented 7 years ago

In connected_cb() there is a subscribe immediately followed by a publish:

https://github.com/tuanpmt/esp32-mqtt/blob/da0e9769c979bf1bc91fd979f65431ac3b4536c2/main/app_main.c#L28-L33

I was monitoring the behaviour with Wireshark and noticed that although the SUBACK is returned by my mosquitto server, the client is not processing it and therefore not calling subscribe_cb(). I uncommented this line:

https://github.com/tuanpmt/espmqtt/blob/2967332b95454d4b53068a0d5484ae60e312eb12/mqtt.c#L425

Well, actually, the commented code was incorrect so I wrote this instead:

mqtt_info("msg_type %d, msg_id %d, pending_type %d, pending_id %d", msg_type, msg_id, client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);

When only the subscribe happens (the publish is removed), there is a SUBACK (9) received, with ID 1, and the pending message is a SUBSCRIBE (8), so the MQTT_MSG_TYPE_SUBACK case is executed and the subscribe_cb() is called.

[MQTT INFO] mqtt_start_receive_schedule
[MQTT INFO] Read len 5
[MQTT INFO] msg_type 9, msg_id 1, pending_type 8, pending_id 1
[MQTT INFO] Subscribe successful
I (4770) MQTT_SAMPLE: [APP] Subscribe ok, test publish msg

This successfully matches up the SUBACK (9) with the previous SUBSCRIBE (8) and fires the subscribe callback.
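For readers decoding the numbers in these log lines: they are the MQTT control packet types, which sit in the upper four bits of the first byte of every packet. The following is a small sketch for translating them. The helper names `packet_type_from_header` and `packet_type_name` are hypothetical and are not part of espmqtt.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical helper (not part of espmqtt): the packet type is the
 * upper nibble of the first byte of the fixed header. */
static int packet_type_from_header(unsigned char first_byte)
{
    return (first_byte >> 4) & 0x0F;
}

/* Hypothetical helper: map an MQTT 3.1.1 control packet type to its name. */
static const char *packet_type_name(int type)
{
    static const char *const names[] = {
        "RESERVED",    /* 0 */
        "CONNECT",     /* 1 */
        "CONNACK",     /* 2 */
        "PUBLISH",     /* 3 */
        "PUBACK",      /* 4 */
        "PUBREC",      /* 5 */
        "PUBREL",      /* 6 */
        "PUBCOMP",     /* 7 */
        "SUBSCRIBE",   /* 8 */
        "SUBACK",      /* 9 */
        "UNSUBSCRIBE", /* 10 */
        "UNSUBACK",    /* 11 */
        "PINGREQ",     /* 12 */
        "PINGRESP",    /* 13 */
        "DISCONNECT"   /* 14 */
    };
    if (type < 0 || type > 14) {
        return "UNKNOWN";
    }
    return names[type];
}
```

With these, `msg_type 9, pending_type 8` reads as a SUBACK arriving while a SUBSCRIBE is pending.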

However, if the publish happens immediately after the subscribe, it seems to overwrite the pending message, resulting in:

[MQTT INFO] mqtt_start_receive_schedule
[MQTT INFO] Read len 5
[MQTT INFO] msg_type 9, msg_id 1, pending_type 3, pending_id 0
[MQTT INFO] Read len 16
[MQTT INFO] msg_type 3, msg_id 0, pending_type 3, pending_id 0
[MQTT INFO] deliver_publish

The pending message's type is no longer SUBSCRIBE (8) but PUBLISH (3), so when the SUBACK (9) is received there is no pending SUBSCRIBE (8) to compare against and the subscribe_cb() callback is not executed.
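The behaviour inferred from these logs can be modelled in a few lines. This is only a sketch of the suspected mechanism, assuming the client keeps a single pending type/ID pair that every outgoing message overwrites. `struct pending_slot`, `record_outgoing` and `suback_fires_callback` are hypothetical names, not the library's actual code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

enum { TYPE_PUBLISH = 3, TYPE_SUBSCRIBE = 8, TYPE_SUBACK = 9 };

/* Hypothetical stand-in for the client's single pending-message slot. */
struct pending_slot {
    int      type;
    uint16_t id;
};

/* Assumption: every outgoing message overwrites the slot. */
static void record_outgoing(struct pending_slot *slot, int type, uint16_t id)
{
    slot->type = type;
    slot->id = id;
}

/* A SUBACK only leads to the callback if the slot still holds the
 * SUBSCRIBE with the same message ID. */
static bool suback_fires_callback(const struct pending_slot *slot, uint16_t ack_id)
{
    return slot->type == TYPE_SUBSCRIBE && slot->id == ack_id;
}
```

Recording a SUBSCRIBE with ID 1 and then a PUBLISH with ID 0 leaves the slot at type 3, ID 0, which is the state shown in the second log, so the SUBACK for ID 1 no longer matches.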

Oddly, I don't see any debug from mqtt_start_receive_schedule() indicating it received a SUBACK (9) in this case at all - where does it go? Wireshark confirms that it is delivered.

I need to check what the MQTT spec says about ignoring ACKs; however, missing the subscribe_cb() as a result is surely a bug. I think the solution could be to maintain a list of pending messages, and then look for a corresponding match when processing ACKs to ensure the callbacks are all fired.
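A minimal sketch of the pending-list idea, assuming a small fixed-size table is acceptable on the ESP32. All names here (`pending_table`, `pending_add`, `pending_complete`, `MAX_PENDING`) are hypothetical and do not exist in espmqtt. Only messages that expect an ACK would be added, so a QoS 0 publish would never occupy an entry.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define MAX_PENDING 8 /* arbitrary capacity chosen for this sketch */

enum {
    TYPE_PUBLISH = 3, TYPE_PUBACK = 4,
    TYPE_SUBSCRIBE = 8, TYPE_SUBACK = 9,
    TYPE_UNSUBSCRIBE = 10, TYPE_UNSUBACK = 11
};

struct pending_entry {
    bool     in_use;
    int      type; /* type of the request that awaits an ACK */
    uint16_t id;   /* MQTT message ID of that request */
};

struct pending_table {
    struct pending_entry entries[MAX_PENDING];
};

/* Map an ACK type to the request type it acknowledges (0 if unsupported). */
static int request_type_for_ack(int ack_type)
{
    switch (ack_type) {
    case TYPE_PUBACK:   return TYPE_PUBLISH;
    case TYPE_SUBACK:   return TYPE_SUBSCRIBE;
    case TYPE_UNSUBACK: return TYPE_UNSUBSCRIBE;
    default:            return 0;
    }
}

/* Remember an outgoing request; returns false when the table is full. */
static bool pending_add(struct pending_table *t, int type, uint16_t id)
{
    for (size_t i = 0; i < MAX_PENDING; i++) {
        if (!t->entries[i].in_use) {
            t->entries[i].in_use = true;
            t->entries[i].type = type;
            t->entries[i].id = id;
            return true;
        }
    }
    return false;
}

/* On an incoming ACK, find and release the matching request.
 * Returns true when a match was found, i.e. when the callback should fire. */
static bool pending_complete(struct pending_table *t, int ack_type, uint16_t id)
{
    int want = request_type_for_ack(ack_type);
    if (want == 0) {
        return false;
    }
    for (size_t i = 0; i < MAX_PENDING; i++) {
        struct pending_entry *e = &t->entries[i];
        if (e->in_use && e->type == want && e->id == id) {
            e->in_use = false;
            return true;
        }
    }
    return false;
}
```

With a table like this, a SUBSCRIBE followed by a publish keeps its own entry, so the later SUBACK still finds its match regardless of what was sent in between.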

Alternatively, the API could be changed to support an asynchronous model where the API consumer is responsible for handling incoming future acknowledgements, which also allows the API consumer to handle timeouts and retries. However, it would be nice if this library took care of that because the current API is nice and simple.
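To illustrate what the consumer-side bookkeeping in such an asynchronous model might look like, here is a rough sketch of a per-request waiter with a timeout and a retry budget. Everything in it (`ack_waiter`, `ack_waiter_start`, `ack_waiter_on_ack`, `ack_waiter_poll`) is hypothetical, and the millisecond timestamps are assumed to come from whatever tick source the application already has.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

enum ack_status { ACK_WAITING, ACK_RETRY, ACK_GAVE_UP, ACK_DONE };

/* Hypothetical consumer-side record for one request awaiting its ACK. */
struct ack_waiter {
    uint16_t msg_id;
    uint32_t deadline_ms;
    uint32_t timeout_ms;
    int      retries_left;
    bool     acked;
};

static void ack_waiter_start(struct ack_waiter *w, uint16_t msg_id,
                             uint32_t now_ms, uint32_t timeout_ms,
                             int max_retries)
{
    w->msg_id = msg_id;
    w->timeout_ms = timeout_ms;
    w->deadline_ms = now_ms + timeout_ms;
    w->retries_left = max_retries;
    w->acked = false;
}

/* Call from the handler that sees incoming acknowledgements. */
static void ack_waiter_on_ack(struct ack_waiter *w, uint16_t msg_id)
{
    if (msg_id == w->msg_id) {
        w->acked = true;
    }
}

/* Call periodically. ACK_RETRY means the caller should resend the request. */
static enum ack_status ack_waiter_poll(struct ack_waiter *w, uint32_t now_ms)
{
    if (w->acked) {
        return ACK_DONE;
    }
    /* Signed difference keeps the comparison valid across tick wrap-around
     * on common two's-complement platforms. */
    if ((int32_t)(now_ms - w->deadline_ms) < 0) {
        return ACK_WAITING;
    }
    if (w->retries_left <= 0) {
        return ACK_GAVE_UP;
    }
    w->retries_left--;
    w->deadline_ms = now_ms + w->timeout_ms;
    return ACK_RETRY;
}
```

The trade-off is the one described above: this pushes state and timing onto every API consumer, whereas tracking pending messages inside the library would keep the current callback API simple.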

shirish47 commented 7 years ago

Hi, I am facing a similar problem. I normally raise a flag to show that the subscription is done; if it is not set, I restart the process after a few tries. The subscription callback is not called. As your post above says not to publish immediately after subscribing, I moved my publish call into subscribe_cb. I still don't get the subscription callback, although this used to work before.

This is my code's output (sorry, the code is complex now):

`
[MQTT INFO] Connected to MQTT broker, create sending thread before call connected callback
[MQTT INFO] mqtt_sending_task
I (7792) BYTELENSCLIENT: Connected to BROKER. 

I (7802) BYTELENSCLIENT: 

I (7802) BYTELENSCLIENT: /27/24:a:c4:1:1b:34/auth  <----- I first create topic strings( concate string etc)

I (7812) BYTELENSCLIENT: /27/24:a:c4:1:1b:34/authdone

I (7822) BYTELENSCLIENT: /37/24:a:c4:1:1b:34/list
(then I subscribe to topics, only 2 topics for now)
[MQTT INFO] Queue subscribe, topic"/27/24:a:c4:1:1b:34/authdone", id: 1
[MQTT INFO] Sending...50 bytes
[MQTT INFO] Queue subscribe, topic"/24:a:c4:1:1b:34/list", id: 2
[MQTT INFO] Sending...49 bytes
[MQTT INFO] mqtt_start_receive_schedule
[MQTT INFO] Read len 5
[MQTT INFO] Read len 50
[MQTT INFO] Queue response QoS: 1
[MQTT INFO] Sending...4 bytes
[MQTT INFO] deliver_publish
(no subscribe_cb called)
[MQTT INFO] Data received: 1/1 bytes 
I (8072) BYTELENSCLIENT: Data cb.
I (8082) BYTELENSCLIENT: Publish topic: /27/24:a:c4:1:1b:34/authdone 
[MQTT INFO] Read len 5
I (12892) wifi: pm start, type:0

[MQTT INFO] Read len 50
[MQTT INFO] deliver_publish
[MQTT INFO] Data received: 3/3 bytes 
I (19102) BYTELENSCLIENT: Data cb.
I (19102) BYTELENSCLIENT: Publish topic: /27/24:a:c4:1:1b:34/authdone <---- but I receive data when I publish from hivemq.com try demo
I (31872) GATTC_DEMO: Ticker: conn: 0 ,Noti: 0, scan: 0
I (31872) GATTC_DEMO: Ticker: wificon: 1, mqttcon: 1, mqttsub: 0
I (31872) GATTC_DEMO: Ticker: MQTT broker not Subscribed, retrying 0
I (62872) GATTC_DEMO: Ticker: conn: 0 ,Noti: 0, scan: 0
I (62872) GATTC_DEMO: Ticker: wificon: 1, mqttcon: 0, mqttsub: 0
I (62872) GATTC_DEMO: Ticker: MQTT broker not Connected, retrying 1
[MQTT INFO] Sending pingreq
[MQTT INFO] Read len 2
[MQTT INFO] MQTT_MSG_TYPE_PINGRESP
`

My subscribe_cb and connected_cb:

void subscribe_cb(void *self, void *params)
{
    ESP_LOGI(BCLIENT_TAG, "Subscribed to ");
    mqtt_client *client = (mqtt_client *)self;
    mqtt_event_data_t *event_data = (mqtt_event_data_t *)params;

    char *myID=getStrHubID();
    mqtt_publish(client,MQTT_CSV_AUTH,myID,strlen(myID),0,0);
    mqttSubscribed=true;
//    mqtt_publish(client, MQTT_TOPIC_INFO,str,strlen(str), 0, 0);
}

void connected_cb(void *self, void *params)
{
    ESP_LOGI(BCLIENT_TAG, "Connected to BROKER. \n");
    mqtt_client *client = (mqtt_client *)self;
    MQTTCLIENT = client;

    mqtt_event_data_t *event_data = (mqtt_event_data_t *)params;
    //char *str="I am ESP32.";

    Broker_Connected = true;
    mqttConnected = Broker_Connected;

    create_topics();
    // first subscribe to all topics
    mqtt_subscribe(client, MQTT_CSV_AUTHDONE, 1);
    mqtt_subscribe(client, MQTT_CSV_LIST, 1);
}

// and the MQTT settings
mqtt_settings settings = {
    .host = MQTT_HOST,
#if defined(CONFIG_MQTT_SECURITY_ON)
    .port = 8883, // encrypted
#else
    .port = 1883, // unencrypted
#endif
    .client_id = MQTT_CLIENT_ID,
    .username = MQTT_USER,
    .password = MQTT_PASS,
    .clean_session = 0,
    .keepalive = 120,
    .lwt_topic = MQTT_TOPIC_LWT,
    .lwt_msg = "offline",
    .lwt_qos = 0,
    .lwt_retain = 0,
    .connected_cb = connected_cb,
    .disconnected_cb = disconnected_cb,
 //   .reconnect_cb = reconnect_cb,
    .subscribe_cb = subscribe_cb,
    .publish_cb = publish_cb,
    .data_cb = data_cb
};