dotnet / runtime

.NET is a cross-platform runtime for cloud, mobile, desktop, and IoT apps.
https://docs.microsoft.com/dotnet/core/
MIT License

BlockingCollection can get back to uncompleted state even after `IsCompleted` was true #109217

Open Mad-Lynx opened 1 month ago

Mad-Lynx commented 1 month ago

Description

We have a thread that consumes data from a BlockingCollection. At some point we finish adding elements and want to wait for the consumer to process all the data. The natural thing is to check whether IsCompleted is true, since it indicates that adding is complete and the collection is empty, i.e. all elements have been taken.

In some cases processing the queue can be a long operation, so we just want to cancel it, even if we haven't processed all elements.

It turns out that if you pass a CancellationToken to GetConsumingEnumerable and cancel it after IsCompleted has become true, the element can be "added back" and the collection returns to the uncompleted state.

Reproduction Steps

t represents the consumer. It is not a correct implementation: for simplicity we ignore all errors and start reading again, so there is no need to create a new thread in every loop iteration.

using System.Collections.Concurrent;

CancellationTokenSource cts = null;
BlockingCollection<object> collection = null;

var t = new Thread(() =>
{
    while (true)
    {
        try
        {
            foreach (var _ in collection?.GetConsumingEnumerable(cts?.Token ?? default) ?? [])
            {
            }
        }
        catch (Exception)
        {
            // Deliberately swallow everything (including OperationCanceledException) and start reading again.
        }
    }
});

t.IsBackground = true;
t.Start();

// Arrange
for (var i = 0; i < 100; i++)
{
    cts = new CancellationTokenSource();
    collection = new BlockingCollection<object>(1000);

    collection.Add(new object());
    collection.CompleteAdding();

    SpinWait spin = default;
    while (!collection.IsCompleted)
    {
        spin.SpinOnce();
    }

    cts.Cancel();

    var counter = 50;
    spin.Reset();

    while (collection.IsCompleted && counter > 0)
    {
        counter--;
        spin.SpinOnce();
    }

    if (collection is { IsCompleted: false })
    {
        throw new Exception($"Error! Collection is not completed anymore! {i}-{counter}");
    }
}

Console.WriteLine("All good!");
GC.KeepAlive(t);

Expected behavior

I understand that when a removal fails or the token is cancelled (because we don't want to continue processing the data), the element should NOT be removed from the collection, and the collection should remain uncompleted.

So I see two options:

1. collection.IsCompleted stays true even after the token is cancelled. That is, if the element has already been removed successfully and is going to be returned to the consumer, cancellation does not disturb the IsCompleted state.
2. collection.IsCompleted is not set to true while an element is still being processed and could still be put back in the queue. It becomes true only when the element is being returned to the consumer and we know it is the last one.

Actual behavior

collection.IsCompleted changes back to false even though it was previously true.

Regression?

I don't think it's a regression, as previous versions behave the same way.

It's caused by this line: https://github.com/dotnet/runtime/blob/a0fdddab98ad95186d84d4667df4db8a4e651990/src/libraries/System.Collections.Concurrent/src/System/Collections/Concurrent/BlockingCollection.cs#L693 However, I cannot find any reference to "bug #702328", so I don't know the real reason that check was put there.

Known Workarounds

Two possibilities:

1. Add a delay before cancelling the token, or cancel it with a timeout, so that it does not fire immediately.
2. Don't pass the token to GetConsumingEnumerable; check it manually inside the loop instead.
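The second workaround could look roughly like the sketch below. The class and method names (`ManualCancellationConsumer`, `Consume`) are hypothetical and not part of the issue or of .NET; only `BlockingCollection<T>.GetConsumingEnumerable()` and `CancellationToken.IsCancellationRequested` are real APIs. It assumes that it is acceptable to drop the item that was already taken when cancellation is observed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper illustrating workaround 2 (names are assumptions).
public static class ManualCancellationConsumer
{
    // Consumes items without handing the token to GetConsumingEnumerable.
    // Returns the number of items that were actually processed.
    public static int Consume<T>(
        BlockingCollection<T> collection,
        Action<T> process,
        CancellationToken token)
    {
        var processed = 0;

        // No token is passed here, so the collection itself never observes
        // the cancellation while taking an item.
        foreach (var item in collection.GetConsumingEnumerable())
        {
            // Manual check: stop as soon as cancellation is requested.
            // The item taken in this iteration is dropped, not put back.
            if (token.IsCancellationRequested)
            {
                break;
            }

            process(item);
            processed++;
        }

        return processed;
    }
}
```

One trade-off of this approach: a consumer that is blocked waiting on an empty collection is not woken up by cancellation, and only leaves the loop once CompleteAdding has been called or another item arrives.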

Configuration

No response

Other information

No response

dotnet-policy-service[bot] commented 1 month ago

Tagging subscribers to this area: @dotnet/area-system-collections See info in area-owners.md if you want to be subscribed.

theodorzoulias commented 1 month ago

Your repro is not perfect because the mutable variables cts and collection are accessed by two different threads without synchronization. Nevertheless after fixing this flaw the bug is still reproducible.

Mad-Lynx commented 1 month ago

> Your repro is not perfect because the mutable variables cts and collection are accessed by two different threads without synchronization.

That's not my repro; those are just the simple repro steps, as I mentioned above:

> t represents the consumer. It is not a correct implementation: for simplicity we ignore all errors and start reading again, so there is no need to create a new thread in every loop iteration.

I'm assuming you are referring to the fact that cts and collection are replaced in each loop iteration 😉 That is the whole point of it: otherwise you would have to run it as a test and repeat it until failure, as it's not reproducible in a single pass.


Even assuming this were production code, I don't see an issue there: thread t reads the instances, but the values change only at the beginning of each loop iteration, so t could have "old" data, but that's not relevant because the old data will never be checked by the loop anyway. And since reference assignment is an atomic operation, there is no way thread t could read only part of the reference to cts or collection. So I don't see a real reason for synchronization. (But I digress from the main topic.)

Clockwork-Muse commented 1 month ago

> And since reference assignment is an atomic operation, there is no way thread t could read only part of the reference to cts or collection. So I don't see a real reason for synchronization.

Assignment to a single reference is atomic, so no, the individual reference won't be torn. Assignment to both references together is not atomic and can be reordered, so you can get mismatches between cts and collection. And even then, the write might not be picked up immediately by the other thread (CPU cache), which is a separate problem.

That's the problem @theodorzoulias is referring to.

theodorzoulias commented 1 month ago

Even a single assignment is problematic, because the compiler/Jitter/processor is allowed to reorder the instructions of a program in such a way that cts or collection could, for a brief period of time, point to a partially initialized object. This is the problem that the volatile keyword solves, by adding appropriate memory barriers that prevent such reordering from occurring.

In my tests I fixed your repro (=reproduction proof) by using a record class State(CancellationTokenSource CTS, BlockingCollection<T> Collection) and declaring a volatile field of this type. This makes the repro more verbose, that's why I am not sharing it, but the bug emerges regardless. It's a genuine bug, not a product of a flawed repro.
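The fixed repro itself was not shared, so the following is only a guess at its general shape, not the commenter's actual code. It bundles both references into one immutable record and publishes it through a single volatile field. The names `SharedState`, `Publish`, and `Read` are hypothetical, and `BlockingCollection<object>` is assumed to match the element type used in the original repro.

```csharp
#nullable enable
using System.Collections.Concurrent;
using System.Threading;

// Both references travel together, so a reader can never see a
// CancellationTokenSource from one iteration paired with a collection from another.
public sealed record State(CancellationTokenSource CTS, BlockingCollection<object> Collection);

// Hypothetical holder class (name and members are assumptions).
public static class SharedState
{
    // volatile gives the write release semantics and the read acquire semantics,
    // so a reader that sees the new reference also sees the fully constructed State.
    private static volatile State? s_current;

    // Called by the producing thread at the start of each iteration.
    public static void Publish(State state) => s_current = state;

    // Called by the consumer thread; returns null until the first Publish.
    public static State? Read() => s_current;
}
```

In the consumer thread, the loop would then read the state once per pass, for example `var state = SharedState.Read();`, skip the pass if it is null, and otherwise enumerate `state.Collection.GetConsumingEnumerable(state.CTS.Token)`.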