nodemcu / nodemcu-firmware

Lua based interactive firmware for ESP8266, ESP8285 and ESP32
https://nodemcu.readthedocs.io
MIT License
7.64k stars 3.12k forks source link

Multiple MQTT Bugs… & Possible fixes #3445

Open chathurangawijetunge opened 3 years ago

chathurangawijetunge commented 3 years ago

By now we all know that there are some bugs in the MQTT module. These are what I found.

1.) If there is no internet connection, every publish call returns true and the publish timeout never fires. 2.) If there is no internet connection, the MQTT TX buffer keeps growing until the heap runs out. 3.) If there is no internet connection, mqtt:close() does not work.

Possible fixes:

All changes are in between //---------------------SCW--------------------------------------------------
//-----------------------------------------------------------------------------

1.)

// Timer callback for one MQTT client; `arg` is that client's lmqtt_userdata.
//
// Returns immediately when `arg` is NULL or the client has no TCP record.
// Otherwise the action depends on the current connState:
//   MQTT_CONNECT_SENDING - the CONNECT was not sent in time: reset to MQTT_INIT,
//                          disconnect, report MQTT_CONN_FAIL_TIMEOUT_SENDING.
//   MQTT_CONNECT_SENT    - no CONNACK arrived in time: reset to MQTT_INIT,
//                          disconnect, report MQTT_CONN_FAIL_TIMEOUT_RECEIVING.
//   MQTT_DATA            - with an empty pending queue, queue a PINGREQ (or
//                          disconnect if the previous one is still outstanding);
//                          with a non-empty queue, treat the head message as
//                          timed out and tear the connection down.
void mqtt_socket_timer(void *arg)
{
  NODE_DBG("enter mqtt_socket_timer.\n");
  lmqtt_userdata *mud = (lmqtt_userdata*) arg;

  if(mud == NULL)
    return;

  if(mud->pesp_conn.proto.tcp == NULL){
    NODE_DBG("MQTT not connected\n");
    return;
  }

  NODE_DBG("timer, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));

  if(mud->connState == MQTT_CONNECT_SENDING){ // MQTT_CONNECT send time out.
    NODE_DBG("sSend MQTT_CONNECT failed.\n");
    mud->connState = MQTT_INIT;
    mqtt_socket_do_disconnect(mud);
    mqtt_connack_fail(mud, MQTT_CONN_FAIL_TIMEOUT_SENDING);
  } else if(mud->connState == MQTT_CONNECT_SENT) { // wait for CONACK time out.
    NODE_DBG("MQTT_CONNECT timeout.\n");
    mud->connState = MQTT_INIT;
    mqtt_socket_do_disconnect(mud);
    mqtt_connack_fail(mud, MQTT_CONN_FAIL_TIMEOUT_RECEIVING);
  } else if(mud->connState == MQTT_DATA){
    if(!msg_peek(&(mud->mqtt_state.pending_msg_q))) {
      // no queued event.
      if (mud->keepalive_sent) {
        // Oh dear -- keepalive timer expired and still no ack of previous message
        mqtt_socket_do_disconnect(mud);
      } else {
        uint8_t temp_buffer[MQTT_BUF_SIZE];
        mqtt_message_buffer_t msgb;
        mqtt_msg_init(&msgb, temp_buffer, MQTT_BUF_SIZE);

        NODE_DBG("\r\nMQTT: Send keepalive packet\r\n");
        mqtt_message_t* temp_msg = mqtt_msg_pingreq(&msgb);
        // The enqueue result is deliberately not kept: keepalive_sent is set
        // regardless, so a ping that never gets acknowledged leads to the
        // disconnect branch above on a later tick.
        msg_enqueue( &(mud->mqtt_state.pending_msg_q), temp_msg,
                     0, MQTT_MSG_TYPE_PINGREQ, (int)mqtt_get_qos(temp_msg->data) );
        mud->keepalive_sent = 1;
        mqtt_send_if_possible(mud);
      }
    }
//---------------------SCW--------------------------------------------------
    else {
      // A message is still pending when the timer fires: treat it as a send
      // timeout. mud is known to be non-NULL here (checked on entry).
      NODE_DBG("*** SCW MQTT_PUBLISH timeout.\n");
      mud->connState = MQTT_INIT;
      mqtt_socket_do_disconnect(mud);
      mqtt_connack_fail(mud, MQTT_CONN_FAIL_TIMEOUT_SENDING);
    }
//--------------------------------------------------------------------------
  }
  NODE_DBG("leave mqtt_socket_timer.\n");
}

2.)

// Lua: mqtt:publish(topic, payload, qos, retain[, callback])
//
// Builds a PUBLISH message, appends it to the client's pending queue and tries
// to send it.
//
// Lua arguments (stack order): the "mqtt.socket" userdata, topic string,
// payload string, integer qos, integer retain, and an optional function that
// replaces the reference held in cb_puback_ref.
//
// Raises a Lua error when the pending queue already holds the maximum number
// of entries ("*** SCW MQTT Tx buffer full.") or the client is not connected
// ("not connected"). Otherwise pushes one boolean: true when the message was
// enqueued and mqtt_send_if_possible() returned ESPCONN_OK, false otherwise.
static int mqtt_socket_publish( lua_State* L )
{
  // Cap on entries waiting in pending_msg_q, so the queue cannot grow without
  // bound while nothing can be sent.
  // NOTE(review): assumes msg_size() returns a message count, not bytes - confirm.
  enum { MQTT_MAX_PENDING_MSGS = 5 };

  NODE_DBG("enter mqtt_socket_publish.\n");
  lmqtt_userdata *mud;
  size_t l;
  uint8_t stack = 1;
  uint16_t msg_id = 0;

  mud = (lmqtt_userdata *)luaL_checkudata(L, stack, "mqtt.socket");
  stack++;
//---------------------SCW-----limit max buffer to 5------------------------
  // ">=" so that at most MQTT_MAX_PENDING_MSGS entries are ever queued; the
  // previous "> 5" test allowed a sixth one in.
  if (msg_size(&(mud->mqtt_state.pending_msg_q)) >= MQTT_MAX_PENDING_MSGS){
    return luaL_error( L, "*** SCW MQTT Tx buffer full." );
  }
//--------------------------------------------------------------------------
  if(!mud->connected){
    return luaL_error( L, "not connected" );
  }

  // l is overwritten by the payload read below; the topic length is not used.
  const char *topic = luaL_checklstring( L, stack, &l );
  stack ++;
  if (topic == NULL){
    return luaL_error( L, "need topic" );
  }

  const char *payload = luaL_checklstring( L, stack, &l );
  stack ++;
  uint8_t qos = luaL_checkinteger( L, stack);
  stack ++;
  uint8_t retain = luaL_checkinteger( L, stack);
  stack ++;

  // A message id is only allocated for qos != 0; qos 0 keeps msg_id at 0.
  if (qos != 0) {
    msg_id = mqtt_next_message_id(mud);
  }

  uint8_t temp_buffer[MQTT_BUF_SIZE];
  mqtt_message_buffer_t msgb;
  mqtt_msg_init(&msgb, temp_buffer, MQTT_BUF_SIZE);
  mqtt_message_t *temp_msg = mqtt_msg_publish(&msgb,
                       topic, payload, l,
                       qos, retain,
                       msg_id);

  // Optional trailing callback: swap it in for the stored reference.
  if (lua_isfunction(L, stack)){
    lua_pushvalue(L, stack);  // copy argument (func) to the top of stack
    luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_puback_ref);
    mud->cb_puback_ref = luaL_ref(L, LUA_REGISTRYINDEX);
  }

  msg_queue_t *node = msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
                      msg_id, MQTT_MSG_TYPE_PUBLISH, (int)qos );

  sint8 espconn_status = mqtt_send_if_possible(mud);

  if(!node || espconn_status != ESPCONN_OK){
    lua_pushboolean(L, 0);
  } else {
    lua_pushboolean(L, 1);  // enqueued succeed.
  }

  NODE_DBG("publish, queue size: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)));
  NODE_DBG("leave mqtt_socket_publish.\n");
  return 1;
}

3.)

This new function closes the client without informing the broker (useful when there is no internet connection).

//--------------------------------------------------------------------------
//mqtt:off() without sending a disconnect message
//---------------------SCW--------------------------------------------------
// Lua: mqtt:off()
// Takes the "mqtt.socket" userdata as argument 1. If the client is marked
// connected, calls mqtt_socket_do_disconnect() on it and pushes true;
// otherwise pushes false. Always returns exactly one Lua value.
static int mqtt_socket_off( lua_State* L )
{
  lmqtt_userdata *client = (lmqtt_userdata *)luaL_checkudata(L, 1, "mqtt.socket");
  NODE_DBG("SCW mqtt_socket_off %u\n",client->connected);

  // Nothing to tear down: report false and leave.
  if (!client->connected) {
    lua_pushboolean(L, 0);
    return 1;
  }

  mqtt_socket_do_disconnect(client);
  NODE_DBG("SCW mqtt_socket_off done \n");
  lua_pushboolean(L, 1);
  return 1;
}

Any professional advice…..?

chathurangawijetunge commented 2 years ago

Any updates or fixes?

chathurangawijetunge commented 1 year ago

Hi guys, any news on fixing this matter?