shamblett / mqtt5_client

A server and browser based MQTT 5 client for dart
Other
49 stars 25 forks source link

Bad bytes error #59

Closed AgPeHaJIuH1 closed 6 months ago

AgPeHaJIuH1 commented 11 months ago

Hello, I encountered an error, please tell me how to get around it? Now I have a constant connect-disconnect going on in my application. Apparently due to the fact that garbage got into one of the topics, this message does not reach the user code, your library immediately calls disconnect. I used MQTTExplorer for windows, it connected successfully and there I saw that the message was some kind of garbage. I still don’t understand how it ended up there, but still, I was hoping that your library would give my code the ability to process it, and not automatically call disconnect. If I remove this message, your library will connect and work as expected. However, if garbage gets there once, it can get there again, completely disrupting the operation of my application. Please tell me how to get around this problem?

image

logs.txt

AgPeHaJIuH1 commented 11 months ago

perhaps you are using utf8.decode somewhere and you need to add the allowMalformed: true parameter?

shamblett commented 11 months ago

The root of this is that your incoming packet is malformed as far as the MQTT5 spec is concerned, see section 4.13.1 -

When a Client detects a Malformed Packet or Protocol Error, and a Reason Code is given in the specification, it SHOULD close the Network Connection. In the case of an error in a AUTH packet it MAY send a DISCONNECT packet containing the reason code, before closing the Network Connection. In the case of an error in any other packet it SHOULD send a DISCONNECT packet containing the reason code before closing the Network Connection. Use Reason Code 0x81 (Malformed Packet) or 0x82 (Protocol Error) unless a more specific Reason Code has been defined in section 3.14.2.1 [Disconnect Reason Code](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Disconnect_Reason_Code).

The package enforces this, see the file mqtt_utf8_encoding.dart for exactly what happens here, the package never uses the allowMalformed flag because of this. If you could supply a log of what happens when the error is raised I may be able to implement a work around, one way is to add a parameter to allow malformed packets for this topic, i.e. when you subscribe to a topic you can set the allowMalformed parameter so that strict UTF8 checking is disallowed only for this topic.

AgPeHaJIuH1 commented 11 months ago

It would be great if you could add such a parameter and let the user code handle this situation. Since at the moment, I cannot influence the situation in any way. The connection is constantly lost and the only way is to go to another client in MQTT, look for broken packets and manually delete them. The application simply does not connect and does not allow packets to be processed. Thank you

AgPeHaJIuH1 commented 11 months ago

I can’t yet determine who in my broker is sending garbage, but as a way out, I would simply clean up such messages directly in the code. Got garbage - deleted the topic. This was a temporary solution until the root of the problem was found.

AgPeHaJIuH1 commented 11 months ago
    ///     Gets or sets the payload format indicator.
    ///     The payload format indicator is part of any MQTT packet that can contain a payload. The indicator is an optional
    ///     byte value.
    ///     A value of 0 indicates an “unspecified byte stream”.
    ///     A value of 1 indicates a "UTF-8 encoded payload".
    ///     If no payload format indicator is provided, the default value is 0.
    ///     Hint: MQTT 5 feature only.

Now I came across this comment in the MQTT library for .NET Does this have anything to do with the current situation?

AgPeHaJIuH1 commented 11 months ago

After conducting a more thorough investigation in broker debug mode, I saw that your library puts garbage in the topic, namely byte 224. This breaks the system and the operation of your library. I reproduced it using a minimal example.

  1. I launch a broker.
  2. I publish a message in the topic.
  3. I connect with your library (I see how the broker gives you the correct message) (screen - 1 packet coming from the broker)
  4. Then I see that you are publishing in the same topic!!! message byte[224]. (screen - 2 package included in the broker)

I take it this is a mistake? Was this supposed to be a message to the broker about the successful acceptance of the package or something like that?

I am attaching logs from your library! Please, help! logs (2).txt

screen - 1 image

screen - 2 image

shamblett commented 11 months ago

The payload format indicator is a property on any message that contains a payload, the client does not decode properties it just passes them to the user, you are free to check this if you wish but this indicator set or not will not cause a disconnection.

Looking at your log, the error is being caused by reception of an invalid messge ;-

I/flutter (12226): 2023-09-12 12:59:41.328484 -- MqttServerConnection::_ondata - exception raised is mqtt-client::InvalidMessageException: The data provided in the message stream was not a valid MQTT Message, exception is mqtt-client::IncompleteMessageException: Available bytes is less than the message size, bytestream is [49, 47, 0, 41, 118, 97, 100, 105, 109, 95, 119, 105, 110, 100, 111, 119, 115, 47, 118, 49, 47, 115, 101, 114, 118, 101, 114, 47, 100, 101, 118, 105, 99, 101, 115, 47, 116, 101, 115, 116, 47, 116, 101, 115, 116, 2, 1, 1]
I/flutter (12226): 2023-09-12 12:59:41.328907 -- MqttServerConnection::_ondata - irrecoverable exception raised - sending disconnect mqtt-client::InvalidMessageException: The data provided in the message stream was not a valid MQTT Message, exception is mqtt-client::IncompleteMessageException: Available bytes is less than the message size, bytestream is [49, 47, 0, 41, 118, 97, 100, 105, 109, 95, 119, 105, 110, 100, 111, 119, 115, 47, 118, 49, 47, 115, 101, 114, 118, 101, 114, 47, 100, 101, 118, 105, 99, 101, 115, 47, 116, 101, 115, 116, 47, 116, 101, 115, 116, 2, 1, 1]

and

I/flutter (12226): 2023-09-12 12:59:41.331112 -- MqttServerConnection::_ondata - adding incoming data, data length is 1, message stream length is 50, message stream position is 50
I/flutter (12226): 2023-09-12 12:59:41.331260 -- MqttServerConnection::_ondata - added incoming data message stream length is 51, message stream position is 50
I/flutter (12226): 2023-09-12 12:59:41.331653 -- MqttByteBuffer:isMessageAvailable - assumed valid header, value is 53
I/flutter (12226): 2023-09-12 12:59:41.331841 -- MqttServerConnection::_ondata - irrecoverable exception raised - sending disconnect mqtt-client::InvalidHeaderException: The header being processed contained an invalid size byte pattern. Message size must take a most 4 bytes, and the last byte must have bit 8 set to 0.

You can see your 53 value in the log extract above.

This can't be worked round, I'll have a closer look to see if this indeed an invalid message just to rule out a client bug.

From your log the client then receives a few subscribe acks and sends some ping requests, I can't see any publish messages being sent. The client doesn't send publish messages by itself, the user has to do this.

AgPeHaJIuH1 commented 11 months ago

why is 53 bytes an error? If I encode the string "5" in UTF8, when encoding it into bytes, this is what is returned to me

image

shamblett commented 11 months ago

I'm not saying its an error in the context of UTF8, its in the context of the message, the error is a message length error, the log line says -

...
I/flutter (12226): 2023-09-12 12:59:41.331653 -- MqttByteBuffer:isMessageAvailable - assumed valid header, value is 53
...

this could just be coincidence of course.

In your comment above you say -

 I used MQTTExplorer for windows, it connected successfully and there I saw that the message was some kind of garbage. I still don’t understand how it ended up there.....

Ok so what broker are you using? Brokers shouldn't send malformed messages.

AgPeHaJIuH1 commented 11 months ago

image

AgPeHaJIuH1 commented 11 months ago

https://github.com/dotnet/MQTTnet

https://github.com/dotnet/MQTTnet/issues/1839

shamblett commented 11 months ago

Ok would it be possible for you to repeat your test using mosquito broker or a hivemq broker

AgPeHaJIuH1 commented 11 months ago

I'm not sure I can do it. Can you help me find the reason based on the logs? What exactly is wrong with the message that I would pass it on to the MQTTNet developer?

AgPeHaJIuH1 commented 11 months ago

An even more thorough investigation using online packet tracking software revealed that your library is randomly sending Publish!!! a packet with e0, which is byte 224, as I showed above in the screenshots, and she receives it back and breaks. It has not yet been possible to reproduce this with popular brokers, since they most likely swallow your message from 0e and ignore it

192.168.1.136 - flutter 192.168.1.2 - Broker

photo_2023-09-12_19-51-39

bandicam 2023-09-12 18-38-32-014.zip

shamblett commented 11 months ago

Why do you think this is random? It seems to be a normal publish to one of your subscribed topics.

I need to see a client log of that publish message being sent so I can see exactly what the source of the send it.

Spec compliant brokers shouldnt swallow anything, if they receive a malformed packet they should close the connection.

AgPeHaJIuH1 commented 11 months ago

We put together a simple application and published "5" to the demo/v1 topic several times, at some point (randomly, maybe 5 times, maybe 10 times) your library sends byte 224 PublishMessage and breaks itself. Note, this is only reproduced if it is ENABLED from the broker. That is, if receiving your own messages back is ENABLED. As far as I understand from the 224 protocol, this is a disconnect byte, but why your library sends it to PublishMessage remains a mystery.

AgPeHaJIuH1 commented 11 months ago

logs.txt

shamblett commented 11 months ago

Ok Ill have a look at the log but I must admit you are losing me a bit now. Why would a clent want to recieve a publish message back?

Please identify where in the MQTT5 spec it explains this broker behavior and why its needed Im unfamiliar with it.

AgPeHaJIuH1 commented 11 months ago

There is a special parameter for this, NoLocal. In some cases, it is necessary to receive your own messages that were sent by me. And that's okay. But this behavior can be disabled so as not to receive your messages back

image

AgPeHaJIuH1 commented 11 months ago

Before mqtt 5 it was normal practice to echo your own messages back, wasn't it? If I'm a subscriber to the demo/v1 topic and I post a message to it, why shouldn't I receive it? As far as I understand, the NoLocal parameter when subscribing should change this behavior and NOT send me my own messages

shamblett commented 11 months ago

OK from the MQTT5 spec -

Bit 2 of the Subscription Options represents the No Local option. If the value is 1, Application Messages MUST NOT be forwarded to a connection with a ClientID equal to the ClientID of the publishing connection.

So this seems to be a subscription option.

There is no equivalent in the MQTT3 spec from what I can see.

I think I understand you now, if subscribe to the topic you are publishing to then yes you will receive the message on that topic like any other subscriber, I thought you meant the publish message was somehow just being reflected back to you even if you weren't subscribed.

So if you set no local on subscription the broker should not send that publish message back to you, ,so the question is are you setting this when you subscribe? Presumably not so you are getting the message on your subscription.

The mqtt5_server_client.dart example in the example directory of the repo does exactly this and works fine, there is no re-publishing when the publish message is received for the topic.

From the log we publish the message with a message identifier of 67 -

I/flutter ( 9263): 2023-09-12 20:50:58.426199 -- MqttPublishingManager::publishUserMessage - entered
I/flutter ( 9263): 2023-09-12 20:50:58.426635 -- MqttConnectionHandlerBase::sendMessage - sending message started >>> -> MQTTMessage of type MqttMessageType.publish
I/flutter ( 9263): MessageType = MqttMessageType.publish Duplicate = false Retain = true Qos = atMostOnce Size = 0
I/flutter ( 9263): Topic Name = demo/v1
I/flutter ( 9263): Message Identifier = 67
I/flutter ( 9263): Payload Format Indicator = false
I/flutter ( 9263): Message Expiry Interval = 65535
I/flutter ( 9263): Topic Alias = 255
I/flutter ( 9263): Response Topic =
I/flutter ( 9263): Subscription Identifier(s) = []
I/flutter ( 9263): Properties = No properties set
I/flutter ( 9263):
I/flutter ( 9263): Payload: {1 bytes={<53>
I/flutter ( 9263): 2023-09-12 20:50:58.427358 -- MqttConnectionHandlerBase::sendMessage - sending message ended >>>

This comes from publishUserMessage so your code has called this and is OK.

We then receive it back -

I/flutter ( 9263): 2023-09-12 20:50:58.439143 -- MqttServerConnection::_onData - Message Received Started <<<
I/flutter ( 9263): 2023-09-12 20:50:58.439527 -- MqttServerConnection::_ondata - adding incoming data, data length is 13, message stream length is 0, message stream position is 0
[log] demo/v1
I/flutter ( 9263): 2023-09-12 20:50:58.439828 -- MqttServerConnection::_ondata - added incoming data message stream length is 13, message stream position is 0
I/flutter ( 9263): 2023-09-12 20:50:58.440030 -- MqttByteBuffer:isMessageAvailable - assumed valid header, value is 49
I/flutter ( 9263): 2023-09-12 20:50:58.440706 -- MqttServerConnection::_onData - MESSAGE RECEIVED -> MQTTMessage of type MqttMessageType.publish
I/flutter ( 9263): MessageType = MqttMessageType.publish Duplicate = false Retain = true Qos = atMostOnce Size = 11
I/flutter ( 9263): Topic Name = demo/v1
I/flutter ( 9263): Message Identifier = 0
I/flutter ( 9263): Payload Format Indicator = false
I/flutter ( 9263): Message Expiry Interval = 65535
I/flutter ( 9263): Topic Alias = 255
I/flutter ( 9263): Response Topic =
I/flutter ( 9263): Subscription Identifier(s) = []
I/flutter ( 9263): Properties = No properties set
I/flutter ( 9263):
I/flutter ( 9263): Payload: {1 bytes={<53>
I/flutter ( 9263): 2023-09-12 20:50:58.441225 -- MqttServerConnection::_onData - message available event fired
I/flutter ( 9263): 2023-09-12 20:50:58.441425 -- MqttServerConnection::_onData - Message Received Ended <<<
I/flutter ( 9263): 2023-09-12 20:50:58.441673 -- MqttConnectionHandlerBase::messageAvailable - message type is MqttMessageType.publish
I/flutter ( 9263): 2023-09-12 20:50:58.441875 -- MqttPublishingManager::handlePublish - entered
I/flutter ( 9263): 2023-09-12 20:50:58.442196 -- MqttPublishingManager::_notifyPublish - entered message MqttQos.atMostOnce
I/flutter ( 9263): 2023-09-12 20:50:58.442630 -- MqttSubscriptionManager::publishMessageReceived topic is demo/v1

Again all OK. We then publish again with message id 68 -

I/ViewRootImpl@e47ee08[MainActivity]( 9263): ViewPostIme pointer 0
I/ViewRootImpl@e47ee08[MainActivity]( 9263): ViewPostIme pointer 1
I/flutter ( 9263): 2023-09-12 20:50:58.666861 -- MqttPublishingManager::publishUserMessage - entered
I/flutter ( 9263): 2023-09-12 20:50:58.667324 -- MqttConnectionHandlerBase::sendMessage - sending message started >>> -> MQTTMessage of type MqttMessageType.publish
I/flutter ( 9263): MessageType = MqttMessageType.publish Duplicate = false Retain = true Qos = atMostOnce Size = 0
I/flutter ( 9263): Topic Name = demo/v1
I/flutter ( 9263): Message Identifier = 68
I/flutter ( 9263): Payload Format Indicator = false
I/flutter ( 9263): Message Expiry Interval = 65535
I/flutter ( 9263): Topic Alias = 255
I/flutter ( 9263): Response Topic =
I/flutter ( 9263): Subscription Identifier(s) = []
I/flutter ( 9263): Properties = No properties set
I/flutter ( 9263):
I/flutter ( 9263): Payload: {1 bytes={<53>
I/flutter ( 9263): 2023-09-12 20:50:58.668340 -- MqttConnectionHandlerBase::sendMessage - sending message ended >>>

Again from publishUserMessage so your code has called this but it is Ok. We then break -

I/flutter ( 9263): 2023-09-12 20:50:58.680499 -- MqttServerConnection::_onData - Message Received Started <<<
I/flutter ( 9263): 2023-09-12 20:50:58.680793 -- MqttServerConnection::_ondata - adding incoming data, data length is 12, message stream length is 0, message stream position is 0
I/flutter ( 9263): 2023-09-12 20:50:58.680959 -- MqttServerConnection::_ondata - added incoming data message stream length is 12, message stream position is 0
I/flutter ( 9263): 2023-09-12 20:50:58.681145 -- MqttByteBuffer:isMessageAvailable - assumed valid header, value is 49
I/flutter ( 9263): 2023-09-12 20:50:58.684215 -- MqttServerConnection::_ondata - exception raised is mqtt-client::InvalidMessageException: The data provided in the message stream was not a valid MQTT Message, exception is mqtt-client::IncompleteMessageException: Available bytes is less than the message size, bytestream is [49, 11, 0, 7, 100, 101, 109, 111, 47, 118, 49, 0]
I/flutter ( 9263): 2023-09-12 20:50:58.684530 -- MqttServerConnection::_ondata - irrecoverable exception raised - sending disconnect mqtt-client::InvalidMessageException: The data provided in the message stream was not a valid MQTT Message, exception is mqtt-client::IncompleteMessageException: Available bytes is less than the message size, bytestream is [49, 11, 0, 7, 100, 101, 109, 111, 47, 118, 49, 0]
I/flutter ( 9263): 2023-09-12 20:50:58.688588 -- MqttServerConnection::_onData - Message Received Ended <<<
...

So, I can't see where the client is publishing anything by itself, all publish calls are from user code so you must somehow be setting this 224 code in your publish message. Please add debug around your code to print when you publish and what you publish.

AgPeHaJIuH1 commented 11 months ago

I have a completely empty application that only publishes "5" and that's it! oe publication can only come from your code. We used a packet sniffer to capture this.

192.168.1.136 - flutter 192.168.1.2 - Broker

image

AgPeHaJIuH1 commented 11 months ago

I'm guessing that the message sent from your code is not the expected one, so it might not end up in your logs. But you can’t fool the interceptor =)))) It records everything that passes through it. This can be seen on the screenshot

AgPeHaJIuH1 commented 11 months ago

and no, we do not install NoLocal. We WANT to receive our messages again if we are subscribed to the topic

AgPeHaJIuH1 commented 11 months ago

my code in Flutter

import 'dart:convert';
import 'dart:developer';
import 'dart:typed_data';

import 'package:flutter/material.dart';
import 'package:mqtt5_client/mqtt5_client.dart';
import 'package:mqtt5_client/mqtt5_server_client.dart';
import 'package:typed_data/typed_data.dart';

void main() {
  runApp(const MyApp());
}

class MyApp extends StatelessWidget {
  const MyApp({super.key});

  // This widget is the root of your application.
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      title: 'Flutter Demo',
      theme: ThemeData(
        colorScheme: ColorScheme.fromSeed(seedColor: Colors.deepPurple),
        useMaterial3: true,
      ),
      home: const MyHomePage(),
    );
  }
}

class MyHomePage extends StatefulWidget {
  const MyHomePage({super.key});

  @override
  State<MyHomePage> createState() => _MyHomePageState();
}

class _MyHomePageState extends State<MyHomePage> {
  @override
  Widget build(BuildContext context) {
    return Scaffold(
        appBar: AppBar(
          backgroundColor: Theme.of(context).colorScheme.inversePrimary,
        ),
        body: Center(
          child: Builder(builder: (context) {
            return Column(
              mainAxisAlignment: MainAxisAlignment.center,
              children: [
                ElevatedButton(onPressed: () => _connect(context), child: Text("Connect")),
                ElevatedButton(onPressed: _send, child: Text("Send")),
              ],
            );
          }),
        ));
  }

  MqttServerClient? mqttClient;

  void _connect(BuildContext context) async {
    mqttClient = MqttServerClient.withPort('192.168.1.2', 'dasgjdkghd', 1883);
    //!!!! FOR DEBUG
    mqttClient!.logging(on: true);
    mqttClient!.keepAlivePeriod = 5;
    mqttClient!.secure = false;
    mqttClient!.autoReconnect = true;
    mqttClient!.resubscribeOnAutoReconnect = true;
    mqttClient!.onConnected = _onConnectedHandler;
    mqttClient!.onAutoReconnect = _onAutoReconnectHandler;
    mqttClient!.onAutoReconnected = _onAutoReconnectedHandler;
    mqttClient!.onDisconnected = _onDisconnectedHandler;
    mqttClient!.onBadCertificate = (Object? certificate) => true;

    try {
      var zz = await mqttClient!.connect('demo', 'demo');
      var z = mqttClient!.connectionStatus;
      if (mqttClient!.connectionStatus?.state == MqttConnectionState.connected) {
        ScaffoldMessenger.of(context).showSnackBar(SnackBar(content: Text('Connected')));
        mqttClient!.updates.listen(_onReceivedMessage);

        mqttClient!.subscribe('#', MqttQos.atMostOnce);
      } else {}
    } catch (ex, st) {
      log(ex.toString());
      log(st.toString());
    }
  }

  Future _send() async {
    String text = '5';
    List<int> encodedText = utf8.encode(text);
    Uint8List list = Uint8List.fromList(encodedText);
    Uint8Buffer buffer = Uint8Buffer();
    buffer.addAll(list);

    MqttPublishMessage mqttPublishMessage = MqttPublishMessage().toTopic('demo/v1').publishData(buffer).withQos(MqttQos.atMostOnce);

    mqttPublishMessage.setRetain(state: true);

    mqttClient!.publishUserMessage(mqttPublishMessage);
  }

  void _onConnectedHandler() {}

  void _onAutoReconnectHandler() {}

  void _onAutoReconnectedHandler() {}

  void _onDisconnectedHandler() {}

  void _onReceivedMessage(List<MqttReceivedMessage<MqttMessage>> mqttMessageList) {
    for (MqttReceivedMessage<MqttMessage> mqttMessage in mqttMessageList) {
      MqttPublishMessage publishMqttMessage = mqttMessage.payload as MqttPublishMessage;
      if (publishMqttMessage.payload.message == null) {
        return;
      }

      String messageText = utf8.decode(publishMqttMessage.payload.message!.toList(), allowMalformed: true);
      log(mqttMessage.topic!);
      log(messageText);
    }
  }
}
AgPeHaJIuH1 commented 11 months ago

You are wrong that brokers should not swallow messages. Many brokers have additional configuration parameters. For example, eqmx, a popular Chinese broker, which has a parameter that allows or denies NON-UTF8 characters

image

DxrMorgan commented 11 months ago

OK from the MQTT5 spec -

Bit 2 of the Subscription Options represents the No Local option. If the value is 1, Application Messages MUST NOT be forwarded to a connection with a ClientID equal to the ClientID of the publishing connection.

So this seems to be a subscription option.

There is no equivalent in the MQTT3 spec from what I can see.

I think I understand you now, if subscribe to the topic you are publishing to then yes you will receive the message on that topic like any other subscriber, I thought you meant the publish message was somehow just being reflected back to you even if you weren't subscribed.

So if you set no local on subscription the broker should not send that publish message back to you, ,so the question is are you setting this when you subscribe? Presumably not so you are getting the message on your subscription.

The mqtt5_server_client.dart example in the example directory of the repo does exactly this and works fine, there is no re-publishing when the publish message is received for the topic.

From the log we publish the message with a message identifier of 67 -

I/flutter ( 9263): 2023-09-12 20:50:58.426199 -- MqttPublishingManager::publishUserMessage - entered
I/flutter ( 9263): 2023-09-12 20:50:58.426635 -- MqttConnectionHandlerBase::sendMessage - sending message started >>> -> MQTTMessage of type MqttMessageType.publish
I/flutter ( 9263): MessageType = MqttMessageType.publish Duplicate = false Retain = true Qos = atMostOnce Size = 0
I/flutter ( 9263): Topic Name = demo/v1
I/flutter ( 9263): Message Identifier = 67
I/flutter ( 9263): Payload Format Indicator = false
I/flutter ( 9263): Message Expiry Interval = 65535
I/flutter ( 9263): Topic Alias = 255
I/flutter ( 9263): Response Topic =
I/flutter ( 9263): Subscription Identifier(s) = []
I/flutter ( 9263): Properties = No properties set
I/flutter ( 9263):
I/flutter ( 9263): Payload: {1 bytes={<53>
I/flutter ( 9263): 2023-09-12 20:50:58.427358 -- MqttConnectionHandlerBase::sendMessage - sending message ended >>>

This comes from publishUserMessage so your code has called this and is OK.

We then receive it back -

I/flutter ( 9263): 2023-09-12 20:50:58.439143 -- MqttServerConnection::_onData - Message Received Started <<<
I/flutter ( 9263): 2023-09-12 20:50:58.439527 -- MqttServerConnection::_ondata - adding incoming data, data length is 13, message stream length is 0, message stream position is 0
[log] demo/v1
I/flutter ( 9263): 2023-09-12 20:50:58.439828 -- MqttServerConnection::_ondata - added incoming data message stream length is 13, message stream position is 0
I/flutter ( 9263): 2023-09-12 20:50:58.440030 -- MqttByteBuffer:isMessageAvailable - assumed valid header, value is 49
I/flutter ( 9263): 2023-09-12 20:50:58.440706 -- MqttServerConnection::_onData - MESSAGE RECEIVED -> MQTTMessage of type MqttMessageType.publish
I/flutter ( 9263): MessageType = MqttMessageType.publish Duplicate = false Retain = true Qos = atMostOnce Size = 11
I/flutter ( 9263): Topic Name = demo/v1
I/flutter ( 9263): Message Identifier = 0
I/flutter ( 9263): Payload Format Indicator = false
I/flutter ( 9263): Message Expiry Interval = 65535
I/flutter ( 9263): Topic Alias = 255
I/flutter ( 9263): Response Topic =
I/flutter ( 9263): Subscription Identifier(s) = []
I/flutter ( 9263): Properties = No properties set
I/flutter ( 9263):
I/flutter ( 9263): Payload: {1 bytes={<53>
I/flutter ( 9263): 2023-09-12 20:50:58.441225 -- MqttServerConnection::_onData - message available event fired
I/flutter ( 9263): 2023-09-12 20:50:58.441425 -- MqttServerConnection::_onData - Message Received Ended <<<
I/flutter ( 9263): 2023-09-12 20:50:58.441673 -- MqttConnectionHandlerBase::messageAvailable - message type is MqttMessageType.publish
I/flutter ( 9263): 2023-09-12 20:50:58.441875 -- MqttPublishingManager::handlePublish - entered
I/flutter ( 9263): 2023-09-12 20:50:58.442196 -- MqttPublishingManager::_notifyPublish - entered message MqttQos.atMostOnce
I/flutter ( 9263): 2023-09-12 20:50:58.442630 -- MqttSubscriptionManager::publishMessageReceived topic is demo/v1

Again all OK. We then publish again with message id 68 -

I/ViewRootImpl@e47ee08[MainActivity]( 9263): ViewPostIme pointer 0
I/ViewRootImpl@e47ee08[MainActivity]( 9263): ViewPostIme pointer 1
I/flutter ( 9263): 2023-09-12 20:50:58.666861 -- MqttPublishingManager::publishUserMessage - entered
I/flutter ( 9263): 2023-09-12 20:50:58.667324 -- MqttConnectionHandlerBase::sendMessage - sending message started >>> -> MQTTMessage of type MqttMessageType.publish
I/flutter ( 9263): MessageType = MqttMessageType.publish Duplicate = false Retain = true Qos = atMostOnce Size = 0
I/flutter ( 9263): Topic Name = demo/v1
I/flutter ( 9263): Message Identifier = 68
I/flutter ( 9263): Payload Format Indicator = false
I/flutter ( 9263): Message Expiry Interval = 65535
I/flutter ( 9263): Topic Alias = 255
I/flutter ( 9263): Response Topic =
I/flutter ( 9263): Subscription Identifier(s) = []
I/flutter ( 9263): Properties = No properties set
I/flutter ( 9263):
I/flutter ( 9263): Payload: {1 bytes={<53>
I/flutter ( 9263): 2023-09-12 20:50:58.668340 -- MqttConnectionHandlerBase::sendMessage - sending message ended >>>

Again from publishUserMessage so your code has called this but it is Ok. We then break -

I/flutter ( 9263): 2023-09-12 20:50:58.680499 -- MqttServerConnection::_onData - Message Received Started <<<
I/flutter ( 9263): 2023-09-12 20:50:58.680793 -- MqttServerConnection::_ondata - adding incoming data, data length is 12, message stream length is 0, message stream position is 0
I/flutter ( 9263): 2023-09-12 20:50:58.680959 -- MqttServerConnection::_ondata - added incoming data message stream length is 12, message stream position is 0
I/flutter ( 9263): 2023-09-12 20:50:58.681145 -- MqttByteBuffer:isMessageAvailable - assumed valid header, value is 49
I/flutter ( 9263): 2023-09-12 20:50:58.684215 -- MqttServerConnection::_ondata - exception raised is mqtt-client::InvalidMessageException: The data provided in the message stream was not a valid MQTT Message, exception is mqtt-client::IncompleteMessageException: Available bytes is less than the message size, bytestream is [49, 11, 0, 7, 100, 101, 109, 111, 47, 118, 49, 0]
I/flutter ( 9263): 2023-09-12 20:50:58.684530 -- MqttServerConnection::_ondata - irrecoverable exception raised - sending disconnect mqtt-client::InvalidMessageException: The data provided in the message stream was not a valid MQTT Message, exception is mqtt-client::IncompleteMessageException: Available bytes is less than the message size, bytestream is [49, 11, 0, 7, 100, 101, 109, 111, 47, 118, 49, 0]
I/flutter ( 9263): 2023-09-12 20:50:58.688588 -- MqttServerConnection::_onData - Message Received Ended <<<
...

So, I can't see where the client is publishing anything by itself, all publish calls are from user code so you must somehow be setting this 224 code in your publish message. Please add debug around your code to print when you publish and what you publish.

How else to explain it is unclear! A broker was installed and an example on the flutter. A shark sniffer was launched to intercept packets to see who is sending the symbol that crashes the connection. As it turned out, this is done by the flutter library. In the screenshots above, this can be seen and proved! With a quick exchange, this occurs almost immediately. Most likely, packet reception or buffer handling is implemented incorrectly somewhere! Of course, you can correspond for a long time, but the truth is that she is alone. The sniffer does not lie and it can be clearly seen from the packets! The library is raw and may fall off!

DxrMorgan commented 11 months ago

The logic in the library is written incorrectly and resources are used irrationally and the library does not have time to process requests

There is a local parameter (such as an echo from the broker) when it is turned off

mqtt Subscription Option.noaccel = true;

it seems to work (sometimes of course it falls down), in this mode the packets come only from the flutter

but if you put:

mqtt Subscription Option.no Local = false;

then 99% will fail immediately. Because for each packet from flutter, the broker will respond and the library crumbles and sends and for some reason sends an unreadable character for utf 8

Most likely, the function responsible for receiving or parsing is written with an error!

shamblett commented 11 months ago

OK, the only way I'm going to get to the bottom of this is to run your test code in my local development environment against a range of brokers so please could you post the code you are using to run your test, its important that I'm doing exactly the same as you.

That said I'm away for 10 days now so I won't have my dev environment on hand until I get back.

In the meantime it might be worth running your test with the mqtt_client package. This is the MQTT3 version, you won't of course be able to use any MQTT5 features but just for testing purposes I guess that'l be OK. It'll be interesting to see what this client does.

DxrMorgan commented 11 months ago

Tested the library. If when sending a package with localhost = true, everything seems to work. But it is worth putting false falls.

If false, the broker returns the package (as an echo) and then the library can't stand it and falls!!!

We decided to conduct a test, set the pub to be sent every second for 6 hours with local = true and everything went fine no exception

I checked the second test. Just subscribed to the topic and sent the pub with an external program. the library received packages and seems to be working fine

In general, the library shuts up when the exchange rate is high!

I advise you to conduct the experiment yourself. Do so to say echo mode (when the broker will respond back to each of your pub) and set local=false and you will see for yourself how it will give an exception! Only send 1 byte (any character)

DxrMorgan commented 11 months ago

https://drive.google.com/file/d/1RIE28zjLNqYaWcRlm-cdrsQsZ_9yttpf/view?usp=drive_link

Look video! i record from screen

DxrMorgan commented 11 months ago

We found the cause of the error. The library really messes up the packet and adds byte 224 to the sending packet. (you are not correctly forming packets for disconnection)

The reception logic is incorrectly implemented! the library does not wait for the full reception of the package!!! and then everything breaks (make a normal ring buffer and read about tcp)

Now for more details

This bug can be caught only if you use local=false, this is only necessary for the broker to respond to us. You can write a broker yourself, the main thing is that you receive packages. You have a mistake in the reception! We need an echo from the broker

Now, what's going on

When receiving a package from a broker, it often turns out that the library starts parsing the package without waiting for its full reception. If you read about tcp packets, you should understand that sometimes we can get our data not for one tcp packet, but for 2 and 3, etc.

We haven't had time to check the part of the code that is responsible for receiving, but it is crooked. Probably you are not using a ring buffer, or the logic is wrong, or you are confused with await

As soon as you accept a part of the package, you immediately check its integrity. Accordingly , it turns out incomplete

and exception is executed

throw MqttIncompleteMessageException('Available bytes is less than the message size');

and then it starts even more interesting!

You are trying to send a disconnect package while you create it as a publish package!! And you add this 224 (224 is a shutdown command) The broker is very surprised and turns us off!

This is the second hard mistake!!!! Well, then the broker receives this broken package and disables us (we used to think that the library crashes and disables, but we found out that the broker disables and this is normal and according to the broker's rules)

In general, you have two problems

  1. You don't wait for the full package

look somewhere here while (messageStream.isMessageAvailable())

  1. You don't form packages correctly. The disable command cannot be placed in the publish package!!

final disconnect = MqttDisconnectMessage()..withReasonCode(MqttDisconnectReasonCode.normalDisconnection);

It also seems to have been noticed that you don't clear the buffer. Maybe we are mistaken, but in the verification method we see a buffer to which 3.5 packets at once! Not one, but a lot and with a trim. Of course, no check will pass there!!

to help, literature about mqtt, await and object programming

shamblett commented 11 months ago

Ok as I say Im away at the moment I eagerly await your pull request to fix this on my return. Thanks

DxrMorgan commented 11 months ago

We described the problem and gave points in the code where to dig. Then you decide for yourself whether you will fix it or not. We will most likely fix these problems ourselves and move on.

shamblett commented 11 months ago

Yes OK when you have fixed them create a pull request and I can update the package for the benefit of all other users. This is what we do in OS development isnt it?

Although you are the only user reporting this issue for your particular use case amything that makes the packge better should be adopted no matter how many users it impacts.

DxrMorgan commented 11 months ago

Believe me, there is only one truth here and this error is not systematic, and many may not notice it, well, think about it, the broker fell off, I put a reconnect.

You can conduct the experiment yourself create a broker who will answer you after your public. I.e. you are to him and he is to you immediately. And set a sending every 200 msec, for example, in a timer cycle so that you don't press the button quickly or your finger will get tired (so that you can definitely cause an error faster) and it won't take 20 seconds to get an error. We wouldn't have noticed it if it wasn't for the broker and local=false. Most likely, everyone has a broker configured so that he does not respond back to our public. This is of course correct and logical, but it does not negate that there is an error in the library.

I have a lot of experience debugging applications, I always test my own and other people's applications according to a rigid scheme.

shamblett commented 11 months ago

Ok but if you are going to fix this and move on as you said above then its pointless both of us doing this when you could just publish the fix which I can integrate into the package and test of course.

AgPeHaJIuH1 commented 11 months ago

lib\src\messages\mqtt_message.dart

image

the one who calls the createFrom method expects that it might receive an MqttIncompleteMessageException, but you throw it in this method and immediately catch it and throw another exception (MqttInvalidMessageException). adding lines 48 and 49 to the code solved the current problem and the client started working stably. I’m not sure that this is entirely correct, I haven’t studied all the logic of the work, you know better what to do next.

Your code is difficult to fully understand and separate all references, since dynamic variables are used. This is very bad practice. In this case, you don't need to use them, I recommend creating abstract classes, implementing and using them rather than dynamic variables, so that the code is easier for other developers to understand and contribute to. Now I can't examine the entire call stack (due to dynamic variables) to be sure that my code is correct

shamblett commented 11 months ago

OK, looks good to me, fix incorporated and package re released at version 4.0.1