dotnet / runtime

.NET is a cross-platform runtime for cloud, mobile, desktop, and IoT apps.
https://docs.microsoft.com/dotnet/core/
MIT License
15.01k stars 4.67k forks source link

Dataflow - SendAsync - Loses Messages #1177

Closed abbotware closed 4 years ago

abbotware commented 4 years ago

When I setup a pipeline that branches I lose messages if the buffer capacity is lower than the messages published even though await SendAsync always returns true

the scenario is fairly boiler plate:

[producer]-> [BufferBlock] -> [TransformBlock] -> [ActionBlock]
                           -> [TransformBlock] -> [ActionBlock]

Everything uses completion propagation.

The buffer block is unbound, but the transform block have a capacity of 5. If I publish 20, then send complete to the buffer block, only 5 messages make it through to the transform blocks 15 messages seem lost.

The same pipeline works as expected (processes 20 message) when the 2nd branch is removed.

The lost messages are unexpected.

I would expect the following to happen on the producer:

  1. SendAsync to return false if messages are to be lost OR
  2. SendAsync to block until messages can be sent (ie back pressure)

Is there something fundamentally wrong in my understanding? Or is this a bug? ( I should be able extract out a small example with reproducibility)

Side Question - Do I need a BroadcastBlock? or is linking 1 Source to 2 Targets equivalent? (it seems the same from my tests)

Clockwork-Muse commented 4 years ago

SendAsync will always return immediately - with a Task<bool>. Are you not checking the status of the returned task? (Without having used the library myself, I'd bet that SendAsync is returning incomplete tasks, which you could use to do things like implement backpressure for submitting to the BufferBlock)

A simple repro will help here, if you could please provide one.

scalablecory commented 4 years ago

Please provide a small repro that demonstrates the issue.

abbotware commented 4 years ago

@Clockwork-Muse I am doing await on the SendAsync @scalablecory - working on small sample that repro's the issue

abbotware commented 4 years ago
async Task Main()
{
    // 1 buffer -> 1 pipeline - at the end abCount is 10 (expected)
    {
        var abCount = 0;
        var bb = new BufferBlock<int>();

        var lo = new DataflowLinkOptions { PropagateCompletion = true };

        var edbo = new ExecutionDataflowBlockOptions();
        edbo.BoundedCapacity = 5;

        var tb1 = new TransformBlock<int, string>(x => x.ToString(), edbo);
        var ab1 = new ActionBlock<string>(x => Interlocked.Increment(ref abCount), edbo);

        var link1 = bb.LinkTo(tb1, lo);
        var link2 = tb1.LinkTo(ab1, lo);

        for (var i = 0; i < 10; ++i)
        {
            var r = await bb.SendAsync(i);

            if (!r)
            {
                throw new Exception("Send Failed");
            }
        }

        bb.Complete();
        await ab1.Completion;

        abCount.Dump();
    }

    // 1 buffer -> 2 pipelines - at the end abCount is only 5 (unexpected!)
    {
        var ab1Count = 0;
        var ab2Count = 0;

        var bb = new BufferBlock<int>();

        var lo = new DataflowLinkOptions { PropagateCompletion = true };

        var edbo = new ExecutionDataflowBlockOptions();
        edbo.BoundedCapacity = 5;

        var tb1 = new TransformBlock<int, string>(x => x.ToString(), edbo);
        var ab1 = new ActionBlock<string>(x => Interlocked.Increment(ref ab1Count), edbo);

        var tb2 = new TransformBlock<int, string>(x => x.ToString(), edbo);
        var ab2 = new ActionBlock<string>(x => Interlocked.Increment(ref ab2Count), edbo);

        var link1 = bb.LinkTo(tb1, lo);
        var link2 = tb1.LinkTo(ab1, lo);

        var link3 = bb.LinkTo(tb2, lo);
        var link4 = tb2.LinkTo(ab2, lo);

        for (var i = 0; i < 10; ++i)
        {
            var r = await bb.SendAsync(i);

            if (!r)
            {
                throw new Exception("Send Failed");
            }

        }

        bb.Complete();
        await ab1.Completion;
        await ab2.Completion;

        ab1Count.Dump();
        ab2Count.Dump();
    }
}

// Define other methods and classes here
abbotware commented 4 years ago

attached code - made it quickly using linqpad

The first version seems to work as expected - (Is the await on SendAsync causing the producer to wait until the the consumer is available since its buffer is only 5?) (i call this implicit back pressure since I await on each one)

The second version seems to overflow the consumers? - the only thing different is buffer is linked to 2 blocks instead of one.- If I set the bounded capacity to greater than 5 - all messages are received by both blocks. However - that seems to mean I can no longer use the implicit await back pressure?

Or is my assumption in example 1 wrong?

I have been linking pipelines together with each block having different capacities and never had any issues - the producer's await on the SendAsync seemed to cause the entire pipeline to have built in 'flow control'. (This is the first time I am trying branch a stream into 2 different pipelines)

@scalablecory @Clockwork-Muse

abbotware commented 4 years ago

ok - i think i see what is happening. Is the buffer block is load balancing? (hence my 'side question' above) - ab1count + ab2count = # of messages sent

In either case, in my original code I had a broadcast block I took it out since it seemed to not matter.

see below the problem happening with the broadcast block back in place.

// 1 buffer -> [broadcast] -> 2 pipelines - at the end abCount is only 5 (unexpected!)
    {
        var ab1Count = 0;
        var ab2Count = 0;

        var edbo = new ExecutionDataflowBlockOptions();
        edbo.BoundedCapacity = 5;

        var buffer = new BufferBlock<int>();

        var broadcast = new BroadcastBlock<int>( x=> x);

        var lo = new DataflowLinkOptions { PropagateCompletion = true };

        var tb1 = new TransformBlock<int, string>(x => x.ToString(), edbo);
        var ab1 = new ActionBlock<string>(x => Interlocked.Increment(ref ab1Count), edbo);

        var tb2 = new TransformBlock<int, string>(x => x.ToString(), edbo);
        var ab2 = new ActionBlock<string>(x => Interlocked.Increment(ref ab2Count), edbo);

        var link0 = buffer.LinkTo(broadcast, lo);

        var link1 = broadcast.LinkTo(tb1, lo);
        var link2 = tb1.LinkTo(ab1, lo);

        var link3 = broadcast.LinkTo(tb2, lo);
        var link4 = tb2.LinkTo(ab2, lo);

        for (var i = 0; i < 100; ++i)
        {
            var r = await buffer.SendAsync(i);

            if (!r)
            {
                throw new Exception("Send Failed");
            }

        }

        buffer.Complete();
        await ab1.Completion;
        await ab2.Completion;

        ab1Count.Dump();  
        ab2Count.Dump(); 
    }

The only way I can get 'all messages' to both consumer pipelines is if I set the bounded capacity to match the number of messages sent, but It would be impossible for me to know this ahead of time..

Clockwork-Muse commented 4 years ago

If you want each element to be processed by both pipelines, you have to use a BroadcastBlock (It copies the elements to all linked targets - although the documentation's "any" seems misleading). Otherwise, elements are greedily consumed, probably only by the first block.

I'm not sure why your last example only handles the first 5 elements, though. Adding a short delay before buffer.Complete(); actually gets it to handle a sixth element - 99, the last one. This implies the Broadcast block is just overwriting the "current" element, but the length of the delay doesn't appear to matter, and it's always 6 elements handled. It also seems contrary to the expectation in the documentation:

... ensures that the current element is broadcast to any linked targets before allowing the element to be overwritten.

... which at least implies it should be passing along all elements. So where did the middle 94 go?

abbotware commented 4 years ago

@Clockwork-Muse - so there is something strange going on?

Not sure what is the point of the broadcast block if it causes messages to be lost... I expected it to be a 1 to many branch - but seems a little dangerous to use if it can't be deterministic.

I believe I can work around this by replacing the broadcast block with something like:

for (var i = 0; i < 100; ++i)
{
   await tba1.SendAsync(i);
   await tba2.SendAsync(i);
}

Doing this will get the behavior I am expecting from the single pipeline, at some reduced parallelism Since if tba1 is slow, tba2 will be blocked before it get messages

Clockwork-Muse commented 4 years ago

I don't know. I've never used the TPL, but the little documentation I've read so far would seem to support your expectation. I'm hoping somebody like @stephentoub can shed some light on things.

AlgorithmsAreCool commented 4 years ago

@abbotware

Hey, i ran your example and everything seems to be working as expected. Here is the code i ran : https://gist.github.com/AlgorithmsAreCool/fe01df36be764bf444238072fb29a351

I almost never use bounded capacity and it always works ok for me.

Clockwork-Muse commented 4 years ago

@AlgorithmsAreCool - Uh, the point is mostly about the bounded capacity, so leaving it out kinda negates the point.

scalablecory commented 4 years ago

For BufferBlock, the value will be sent to the first linked target that has space (non-full queue) available to accept it, not to every target.

For BroadcastBlock, the value will be sent to every target that has space (non-full queue). If a target doesn't have space, it won't be retried and that target will never observe that value -- there is no backpressure.

There is no block that will clone a value to every target, waiting if there is backpressure.

abbotware commented 4 years ago

@scalablecory - Thank you for your explanations - That clears up my confusion on the blocks and explains the behavior I was observing. I would highly suggest adding those sentences to the class definitions on the MSDN pages as the behavior of full / vs non-full target queues is not described anywhere that I have seen.

https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.bufferblock-1?view=netcore-3.1 and https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.broadcastblock-1?view=netcore-3.1

abbotware commented 4 years ago

It does seem a little disappointing that there is no 'safe' way to split a stream into multiple streams out of the box. This seems like the type of problem that DataFlow was designed to solve.

I think it is trivial to create a 'back pressure broadcast' (call it Fan-out? - since I think this is the MQ equivalent) - In my example above I just have to await on each target send.

The question I would have is in this type of broadcast would I need buffer blocks per target? or just targets with bounded capacity... I have always been confused me in DataFlow: BufferBlock vs BoundedCapacity

It seems (based on the above explanation) that the only time I would ever want a BufferBlock is in a load balancing scenario... where there are multiple targets, and some may be busy. Is this correct?

If an ExecutionBlock has an internal BoundedCapacity, why would I ever use a BufferBlock in front? It seems redundant and possible latency source due to the extra operations involved.

Clockwork-Muse commented 4 years ago

@abbotware - probably what you want to do is make the BroadcastBlock also/instead have the same bounded capacity, which would solve the issue, one assumes.

AlgorithmsAreCool commented 4 years ago

@abbotware

You are right, pretty much every block, including execution blocks already has a built in buffer. Bufferblocks are usually not needed.

But broadcast blocks are the pain point.

If you peek inside of the implementation you can see that if a block declines or postpones a message that the broadcast block offers, it just skips it.

Maybe i'm misunderstanding but I don't see how setting bounded capacity would help here.

I did find this article that explains how to get a broadcast style block with backpressure implemented. https://jack-vanlightly.com/blog/2018/4/19/processing-pipelines-series-tpl-dataflow-alternate-scenario

Another alternative is to implement a linear dataflow with no broadcast blocks and just pass along a union structure that can represent all branches.

abbotware commented 4 years ago

@Clockwork-Muse - setting the broadcast block capacity to match the 2 attached transform blocks does not make a difference. It is just losing messages because they are full.. as @AlgorithmsAreCool pointed out - this is by design - so my question has been answered for the most part.

@AlgorithmsAreCool - I have seen that article in the past - but it makes more sense in light of my new understanding. Its clever - but feels like its a hack for a missing block type. Also seems like it would be a lot more overhead to to dynamically change the dataflow at run time.

I suppose I was hoping DataFlow 'blocks' were a more complete API - it feels like its only ~70% there :-)

scalablecory commented 4 years ago

I would highly suggest adding those sentences to the class definitions on the MSDN pages as the behavior of full / vs non-full target queues is not described anywhere that I have seen.

I agree, this could be better documented. Would you like to submit a PR against our https://github.com/dotnet/dotnet-api-docs repo?

It seems (based on the above explanation) that the only time I would ever want a BufferBlock is in a load balancing scenario... where there are multiple targets, and some may be busy. Is this correct?

If an ExecutionBlock has an internal BoundedCapacity, why would I ever use a BufferBlock in front? It seems redundant and possible latency source due to the extra operations involved.

This sort of load balancing is principally what I've used BufferBlock for, though there may be some other uses that escape me.

I suppose I was hoping DataFlow 'blocks' were a more complete API - it feels like its only ~70% there :-)

If you have an API suggestion and some good scenarios you can detail, feel free to submit a new issue. This sort of "BroadcastBlock with backpressure" doesn't seem unreasonable to me.

AlgorithmsAreCool commented 4 years ago

@abbotware

Another option, (one that i currently use), is to abandon Dataflow.

Honestly, you can get decent workflow functionality with just Channels and Tasks running on the thread pool.

Dataflow is a really cool library, but it is pretty complex, even something as conceptually simple as the broadcastBlock is 1200+ lines of very tricky code to implement.

Here is a super untested and incomplete example of what i have used in place of dataflow. I wrote this in a few minutes and it optionally supports backpressure on broadcasts.

https://gist.github.com/AlgorithmsAreCool/492564e6baec44eea0c95f3e8b4d0941

The primitives are much easier to understand.

Of course this doesn't try to implement things like per-stage concurrency (although that wouldn't be too hard to implement) or custom task scheduling which TPL does, but it is also like 5% of the code.

abbotware commented 4 years ago

@scalablecory - already planning to do that

@AlgorithmsAreCool - Hah... I had considered rolling my own similar to your suggestion - but what attracted me to DataFlow was that it looked like it provided the 'blocks' to already do many things that I would want / need. It definitely has some gotchas (ie this issue is an example) and complexity..

I have not seen Threading.Channels before.. but I have seen IO.Pipelines, Rx.. those along with DataFow seem to be competing to solve the the problem: "data stream processing within a single process" Out of process can always be handled by an ESB / MQ platform

I may still write opt write my own, but only if doing so provides a significant boost in latency.

On the whole though - the determinism that DataFlow provides using bounded capacity and and per stage concurrency is exactly what works well. As I try to do 'fancier' things is where the cracks start to show.

AlgorithmsAreCool commented 4 years ago

@abbotware I will say that my internal implementation of "dataflow" using Channels was a fair bit faster than traditional Dataflow for high throughput workflows. The internals of Channels are all using more modern async code than Dataflow is.

But, RX is just plain old method calls, if you don't need parallel processing then RX will be the lowest latency solution generally.