shamblett / mqtt5_client

A server and browser based MQTT 5 client for dart

With a persistent session, messages published while the client is offline are not received on reconnect #28

Closed fredrick-foodyguru closed 1 year ago

fredrick-foodyguru commented 2 years ago

Use case

Issue: Once the subscriber comes back online after the initial connection was established, it does not receive the messages that were published while it was offline.

Here is the code


  init() async {
    client = MqttServerClient.withPort(hostName, 'test2', 1883);
    client.port = 1883;
    client.secure = false;
    client.logging(on: false);
    client.autoReconnect = true;
    client.resubscribeOnAutoReconnect = false;
    client.keepAlivePeriod = 3600;
    client.onAutoReconnected = onAutoReconnected;
    client.onDisconnected = onDisconnected;
    client.onSubscribed = onSubscribed;
    client.onUnsubscribed = onUnSubscribed;

    final connMess = MqttConnectMessage()
        .withClientIdentifier('test2');
    print('EXAMPLE::Mosquitto client connecting....');
    client.connectionMessage = connMess;

    try {
      await client.connect();

    } on Exception catch (e) {
      print('EXAMPLE::client exception - $e');
      client.disconnect();
    }

    if (client.connectionStatus!.state == MqttConnectionState.connected) {
      print('EXAMPLE::Mosquitto client connected');
    } else {
      print(
          'EXAMPLE::ERROR Mosquitto client connection failed - disconnecting, state is ${client.connectionStatus!.state}');
      client.disconnect();

    }

    print('EXAMPLE:: <<<< SUBSCRIBE 1 >>>>');
    const topic1 = 'demo/';

    client.subscribe(topic1, MqttQos.atLeastOnce);

    client.updates.listen((dynamic c) {
      final MqttPublishMessage recMess = c[0].payload;
      final pt = MqttUtilities.bytesToStringAsString(recMess.payload.message!);
      print(
          'EXAMPLE::Change notification:: topic is <${c[0].topic}>, payload is <-- $pt -->');
      print('');
    });

    client.published!.listen((MqttPublishMessage message) {
      print(
          'EXAMPLE::Published notification:: topic is ${message.variableHeader!.topicName}, with Qos ${message.header!.qos}');
      if (message.variableHeader!.topicName == topic1) {
        print(
            'EXAMPLE::Published notification:: Non subscribed topic publication received');
      }
    });

    final builder1 = MqttPayloadBuilder();
    builder1.addString('Hello from mqtt_client flutter');
    print('EXAMPLE:: <<<< PUBLISH 1 >>>>');
    client.publishMessage(topic1, MqttQos.atLeastOnce, builder1.payload!);
  }

  void onSubscribed(MqttSubscription topic) {
    print('EXAMPLE::Subscription confirmed for topic $topic');
  }

  void onUnSubscribed(MqttSubscription topic) {
    print('EXAMPLE::UnSubscribed for topic $topic');
  }

  void onAutoReconnected() {
    print('EXAMPLE::OnAutoReconnected');
    client.resubscribe();
  }

  void onDisconnected() {
    print('EXAMPLE::OnDisconnected client callback - Client disconnection');

  }
fredrick-foodyguru commented 2 years ago

Adding the logs from after the client reconnects to the network.

While the client was disconnected, the publisher sent three messages.


I/flutter ( 9272): 2022-06-17 21:27:53.265516 -- MqttSynchronousServerConnectionHandler::internalConnect - post sleep, state = Connection status is connecting with return code of success and a disconnection origin of none
I/flutter ( 9272): 2022-06-17 21:27:53.266291 -- MqttSynchronousServerConnectionHandler::internalConnect - initiating connection try 1, auto reconnect in progress true
I/flutter ( 9272): 2022-06-17 21:27:53.266725 -- MqttSynchronousServerConnectionHandler::internalConnect - calling connectAuto
I/flutter ( 9272): 2022-06-17 21:27:53.266915 -- MqttNormalConnection::connectAuto - entered
I/flutter ( 9272): 2022-06-17 21:27:53.347681 -- MqttServerConnection::_startListening
I/flutter ( 9272): 2022-06-17 21:27:53.350156 -- MqttSynchronousServerConnectionHandler::internalConnect - connection complete
I/flutter ( 9272): 2022-06-17 21:27:53.350429 -- MqttSynchronousServerConnectionHandler::internalConnect sending connect message
I/flutter ( 9272): 2022-06-17 21:27:53.350563 -- MqttConnectionHandlerBase::sendMessage - sending message started >>> -> MQTTMessage of type MqttMessageType.connect
I/flutter ( 9272): MessageType = MqttMessageType.connect Duplicate = false Retain = false Qos = atMostOnce Size = 18
I/flutter ( 9272): ProtocolName = MQTT
I/flutter ( 9272): ProtocolVersion = 5
I/flutter ( 9272): ConnectFlags = CleanStart=false, WillFlag=false, WillQos=atMostOnce, WillRetain=false, PasswordFlag=false, UserNameFlag=false
I/flutter ( 9272): KeepAlive = 0
I/flutter ( 9272): Properties = No properties setWill topic = null
I/flutter ( 9272): User name = not set
I/flutter ( 9272): Password = not set
I/flutter ( 9272): 2022-06-17 21:27:53.351230 -- MqttConnectionHandlerBase::sendMessage - sending message ended >>>
I/flutter ( 9272): 2022-06-17 21:27:53.351404 -- MqttSynchronousServerConnectionHandler::internalConnect - pre sleep, state = Connection status is connecting with return code of success and a disconnection origin of none
I/flutter ( 9272): 2022-06-17 21:27:53.363290 -- MqttServerConnection::_onData - Message Received Started <<< 
I/flutter ( 9272): 2022-06-17 21:27:53.363728 -- MqttServerConnection::_ondata - adding incoming data, data length is 14, message stream length is 0, message stream position is 0
I/flutter ( 9272): 2022-06-17 21:27:53.364254 -- MqttServerConnection::_onData - MESSAGE RECEIVED -> MQTTMessage of type MqttMessageType.connectAck
I/flutter ( 9272): MessageType = MqttMessageType.connectAck Duplicate = false Retain = false Qos = atMostOnce Size = 12
I/flutter ( 9272): Session Present = false
I/flutter ( 9272): Connect Reason Code = success
I/flutter ( 9272): Session Expiry Interval = 0
I/flutter ( 9272): Receive Maximum = 20
I/flutter ( 9272): Maximum QoS = 2
I/flutter ( 9272): Retain Available = false
I/flutter ( 9272): Maximum Packet Size = 0
I/flutter ( 9272): Assigned client Identifier = null
I/flutter ( 9272): Topic Alias Maximum = 10
I/flutter ( 9272): Reason String = null
I/flutter ( 9272): Wildcard Subscription Available = true
I/flutter ( 9272): Subscription Identifiers Available = true
I/flutter ( 9272): Shared Subscription Available = true
I/flutter ( 9272): broker Keep Alive = 65535
I/flutter ( 9272): Response Information = null
I/flutter ( 9272): broker Reference = null
I/flutter ( 9272): Authentication Method = null
I/flutter ( 9272): Properties = Identifier : serverKeepAlive, value : 65535
I/flutter ( 9272): Identifier : topicAliasMaximum, value : 10
I/flutter ( 9272): Identifier : receiveMaximum, value : 20
I/flutter ( 9272): 
I/flutter ( 9272): 2022-06-17 21:27:53.375269 -- MqttServerConnection::_onData - message available event fired
I/flutter ( 9272): 2022-06-17 21:27:53.375575 -- MqttServerConnection::_onData - Message Received Ended <<< 
I/flutter ( 9272): 2022-06-17 21:27:53.375866 -- MqttConnectionHandlerBase::_connectAckProcessor
I/flutter ( 9272): 2022-06-17 21:27:53.376147 -- MqttConnectionHandlerBase::_connectAckProcessor - state = connected
I/flutter ( 9272): 2022-06-17 21:27:53.376381 -- MqttConnectionHandlerBase:: cancelling connect timer
I/flutter ( 9272): 2022-06-17 21:27:53.376754 -- MqttSynchronousServerConnectionHandler::internalConnect - post sleep, state = Connection status is connected with return code of success and a disconnection origin of none
I/flutter ( 9272): 2022-06-17 21:27:53.377128 -- MqttSynchronousServerConnectionHandler::internalConnect exited with state Connection status is connected with return code of success and a disconnection origin of none
I/flutter ( 9272): 2022-06-17 21:27:53.377419 -- MqttConnectionHandlerBase::autoReconnect - auto reconnect complete
I/flutter ( 9272): EXAMPLE::OnAutoReconnected
I/flutter ( 9272): 2022-06-17 21:27:53.377915 -- MqttConnectionHandlerBase::sendMessage - sending message started >>> -> MQTTMessage of type MqttMessageType.subscribe
I/flutter ( 9272): MessageType = MqttMessageType.subscribe Duplicate = false Retain = false Qos = atLeastOnce Size = 0
I/flutter ( 9272): Message Identifier = 3
I/flutter ( 9272): Subscription identifier = 0
I/flutter ( 9272): Properties = No properties set
I/flutter ( 9272): Topic = demo/, Option = Maximum Qos = atLeastOnce
I/flutter ( 9272): No Local = false
I/flutter ( 9272): Retain As Published = true
I/flutter ( 9272): Retain Handling = sendRetained
I/flutter ( 9272): 2022-06-17 21:27:53.378677 -- MqttConnectionHandlerBase::sendMessage - sending message ended >>>
I/flutter ( 9272): 2022-06-17 21:27:53.379283 -- MttSubscriptionManager::_resubscribe - NOT resubscribing from auto reconnect true, resubscribeOnAutoReconnect is false
I/flutter ( 9272): 2022-06-17 21:27:53.399507 -- MqttServerConnection::_onData - Message Received Started <<< 
I/flutter ( 9272): 2022-06-17 21:27:53.399852 -- MqttServerConnection::_ondata - adding incoming data, data length is 6, message stream length is 0, message stream position is 0
I/flutter ( 9272): 2022-06-17 21:27:53.400120 -- MqttServerConnection::_onData - MESSAGE RECEIVED -> MQTTMessage of type MqttMessageType.subscribeAck
I/flutter ( 9272): MessageType = MqttMessageType.subscribeAck Duplicate = false Retain = false Qos = atMostOnce Size = 4
I/flutter ( 9272): Message Identifier = 3
I/flutter ( 9272): Reason String = null
I/flutter ( 9272): Properties = No properties set
I/flutter ( 9272): Reason Code = grantedQos1
I/flutter ( 9272): 2022-06-17 21:27:53.400699 -- MqttServerConnection::_onData - message available event fired
I/flutter ( 9272): 2022-06-17 21:27:53.400949 -- MqttServerConnection::_onData - Message Received Ended <<< 
I/flutter ( 9272): 2022-06-17 21:27:53.401243 -- MqttConnectionHandlerBase::messageAvailable - message type is MqttMessageType.subscribeAck
I/flutter ( 9272): EXAMPLE::Subscription confirmed for topic Topic = demo/
I/flutter ( 9272): Maximum Qos = MqttQos.atLeastOnce
I/flutter ( 9272): Created Time = 2022-06-17 21:27:53.377891
shamblett commented 2 years ago

This could be related to #27. I'll merge that and re-release the package, then we can see if the fault persists.

fredrick-foodyguru commented 2 years ago

Thanks. Do we have an ETA for the release?

fredrick-foodyguru commented 2 years ago

I tested with the latest master branch to see if it fixes the issue, but it doesn't. On top of that, it disrupted the behaviour on iOS: the iOS client was reconnecting continuously and didn't receive the regular messages either.

shamblett commented 2 years ago

I've just merged #29, which is also in this area. Could you try your test against master again? Ta.

fredrick-foodyguru commented 2 years ago

Tested again with the latest master. It still isn't fixed. Please find the behaviour on each platform below.

Infrastructure

Test case

Publishing details

Results

Android

iOS

EDIT: I ran it on iOS again after clearing some cache on my local machine. iOS now behaves the same as the other platforms.

Browser (Chrome) - using MqttBrowserClient

Debug service listening on ws://127.0.0.1:51449/xZbZxdCGCNM=/ws
flutter: EXAMPLE::Mosquitto client connecting....
flutter: 2022-06-20 14:02:55.646581 -- MqttConnectionHandlerBase::connect - server 192.168.1.66, port 1883
flutter: 2022-06-20 14:02:55.646735 -- MqttSynchronousServerConnectionHandler::internalConnect entered
flutter: 2022-06-20 14:02:55.646800 -- MqttSynchronousServerConnectionHandler::internalConnect - initiating connection try 0, auto reconnect in progress false
flutter: 2022-06-20 14:02:55.646873 -- MqttSynchronousServerConnectionHandler::internalConnect - insecure TCP selected
flutter: 2022-06-20 14:02:55.646931 -- MqttSynchronousServerConnectionHandler::internalConnect - calling connect
flutter: 2022-06-20 14:02:55.647 -- MqttNormalConnection::connect- entered
flutter: 2022-06-20 14:04:10.787997 -- MqttConnectionBase::_onError - calling disconnected callback
flutter: EXAMPLE::client exception - SocketException: Operation timed out (OS Error: Operation timed out, errno = 60), address = 192.168.1.66, port = 58632
flutter: 2022-06-20 14:04:10.788981 -- MqttConnectionHandlerBase::disconnect
flutter: 2022-06-20 14:04:10.789157 -- MqttConnectionHandlerBase::_performConnectionDisconnect entered
flutter: EXAMPLE::OnDisconnected client callback - Client disconnection
flutter: EXAMPLE::ERROR Mosquitto client connection failed - disconnecting, state is MqttConnectionState.disconnected
flutter: EXAMPLE::OnDisconnected client callback - Client disconnection
fredrick-foodyguru commented 2 years ago

Not sure. It looks like the Flutter clients are starting a clean session in spite of not calling startClean() on the MqttConnectMessage.

Adding the Mosquitto logs from when each client reconnects after being disconnected.

Notice the message ID (Mid) when the client reconnects. The Node client resumes from the Mid where it left off, but the Flutter clients start from scratch. We are not starting a clean session in either Flutter or Node.

Node

New connection from 127.0.0.1:57114 on port 1883.
1655761547: New client connected from 127.0.0.1:57114 as node_server (p2, c0, k60).
1655761547: No will message specified.
1655761547: Sending CONNACK to node_server (1, 0)
1655761547: Sending PUBLISH to node_server (d0, q1, r0, m181, 'demo/', ... (18 bytes))
1655761547: Sending PUBLISH to node_server (d0, q1, r0, m183, 'demo/', ... (21 bytes))
1655761547: Received SUBSCRIBE from node_server
1655761547:     presence (QoS 0)
1655761547: node_server 0 presence
1655761547: Sending SUBACK to node_server
1655761547: Received SUBSCRIBE from node_server
1655761547:     $shared/fred/demo/ (QoS 1)
1655761547: node_server 1 $shared/fred/demo/
1655761547: Sending SUBACK to node_server
1655761547: Received PUBACK from node_server (Mid: 181, RC:0)
1655761547: Received PUBACK from node_server (Mid: 183, RC:0)

Android

1655761932: New connection from 192.168.1.70:47966 on port 1883.
1655761932: Client test2 already connected, closing old connection.
1655761932: New client connected from 192.168.1.70:47966 as test2 (p5, c0, k0).
1655761932: No will message specified.
1655761932: Sending CONNACK to test2 (0, 0)
1655761932: Received SUBSCRIBE from test2
1655761932:     demo/ (QoS 1)
1655761932: test2 1 demo/
1655761932: Sending SUBACK to test2
1655761932: Sending PUBLISH to test2 (d0, q1, r1, m1, 'demo/', ... (22 bytes))
1655761933: Received PUBACK from test2 (Mid: 1, RC:0)

Browser Client

New client connected from 192.168.1.66:57379 as diner-client (p5, c0, k0).
1655761707: No will message specified.
1655761707: Sending CONNACK to diner-client (0, 0)
1655761707: Received SUBSCRIBE from diner-client
1655761707:     demo/ (QoS 1)
1655761707: diner-client 1 demo/
1655761707: Sending SUBACK to diner-client
1655761707: Sending PUBLISH to diner-client (d0, q1, r1, m1, 'demo/', ... (22 bytes))
1655761708: Received PUBACK from diner-client (Mid: 1, RC:0)
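
The three broker logs above also differ in their CONNACK lines: `(1, 0)` for the Node client versus `(0, 0)` for both Flutter clients. Assuming the first number is the session-present flag and the second the reason code (that is how I read Mosquitto's log format, it is not stated in this thread), the difference can be pulled out with a small parser. A minimal sketch in plain Dart; `ConnackLogEntry` and `parseConnackLine` are hypothetical names and not part of mqtt5_client or Mosquitto:

```dart
/// Fields of a Mosquitto "Sending CONNACK to <id> (<ack>, <rc>)" log line.
/// Assumption: <ack> is the session-present flag, <rc> the reason code.
class ConnackLogEntry {
  ConnackLogEntry(this.clientId, this.sessionPresent, this.reasonCode);

  final String clientId;
  final bool sessionPresent;
  final int reasonCode;
}

final _connackPattern = RegExp(r'Sending CONNACK to (\S+) \((\d+), (\d+)\)');

/// Returns the parsed entry, or null if [line] is not a CONNACK log line.
ConnackLogEntry? parseConnackLine(String line) {
  final match = _connackPattern.firstMatch(line);
  if (match == null) return null;
  return ConnackLogEntry(
    match.group(1)!,
    match.group(2) == '1',
    int.parse(match.group(3)!),
  );
}

void main() {
  // CONNACK lines copied from the Node, Android and browser logs above.
  const lines = [
    '1655761547: Sending CONNACK to node_server (1, 0)',
    '1655761932: Sending CONNACK to test2 (0, 0)',
    '1655761707: Sending CONNACK to diner-client (0, 0)',
  ];
  for (final line in lines) {
    final entry = parseConnackLine(line);
    if (entry != null) {
      print('${entry.clientId}: sessionPresent=${entry.sessionPresent}, '
          'reasonCode=${entry.reasonCode}');
    }
  }
}
```

Under that reading, only `node_server` is told that a stored session exists, which would be consistent with the `Session Present = false` line in the Flutter client log posted earlier.
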
shamblett commented 2 years ago

OK, thanks. I'll put some tests together locally and see what I can see. What do you mean by 'persisted message'? The client does not support persistence as some other clients do, or do you mean something else?

fredrick-foodyguru commented 2 years ago

Persisted messages are the messages (QoS 1 or QoS 2) stored on the MQTT server while the client is disconnected. So the expected outcome is that, when the client reconnects, it receives all the messages persisted for it.

shamblett commented 2 years ago

From the logs you posted 5 days ago, when we get the CONNACK message from the broker the client logs:

I/flutter ( 9272): Session Present = false

So it would appear the broker doesn't think a session is present for this client. Do you know what the CONNACK received by the node client contains?

shamblett commented 2 years ago

OK, pushed another change up to master. Auto reconnect is supposed to use the connect message used when we initially connected. It wasn't, it was using a new one every time. This package now does the same as mqtt_client.

fredrick-foodyguru commented 2 years ago

I checked the latest master and it is still the same. I also checked the node client logs while it reconnects to MQTT. The logs aren't clear about the CONNACK. I noticed the following line in the log, and I am assuming that's where the node client receives the queued messages.

mqttjs:client connect :: sending queued packets +1ms

Please find the Node client logs below.

 mqttjs:tcp port 1883 and host 0.0.0.0 +0ms
  mqttjs:client _setupStream :: pipe stream to writable stream +0ms
  mqttjs:client _setupStream: sending packet `connect` +2ms
  mqttjs:client sendPacket :: packet: { cmd: 'connect' } +0ms
  mqttjs:client sendPacket :: emitting `packetsend` +1ms
  mqttjs:client sendPacket :: writing to stream +0ms
  mqttjs:client sendPacket :: writeToStream result true +12ms
Server running in development on port 3000
Client connected to redis...
Client connected to redis...
  mqttjs:client writable stream :: parsing buffer +15ms
  mqttjs:client parser :: on packet push to packets array. +1ms
  mqttjs:client parser :: on packet push to packets array. +0ms
  mqttjs:client parser :: on packet push to packets array. +0ms
  mqttjs:client work :: getting next packet in queue +0ms
  mqttjs:client work :: packet pulled from queue +0ms
  mqttjs:client _handlePacket :: emitting packetreceive +1ms
  mqttjs:client _handleConnack +0ms
  mqttjs:client _setupPingTimer :: keepalive 40 (seconds) +0ms
  mqttjs:client connect :: sending queued packets +1ms
  mqttjs:client deliver :: entry undefined +0ms
  mqttjs:client _resubscribe +0ms
  mqttjs:client subscribe: array topic presence +0ms
  mqttjs:client subscribe: pushing topic `presence` and qos `0` to subs list +0ms
  mqttjs:client subscribe :: resubscribe true +0ms
  mqttjs:client subscribe :: call _sendPacket +1ms
  mqttjs:client _sendPacket :: (node_server}) ::  start +0ms
  mqttjs:client sendPacket :: packet: {
  mqttjs:client   cmd: 'subscribe',
  mqttjs:client   subscriptions: [ { topic: 'presence', qos: 0 } ],
  mqttjs:client   qos: 1,
  mqttjs:client   retain: false,
  mqttjs:client   dup: false,
  mqttjs:client   messageId: 23533
  mqttjs:client } +0ms
  mqttjs:client sendPacket :: emitting `packetsend` +0ms
  mqttjs:client sendPacket :: writing to stream +0ms
  mqttjs:client sendPacket :: writeToStream result true +0ms
  mqttjs:client sendPacket :: invoking cb +0ms
  mqttjs:client nop :: undefined +0ms
  mqttjs:client subscribe: array topic demo/ +0ms
  mqttjs:client subscribe: pushing topic `demo/` and qos `1` to subs list +0ms
  mqttjs:client subscribe :: resubscribe true +0ms
  mqttjs:client subscribe :: call _sendPacket +0ms
  mqttjs:client _sendPacket :: (node_server}) ::  start +0ms
  mqttjs:client sendPacket :: packet: {
  mqttjs:client   cmd: 'subscribe',
  mqttjs:client   subscriptions: [ { topic: 'demo/', qos: 1 } ],
  mqttjs:client   qos: 1,
  mqttjs:client   retain: false,
  mqttjs:client   dup: false,
  mqttjs:client   messageId: 23534
  mqttjs:client } +1ms
  mqttjs:client sendPacket :: emitting `packetsend` +0ms
  mqttjs:client sendPacket :: writing to stream +0ms
  mqttjs:client sendPacket :: writeToStream result true +0ms
  mqttjs:client sendPacket :: invoking cb +0ms
  mqttjs:client nop :: undefined +0ms
  mqttjs:client subscribe: array topic $share/server/kitchen/+/orderId +0ms
  mqttjs:client subscribe: pushing topic `$share/server/kitchen/+/orderId` and qos `1` to subs list +0ms
  mqttjs:client subscribe :: resubscribe true +0ms
  mqttjs:client subscribe :: call _sendPacket +0ms
  mqttjs:client _sendPacket :: (node_server}) ::  start +0ms
  mqttjs:client sendPacket :: packet: {
  mqttjs:client   cmd: 'subscribe',
  mqttjs:client   subscriptions: [ { topic: '$share/server/kitchen/+/orderId', qos: 1 } ],
  mqttjs:client   qos: 1,
  mqttjs:client   retain: false,
  mqttjs:client   dup: false,
  mqttjs:client   messageId: 23535
  mqttjs:client } +0ms
  mqttjs:client sendPacket :: emitting `packetsend` +1ms
  mqttjs:client sendPacket :: writing to stream +0ms
  mqttjs:client sendPacket :: writeToStream result true +0ms
  mqttjs:client sendPacket :: invoking cb +0ms
  mqttjs:client nop :: undefined +0ms
  mqttjs:client work :: getting next packet in queue +1ms
  mqttjs:client work :: packet pulled from queue +0ms
  mqttjs:client _handlePacket :: emitting packetreceive +0ms
  mqttjs:client _handlePublish: packet Packet { cmd: 'publish', retain: false, qos: 1, dup: false, length: 22, topic: 'demo/', payload: <Buffer 7b 27 68 65 6c 6c 6f 27 3a 32 30 27 7d>, messageId: 15 } +0ms
  mqttjs:client _handlePublish: qos 1 +0ms
demo/ : {'hello':20'}
  mqttjs:client _sendPacket :: (node_server}) ::  start +1ms
  mqttjs:client sendPacket :: packet: { cmd: 'puback', messageId: 15, reasonCode: 0 } +0ms
  mqttjs:client sendPacket :: emitting `packetsend` +0ms
  mqttjs:client sendPacket :: writing to stream +0ms
  mqttjs:client sendPacket :: writeToStream result true +0ms
  mqttjs:client sendPacket :: invoking cb +0ms
  mqttjs:client work :: getting next packet in queue +0ms
  mqttjs:client work :: packet pulled from queue +0ms
  mqttjs:client _handlePacket :: emitting packetreceive +0ms
  mqttjs:client _handlePublish: packet Packet { cmd: 'publish', retain: false, qos: 1, dup: false, length: 22, topic: 'demo/', payload: <Buffer 7b 27 68 65 6c 6c 6f 27 3a 32 31 27 7d>, messageId: 16 } +0ms
  mqttjs:client _handlePublish: qos 1 +1ms
demo/ : {'hello':21'}
  mqttjs:client _sendPacket :: (node_server}) ::  start +0ms
  mqttjs:client sendPacket :: packet: { cmd: 'puback', messageId: 16, reasonCode: 0 } +0ms
  mqttjs:client sendPacket :: emitting `packetsend` +0ms
  mqttjs:client sendPacket :: writing to stream +0ms
  mqttjs:client sendPacket :: writeToStream result true +0ms
  mqttjs:client sendPacket :: invoking cb +0ms
MongoDB Connected: localhost
  mqttjs:client writable stream :: parsing buffer +11ms
  mqttjs:client parser :: on packet push to packets array. +0ms
  mqttjs:client parser :: on packet push to packets array. +1ms
  mqttjs:client parser :: on packet push to packets array. +0ms
  mqttjs:client work :: getting next packet in queue +0ms
  mqttjs:client work :: packet pulled from queue +0ms
  mqttjs:client _handlePacket :: emitting packetreceive +0ms
  mqttjs:client _handleAck :: packet type suback +0ms
  mqttjs:client publish :: message `Hello mqtt from presence` to topic `presence` +0ms
  mqttjs:client publish :: qos 0 +0ms
  mqttjs:client MqttClient:publish: packet cmd: publish +0ms
  mqttjs:client _sendPacket :: (node_server}) ::  start +0ms
  mqttjs:client sendPacket :: packet: {
  mqttjs:client   cmd: 'publish',
  mqttjs:client   topic: 'presence',
  mqttjs:client   payload: 'Hello mqtt from presence',
  mqttjs:client   qos: 0,
  mqttjs:client   retain: false,
  mqttjs:client   messageId: 0,
  mqttjs:client   dup: false
  mqttjs:client } +0ms
  mqttjs:client sendPacket :: emitting `packetsend` +0ms
  mqttjs:client sendPacket :: writing to stream +0ms
  mqttjs:client sendPacket :: writeToStream result true +0ms
  mqttjs:client sendPacket :: invoking cb +0ms
  mqttjs:client nop :: undefined +0ms
  mqttjs:client _sendPacket :: (node_server}) ::  end +0ms
  mqttjs:client work :: getting next packet in queue +1ms
  mqttjs:client work :: packet pulled from queue +0ms
  mqttjs:client _handlePacket :: emitting packetreceive +0ms
  mqttjs:client _handleAck :: packet type suback +0ms
  mqttjs:client publish :: message `Hello mqtt from server` to topic `demo/` +0ms
  mqttjs:client publish :: qos 0 +0ms
  mqttjs:client MqttClient:publish: packet cmd: publish +0ms
  mqttjs:client _sendPacket :: (node_server}) ::  start +0ms
  mqttjs:client sendPacket :: packet: {
  mqttjs:client   cmd: 'publish',
  mqttjs:client   topic: 'demo/',
  mqttjs:client   payload: 'Hello mqtt from server',
  mqttjs:client   qos: 0,
  mqttjs:client   retain: false,
  mqttjs:client   messageId: 0,
  mqttjs:client   dup: false
  mqttjs:client } +0ms
  mqttjs:client sendPacket :: emitting `packetsend` +0ms
  mqttjs:client sendPacket :: writing to stream +0ms
  mqttjs:client sendPacket :: writeToStream result true +0ms
  mqttjs:client sendPacket :: invoking cb +0ms
  mqttjs:client nop :: undefined +0ms
  mqttjs:client _sendPacket :: (node_server}) ::  end +0ms
  mqttjs:client work :: getting next packet in queue +1ms
  mqttjs:client work :: packet pulled from queue +0ms
  mqttjs:client _handlePacket :: emitting packetreceive +0ms
  mqttjs:client _handleAck :: packet type suback +0ms
  mqttjs:client publish :: message `OrderId` to topic `presence` +0ms
  mqttjs:client publish :: qos 0 +0ms
  mqttjs:client MqttClient:publish: packet cmd: publish +1ms
  mqttjs:client _sendPacket :: (node_server}) ::  start +0ms
  mqttjs:client sendPacket :: packet: {
  mqttjs:client   cmd: 'publish',
  mqttjs:client   topic: 'presence',
  mqttjs:client   payload: 'OrderId',
  mqttjs:client   qos: 0,
  mqttjs:client   retain: false,
  mqttjs:client   messageId: 0,
  mqttjs:client   dup: false
  mqttjs:client } +0ms
  mqttjs:client sendPacket :: emitting `packetsend` +0ms
  mqttjs:client sendPacket :: writing to stream +1ms
  mqttjs:client sendPacket :: writeToStream result true +0ms
  mqttjs:client sendPacket :: invoking cb +0ms
  mqttjs:client nop :: undefined +0ms
  mqttjs:client _sendPacket :: (node_server}) ::  end +0ms
Client connected to redis and ready to use...
Client connected to redis and ready to use...
  mqttjs:client writable stream :: parsing buffer +2ms
  mqttjs:client parser :: on packet push to packets array. +0ms
  mqttjs:client parser :: on packet push to packets array. +0ms
  mqttjs:client parser :: on packet push to packets array. +1ms
  mqttjs:client work :: getting next packet in queue +0ms
  mqttjs:client work :: packet pulled from queue +0ms
  mqttjs:client _handlePacket :: emitting packetreceive +0ms
  mqttjs:client _handlePublish: packet Packet { cmd: 'publish', retain: false, qos: 0, dup: false, length: 34, topic: 'presence', payload: <Buffer 48 65 6c 6c 6f 20 6d 71 74 74 20 66 72 6f 6d 20 70 72 65 73 65 6e 63 65> } +0ms
  mqttjs:client _handlePublish: qos 0 +0ms
presence : Hello mqtt from presence
  mqttjs:client work :: getting next packet in queue +0ms
  mqttjs:client work :: packet pulled from queue +0ms
  mqttjs:client _handlePacket :: emitting packetreceive +0ms
  mqttjs:client _handlePublish: packet Packet { cmd: 'publish', retain: false, qos: 0, dup: false, length: 29, topic: 'demo/', payload: <Buffer 48 65 6c 6c 6f 20 6d 71 74 74 20 66 72 6f 6d 20 73 65 72 76 65 72> } +0ms
  mqttjs:client _handlePublish: qos 0 +0ms
demo/ : Hello mqtt from server
  mqttjs:client work :: getting next packet in queue +0ms
  mqttjs:client work :: packet pulled from queue +0ms
  mqttjs:client _handlePacket :: emitting packetreceive +0ms
  mqttjs:client _handlePublish: packet Packet { cmd: 'publish', retain: false, qos: 0, dup: false, length: 17, topic: 'presence', payload: <Buffer 4f 72 64 65 72 49 64> } +0ms
  mqttjs:client _handlePublish: qos 0 +1ms
shamblett commented 2 years ago

OK, getting there. The session expiry interval in the connect message unfortunately defaults to 0, which has the effect of killing the session whether or not clean start is set. Try setting this on your connect message:

final connMess = MqttConnectMessage();
connMess.variableHeader?.sessionExpiryInterval = MqttConnectVariableHeader.sessionDoesNotExpire;

This is working now for me with the HiveMQ broker.

Note that the incoming queued messages are sent immediately after the CONNACK, so you should be listening at that point, ready for them. Why these messages are seemingly discarded by the client if you are not is something else to look at.
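
The effect of the session expiry interval can be sketched as a small decision function. In MQTT 5, an interval of 0 (also the value assumed when the property is absent) ends the session as soon as the network connection closes, and 0xFFFFFFFF means the session does not expire. The plain-Dart model below is a simplification for illustration only: `sessionPresentOnReconnect` and `sessionNeverExpires` are hypothetical names, it is not how any broker or mqtt5_client is implemented, and I am only presuming that the library's `sessionDoesNotExpire` constant corresponds to the 0xFFFFFFFF value.

```dart
/// MQTT 5 value meaning "the session does not expire" (UINT_MAX).
const int sessionNeverExpires = 0xFFFFFFFF;

/// Simplified model of whether a broker still holds a client's session
/// (and its queued QoS 1/2 messages) when the client reconnects.
///
/// [cleanStart] comes from the new CONNECT, [previousExpirySeconds] is the
/// session expiry interval set on the previous connection, and
/// [offlineSeconds] is how long the client was disconnected.
bool sessionPresentOnReconnect({
  required bool cleanStart,
  required int previousExpirySeconds,
  required int offlineSeconds,
}) {
  // Clean start always discards any stored session.
  if (cleanStart) return false;
  // Interval 0: the session ended when the connection closed.
  if (previousExpirySeconds == 0) return false;
  // UINT_MAX: the session never expires.
  if (previousExpirySeconds == sessionNeverExpires) return true;
  // Otherwise the session survives until the interval has elapsed.
  return offlineSeconds < previousExpirySeconds;
}

void main() {
  // The situation in this issue: clean start off, expiry left at 0.
  print(sessionPresentOnReconnect(
      cleanStart: false, previousExpirySeconds: 0, offlineSeconds: 30));
  // prints "false"

  // With the suggested fix: the session does not expire.
  print(sessionPresentOnReconnect(
      cleanStart: false,
      previousExpirySeconds: sessionNeverExpires,
      offlineSeconds: 30));
  // prints "true"
}
```

The first call mirrors the reporter's setup and shows why leaving clean start off was not enough on its own.
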

shamblett commented 2 years ago

Version 3.3.0 now released.