mochi-mqtt / server

The fully compliant, embeddable high-performance Go MQTT v5 server for IoT, smarthome, and pubsub
MIT License
1.21k stars 211 forks source link

An Issue with Client Connection in Dart Language 一个非常奇怪的bug #331

Closed OpenJarvisAI closed 1 month ago

OpenJarvisAI commented 10 months ago

Hello,最近使用 flutter的client,连接服务。用的是mqtt5_client这个lib。

服务端报错:

time=2023-11-20T22:05:39.822+08:00 level=INFO msg=unsubscribed hook=my-custom-events-hook client=android_client_iDxic_usrItug3Lj2c5 filters=[]
time=2023-11-20T22:05:39.822+08:00 level=WARN msg="" listener=t1 error=EOF
time=2023-11-20T22:05:42.144+08:00 level=WARN msg="client keepalive is below minimum recommended value and may exhibit connection instability" client="" keepalive=0 recommended=5
time=2023-11-20T22:05:42.144+08:00 level=INFO msg="--> client connected" hook=my-custom-events-hook client=android_client_iDxic_usrItug3Lj2c5
time=2023-11-20T22:05:42.147+08:00 level=WARN msg="client keepalive is below minimum recommended value and may exhibit connection instability" client="" keepalive=0 recommended=5
time=2023-11-20T22:05:42.147+08:00 level=INFO msg="--> client connected" hook=my-custom-events-hook client=android_client_iDxic_usrItug3Lj2c5
time=2023-11-20T22:05:42.147+08:00 level=INFO msg="--> client disconnected" hook=my-custom-events-hook client=android_client_iDxic_usrItug3Lj2c5 expire=true error="read tcp 172.18.171.245:1883->113.89.11.200:20164: use of closed network connection"
time=2023-11-20T22:05:42.147+08:00 level=WARN msg="" listener=t1 error="read tcp 172.18.171.245:1883->113.89.11.200:20164: use of closed network connection"

表现时,一连接上,立马就断开。

现在不知道是客户端还是服务端的问题? 是什么原因导致的呢?

werbenhu commented 10 months ago

你将客户端的keepalive参数设置长一点,比如2分钟

OpenJarvisAI commented 10 months ago

我用的是mqtt5_client,我实际上已经设置了keepAlive,但是这个库的作者说你不用设置这玩意,我现在有点懵逼了,不知道是broker的问题还是client的问题。

https://github.com/shamblett/mqtt5_client/issues/70

OpenJarvisAI commented 10 months ago

变现就是本来都已经连上了(实际上我的auth是全部通过),然后从flutter的log来看,是连上了然后又不知道什么原因断开了,原因未知,很奇怪

werbenhu commented 10 months ago

client keepalive is below minimum recommended value and may exhibit connection instability 从服务端的日志来看,你客户端传上来的就是小于5秒,那个日志才会打出来。你不会是设置成2秒吧,那个单位是秒哦,不是分钟,2分钟就是120才对。

IshanDaga commented 10 months ago

Could also be a problem of creating multiple connections using the same client_id / client_name information - this was the an issue in my environment. This could also be the client attempting to connect after already establishing a connection / opening a new connection without closing an existing one

Suggestions for flutter:

werbenhu commented 10 months ago

你用的什么客户端,将简化代码贴出来我可以看看

OpenJarvisAI commented 10 months ago

@werbenhu 非常感谢大佬分析和回答。

我用的是这个库:mqtt5_client. flutter.

这个是我的简化connect代码:

@override
  Future<bool> connect(
      {required host,
      required String username,
      required String password,
      String? clientId,
      int? port}) async {
    if (!connectLock) {
      String cid = clientId ?? getClientId()!;
      _clientId = cid;
      if (useWebsocket) {
        host = "ws://" + host;
      }
      _client =
          // MqttServerClient.withPort(host, cid, 1883, maxConnectionAttempts: 3);
          MqttServerClient.withPort("test.mosquitto.org", cid, 1883,
              maxConnectionAttempts: 3);
      kLog(
          '!!!! ------------> [conn] connect with $host, $port, $username, $password, ${_clientId} $_client');

      _client!.logging(on: true);
      // _client!.logging(on: false);
      _client!.onConnected = onConnected;
      _client!.onDisconnected = onDisconnected;
      _client!.onUnsubscribed = onUnsubscribed;
      _client!.onSubscribed = onSubscribed;
      _client!.onSubscribeFail = onSubscribeFail;
      _client!.onAutoReconnect = onAutoReconnect;
      _client!.pongCallback = pong;
      _client!.keepAlivePeriod = 60 * 3;
      _client!.autoReconnect = false;
      _client!.useWebSocket = useWebsocket;

      final connMessage = MqttConnectMessage()
          .authenticateAs(username, password)
          // .withWillTopic('lastwills')
          // .withWillMessage('Will message')
          .withClientIdentifier(cid);
      // .withProtocolName("MQTT")
      // .withProtocolVersion(4)
      // .withSessionExpiryInterval(interval)
      // .withWillQos(MqttQos.atLeastOnce);
      _client!.connectionMessage = connMessage;

      connectLock = true;
      try {
        //_messagesController =  StreamController.broadcast();
        _broadcastConnectionState();
        await _client!.connect();
        _broadcastConnectionState();
        // only after connected?
        _subscribeToArchivesTopics();
        _listenAndFilter();

        connectLock = false;
        return true;
      } catch (e) {
        kLog('Exception: $e');
        if (_client != null) _client!.disconnect();

        connectLock = false;
        return false;
      }
    } else {
      kLog("!!!!xxx connect is locked ! do not repeat connect");
      return false;
    }
  }

不知道大佬能否看懂dart代码,但逻辑上应该差不多。

我目前的问题是两个:

  1. 上面说的时间,是keepAlive吗这里的?我也不太确定这个库的作者有没有真的把这个值传上去,但是服务端没有收到(0),实际上已经设置;
  2. 至今不知道为什么会connect两次,我改成了mosquitto的server,也是一样,似乎不是服务器的问题。而是客户端的问题,但是完全不知道为什么。我应该没有重复connect。
werbenhu commented 10 months ago

用这个.keepAliveFor(120), 连接两次是不是因为keepalive太短,导致服务端断开客户端连接后,客户端又自动重连了.

You can use .keepAliveFor(120) to set the keep-alive. The reason for two connections might be that the client's keep-alive setting is too short, causing the server to disconnect the client, and then the client automatically reconnects?

final connMessage = MqttConnectMessage()
          .authenticateAs(username, password)
          .keepAliveFor(120)
          .withClientIdentifier(cid);
      _client!.connectionMessage = connMessage;
OpenJarvisAI commented 10 months ago

@werbenhu hi, what's the properly keepalive time for a chating mobile app?

using 120 means every 2 miniutes it needs to re-connect if no packet send.

Will it consume many battery life?

what's this value set to normally ?

werbenhu commented 10 months ago

About keep alive, you can refer to MQTT spec 3.1.2.10 Keep Alive.

The line _client!.keepAlivePeriod = 60 * 3; here sets the interval for client pings. If you have set your client's keep-alive time to 120s, you might also need to set this to a value less than or equal to 120s.

werbenhu commented 9 months ago

@OpenJarvisAI Has there been any progress on this issue? Can it be closed now?

OpenJarvisAI commented 7 months ago

@werbenhu 这个问题至今仍未解决,我现在服务端的log大概是这样:

4-02-07T17:07:11.066+08:00 level=INFO msg="--> client connected [auth]" hook=my-auth-hook client=android_client_6g6jh_usrHQ3w4bIjN7
INFO[880882] >>>>>[v2] login, username: 15x1xxx23x0, client: android_client_6g6jh_usrHQ3w4bIjN7
INFO[880882] ------> publish IdArchives: archivesmyid/android_client_6g6jh_usrHQ3w4bIjN7
time=2024-02-07T17:07:11.072+08:00 level=INFO msg="--> received from inline client " hook=my-hook topic=archivesmyid/android_client_6g6jh_usrHQ3w4bIjN7
time=2024-02-07T17:07:11.072+08:00 level=INFO msg="--> published to inline client" hook=my-hook
time=2024-02-07T17:07:11.072+08:00 level=INFO msg="--> client disconnected" hook=my-hook client=android_client_6g6jh_usrHQ3w4bIjN7 expire=false error="read tcp 172.18.171.245:1883->113.89.10.97:44377: use of closed network connection"
time=2024-02-07T17:07:11.073+08:00 level=WARN msg="" listener=t1 error="read tcp 172.18.171.245:1883->113.89.10.97:44377: use of closed network connection"
time=2024-02-07T17:07:11.083+08:00 level=INFO msg="subscribed qos=[1]" hook=my-hook client=android_client_6g6jh_usrHQ3w4bIjN7 filters="[{ShareName:[] Filter:archivesrooms/android_client_6g6jh_usrHQ3w4bIjN7 Identifier:0 Identifiers:map[] RetainHandling:0 Qos:1 RetainAsPublished:true NoLocal:false FwdRetainedFlag:false}]"
time=2024-02-07T17:07:11.092+08:00 level=INFO msg="subscribed qos=[1]" hook=my-hook client=android_client_6g6jh_usrHQ3w4bIjN7 filters="[{ShareName:[] Filter:archivesmessages/android_client_6g6jh_usrHQ3w4bIjN7 Identifier:0 Identifiers:map[] RetainHandling:0 Qos:1 RetainAsPublished:true NoLocal:false FwdRetainedFlag:false}]"
time=2024-02-07T17:07:11.092+08:00 level=INFO msg="subscribed qos=[1]" hook=my-hook client=android_client_6g6jh_usrHQ3w4bIjN7 filters="[{ShareName:[] Filter:archivesrooms/new/android_client_6g6jh_usrHQ3w4bIjN7 Identifier:0 Identifiers:map[] RetainHandling:0 Qos:1 RetainAsPublished:true NoLocal:false FwdRetainedFlag:false}]"
time=2024-02-07T17:07:11.092+08:00 level=INFO msg="subscribed qos=[1]" hook=my-hook client=android_client_6g6jh_usrHQ3w4bIjN7 filters="[{ShareName:[] Filter:personalmyid/android_client_6g6jh_usrHQ3w4bIjN7 Identifier:0 Identifiers:map[] RetainHandling:0 Qos:1 RetainAsPublished:true NoLocal:false FwdRetainedFlag:false}]"
time=2024-02-07T17:07:11.101+08:00 level=INFO msg="--> client connected" hook=my-hook client=android_client_6g6jh_usrHQ3w4bIjN7
time=2024-02-07T17:07:11.101+08:00 level=INFO msg="--> client connected [auth]" hook=my-auth-hook client=android_client_6g6jh_usrHQ3w4bIjN7
INFO[880882] >>>>>[v2] login, username: 15116123160, client: android_client_6g6jh_usrHQ3w4bIjN7
INFO[880882] ------> publish IdArchives: archivesmyid/android_client_6g6jh_usrHQ3w4bIjN7
time=2024-02-07T17:07:11.107+08:00 level=INFO msg="--> received from inline client " hook=my-hook topic=archivesmyid/android_client_6g6jh_usrHQ3w4bIjN7
time=2024-02-07T17:07:11.107+08:00 level=INFO msg="--> published to inline client" hook=my-hook
time=2024-02-07T17:07:11.108+08:00 level=INFO msg="--> client disconnected" hook=my-hook client=android_client_6g6jh_usrHQ3w4bIjN7 expire=false error="read tcp 172.18.171.245:1883->113.89.10.97:44378: use of closed network connection"
time=2024-02-07T17:07:11.108+08:00 level=WARN msg="" listener=t1 error="read tcp 172.18.171.245:1883->113.89.10.97:44378: use of closed network connection"

就是一连接,立马就断开。

@IshanDaga 感觉跟这个用户的现象是一致的。

也就是说,我这边用同样的clientId去创建了连接,但是之前的,服务端并不知道掉线了。

这个就比较蛋疼,客户端好像没有明确的把disconnect信息,传递给服务端。

但这也不能说是客户端的库有问题, 这个问题导致的原因是因为 手机休眠机制。

实际上应用退到后台(不是退出),我估计被系统杀死了,没有一句话发出去。 然后回到前台,由于客户端开启了自动连接,他就自动连接了。但是不管你怎么连接,你的判断都是错的,因为你掉线了,但是服务器并不知道,于是无论如何都要重练,也无论如何都会被服务器 先连接上然后断开。

基于此,想请问各位有什么办法么?

werbenhu commented 7 months ago

但是不管你怎么连接,你的判断都是错的,因为你掉线了,但是服务器并不知道,于是无论如何都要重练,也无论如何都会被服务器 先连接上然后断开。

这个流程有问题吗?没有问题啊!你之前的连接还没有超时(keepalive时间内)服务器将它看成连接还在线,这时候你又来一个连接(同一个客户端ID),服务端要断开之前的连接,然后保存新连接,没问题啊。

OnDisconnect那里可以判断是否是重连的,这个代码就是:cl.StopCause() == packets.ErrSessionTakenOver ,你可以参考下这里的代码

// OnDisconnect removes a client from the store if they were using a clean session.
func (h *Hook) OnDisconnect(cl *mqtt.Client, _ error, expire bool) {
    if h.db == nil {
        h.Log.Error("", "error", storage.ErrDBFileNotOpen)
        return
    }

    if !expire {
        return
    }

    // 如果是重连,不清除数据库里的连接数据
    if cl.StopCause() == packets.ErrSessionTakenOver {
        return
    }

    err := h.db.HDel(h.ctx, h.hKey(storage.ClientKey), clientKey(cl)).Err()
    if err != nil {
        h.Log.Error("failed to delete client", "error", err, "id", clientKey(cl))
    }
}
werbenhu commented 7 months ago

@OpenJarvisAI 客户端主动关闭连接的服务端是可以马上知道的,因为客户端给服务端发送了一个disconnect或者直接关闭了连接,但是如果客户端是拔网线,关机类似这种情况,服务端只能通过keepalive心跳包来自己判断,你可以随便找一个PC mqtt客户端,连上后拔掉网线,你可以看看服务端是不是 1.5 * keepalive后才会触发disconnect。这些都在mqtt标准协议里面有说到的。

IshanDaga commented 7 months ago

@OpenJarvisAI one approach here could be to run your MQTT Client in a flutter isolate (background process), to ensure that it keeps running despite the app being put to sleep by the phone. Some docs are available here.

Ofcouse, as @werbenhu suggested, you could also have a very short keepalive setting, where the client must send out heartbeats every 1-2 seconds, failing which the connection is automatically closed, and the client must be responsible to open a new connection using the onDisconnect hook on the client side library.

OpenJarvisAI commented 7 months ago

@werbenhu 对的,这个逻辑服务端没有问题。

我思考再三,觉得本质上还是拔网线了(相当于),没有及时告知服务器。

@IshanDaga isolate 之前试过,好像也不行,有可以work的例子吗? 因为整个mqttlclient 有一些数据库的操作,跑isolate代码改动较大。

我尝试一下吧keepAlive 改为1s试一下。 但是这样的话,会不会增加手机功耗

OpenJarvisAI commented 7 months ago

我试了一下吧keepAlive 改为 1s 不行, 感觉还是mqtt_client这个client写的有问题,或者是使用有问题。 我如果监听app是否切换到了后台,主动disconnect,然后回来再connect,好像就没有问题。但是这样,又导致调用image picker的时候会有问题。 无法发图片了

OpenJarvisAI commented 7 months ago

为什么客户端设置了keepAliveFor 1s ,客户端会1s中掉线一次?

  _client!.logging(on: false);
      _client!.onConnected = onConnected;
      _client!.onDisconnected = onDisconnected;
      _client!.onUnsubscribed = onUnsubscribed;
      _client!.onSubscribed = onSubscribed;
      _client!.onSubscribeFail = onSubscribeFail;
      _client!.onAutoReconnect = onAutoReconnect;
      _client!.onAutoReconnected = onAutoReconnected;
      _client!.pongCallback = pong;
      _client!.keepAlivePeriod =
          1; // short enough in case client can not notify server offline
      _client!.autoReconnect = true;
      _client!.useWebSocket = useWebsocket;

      _host = host;
      _userName = username;
      _password = password;

      final connMessage = MqttConnectMessage()
          .authenticateAs(username, password)
          .withSessionExpiryInterval(3600 * 24 * 14) // last for 7 days
          .keepAliveFor(1)
          // .withWillTopic('lastwills')
          // .withWillMessage('Will message')
          .withClientIdentifier(cid);
      // .withProtocolName("MQTT")
      // .withProtocolVersion(4)
      // .withSessionExpiryInterval(interval)
      // .withWillQos(MqttQos.atLeastOnce);
      _client!.connectionMessage = connMessage;

有人知道吗

werbenhu commented 7 months ago

@OpenJarvisAI keepalive推荐至少60s以上

werbenhu commented 7 months ago

用这个.keepAliveFor(120), 连接两次是不是因为keepalive太短,导致服务端断开客户端连接后,客户端又自动重连了.

You can use .keepAliveFor(120) to set the keep-alive. The reason for two connections might be that the client's keep-alive setting is too short, causing the server to disconnect the client, and then the client automatically reconnects?

final connMessage = MqttConnectMessage()
          .authenticateAs(username, password)
          .keepAliveFor(120)
          .withClientIdentifier(cid);
      _client!.connectionMessage = connMessage;

你看看这个。

OpenJarvisAI commented 7 months ago

@werbenhu 大佬,我有点看不懂了,刚才不是说,不如把时间设置成1s吗?我之前就是设置的很大,依旧会有这个问题,刚才设置了1s,也是一样。

迷茫了啊

OpenJarvisAI commented 7 months ago

现在问题并不是拔网线,我把数据链接关掉,然后再打开,是不会出现重复连接问题的。

神奇就神奇在这里。但是偶尔退到后台,多一段时间会重复连接

werbenhu commented 7 months ago

我记得我之前帮你查过这个客户端的代码:keepAliveFor(120),这个才是设置keepalive。你按我这个配置来: keepAliveFor(120)

client!.keepAlivePeriod = 60, 你换成这两个配置试试看。

你要先搞清楚keepalive是干嘛的, keepalive是服务端保存客户端存活时间的,也就是如果keepalive是60秒,服务端如果60秒没有收到客户端的心跳包就认为客户端掉线了,你觉得设置成1秒是合理的吗?

OpenJarvisAI commented 7 months ago

@werbenhu 我咋感觉这个指标单传给服务器是不合理的,客户端他必然得根据这个值法心跳,不然设置个蛋。

我现在改成了 120,最新代码:

_client!.logging(on: false);
      _client!.onConnected = onConnected;
      _client!.onDisconnected = onDisconnected;
      _client!.onUnsubscribed = onUnsubscribed;
      _client!.onSubscribed = onSubscribed;
      _client!.onSubscribeFail = onSubscribeFail;
      _client!.onAutoReconnect = onAutoReconnect;
      _client!.onAutoReconnected = onAutoReconnected;
      _client!.pongCallback = pong;
      // _client!.keepAlivePeriod =
      //     1; // short enough in case client can not notify server offline
      _client!.autoReconnect = true;
      _client!.useWebSocket = useWebsocket;

      _host = host;
      _userName = username;
      _password = password;

      final connMessage = MqttConnectMessage()
          .authenticateAs(username, password)
          .withSessionExpiryInterval(3600 * 24 * 14) // last for 7 days
          .keepAliveFor(120)
          // .withWillTopic('lastwills')
          // .withWillMessage('Will message')
          .withClientIdentifier(cid);

还是一如既往的,推到主屏幕,隔一段时间(超过2min),再打开app,就会一直重复连接。

这次我彻底迷茫了啊。我日了。

我感觉本质原因还是,回来的时候,客户端其实已经断了,被系统杀死了,服务器不知道,而AutoReconnect还是沿用的原来的连接,就肯定被服务器断开,说 用了一个已经断开的连接

我已经在flutter app里面,监控了appLifeCycle,在detach的时候手动的disconnect,无奈,没吊用。

可有破解之法啊

werbenhu commented 7 months ago

_client!.keepAlivePeriod = 60 ,这个就是那个客户端的心跳间隔时间。就是它,我之前稍微看了下那个客户端的代码。你吧这个也带上去。心跳间隔时间最少也应该是一个keepalive。这里设置成60秒,相当于一个keepalive之内,客户端会发送两个心跳。

你可以在服务端加点日志看看,server.go里处理心跳包的地方加个日志看看,看看客户端有没有发送心跳。

// processPingreq processes a Pingreq packet.
func (s *Server) processPingreq(cl *Client, _ packets.Packet) error {
    return cl.WritePacket(packets.Packet{
        FixedHeader: packets.FixedHeader{
            Type: packets.Pingresp, // [MQTT-3.12.4-1]
        },
    })
}

另外从你的日志来看,服务端似乎没有断开这个连接,而是客户端自己重连导致的,如果心跳还在维持,客户端不应该重新连接才对的。

werbenhu commented 1 month ago

As the issue is non-reproducible and no feedback has been received for a long time, it is now closed. If further problems arise, please reopen it.