shamblett / mqtt_client

A server and browser based MQTT client for dart

Subscriptions broken after autoReconnect #209

Closed osoftware closed 4 years ago

osoftware commented 4 years ago

I was struggling with my app misbehaving after resuming. It was able to send MQTT messages (I could see them on the backend), but nothing could be received. The investigation led to the autoReconnect feature. Not only do subscriptions seem to be canceled, but subscribing again does nothing. Here's a test that demonstrates the issue:

    test('should maintain subscriptions after autoReconnect', () async {
      final client = MqttServerClient.withPort('broker.hivemq.com', 'client-id-123456789', 1883);
      client.autoReconnect = true;
      client.logging(on: true);
      await client.connect('user', 'password');
      client.subscribe('xd/+', MqttQos.exactlyOnce);

      client.doAutoReconnect(force: true); // this line breaks the test
      // client.subscribe('xd/+', MqttQos.exactlyOnce); // uncommenting this line doesn't help

      final stream = client.updates.expand((event) sync* {
        for (var e in event) {
          MqttPublishMessage message = e.payload;
          yield utf8.decode(message.payload.message);
        }
      }).timeout(Duration(seconds: 5));

      client.publishMessage('xd/light', MqttQos.exactlyOnce,
          (MqttClientPayloadBuilder()..addUTF8String('xd')).payload);

      expect(await stream.first, equals('xd'));
    });

Using version 7.3.0

shamblett commented 4 years ago

Just an update here: I am looking at this. Unfortunately it's not an easy fix, and I'm a bit short of time to devote to it at the moment.

ened commented 4 years ago

@osoftware did you find a workaround?

osoftware commented 4 years ago

@ened I disabled the autoReconnect feature completely and implemented it myself in a facade class wrapping MqttClient. Every time I try to send a message, I check MqttConnectionState first and connect again if necessary.

class MqttFacade extends WidgetsBindingObserver {
  MqttClient _client;
  final _reconnecting = Lock();

  MqttFacade() {
    WidgetsBinding.instance?.addObserver(this);
  }

  Future<void> dispose() async {
    WidgetsBinding.instance.removeObserver(this);
    // other stuff
  }

  @override
  void didChangeAppLifecycleState(AppLifecycleState state) {
    if (state == AppLifecycleState.resumed && _client != null) {
      reconnect();
    }
  }

  // More or less equivalent of:
  // _client.doAutoReconnect();
  Future<void> reconnect() async {
    await _reconnecting.synchronized(() async {
      await connect(
          _client.server,
          _client.port,
          _client.connectionMessage.payload.username,
          _client.connectionMessage.payload.password);
      subscribe(_home);
    });
  }

  Future<void> connect(String broker, int port, String user, String password) async {
    // _client.connect etc...
  }

  void subscribe(String home) {
    // _client.subscribe etc...
  }

  Future<void> send(Message message, {bool retain = false}) async {
    if (_client.connectionStatus.state != MqttConnectionState.connected) await reconnect();
    // _client.publishMessage etc...
  }

  // other stuff
}

shamblett commented 4 years ago

OK, I have a working solution now, but it raises its own questions. I've updated your test to the code below:

import 'dart:async';
import 'dart:convert';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
import 'package:test/test.dart';

Future<int> main() async {
  test('should maintain subscriptions after autoReconnect', () async {
    final client = MqttServerClient.withPort(
        'broker.hivemq.com', 'client-id-123456789', 1883);
    client.autoReconnect = true;
    client.logging(on: true);
    const topic = 'xd/+';

    // Subscribe callback, we do the auto reconnect when we know we have subscribed
    // second time is from the resubscribe so we ignore it.
    var ignoreSubscribe = false;
    void subCB(subTopic) async {
      if (ignoreSubscribe) {
        print('ISSUE: Received re-subscribe callback for our topic - ignoring');
        return;
      }
      if (topic == subTopic) {
        print(
            'ISSUE: Received subscribe callback for our topic - auto reconnecting');
        client.doAutoReconnect(force: true);
      } else {
        print('ISSUE: Received subscribe callback for unknown topic $subTopic');
      }
      ignoreSubscribe = true;
      print('ISSUE: Exiting subscribe callback');
    }

    // New call back for when auto reconnect is complete
    void autoReconnected() async {
      // First unsubscribe
      print('ISSUE: Auto reconnected - Unsubscribing');
      client.unsubscribe(topic);
      await MqttUtilities.asyncSleep(1);

      // Now resubscribe
      print('ISSUE: Auto reconnected - Subscribing');
      client.subscribe(topic, MqttQos.exactlyOnce);
      await MqttUtilities.asyncSleep(1);

      // Now re publish
      print('ISSUE: Auto reconnected - Publishing');
      client.publishMessage('xd/light', MqttQos.exactlyOnce,
          (MqttClientPayloadBuilder()..addUTF8String('xd')).payload);
    }

    // Main test starts here
    print('ISSUE: Main test start');
    client.onSubscribed = subCB; // Subscribe callback
    client.onAutoReconnected = autoReconnected; // Auto reconnected callback
    print('ISSUE: Connecting');
    await client.connect('user', 'password');
    client.subscribe(topic, MqttQos.exactlyOnce);

    // Now publish the message
    print('ISSUE: Publishing');
    client.publishMessage('xd/light', MqttQos.exactlyOnce,
        (MqttClientPayloadBuilder()..addUTF8String('xd')).payload);

    // Listen for our responses.
    print('ISSUE: Listening >>>>');
    final stream = client.updates.expand((event) sync* {
      for (var e in event) {
        MqttPublishMessage message = e.payload;
        yield utf8.decode(message.payload.message);
      }
    }).timeout(Duration(seconds: 20));

    expect(await stream.first, equals('xd'));
    print('ISSUE: Test complete');
  });

  return 0;
}

As you can see, the auto reconnect is now called from the initial subscribe callback, which just makes it easier to test. The main update is the autoReconnected callback that's been added. In it we do the heavy lifting of unsubscribing, resubscribing and resending the publish message, with appropriate waits in between them (unfortunate for now, but the protocol is the protocol and some things have to be done before others).

This works now but puts a lot of work on the user. The upside is that the user gets very fine-grained control over what happens on reconnect (some users asked for the various callbacks specifically for this reason). The downside is that reconnect doesn't just magically happen, i.e. your incoming message stream doesn't just start working again.

As a user yourself I'd like your thoughts on this and any other suggestions you might have.

If you want to play with the code, it's on the 'development' branch of the repo; update your pubspec.yaml appropriately to use it.

ened commented 4 years ago

Hi @shamblett, speaking as someone who isn't too familiar with MQTT yet, I think the sample you posted is somewhat difficult to follow. While I can understand that some users may want fine-grained control and callbacks on reconnect, I believe that most users don't.

The library does support the specification of callback methods, so you could ship with well-written (like your example) default handlers that do all the work underneath.

As a user, I would simply like to write this to set up a subscription:

final client = MqttServerClient.withPort(
        'broker.hivemq.com', 'client-id-123456789', 1883);
client.autoReconnect = true;
client.logging(on: true);
const topic = 'xd/+';
final stream = client.updates.expand((event) sync* {
  for (var e in event) {
    MqttPublishMessage message = e.payload;
    yield utf8.decode(message.payload.message);
  }
});
_sub = stream.listen((e) {
  // process results
});

And at some point in my business logic / app:

_sub?.cancel();

From part 1 to part 2, the stream should:

Personally, I'd make autoReconnect = true the default.

The behavior of "unsubscribing & resubscribing on reconnect" could be an implementation detail of the library or configured using a flag. It also means the library would need to keep a table of active subscriptions if enabled.
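
To make the "table of active subscriptions" idea concrete, here is a minimal pure-Dart sketch. It is not how mqtt_client implements anything: SubscriptionTable, SendSubscribe and demoReplay are hypothetical names, and the QoS level is modeled as a plain int so the example needs no packages.

```dart
// Hypothetical sketch: none of these names come from mqtt_client.

/// Signature of whatever actually sends a SUBSCRIBE packet to the broker.
typedef SendSubscribe = void Function(String topic, int qos);

/// Remembers active subscriptions so they can be replayed after a reconnect.
class SubscriptionTable {
  SubscriptionTable(this._send);

  final SendSubscribe _send;
  final Map<String, int> _active = <String, int>{}; // topic -> QoS level

  /// Records the subscription and forwards it to the broker.
  void subscribe(String topic, int qos) {
    _active[topic] = qos;
    _send(topic, qos);
  }

  /// Forgets the subscription locally; no broker round trip is modeled.
  void unsubscribe(String topic) => _active.remove(topic);

  /// Re-sends every remembered subscription, e.g. from a reconnect callback.
  void replayAll() => _active.forEach(_send);
}

/// Usage example: returns the SUBSCRIBE requests issued across one reconnect.
List<String> demoReplay() {
  final sent = <String>[];
  final table = SubscriptionTable((topic, qos) => sent.add('$topic@$qos'));
  table.subscribe('xd/+', 2);
  table.subscribe('home/temp', 1);
  table.unsubscribe('home/temp');
  table.replayAll(); // simulate the connection coming back
  return sent;
}
```

Only 'xd/+' is replayed here, because 'home/temp' was dropped from the table before the simulated reconnect. Whether replay is on by default or behind a flag is the design question raised above.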

shamblett commented 4 years ago

Yes, the unsubscribe and resubscribe in the autoReconnected callback are too much to burden the user with. I'll update the code to do this automatically.

This should result in the incoming message stream simply restarting after a reconnect. In the test case above we would still need the publish because you are self-publishing; if a third party were publishing to your topic, the autoReconnected callback would now be empty.

ThiagoCortez81 commented 4 years ago

Hi guys! To solve this, I always resubscribe to my topics inside the onConnected method:

final client = MqttServerClient.withPort('broker.hivemq.com', 'client-id-123456789', 1883);
client.logging(on: false);
client.keepAlivePeriod = 15;
client.onDisconnected = onDisconnected;
client.onConnected = onConnected;
client.onSubscribed = onSubscribed;
client.pongCallback = pong;
client.autoReconnect = true;
// The successful connect callback
void onConnected() {
  print('Successfully connected to MQTT.');
  subscribeToTopics();
}
void subscribeToTopics() {
  // Do your updates listening here
}

ened commented 4 years ago

@ThiagoCortez81 thank you for your example!

I applied this to my project, but ran into an issue when the MQTT server restarts.

(We are running Mosquitto, so a simple /etc/init.d/mosquitto restart caused the issue.)

The following change to onConnected was needed:

_client.onConnected = () {
  for (String topic in _subscribedTopics) {
    // unsubscribe does _not_ work
    // _client.unsubscribe(topic);

    // access a private variable (subscriptionsManager)
    _client.subscriptionsManager.subscriptions.remove(topic);
    _client.subscribe(topic, _defaultSubscribeQos);
  }
};

@shamblett the unsubscribe method seems to require the server to send an unsubscribe ack. When the server goes away, the ack is never received, and unsubscribe fails.

When it fails, the subscription is not cleared, so a re-subscription reuses the existing subscription identifier, which is already stale.
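
The failure mode described here can be illustrated with a toy model. This is only a sketch of the behaviour as reported in this comment, not the mqtt_client implementation; ToyClient, unsubAckArrives, forgetLocally and recover are all hypothetical names.

```dart
// Toy model only: it mimics the behaviour described above and is not the
// mqtt_client implementation. All names here are hypothetical.
class ToyClient {
  bool unsubAckArrives = true;
  final Set<String> _tracked = <String>{}; // topics the client believes are active
  final List<String> wire = <String>[]; // packets that would actually be sent

  /// Sends SUBSCRIBE only when the topic is not already tracked locally.
  void subscribe(String topic) {
    if (_tracked.add(topic)) wire.add('SUBSCRIBE $topic');
  }

  /// Clears the local entry only if the broker acknowledges the request.
  void unsubscribe(String topic) {
    wire.add('UNSUBSCRIBE $topic');
    if (unsubAckArrives) _tracked.remove(topic);
  }

  /// Drops the local entry without any broker round trip.
  void forgetLocally(String topic) => _tracked.remove(topic);
}

/// Returns the packets sent when recovering with or without a local clear.
List<String> recover({required bool clearLocally}) {
  final client = ToyClient()..subscribe('xd/+');
  client.wire.clear(); // ignore the initial SUBSCRIBE
  client.unsubAckArrives = false; // the broker restarted, so no ack comes back
  if (clearLocally) {
    client.forgetLocally('xd/+');
  } else {
    client.unsubscribe('xd/+');
  }
  client.subscribe('xd/+');
  return client.wire;
}
```

In this model, recover(clearLocally: false) never emits a new SUBSCRIBE because the stale entry is still tracked, while recover(clearLocally: true) does. That is the reason for removing the entry locally before subscribing again.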

shamblett commented 4 years ago

OK, I've updated the client to automatically resubscribe confirmed subscriptions on auto reconnect. This means that the original test posted above now works with only one slight change:

test('should maintain subscriptions after autoReconnect', () async {
    final client = MqttServerClient.withPort(
        'broker.hivemq.com', 'client-id-123456789', 1883);
    client.autoReconnect = true;
    client.logging(on: true);
    await client.connect('user', 'password');
    client.subscribe('xd/+', MqttQos.exactlyOnce);
    await MqttUtilities.asyncSleep(1);
    client.doAutoReconnect(force: true); // this line breaks the test
    // client.subscribe('xd/+', MqttQos.exactlyOnce); // uncommenting this line doesn't help

    final stream = client.updates.expand((event) sync* {
      for (var e in event) {
        MqttPublishMessage message = e.payload;
        yield utf8.decode(message.payload.message);
      }
    }).timeout(Duration(seconds: 5));

    client.publishMessage('xd/light', MqttQos.exactlyOnce,
        (MqttClientPayloadBuilder()..addUTF8String('xd')).payload);

    expect(await stream.first, equals('xd'));
  });

As can be seen, an await MqttUtilities.asyncSleep(1) has been added to ensure we receive the subscribe ack from the broker, confirming the subscription, before auto reconnecting. No additional callbacks are now needed.
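
As an alternative to a fixed one-second sleep, a test could wait for the subscribe confirmation itself. The following is a minimal sketch, not part of the library: waitForSubscribed and SubscribedCallback are hypothetical names, and it assumes the confirmation callback is handed the topic string, as the subCB example earlier in this thread suggests.

```dart
import 'dart:async';

/// Hypothetical callback shape: invoked with the topic that was confirmed.
typedef SubscribedCallback = void Function(String topic);

/// Completes once the callback installed through [register] fires for
/// [topic]; fails with a TimeoutException if that does not happen in time.
Future<void> waitForSubscribed(
  void Function(SubscribedCallback callback) register,
  String topic, {
  Duration timeout = const Duration(seconds: 5),
}) {
  final done = Completer<void>();
  register((confirmed) {
    // Ignore confirmations for other topics and any repeated callbacks.
    if (confirmed == topic && !done.isCompleted) done.complete();
  });
  return done.future.timeout(timeout);
}
```

With the client from the test above, usage might look like `final ready = waitForSubscribed((cb) => client.onSubscribed = cb, 'xd/+');` placed before the subscribe call, followed by `await ready;` before forcing the reconnect. This replaces any existing onSubscribed handler, so it suits tests better than application code.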

This functionality is controlled by the resubscribeOnAutoReconnect setting on the client. If this is true, auto resubscription occurs. If false, the user has to do this manually in any way they wish. To facilitate this and overcome the fault reported above by @ened, a new method named resubscribe has been added to the client. It automatically unsubscribes all confirmed subscriptions without sending an unsubscribe message to the broker, then resubscribes them.

Hopefully this should now encompass the functionality needed in the majority of use cases. I'll leave this open for further comment while I look at the related #211.

shamblett commented 4 years ago

Client re-published at 8.0.0, please use this version for any further testing. Closing this issue, please raise specific issues as they arise.