dotnet / MQTTnet

MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.
MIT License
4.44k stars 1.06k forks source link

payload not equal,MqttWebSocketServer mqttv5 #743

Closed pcbing closed 5 years ago

pcbing commented 5 years ago

Describe the bug

After a message is published, the message received through the mqtt server changes

Which project is your bug related to?

To Reproduce

Steps to reproduce the behavior:

  1. Using this version of MQTTnet '3.0.6'.
  2. Server use MqttWebSocketServer in aspnetcore project
  3. Client use mqttv5 ResponseTopic,CorrelationData
  4. Publish payload not equal received

Screenshots

payload published:313233343536 payload received:000000000000000000000000000000000000000000000000000000000000000000000000000000000000313233343536

Code example

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Protocol;
using System;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static async Task Main(string[] args)
        {

            var a = new MqttCommunicate();
            Console.ReadLine();
            await a.PublishAsync("api1", "123456789");
            await a.PublishAsync("api", "123456");

            Console.ReadLine();

        }

        public class MqttCommunicate
        {
            private IMqttClient _client;
            private IMqttClientOptions _options;

            public MqttCommunicate()
            {
                _client = new MqttFactory().CreateMqttClient();
                _options = new MqttClientOptionsBuilder()
                    .WithWebSocketServer("localhost:5000/mqtt")
                    .WithClientId("test")
                    .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500)
                    .Build();
                Init();               
            }

            private void Init()
            {
                _client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(e =>
                {
                    var topic = e.ApplicationMessage.Topic;
                    var payload = e.ApplicationMessage.Payload;
                    var hex = String.Concat(Array.ConvertAll(payload, x => x.ToString("X2")));
                    Console.WriteLine($"payload received:{hex}");
                });

                _client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(async e =>
                {
                    await _client.SubscribeAsync("api");
                });

                _client.ConnectAsync(_options);
            }

            public async Task PublishAsync(string topic, string data)
            {
                var guid = Guid.NewGuid();
                var str = String.Concat(Array.ConvertAll(System.Text.Encoding.UTF8.GetBytes(data), x => x.ToString("X2")));

                Console.WriteLine($"payload published:{str}");
                var req = new MqttApplicationMessageBuilder()
                    .WithTopic(topic)
                    .WithResponseTopic($"_")
                    .WithCorrelationData(guid.ToByteArray())
                    .WithPayload(data)
                    .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
                    .Build();
                await _client.PublishAsync(req).ConfigureAwait(false);
            }

            public async Task PublishAsync1(string topic, string data)
            {

                var str = String.Concat(Array.ConvertAll(System.Text.Encoding.UTF8.GetBytes(data), x => x.ToString("X2")));

                Console.WriteLine($"payload published:{str}");
                var req = new MqttApplicationMessageBuilder()
                    .WithTopic(topic) 
                    .WithPayload(data)
                    .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
                    .Build();
                await _client.PublishAsync(req).ConfigureAwait(false);
            }

        }

    }
}
SeppPenner commented 5 years ago

I have no idea where this comes from, but what I can tell is that MQTT5 currently isn't fully supported in the server project.

I'm not sure, but can you maybe try:

Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");

instead of:

var hex = String.Concat(Array.ConvertAll(payload, x => x.ToString("X2")));
Console.WriteLine($"payload received:{hex}");

from the example under https://github.com/chkr1011/MQTTnet/wiki/Client#consuming-messages?

pcbing commented 5 years ago

After replacing MQTTnet.AspNetCore.SpanBasedMqttPacketWriter with MQTTnet.Formatter.MqttPacketWriter, it works.Maybe something wrong in class MQTTnet.AspNetCore.SpanBasedMqttPacketWriter

chkr1011 commented 5 years ago

@JanEggers Do you know what's wrong? I assume some wrong buffer offsets, lengths etc.?

JanEggers commented 5 years ago

@chkr1011 I can reproduce the error in a test, will work on a fix now

JanEggers commented 5 years ago

@chkr1011 fixed pr is on the way, not sureif we need more testcoverage on v5, @pcbing thx for reporting it

chkr1011 commented 5 years ago

Perfekt 👍 I will release this with the upcoming release of 3.0.7.

And yes we need more tests for 5.0.0 😄

chkr1011 commented 5 years ago

@pcbing Please test 3.0.7-rc2.

pcbing commented 5 years ago

It works,thanks

pcbing commented 5 years ago

It works,thanks

chkr1011 commented 5 years ago

@JanEggers I am interested what the error was. Can you explain a little bit?

JanEggers commented 5 years ago

I increased the 'bytes written' counter twice when copiing a subwriter. that added N times 0 where N is the size of the subwriter.