shamblett / mqtt_client

A server and browser based MQTT client for dart
Other
552 stars 179 forks source link

MQTTNet Local Server + Flutter mqtt_client #63

Closed jetompki closed 5 years ago

jetompki commented 5 years ago

I have a MQTTNet server on my local host at http://localhost:65061 and a separate Android Device (not emulator) connected via USB.

Using a sample MQTTNet client console app in C#, I can successfully connect to it using the following:

var options = new MqttClientOptionsBuilder()
     .WithClientId("Client2")
     .WithWebSocketServer("localhost:65061/mqtt")
     .Build();

Server side:
Request starting HTTP/1.1 GET http://localhost:65061/mqtt  
Client Connected, id: Client2

I'm using the sample Flutter mqtt_client from

dependencies:
 mqtt_client: ^5.2.0

I attempt to connect like this:

final MqttClient client = MqttClient('ws://localhost/mqtt', '');
Future<int> _connect() async {
  /// A websocket URL must start with ws:// or wss:// or Dart will throw an exception, consult your websocket MQTT broker
  /// for details.
  /// To use websockets add the following lines -:
  /// client.useWebSocket = true;
  /// client.port = 80;  ( or whatever your WS port is)
  /// Note do not set the secure flag if you are using wss, the secure flags is for TCP sockets only.

  /// Set logging on if needed, defaults to off
  client.logging(on: false);

  /// If you intend to use a keep alive value in your connect message that is not the default(60s)
  /// you must set it here
  client.keepAlivePeriod = 20;

 client.useWebSocket = true;
    client.port=65061;
  /// Add the unsolicited disconnection callback
  client.onDisconnected = onDisconnected;

  /// Add the successful connection callback
  client.onConnected = onConnected;

  /// Add a subscribed callback, there is also an unsubscribed callback if you need it.
  /// You can add these before connection or change them dynamically after connection if
  /// you wish. There is also an onSubscribeFail callback for failed subscriptions, these
  /// can fail either because you have tried to subscribe to an invalid topic or the broker
  /// rejects the subscribe request.
  client.onSubscribed = onSubscribed;

  /// Create a connection message to use or use the default one. The default one sets the
  /// client identifier, any supplied username/password, the default keepalive interval(60s)
  /// and clean session, an example of a specific one below.
  final MqttConnectMessage connMess = MqttConnectMessage()
      .withClientIdentifier('MqttMobile')
      .keepAliveFor(20) // Must agree with the keep alive set above or not set
      .withWillTopic('willtopic') // If you set this you must set a will message
      .withWillMessage('My Will message')
      .startClean() // Non persistent session for testing
      .withWillQos(MqttQos.atLeastOnce);
  print('EXAMPLE::client connecting....');
  client.connectionMessage = connMess;

  /// Connect the client, any errors here are communicated by raising of the appropriate exception. Note
  /// in some circumstances the broker will just disconnect us, see the spec about this, we however eill
  /// never send malformed messages.
  try {
    await client.connect();
  } on Exception catch (e) {
    print('EXAMPLE::client exception - $e');
    client.disconnect();
  }

  /// Check we are connected
  if (client.connectionStatus.state == MqttConnectionState.connected) {
    print('EXAMPLE::Mosquitto client connected');
  } else {
    /// Use status here rather than state if you also want the broker return code.
    print(
        'EXAMPLE::ERROR Mosquitto client connection failed - disconnecting, status is ${client.connectionStatus}');
    client.disconnect();
    exit(-1);
  }

  /// Ok, lets try a subscription
  const String topic = 'test/lo'; // Not a wildcard topic
  client.subscribe(topic, MqttQos.atMostOnce);

  /// The client has a change notifier object(see the Observable class) which we then listen to to get
  /// notifications of published updates to each subscribed topic.
  client.updates.listen((List<MqttReceivedMessage<MqttMessage>> c) {
    final MqttPublishMessage recMess = c[0].payload;
    final String pt =
        MqttPublishPayload.bytesToStringAsString(recMess.payload.message);

    /// The above may seem a little convoluted for users only interested in the
    /// payload, some users however may be interested in the received publish message,
    /// lets not constrain ourselves yet until the package has been in the wild
    /// for a while.
    /// The payload is a byte buffer, this will be specific to the topic
    print(
        'EXAMPLE::Change notification:: topic is <${c[0].topic}>, payload is <-- $pt -->');
    print('');
  });

  /// Lets publish to our topic
  // Use the payload builder rather than a raw buffer
  print('EXAMPLE::Publishing our topic');

  /// Our known topic to publish to
  const String pubTopic = 'Dart/Mqtt_client/testtopic';
  final MqttClientPayloadBuilder builder = MqttClientPayloadBuilder();
  builder.addString('Hello from mqtt_client');

  /// Subscribe to it
  client.subscribe(pubTopic, MqttQos.exactlyOnce);

  /// Publish it
  client.publishMessage(pubTopic, MqttQos.exactlyOnce, builder.payload);

  /// Ok, we will now sleep a while, in this gap you will see ping request/response
  /// messages being exchanged by the keep alive mechanism.
  print('EXAMPLE::Sleeping....');
  await MqttUtilities.asyncSleep(120);

  /// Finally, unsubscribe and exit gracefully
  print('EXAMPLE::Unsubscribing');
  client.unsubscribe(topic);

  /// Wait for the unsubscribe message from the broker if you wish.
  await MqttUtilities.asyncSleep(2);
  print('EXAMPLE::Disconnecting');
  client.disconnect();
  return 0;
}

I can see the get request reaching my MQTTNet server, however it errors out: Notice System.ArgumentException: The WebSocket protocol 'mqtt, mqttv3.1, mqttv3.11' is invalid because it contains the invalid character ','. may be relevant.

MQTTWebServer> info: Microsoft.AspNetCore.Hosting.Internal.WebHost[1]
MQTTWebServer>       Request starting HTTP/1.1 GET http://localhost:65061/mqtt  0
MQTTWebServer> fail: Microsoft.AspNetCore.Diagnostics.DeveloperExceptionPageMiddleware[1]
MQTTWebServer>       An unhandled exception has occurred while executing the request.
MQTTWebServer> System.ArgumentException: The WebSocket protocol 'mqtt, mqttv3.1, mqttv3.11' is invalid because it contains the invalid character ','.
MQTTWebServer> Parameter name: subProtocol
MQTTWebServer>    at System.Net.WebSockets.WebSocketValidate.ValidateSubprotocol(String subProtocol)
MQTTWebServer>    at System.Net.WebSockets.WebSocketProtocol.CreateFromStream(Stream stream, Boolean isServer, String subProtocol, TimeSpan keepAliveInterval)
MQTTWebServer>    at Microsoft.AspNetCore.WebSockets.WebSocketMiddleware.UpgradeHandshake.AcceptAsync(WebSocketAcceptContext acceptContext)
MQTTWebServer>    at MQTTnet.AspNetCore.ApplicationBuilderExtensions.<>c__DisplayClass0_0.<<UseMqttEndpoint>b__0>d.MoveNext()
MQTTWebServer> --- End of stack trace from previous location where exception was thrown ---
MQTTWebServer>    at Microsoft.AspNetCore.Diagnostics.DeveloperExceptionPageMiddleware.Invoke(HttpContext context)
MQTTWebServer> warn: Microsoft.AspNetCore.Diagnostics.DeveloperExceptionPageMiddleware[2]
MQTTWebServer>       The response has already started, the error page middleware will not be executed.
MQTTWebServer> fail: Microsoft.AspNetCore.Server.Kestrel[13]
MQTTWebServer>       Connection id "0HLJF966QUU4R", Request id "0HLJF966QUU4R:00000001": An unhandled exception was thrown by the application.
MQTTWebServer> System.ArgumentException: The WebSocket protocol 'mqtt, mqttv3.1, mqttv3.11' is invalid because it contains the invalid character ','.
MQTTWebServer> Parameter name: subProtocol
MQTTWebServer>    at System.Net.WebSockets.WebSocketValidate.ValidateSubprotocol(String subProtocol)
MQTTWebServer>    at System.Net.WebSockets.WebSocketProtocol.CreateFromStream(Stream stream, Boolean isServer, String subProtocol, TimeSpan keepAliveInterval)
MQTTWebServer>    at Microsoft.AspNetCore.WebSockets.WebSocketMiddleware.UpgradeHandshake.AcceptAsync(WebSocketAcceptContext acceptContext)
MQTTWebServer>    at MQTTnet.AspNetCore.ApplicationBuilderExtensions.<>c__DisplayClass0_0.<<UseMqttEndpoint>b__0>d.MoveNext()
MQTTWebServer> --- End of stack trace from previous location where exception was thrown ---
MQTTWebServer>    at Microsoft.AspNetCore.Diagnostics.DeveloperExceptionPageMiddleware.Invoke(HttpContext context)
MQTTWebServer>    at Microsoft.AspNetCore.Diagnostics.DeveloperExceptionPageMiddleware.Invoke(HttpContext context)
MQTTWebServer>    at Microsoft.AspNetCore.Server.IISIntegration.IISMiddleware.Invoke(HttpContext httpContext)
MQTTWebServer>    at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http.HttpProtocol.ProcessRequests[TContext](IHttpApplication`1 application)
MQTTWebServer> info: Microsoft.AspNetCore.Hosting.Internal.WebHost[2]
MQTTWebServer>       Request finished in 7.655ms 101 

I then see in my flutter app an error which states connection not upgraded to websocket (presumably because of the error on the server side?):

I/flutter (28776): EXAMPLE:: client connecting....
I/flutter (28776): EXAMPLE::client exception - WebSocketException: Connection to 'http://localhost:65061/mqtt#' was not upgraded to websocket
I/flutter (28776): EXAMPLE::OnDisconnected client callback - Client disconnection
I/flutter (28776): EXAMPLE::OnDisconnected callback is solicited, this is correct

Any ideas?

edit Also note that I can seemingly do all the requirements I can think of from my android device.

If I use chrome browser on my Android device and enter localhost:65061 I access the home page of my server. If I implement an api on my server, I can retrieve the values that the api provides via a HttpGet using the flutter http package.

It just seems that this MqttClient doesn't work.

shamblett commented 5 years ago

Ok, it looks as though this line in the code is causing your problems

'final List protocols = ['mqtt', 'mqttv3.1', 'mqttv3.11'];'

We only do this on web socket connections. from the Dart API docs

'The protocols argument is specifying the subprotocols the client is willing to speak.'

This part of websocket spec refers, this document section 5.2.1 point 6 states

'Optionally, a "Sec-WebSocket-Protocol" header, with a list of values indicating which protocols the client would like to speak, ordered by preference.'

This does say list of values, but I can't find any info on what this list actually is, i.e. what the separator is.

Your the first to report this, the client is being used in a number of websocket and secure websocket projects with different MQTT brokers, e.g Mosquito, Paho, cloud IOT solutions from IBM, Google, Amazon, Azure etc so I'm loath to change this, however I can add a flag to disable the list setting and just set it to 'mqtt' or maybe allow the user to specify the subprotocol maybe if you think this would help you.

jetompki commented 5 years ago

I've tested it by using the project locally and altering the

final List protocols = ['mqtt', 'mqttv3.1', 'mqttv3.11'];

to

final List protocols = ['mqtt'];

And I can successfully connect to my MQTTNet broker.

I believe that the latter you suggested -- allowing the user to specify the subprotocol(s) would be more flexible. You could continue to keep the default implementation (which I assume is 'final List protocols = ['mqtt', 'mqttv3.1', 'mqttv3.11'];') then allow an override with a custom list of protocols which you can specify when building the MQTTClient. Something like:

MQTTClient client = new MQTTClient(...);
List protocols = ['mqtt'];
client.useWebSocketSubProtocols(protocols); 

I appreciate the help, and please update the project at your discretion.

shamblett commented 5 years ago

Setter websocketProtocols added on to the client to set default protocols, add your own or swith this off entirely, see the API docs for more details, client re-published at 5.3.0, please retest.

uwejan commented 5 years ago

I have an issue, dsoe not say i have to specify protocol, but wanted to give it a try, i get the following error;

Error: The method 'websocketProtocols' isn't defined for the class 'mqtt_client::MqttClient'.
Try correcting the name to the name of an existing method, or defining a method named 'websocketProtocols'.
  client.websocketProtocols(protocols);
         ^^^^^^^^^^^^^^^^^^
shamblett commented 5 years ago

Check you are using at least version 5.3.0, note the method isn't static, you need to call it on an instantiated client,

final MqttClient client = MqttClient('test.mosquitto.org', ''); client.websocketProtocols(MqttWsConnection.protocolsSingleDefault)

for example

uwejan commented 5 years ago

Hey, i still have the same issue, just created new dart project, from the exmples dir, final MqttClient client = MqttClient('test.mosquitto.org', ''); client.websocketProtocols(MqttWsConnection.protocolsSingleDefault), i notice you did not have ; why is that? v. 5.4.0 note: am using dart 2.1.0 can i ask for websockets example in the examples dir?

shamblett commented 5 years ago

Sorry my bad, the websocketProtocols API is a setter, not a method, please do this client.websocketProtocols = MqttWsConnection.protocolsSingleDefault; etc.

jetompki commented 5 years ago

Retested and is working as expected, thanks.