dotnet / MQTTnet

MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.

MqttServer.InjectApplicationMessage still references the payload memory after it completes when the client uses WithCleanSession(false) #2113

Open xljiulang opened 5 days ago

xljiulang commented 5 days ago

Describe the bug

While working on a performance-optimization extension for MqttServer's InjectApplicationMessage, I rented the payload from ArrayPool<byte> and unexpectedly found that the test QoS_Tests.Preserve_Message_Order_For_Queued_Messages failed. It failed because I cleared poolPayload and returned it to the pool after await MqttServer.InjectApplicationMessage(poolPayload). I think the timing of my operation is correct, and this behavior should be allowed.

If I instead allocate the byte[] payload manually with new, the test passes. The reason is that the caller then hands InjectApplicationMessage a payload in its own independent buffer, and from that point on the buffer is used only by MqttServer until the GC collects it.
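The aliasing problem described above can be reproduced without MQTTnet. The following is a minimal sketch, not the library's implementation: FakeSessionQueue, InjectAsync, DeliverNext and PooledPayloadDemo are hypothetical names, and the assumption is that the server queues a reference to the caller's buffer for a persistent session and completes the task before the message is delivered.

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

// Hypothetical stand-in for a per-session message queue; this is NOT MQTTnet code.
public sealed class FakeSessionQueue
{
    private readonly Queue<ArraySegment<byte>> _pending = new Queue<ArraySegment<byte>>();
    private readonly bool _copyOnEnqueue;

    public FakeSessionQueue(bool copyOnEnqueue) => _copyOnEnqueue = copyOnEnqueue;

    // Completes as soon as the message is queued, before any delivery happens.
    public Task InjectAsync(ArraySegment<byte> payload)
    {
        // Either keep the caller's buffer (aliasing) or take a private copy.
        var stored = _copyOnEnqueue
            ? new ArraySegment<byte>(payload.ToArray())
            : payload;
        _pending.Enqueue(stored);
        return Task.CompletedTask;
    }

    // Simulates a later delivery to a client with a persistent session.
    public string DeliverNext()
    {
        var segment = _pending.Dequeue();
        return Encoding.UTF8.GetString(segment.Array!, segment.Offset, segment.Count);
    }
}

public static class PooledPayloadDemo
{
    public static async Task<string> RoundTripAsync(bool copyOnEnqueue)
    {
        var queue = new FakeSessionQueue(copyOnEnqueue);
        const string text = "hello";
        int length = Encoding.UTF8.GetByteCount(text);
        byte[] rented = ArrayPool<byte>.Shared.Rent(length);
        try
        {
            Encoding.UTF8.GetBytes(text, 0, text.Length, rented, 0);
            await queue.InjectAsync(new ArraySegment<byte>(rented, 0, length));
        }
        finally
        {
            // The caller considers the buffer free once the await has completed.
            rented.AsSpan(0, length).Clear();
            ArrayPool<byte>.Shared.Return(rented);
        }
        // Delivery happens only after the buffer was cleared and returned.
        return queue.DeliverNext();
    }

    public static async Task Main()
    {
        Console.WriteLine(await RoundTripAsync(copyOnEnqueue: true) == "hello");  // True
        Console.WriteLine(await RoundTripAsync(copyOnEnqueue: false) == "hello"); // False
    }
}
```

In this sketch the queue that stores the caller's buffer delivers five zero bytes instead of "hello", while the copying queue delivers the original text. Whether a copy on enqueue or some other ownership rule is the right fix for MqttServer is a design decision for the maintainers.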

Which component is your bug related to?

To Reproduce

Modify the InjectApplicationMessage code of MqttServerExtensions as shown here, then run the test Preserve_Message_Order_For_Queued_Messages.

public static async Task InjectApplicationMessage(
    this MqttServer server,
    string topic,
    string payload = null,
    MqttQualityOfServiceLevel qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce,
    bool retain = false)
{
    ArgumentNullException.ThrowIfNull(server);
    ArgumentNullException.ThrowIfNull(topic);

    var payloadBuffer = Encoding.UTF8.GetBytes(payload ?? string.Empty);

    var message = new MqttApplicationMessageBuilder()
        .WithTopic(topic)
        .WithPayload(payloadBuffer)
        .WithQualityOfServiceLevel(qualityOfServiceLevel)
        .WithRetainFlag(retain)
        .Build();

    await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(message));

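    // Clear the buffer once the await has completed, as a caller would before
    // returning a pooled buffer. The server should no longer reference it here.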
    payloadBuffer.AsSpan().Clear();
}

Expected behavior

Once await InjectApplicationMessage() has completed, the server must no longer hold a reference to the payload memory, so that the caller can safely clear or reuse it.