dotnet / MQTTnet

MQTTnet is a high-performance .NET library for MQTT-based communication. It provides an MQTT client and an MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.
MIT License

Freeze on long running async operation in ApplicationMessageReceivedAsync #1646

Open rincewound opened 1 year ago

rincewound commented 1 year ago

Version: 4.1.4.563

Describe the bug

If a long-running async operation is awaited anywhere in the call stack of an ApplicationMessageReceivedAsync handler, the client does not dispatch any messages received in the meantime until that handler call has completed.

Which component is your bug related to?

To Reproduce

Given the following implementation of an application message handler:

public AsyncAutoResetEvent evt = new AsyncAutoResetEvent(false);

private async Task on_message_test(MqttApplicationMessageReceivedEventArgs arg)
{
    if (arg.ApplicationMessage.Topic.Equals("a"))
    {
        Log.Info("Waiting");
        await evt.WaitAsync(TimeSpan.FromSeconds(20));
        Log.Info("Done Waiting");
    }
    else if (arg.ApplicationMessage.Topic.Equals("b"))
    {
        Log.Info("Setting Event");
        evt.Set();
        Log.Info("Event was set");
    }
}

This produces the following output:

[17:01:33 INF] Waiting
[17:01:53 INF] Done Waiting
[17:01:53 INF] Waiting          <--- Resend from broker?
[17:02:13 INF] Done Waiting
[17:02:13 INF] Setting Event
[17:02:13 INF] Event was set
[17:02:13 INF] Setting Event    <--- Resend from broker?
[17:02:13 INF] Event was set

That is, not only does the client fail to dispatch further messages, but the handler also blocks for long enough that the broker seems to resend the message.

TL;DR: If a message on topic "a" is received, the client is blocked for 20 seconds, even if a message on topic "b" is received in the meantime. The message on topic "b" is not dispatched until the timeout has expired and the handler call for "a" has returned.

Notes: The implementation of AsyncAutoResetEvent is from DotNext, but I have also tried a simplified version of my own to verify that it is not the cause of the problem.

Further: Sorry, the problem popped up in the context of a larger codebase, so I had to create a very stripped-down example.

Expected behavior

I might have a problem understanding async/await (the topic is newish for me in C#), but my expectation is that the client spawns one task per received message, with each task executed by workers from the thread pool. That is, if a message on "a" arrives, the task that processes it can happily wait until a message on "b" arrives without freezing the client.

Devidence7 commented 1 year ago

Hi @rincewound,

Based on your code snippet and the issue you've described, it seems that the MqttClient is processing messages in a serialized manner instead of processing them concurrently. I understand your expectation that the client should spawn one task per received message, allowing them to be processed independently without freezing the client.

I recommend trying the following modification to your code so that messages are processed concurrently:

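private Task on_message_test(MqttApplicationMessageReceivedEventArgs arg)
{
    // Hand the work to the thread pool so that the handler returns immediately.
    // Exceptions thrown inside the lambda are not observed by the caller.
    _ = Task.Run(async () =>
    {
        if (arg.ApplicationMessage.Topic.Equals("a"))
        {
            Log.Info("Waiting");
            await evt.WaitAsync(TimeSpan.FromSeconds(20));
            Log.Info("Done Waiting");
        }
        else if (arg.ApplicationMessage.Topic.Equals("b"))
        {
            Log.Info("Setting Event");
            evt.Set();
            Log.Info("Event was set");
        }
    });

    // Nothing is awaited here, so the method does not need to be async.
    return Task.CompletedTask;
}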

By wrapping the message handling code in Task.Run, you are effectively creating a new task for each message, allowing them to be processed concurrently. This should prevent the client from freezing when waiting for an event in the "a" topic.
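
To make the difference visible without a broker, here is a minimal sketch that simulates a receive loop which awaits the handler for each message in turn. This is an assumption about the dispatch behavior that matches the output reported above, not MQTTnet's actual implementation. DispatchDemo, RunAsync, Process and Handler are hypothetical names, a SemaphoreSlim stands in for AsyncAutoResetEvent, and the timeout is shortened from 20 to 2 seconds.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class DispatchDemo
{
    // Simulated receive loop: delivers "a" and then "b", awaiting the handler
    // for each message before moving on to the next one.
    // Returns the time at which the processing of "a" stopped waiting.
    public static async Task<TimeSpan> RunAsync(bool offload)
    {
        var signal = new SemaphoreSlim(0, 1); // stand-in for AsyncAutoResetEvent
        var aFinished = new TaskCompletionSource<TimeSpan>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        var clock = Stopwatch.StartNew();

        async Task Process(string topic)
        {
            if (topic == "a")
            {
                // Wait for "b", giving up after 2 seconds.
                await signal.WaitAsync(TimeSpan.FromSeconds(2));
                aFinished.TrySetResult(clock.Elapsed);
            }
            else if (topic == "b")
            {
                signal.Release();
            }
        }

        Task Handler(string topic)
        {
            if (!offload)
            {
                return Process(topic); // the loop has to wait for this task
            }

            _ = Task.Run(() => Process(topic)); // fire and forget
            return Task.CompletedTask;          // the loop continues at once
        }

        foreach (var topic in new[] { "a", "b" })
        {
            await Handler(topic);
        }

        return await aFinished.Task;
    }

    public static async Task Main()
    {
        Console.WriteLine($"awaited inline: {(await RunAsync(false)).TotalSeconds:F1}s");
        Console.WriteLine($"offloaded:      {(await RunAsync(true)).TotalSeconds:F1}s");
    }
}
```

In the inline case, "b" cannot be delivered until the wait for "a" has timed out, so the first figure should be about the full timeout, while the offloaded case should finish almost immediately. The trade-off is that offloaded handlers may run out of order and their exceptions go unobserved. If the client acknowledges a message once the handler returns, that acknowledgement would also be sent before processing has finished.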