reaqtive / reaqtor

Reaqtor is a framework for reliable, stateful, distributed, and scalable event processing based on Rx.
https://reaqtive.net/
MIT License

[Proposal] Operator: Chunk #104

Status: Open. ronnygunawan opened this issue 3 years ago.

ronnygunawan commented 3 years ago

Link to the .NET 6 LINQ documentation (Enumerable.Chunk)

How it will work in Reaqtive:

Input    1    2        3    4        5    |
------------------------------------------------ Chunk(2)
Output        [1, 2]        [3, 4]        [5] |
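
For comparison, the pull-based operator this proposal mirrors is Enumerable.Chunk in LINQ to Objects (System.Linq, .NET 6 and later). A small sketch, assuming a .NET 6+ console project with top-level statements, that reproduces the diagram's output on an in-memory array rather than an observable:

```csharp
using System;
using System.Linq;

// Enumerable.Chunk (.NET 6+) splits a sequence into arrays of at most `size` elements;
// only the final chunk may be shorter.
int[][] chunks = new[] { 1, 2, 3, 4, 5 }.Chunk(2).ToArray();

foreach (int[] chunk in chunks)
{
    Console.WriteLine($"[{string.Join(", ", chunk)}]");
}
// Prints:
// [1, 2]
// [3, 4]
// [5]
```
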
bartdesmet commented 3 years ago

There is some functional overlap with Buffer, which returns an IList<T> for historical reasons (see below), but we should aim for parity with traditional LINQ to Objects, so I approve of the proposal in principle.

One open question is what to do with the other Buffer overloads, such as (int count, int skip) and the TimeSpan-based variants. For each such overload, do we provide a Chunk alternative that differs only in its result type? I.e. xs.Chunk(args) == xs.Buffer(args).Select(t => t.ToArray()), where Chunk differs only in its performance profile: it allocates arrays directly and potentially just resizes the last one.
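
To make that equivalence concrete, here is a naive, pull-based model of what Chunk(size, skip) would have to produce, evaluated over a finite in-memory sequence instead of an observable. It assumes Rx's Buffer(count, skip) semantics as I understand them: a new buffer starts every skip elements, holds up to count elements, and non-empty partial buffers are flushed at completion. ChunkReference is a hypothetical name used only for illustration; it deliberately builds a List<T> and then copies it to an array, which is the extra allocation Chunk would avoid.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical reference model of xs.Buffer(size, skip).Select(t => t.ToArray()),
// evaluated eagerly over a finite sequence for illustration only.
static List<T[]> ChunkReference<T>(IEnumerable<T> source, int size, int skip)
{
    if (size <= 0) throw new ArgumentOutOfRangeException(nameof(size));
    if (skip <= 0) throw new ArgumentOutOfRangeException(nameof(skip));

    List<T> items = source.ToList();
    var result = new List<T[]>();

    // A buffer starts at every multiple of skip that still has an element to hold.
    for (int start = 0; start < items.Count; start += skip)
    {
        // Each buffer holds up to `size` elements; trailing buffers may be shorter.
        List<T> buffer = items.Skip(start).Take(size).ToList();
        result.Add(buffer.ToArray()); // the extra copy that Chunk would avoid
    }

    return result;
}

foreach (int[] chunk in ChunkReference(new[] { 1, 2, 3, 4, 5 }, size: 3, skip: 1))
{
    Console.WriteLine($"[{string.Join(", ", chunk)}]");
}
// Prints:
// [1, 2, 3]
// [2, 3, 4]
// [3, 4, 5]
// [4, 5]
// [5]
```
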

I'm thinking we can add Chunk(int size) (matching the parameter name of the Enumerable operator in .NET 6) and Chunk(int size, int skip), as both can be implemented with new T[size] allocations and a single Array.Resize for the last emitted chunk.
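
A minimal push-based sketch of the array-based approach, assuming plain delegates in place of Reaqtor's ISubscribable<T> and observer types (CreateChunker and its OnNext/OnCompleted pair are hypothetical names, not an existing API). Each chunk is allocated as new T[size] and, once full, handed downstream without copying; on completion, any partially filled chunk is trimmed with Array.Resize. With skip >= size at most one chunk is open at a time, so that is a single resize of the last chunk; with overlapping chunks (skip < size), each trailing partial chunk gets its own resize.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical push-based chunker (not Reaqtor's operator): returns the callbacks an
// upstream source would invoke, forwarding finished chunks to `emit`.
static (Action<T> OnNext, Action OnCompleted) CreateChunker<T>(
    int size, int skip, Action<T[]> emit, Action complete)
{
    if (size <= 0) throw new ArgumentOutOfRangeException(nameof(size));
    if (skip <= 0) throw new ArgumentOutOfRangeException(nameof(skip));

    // Open chunks, oldest first, each tagged with the index of its first element.
    var open = new Queue<(T[] Items, long Start)>();
    long index = 0; // number of elements received so far

    void OnNext(T value)
    {
        // A new chunk starts at every multiple of skip, sized exactly upfront.
        if (index % skip == 0) open.Enqueue((new T[size], index));

        // Write the element into every open chunk (more than one when skip < size).
        foreach (var (items, start) in open) items[(int)(index - start)] = value;

        // Only the oldest chunk can become full here; emit its array without copying.
        if (open.Count > 0 && index - open.Peek().Start == size - 1)
            emit(open.Dequeue().Items);

        index++;
    }

    void OnCompleted()
    {
        // Flush partially filled chunks, trimming each with one Array.Resize.
        while (open.Count > 0)
        {
            var (items, start) = open.Dequeue();
            Array.Resize(ref items, (int)(index - start));
            emit(items);
        }
        complete();
    }

    return (new Action<T>(OnNext), new Action(OnCompleted));
}

// Usage: replay the input from the marble diagram; Chunk(2) behaves like Chunk(2, 2).
var received = new List<int[]>();
bool completed = false;
var (onNext, onCompleted) = CreateChunker<int>(2, 2, received.Add, () => completed = true);
foreach (int x in new[] { 1, 2, 3, 4, 5 }) onNext(x);
onCompleted();

foreach (int[] chunk in received) Console.WriteLine($"[{string.Join(", ", chunk)}]");
Console.WriteLine(completed ? "|" : "(not completed)");
// Prints:
// [1, 2]
// [3, 4]
// [5]
// |
```

Because full chunks are emitted as the very arrays that were filled, the consumer owns them outright, and only the trailing partial chunks pay for a copy inside Array.Resize.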

The time-based variants don't have that luxury because they can't predict the size of their chunks upfront, which explains why we ended up choosing IList<T> in the Rx days. We have to allocate a list anyway, because we don't know how many elements will end up in a time-based buffer, so we may as well return that list to the user and let them mutate it any way they see fit; it's theirs to party on (rather than us copying it over to a T[], or even just returning it as an IEnumerable<T>). To make all of the Buffer operators consistent in the Rx days, we picked IList<T> for both the count-based and the time-based operators. (We picked an interface to keep some flexibility in how we implemented it, rather than being tied to List<T>.)
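
The allocation argument for time-based buffers can be sketched the same way. Assuming a timer that periodically calls a flush callback (simulated here by calling it by hand; CreateTimeBuffer and its OnNext/OnTick pair are hypothetical names, and completion handling is omitted), the buffer cannot size an array upfront, so it grows a List<T> and hands that same list downstream as an IList<T> instead of copying it:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical time-based buffer: elements accumulate until the (simulated) timer ticks.
static (Action<T> OnNext, Action OnTick) CreateTimeBuffer<T>(Action<IList<T>> emit)
{
    // The number of elements per window is unknown, so a growable list is required.
    var current = new List<T>();

    void OnNext(T value) => current.Add(value);

    void OnTick()
    {
        // Hand the list itself to the consumer (no T[] copy), then start a fresh one.
        emit(current);
        current = new List<T>();
    }

    return (new Action<T>(OnNext), new Action(OnTick));
}

var windows = new List<IList<int>>();
var (onNext, onTick) = CreateTimeBuffer<int>(windows.Add);

onNext(1); onNext(2); onNext(3);
onTick(); // first window closes with three elements
onNext(4);
onTick(); // second window closes with one element

// The consumer owns each list and may mutate it freely.
windows[1].Add(5);
foreach (IList<int> window in windows) Console.WriteLine($"[{string.Join(", ", window)}]");
// Prints:
// [1, 2, 3]
// [4, 5]
```
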

In conclusion, my proposal is to have:

IO<T[]> Chunk<T>(IO<T> source, int size);
IO<T[]> Chunk<T>(IO<T> source, int size, int skip);

where IO<T> is the concept of an observable in whatever shape or form it takes (ISubscribable<T> at the operator level, quoted variants higher up). The operator implementation is based on new T[size] allocations with an Array.Resize for the last chunk; as such, it isn't a composition of Buffer with Select and ToArray.

We should also do this work in http://github.com/dotnet/reactive for IObservable<T>, IAsyncEnumerable<T>, and IAsyncObservable<T>.