dotnet / MQTTnet

MQTTnet is a high-performance .NET library for MQTT-based communication. It provides an MQTT client and an MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.
MIT License
4.5k stars 1.07k forks source link

Retain the copy message #2115

Open xljiulang opened 1 day ago

xljiulang commented 1 day ago

This PR fixes #2113

xljiulang commented 21 hours ago

This turns out to be a stubborn bug. Even with the above fix in place, the bug still occurs if I change how MqttChannelAdapter receives data packets.

So once a packet is received, do we have to copy its payload out instead of referencing the pooled memory? In MQTTnet.AspNetCore, using pooled memory is the default behavior, which means that the project has never handled this situation correctly.

/// <summary>
/// Reads a single packet from the channel: first the fixed header, then the body into a buffer
/// rented through <c>BufferOwner</c>.
/// </summary>
/// <param name="cancellationToken">Checked on entry and again after every awaited read.</param>
/// <returns>
/// The received packet, or <c>ReceivedMqttPacket.Empty</c> when cancellation was requested,
/// the fixed header read reported a closed connection, or a body read returned zero bytes.
/// </returns>
/// <remarks>
/// The returned body is a view over the buffer held by <c>_bodyOwner</c>. That owner is disposed
/// at the start of the next call that reads a body, so the view refers to a returned buffer afterwards.
/// </remarks>
async Task<ReceivedMqttPacket> ReceiveAsync(CancellationToken cancellationToken)
{
   if (cancellationToken.IsCancellationRequested)
   {
       return ReceivedMqttPacket.Empty;
   }

   var headerResult = await ReadFixedHeaderAsync(cancellationToken).ConfigureAwait(false);

   // Bail out if we were cancelled while waiting, or if the header read saw the connection close.
   if (cancellationToken.IsCancellationRequested || headerResult.IsConnectionClosed)
   {
       return ReceivedMqttPacket.Empty;
   }

   var header = headerResult.FixedHeader;
   var length = header.RemainingLength;

   // A packet without a body needs no buffer at all.
   if (length == 0)
   {
       return new ReceivedMqttPacket(header.Flags, ReadOnlySequence<byte>.Empty, 2);
   }

   // Hand back the buffer used by the previous packet before renting one for this packet.
   _bodyOwner?.Dispose();
   _bodyOwner = BufferOwner.Rent(length);
   var buffer = _bodyOwner.Buffer;

   var received = 0;
   var chunk = Math.Min(ReadBufferSize, length);

   do
   {
       // Never request more than what is still missing from the body.
       chunk = Math.Min(chunk, length - received);

       var count = await _channel.ReadAsync(buffer, received, chunk, cancellationToken).ConfigureAwait(false);

       // Stop on cancellation, or when the read produced no data.
       if (cancellationToken.IsCancellationRequested || count == 0)
       {
           return ReceivedMqttPacket.Empty;
       }

       received += count;
   } while (received < length);

   // The rented array may be larger than requested, so only the first `length` bytes are exposed.
   PacketInspector?.FillReceiveBuffer(buffer.AsSpan(0, length));

   return new ReceivedMqttPacket(
       header.Flags,
       new ReadOnlySequence<byte>(buffer.AsMemory(0, length)),
       header.TotalLength);
}
/// <summary>
/// Holds a byte array rented from <see cref="ArrayPool{T}.Shared"/> and returns it to the pool on dispose.
/// </summary>
private sealed class BufferOwner : IDisposable
{
  // Set once the array has been handed back, so repeated Dispose calls return it only once.
  private bool _disposed;

  /// <summary>
  /// The rented array. The pool may hand out an array longer than the requested size.
  /// </summary>
  public byte[] Buffer { get; private set; }

  /// <summary>
  /// Creates an owner around an array rented from the shared pool.
  /// </summary>
  /// <param name="bufferSieze">Minimum length of the array to rent.</param>
  /// <returns>A new owner whose <see cref="Buffer"/> is the rented array.</returns>
  public static BufferOwner Rent(int bufferSieze)
  {
      var owner = new BufferOwner();
      owner.Buffer = ArrayPool<byte>.Shared.Rent(bufferSieze);
      return owner;
  }

  /// <summary>
  /// Returns the array to the shared pool. Calls after the first one do nothing.
  /// </summary>
  public void Dispose()
  {
      if (_disposed)
      {
          return;
      }

      _disposed = true;
      ArrayPool<byte>.Shared.Return(Buffer);
  }
}