shamblett / mqtt_client

A server and browser based MQTT client for dart
Other
552 stars 179 forks source link

I am not able to unsubscribe my topics on local server #96

Closed DhavalRKansara closed 5 years ago

DhavalRKansara commented 5 years ago

I am trying to unsubscribe topic on a local broker and getting the following exception, could you please check and tell if there is some error from my side? My Code

class MQTTHelper {
  final MqttClient client = MqttClient('172.16.4.231', '');

  Future setUpMQTT() async {
    client.logging(on: true);
    client.onConnected = onConnected;
    client.keepAlivePeriod = 30;
    client.onSubscribed = onSubscribed;
    client.onDisconnected = onDisconnected;
    client.onUnsubscribed = onUnsubscribed;
    client.pongCallback = pong;
    client.secure = false;

    final MqttConnectMessage connMess = await MqttConnectMessage()
        .withClientIdentifier(
            'rayWebSocketClient_123456_87f52272-7aa5-45ae-b25b-4a9e4202ba27')
        .authenticateAs('user', 'password')
        .keepAliveFor(30)
        .startClean()
        .withWillQos(MqttQos.atMostOnce);
    print(' Mosquitto client connecting....');

    client.connectionMessage = connMess;

    ///Connection
    try {
      await client.connect();
    } on Exception catch (e) {
      print('client Connection exception - $e');
    }
}

void subscribeWithTopic() {
        client.subscribe(topic, MqttQos.atLeastOnce);
  }

  void unSubscribeWithTopic( String topic)async {
      client.unsubscribe(topic);
  }

/// Note : MQTT Delegate methods
  void onSubscribed(String topic) {
    print(' onSubscribed=> Subscription confirmed for topic $topic');
  }

  void onUnsubscribed(String topic) {
    print('onUnsubscribed=> Subscription confirmed for topic $topic');
  }

Below is the error log when I tried to unsubscribe after subscription of the topic :

flutter: 2019-05-08 16:24:14.880625 -- MqttConnectionHandler::sendMessage - MQTTMessage of type MqttMessageType.unsubscribe
Header: MessageType = MqttMessageType.unsubscribe, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 0
Unsubscribe VariableHeader Variable Header: MessageIdentifier={3}
Payload: Unsubscription [{1}]
{{ Topic={websockets/stations/stats/fastUpdates/e4eed60a-c308-49ad-92c9-049deaf652ed}}
flutter: MAIN DICTIONARY {websockets/stations/list/e4eed60a-c308-49ad-92c9-049deaf652ed: {_DashboardScreenState#536fe(lifecycle state: defunct, not mounted)}}
flutter: 2019-05-08 16:24:14.883441 -- MqttConnectionHandler::sendMessage - MQTTMessage of type MqttMessageType.unsubscribe
Header: MessageType = MqttMessageType.unsubscribe, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 0
Unsubscribe VariableHeader Variable Header: MessageIdentifier={4}
Payload: Unsubscription [{1}]
{{ Topic={websockets/stations/list/e4eed60a-c308-49ad-92c9-049deaf652ed}}
flutter: MAIN DICTIONARY {}
flutter: 2019-05-08 16:24:15.229265 -- MqttConnection::_onError - calling disconnected callback
flutter: onDisconnected => Client Disconnected
flutter: 2019-05-08 16:24:15.232918 -- MqttConnection::_onDone - calling disconnected callback
flutter: 2019-05-08 16:24:20.249988 -- SynchronousMqttConnectionHandler::internalConnect entered
flutter: 2019-05-08 16:24:20.250819 -- SynchronousMqttConnectionHandler::internalConnect - initiating connection try 0
flutter: 2019-05-08 16:24:20.251623 -- SynchronousMqttConnectionHandler::internalConnect - insecure TCP selected
flutter: 2019-05-08 16:24:20.552507 -- MqttConnection::_startListening
flutter: 2019-05-08 16:24:20.557275 -- SynchronousMqttConnectionHandler::internalConnect sending connect message
flutter: 2019-05-08 16:24:20.560229 -- MqttConnectionHandler::sendMessage - MQTTMessage of type MqttMessageType.connect
Header: MessageType = MqttMessageType.connect, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 108
Connect Variable Header: ProtocolName=MQIsdp, ProtocolVersion=3, ConnectFlags=Connect Flags: Reserved1=false, CleanStart=true, WillFlag=false, WillQos=MqttQos.atMostOnce, WillRetain=false, PasswordFlag=true, UserNameFlag=true, KeepAlive=30
Instance of ‘MqttConnectPayload’
flutter: 2019-05-08 16:24:20.569746 -- SynchronousMqttConnectionHandler::internalConnect - pre sleep, state = Connection status is connecting with return code noneSpecified
flutter: 2019-05-08 16:24:21.005652 -- MqttConnection::_onData
flutter: 2019-05-08 16:24:21.009339 -- MqttConnection::_onData - message received MQTTMessage of type MqttMessageType.connectAck
Header: MessageType = MqttMessageType.connectAck, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 2
Connect Variable Header: TopicNameCompressionResponse={0}, ReturnCode={MqttConnectReturnCode.connectionAccepted}
flutter: 2019-05-08 16:24:21.010958 -- MqttConnection::_onData - message processed
flutter: 2019-05-08 16:24:21.013522 -- SynchronousMqttConnectionHandler::_connectAckProcessor
flutter: 2019-05-08 16:24:21.014252 -- SynchronousMqttConnectionHandler::_connectAckProcessor - state = connected
flutter: onConnected => Client connection was successful
flutter: 2019-05-08 16:24:21.015776 -- SynchronousMqttConnectionHandler:: cancelling connect timer
flutter: 2019-05-08 16:24:21.016919 -- SynchronousMqttConnectionHandler::internalConnect - post sleep, state = Connection status is connected with return code connectionAccepted
flutter: 2019-05-08 16:24:21.017572 -- SynchronousMqttConnectionHandler::internalConnect exited with state Connection status is connected with return code connectionAccepted

Error log from broker side:

11:39:27.803 [debug] stopped connected session, due to unexpected frame type {error,cant_parse_variable_header}
11:39:27.804 [warning] session stopped abnormally due to ’{error,unexpected_message,{error,cant_parse_variable_header}} 
shamblett commented 5 years ago

You don't show where this is called from

void unSubscribeWithTopic( String topic)async { client.unsubscribe(topic); }

also its marked as async, it needn't be, the unsubscribe method does not return a future so you can't await it. That said, the problem seems to be here

flutter: 2019-05-08 16:24:14.880625 -- MqttConnectionHandler::sendMessage - MQTTMessage of type MqttMessageType.unsubscribe

Header: MessageType = MqttMessageType.unsubscribe, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 0

Unsubscribe VariableHeader Variable Header: MessageIdentifier={3}

Payload: Unsubscription [{1}]

{{ Topic={websockets/stations/stats/fastUpdates/e4eed60a-c308-49ad-92c9-049deaf652ed}}

flutter: MAIN DICTIONARY {websockets/stations/list/e4eed60a-c308-49ad-92c9-049deaf652ed: {_DashboardScreenState#536fe(lifecycle state: defunct, not mounted)}}

flutter: 2019-05-08 16:24:14.883441 -- MqttConnectionHandler::sendMessage - MQTTMessage of type MqttMessageType.unsubscribe

Header: MessageType = MqttMessageType.unsubscribe, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 0

Unsubscribe VariableHeader Variable Header: MessageIdentifier={4}

Payload: Unsubscription [{1}]

Your unsubscribing twice, in each case the message size is 0 which is a bit odd, I'll look at this, the broker log shows that this bad message was picked up, which is why you re-connected, which seemed to work Ok.

I need to see how you are using your MQTTHelper class really.

bigbirdhzp commented 5 years ago

I use nodejs to create local mqtt server.The MQTTBox can connect the local mqtt server and subscribe & push.But in flutter , the app i crated which can't connected the local mqtt server.

here is my nodejs mqtt server.

var mosca = require('mosca');

var ascoltatore = {
  //using ascoltatore
  //type: 'mongo',
  //url: 'mongodb://localhost:27017/mqtt',
  //pubsubCollection: 'ascoltatori',
  //mongo: {}
};

var settings = {
  port: 1883,
  backend: ascoltatore
};

var server = new mosca.Server(settings);

server.on('clientConnected', function(client) {
    console.log('client connected', client.id);
});

// fired when a message is received
server.on('published', function(packet, client) {
  console.log('Published', packet.payload);
});

server.on('ready', setup);

// fired when the mqtt server is ready
function setup() {
  console.log('Mosca server is up and running');
}

the flutter app code just like your example.But i change the broker is 192.168.1.16. the client.port is 1883.

I see the error, it confused to me.The port change to 48359. image

shamblett commented 5 years ago

Does it always print the same port number?

bigbirdhzp commented 5 years ago

Does it always print the same port number?

No.some times the port number will change

shamblett commented 5 years ago

Ok this is your local port number then it looks like a network error could you post the code of how you initialise the mqtt client

jwangnz commented 5 years ago

Just a note, when working with RabbitMQ's MQTT plugin, the topic unsubscribing payload should come with Qos as MqttQos.atLeastOnce or the MQTT plugin would throw an unparseable payload error.

MohammedNoureldin commented 3 years ago

@tsing in my case your comment solved my problem! Thank you! The library does not add Qos to unsubscribe as I see? I opened an issue for this, I am not sure if I am correct here of if I am missing something: https://github.com/shamblett/mqtt_client/issues/312