shamblett / mqtt_client

A server and browser based MQTT client for dart

Process Not Terminating #344

Closed SankalpMe closed 2 years ago

SankalpMe commented 2 years ago

I am running the default code for the aws_iot.dart example, but the process does not terminate after main returns. I think some isolates are still running.

/*
 * Package : mqtt_client
 * Author : S. Hamblett <steve.hamblett@linux.com>
 * Date   : 10/07/2021
 * Copyright :  S.Hamblett
 *
 */

import 'dart:async';
import 'dart:io';
import 'dart:isolate';
import 'package:mqtt_client/mqtt_server_client.dart';
import 'package:mqtt_client/mqtt_client.dart';

/// An example of connecting to the AWS IoT Core MQTT broker and publishing to a devices topic.
/// More instructions can be found at https://docs.aws.amazon.com/iot/latest/developerguide/mqtt.html and
/// https://docs.aws.amazon.com/iot/latest/developerguide/protocols.html, please read this
/// before setting up and running this example.
Future<int> main() async {
  // Your AWS IoT Core endpoint url
  const url = '<id>-ats.iot.us-east-2.amazonaws.com';
  // AWS IoT MQTT default port
  const port = 8883;
  // The client id unique to your device
  const clientId = 'clientale';

  // Create the client
  final client = MqttServerClient.withPort(url, clientId, port);

  // Set secure
  client.secure = true;
  // Set Keep-Alive
  client.keepAlivePeriod = 20;
  // Set the protocol to V3.1.1 for AWS IoT Core, if you fail to do this you will receive a connect ack with the response code
  client.setProtocolV311();
  // logging if you wish
  client.logging(on: true);

  // Set the security context as you need, note this is the standard Dart SecurityContext class.
  // If this is incorrect the TLS handshake will abort and a Handshake exception will be raised,
  // no connect ack message will be received and the broker will disconnect.
  // For AWS IoT Core, we need to set the AWS Root CA, device cert & device private key
  // Note that for Flutter users the parameters above can be set in byte format rather than file paths
  final context = SecurityContext.defaultContext;
  context.setClientAuthorities('AmazonRootCA1.pem');
  context.useCertificateChain('certificate.pem.crt');
  context.usePrivateKey('private.pem.key');
  client.securityContext = context;

  // Setup the connection Message
  final connMess =
      MqttConnectMessage().withClientIdentifier(clientId).startClean();
  client.connectionMessage = connMess;

  // Connect the client
  try {
    print('MQTT client connecting to AWS IoT....');
    await client.connect();
  } on Exception catch (e) {
    print('MQTT client exception - $e');
    client.disconnect();
  }

  if (client.connectionStatus!.state == MqttConnectionState.connected) {
    print('MQTT client connected to AWS IoT');

    // Publish to a topic of your choice
    const topic = '/test/topic';
    final builder = MqttClientPayloadBuilder();
    builder.addString('Hello World');
    // Important: AWS IoT Core can only handle QOS of 0 or 1. QOS 2 (exactlyOnce) will fail!
    client.publishMessage(topic, MqttQos.atLeastOnce, builder.payload!);

    // Subscribe to the same topic
    client.subscribe(topic, MqttQos.atLeastOnce);
    // Print incoming messages from another client on this topic
    client.updates!.listen((List<MqttReceivedMessage<MqttMessage>> c) {
      final recMess = c[0].payload as MqttPublishMessage;
      final pt =
          MqttPublishPayload.bytesToStringAsString(recMess.payload.message);
      print(
          'EXAMPLE::Change notification:: topic is <${c[0].topic}>, payload is <-- $pt -->');
      print('');
    });
  } else {
    print(
        'ERROR MQTT client connection failed - disconnecting, state is ${client.connectionStatus!.state}');
    client.disconnect();
  }

  print('Sleeping....');
  await MqttUtilities.asyncSleep(1);

  print('Disconnecting');
  client.disconnect();

  return 0;
}

Dart SDK version: 2.15.0 (stable) (Fri Dec 3 14:23:23 2021 +0100) on "macos_x64"
mqtt_client: 9.6.3

cameronl commented 2 years ago

I see this also (I have been meaning to open an issue), since version 9.5.0.

wait-on-exit_942.log wait-on-exit_950.log wait_on_exit.dart.txt

I think the cause is in commit f53f863f512a82c7ee01c5841a7939443ecc2588

--- a/lib/src/mqtt_client.dart
+++ b/lib/src/mqtt_client.dart
@@ -446,6 +446,7 @@ class MqttClient {
    connectionHandler?.disconnect();
    disconnectOrigin = MqttDisconnectionOrigin.solicited;
    }
+    connectionHandler?.stopListening();
    publishingManager?.published.close();
    publishingManager = null;
    subscriptionsManager = null;

This is because MqttConnectionHandlerBase::disconnect() doesn't close any socket connection. It simply sends a disconnect message to the broker. And I guess waits for the server to close the socket...

Then, when the client sees the socket onDone, it destroys the socket. So it has all been working up through v9.4.2.

But in 9.5.0, the change above stops listening to the socket so it never receives the socket onDone and so never destroys the socket.

A Fix?

I would expect connectionHandler.disconnect() to

  1. send a disconnect message
  2. destroy the socket
  3. stop listening
  4. call onDisconnected()

but currently, it only does 1.

connectionHandler.connection.disconnect() does 2, 3, 4.

But then some, but not all, of these are happening in MqttClient._disconnect() too.
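
The four steps above can be sketched with plain dart:io. This is only an illustration of the ordering, not the package's code: SketchConnection, its steps list, and the loopback server in main are all hypothetical names invented for the example, and the only MQTT detail used is the two-byte DISCONNECT packet (0xE0 0x00).

```dart
import 'dart:async';
import 'dart:io';

/// Hypothetical wrapper (not an mqtt_client class) that owns a socket
/// and the subscription listening to it.
class SketchConnection {
  SketchConnection(this._socket, {this.onDisconnected}) {
    _subscription = _socket.listen(
      (_) {}, // incoming bytes are ignored in this sketch
      onDone: _teardown, // the peer closed the socket first
      onError: (Object _) => _teardown(),
    );
  }

  final Socket _socket;
  final void Function()? onDisconnected;
  late final StreamSubscription<List<int>> _subscription;

  /// Records the order in which the steps ran, for inspection only.
  final List<String> steps = [];
  bool _closed = false;

  /// Client-initiated disconnect: does not wait for the server to act.
  Future<void> disconnect() async {
    if (_closed) return;
    try {
      // 1. Send the DISCONNECT packet (type 14, remaining length 0).
      _socket.add(const [0xE0, 0x00]);
      await _socket.flush();
      steps.add('send disconnect');
    } catch (_) {
      // The peer may already be gone; tear down regardless.
    }
    _teardown();
  }

  void _teardown() {
    if (_closed) return; // idempotent: safe if onDone also fires
    _closed = true;
    _socket.destroy(); // 2. destroy the socket
    steps.add('destroy socket');
    _subscription.cancel(); // 3. stop listening
    steps.add('stop listening');
    onDisconnected?.call(); // 4. notify the owner
    steps.add('onDisconnected');
  }
}

Future<void> main() async {
  // Local stand-in for a broker so the sketch runs without a network.
  final server = await ServerSocket.bind(InternetAddress.loopbackIPv4, 0);
  server.listen((peer) => peer.listen((_) {},
      onError: (Object _) => peer.destroy(), onDone: peer.destroy));

  final socket =
      await Socket.connect(InternetAddress.loopbackIPv4, server.port);
  final connection = SketchConnection(socket);
  await connection.disconnect();
  await server.close();
  print(connection.steps);
}
```

The point of the ordering is that the socket is destroyed before the listener is cancelled, so nothing depends on a later onDone event to release the socket.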

shamblett commented 2 years ago

There are a few things here. Firstly, @SankalpMe, the code example above is not the one from the package; the aws_iot.dart example does not include dart:isolate, for instance. I can't really comment further on this.

@cameronl, you say

And I guess waits for the server to close the socket...

No, the client can't expect the server to do anything in response to a disconnect message, as the spec doesn't require this. In fact, the spec doesn't require a disconnect message to be sent at all, but we can play nice here, so we do.

Taking a working example, run the mqtt_server_client.dart file in examples, it produces the following output (wait time set to 60 seconds for brevity) -

/home/steve/Development/google/dart/dart-sdk/bin/dart --enable-asserts /home/steve/Development/google/dart/projects/mqtt_client/example/mqtt_server_client.dart
EXAMPLE::Mosquitto client connecting....
EXAMPLE::OnConnected client callback - Client connection was sucessful
EXAMPLE::Mosquitto client connected
EXAMPLE::Subscribing to the test/lol topic
EXAMPLE::Subscribing to the Dart/Mqtt_client/testtopic topic
EXAMPLE::Publishing our topic
EXAMPLE::Sleeping....
EXAMPLE::Subscription confirmed for topic test/lol
EXAMPLE::Subscription confirmed for topic Dart/Mqtt_client/testtopic
EXAMPLE::Published notification:: topic is Dart/Mqtt_client/testtopic, with Qos MqttQos.exactlyOnce
EXAMPLE::Change notification:: topic is <Dart/Mqtt_client/testtopic>, payload is <-- Hello from mqtt_client -->

EXAMPLE::Ping response client callback invoked
EXAMPLE::Ping response client callback invoked
EXAMPLE::Ping response client callback invoked
EXAMPLE::Unsubscribing
EXAMPLE::OnDisconnected client callback - Client disconnection

Process finished with exit code 255

i.e. it terminates correctly. If you comment out the exit(-1) line (175) in the onDisconnected callback -

/home/steve/Development/google/dart/dart-sdk/bin/dart --enable-asserts /home/steve/Development/google/dart/projects/mqtt_client/example/mqtt_server_client.dart
EXAMPLE::Mosquitto client connecting....
EXAMPLE::OnConnected client callback - Client connection was sucessful
EXAMPLE::Mosquitto client connected
EXAMPLE::Subscribing to the test/lol topic
EXAMPLE::Subscribing to the Dart/Mqtt_client/testtopic topic
EXAMPLE::Publishing our topic
EXAMPLE::Sleeping....
EXAMPLE::Subscription confirmed for topic test/lol
EXAMPLE::Subscription confirmed for topic Dart/Mqtt_client/testtopic
EXAMPLE::Published notification:: topic is Dart/Mqtt_client/testtopic, with Qos MqttQos.exactlyOnce
EXAMPLE::Change notification:: topic is <Dart/Mqtt_client/testtopic>, payload is <-- Hello from mqtt_client -->

EXAMPLE::Ping response client callback invoked
EXAMPLE::Ping response client callback invoked
EXAMPLE::Ping response client callback invoked
EXAMPLE::Unsubscribing
EXAMPLE::OnDisconnected client callback - Client disconnection
EXAMPLE::Disconnecting
EXAMPLE::OnDisconnected client callback - Client disconnection
EXAMPLE::OnDisconnected callback is solicited, this is correct

Process finished with exit code 0

it still works OK.

I can't test the aws_iot.dart example as I don't have an AWS project to test against. It may be worth removing the isolate package import and running the example with just the credentials updated.

cameronl commented 2 years ago

In both those logs, there is an unsolicited disconnection from the server after unsubscribe, but before we call disconnect(). So the server is disconnecting first.

EXAMPLE::Ping response client callback invoked
EXAMPLE::Ping response client callback invoked
EXAMPLE::Ping response client callback invoked
EXAMPLE::Unsubscribing
EXAMPLE::OnDisconnected client callback - Client disconnection     <-- unsolicited disconnection from server
EXAMPLE::Disconnecting
EXAMPLE::OnDisconnected client callback - Client disconnection
EXAMPLE::OnDisconnected callback is solicited, this is correct

If you don't call unsubscribe() but do call disconnect(), the process doesn't terminate.

Why does mosquitto disconnect on unsubscribe? I think new mosquitto is more strict. Here's the log from local mosquitto broker:

1640119537: mosquitto version 2.0.14 starting
1640119537: Config loaded from /usr/local/etc/mosquitto/mosquitto.conf.
1640119537: Starting in local only mode. Connections will only be possible from clients running on this machine.
1640119537: Create a configuration file which defines a listener to allow remote access.
1640119537: For more details see https://mosquitto.org/documentation/authentication-methods/
1640119537: Opening ipv4 listen socket on port 1883.
1640119537: Opening ipv6 listen socket on port 1883.
1640119537: mosquitto version 2.0.14 running
1640120120: New connection from ::1:57675 on port 1883.
1640120120: New connection from 127.0.0.1:57676 on port 1883.
1640120120: Client <unknown> closed its connection.
1640120120: New client connected from ::1:57675 as Mqtt_MyClientUniqueId (p1, c1, k20).
1640120120: Will message specified (15 bytes) (r0, q1).
1640120120:     willtopic
1640120120: Sending CONNACK to Mqtt_MyClientUniqueId (0, 0)
1640120120: Received SUBSCRIBE from Mqtt_MyClientUniqueId
1640120120:     test/lol (QoS 0)
1640120120: Mqtt_MyClientUniqueId 0 test/lol
1640120120: Sending SUBACK to Mqtt_MyClientUniqueId
1640120120: Received SUBSCRIBE from Mqtt_MyClientUniqueId
1640120120:     Dart/Mqtt_client/testtopic (QoS 2)
1640120120: Mqtt_MyClientUniqueId 2 Dart/Mqtt_client/testtopic
1640120120: Sending SUBACK to Mqtt_MyClientUniqueId
1640120120: Received PUBLISH from Mqtt_MyClientUniqueId (d0, q2, r0, m3, 'Dart/Mqtt_client/testtopic', ... (22 bytes))
1640120120: Sending PUBREC to Mqtt_MyClientUniqueId (m3, rc0)
1640120120: Received PUBREL from Mqtt_MyClientUniqueId (Mid: 3)
1640120120: Sending PUBLISH to Mqtt_MyClientUniqueId (d0, q2, r0, m1, 'Dart/Mqtt_client/testtopic', ... (22 bytes))
1640120120: Sending PUBCOMP to Mqtt_MyClientUniqueId (m3)
1640120120: Received PUBREC from Mqtt_MyClientUniqueId (Mid: 1)
1640120120: Sending PUBREL to Mqtt_MyClientUniqueId (m1)
1640120120: Received PUBCOMP from Mqtt_MyClientUniqueId (Mid: 1, RC:0)
1640120140: Received PINGREQ from Mqtt_MyClientUniqueId
1640120140: Sending PINGRESP to Mqtt_MyClientUniqueId
1640120160: Received PINGREQ from Mqtt_MyClientUniqueId
1640120160: Sending PINGRESP to Mqtt_MyClientUniqueId
1640120180: Received PINGREQ from Mqtt_MyClientUniqueId
1640120180: Sending PINGRESP to Mqtt_MyClientUniqueId
1640120180: Client Mqtt_MyClientUniqueId disconnected due to malformed packet.

I agree you don't want to depend on the server for a disconnect and sending disconnect message is just to be nice.

I think there is a bug in the client (at least on the server side; I don't know about browser websockets): connection.disconnect() is never called inside client.disconnect().

SankalpMe commented 2 years ago

@shamblett Even after removing the "isolate import line" the results are the same.

shamblett commented 2 years ago

@cameronl, the unsubscribe error comes from this part of the unsubscribe message's writeTo code -

// If the protocol is V3.1.1 the following header fields
 // must be set as below as in this protocol they are reserved.

You need to set the client to use V3.1.1 of the protocol, i.e. use 'client.setProtocolV311();'. Mosquitto didn't need this originally but seems to now; this has cropped up in a few issues lately. Now that I've updated the mqtt_server_client example I can see the problem: it doesn't terminate correctly. I'll have a look at this now.

@SankalpMe, yes, thanks, looking at this now.
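
For background on the "malformed packet" entry in the mosquitto log: MQTT 3.1.1 (section 3.10.1) reserves the lower four bits of the UNSUBSCRIBE fixed header, which must be 0010, and requires a server to treat any other value as malformed and close the connection. The sketch below only illustrates that rule. The helper names are made up for this example, and it does not show what bytes the package actually sent before the protocol was set to V3.1.1.

```dart
/// MQTT control packet type number for UNSUBSCRIBE (MQTT 3.1.1).
const int unsubscribeType = 10;

/// Builds the first byte of a fixed header: packet type in bits 7-4,
/// flags in bits 3-0. Hypothetical helper, not part of mqtt_client.
int fixedHeaderFirstByte(int packetType, int flags) =>
    ((packetType & 0x0F) << 4) | (flags & 0x0F);

/// True when the byte is an UNSUBSCRIBE header carrying the reserved
/// flag bits 0010 that V3.1.1 requires.
bool isValidUnsubscribeHeader(int firstByte) =>
    ((firstByte >> 4) == unsubscribeType) && ((firstByte & 0x0F) == 0x02);

void main() {
  final good = fixedHeaderFirstByte(unsubscribeType, 0x02);
  final bad = fixedHeaderFirstByte(unsubscribeType, 0x00);
  // prints "0xa2 valid: true"
  print('0x${good.toRadixString(16)} valid: ${isValidUnsubscribeHeader(good)}');
  // prints "0xa0 valid: false"
  print('0x${bad.toRadixString(16)} valid: ${isValidUnsubscribeHeader(bad)}');
}
```

A strict broker that sees anything other than 0xA2 as the first byte of an UNSUBSCRIBE is entitled to drop the connection, which is consistent with the disconnect seen right after "Unsubscribing" in the logs above.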

shamblett commented 2 years ago

Should be fixed now. I also updated the examples to correctly set the MQTT protocol for mosquitto and made a few other tweaks. Package republished at version 9.6.4.

SankalpMe commented 2 years ago

Yes the issue has been fixed.