dotnet / MQTTnet

MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.
MIT License
4.46k stars 1.06k forks source link

Possible Race Condition #45

Closed 1iveowl closed 7 years ago

1iveowl commented 7 years ago

I'm struggling with the RecievePackets piece of you code on UWP. The issue only arise in Release, not in Debug mode.

Things seem to works on Windows 10, but not on Windows Mobile 10. I can replicate the issue both on a physical Windows 10 Mobile device as well as the Windows 10 Emulator. I even see the issue on rare occations on Windows 10 too actually.

I've been mulling over this piece .

I've narrowed it down to here. The behaviors is really strange, which leads me to thinking that this is a Race Condition sort of issue.

The Exception logged is:

The I/O operation has been aborted because of either a thread exit or an application request.

... and the exception originates from here.

I've been trying to figure out exactly what you are doing with the three threads you seem to spin up here, but it is not all that clear to me.

I always tend to get concerned when I see Task.Run(async () => ...) as a lot of care must be taken when going from one thread to multiple.

First of, I think that you can just make StartProcessReceivedPacket Async. Like this:

private async Task StartProcessReceivedPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
{
       await ProcessReceivedPacketAsync(packet).ConfigureAwait(false);
}

I've tried this, and it seems to work. Well, it doesn't solve my issue in Windows Mobile 10, but it doesn't introduce any new issues either and with this change we are down to only two thread.

Any chance that you (or we) could take a closer look at this central piece of your code where packets are received? For one, I'd like to see the code here to become more explicit and easier to follow . Is this some sort of packet receiver pump? Are you handling the order of which things are being send and received somehow?

Personally, I would have used use Reactive Extensions and maybe even some state machine here to keep track of if Connection is made etc, as this is exactly the sort of areas where Rx excels and tight state management matters.

1iveowl commented 7 years ago

I dug some deeper with Wireshark.

Look at these two recordings of initial traffic for establishing a connection.

Working (Windows 10 Desktop): working

Failing (Windows 10 Mobile, Emulator): failing

Looking at the timing of the fourth TCP packet. It is delayed approx. 12 sec. in the failing scenario Whereas the working connection has no such delay.

Also, this happened just before the CONNACK is send. It looks like the send logic is somehow tied up on the client end.

What I think is happening here is that the client is connecting to the server. The server is waiting for a PSH from the client, but it does not come for 12 seconds. Something on the client times out and the client get unstuck after 12 seconds and now finally sends the PSH. The server responds with the CONNACK, but now the client is no longer ready and it doesn't know what to do with the CONNACK from the server and shuts down the connection with a RST.

Why this happens and why this only happens in Release mode on Windows 10 Mobile and not Debug mode nor Windows 10 Desktop in any mode is still a mystery to me.

1iveowl commented 7 years ago

This is getting stranger by the minute.

In this small video both the if and the else is run?! if-else

Quantum physics?

chkr1011 commented 7 years ago

@1iveowl I also had such problems with UWP. I had a if-statement whioch was false but still entered. It was a mess. I also have it for my home automation project. I was able to fix it by debugging "Debug"-Versions only and disable the "native toolchain".

Related to processing messages I would like to keep the way it works now becaue if you make it async/await the internal processing is halted and if you have a longer running operation the internal thread for receiving will not receive anymore. But I see that the async await keywords inside of the Task.Run can be removed.

I am not very happy in adding other libraries to this project because my goal is to have it intependent from other libraries and their release cycles. I agree that it makes things easier but I assume that this is not such a big problem which can be fixed only by adding a totally different approach in dealing with incoming messages. Especially because it works on Windows 10 Desktop there must be a plattform related difference. I also tested a lot with a UWP test app from this project and have no problems at all.

I assume we cannot add the fush into the lock statement because this is not possible with async/await. So I asume it is correct. @JanEggers Can you maybe shed some light here?

@JanEggers Do you have maybe some idea? Maybe the issue is related to the new buffers etc.?

Best regards Christian

JanEggers commented 7 years ago

only thing i can say is that task.run is required because it is an async read loop that runs as long as the client is connected. if it would be awaited the connect would be completed if the client is disconnected again.

chkr1011 commented 7 years ago

@1iveowl Is this a new problem and didn't happen in a previous version?

1iveowl commented 7 years ago

Thank you. I've been working some more on the issue. I'm trying something new and will share when ready.

Not sure if this is new or old. I was not testing with UWP before now.

Regarding putting the FlushAsync inside the lock, it is not. What you are doing inside the lock is composing the task using ContinueWith. I'm only adding the FlushAsync to the composition where you already have WriteAsync. The combined Task is not run async until after the lock. At least this is how I read the code.

JanEggers commented 7 years ago

I assume we cannot add the fush into the lock statement because this is not possible with async/await. So I asume it is correct. @JanEggers Can you maybe shed some light here?

that is correct async await may not be inside lock statements, and the lock is present so just one writer has access to the stream at a time, without that i had raceconditions because packages override each other and then deserializing them failed.

1iveowl commented 7 years ago

that is correct async await may not be inside lock statements, and the lock is present so just one writer has access to the stream at a time, without that i had raceconditions because packages override each other and then deserializing them failed.

The way I read the code, you are composing the Task with ContinueWith inside the lock. Then you run the combined Task right after the lock. Is this the way you understand the code too? In this case I do believe that you can add both WriteAsync and FlushAsync inside the lock as they are in fact not run there.

I.e. this seems valid to me:

lock (_channel)
{
    foreach (var packet in packets)
    {
        if (packet == null) { continue; }

        MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"TX >>> {packet} [Timeout={timeout}]");

        var writeBuffer = PacketSerializer.Serialize(packet);
        _sendTask = _sendTask.ContinueWith(p => _channel.SendStream.WriteAsync(writeBuffer, 0, writeBuffer.Length, cancellationToken).ConfigureAwait(false), cancellationToken);
    }

    if (timeout > TimeSpan.Zero)
    {
        _sendTask = _sendTask.ContinueWith(c => _channel.SendStream.FlushAsync(cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false), cancellationToken);
    }
    else
    {
        _sendTask = _sendTask.ContinueWith(c => _channel.SendStream.FlushAsync(cancellationToken).ConfigureAwait(false), cancellationToken);
    }
}

await _sendTask; // configure await false generates stackoverflow

Now, the Task is not executing until await _sendTask which is outside the lock , so I'm not sure that you are protected from some other thread changing the channel simultaneously and hence creating a Race Condition, although the code makes this less likely.

JanEggers commented 7 years ago

yep you could also add flush inside the lock but flush does not affect the data that is send (so if a and b want to send stuff it does not matter if a flushes the stream or b flushes the stream). my understanding is that parallel calls to FlushAsync are scnchronized by the Stream.

1iveowl commented 7 years ago

My thinking was that if you add the _channel.SendStream.FlushAsync to the Task composition inside the lock, then at least you are sure that the FlushAsync is being added to the same _channel as the WriteAsync.

If you add FlushAsync to the Task outside the lock then there is a risk (albeit probably rather small) that the FlushAsync will be related to some other _channel as the the _channel is no longer protected by the lock. Or?

JanEggers commented 7 years ago

_channel is always the same instance eg relation from channeladapter to channel 1:1.

first i would verify that sending is really the issue. can u add logging before and after https://github.com/chkr1011/MQTTnet/blob/37a5dc7abefdc3878fd58f12220c8fa84baa1078/MQTTnet.Core/Client/MqttClient.cs#L53

and before and after https://github.com/chkr1011/MQTTnet/blob/37a5dc7abefdc3878fd58f12220c8fa84baa1078/MQTTnet.Core/Client/MqttClient.cs#L65

to verify timing eg. that connect completes in time and send is called.

1iveowl commented 7 years ago

I tried to log the order.

The order looks ok:

  1. Connect
  2. Reciever Start
  3. Send Start
1iveowl commented 7 years ago

Looks like the issue is related to the awaiter here. It times out.

1iveowl commented 7 years ago

Can you help me here?

I apologies my ignorance, but I have to admit that I don't understand the pattern and what is going on here with the AddPacketAwaiter and TaskCompletionSource<MqttBasePacket>()

I suppose it's a clever way to wait for the reply back from the server, I just don't understand how it works and it seems to be the origin for the challenges I'm having on Windows 10 Mobile.

chkr1011 commented 7 years ago

@1iveowl The dispatcher handles several incoming messages. First the well known messages are directly handled like a ping request. Then the dispatcher used if the message was not handled before. The dispatcher is simply a list of TaskCompletionSource index by type of packet and packet ID. The advantage is that the calling code can await the wait process for a certain package. Or the wait operation times out if a requested packet was not received.

This is what the dispatcher is used for from a high level perspective. If you say that he code times out there must be a trace message saying that a particular message (including) details was not handled nor dispatched. Can you paste a full trace here?

chkr1011 commented 7 years ago

I remember a bug in the past with the dispatcher. The problem was that the packet was already received and the PacketAwaiter was not set up at the moment. The fix was to first setup the PacketAwaiter for the response and THEN sending the request packet.

1iveowl commented 7 years ago

It's the very first CONNECT that times out as illustrated by the two Wireshark snippets above in this issue thread.

Regarding the bug you mention. When was this fixed? I guess the order you outline is the current order.

I think I need to read up on TaskCompletionSource.

chkr1011 commented 7 years ago

One problem I can imagine is that the Dispatcher has a dictionary now and the key is the type of the message. In a previous version it was a List instead. So I am not sure but in theory it can happen that a type of message is received twice then the old one is overridden due to the same key. We may need a trace here to test for such situations.

chkr1011 commented 7 years ago

OK I will read this again...

1iveowl commented 7 years ago

If I read this code correctly then the Packet is send before the WaitForPacketAsync method is called. The methoed isn't called before the last return line.

private async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket
{
    var wait = _packetDispatcher.WaitForPacketAsync(requestPacket, typeof(TResponsePacket), _options.DefaultCommunicationTimeout);
    await _adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, _cancellationTokenSource.Token, requestPacket).ConfigureAwait(false);
    return (TResponsePacket) await wait;
}

Is this the bug you talk about here:

I remember a bug in the past with the dispatcher. The problem was that the packet was already received and the PacketAwaiter was not set up at the moment. The fix was to first setup the PacketAwaiter for the response and THEN sending the request packet.

chkr1011 commented 7 years ago

But I am wondering why this is only a problem for Windows 10 Mobile... The desktop version has really no problems at all.

I am not an expert with Wireshark so please correct me if I am wrong. The client connects without a problem and then the client wants to send the CONNECT packet but this has a delay of 12 seconds which leads to timeouts etc? If this is correct I assume the dispatcher is not the problem. Can you please post a log from the trace? I want to see whats happening exactly. The problem is that I cannot reproduce it. So you have to deliver every little peace of information :-)

1iveowl commented 7 years ago

I understand. But you need to help mere regarding the trace. Where do I find this or how do I create it?

chkr1011 commented 7 years ago

There is a static class called MqttTrace. It has a event and you can attach to it and write to the Console i.e. You will also find some example code in the test apps.

1iveowl commented 7 years ago

OK. Thanks.

1iveowl commented 7 years ago

Here is how far the trace goes, which seems to correlate with the Wireshark trace. trace

And here is where it seems to throw the Exception.

BTW: I'm using the develop branch. I pulled the latest version ~30 minutes ago.

JanEggers commented 7 years ago

can you please add datetime.now to each message so we see the timing

chkr1011 commented 7 years ago

@1iveowl This is very interesting. Assuming the messages are in the correct order (please add timestamps to the trace) the connect packet is sent and THEN the receiving is started. This is wrong. I don't know why this happends because in code the receive is started and THEN the connect packet is sent. Please test the following things:

Branch is OK.

1iveowl commented 7 years ago

Here are the three traces with time.

trace - no wait

Changed to long running:

private void StartReceivePackets(CancellationToken cancellationToken)
{
    Task.Factory.StartNew(() => ReceivePackets(cancellationToken), cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}

trace - no wait - long running

With added 500ms delay after StartReceivePackets: trace - 500ms wait - long running

1iveowl commented 7 years ago

For reference, here are the same observations with Windows 10 Desktop:

Here are the three traces with time. w10 - trace - no wait

Changed to long running: w10 - trace - no wait - long running

With added 500ms delay after StartReceivePackets: w10 - trace - 500ms wait - long running

1iveowl commented 7 years ago

On rare occations I've also seen things fail on Windows 10 Desktop. Since it was so rare I've so far discharged this. However as I did the experiments above the whole things was very controlled and I saw Windows 10 Desktop fail.

This happened with no wait after StartReceivePackets and using Task.Run(() => ...) w10 - trace - no wait - failed

I can't help thinking that there is some timing or Race Condition at play here.

chkr1011 commented 7 years ago

Thank you. Can you maybe test with the server from this project? I want to see of the server receives the CONNECT packet. Maybe it is a network problem and not a thread or implementation issue.

1iveowl commented 7 years ago

The test I've run are all pointing to test.mosquitto.org.

I've also tested against MQTT Mosquitto server on my local network. I'm running all on Ethernet. No wifi.

Here's the test with broker.hivemq.com: local

Here the test with a Mosquitto MQTT on my local network. I can access this server using mosquitto_sub -h 192.168.0.205 -t '#' from the very same PC: broker hivemq com

I am pretty comfortable that this is not a network issue.

I can try your server, but that will have to be later as I've not used it yet and need understand how it works.

1iveowl commented 7 years ago

From the WireShart trace above, the remote server sends a CONNACK. Would it do this if no CONNECT was send?

The strange things is that I don't see a CONNECT in the Wireshark trace.

1iveowl commented 7 years ago

OK. That was easy enough.

Here's the result:

server

client

From the timing it looks as if the client makes a connection. Then the client just hangs there 12 seconds before sending the CONNECT. The server responses correctly to the CONNECT and replies with CONNACK, but then the client closes the connection. The question is why?

So far how I understand it, the client is deadlocking and timing out . I just don't understand why.

chkr1011 commented 7 years ago

I know the flow but I want to be 100 % sure :)

I will install the emulator also on my machine in the evening. Then I can test on my own. Can you maybe try to deploy the UWP test app from this repository and test with that one?

1iveowl commented 7 years ago

I changed SendPacketsAsync to this and the issue evaporated!

public async Task SendPacketsAsync(TimeSpan timeout, CancellationToken cancellationToken, IEnumerable<MqttBasePacket> packets)
{

    try
    {
        byte[] buffer;

        using (var bufferStream = new MemoryStream())
        {
            foreach (var packet in packets)
            {
                MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), "TX >>> {0} [Timeout={1}]", packet, timeout);

                var writeBuffer = PacketSerializer.Serialize(packet);

                bufferStream.Write(writeBuffer, 0, writeBuffer.Length);
            }
            buffer = bufferStream.ToArray();
        }

        await _channel.SendStream.WriteAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);

        if (timeout > TimeSpan.Zero)
        {
            await _channel.SendStream.FlushAsync(cancellationToken).TimeoutAfter(timeout).ConfigureAwait(false);
        }
        else
        {
            await _channel.SendStream.FlushAsync(cancellationToken);
        }
    }
    catch (TaskCanceledException)
    {
        throw;
    }
    catch (MqttCommunicationTimedOutException)
    {
        throw;
    }
    catch (MqttCommunicationException)
    {
        throw;
    }
    catch (Exception exception)
    {
        throw new MqttCommunicationException(exception);
    }
}
1iveowl commented 7 years ago

Is the above an acceptable solution? Do you want me to create a PR?

JanEggers commented 7 years ago

no for me it is not acceptable, the lock is gone. if there are two concurrent write operations the networkstrem gets corrupted. anyway at least we know whats the direction we have to investigate.

1iveowl commented 7 years ago

You are right. I would argue that the former solution with the lock had the same issue. In the original form the Task was composed with ContinueWith inside the lock, but the actual execution of the composed task was outside the lock at await _sendTaskand hence you had the same issue with a risk of multiple concurrent threads trying to write to the stream.

What I think we need here is a ConcurrentQueue or something. If you agree I'll look at this. I've created something similar in the Websocket Client I've created.

JanEggers commented 7 years ago

it was fine by storing the sendtask and use continue with there may be only one send active at a time. i cant get the uwp testapp running on my machine :(

chkr1011 commented 7 years ago

I am wondering. I assume that the Lock is useless because the library is not thread safe at all. So you should Lock Access to it on your own.

I am thinking abt the continue with. If we move write and flush to one continuation it should be fi

1iveowl commented 7 years ago

I'm beginning to think that the issue was that the lock was on the _channel which holds both the Write and Read Stream. Hence the Race Condition.

If you want to make sure that only one Write is taking place at the time a ConcurrentQueue and some pump watching this queue seems like a more clean solution in my view. Here is what I did in another library.

JanEggers commented 7 years ago

i am investigating on system.task.threading.channels for msg buffer which looks promising but is alpha. the idea with the lock context might be right. can you verify the original code with a separate lock object and see if that fixes the issue?

chkr1011 commented 7 years ago

The continuation is the Queue then because the Task is held in a field.

1iveowl commented 7 years ago

@chkr1011 The combination of the Task held in a field and locking on _channel, seem to have a queuing effect. But I'm not sure that it all crystal clear to me how it all works together. And also, the design clearly have some negative side-effect on UWP. Side-effect that shoe themselves consistently on Windows 10 Mobile and shows itself occasionally on Windows 10 Desktop.

@JanEggers Yes, I was thinking the same thing. I will investigate.

I still think that the code would be more clear if all Stream Writes from all threads was going into one ConcurrentQueue and then being executed one by one. This is essentially what we are trying to accomplish, right?

chkr1011 commented 7 years ago

I am about 100 % Sure that we can avoid the Lock and the continuation is a suitable queueing mechanism. A separate Queue with a thread seems overkill to me because ist is similar to packet awaiter. Because the caller still needs the exception. So

1iveowl commented 7 years ago

If I understand the code right, there are currently three thread being spun up here.

All three thread have code paths that lead them through: SendPacketsAsync.

SendPacketsAsync contains code that writes to the same stream. With multiple threads trying to write to the same stream you need some way of handling concurrency. I think we agree on this?

Looking more careful at SendPacketsAsync I'd say it has two parts:

1) a compose part, which defines a sequence and order of the byte that go into a single message. The current mechanism used for composing this sequence is Task.ContinueWith(...).

2) an executing part, where the actual writing to the stream occurs. This currently happens at await _sendtask.

Step 2) is the critical one, because it is here that the sequence is run and something is being written to the Stream. Step one doesn't do anything. Step one only defines what is to be done in step 2).

Step 1) resides inside the lock. There are no locks involved with the second step, which is good, because locking over await task is an anti-pattern.

I don't know if a ConcurrentQueue is overkill or not. I guess only performance testing will show, if throughput performance is the goal. My point was, that conceptually what we are trying to create here is some sort of a queue. A queue where only one message can be written to the Stream at a time.Stream.WriteAsync is not thread safe, so therefore we need to manage this concurrency somehow.

Also, we probably want the order of messages to be FIFO so that connect comes before connact etc.

Is this outline representative for what we are trying to accomplish?

1iveowl commented 7 years ago

You can see my summary above expressed in code here #47

Let me know what you think. I've tested it for the issue at hand for UWP across Desktop and Phone. And it works.

Could be interesting understand how it performs compared to the current solution.

chkr1011 commented 7 years ago

@1iveowl Yes that is correctly explained.

I was able to install the Windows Mobile 10 Emulator and test it on my own. The UWP test app from this project works without problems. I tested it with broker.hivemq.com.

I have reviewed your pull request but to me it seems too much to solve the issue. I fixed it in my way in the current develop branch. I tested it with the UWP test app in the emulator again and it still works.

Can you please test this version as well? Also with multiple threads?

@JanEggers Do you want to review my change also? In my opinion this should work even with multiple threads because the continuations are not executed in parallel. The only thing which can happen is that first 2 buffers are written and then the flush method is called 2 times in a row but I assume this is fine because at the second time there is no data or a fully written packet in the buffer.

1iveowl commented 7 years ago

@chkr1011

I just tested this and unfortunately it does not work: waittask

Did you test in Release Mode? Or rather with Native Tool Chain?