shamblett / mqtt_client

A server and browser based MQTT client for dart

not getting subscription message back #121

Closed lamikam100 closed 5 years ago

lamikam100 commented 5 years ago

I am trying to re-purpose the code from this project - https://github.com/ricardoogliari/flutter_mqtt

I can connect to the broker and subscribe, but never get any notifications. Here is a debug listing:

D/ViewRootImpl@7db01c0[MainActivity](16674): ViewPostIme pointer 0
D/ViewRootImpl@7db01c0[MainActivity](16674): ViewPostIme pointer 1
I/flutter (16674): [MQTT client] MQTT client connecting....
I/flutter (16674): 2019-09-29 15:17:15.190418 -- Authenticating with username '{xxxxxxx}' and password '{xxxxxxx}'
I/flutter (16674): 2019-09-29 15:17:15.224548 -- SynchronousMqttConnectionHandler::internalConnect entered
I/flutter (16674): 2019-09-29 15:17:15.224801 -- SynchronousMqttConnectionHandler::internalConnect - initiating connection try 0
I/flutter (16674): 2019-09-29 15:17:15.225187 -- SynchronousMqttConnectionHandler::internalConnect - insecure TCP selected
I/flutter (16674): 2019-09-29 15:17:15.362495 -- MqttConnection::_startListening
I/flutter (16674): 2019-09-29 15:17:15.379131 -- SynchronousMqttConnectionHandler::internalConnect sending connect message
I/flutter (16674): 2019-09-29 15:17:15.384025 -- MqttConnectionHandler::sendMessage - MQTTMessage of type MqttMessageType.connect
I/flutter (16674): Header: MessageType = MqttMessageType.connect, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 0
I/flutter (16674): Connect Variable Header: ProtocolName=MQIsdp, ProtocolVersion=3, ConnectFlags=Connect Flags: Reserved1=false, CleanStart=false, WillFlag=false, WillQos=MqttQos.atLeastOnce, WillRetain=false, PasswordFlag=true, UserNameFlag=true, KeepAlive=30
I/flutter (16674): Instance of 'MqttConnectPayload'
I/flutter (16674): 2019-09-29 15:17:15.411930 -- SynchronousMqttConnectionHandler::internalConnect - pre sleep, state = Connection status is connecting with return code notAuthorized
I/flutter (16674): 2019-09-29 15:17:15.457292 -- MqttConnection::_onData
I/flutter (16674): 2019-09-29 15:17:15.469568 -- MqttConnection::_onData - message received MQTTMessage of type MqttMessageType.connectAck
I/flutter (16674): Header: MessageType = MqttMessageType.connectAck, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 2
I/flutter (16674): Connect Variable Header: TopicNameCompressionResponse={0}, ReturnCode={MqttConnectReturnCode.connectionAccepted}
I/flutter (16674): 2019-09-29 15:17:15.474494 -- MqttConnection::_onData - message processed
I/flutter (16674): 2019-09-29 15:17:15.480429 -- SynchronousMqttConnectionHandler::_connectAckProcessor
I/flutter (16674): 2019-09-29 15:17:15.480785 -- SynchronousMqttConnectionHandler::_connectAckProcessor - state = connected
I/flutter (16674): 2019-09-29 15:17:15.481166 -- SynchronousMqttConnectionHandler:: cancelling connect timer
I/flutter (16674): 2019-09-29 15:17:15.482083 -- SynchronousMqttConnectionHandler::internalConnect - post sleep, state = Connection status is connected with return code connectionAccepted
I/flutter (16674): 2019-09-29 15:17:15.482364 -- SynchronousMqttConnectionHandler::internalConnect exited with state Connection status is connected with return code connectionAccepted
I/flutter (16674): [MQTT client] connected
I/flutter (16674): [MQTT client] Subscribing to lamikam/garagedoor
I/flutter (16674): 2019-09-29 15:17:15.497346 -- MqttConnectionHandler::sendMessage - MQTTMessage of type MqttMessageType.subscribe
I/flutter (16674): Header: MessageType = MqttMessageType.subscribe, Duplicate = false, Retain = false, Qos = MqttQos.atLeastOnce, Size = 0
I/flutter (16674): Subscribe Variable Header: MessageIdentifier={1}
I/flutter (16674): Payload: Subscription [{1}]
I/flutter (16674): {{ Topic={lamikam/garagedoor}, Qos={MqttQos.exactlyOnce} }}
I/flutter (16674): 
I/flutter (16674): 2019-09-29 15:17:15.535422 -- MqttConnection::_onData
I/flutter (16674): 2019-09-29 15:17:15.540044 -- MqttConnection::_onData - message received MQTTMessage of type MqttMessageType.subscribeAck
I/flutter (16674): Header: MessageType = MqttMessageType.subscribeAck, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 3
I/flutter (16674): SubscribeAck Variable Header: MessageIdentifier={1}
I/flutter (16674): Payload: Qos grants [{1}]
I/flutter (16674): {{ Grant={MqttQos.exactlyOnce} }}
I/flutter (16674): 
I/flutter (16674): 2019-09-29 15:17:15.540429 -- MqttConnection::_onData - message processed
Here is the code.  My _onMessage() handler never gets called.  Any suggestions?

import 'dart:async';
import 'package:flutter/material.dart';
import 'package:flutter_mqtt/thermometer_widget.dart';
import 'package:mqtt_client/mqtt_client.dart' as mqtt;
import 'package:mqtt_client/mqtt_client.dart';

String broker           = 'soldier.cloudmqtt.com';
int port                = 14870;
String username = 'ssss';
String password = 'wwwwwww';
String clientIdentifier = 'android123232';
String topic = "lamikam/garagedoor";

  mqtt.MqttClient client;
  mqtt.MqttConnectionState connectionState;

  StreamSubscription subscription;

  void _subscribeToTopic(String topic) {
    if (connectionState == mqtt.MqttConnectionState.connected) {
        print('[MQTT client] Subscribing to ${topic.trim()}');
        client.subscribe(topic, mqtt.MqttQos.exactlyOnce);
    }
  }

  void _connect() async {
      client = mqtt.MqttClient.withPort(broker, '',port);
      client.logging(on: true);
      client.onDisconnected = _onDisconnected;
    final mqtt.MqttConnectMessage connMess = mqtt.MqttConnectMessage()
        .withClientIdentifier("Mqtt_MyClientUniqueId")
        .keepAliveFor(30)
        .withWillQos(mqtt.MqttQos.atLeastOnce)
    .authenticateAs(username, password)
    .withProtocolName("MQIsdp")
    .withProtocolVersion(3);

    print('[MQTT client] MQTT client connecting....');
    client.connectionMessage = connMess;
    try {
      await client.connect("kmoetsxm", "4O3HFeeN84yX");
    } catch (e) {
      print(e);
      _disconnect();
    }

    /// Check if we are connected
    if (client.connectionState == mqtt.MqttConnectionState.connected) {
      print('[MQTT client] connected');
      setState(() {
        connectionState = client.connectionState;
      });
    } else {
      print('[MQTT client] ERROR: MQTT client connection failed - '
          'disconnecting, state is ${client.connectionState}');
      _disconnect();
    }
    subscription = client.updates.listen(_onMessage);
    _subscribeToTopic(topic);
    client.updates.listen((List<MqttReceivedMessage<MqttMessage>> c) {
      final MqttPublishMessage recMess = c[0].payload;
      final String pt =
      MqttPublishPayload.bytesToStringAsString(recMess.payload.message);
      print(
          'EXAMPLE::Change notification:: topic is <${c[0].topic}>, payload is <-- $pt -->');
      print('');

    });

  }

  void _disconnect() {
    print('[MQTT client] _disconnect()');
    client.disconnect();
    _onDisconnected();
  }

  void _onDisconnected() {
    print('[MQTT client] _onDisconnected');
    setState(() {
      //topics.clear();
      connectionState = client.connectionState;
      client = null;
  //    subscription.cancel();
      subscription = null;
    });
    print('[MQTT client] MQTT client disconnected');
  }

  void _onMessage(List<mqtt.MqttReceivedMessage> event) {
    print(event.length);
    final mqtt.MqttPublishMessage recMess =
    event[0].payload as mqtt.MqttPublishMessage;
    final String message =
    mqtt.MqttPublishPayload.bytesToStringAsString(recMess.payload.message);
    print('[MQTT client] MQTT message: topic is <${event[0].topic}>, '
        'payload is <-- ${message} -->');
    print(client.connectionState);
    print("[MQTT client] message with topic: ${event[0].topic}");
    print("[MQTT client] message with message: ${message}");
  }
}
shamblett commented 5 years ago

You are subscribing and getting a subscribe ack back; however, you seem to have two listeners on client.updates: subscription = client.updates.listen(_onMessage); and client.updates.listen((List<MqttReceivedMessage<MqttMessage>> c) {....... Also, your subscription QoS is exactlyOnce; try changing this to atLeastOnce or atMostOnce. It's also possible the broker has nothing to send to you, so check your broker logs.
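
The suggestions above could be tried with something like the following minimal sketch. It assumes the mqtt_client API as used in the code posted in this issue (MqttClient.withPort, client.updates, client.subscribe); newer releases of the package may need a different client class and null-safe adjustments. The broker host, port, client ID and credentials are placeholders, not values from this thread. It registers a single listener, subscribes at atLeastOnce, and then publishes a test message to the same topic so that the broker has something to deliver.

```dart
import 'dart:async';
import 'package:mqtt_client/mqtt_client.dart';

// Placeholder connection details (assumptions); substitute your own.
const String broker = 'broker.example.com';
const int port = 1883;
const String clientId = 'example_client_id';
const String topic = 'lamikam/garagedoor';

Future<void> main() async {
  final MqttClient client = MqttClient.withPort(broker, clientId, port);
  client.logging(on: true);

  try {
    // Placeholder credentials (assumptions).
    await client.connect('username', 'password');
  } catch (e) {
    print('Connect failed: $e');
    client.disconnect();
    return;
  }

  // Register exactly one listener, and do it before subscribing.
  final StreamSubscription sub = client.updates
      .listen((List<MqttReceivedMessage<MqttMessage>> events) {
    final MqttPublishMessage msg = events[0].payload as MqttPublishMessage;
    final String text =
        MqttPublishPayload.bytesToStringAsString(msg.payload.message);
    print('topic <${events[0].topic}>, payload <$text>');
  });

  // Subscribe at a lower QoS than exactlyOnce, as suggested.
  client.subscribe(topic, MqttQos.atLeastOnce);

  // Publish a test message to the same topic so there is something
  // for the broker to send back to this client.
  final MqttClientPayloadBuilder builder = MqttClientPayloadBuilder();
  builder.addString('ping');
  client.publishMessage(topic, MqttQos.atLeastOnce, builder.payload);

  // Give the broker a few seconds to deliver, then clean up.
  await Future<void>.delayed(const Duration(seconds: 5));
  await sub.cancel();
  client.disconnect();
}
```

If the listener prints the test payload, the subscription path is working and the original problem is more likely that nothing was being published to the topic. If it prints nothing, the broker logs are the next place to look.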