Closed · AlbertSchreuder-0 closed this 6 months ago
The term "split" seems like maybe you mean "route"? Wouldn't "split" just mean an "odd"/"even" split?
First, love this library. We use it for all our batch processes. We have this PR implemented in an internal library to support a few scenarios, but in general it goes like this... We have a stream of records that all need to go to some final destination. Along the way, some of them need to be enriched by an expensive operation in batches. Others need to be enriched in batches by other expensive operations.
If we didn't need to batch, we could do the need-to-enrich check in the processing code and just skip it. Filter works to identify and process records that need enrichment but we lose the ones that were already enriched.
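To make the scenario concrete, here's a minimal sketch using only plain `System.Threading.Channels` (no library calls); `NeedsEnrichment` and the `+ 100` "enrichment" are illustrative stand-ins, not anything from our code:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var source = Channel.CreateUnbounded<int>();
var toEnrich = Channel.CreateUnbounded<int>();   // records needing the expensive batch step
var passThru = Channel.CreateUnbounded<int>();   // records that are already enriched

for (int i = 0; i < 10; i++) source.Writer.TryWrite(i);
source.Writer.Complete();

// The "split" step: route by predicate. This is exactly what Filter alone
// can't give us, because Filter discards the unmatched items.
static bool NeedsEnrichment(int x) => x % 2 == 0;
await foreach (var item in source.Reader.ReadAllAsync())
{
    if (NeedsEnrichment(item)) await toEnrich.Writer.WriteAsync(item);
    else await passThru.Writer.WriteAsync(item);
}
toEnrich.Writer.Complete();
passThru.Writer.Complete();

// Every record, enriched or not, must reach the final destination ("merge").
var final = new List<int>();
await foreach (var x in toEnrich.Reader.ReadAllAsync()) final.Add(x + 100); // stand-in enrichment
await foreach (var x in passThru.Reader.ReadAllAsync()) final.Add(x);
Console.WriteLine(final.Count); // 10
```

The fluent pipeline we're after would replace the manual routing and draining above with single `Split`/`Merge` calls.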
Names are hard. We chose split/merge in hopes it wouldn't be confused with batch/join, but we'd be happy no matter how you name them.
Ok so... there is a point where I have to ask (especially with more complex scenarios): should you be using DataFlow instead?
As much as I love Channels, I have no qualms about using DataFlow. Love it. Way more versatile, and has some significant advantages.
That said, if we can get it right, I'm open to adding these extensions.
I had looked at DataFlow, but channels seemed easier to use and offered everything we needed. Then we discovered this library and things got even easier. I don't think our use cases are very complex. This library provides everything we need except this one use case. It's really just Filter, but with a way to access the unmatched items.
Sorry if I disappeared for the weekend. ;)
Working with Channels can have some quirks if you're not careful. There's a concept I call "reverse pipelining", where you need to prioritize the end (tail) of the pipe instead of the beginning (head). Open.ChannelExtensions covers a lot of different scenarios, but it really doesn't allow for easily building tail-priority pipelines. You are more likely to see "surges" in the channels than "flow": a channel can easily fill before being drained. Whereas DataFlow can manage priorities a bit better and can act as a more "fluid" system.
Parallelism is baked into DataFlow, whereas it's kind of an add-on with Channels.
Next up, let's review these scenarios:
In simple terms, you have two (or more?) channels that you want to dump into a single output channel.
Questions:
I would strongly suggest No. 2, as then you won't need to worry about creating a new buffer.
I imagine a `LinkedList<ChannelReader<T>>` as the underlying source, and as each channel closes, you would remove it.
The priority problem will be interesting. Maybe it's just a matter of `Task.WhenAll( ... WaitToReadAsync )` for all available source channels. Or you simply loop through with `TryRead` to get the next item.
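A rough sketch of that idea, assuming nothing beyond the BCL: hold the sources in a `LinkedList<ChannelReader<T>>`, loop through them with `TryRead`, remove each reader as its channel closes, and only await when every source came up empty (I've used `Task.WhenAny` rather than `WhenAll`, since we only need one source to become ready):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

static async IAsyncEnumerable<T> MergeAsync<T>(IEnumerable<ChannelReader<T>> sources)
{
    var readers = new LinkedList<ChannelReader<T>>(sources);
    while (readers.Count > 0)
    {
        bool got = false;
        for (var node = readers.First; node is not null;)
        {
            var next = node.Next;
            if (node.Value.TryRead(out var item)) { got = true; yield return item; }
            else if (node.Value.Completion.IsCompleted) readers.Remove(node); // channel closed
            node = next;
        }
        // Nothing available anywhere: wait until at least one source has data (or closes).
        if (!got && readers.Count > 0)
            await Task.WhenAny(readers.Select(r => r.WaitToReadAsync().AsTask()));
    }
}

// Usage: merge two channels.
var a = Channel.CreateUnbounded<int>();
var b = Channel.CreateUnbounded<int>();
foreach (var x in new[] { 1, 3, 5 }) a.Writer.TryWrite(x);
foreach (var x in new[] { 2, 4, 6 }) b.Writer.TryWrite(x);
a.Writer.Complete();
b.Writer.Complete();

var merged = new List<int>();
await foreach (var x in MergeAsync(new[] { a.Reader, b.Reader })) merged.Add(x);
Console.WriteLine(merged.Count); // 6
```

Note that `ChannelReader<T>.Completion` only completes once the writer has completed *and* the channel has drained, so removing a completed reader never loses buffered items.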
Thoughts?
To keep it simple, we limited it to two channels. We were thinking of it as the complement to `Split`, but even in a general sense, you could always chain calls: `c1.Merge(c2).Merge(c3)`.
- How do you prioritize that? Does one channel take precedence? Do you continuously alternate between them?
We (naively) gave precedence to the source channel, calling `ReadAllAsync` on it followed by `ReadAllAsync` on the channel passed as a parameter to `Merge`. I think this will be an issue. In `Split` we created an unbounded channel, which could lead to an OOM issue if we see a large number of "unmatched" items from the `Split` predicate. If we create that channel bounded, we could deadlock.
- Does it need to be a buffer? Or can you simply use a "reader" to "pull" from the merged channels? Depending on a free thread to do the underlying work has its own issues.
We created a new merged channel (returning its reader) and wrote each source channel to it as described above. I'm thinking it would be better to avoid creating the new channel, and instead return a new `MergedChannelReader` that holds the source channels and overrides `TryRead`, calling `TryRead` on each of the underlying channels. That should prevent the source channels from backing up or deadlocking.
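Something like this sketch, perhaps (the `MergedChannelReader` name is my proposal from above, not a type in the library; the round-robin preference flag is one possible answer to the priority question):

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Usage: wrap two readers without allocating a buffer channel.
var a = Channel.CreateUnbounded<int>();
var b = Channel.CreateUnbounded<int>();
a.Writer.TryWrite(1); a.Writer.Complete();
b.Writer.TryWrite(2); b.Writer.Complete();

var merged = new MergedChannelReader<int>(a.Reader, b.Reader);
int count = 0;
while (merged.TryRead(out _)) count++;
Console.WriteLine(count); // 2

sealed class MergedChannelReader<T> : ChannelReader<T>
{
    private readonly ChannelReader<T> _a, _b;
    private bool _preferB; // alternate so neither source starves the other

    public MergedChannelReader(ChannelReader<T> a, ChannelReader<T> b) => (_a, _b) = (a, b);

    public override Task Completion => Task.WhenAll(_a.Completion, _b.Completion);

    public override bool TryRead(out T item)
    {
        var (first, second) = _preferB ? (_b, _a) : (_a, _b);
        _preferB = !_preferB;
        return first.TryRead(out item) || second.TryRead(out item);
    }

    public override async ValueTask<bool> WaitToReadAsync(CancellationToken ct = default)
    {
        // True while either source may still produce an item.
        var wa = _a.WaitToReadAsync(ct).AsTask();
        var wb = _b.WaitToReadAsync(ct).AsTask();
        var first = await Task.WhenAny(wa, wb);
        return await first || await (first == wa ? wb : wa);
    }
}
```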
Does that seem like a better approach?
Thanks, -Ed
Yeah. One feature at a time:
Let's try a `.Merge` extension method that takes any `IEnumerable<ChannelReader<T>>` and returns a `ChannelReader<T>`.
I'll see what I can mock-up tonight.
Here's my .Merge proposal: https://github.com/Open-NET-Libraries/Open.ChannelExtensions/pull/36
Thoughts?
Important must-haves (included):
Just for thought, let's consider how .Split should be done. I definitely don't like the out parameter. I'm also not excited about configuring and creating another channel.
Note that there are already Read methods that provide reasonable means of doing this and include a `long` index to decide on how items should be split.
Now consider what you want... .Route? .Split? .Unzip?
An .Unzip method would simply "Route" based upon index.
The predicate-style `.Split` you are proposing can be done with the `.Filter` method: pass the item on to another channel and return false, so you would only need one more channel to "dump" to.
Brainstorming for now. I really like where .Merge may end up. I'm a little concerned about split because there are so many nuanced scenarios.
> Just for thought, let's consider how .Split should be done. I definitely don't like the out parameter. I'm also not excited about configuring and creating another channel.
Those were the two options we considered, and we chose the out param as it seemed easier for the caller. I realize there is a risk that the caller ignores it, which is why we chose a new `Split` method rather than overloading `Filter`. We considered returning multiple channels, but that would have broken the fluent API we love so much.
> Note that there are already Read methods that provide reasonable means of doing this and include a `long` index to decide on how they should be split.
Sorry, I'm not following this. Which Read methods?
> Now consider what you want... .Route? .Split? .Unzip? An .Unzip method would simply "Route" based upon index. The predicate-style .Split you are proposing can be done with the .Filter method and passed on to another channel while returning false, so that you would only have to have one more channel to "dump" to.
For our use cases we need to inspect the properties of the item in the pipeline to determine whether or not an action is taken on it in that stage, so I think that means the predicate style. But all items need to be available to later stages and since Filter discards unmatched items I can't see how to use it as is. I can't think of a scenario where unzip (route by index) would be useful to me. I'm imagining Route to be more complex and could be accomplished by a series of Splits.
> Brainstorming for now. I really like where .Merge may end up. I'm a little concerned about split because there are so many nuanced scenarios.
I realize there are many ways to use this library, but the pattern I have found very powerful and easy to reason about is defining a pipeline with a single fluent statement: starting with an `IAsyncEnumerable<T>.ToChannel()`, ending with a `ReadAll`, and with `Pipe` and `Batch` calls in the middle as required. Manually setting up additional channels would make the code more error-prone and difficult to read (we used to manually set up channels until we found this library).
Your merge PR looks cool - very brave of you to use goto ;) With the proposed `Merge` signature extending `IEnumerable<ChannelReader<T>>`, I'm having trouble imagining how we could split/route and then merge in a fluent way.
Ha! Brave using goto. Nah... Sometimes, while or do/while just aren't ideal.
Ok, so you're trying to stick with fluent API and want it to work that way.
IMO, if there's an `out` parameter, you just killed that.
If you look at how it's done with DataFlow, you have another "target" that you dump to when receiving an item. It gets complicated because it has to. :(
Experimenting with .Split/.Route... I'm finding it challenging, because what if one of the channels is closed? How does the predicate/route-function handle that?
I only want to add extensions to this lib that provide significant and reliable value to cover either the majority use case, or flexible enough to cover many use cases.
> Experimenting with .Split/.Route... what if one of the channels is closed?
If we just consider split (two routes), then I think `Filter` could be the model for half the solution. `Split` could return a `SplittingChannelReader` whose `TryRead` looks like the one from `Filter`, but when the predicate doesn't match, the item would be written to the "other channel". If that other channel was created and managed internally (like `ToChannel`), would you have the same concern about it being closed? That would allow the items matching the predicate to continue fluently and leave the problem of how to expose the other reader.
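To illustrate what I mean, a sketch in plain `System.Threading.Channels` (the `SplittingChannelReader` name is my proposal, not a library type; note the internal channel is unbounded here for simplicity, which is the very OOM concern we discussed earlier):

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Usage: matched items flow through the reader; unmatched ones land on .Other.
var source = Channel.CreateUnbounded<int>();
for (int i = 1; i <= 6; i++) source.Writer.TryWrite(i);
source.Writer.Complete();

var split = new SplittingChannelReader<int>(source.Reader, x => x % 2 == 0);
int matchedCount = 0, unmatchedCount = 0;
while (split.TryRead(out _)) matchedCount++;
while (split.Other.TryRead(out _)) unmatchedCount++;
Console.WriteLine($"{matchedCount} {unmatchedCount}"); // 3 3

sealed class SplittingChannelReader<T> : ChannelReader<T>
{
    private readonly ChannelReader<T> _source;
    private readonly Func<T, bool> _predicate;
    private readonly Channel<T> _other = Channel.CreateUnbounded<T>();

    public SplittingChannelReader(ChannelReader<T> source, Func<T, bool> predicate)
        => (_source, _predicate) = (source, predicate);

    // Reader for the items that did NOT match the predicate.
    public ChannelReader<T> Other => _other.Reader;

    public override Task Completion => _source.Completion;

    public override bool TryRead(out T item)
    {
        while (_source.TryRead(out item))
        {
            if (_predicate(item)) return true;
            _other.Writer.TryWrite(item); // divert unmatched to the internal channel
        }
        // Once the source will never produce again, close the "other" side too.
        if (_source.Completion.IsCompleted) _other.Writer.TryComplete();
        return false;
    }

    public override ValueTask<bool> WaitToReadAsync(CancellationToken ct = default)
        => _source.WaitToReadAsync(ct);
}
```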
What do you want the signature of .Split/.Route to look like? What is your biggest concern with the out parameter?
Ok. I did some more work on the .Merge method and I think you'll like it a lot. Here's the release: https://www.nuget.org/packages/Open.ChannelExtensions/8.1.0
2 options:
1. Any set of `ChannelReader<T>`s can be combined into one.
2. A `.Merge` extension for any `ChannelReader<T>` to allow for method chaining.

Calling `.Merge` on a `ChannelReader<T>` creates a `MergingChannelReader<T>` that can accept subsequent `.Merge` calls to build up a resultant single merged reader. It's not ideal, as it still hooks up and waits on Tasks for each `Merge` call, but it is still available as a convenience. Either way, channels are not "disposed" of and readers are only attached to their parents, so if you throw everything away, the GC should pick it up, no problem.
I'm still very resistant to implement a .Split method because it simply hasn't "clicked" for me. I'll give it some more thought.
Thanks for `Merge` and for thinking about split. If it helps, all we need is a way to get at the "unmatched" items from `Filter`.
Looking at your Split implementation...
1) You should almost never have an unbounded channel, regardless of upstream or downstream. Apply some limit, even if it seems high, but prefer an "estimate" of the theoretical maximum. Sometimes it can be advantageous to have smaller bounds in order to create "flow" in your system. Without it you can have "surges" or "dumps" into a channel before the downstream picks up the entries. If you keep the extension, I'd suggest naming it `.SplitUnbound` so it's more obvious what it does.
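For reference, this is what the bounded alternative looks like with the BCL; the capacity (1000) and options are illustrative, not prescriptive:

```csharp
using System.Threading.Channels;

var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1000)
{
    SingleReader = true,
    SingleWriter = false,
    // Wait makes producers back off when the channel is full, creating
    // the "flow" described above instead of unbounded "surges".
    FullMode = BoundedChannelFullMode.Wait
});
```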
2) There are simply too many configuration possibilities with this Split/Fork/Route/Sort/Filter (numerous synonymous names). It's also very opinionated as to which should be the unmatched vs. matched reader, coming from the return or from the out param. If it were me, I would have called it `.FilterOut`, and then it would make more sense that the unmatched reader is the one that has the rejected items. As you've written it, you're simply calling `.ReadAllConcurrentlyAsync` to distribute the items coming out of the source channel.
IMO, the best way to handle this is with the delegate from `.ReadAllConcurrentlyAsync`.
The question then becomes, why do you need two other channels to handle the split results?
I'm also still wrestling with the notion that if one of the channels is bounded, it can essentially stall both.
Another way to approach this would be to add an overload of `Filter` that accepts a `ChannelWriter` for unmatched items. The caller would then have to create the channel, but could do so fully aware of the use case. Later in the flow, the two could be brought back together with the new `Merge`.
Or, whatever the method is called, accept channel config params much like `ToChannel` does and out the reader.
> Another way to approach this would be to add an overload of `Filter` that accepts a ChannelWriter for unmatched items.
Yeah, I've been thinking about that, but again, I don't want to code anything that would potentially demonstrate unexpected behavior. The `.ToChannel` overloads do seem to fit the bill, but they only have to deal with one channel.
When you "split" you are dealing with two channels where one can back up the other and then you're wondering what happened.
You can't properly split using just a reader (like `.Filter` does now), as there can be a race condition: when the source channel completes, it may cause a channel-closed exception. So the best strategy is still to use `.ReadAll` or something like it with two new channels, and let the source channel drain completely before closing the downstream channels.
Let's consider this adaption:
Almost exactly as you have it now, but let's say the channels are configurable and they have bounds of 100. As long as they are not left alone and something is consuming them both, then if one gets backed up, the other can re-commence consumption. So I'm starting to think the bounds issue is not that big of a deal, as long as you don't neglect one of them.
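A sketch of that adaptation, in plain BCL channels: split into two bounded channels (capacity 100) and make sure BOTH are consumed. Neglect either consumer and the splitter stalls as soon as that channel fills to its bound:

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var matched = Channel.CreateBounded<int>(100);
var unmatched = Channel.CreateBounded<int>(100);
var source = Channel.CreateUnbounded<int>();

for (int i = 0; i < 500; i++) source.Writer.TryWrite(i);
source.Writer.Complete();

// Drain the source completely before closing the downstream channels.
var splitter = Task.Run(async () =>
{
    await foreach (var x in source.Reader.ReadAllAsync())
        await (x % 2 == 0 ? matched : unmatched).Writer.WriteAsync(x);
    matched.Writer.Complete();
    unmatched.Writer.Complete();
});

// Two concurrent consumers keep both sides flowing: if one side gets backed
// up, the other can still make progress, and the backed-up side resumes as
// soon as its consumer catches up.
var evens = Task.Run(async () => { long s = 0; await foreach (var x in matched.Reader.ReadAllAsync()) s += x; return s; });
var odds  = Task.Run(async () => { long s = 0; await foreach (var x in unmatched.Reader.ReadAllAsync()) s += x; return s; });

await splitter;
var total = await evens + await odds;
Console.WriteLine(total); // 124750
```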
Gonna tinker a bit with it now.
Here's what I landed on. Thank you for sticking with me and being patient. I like where this ended up, and I can definitely see the usefulness. I adjusted the method signatures to be more like the lib while attempting to be very careful to expose the `ChannelOptions`, as these scenarios can get a bit tricky.
`.PipeFilter` and `.PipeFilterAsync` are what I decided on, because the term "Pipe" indicates transferring items to another channel, quite often meaning creating a new channel to get the job done. In this case, we are creating two channels.
https://github.com/Open-NET-Libraries/Open.ChannelExtensions/pull/39/files
https://www.nuget.org/packages/Open.ChannelExtensions/8.3.0
Closing this PR, but feel free to continue to comment. :)
Thank you so much for this. I'm glad you were able to find a way to avoid creating another channel externally. Last night I was fiddling with this, trying to take inspiration from `ToChannel`, `Filter` and `PipeTo` (between them they seemed to have all the bits), but I wasn't able to get it to run to completion. You made my day.
Yeah, it can be tricky to make sure you don't create a deadlock. If you can share your code, I can take a look.
Thanks, I appreciate the offer, but I ditched it and switched gears to incorporate PipeFilter/Merge into our batch processes. Should have it in production tomorrow. Thanks again, this is an awesome library.
Love to hear that you're switching. :) I think the big improvement you'll see is the "merging" reader. You won't need more channels created. :) Just remember that you'll definitely want to migrate to channels that are "bound" but you'll also want to stress test where you might encounter deadlocks.
Hmmm... This is interesting. Could you provide a real-world example of use?