bertmelis / espMqttClient

MQTT 3.1.1 client library for the Espressif devices ESP8266 and ESP32 on the Arduino framework.
https://www.emelis.net/espMqttClient/
MIT License
92 stars 21 forks source link

Connection stucks in second phase of a connect request #150

Closed yippieyaray closed 1 month ago

yippieyaray commented 2 months ago

Ever since I started using OpenDTU, I have been troubled by a annoying reconnect problem, that I found described on several forums.

I've analyzed and corrected it in my temp branch of async-mqtt-client in the module MqttClient.cpp used by OpenDTU.

Regarding the cause: OpenDTU initiates a connect request via an external mqtt client software to the MQTT server (e.g., ioBroker). It expects either a successful response via onMqttConnect-Callback (_onConnectCallback) or, through onMqttDisconnect-Callback (_onDisconnectCallback), an indication that the connection was interrupted or the connection attempt failed. In the case of an onMqttDisconnect call, a new attempt to connect is initiated by OpenDTU via mqtt client. If the call to onMqttDisconnect is absent, no reconnect occurs.

The problem now, however, is that the mqtt client code seems to has a bug in the connection setup. Under certain circumstances, the method MqttClient::loop() may stuck in the second phase of the connection setup if the mqtt server e.g. ioBroker allows a socket connect but unfortunately does not send back a connect-acknowledge packet in situation like server shutdown or restart.

The faulty code in MqttClient.cpp:

void MqttClient::loop() { 
  switch (_state) {
    case State::connectingTcp1:
      ...
      ...
      if (_useIp ? _transport->connect(_ip, _port) : _transport->connect(_host, _port)) {
        _setState(State::connectingTcp2);   <---- connection successful; set _state to connectingTcp2
      } else {
        _setState(State::disconnectingTcp1);
        _disconnectReason = DisconnectReason::TCP_DISCONNECTED;
        break;
      }
      // Falling through to speed up connecting on blocking transport 'connect' implementations
      [[fallthrough]];
    case State::connectingTcp2:
      if (_transport->connected()) {    <---- connected is false until the ioBroker sends a connect-acknowledge-packet
        _parser.reset();
        _setState(State::connectingMqtt);  <---- ioBroker does not send an acknowledge, the _state will never set to next step connectingMqtt
      }
      break;

Here are my modifications:

void MqttClient::loop() {
  switch (_state) {
    case State::disconnected:
      #if defined(ARDUINO_ARCH_ESP32)
      if (_useInternalTask == espMqttClientTypes::UseInternalTask::YES) {
        vTaskSuspend(_taskHandle);
      }
      #endif
      break;
    case State::connectingTcp1:
      if (_useIp ? _transport->connect(_ip, _port) : _transport->connect(_host, _port)) {
        _setState(State::connectingTcp2);
        _lastClientActivity = millis();           // <---- set connect time
      } else {
        _setState(State::disconnectingTcp1);
        _disconnectReason = DisconnectReason::TCP_DISCONNECTED;
        break;
      }
      // Falling through to speed up connecting on blocking transport 'connect' implementations
      [[fallthrough]];
    case State::connectingTcp2:
      if (_transport->connected()) {
        _parser.reset();
        _lastClientActivity = _lastServerActivity = millis();
        _setState(State::connectingMqtt);
      }  else if (millis() - _lastClientActivity > _keepAlive) {  //  <---- new else block; use _keepAlive 15 seconds as timeout here 
        _setState(State::disconnectingTcp1);                         // <---- force an disconnect sequence that calls onMqttDisconnect which starts a new connect request
        _disconnectReason = DisconnectReason::TCP_DISCONNECTED;
      }
      break;

Regards!

bertmelis commented 2 months ago

if (_transport->connected()) { <---- connected is false until the ioBroker sends a connect-acknowledge-packet

This checks the TCP connection status, not the MQTT connection. Since you're falling through from State::connectingTcp1 this check should be redundant in case of the non-async version. The async version keeps checking until it can move on.

You are using the async version? Do you happen to have a debug log?

PS AsyncTCP has a nasty limitation. It it not possible to set the connection timeout. So if the broker is not responding, you will have to sit out the hardcoded timeout which is quite long.

yippieyaray commented 2 months ago

Yes, I know it does not depend on the MQTT connection. It is the connected status of the specific implementation of the communication channel (espMqttClientInternals::Transport* _transport). The issue is independent of the case statement's fall-through. As long as the method _transport->connected() returns FALSE, the case State::connectingTcp1 is repeatedly called over the loop() method, and it cannot exit the state since connected() continuously returns FALSE.

Example for the code of ARDUINO_ARCH_ESP32 (WiFi), _transport->connected():

arduinoespresif32 - WiFiClient.cpp:

uint8_t WiFiClient::connected()
{
    if (_connected) {
        uint8_t dummy;
        int res = recv(fd(), &dummy, 0, MSG_DONTWAIT);
        // avoid unused var warning by gcc
        (void)res;
        // recv only sets errno if res is <= 0
        if (res <= 0){
          switch (errno) {
              case EWOULDBLOCK:
              case ENOENT: //caused by vfs
                  _connected = true;
                  break;
              case ENOTCONN:
              case EPIPE:
              case ECONNRESET:
              case ECONNREFUSED:
              case ECONNABORTED:
                  _connected = false;
                  log_d("Disconnected: RES: %d, ERR: %d", res, errno);
                  break;
              default:
                  log_i("Unexpected: RES: %d, ERR: %d", res, errno);
                  _connected = true;
                  break;
          }
        } else {
          _connected = true;
        }
    }
    return _connected;
}  

If the socket-connection runs into problems between the socket connect and the subsequent call to connected(), the call to recv() will always call the cases where _connected is set to false. That means MqttClient::loop() keeps “looping” over the case State::connectingTcp1.

Regards!

bertmelis commented 2 months ago

I see. Tricky network situation...

You are using the sync (regular) version then? Async works a bit different by setting the state directly when it disconnects (after the timeout defined in lwip).

Your solution might work but I'm unsure about the async versions. Let me think about it.

yippieyaray commented 2 months ago

yes, I am using the regular version.

tbnobody commented 2 months ago

yes, I am using the regular version.

But please keep in mind, that OpenDTU uses the AsyncTCP approach

yippieyaray commented 2 months ago

But please keep in mind, that OpenDTU uses the AsyncTCP approach

Yes, you are right. But to drill down the problem, I used my own small little test.

I had previously located this issue through trace outputs in OpenDTU. Since implementing the timeout correction suggestion, the error has not occurred in my version of OpenDTU for quite some time.

bertmelis commented 2 months ago

Maybe this works too: (brainstorming here, completely untested)

    case State::connectingTcp2:
      if (_transport->connected()) {
        _parser.reset();
        _lastClientActivity = _lastServerActivity = millis();
        _setState(State::connectingMqtt);
      }  else if (_client->disconnected()) {  // sync: implemented as "not connected"; async: depending on state of pcb in underlying lib
        _setState(State::disconnectingTcp1);
        _disconnectReason = DisconnectReason::TCP_DISCONNECTED;
      }
      break;

There is no point in having a timeout here. For the sync version, the timeout is already in the connect function. Timeout for async is handled in the underlying lib.

yippieyaray commented 2 months ago

Hi! I guess you mean _transport->disconnected() instead of _client->disconnected()

yippieyaray commented 2 months ago

For a swift response, I've integrated your code changes into my OpenDTU setup and have started and killed my ioBroker Docker many times. So far, your solution seems to be working as well. I'll monitor the solution for a while.

However, a combination of both approaches, disconnected() plus a timeout query, could also be a solution to ensure safety.

    case State::connectingTcp1:
      if (_useIp ? _transport->connect(_ip, _port) : _transport->connect(_host, _port)) {
        _setState(State::connectingTcp2);
        _lastClientActivity = millis();
      } else {
        _setState(State::disconnectingTcp1);
        _disconnectReason = DisconnectReason::TCP_DISCONNECTED;
        break;
      }
      // Falling through to speed up connecting on blocking transport 'connect' implementations
      [[fallthrough]];
    case State::connectingTcp2:
      if (_transport->connected()) {
        _parser.reset();
        _lastClientActivity = _lastServerActivity = millis();
        _setState(State::connectingMqtt);
      } else if ((millis() - _lastClientActivity > _keepAlive) && (_transport->disconnected())) {
        _setState(State::disconnectingTcp1);
        _disconnectReason = DisconnectReason::TCP_DISCONNECTED;
      }
      break;
bertmelis commented 2 months ago

That might break async.

Mind that in the Transport implementation client.disconnected() is implemented as !client.connected(). simple as that. Async can have a state in between. Sync does not so there is no point if waiting for a timeout.

yippieyaray commented 2 months ago

"Mind that in the Transport implementation client.disconnected() is implemented as !client.connected(). simple as that."

I hope I’m not misunderstanding your concerns. Indeed, that’s why I suggested adding an extra time period before disconnected() is triggered. There’s an AND operation in the else-if clause, meaning the time must elapse before disconnected() is called/relevant. In other words, both conditions must be true before the disconnection process starts. This approach makes it safer with such straightforward connected/disconnected implementations.

Regards!

bertmelis commented 2 months ago

It will work, but the extra timeout is not needed. It will just take longer to move on in case of a failed connection.

sync version: transport->connect is a blocking call with a timeout. If the result is true, the transport->connected() is true so we fall through. all is well. If in between these states connected() would become false, disconnected() will return true. There is no automatic reconnect so once the connection is down, it stays down. Also, in case of a half-open connection, it will get detected in the next state. async version: no blocking calls and anything can happen between calls. we rely on the callbacks of the async lib to update the state of the FSM. The transport can be "not connected" and "not disconnecting" at the same time (eg. "busy connecting"). Adding a timeout here might cause strange behaviour and is a nightmare to test thouroughly.

yippieyaray commented 2 months ago

The behavior heavily depends on the implementation of the Transport interface class. In the Async version, future code changes could lead to a situation where connected() is equivalent to !disconnected(), and the case statement, through fall-through, might immediately proceed to disconnect. Even if it’s a brief moment that stabilizes in a few milliseconds, a timeout would be beneficial. The timeout isn't problematic, especially since we now face a situation where it could loop indefinitely.

From my personal experience, I’ve learned that one should not rely on the behavior of interface methods unless they are explicitly described.

But I don't mean to interfere. It’s entirely up to you whether and how you want to address this issue.

By the way, OpenDTU runs stable with the current modifications until now.

Regards!

yippieyaray commented 1 month ago

Hello, For your information, OpenDTU runs stable with the current modifications until now. It always reconnects to ioBroker without any problem.

Regards!