dotnet / MQTTnet

MQTTnet is a high-performance .NET library for MQTT-based communication. It provides an MQTT client and an MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.
MIT License

Client processes mqtt messages out of order #248

Closed dpsenner closed 6 years ago

dpsenner commented 6 years ago

Expected behavior:

When connecting to a broker with clean_session=false and a fixed client_id, and then subscribing with QoS > 0 to a topic that is published with QoS > 0, the client can receive messages that were sent while it was offline. This is a recommended and widely used setup, as can be seen at https://stackoverflow.com/questions/34150452/receive-offline-messages-mqtt. Most MQTT brokers preserve the ordering of messages when the QoS of a topic is > 0, and therefore messages should arrive at the client in order and also be processed in order.

Actual behavior:

Each message is dispatched to a registered ApplicationMessageReceived handler on a separate thread, and therefore the ordering of the messages is not preserved.

Proposed fix:

I believe that the library invokes Task.Run() or something similar when invoking registered ApplicationMessageReceived handlers. While this works fine in many cases, the library should either not implement dispatcher logic at all or provide an API to customize it. This allows a user of the library to decide whether dispatching to multiple threads is sensible or whether they would rather process messages on the same thread. Using the same thread further improves performance by reducing the number of context switches involved.
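
A minimal sketch of the effect described above, not taken from the library: it assumes a hypothetical handler whose processing time differs per message and compares one Task.Run per message with inline dispatch. All names (DispatchOrderDemo, Handle, and so on) are illustrative only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class DispatchOrderDemo
{
    // Hypothetical handler: earlier messages take longer to process.
    private static void Handle(int sequence, ConcurrentQueue<int> completed)
    {
        Thread.Sleep(Math.Max(0, 5 - sequence) * 20);
        completed.Enqueue(sequence);
    }

    // One Task.Run per message: handlers run concurrently on the thread pool,
    // so the completion order is not guaranteed to match the arrival order.
    public static int[] DispatchWithTaskRun(IEnumerable<int> messages)
    {
        var completed = new ConcurrentQueue<int>();
        var tasks = messages.Select(m => Task.Run(() => Handle(m, completed))).ToArray();
        Task.WaitAll(tasks);
        return completed.ToArray();
    }

    // Inline dispatch: the receiving loop calls the handler itself,
    // so messages complete in exactly the order they arrived.
    public static int[] DispatchInline(IEnumerable<int> messages)
    {
        var completed = new ConcurrentQueue<int>();
        foreach (var m in messages)
        {
            Handle(m, completed);
        }
        return completed.ToArray();
    }

    public static void Main()
    {
        var messages = Enumerable.Range(1, 4).ToArray();
        Console.WriteLine("Task.Run: " + string.Join(",", DispatchWithTaskRun(messages)));
        Console.WriteLine("Inline:   " + string.Join(",", DispatchInline(messages)));
    }
}
```

The inline variant always yields 1,2,3,4. The Task.Run variant may yield any permutation, depending on scheduling and handler duration.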

The current implementation would force us to implement a rather complex caching and sorting mechanism to achieve message ordering.

chkr1011 commented 6 years ago

Hi, yes, you are right. I will try to remove the Task.Run. If the library user needs separate threads, they can start a new one in the event handler. But I will consider adding a new option for this, because otherwise a message that is slow to process will block all other incoming messages.

Best regards Christian

markusschaber commented 6 years ago

Hmm, maybe a per-receiver queue, and using async/await could solve the problem? This way, a receiver can only block itself?

biapar commented 6 years ago

Could you add some kind of queue manager?

dpsenner commented 6 years ago

Thanks Christian for your fast response on this topic. Regarding the response from @biapar:

A queue manager can be implemented easily by using a BlockingCollection as follows:

// Thread-safe queue (FIFO by default) that buffers received events in arrival order.
BlockingCollection<MqttApplicationMessageReceivedEventArgs> ReceivedMqttMessageEvents { get; } = new BlockingCollection<MqttApplicationMessageReceivedEventArgs>();
mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
[...]
// The handler only enqueues; a consumer elsewhere processes the events.
private void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
{
    ReceivedMqttMessageEvents.Add(e);
}

I see no need for a queue manager for something as simple as that.
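
The handler above only fills the queue. A possible consuming side might look like the following sketch, which assumes a single consumer task and uses plain strings as a stand-in for MqttApplicationMessageReceivedEventArgs so that it runs without the library. OrderedConsumerDemo and Consume are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class OrderedConsumerDemo
{
    // Drains the queue on the calling thread and returns the payloads
    // in the order in which they were processed.
    public static List<string> Consume(BlockingCollection<string> queue)
    {
        var processed = new List<string>();
        // GetConsumingEnumerable blocks while the queue is empty and ends
        // once CompleteAdding() has been called and the queue is drained.
        foreach (var payload in queue.GetConsumingEnumerable())
        {
            processed.Add(payload);
        }
        return processed;
    }

    public static void Main()
    {
        using var queue = new BlockingCollection<string>();
        var consumer = Task.Run(() => Consume(queue));

        // Stand-in for the event handler adding received messages.
        foreach (var payload in new[] { "a", "b", "c" })
        {
            queue.Add(payload);
        }
        queue.CompleteAdding();

        Console.WriteLine(string.Join(",", consumer.Result)); // prints "a,b,c"
    }
}
```

Because BlockingCollection wraps a ConcurrentQueue by default, a single producer and a single consumer see first-in, first-out order. This only helps if the library itself invokes the handler in arrival order.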

dpsenner commented 6 years ago

Regarding the response from @markusschaber: I do not grasp your idea. Would you please explain it more verbosely?

markusschaber commented 6 years ago

@chkr1011 Instead of spawning 1 task per message, have 1 task per subscriber. All messages for that subscriber are put in a queue, and the task loops around, delivering one message after another.

Use Tasks and Async/Await instead of a "full" thread, to be more efficient.
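
One way this idea could be sketched, assuming System.Threading.Channels is available (.NET Core 3.0 or later): each subscriber owns a queue and a single asynchronous loop that delivers one message after another. SubscriberQueue is a hypothetical helper and not part of MQTTnet.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical per-subscriber queue: one pump task per subscriber,
// so a slow handler delays only its own subscriber.
public sealed class SubscriberQueue<T>
{
    private readonly Channel<T> _channel = Channel.CreateUnbounded<T>(
        new UnboundedChannelOptions { SingleReader = true });
    private readonly Func<T, Task> _handler;

    // Completes when the queue has been closed and fully drained.
    public Task Completion { get; }

    public SubscriberQueue(Func<T, Task> handler)
    {
        _handler = handler;
        Completion = Task.Run(() => PumpAsync());
    }

    // Called by the receiving side; never blocks on the handler.
    public bool Post(T message) => _channel.Writer.TryWrite(message);

    public void Complete() => _channel.Writer.Complete();

    private async Task PumpAsync()
    {
        // Awaiting each handler before reading the next message
        // keeps delivery strictly sequential for this subscriber.
        await foreach (var message in _channel.Reader.ReadAllAsync())
        {
            await _handler(message);
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        var seen = new List<int>();
        var queue = new SubscriberQueue<int>(async m =>
        {
            await Task.Delay(10); // simulated asynchronous work
            seen.Add(m);
        });

        for (var i = 1; i <= 3; i++)
        {
            queue.Post(i);
        }
        queue.Complete();
        await queue.Completion;

        Console.WriteLine(string.Join(",", seen)); // prints "1,2,3"
    }
}
```

The unbounded channel is chosen here for brevity. A bounded channel would add back-pressure at the cost of the receiving side having to wait when a subscriber falls behind.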

dpsenner commented 6 years ago

Having a task per subscriber assumes that each subscriber should handle messages in parallel but this is not always the case. Delivering the same event to different parts in the codebase is a valid design choice that does not require any multi-threading to take place.

To me, leaving out the dispatching functionality is the best option because:

But as pointed out earlier, if the library would like to provide dispatching functionality it should allow it to be customized. In that case I would do that by providing an interface as follows to the MqttFactory:

public interface IMqttApplicationMessageDispatchingStrategy
{
    void ApplicationMessageReceived(IMqttClient client, MqttApplicationMessage message);
}

Such an interface would make the ApplicationMessageReceived event api obsolete because:

Changing the API of IMqttClient might not be the wisest choice, hence my proposal of simply dropping Task.Run(). One can then still hook into that event by implementing an interface similar to the one above, without breaking current implementations.
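
To illustrate what two implementations of such a strategy might look like, here is a sketch under stated assumptions: Message is a stand-in for MqttApplicationMessage, the client parameter is omitted, and both class names are hypothetical. None of this is the library's API.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for the library's message type, so the sketch is self-contained.
public sealed record Message(string Topic, string Payload);

// Simplified variant of the interface proposed above (client parameter omitted).
public interface IMessageDispatchingStrategy
{
    void ApplicationMessageReceived(Message message);
}

// Calls the handler on the receiving thread: ordering is preserved,
// but a slow handler blocks all following messages.
public sealed class InlineDispatchingStrategy : IMessageDispatchingStrategy
{
    private readonly Action<Message> _handler;

    public InlineDispatchingStrategy(Action<Message> handler) => _handler = handler;

    public void ApplicationMessageReceived(Message message) => _handler(message);
}

// Queues each message to the thread pool: the receiving thread is never
// blocked, but the handlers may run concurrently and out of order.
public sealed class ThreadPoolDispatchingStrategy : IMessageDispatchingStrategy
{
    private readonly Action<Message> _handler;

    public ThreadPoolDispatchingStrategy(Action<Message> handler) => _handler = handler;

    public void ApplicationMessageReceived(Message message)
    {
        _ = Task.Run(() => _handler(message));
    }
}

public static class StrategyDemo
{
    public static void Main()
    {
        var callerThread = Thread.CurrentThread.ManagedThreadId;
        IMessageDispatchingStrategy strategy = new InlineDispatchingStrategy(m =>
            Console.WriteLine(
                $"{m.Topic}: {m.Payload} (same thread: {Thread.CurrentThread.ManagedThreadId == callerThread})"));

        strategy.ApplicationMessageReceived(new Message("sensors/temp", "21.5"));
    }
}
```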

JanEggers commented 6 years ago

I also hit this issue. I'll introduce a scheduler; the default will be Task.Run, but there will also be an inline scheduler.

JanEggers commented 6 years ago

It seems this was already addressed with: https://github.com/chkr1011/MQTTnet/blob/5e79581662f0e7f409974920a5949fb87202860f/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs#L402-L409

I think the enum value Dedicated is misleading, as it is used with Task.Run, i.e. the thread pool.
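
The distinction can be checked with a small sketch that is independent of the library and assumes the default task scheduler: work started with Task.Run reports a thread pool thread, whereas work on an explicitly created Thread does not. ThreadKindDemo is a hypothetical name.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ThreadKindDemo
{
    // Task.Run queues the delegate to the thread pool.
    public static bool RunsOnPoolViaTaskRun() =>
        Task.Run(() => Thread.CurrentThread.IsThreadPoolThread).Result;

    // An explicitly created Thread is a dedicated, non-pool thread.
    public static bool RunsOnPoolViaDedicatedThread()
    {
        var onPool = true;
        var thread = new Thread(() => onPool = Thread.CurrentThread.IsThreadPoolThread);
        thread.Start();
        thread.Join();
        return onPool;
    }

    public static void Main()
    {
        Console.WriteLine("Task.Run on pool:   " + RunsOnPoolViaTaskRun());          // True
        Console.WriteLine("new Thread on pool: " + RunsOnPoolViaDedicatedThread());  // False
    }
}
```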

chkr1011 commented 6 years ago

@dpsenner I released version 2.8.0-alpha5. In this version all incoming messages are processed in the same thread and thus the order should not be changed. Please let me know if it works for you.

dpsenner commented 6 years ago

We've completed the MQTT technology spike, and our team lead decided that it is currently infeasible to go down the MQTT road with the technical debt that we carry in this project. Because of this I won't get the chance to evaluate your fix during my day job. But I am happy to see you addressed this issue so quickly! Hopefully we get the chance to meet again in the future. We've found that MQTT / MQTTnet would be a great asset for our requirements, and we will keep it in mind for future tasks. Thanks!

asthomas commented 6 years ago

The code still delivers packets out of order in all qualities of service. See Performance bottlenecks - analysis