shamblett / mqtt_client

A server and browser based MQTT client for dart

Unhandled Exception: mqtt-client::NoConnectionException: MqttWs2Connection::TLS connection unexpectedly closed #541

Closed quanganh050998 closed 1 week ago

quanganh050998 commented 2 weeks ago

Hello! First off, thank you for making such an epic library!

I'm having some issues when trying to use the MqttServerClient. The default example works for me.

Description:

I'm able to access my WSS server using another MQTT client, but for some reason it doesn't work when I'm using Dart.

I have an Android project with the following code, which works perfectly fine:

  fun connect(context: Context, isReInit: Boolean = false) {
        PSLog.info("$TAG => connect isReInit:$isReInit")
        if (isConnected() && !isReInit) {
            PSLog.info("$TAG <= connect isConnected clientId: ${mqttClient?.clientId}")
        } else {
            val serverURI = Config.getPriceV3SocketUrl()
            val clientId =
                "${Config.getAppType()}_android_${Config.getUsername()}_${System.currentTimeMillis()}"
            mqttClient = MqttAndroidClient(context, serverURI, clientId)
            val mqttConnectOptions = MqttConnectOptions()
            mqttConnectOptions.isAutomaticReconnect = true
            mqttConnectOptions.isCleanSession = true
            mqttConnectOptions.password = "xxxx".toCharArray()
            mqttConnectOptions.userName = "xxxx"
            initListener(context)
            try {
                PSLog.info("$TAG => connect clientId: $clientId")
                mqttClient?.connect(mqttConnectOptions, null, object : IMqttActionListener {
                    override fun onSuccess(asyncActionToken: IMqttToken) {
                        PSLog.info("$TAG <= connect onSuccess clientId: ${asyncActionToken.client?.clientId}")
                    }

                    override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                        PSLog.error("$TAG <= connect onFailure", exception)
                        EventPublisher.getInstance().emitErrorEvent(
                            "connect onFailure: $exception ${exception?.message} ${exception?.cause}",
                            Exception(exception)
                        )
                    }
                })
            } catch (e: MqttException) {
                PSLog.error("$TAG <= connect MqttException", e)
                EventPublisher.getInstance().emitErrorEvent("MqttException: ${e.message}", e)
            }
        }
    }

However, when I try to implement the same functionality using Flutter, I encounter issues with the WebSocket connection. Specifically, the following code results in an error:

Future<void> connect({bool isReInit = false}) async {
    final serverURI = "wss://pricexxxxx.xxxx.com.vn"; //Config.getPriceSocketUrl();
    final clientId =
        "${Config.getAppType()}_android_quanganh_${DateTime.now().millisecondsSinceEpoch}";
    mqttClient = MqttServerClient.withPort(serverURI, clientId, 443);
    mqttClient?.useWebSocket = true;
    mqttClient?.port = 443;
    mqttClient?.useAlternateWebSocketImplementation = true;
    mqttClient?.setProtocolV311();
    mqttClient?.keepAlivePeriod = 60;
    mqttClient?.onConnected = onConnected;
    mqttClient?.onDisconnected = onDisconnected;
    mqttClient?.pongCallback = pong;
    final connMess = MqttConnectMessage()
        .authenticateAs('xxxx', 'xxxx')
        .withClientIdentifier(clientId)
        .startClean();
    mqttClient?.connectionMessage = connMess;
    try {
      Logger().i("$TAG => connect clientId: $clientId");
      await mqttClient?.connect();
      // print('EXAMPLE:data:$data');
      if (mqttClient?.connectionStatus!.state ==
          MqttConnectionState.connected) {
        print('EXAMPLE::Mosquitto client connected');
      } else {
        /// Use status here rather than state if you also want the broker return code.
        print(
            'EXAMPLE::ERROR Mosquitto client connection failed - disconnecting, status is ${mqttClient?.connectionStatus}');
      }
    } catch (e) {
      Logger().e("$TAG <= connect error", error: e);
      disconnect();
    }
  }

Error:

The following error is encountered when trying to establish the WebSocket connection:

[ERROR:flutter/runtime/dart_vm_initializer.cc(41)] Unhandled Exception: mqtt-client::NoConnectionException: MqttWs2Connection::TLS connection unexpectedly closed
#0      MqttServerWs2Connection._performWSHandshake.<anonymous closure> (package:mqtt_client/src/connectionhandling/server/mqtt_client_mqtt_server_ws2_connection.dart:270:7)
#1      _RootZone.runGuarded (dart:async/zone.dart:1582:10)
#2      _BufferingStreamSubscription._sendDone.sendDone (dart:async/stream_impl.dart:392:13)
#3      _BufferingStreamSubscription._sendDone (dart:async/stream_impl.dart:402:7)
#4      _BufferingStreamSubscription._close (dart:async/stream_impl.dart:291:7)
#5      _SyncStreamControllerDispatch._sendDone (dart:async/stream_controller.dart:792:19)
#6      _StreamController._closeUnchecked (dart:async/stream_controller.dart:647:7)
#7      _StreamController.close (dart:async/stream_controller.dart:640:5)
#8      _Socket._onData (dart:io-patch/socket_patch.dart:2454:21)
#9      _RootZone.runUnaryGuarded (dart:async/zo<…>

I can't find any code that checks TLS in MqttServerClient for Flutter. On iOS, I have this code, which connects successfully:

mqtt5 = CocoaMQTT5(clientID: clientID, host: PsConfig.shared.getPriceHostV3Url(), port: 443, socket: websocket)
let connectProperties = MqttConnectProperties()
connectProperties.topicAliasMaximum = 0
connectProperties.sessionExpiryInterval = 0
connectProperties.receiveMaximum = 100
connectProperties.maximumPacketSize = 500
mqtt5!.cleanSession = true
mqtt5!.connectProperties = connectProperties
mqtt5!.enableSSL = true

shamblett commented 2 weeks ago

You seem to be using the alternate web socket implementation -

 mqttClient?.useAlternateWebSocketImplementation = true;

Don't do this. It's for a specific usage only, usually with AWS brokers, and it defaults to false. Take this line out and see if that helps.

quanganh050998 commented 2 weeks ago

I removed mqttClient?.useAlternateWebSocketImplementation = true; and now it returns a different error:

flutter: WebSocketException: Connection to 'https://pricexxxxx.xxxxx.xxx.vn#' was not upgraded to websocket<…>
flutter: #0   EMQXManager.connect (package:test_example_mqtt/mqtt_manager.dart:73:16)<…>
flutter: #1   <asynchronous suspension><…>
flutter: ⛔ EMQXManager <= connect error<…>

I think it automatically changes 'wss://' to 'https://'.
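
The 'https://' in the exception text is consistent with how dart:io appears to perform the WebSocket opening handshake: the ws/wss URL is sent as an ordinary HTTP(S) upgrade request, and the error message reports that rewritten URI. The sketch below only illustrates that scheme mapping. The helper name toHandshakeUri and the host broker.example.com are hypothetical, and this is not the runtime's actual code.

```dart
/// Illustrative only: maps a ws/wss URI to the http/https URI that an
/// HTTP upgrade request would be sent to. The helper name is hypothetical.
Uri toHandshakeUri(Uri wsUri) {
  if (!wsUri.isScheme('ws') && !wsUri.isScheme('wss')) {
    throw ArgumentError.value(wsUri, 'wsUri', 'expected a ws or wss URI');
  }
  // Only the scheme changes; host, port, path and query are kept.
  return wsUri.replace(scheme: wsUri.isScheme('wss') ? 'https' : 'http');
}

void main() {
  // Hypothetical broker URL, used purely for illustration.
  final wsUri = Uri.parse('wss://broker.example.com/mqtt');
  final handshakeUri = toHandshakeUri(wsUri);
  print('requested: $wsUri');
  print('handshake: $handshakeUri');
}
```

If this reading is right, the scheme shown in the message is not itself the fault. "Was not upgraded to websocket" would then mean the server answered the upgrade request with something other than a successful protocol switch.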

shamblett commented 1 week ago

The client doesn't automatically change anything. If it's not your code, then it's the Dart/Flutter runtime.

You say in your first post 'The default example works for me.', so just substitute your wss URL (and change the port if needed) in the example and run it, then see what happens. If it doesn't work, then the problem is at your broker end, as you have already said it works with Mosquitto.

One thing to try is the websocketProtocolString field of the client, read API for this, you may have to set it to [].
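
As a rough illustration of the suggestion above, here is a minimal sketch of a secure WebSocket connection with the protocol list changed and client logging turned on. It assumes the mqtt_client package's websocketProtocols setter and the MqttClientConstants.protocolsSingleDefault constant, as used in the package's secure WebSocket example. The broker URL, client id and credentials are placeholders, not values from this thread.

```dart
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';

Future<void> main() async {
  // Placeholder URL and client id; substitute your own values.
  const clientId = 'example_client_id';
  final client = MqttServerClient('wss://broker.example.com', clientId);

  client.useWebSocket = true; // the wss:// scheme selects TLS
  client.port = 443;
  client.keepAlivePeriod = 60;
  client.logging(on: true); // produces the client log a maintainer can read

  // Assumption: the broker rejects the default multi-protocol list.
  // Try the single default first; if that fails, try an empty list:
  //   client.websocketProtocols = <String>[];
  client.websocketProtocols = MqttClientConstants.protocolsSingleDefault;

  client.connectionMessage = MqttConnectMessage()
      .withClientIdentifier(clientId)
      .startClean();

  try {
    // Placeholder credentials.
    await client.connect('username', 'password');
  } on Exception catch (e) {
    print('connect failed: $e');
    client.disconnect();
    return;
  }

  print('connection status: ${client.connectionStatus}');
  client.disconnect();
}
```

With logging on, the line beginning "MqttWsConnection::connect - WS URL is" shows which protocol list was actually sent, which makes it easy to confirm that the setting took effect.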

quanganh050998 commented 1 week ago

I understand your point, but in my code, I see the host is set to 'wss://', whereas the error message indicates it is being converted to 'https://'. I'm curious about why that is happening. Below is my code:

Future<void> connect({bool isReInit = false}) async {
    Config.configPs();
    final serverURI = "wss://pricexxxxxx"; //Config.getPriceSocketUrl();
    final clientId =
        "${Config.getAppType()}_android_quanganh_${DateTime.now().millisecondsSinceEpoch}";
    mqttClient = MqttServerClient.withPort(serverURI, clientId, 443);
    mqttClient?.useWebSocket = true;
    mqttClient?.port = 443;
    mqttClient?.useAlternateWebSocketImplementation = false;
    mqttClient?.websocketProtocolString = [];
    mqttClient?.keepAlivePeriod = 60;
    mqttClient?.onConnected = onConnected;
    mqttClient?.onDisconnected = onDisconnected;
    mqttClient?.pongCallback = pong;
    final connMess = MqttConnectMessage()
        .authenticateAs('xxxx', 'xxxx')
        .withClientIdentifier(clientId)
        .startClean();
    mqttClient?.connectionMessage = connMess;
    try {
      Logger().i("$TAG => connect clientId: $clientId");
      await mqttClient?.connect();
      // print('EXAMPLE:data:$data');
      if (mqttClient?.connectionStatus!.state ==
          MqttConnectionState.connected) {
        print('EXAMPLE::Mosquitto client connected');
      } else {
        /// Use status here rather than state if you also want the broker return code.
        print(
            'EXAMPLE::ERROR Mosquitto client connection failed - disconnecting, status is ${mqttClient?.connectionStatus}');
      }
    } catch (e) {
      Logger().e("$TAG <= connect error", error: e);
      disconnect();
    }
  }

The error message is then:

WebSocketException: Connection to 'https://price-xxxxxxxx.xx#' was not upgraded to websocket<…>
flutter: #0   EMQXManager.connect (package:test_example_mqtt/mqtt_manager.dart:72:16)<…>
flutter: #1   <asynchronous suspension><…>
flutter: ⛔ EMQXManager <= connect error<…>
flutter: #0   EMQXManager.onDisconnected (package:test_example_mqtt/mqtt_manager.dart:181:14)<…>
flutter: #1   MqttClient._disconnect (package:mqtt_client/src/mqtt_client.dart:502:22)<…>
flutter: 💡 EMQXManager <= connectionLost<…>

shamblett commented 1 week ago

This has been seen before and has nothing to do with the client. Search the closed issues.

Try the example with your wss url and the protocol settings change I've suggested above, post back what happens.

Ultimately your only recourse will be to examine your broker logs to see exactly why it is refusing the connection, only it knows this, not the client.

quanganh050998 commented 1 week ago

I tried what you suggested above, but it only returns this error:

WebSocketException: Connection to 'https://price-xxxxxxxx.xx#' was not upgraded to websocket<…>
flutter: #0   EMQXManager.connect (package:test_example_mqtt/mqtt_manager.dart:72:16)<…>
flutter: #1   <asynchronous suspension><…>
flutter: ⛔ EMQXManager <= connect error<…>
flutter: #0   EMQXManager.onDisconnected (package:test_example_mqtt/mqtt_manager.dart:181:14)<…>
flutter: #1   MqttClient._disconnect (package:mqtt_client/src/mqtt_client.dart:502:22)<…>
flutter: 💡 EMQXManager <= connectionLost<…>

I think the exception shows the connect URL as 'https://xxxxxx#' because the web app also uses the same connect link, and it returns the old link.

[image]

shamblett commented 1 week ago

I'm not sure what that means or what that image is of. What is the 'old' link?

So are you saying the example fails with your wss URL? If so, could you supply a client log please? Without it I can't go any further.

quanganh050998 commented 1 week ago

I don't understand what you wrote. The client console output is above. If you want the log from the web app, sorry, it is a private project.

shamblett commented 1 week ago

As I've said above, please run the example that worked with Mosquitto using your wss URL and post me a log from the client, i.e. turn on logging on the client with 'client.logging(on: true);'.

I'm only interested in what the client is seeing, not anything else you may be doing. You don't need any other code to simply run the mqtt_server_client_websocket_secure example with your wss URL.

quanganh050998 commented 1 week ago

The log is below:

2-2024-07-04 17:17:18.784281 -- MqttClient::connect - Connection timeout period is 5000 milliseconds
flutter: 2-2024-07-04 17:17:18.784917 -- MqttClient::connect - keep alive is enabled with a value of 60 seconds
flutter: 2-2024-07-04 17:17:18.785264 -- MqttConnectionKeepAlive:: Initialised with a keep alive value of 60 seconds
flutter: 2-2024-07-04 17:17:18.785434 -- MqttConnectionKeepAlive:: Disconnect on no ping response is disabled
flutter: 2-2024-07-04 17:17:18.785772 -- MqttConnectionHandlerBase::connect - server wss://price-streaming.vndirect.com.vn, port 443
flutter: 2-2024-07-04 17:17:18.785927 -- SynchronousMqttServerConnectionHandler::internalConnect entered
flutter: 2-2024-07-04 17:17:18.786113 -- SynchronousMqttServerConnectionHandler::internalConnect - initiating connection try 0, auto reconnect in progress false
flutter: 2-2024-07-04 17:17:18.786287 -- SynchronousMqttServerConnectionHandler::internalConnect - websocket selected
flutter: 2-2024-07-04 17:17:18.786462 -- SynchronousMqttServerConnectionHandler::internalConnect - calling connect
flutter: 2-2024-07-04 17:17:18.787266 -- MqttWsConnection::connect - entered
flutter: 2-2024-07-04 17:17:18.787630 -- MqttWsConnection::connect - WS URL is wss://pricexxxx.xxxx.xx:443, protocols are [mqtt, mqttv3.1, mqttv3.11]
flutter: 2-2024-07-04 17:17:18.930968 -- MqttConnectionBase::_onError - calling disconnected callback
flutter: WebSocketException: Connection to 'https://price-streaming.vndirect.com.vn#' was not upgraded to websocket<…>
flutter: #0   EMQXManager.connect (package:test_example_mqtt/mqtt_manager.dart:73:16)<…>
flutter: #1   <asynchronous suspension><…>
flutter: ⛔ EMQXManager <= connect error<…>
flutter: 2-2024-07-04 17:17:18.933254 -- MqttConnectionHandlerBase::disconnect - entered
flutter: 2-2024-07-04 17:17:18.933417 -- MqttConnectionHandlerBase::_performConnectionDisconnect entered
flutter: 2-2024-07-04 17:17:18.934108 -- MqttConnectionKeepAlive::stop - stopping keep alive
flutter: #0   EMQXManager.onDisconnected (package:test_example_mqtt/mqtt_manager.dart:182:14)<…>
flutter: #1   MqttClient._disconnect (package:mqtt_client/src/mqtt_client.dart:502:22)<…>
flutter: 💡 EMQXManager <= connectionLost<…>
flutter: 2-2024-07-04 17:17:59.202763 -- MqttConnectionKeepAlive::pingRequired
flutter: 2-2024-07-04 17:17:59.203867 -- MqttConnectionKeepAlive::pingRequired - NOT sending ping - not connected
flutter: 2-2024-07-04 17:17:59.204179 -- MqttConnectionKeepAlive::pingRequired - restarting ping timer

shamblett commented 1 week ago

OK, so have you tried changing the web socket protocol string I mentioned above? If none of this works, you need to examine your broker logs to see why it is refusing the connection; the client can't tell you this.