shamblett / mqtt_client

A server and browser based MQTT client for dart
Other
545 stars 176 forks source link

Problems maintaining connection to AWS IoT MQTT broker: Connection was not upgraded to websocket #552

Open rpungin opened 1 month ago

rpungin commented 1 month ago

I have read similar issues like #246 and #348 but I can't get it fixed.

When I connect the instance of MqttServerClient for the first time, I see this in the log:

 -- MqttWsConnection::connect - WS URL is wss://xxx-ats.iot.us-east-1.amazonaws.com:443/mqtt?...

I can subscribe to topics and receive the messages with no problem. Everything works.

But if I call disconnect() on the instance of MqttServerClient and wait for several minutes and then call connect(), I see the same message in the log as above, but then I get this exception thrown in the connect() method of MqttServerClient:

WebSocketException: Connection to 'https://xxx-ats.iot.us-east-1.amazonaws.com/mqtt?... was not upgraded to websocket

So the question is: why did the protocol in the exception message change from wss to https and why is there no port number 443? Is this the reason 'Connection ... was not upgraded to websocket' SocketException is thrown?

I only have one single instance of MqttServerClient and it is configured as follows:

Future<MqttServerClient?> _createMqttServerClient(
      CognitoAuthSession cognitoAuthSession) async {
    final AWSResult userPoolTokensResult =
        cognitoAuthSession.userPoolTokensResult;
    if (userPoolTokensResult is! AWSSuccessResult) {
      _errorHandler
          .handle("Invalid userPoolTokensResult: $userPoolTokensResult");
      return null;
    }

    final credentialsResult = cognitoAuthSession.credentialsResult;
    if (credentialsResult is! AWSSuccessResult) {
      _logError("Invalid credentialsResult: $credentialsResult");
      return null;
    }
    final awsCredentials = credentialsResult.value;
    final accessKey = awsCredentials.accessKeyId;
    final secretKey = awsCredentials.secretAccessKey;
    final sessionToken = awsCredentials.sessionToken;
    if (sessionToken == null) {
      _logError("sessionToken is null");
      return null;
    }

    final identityId = DeviceInfo().deviceId;

    // Transform the url into a Websocket url using SigV4 signing
    String signedUrl = _getWebSocketURL(
        accessKey: accessKey,
        secretKey: secretKey,
        sessionToken: sessionToken,
        region: _region,
        scheme: _scheme,
        endpoint: _baseUrl,
        urlPath: _urlPath);

    // Create the client with the signed url
    MqttServerClient mqttServerClient = MqttServerClient.withPort(
        signedUrl, identityId, _port,
        maxConnectionAttempts: 2);

    // Set the protocol to V3.1.1 for AWS IoT Core, if you fail to do this you will not receive a connect ack with the response code
    mqttServerClient.setProtocolV311();
    mqttServerClient.logging(
        on: SettingsRepository().getBool(MqttLoggingSetting()));
    mqttServerClient.useWebSocket = true;
    mqttServerClient.secure = false;
    mqttServerClient.autoReconnect =
        SettingsRepository().getBool(MqttAutoReconnectSetting());
    mqttServerClient.disconnectOnNoResponsePeriod = 90;
    //mqttServerClient.keepAlivePeriod = 30;
    mqttServerClient.connectionMessage =
        MqttConnectMessage().withClientIdentifier(identityId);
    mqttServerClient.websocketProtocols =
        MqttClientConstants.protocolsSingleDefault;
    mqttServerClient.onDisconnected = () {};
    mqttServerClient.onConnected = () {
      _logDebug("Connected");
      _connectionStatusStreamController.add(PubSubConnectionStatus.connected);
      _listenToUpdates(mqttServerClient);
      onConnected?.call();
    };
    mqttServerClient.onSubscribeFail = (topic) {
      _logError("Failed subscribing to topic $topic");
    };
    mqttServerClient.onSubscribed = (topic) {
      _logDebug("Subscribed to topic $topic");
      _subscribedTopics.add(topic);
    };
    mqttServerClient.onUnsubscribed = (topic) {
      _logDebug("Unsubscribed from topic $topic");
      _subscribedTopics.remove(topic);
    };
    mqttServerClient.onBadCertificate = (certificate) {
      _logError("Bad certificate $certificate");
      return true;
    };
    mqttServerClient.onAutoReconnect = () {
      _logDebug("AutoReconnecting");
      _connectionStatusStreamController
          .add(PubSubConnectionStatus.autoReconnecting);
    };
    mqttServerClient.onAutoReconnected = () {
      _logDebug("AutoReconnected");
      _connectionStatusStreamController.add(PubSubConnectionStatus.connected);
    };
    return mqttServerClient;
  }

I am using mqtt_client package version ^10.2.1.

I get this exception with autoConnect flag being set to both true and false.

If the autoConnect flag is true, I also have a problem of the MqttServerClient instance get stuc in autoConnecting status when the app comes back from the background after being there for several minutes. Something tells me those two problems are related.

rpungin commented 1 month ago

This is the complete MqttServerClient log before the error occurs:


flutter: 670-2024-07-19 00:42:04.625513 -- SynchronousMqttServerConnectionHandler::internalConnect entered
flutter: 670-2024-07-19 00:42:04.625954 -- SynchronousMqttServerConnectionHandler::internalConnect - initiating connection try 0, auto reconnect in progress false
flutter: 670-2024-07-19 00:42:04.626204 -- SynchronousMqttServerConnectionHandler::internalConnect - websocket selected
flutter: 670-2024-07-19 00:42:04.626541 -- SynchronousMqttServerConnectionHandler::internalConnect - calling connect
flutter: 670-2024-07-19 00:42:04.627027 -- MqttWsConnection::connect - entered
flutter: 670-2024-07-19 00:42:04.631646 -- MqttWsConnection::connect - WS URL is wss://xxx-ats.iot.us-east-1.amazonaws.com:443/mqtt?X-Amz-Date=20240719T041326Z&X-Amz-SignedHeaders=host&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=ASIA2UC3DTIQOAI57XN6%2F20240719%2Fus-east-1%2Fiotdevicegateway%2Faws4_request&X-Amz-Expires=3600&X-Amz-Signature=xxx
flutter: 670-2024-07-19 00:42:25.813971 -- MqttConnectionBase::_onError - calling disconnected callback
shamblett commented 1 month ago

I'll have a look at your disconnect/reconnect problem, however in the meantime examine your AWS server logs to see what reason, if any, your broker gives for failing the connection.

These two problems are not related, there are many, many closed issues on the effect on the client of going into/coming out of background on android, please search the closed issues for how to handle this.

shamblett commented 1 month ago

This simple test works fine -

import 'dart:io';

import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';

int connectionNumber = 1;

Future<void> main() async {
  final client = MqttServerClient('wss://test.mosquitto.org', 'SJH-TEST');
  client.useWebSocket = true;
  client.port = 8081; // ( or whatever your ws port is)
  client.websocketProtocols = MqttClientConstants.protocolsSingleDefault;
  client.keepAlivePeriod = 60;
  client.onConnected = onConnected;
  client.onDisconnected = onDisconnected;
  client.logging(on: false);
  client.setProtocolV311();

  print('EXAMPLE::first connection');
  try {
    await client.connect();
  } on NoConnectionException catch (e) {
    print(e.toString());
  }

  if (client.connectionStatus!.state == MqttConnectionState.connected) {
    print('EXAMPLE::Mosquitto wss client connected');
  } else {
    print(
        'EXAMPLE::ERROR Mosquitto client connection $connectionNumber failed - '
        'disconnecting, status is ${client.connectionStatus}');
    client.disconnect();
    exit(-1);
  }

  print('EXAMPLE::sleeping for 60 seconds...');
  await MqttUtilities.asyncSleep(60);

  print('EXAMPLE::disconnecting');
  client.disconnect();

  print('');
  print('EXAMPLE::second connection');
  connectionNumber++;
  try {
    await client.connect();
  } on NoConnectionException catch (e) {
    print(e.toString());
  }

  if (client.connectionStatus!.state == MqttConnectionState.connected) {
    print('EXAMPLE::Mosquitto wss client connected');
  } else {
    print(
        'EXAMPLE::ERROR Mosquitto client connection $connectionNumber failed - '
        'disconnecting, status is ${client.connectionStatus}');
    client.disconnect();
    exit(-1);
  }

  print('EXAMPLE::sleeping for 60 seconds...');
  await MqttUtilities.asyncSleep(60);

  print('EXAMPLE::disconnecting');
  client.disconnect();

  print('');
  print('EXAMPLE::end of test');
}

void onDisconnected() {
  print('EXAMPLE::Disconnected, number is $connectionNumber');
}

void onConnected() {
  print('EXAMPLE::Connected, number is $connectionNumber');
}

giving the following output -

EXAMPLE::first connection
EXAMPLE::Connected, number is 1
EXAMPLE::Mosquitto wss client connected
EXAMPLE::sleeping for 60 seconds...
EXAMPLE::disconnecting
EXAMPLE::Disconnected, number is 1

EXAMPLE::second connection
EXAMPLE::Connected, number is 2
EXAMPLE::Mosquitto wss client connected
EXAMPLE::sleeping for 60 seconds...
EXAMPLE::disconnecting
EXAMPLE::Disconnected, number is 2

EXAMPLE::end of test

So clearly there is something happening environmentally at your end in between the disconnect and reconnect. Are you staying in foreground all the time in this period or are you transitioning from foreground to background and maybe staying in background?

Is your network still in the same state as it was on the disconnect, i.e. have you switched wifi on/off in this period.

You can determine the status of the network and the foreground/background status by standard android platform events as exposed to you by flutter, again, search the closed issues in this area.

rpungin commented 1 month ago

@shamblett , Thank you for your response, test code and the follow up questions!

Are you staying in foreground all the time in this period or are you transitioning from foreground to background and maybe staying in background?

When I get Connection was not upgraded to websocket error, I stay in foreground. I have a button in my app to call connect() and disconnect() on the instance of MqttServerClient. Sometimes it reconnects with no problem, but if I stay disconnected for a longer period of time, I get that error.

Is your network still in the same state as it was on the disconnect, i.e. have you switched wifi on/off in this period.

Yes it is the same. I am running in an iOS simulator and it is using the Macbook's WiFi connection which is stable.

I traced the code and this is where the error occurs (line 62 of mqtt_client_mqtt_server_ws_connection.dart):

Screenshot 2024-07-19 at 9 28 30 AM
rpungin commented 1 month ago

I did more digging and see why the exception message has the URL with https protocol as opposed to wss that is in the original URL passed to it. Line 1014 of websocket_impl.dart in the flutter SDK does the conversion:

Screenshot 2024-07-19 at 10 14 44 AM

I am not networking expect, but I guess for web socket connection the underlying protocol is still HTTP.

I am still wondering why the port number is not shown in the URL of the exception message. It is set on line 1017...

shamblett commented 1 month ago

Yes I understand but this question is not for me, as you have correctly identified the exception is raised solely by the flutter runtime and passed to the client, there's nothing the client can do about this.

As you have ruled out any environmental effects at your end then your only recourse is to examine your broker logs to see why the AWS broker is upset, the client can't help you anymore here.

Maybe when you connect the second time you have to change something in the connection string sort of like making each connection attempt unique in its own right, maybe the AWS docs are another place to look or even other AWS users.

rpungin commented 1 month ago

@shamblett , I stumbled upon this documentation from Apple and here is a quote:

Once your app goes into the background, it may be suspended. Once it is suspended, it's unable to properly process incoming connections on the listening socket. However, the socket is still active as far as the kernel is concerned. If a client connects to the socket, the kernel will accept the connection but your app won't communicate over it. Eventually the client will give up, but that might take a while. Thus, it's better to close the listening socket when going into the background, which will cause incoming connections to be immediately rejected by the kernel

What is the recommended way to make sure the socket is properly closed when the app goes to the background and recreated when the app resumes? Does autoReconnect functionality property handle this?

This was not upgraded to a websocket does not occur consistently. It occurs totally randomly when the app goes into the background.

I now try to call disconnect() when the app goes to background and connect() when the app gets back from the background, yet I still get that error SOMETIMES!

rpungin commented 1 month ago

I followed your advice and looked at the AWS CloudWatch IoT Core logs. I see the successful connection, disconnection and subscription events. But when I get that Connection was not upgraded to a websocket error, I see nothing in the logs. I supposed that means that the server has never made the connection.

shamblett commented 1 month ago

Your doing nothing wrong, do not use autoreconnect for foreground/background transitions its not intended for situations that are known about and can be handled by the user by calling disconnect/connect appropriately as you are doing.

OK, so in the case of failure the is no connection being established to AWS, this rules out any broker issues and puts the problem firmly back on your platform.

The client does the same thing for a disconnect every time its called, it doesn't vary, it just calls standard Dart API socket methods, if your saying these work sometime and not others then this is a runtime issue. There is no recommended way of making sure a socket is correctly closed that I know of using Dart, if the close() method for instance doesn't work for any reason there's nothing the client can do about it. Note that the socket is re allocated on every connect -

 Future<MqttClientConnectionStatus?> connect(String server, int port) {
...
 // Connect and save the socket.
      WebSocket.connect(uriString,
              protocols: protocols.isNotEmpty ? protocols : null,
              headers: headers)
          .then((socket) {
        client = socket;
...

so its not that the previous socket is being re used and is hung somehow.

You need to start looking at what exactly is happening at the network level, I'm a linux guy so you need to examine whatever your platform equivalents of dmsg, messages.log, journal, audit.log etc. to find out.

rpungin commented 1 month ago

@shamblett , I still could not figure out what causes the error.

My co-worker recommended checking the "clean session" flag on the MQTT connection. I saw this flag in the Kotlin implementation that @quanganh050998 posted here. But I could not find it in your Dart implementation. What is the value of that flag in your implementation?

More info on this flag here: https://www.emqx.com/en/blog/mqtt-session

shamblett commented 1 month ago

Just simply go to the API documentation for the package and type 'session' into the search box, this gives you the results you need to see how to use this, you can also see its usage in the examples -

/// Create a connection message to use or use the default one. The default one sets the
  /// client identifier, any supplied username/password and clean session,
  /// an example of a specific one below.
  final connMess = MqttConnectMessage()
      .withClientIdentifier('Mqtt_MyClientUniqueId')
      .withWillTopic('willtopic') // If you set this you must set a will message
      .withWillMessage('My Will message')
      .startClean() // Non persistent session for testing
      .withWillQos(MqttQos.atLeastOnce);
  print('EXAMPLE::Mosquitto client connecting....');
  client.connectionMessage = connMess;

I don't think this is going to help at all, this is for session/message/management of the broker, you are not connecting to your broker at the socket level as you have proved earlier.

erindolson commented 4 days ago

HI @rpungin, were you able to figure this out? I am running into a very similar thing if not the same.