shamblett / mqtt_client

A server and browser based MQTT client for dart

Get AWS shadow state #234

Closed gipobbe closed 3 years ago

gipobbe commented 3 years ago

Hi everyone. First of all, let me say that I have only recently started working with Flutter. Over the last few days I have been trying to write an app that can set or get a thing's shadow state in AWS. So far I can send data to AWS IoT, but I can't get the shadow state unless it is changed externally. According to the AWS docs, to get the state of a thing you have to publish an empty payload to the topic:

$aws/things/thingName/shadow/name/shadowName/get

while listening on the topic:

$aws/things/thingName/shadow/name/shadowName/get/accepted
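For reference, the two topics above are the named-shadow form; the classic (unnamed) shadow uses the same suffixes without the `/name/shadowName` segment, which is the form the code later in this thread uses. A minimal sketch of a helper that builds either form follows. The function name `shadowTopic` and its parameters are hypothetical and are not part of mqtt_client; the sketch assumes a null-safe Dart SDK.

```dart
/// Builds an AWS IoT device shadow topic (hypothetical helper, not part of
/// mqtt_client). Pass [shadowName] for a named shadow, omit it for the
/// classic shadow.
String shadowTopic(String thingName, String suffix, {String? shadowName}) {
  final base = shadowName == null
      ? '\$aws/things/$thingName/shadow'
      : '\$aws/things/$thingName/shadow/name/$shadowName';
  return '$base/$suffix';
}

void main() {
  // Classic shadow: request topic and the topic the reply arrives on.
  print(shadowTopic('MQTTest', 'get'));
  print(shadowTopic('MQTTest', 'get/accepted'));
  // Named shadow: same suffixes, longer prefix.
  print(shadowTopic('thingName', 'get', shadowName: 'shadowName'));
}
```

Building both topics from one function makes it harder to subscribe on the named form while publishing on the classic form, or the other way round.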

Is there a way to do this with mqtt_client? I've tried subscribing to both of the above topics at the same time, but I don't receive the shadow state. I've also (naively?) tried opening two clients, one for each topic, but that doesn't seem to work (I got some errors). This is my code, which is 99% from the examples directory. My last attempt was to subscribe to 'get/accepted' and publish to '/get' without subscribing to it. Does anyone have any ideas? Thanks.

```dart
import 'package:flutter/material.dart';
import 'dart:async';
import 'dart:io';
import 'package:path/path.dart' as path;
import 'package:mqtt_client/mqtt_server_client.dart';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:typed_data/typed_data.dart' as typed;
import 'dart:async' show Future;
import 'package:flutter/services.dart' show rootBundle;

/// An example of connecting to the google iot-core MQTT bridge server and publishing to a devices topic.
/// Full setup instructions can be found here https://cloud.google.com/iot/docs/how-tos/mqtt-bridge, please read this
/// before setting up and running this example.

Future<int> main() async {
  final WidgetsFlutterBinding binding = WidgetsFlutterBinding.ensureInitialized();
  var cert = await rootBundle.load('assets/c1.pem');

  // Create and connect the client
  const url = '****-ats.iot.us-west-2.amazonaws.com'; // The google iot-core MQTT bridge server
  const port = 8883; // You can also use 8883 if you so wish

  // The client id is a path to your device, example given below, note this contravenes the 23 character client id length
  // from the MQTT specification, the mqtt_client allows this, if exceeded and logging is turned on a warning is given.

  const clientId = 'GTest';
  // User name is not used and can be set to anything, it is needed because the password field contains the encoded JWT token for the device
  // Password contains the encoded JWT token, example below, the JWT token when generated should be encoded with the private key corresponding
  // to the public key you have set for your device.
  // Create the client

  final client = MqttServerClient(url, clientId);

  // Set the port
  client.port = port;

  // Set secure
  client.secure = true;

  // Set the security context as you need, note this is the standard Dart SecurityContext class.
  // If this is incorrect the TLS handshake will abort and a Handshake exception will be raised,
  // no connect ack message will be received and the broker will disconnect.
  final context = SecurityContext.defaultContext;
  context.setTrustedCertificatesBytes(cert.buffer.asUint8List());

  var c2 = await rootBundle.load('assets/c1.pem');
  context.useCertificateChainBytes(c2.buffer.asUint8List());

  var c3 = await rootBundle.load('assets/k1.pem');
  context.usePrivateKeyBytes(c3.buffer.asUint8List());

  // If needed set the private key file path and the optional passphrase and any other supported security features
  // Note that for flutter users the parameters above can be set in byte format rather than file paths.
  client.securityContext = context;

  // Set the protocol to V3.1.1 for iot-core, if you fail to do this you will receive a connect ack with the response code
  // 0x01 Connection Refused, unacceptable protocol version
  client.setProtocolV311();
  // logging if you wish
  //client.logging(on: true);
  // OK, connect, if your encoded JWT token in the password field cannot be decoded by the corresponding public key attached
  // to the device or the JWT token is incorrect a connect ack message will be received with a return code of
  // 0x05 Connection Refused, not authorized. If the password field is not set at all the return code may be
  // 0x04 Connection Refused, bad user name or password
  await client.connect();

  if (client.connectionStatus.state == MqttConnectionState.connected) {
    print('iotcore client connected');
  } else {
    print(
        'ERROR iotcore client connection failed - disconnecting, state is ${client.connectionStatus.state}');
    client.disconnect();
  }

  // Troubleshooting tips can be found here https://cloud.google.com/iot/docs/troubleshooting
  // Publish to the topic you have associated with your device
  const topic = '\$aws/things/MQTTest/shadow/get';
  const topic1 = '\$aws/things/MQTTest/shadow/get/accepted';

  client.subscribe(topic1, MqttQos.atLeastOnce);

  final builder = MqttClientPayloadBuilder();
  builder.addString('');
  client.publishMessage(topic, MqttQos.atLeastOnce, builder.payload);
  client.updates.listen((List<MqttReceivedMessage<MqttMessage>> c) {
    final MqttPublishMessage recMess = c[0].payload as MqttPublishMessage;
    final String pt = MqttPublishPayload.bytesToStringAsString(recMess.payload.message);

    /// The above may seem a little convoluted for users only interested in the
    /// payload, some users however may be interested in the received publish message,
    /// lets not constrain ourselves yet until the package has been in the wild
    /// for a while.
    /// The payload is a byte buffer, this will be specific to the topic
    print("EXAMPLE::Change notification:: topic is <${c[0].topic}>, payload is <-- ${pt} -->");
    print("");
  });

  print('Sleeping....');
  await MqttUtilities.asyncSleep(30);
  print('Disconnecting');
  client.disconnect();
  return 0;
}
```

Update 1: I noticed that if I publish from inside the listener nothing changes, but if I first publish from the AWS IoT console, it enters an infinite loop inside the listen callback (which is something I somewhat expected).

shamblett commented 3 years ago

Turn on logging and post the output of your test run. I don't know anything about AWS shadow states, but I can at least tell you what the client is doing.

gipobbe commented 3 years ago

The shadow state is simply a JSON document containing some variables. Your great mqtt_client lets me update the shadow (a PUB) and subscribe to a topic (a SUB). When I'm subscribed to a topic and inside the listen callback, I correctly receive every update when the shadow changes. To keep it simple, I think I need to do a PUB to one topic while I'm listening to another. Is that possible? My normal log, with no infinite loop triggered, is below.

Update 1: it seems it's not necessary to subscribe to two topics; one is enough. The problem is that I need to listen to that topic right after doing a PUB, or do a PUB while listening to a topic. Neither worked.

Performing hot restart...
Syncing files to device EVA L09...
Restarted application in 1.608ms.
I/flutter (11406): 2020-11-10 16:12:04.130370 -- SynchronousMqttServerConnectionHandler::internalConnect entered
I/flutter (11406): 2020-11-10 16:12:04.137235 -- SynchronousMqttServerConnectionHandler::internalConnect - initiating connection try 0
I/flutter (11406): 2020-11-10 16:12:04.137558 -- SynchronousMqttServerConnectionHandler::internalConnect - secure selected
I/flutter (11406): 2020-11-10 16:12:04.139750 -- MqttSecureConnection::connect
I/flutter (11406): 2020-11-10 16:12:05.587232 -- MqttSecureConnection::connect - securing socket
I/flutter (11406): 2020-11-10 16:12:05.592730 -- MqttSecureConnection::connect - start listening
I/flutter (11406): 2020-11-10 16:12:05.595196 -- MqttServerConnection::_startListening
I/flutter (11406): 2020-11-10 16:12:05.609170 -- SynchronousMqttServerConnectionHandler::internalConnect - connection complete
I/flutter (11406): 2020-11-10 16:12:05.609960 -- SynchronousMqttServerConnectionHandler::internalConnect sending connect message
I/flutter (11406): 2020-11-10 16:12:05.620573 -- MqttConnectionHandlerBase::sendMessage - MQTTMessage of type MqttMessageType.connect
I/flutter (11406): Header: MessageType = MqttMessageType.connect, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 0
I/flutter (11406): Connect Variable Header: ProtocolName=MQTT, ProtocolVersion=4, ConnectFlags=Connect Flags: Reserved1=false, CleanStart=true, WillFlag=false, WillQos=MqttQos.atMostOnce, WillRetain=false, PasswordFlag=false, UserNameFlag=false, KeepAlive=60
I/flutter (11406): MqttConnectPayload - client identifier is : GTest
I/flutter (11406): 2020-11-10 16:12:05.662932 -- SynchronousMqttServerConnectionHandler::internalConnect - pre sleep, state = Connection status is connecting with return code of noneSpecified and a disconnection origin of none
I/flutter (11406): 2020-11-10 16:12:06.166832 -- MqttConnection::_onData
I/flutter (11406): 2020-11-10 16:12:06.193413 -- MqttServerConnection::_onData - message received MQTTMessage of type MqttMessageType.connectAck
I/flutter (11406): Header: MessageType = MqttMessageType.connectAck, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 2
I/flutter (11406): Connect Variable Header: TopicNameCompressionResponse={0}, ReturnCode={MqttConnectReturnCode.connectionAccepted}
I/flutter (11406): 2020-11-10 16:12:06.203070 -- MqttServerConnection::_onData - message processed
I/flutter (11406): 2020-11-10 16:12:06.214739 -- SynchronousMqttServerConnectionHandler::_connectAckProcessor
I/flutter (11406): 2020-11-10 16:12:06.215919 -- SynchronousMqttServerConnectionHandler::_connectAckProcessor - state = connected
I/flutter (11406): 2020-11-10 16:12:06.217330 -- SynchronousMqttServerConnectionHandler:: cancelling connect timer
I/flutter (11406): 2020-11-10 16:12:06.219986 -- SynchronousMqttServerConnectionHandler::internalConnect - post sleep, state = Connection status is connected with return code of connectionAccepted and a disconnection origin of none
I/flutter (11406): 2020-11-10 16:12:06.220633 -- SynchronousMqttServerConnectionHandler::internalConnect exited with state Connection status is connected with return code of connectionAccepted and a disconnection origin of none
I/flutter (11406): iotcore client connected
I/flutter (11406): 2020-11-10 16:12:06.253172 -- MqttConnectionHandlerBase::sendMessage - MQTTMessage of type MqttMessageType.subscribe
I/flutter (11406): Header: MessageType = MqttMessageType.subscribe, Duplicate = false, Retain = false, Qos = MqttQos.atLeastOnce, Size = 0
I/flutter (11406): Subscribe Variable Header: MessageIdentifier={1}
I/flutter (11406): Payload: Subscription [{1}]
I/flutter (11406): {{ Topic={$aws/things/MQTTest/shadow/get/accepted}, Qos={MqttQos.atLeastOnce} }}
I/flutter (11406):
I/flutter (11406): 2020-11-10 16:12:06.267143 -- MqttConnectionHandlerBase::sendMessage - MQTTMessage of type MqttMessageType.publish
I/flutter (11406): Header: MessageType = MqttMessageType.publish, Duplicate = false, Retain = false, Qos = MqttQos.atLeastOnce, Size = 0
I/flutter (11406): Publish Variable Header: TopicName={$aws/things/MQTTest/shadow/get}, MessageIdentifier={2}, VH Length={0}
I/flutter (11406): Payload: {3 bytes={<123><32><125>
I/flutter (11406): Sleeping....
I/flutter (11406): 2020-11-10 16:12:06.643727 -- MqttConnection::_onData
I/flutter (11406): 2020-11-10 16:12:06.650615 -- MqttServerConnection::_onData - message received MQTTMessage of type MqttMessageType.publishAck
I/flutter (11406): Header: MessageType = MqttMessageType.publishAck, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 2
I/flutter (11406): PublishAck Variable Header: MessageIdentifier={2}
I/flutter (11406): 2020-11-10 16:12:06.652329 -- MqttServerConnection::_onData - message processed
I/flutter (11406): 2020-11-10 16:12:06.667493 -- MqttConnection::_onData
I/flutter (11406): 2020-11-10 16:12:06.677063 -- MqttServerConnection::_onData - message received MQTTMessage of type MqttMessageType.subscribeAck
I/flutter (11406): Header: MessageType = MqttMessageType.subscribeAck, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 3
I/flutter (11406): SubscribeAck Variable Header: MessageIdentifier={1}
I/flutter (11406): Payload: Qos grants [{1}]
I/flutter (11406): {{ Grant={MqttQos.atLeastOnce} }}
I/flutter (11406):
I/flutter (11406): 2020-11-10 16:12:06.677923 -- MqttServerConnection::_onData - message processed
I/flutter (11406): Disconnecting
I/flutter (11406): 2020-11-10 16:12:36.304892 -- SynchronousMqttServerConnectionHandler::disconnect
I/flutter (11406): 2020-11-10 16:12:36.309075 -- MqttConnectionHandlerBase::sendMessage - MQTTMessage of type MqttMessageType.disconnect
I/flutter (11406): Header: MessageType = MqttMessageType.disconnect, Duplicate = false, Retain = false, Qos = MqttQos.atMostOnce, Size = 0
I/flutter (11406): 2020-11-10 16:12:36.805989 -- MqttConnectionBase::_onDone - calling disconnected callback

shamblett commented 3 years ago

The log looks OK. The client.updates listener receives publish messages from the broker for all subscribed topics, which is why you need to check the topic in its body if you only want some of them. Apart from that, you can publish to any topic you want at any time, whether you are subscribed to it or not.

I don't think you need to do this -

final builder = MqttClientPayloadBuilder();
builder.addString('');

Just don't set the payload. I think that where AWS says 'an empty message' it means a publish with an empty payload. If you look at the log, you are sending this -

I/flutter (11406): Publish Variable Header: TopicName={$aws/things/MQTTest/shadow/get}, MessageIdentifier={2}, VH Length={0}
I/flutter (11406): Payload: {3 bytes={<123><32><125>

I don't think you need to do this, could be wrong of course.

gipobbe commented 3 years ago

I've checked the AWS docs again. That's not the problem. The problem is the mechanism AWS uses to retrieve a shadow's state. To get the state you need to trigger AWS with an empty or dummy PUB to the /get topic (the payload can be empty, but it doesn't have to be) while listening on the get/accepted topic. I don't think this paradigm fits well with how MQTT is meant to work. I'll dig into it more over the next few days but, right now, I'm not optimistic about finding a solution. Since this is just a trivial test, I'm not bound to AWS, so if I can't get past this soon I'll try switching to another cloud provider such as Azure or Google to see if things are better there. One last question for you, since you are the mastermind behind this: is it possible to have two clients running at the same time, one for sending and the other for listening? Is that doable in your opinion? I've done some tests, but I'm not sure what I got since I had little time today.

shamblett commented 3 years ago

As far as I know you should be able to run 2 clients, they are independent of each other although I've not tried this.

I still can't see why you can't do what you are trying to do, if I understand it correctly I see the sequence as being -

Initialise the client as normal, but add an onSubscribed callback. Start listening on client updates. Subscribe to your topic and go into a wait loop. When the onSubscribed callback is triggered, your subscription has been registered with the broker, so send your publish message from that function. Anything the broker sends will now be received in your listener.
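The sequence described here could be sketched roughly as follows. This is an illustration rather than code from the package's examples: the function name `requestShadow` and the topic constants are hypothetical, the client is assumed to be already connected, and the `!` operators assume a null-safe release of mqtt_client (on the older release used in this thread they would be dropped).

```dart
import 'dart:async';

import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';

// Classic shadow topics for the thing used in this thread.
const getTopic = '\$aws/things/MQTTest/shadow/get';
const acceptedTopic = '\$aws/things/MQTTest/shadow/get/accepted';

/// Requests the shadow once and completes with the payload text received on
/// the accepted topic. Hypothetical helper; assumes [client] is connected.
Future<String> requestShadow(MqttServerClient client) {
  final result = Completer<String>();

  // 1. Start listening before anything is sent, and check the topic because
  //    the listener receives messages for every subscribed topic.
  final updates = client.updates!.listen((messages) {
    for (final received in messages) {
      if (received.topic != acceptedTopic) continue;
      final publish = received.payload as MqttPublishMessage;
      final text =
          MqttPublishPayload.bytesToStringAsString(publish.payload.message);
      if (!result.isCompleted) result.complete(text);
    }
  });

  // 2. Publish only after the broker has acknowledged the subscription.
  client.onSubscribed = (String topic) {
    if (topic == acceptedTopic) {
      // A builder with nothing added yields an empty payload.
      final builder = MqttClientPayloadBuilder();
      client.publishMessage(getTopic, MqttQos.atLeastOnce, builder.payload!);
    }
  };

  // 3. Subscribe; onSubscribed fires when the subscribe ack arrives.
  client.subscribe(acceptedTopic, MqttQos.atLeastOnce);

  // Stop listening once the reply has been delivered.
  return result.future.whenComplete(updates.cancel);
}
```

A caller would then write something like `final shadow = await requestShadow(client);` after `client.connect()` succeeds. Publishing from the callback removes the need to guess how long the subscription takes to register.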

gipobbe commented 3 years ago

As I said in my first post, I'm not an experienced app developer and I don't know how to manage callbacks. Anyway, I finally got it working. I just did this (the rest of the code is the same as above):

```dart
const topic = '\$aws/things/MQTTest/shadow/get/accepted';
const topic1 = '\$aws/things/MQTTest/shadow/get';

client.subscribe(topic, MqttQos.atLeastOnce);
final builder = MqttClientPayloadBuilder();

client.updates.listen((List<MqttReceivedMessage<MqttMessage>> c) {
  final MqttPublishMessage recMess = c[0].payload as MqttPublishMessage;
  final String pt = MqttPublishPayload.bytesToStringAsString(recMess.payload.message);

  /// The above may seem a little convoluted for users only interested in the
  /// payload, some users however may be interested in the received publish message,
  /// lets not constrain ourselves yet until the package has been in the wild
  /// for a while.
  /// The payload is a byte buffer, this will be specific to the topic
  print("Esempio1: Subscribed to <-- $pt -->");
  print("");
});

await MqttUtilities.asyncSleep(1);
client.publishMessage(topic1, MqttQos.atLeastOnce, builder.payload);
await MqttUtilities.asyncSleep(1);

print('Disconnecting');
client.disconnect();
```

Finally I can request and receive the shadow's state. I'm not sure it's a clean solution, but it lets me do what I wanted. Now it's time to get a proper, systematic introduction to Flutter development. Sorry for the bother, I'm really a noob. If you're interested, I can clean up the code a little and give it to you to add to the examples folder.

Thank you for your support.

shamblett commented 3 years ago

I'm guessing that your first async sleep gives the broker and client enough time to process the subscription, so the publish you send afterwards now works. It's the same thing as using a callback, but not as definite. Anyway, glad it's working now. Closing this.