morelinq / MoreLINQ

Extensions to LINQ to Objects
https://morelinq.github.io/
Apache License 2.0
3.69k stars 413 forks source link

Multiple aggregations in a single iteration #326

Closed atifaziz closed 5 years ago

atifaziz commented 7 years ago

The idea I wish to propose/discuss is best illustrated through examples. Suppose you want to count and sum up the numbers in a sequence. With LINQ's Count and Sum, you'd do it like this:

var xs = Enumerable.Range(10, 10);
Console.WriteLine((Count: xs.Count(), Sum: xs.Sum()));

Suppose now you want to also get the average:

Console.WriteLine((Count  : xs.Count(),
                   Sum    : xs.Sum(),
                   Average: xs.Average()));

Unfortunately, using these methods individually means the source ends up being iterated multiple times, once per aggregation. You could use LINQ's Aggregate to do multiple aggregations while iterating through the source once:

Console.WriteLine(
    xs.Aggregate(
        (Count: 0, Sum: 0),
        (a, x) => (a.Count + 1, a.Sum + x)));

Console.WriteLine(
    xs.Aggregate(
        (Count: 0, Sum: 0),
        (a, x) => (a.Count + 1, a.Sum + x),
        a => (Count: a.Count,
              Sum  : a.Sum,
              Avg  : (double) a.Sum / a.Count))); // average for free

However, as you increase the number of aggregations, some of which may not be trivial, the code becomes less readable and the seeds & accumulators get pulled further apart. By adding Aggregate overloads that allow multiple seeds, accumulators and a projector of all results like below:

public static TResult Aggregate<T, TState1, TState2, TResult>(
    this IEnumerable<T> source,
    TState1 seed1, Func<TState1, T, TState1> accumulator1,
    TState2 seed2, Func<TState2, T, TState2> accumulator2,
    Func<TState1, TState2, TResult> resultSelector) =>
    source.Aggregate(
        (seed1, seed2),
        (s, e) => (accumulator1(s.Item1, e), accumulator2(s.Item2, e)),
        s => resultSelector(s.Item1, s.Item2));

one can write something more natural:

Console.WriteLine(
    xs.Aggregate(
        0, (count, _) => count + 1,
        0, (sum, x) => sum + x,
        (count, sum) =>
            (Count: count,
             Sum  : sum,
             Avg  : (double) sum / count)));

Let's add another two aggregations, like getting the minimum and maximum value in addition to the count, sum and average. With an overload like this:

public static TResult Aggregate<T, TState1, TState2, TState3, TState4, TResult>(
    this IEnumerable<T> source,
    TState1 seed1, Func<TState1, T, TState1> accumulator1,
    TState2 seed2, Func<TState2, T, TState2> accumulator2,
    TState3 seed3, Func<TState3, T, TState3> accumulator3,
    TState4 seed4, Func<TState4, T, TState4> accumulator4,
    Func<TState1, TState2, TState3, TState4, TResult> resultSelector) =>
    source.Aggregate(
        (seed1, seed2, seed3, seed4),
        (s, e) => (accumulator1(s.Item1, e),
                   accumulator2(s.Item2, e),
                   accumulator3(s.Item3, e),
                   accumulator4(s.Item4, e)),
        s => resultSelector(s.Item1, s.Item2, s.Item3, s.Item4));

one can write:

Console.WriteLine(
    xs.Aggregate(
        0, (count, _) => count + 1,
        0, (sum, x) => sum + x,
        int.MaxValue, Math.Min,
        int.MinValue, Math.Max,
        (count, sum, min, max) =>
            (Count: count,
             Min  : count > 0 ? min : (int?) null,
             Max  : count > 0 ? max : (int?) null,
             Sum  : sum,
             Avg  : (double) sum / count)));

Note how using this approach, the existing Math.Min and Math.Max can be used directly as accumulators. With C# 7's local functions, complex accumulators can be expressed simply, even generically, and reused locally:

int Add(int x, int y) => x + y;
Func<int, int, int> AddRhsAs(int y) => (x, _) => x + y;
T Collect<T, U>(T collection, U item) where T : ICollection<U>
{
    collection.Add(item);
    return collection;
}

Console.WriteLine(
    xs.Aggregate(
        0, AddRhsAs(1),
        0, Add,
        int.MaxValue, Math.Min,
        int.MinValue, Math.Max,
        new List<int>(), Collect,
        (count, sum, min, max, list) =>
            (Count: count,
             Min  : count > 0 ? min : (int?) null,
             Max  : count > 0 ? max : (int?) null,
             Sum  : sum,
             Avg  : (double) sum / count,
             List : "[" + string.Join(", ", list) + "]")));
fsateler commented 7 years ago

This sounds like a good idea!

atifaziz commented 7 years ago

@fsateler Glad to hear you like the idea. The API still needs a bit of work I feel because writing out an aggregation in-place, even something as simple as counting, means re-writing what's already there and could be tedious and error-prone. What's more, there's no opportunity for writing reusable accumulators. Here's what I am aiming for the next (possibly final?) shape of the API:

Console.WriteLine(
    xs.Aggregate(
        a => a.Sum(),
        a => a.Count(),
        a => a.Min(),
        a => a.Max(),
        a => a.Collect(new List<int>()),
        (sum, count, min, max, list) => new
        {
            Sum     = sum,
            Count   = count,
            Average = (double) sum / count,
            Min     = min,
            Max     = max,
            Content = "[" + string.Join(", ", list) + "]",
        }));

To achieve this, we need a type to represent an aggregation algorithm:

interface IAggregator<T, TResult>
{
    void Next(T item);
    TResult GetResult();
}

Notice that the seed is gone as that is now the responsibility of the IAggregator<,> initialization. Next come two seemingly bizarre definitions but bear with me on those for now; take them as a mere hack for type inference and more:

struct AggregatorSource<T> { }

partial class Aggregator
{
    public static AggregatorSource<T> AsAggregatorSource<T>(this IEnumerable<T> source) =>
        default(AggregatorSource<T>);
}

Now an Aggregate overload for five aggregations looks like this:

public static TResult Aggregate<T, TResult1, TResult2, TResult3, TResult4, TResult5, TResult>(
    this IEnumerable<T> source,
    Func<AggregatorSource<T>, IAggregator<T, TResult1>> aggregatorSelector1,
    Func<AggregatorSource<T>, IAggregator<T, TResult2>> aggregatorSelector2,
    Func<AggregatorSource<T>, IAggregator<T, TResult3>> aggregatorSelector3,
    Func<AggregatorSource<T>, IAggregator<T, TResult4>> aggregatorSelector4,
    Func<AggregatorSource<T>, IAggregator<T, TResult5>> aggregatorSelector5,
    Func<TResult1, TResult2, TResult3, TResult4, TResult5, TResult> resultSelector) =>
    source.Aggregate(
        (Agg1: aggregatorSelector1(source.AsAggregatorSource()),
         Agg2: aggregatorSelector2(source.AsAggregatorSource()),
         Agg3: aggregatorSelector3(source.AsAggregatorSource()),
         Agg4: aggregatorSelector4(source.AsAggregatorSource()),
         Agg5: aggregatorSelector5(source.AsAggregatorSource())),
        (s, e) =>
        {
            s.Agg1.Next(e);
            s.Agg2.Next(e);
            s.Agg3.Next(e);
            s.Agg4.Next(e);
            s.Agg5.Next(e);
            return s;
        },
        s => resultSelector(s.Agg1.GetResult(),
                            s.Agg2.GetResult(),
                            s.Agg3.GetResult(),
                            s.Agg4.GetResult(),
                            s.Agg5.GetResult()));

The implementation is quite straightforward. The aggregators become the seeds themselves. Each aggregator maintains its own state and folds each value of the sequence into it. At the end, GetResult calls are used to string together the final result.

Next comes infrastructure to help create an IAggregator<,> implementation easily given a seed and functions for the Next and GetResult members of the interface:

static partial class Aggregator
{
    public static IAggregator<T, TResult> Create<T, TState, TResult>(
        TState seed,
        Func<TState, T, TState> accumulator,
        Func<TState, TResult> resultSelector) =>
        new DelegatingAggregator<T, TState, TResult>(seed, accumulator, resultSelector);

    sealed class DelegatingAggregator<T, TState, TResult> : IAggregator<T, TResult>
    {
        TState _state;
        readonly Func<TState, T, TState> _accumulator;
        readonly Func<TState, TResult> _resultSelector;

        public DelegatingAggregator(TState seed, Func<TState, T, TState> accumulator, Func<TState, TResult> resultSelector)
        {
            _state = seed;
            _accumulator = accumulator;
            _resultSelector = resultSelector;
        }

        public void Next(T item) => _state = _accumulator(_state, item);
        public TResult GetResult() => _resultSelector(_state);
    }
}

With that, here's how the aggregators can be implemented:

partial class Aggregator
{
    public static IAggregator<T, int> Count<T>(this AggregatorSource<T> _) =>
        Aggregator.Create(0, (int s, T t) => s + 1, s => s);

    public static IAggregator<int, int> Sum(this AggregatorSource<int> _) =>
        Aggregator.Create(0, (int s, int x) => s + x, s => s);

    public static IAggregator<T, T> Some<T>(Func<T, T, T> acc) =>
        Aggregator.Create((Count: 0, Result: default(T)),
                          ((int Count, T Result) s, T e) => (s.Count + 1, s.Count > 0 ? acc(s.Result, e) : e),
                          s => s.Count > 0 ? s.Result : throw new InvalidOperationException());

    public static IAggregator<int, int> Min(this AggregatorSource<int> _) => Some<int>(Math.Min);
    public static IAggregator<int, int> Max(this AggregatorSource<int> _) => Some<int>(Math.Max);
    public static IAggregator<double, double> Min(this AggregatorSource<double> _) => Some<double>(Math.Min);
    public static IAggregator<double, double> Max(this AggregatorSource<double> _) => Some<double>(Math.Max);

    public static IAggregator<T, TCollection> Collect<T, TCollection>(this AggregatorSource<T> _, TCollection collection)
        where TCollection : ICollection<T> =>
        Aggregator.Create(collection, (TCollection s, T t) => { s.Add(t); return s; }, s => s);
}

Notice how Min and Max reuse Some. Notice further how the right Min and Max overloads will be picked depending on T, thanks to AggregatorSource<>. Count is also agnostic to T.

We can also now introduce a Count overload that counts elements satisfying a predicate:

partial class Aggregator
{
    public static IAggregator<T, int> Count<T>(this AggregatorSource<T> _, Func<T, bool> predicate) =>
        Aggregator.Create(0, (int s, T t) => s + (predicate(t) ? 1 : 0), s => s);
}

and use it like this:

Console.WriteLine(
    xs.Aggregate(
        a => a.Sum(),
        a => a.Count(),
        a => a.Min(),
        a => a.Max(),
        a => a.Count(x => x % 2 == 0),
        (sum, count, min, max, evens) => new
        {
            Sum       = sum,
            Count     = count,
            Average   = (double) sum / count,
            Min       = min,
            Max       = max,
            EvenCount = evens
        }));

Earlier, we saw Collect, which can collect elements into any collection. If collecting into a list is a common scenario then one can also introduce a simple helper for that now:

partial class Aggregator
{
    public static IAggregator<T, List<T>> List<T>(this AggregatorSource<T> _) =>
        _.Collect(new List<T>());
}

And naturally it builds on top of Collect. This makes the usage really readable and clean:

Console.WriteLine(
    xs.Aggregate(
        a => a.Sum(),
        a => a.Count(),
        a => a.Min(),
        a => a.Max(),
        a => a.List(), // instead of // a => a.Collect(new List<int>()),
        (sum, count, min, max, list) => new
        {
            Sum     = sum,
            Count   = count,
            Average = (double) sum / count,
            Min     = min,
            Max     = max,
            Content = "[" + string.Join(", ", list) + "]",
        }));

What's nice is that the calls to Sum, Count, Min and Max look just like how you would use them in normal LINQ, but here they do their work in conjunction with others and doing so in a single iteration.

Thoughts & feedback on this new design?

fsateler commented 7 years ago

This is an interesting design. I like the IAggregator<> interface as that can be used to implement optimized operators that do not allocate new memory on each iteration (and it would be nice if we did that for the standard operators).

I'm still thinking about your proposed design. I've so far failed to come up with something that dispenses with the AggregatorSource but preserves an easy way to add aggregators with selectors (I mean .Sum(obj => obj.Property)). All alternatives I think about are quite ugly. With the AggregatorSource things become fairly easy:

    public static IAggregator<T, int> Sum<T>(this AggregatorSource<T> _, Func<T, int> selector) =>
        Aggregator.Create(0, (int s, T x) => s + selector(x), s => s);

It's ugly but it makes things a lot easier.

atifaziz commented 7 years ago

I had a fairly ugly start as well but after coming up with AggregatorSource<>, everything just fell nicely into place. Perhaps the name does throw one off but its purpose is to communicate the source type, not the source sequence. This is why it's just a unit/singleton for every T. Does it still feel ugly then?

Your projected Sum example is interesting. I think we can even generalize that via a Select:

public static IAggregator<T1, TResult> Select<T1, T2, TResult>(
    this AggregatorSource<T1> source, Func<T1, T2> selector,
    Func<AggregatorSource<T2>, IAggregator<T2, TResult>> aggregatorSelector)
{
    var aggregator = aggregatorSelector(default(AggregatorSource<T2>));
    return Aggregator.Create(default(object),
                             (object _, T1 t) => { aggregator.Next(selector(t)); return _; },
                             _ => aggregator.GetResult());
}

So given another aggregator, it injects a projection. You can then even reuse it to implement your Sum overload:

public static IAggregator<T, int> Sum<T>(this AggregatorSource<T> _, Func<T, int> selector) =>
    _.Select(selector, a => a.Sum());

You could even use Select inline:

Console.WriteLine(
    "1,2,3,4,5,6,7,8,9,10"
        .Split(',')
        .Aggregate(
            a => a.Select(int.Parse, aa => aa.Sum()),
            a => a.Count(),
            (sum, count) => new { Sum = sum, Count = count }));

Since we are composing aggregators now, I wonder if there's another trick left that could make the composition read more like a.Select(int.Parse).Sum(), as in:

Console.WriteLine(
    "1,2,3,4,5,6,7,8,9,10"
        .Split(',')
        .Aggregate(
            a => a.Select(int.Parse).Sum(),
            a => a.Count(),
            (sum, count) => new { Sum = sum, Count = count }));

:thinking:

fsateler commented 7 years ago

Since we are composing aggregators now, I wonder if there's another trick left that could make the composition read more like a.Select(int.Parse).Sum()

This could be solved by making the AggregatorSource be a useful object:


public class AggregatorSource<T, TProp> {
    public AggregatorSource(Func<T, TProp> getValue) {
        GetValue = getValue;
    }
    public Func<T, TProp> GetValue { get; }

    public AggregatorSource<T, TNestedProp> Select<TNestedProp>(Func<TProp, TNestedProp> projection) {
        var gv = GetValue;
        return new AggregatorSource<T, TNestedProp>(v => projection(gv(v)));
    }
}

public static class Aggregator {
    public static AggregatorSource<T, T> AsAggregatorSource<T>(this IEnumerable<T> source) =>
        new AggregatorSource<T, T>(i => i);

    public static IAggregator<T, int> Sum<T>(this AggregatorSource<T, int> source) =>
        Aggregator.Create(0, (int s, T x) => s + source.GetValue(x), s => s);

    public static IAggregator<T, int> Count<T>(this AggregatorSource<T, T> _) =>
        Aggregator.Create(0, (int s, T t) => s + 1, s => s);
}

partial class MoreEnumerable {

public static TResult Aggregate<T, TResult1, TResult2, TResult>(
    this IEnumerable<T> source,
    Func<AggregatorSource<T, T>, IAggregator<T, TResult1>> aggregatorSelector1,
    Func<AggregatorSource<T, T>, IAggregator<T, TResult2>> aggregatorSelector2,
    Func<TResult1, TResult2, TResult> resultSelector) =>
    source.Aggregate(
        (Agg1: aggregatorSelector1(source.AsAggregatorSource()),
         Agg2: aggregatorSelector2(source.AsAggregatorSource())
         ),
        (s, e) =>
        {
            s.Agg1.Next(e);
            s.Agg2.Next(e);
            return s;
        },
        s => resultSelector(s.Agg1.GetResult(),
                            s.Agg2.GetResult())
                            );
}

And now your last example compiles.

leandromoh commented 7 years ago

2 questions:

  1. why simple dont apply ToList() to sorce before do the aggregations? So you only iterate source once.
  2. how common is it ?
atifaziz commented 7 years ago

@fsateler Thanks for your feedback, ideas and participating in the design.

making the AggregatorSource be a useful object

I agree and I think we're getting closer. More than just Select, I wanted to find a general way to combine operations for each item of the source, like a Where for filtering (as LINQ's Count supports). What follows is my next take on the design of the API.

I've dropped IAggregator<> for now and redefined the aggregator source as:

interface IAggregatorSource<T>
{
    void Connect(Action<T> sink);
}

Each aggregator will be responsible for connecting to the source. The sink is where items will be pushed. Here's some helper code for implementing the interface given a connection handler:

static partial class AggregatorSource
{
    public static IAggregatorSource<T> Create<T>(Action<Action<T>> connectionHandler) =>
        new DelegatingAggregatorSource<T>(connectionHandler);

    sealed class DelegatingAggregatorSource<T> : IAggregatorSource<T>
    {
        readonly Action<Action<T>> _connectionHandler;

        public DelegatingAggregatorSource(Action<Action<T>> connectionHandler) =>
            _connectionHandler = connectionHandler;

        public void Connect(Action<T> sink) => _connectionHandler(sink);
    }
}

Now the interesting part is that we can define Select and Where in terms of IAggregatorSource<>:

partial class AggregatorSource
{
    public static IAggregatorSource<U> Select<T, U>(this IAggregatorSource<T> source, Func<T, U> selector) =>
        Create<U>(sink => source.Connect(item => sink(selector(item))));

    public static IAggregatorSource<T> Where<T>(this IAggregatorSource<T> source, Func<T, bool> predicate) =>
        Create<T>(sink => source.Connect(item => { if (predicate(item)) sink(item); }));
}

Aggregators now connect to a source, supplying a handler for each item (what used to be Next on IAggregator<>), and return a function for obtaining the final result (what used to be GetResult on IAggregator<>):

static partial class Aggregator
{
    public static Func<TState> Connect<TState, T>(
        this IAggregatorSource<T> source,
        TState seed, Func<TState, T, TState> accumulator) =>
        source.Connect(seed, accumulator, s => s);

    public static Func<TResult> Connect<TState, T, TResult>(
        this IAggregatorSource<T> source,
        TState seed, Func<TState, T, TState> accumulator,
        Func<TState, TResult> resultSelector)
    {
        var state = seed;
        source.Connect(item => state = accumulator(state, item));
        return () => resultSelector(state);
    }

    public static Func<int> Sum(this IAggregatorSource<int> source) =>
        source.Connect(0, (s, x) => s + x);

    public static Func<int> Count<T>(this IAggregatorSource<T> source) =>
        source.Connect(0, (s, _) => s + 1);

    public static Func<T> Some<T>(this IAggregatorSource<T> source, Func<T, T, T> accumulator) =>
        source.Connect(
            (Count: 0, State: default(T)),
            (s, e) => (s.Count + 1, s.Count > 0 ? accumulator(s.State, e) : e),
            s => s.Count > 0 ? s.State : throw new InvalidOperationException());

    public static Func<T> Min<T>(this IAggregatorSource<T> source)
        where T : IComparable<T> =>
        source.Some((x, y) => x.CompareTo(y) < 0 ? x : y);

    public static Func<T> Max<T>(this IAggregatorSource<T> source)
        where T : IComparable<T> =>
        source.Some((x, y) => x.CompareTo(y) > 0 ? x : y);

    public static Func<List<T>> List<T>(this IAggregatorSource<T> source) =>
        source.Collect(new List<T>());

    public static Func<ISet<T>> Distinct<T>(this IAggregatorSource<T> source, IEqualityComparer<T> comparer = null) =>
        source.Collect(new HashSet<T>(comparer));

    public static Func<TCollection> Collect<T, TCollection>(this IAggregatorSource<T> source, TCollection collection)
        where TCollection : ICollection<T> =>
        source.Connect(collection, (TCollection s, T t) => { s.Add(t); return s; }, s => s);
}

Finally, here's the Aggregate method that puts it together:

static class MoreEnumerable
{
    public static TResult Aggregate<T, TResult1, TResult2, TResult3, TResult4, TResult5, TResult>(
        this IEnumerable<T> source,
        Func<IAggregatorSource<T>, Func<TResult1>> aggregatorConnector1,
        Func<IAggregatorSource<T>, Func<TResult2>> aggregatorConnector2,
        Func<IAggregatorSource<T>, Func<TResult3>> aggregatorConnector3,
        Func<IAggregatorSource<T>, Func<TResult4>> aggregatorConnector4,
        Func<IAggregatorSource<T>, Func<TResult5>> aggregatorConnector5,
        Func<TResult1, TResult2, TResult3, TResult4, TResult5, TResult> resultSelector)
    {
        var s1 = new AggregatorSource<T>(); var c1 = aggregatorConnector1(s1);
        var s2 = new AggregatorSource<T>(); var c2 = aggregatorConnector2(s2);
        var s3 = new AggregatorSource<T>(); var c3 = aggregatorConnector3(s3);
        var s4 = new AggregatorSource<T>(); var c4 = aggregatorConnector4(s4);
        var s5 = new AggregatorSource<T>(); var c5 = aggregatorConnector5(s5);

        foreach (var item in source)
        {
            s1.Next(item);
            s2.Next(item);
            s3.Next(item);
            s4.Next(item);
            s5.Next(item);
        }

        return resultSelector(c1(), c2(), c3(), c4(), c5());
    }

    sealed class AggregatorSource<T> : IAggregatorSource<T>
    {
        Action<T> _sink;
        public void Connect(Action<T> sink) => _sink = sink;
        public void Next(T item) => _sink(item);
    }
}

A push source is created to which every aggregator is connected. The items are then pulled from the source and pushed to each sink, each of which can do its projection, filtering and what not before being folded by the aggregator at the end of the chain.

Here's some usage showing all of the above in action:

var xs = "1,2,3,4,5,6,7,8,9,10".Split(',');
Console.WriteLine(
    xs.Select(s => new { Str = s, Num = int.Parse(s) })
        .Aggregate(
        s => s.Select(e => e.Num).Sum(),
        s => s.Count(),
        s => s.Select(e => e.Num).Min(),
        s => s.Select(e => e.Num).Max(),
        s => s.Where(e => e.Str != null) // can't happen but for illustration
              .Select(e => e.Str.Length)
              .Distinct(),
        (sum, count, min, max, lengths) => new
        {
            Sum      = sum,
            Count    = count,
            Avgerage = (double) sum / count,
            Min      = min,
            Max      = max,
            UniqueLengths
                     = "[" + string.Join(", ", lengths) + "]",
        }
    ));

An interesting thing to note is that IAggregatorSource<> with its connect and sink is there to model a push stream; put another way, aggregators are observers and subscribe to a shared iteration. In .NET, we already have IObservable<> (source) and IObserver<> (sink) to model that so we don't need any new types or representations. Also, Rx already has Select, Where and other operators implemented so we don't need to do anything! The two downsides are:

  1. Additional and possibly unneeded complexity of the IObservable<>-IObserver<> protocol. That is, most aggregators won't care about errors, completion notice and unsubscribing.
  2. Dependency on Rx

I think 1 is better than introducing new types that more or less mean the same thing. As for 2, you can mitigate some of that by building in the simple cases of projections and filtering directly into the aggregators as LINQ does today.

atifaziz commented 7 years ago
  1. why simple dont apply ToList() to sorce before do the aggregations? So you only iterate source once.

@leandromoh Suppose you have a directory of hundreds of CSV files, each with several thousands of lines of data. With ToList, you will not only waste time & space building a list (multiple re-allocations & copying as it grows dynamically) but you will also waste time traversing the list of hundreds of thousands of records several times (once per type of aggregation).

  1. how common is it ?

What may be common in my problem space may not be in yours so I don't see how I can answer this objectively. Everyone's mileage will vary.

fsateler commented 7 years ago

@atifaziz Interesting new proposal! It adds a bit of complexity but it is indeed much more powerful. I have some reservations on reusing the IObserver<> abstraction.

  1. OnCompleted is not used. In general I favor keeping interfaces as minimal as possible.
  2. For our purposes, OnError seems to be actively harmful. When IObservable<>-IObserver<> are used it makes sense to propagate errors down the sink chain because the error might be thrown in a separate thread, or where nobody is able to catch it. In our case, Aggregate is not only synchronous, but it is eagerly evaluated, and thus the point for catching errors is natural: just wrap the call with a try{} catch{} block. I'm not sure how exception handling would look like when OnError is (possibly) used. One would have to handle two possible error paths?

OTOH, I'm thinking this is already way too similar to Rx. It may well be that what I see as unneeded bloat is just domain complexity I don't see.

But, if reusing IObserver<> is the way forward, would it be best placed here or in Rx?

atifaziz commented 7 years ago

OnError isn't so much for catching an exception here as it is a way for Aggregate to notify the accumulators of an error, while either iterating the sequence (i.e. during a call to MoveNext) or when an adjacent accumulator throws. The only thing I can imagine an accumulator doing during OnError is clean-up and my guess is that that's almost never going to be needed. Same goes for OnComplete but it may be slightly more useful and I'll get into it another time.

just wrap the call with a try{} catch{} block

So yes, this would still be the case for the user of Aggregate but OnError is only for notification to accumulators during the main loop as the pull is converted to a push.

In general I favor keeping interfaces as minimal as possible.

Me too but I think having IAggregateSource<> with a Connect method, while simpler, is just confusing as it has nothing to do with aggregation per se. Initially, I had it there as a type inference hack and so struggled to give it a proper name and a real purpose. The refactorings that came with enabling a compositional pipeline over it lead to the observer pattern.

But, if reusing IObserver<> is the way forward, would it be best placed here or in Rx?

I'd say it still belongs here because fundamentally we're talking about Aggregate for sequences and any use of observable/observer is just to model how Aggregate interacts with the accumulators.

fsateler commented 7 years ago

@atifaziz OK, your explanation is convincing. I still have one thing I didn't quite understand correctly:

you can mitigate some of that by building in the simple cases of projections and filtering directly into the aggregators as LINQ does today.

Could you elaborate on this? I did not understand how would this problem would be mitigated.

atifaziz commented 7 years ago

Could you elaborate on this? I did not understand how would this problem would be mitigated.

@fsateler I wouldn't want to expose Select & Where as extensions of IObservable<> because we'd run the risk of creating conflicts for projects using Rx and LINQ. If someone wants to use those for an accumulation pipeline, as in a.Where(x => x % 2 == 0).Count() or a.Select(int.Parse).Sum(), they'll then have to bring in Rx. By offering projection and filtering on overloads of the accumulators, as in a.Count(x => x % 2 == 0) (what LINQ does too), we can help avoid anyone taking a dependency on Rx for the most common cases.

atifaziz commented 7 years ago

I wouldn't want to expose Select & Where as extensions of IObservable<> because we'd run the risk of creating conflicts for projects using Rx and LINQ.

Actually, come to think of it, there's still all the aggregation methods like Sum and Count that extend IObservable<> too so conflicts and confusion could still reign. Those aggregation methods actually terminate the composition chain by subscribing then aggregating and perhaps this needs to be reflected in the names? Like DoSum and DoCount? :thinking:

fsateler commented 7 years ago

Actually, come to think of it, there's still all the aggregation methods like Sum and Count that extend IObservable<> too so conflicts and confusion could still reign

Indeed. That's what I was thinking.

Those aggregation methods actually terminate the composition chain by subscribing then aggregating and perhaps this needs to be reflected in the names? Like DoSum and DoCount? :thinking:

I'm not really a fan of Do[Name], since it is very unclear on why would it differ from a non-Do method.

The aggregation methods provided by Rx are not useful for us, since they will block and try to aggregate instantly. I'm surprised there is no Async overload for all of those operators. Maybe we could provide Async operators in a separate namespace, and then when Rx adds them people would just stop importing the separate namespace.

The aggregation method would look something like this (untested since I'm not on my workstation right now):

static class MoreEnumerable
{
    public static TResult Aggregate<T, TResult1, TResult2>(
        this IEnumerable<T> source,
        Func<IObservable<T>, Task<TResult1>> aggregatorConnector1,
        Func<IObservable<T>, Task<TResult2>> aggregatorConnector2,
        Func<TResult1, TResult2, TResult> resultSelector)
    {
        var s1 = new AggregatorSource<T>(); var c1 = aggregatorConnector1(s1);
        var s2 = new AggregatorSource<T>(); var c2 = aggregatorConnector2(s2);

        foreach (var item in source)
        {
            s1.Next(item);
            s2.Next(item);
        }
        s1.Complete();
        s2.Complete();

        if (!c1.IsCompleted) {
               throw new InvalidOperationException("Should have completed!");
        }
        if (!c2.IsCompleted) {
               throw new InvalidOperationException("Should have completed!");
        }

        return resultSelector(c1.Value, c2.Value);
    }
    sealed class AggregatorSource<T> : IObservable<T>
    {
        readonly List<IObserver<T>> sinks = new List<IObserver<T>>();
        public IDisposable Subscribe(IObserver<T> observer) {
            sinks.Add(observer);
            return Delegating.Disposable(() => sinks.Remove(observer));
        }
        public void Next(T item) => sinks.ForEach(s => s.OnNext(item));
        public void Complete() => sinks.ForEach(s => s.OnCompleted());
    }
}

And the async aggregation methods:

class Aggregator<T, TResult> : IObserver<T> {
      private readonly TaskCompletionSource<T> CompletionSource = new TaskCompletionSource<T>();
      public Task<T> Task => CompletionSource.Task;

      private readonly Func<TResult> GetResult;
      private readonly Action<T> AggregatorFunc;

      public Aggregator(Action<T> aggregatorFunc, Func<TResult> getResult) { 
          GetResult = getResult;
          AggregatorFunc = aggregatorFunc;
      }

      public void OnNext(T item) {
           AggregatorFunc(item);
       }
      public void OnCompleted() {
           CompletionSource.SetResult(GetResult());
       }
      public void OnError(Exception e) {
           Source.SetException(e);
       }
}

public static Task<int> CountAsync(this IObservable<T> src) {
    int count = 0;
    var aggregator = new Aggregator<T, int>(_ => count++, () => count);
    src.Subscribe(aggregator);
    return aggregator.Task;
}
atifaziz commented 7 years ago

The aggregation methods provided by Rx are not useful for us, since they will block and try to aggregate instantly. I'm surprised there is no Async overload for all of those operators.

Are you sure? I can perfectly await on the Sum by Rx:

var sum = await Observable.Range(1, 10).Sum();

The async bits do muddy the waters conceptually. While one could argue that the aggregated results are future values, that won't be the case if we also decide to do a multi-Scan version.

Actually, you've just given me another idea! 💡 I'll be back 😆 but the basis of it is that Rx already has Publish that can be used to run concurrent aggregations:

var xs = Enumerable.Range(1, 10); // aggregate source
var source = xs.ToObservable().Publish(); // share
var sum = new int[1]; // by-ref
var count = new int[1]; // by-ref
source.Sum().Subscribe(a => sum[0] = a);
source.Count().Subscribe(a => count[0] = a);
source.Connect();
// pipeline above should have run in and entirely
// at time of connection so sum and count will
// have their final values
Console.WriteLine($"Sum = {sum[0]}, Count = {count[0]}");
atifaziz commented 7 years ago

@fsateler So with Aggregate completely based on and defined in terms of IObservable<>, we get this:

public static TResult Aggregate<TSource, TResult1, TResult2, TResult3, TResult4, TResult5, TResult>(
    this IEnumerable<TSource> source,
    Func<IObservable<TSource>, IObservable<TResult1>> aggregatorSelector1,
    Func<IObservable<TSource>, IObservable<TResult2>> aggregatorSelector2,
    Func<IObservable<TSource>, IObservable<TResult3>> aggregatorSelector3,
    Func<IObservable<TSource>, IObservable<TResult4>> aggregatorSelector4,
    Func<IObservable<TSource>, IObservable<TResult5>> aggregatorSelector5,
    Func<TResult1, TResult2, TResult3, TResult4, TResult5, TResult> resultSelector)
{
    var r1 = new TResult1[1]; var s1 = new Subject<TSource>();
    var r2 = new TResult2[1]; var s2 = new Subject<TSource>();
    var r3 = new TResult3[1]; var s3 = new Subject<TSource>();
    var r4 = new TResult4[1]; var s4 = new Subject<TSource>();
    var r5 = new TResult5[1]; var s5 = new Subject<TSource>();

    using (aggregatorSelector1(s1).Subscribe(Observer.Create((TResult1 r) => r1[0] = r)))
    using (aggregatorSelector2(s2).Subscribe(Observer.Create((TResult2 r) => r2[0] = r)))
    using (aggregatorSelector3(s3).Subscribe(Observer.Create((TResult3 r) => r3[0] = r)))
    using (aggregatorSelector4(s4).Subscribe(Observer.Create((TResult4 r) => r4[0] = r)))
    using (aggregatorSelector5(s5).Subscribe(Observer.Create((TResult5 r) => r5[0] = r)))
    {
        foreach (var item in source)
        {
            s1.OnNext(item);
            s2.OnNext(item);
            s3.OnNext(item);
            s4.OnNext(item);
            s5.OnNext(item);
        }

        s1.OnCompleted();
        s2.OnCompleted();
        s3.OnCompleted();
        s4.OnCompleted();
        s5.OnCompleted();
    }

    return resultSelector(r1[0], r2[0], r3[0], r4[0], r5[0]);
}

The big benefits are:

The usage is dead simple too and what we've been aiming for since early on:

Console.WriteLine(
    "1,2,3,4,5,6,7,8,9,10"
        .Split(',')
        .Select(s => new { Str = s, Num = int.Parse(s) })
        .Aggregate(
            s => s.Select(e => e.Num).Sum(),
            s => s.Count(),
            s => s.Select(e => e.Num).Min(),
            s => s.Select(e => e.Num).Max(),
            s => s.Select(e => e.Str.Length).Distinct().ToArray(),
            (sum, count, min, max, lengths) => new
            {
                Sum      = sum,
                Count    = count,
                Avgerage = (double) sum / count,
                Min      = min,
                Max      = max,
                UniqueLengths
                         = "[" + string.Join(", ", lengths) + "]",
            }
        ));

// outputs:
// { Sum = 55, Count = 10, Avgerage = 5.5, Min = 1, Max = 10, UniqueLengths = [1, 2] }

Note the special case of Distinct that must be combined with ToArray otherwise you only get back 2 (in fact with Distinct alone you see that lengths type is int instead of int[]), which is the last length among unique lengths. This may be a little non-obvious so Aggregate could be made to throw a run-time error if an aggregator publishes more than one result.

atifaziz commented 7 years ago

A downside to the latest design is that any aggregator not shipping in Rx must be implemented as an observable. While Rx does make this easy, it does mean taking a dependency on it. Perhaps a good compromise is to offer the first approach with accumulators as simple functions (little reuse, aggregators must be implemented locally but very simple) in addition to one based on observables (makes rich composition possible, allows use of projection, filtering and aggregate methods are already part of Rx but slightly more complex).

fsateler commented 7 years ago

The aggregation methods provided by Rx are not useful for us, since they will block and try to aggregate instantly. I'm surprised there is no Async overload for all of those operators.

Are you sure? I can perfectly await on the Sum by Rx

Oops, sorry. That's what I get from relying on memory and not testing. Indeed Sum and the other aggregation methods return an IObservable<>, and are thus awaitable (as long as Rx is installed, which you should otherwise you don't have Sum ;).

However, not all is well, because:

Note the special case of Distinct that must be combined with ToArray otherwise you only get back 2 (in fact with Distinct alone you see that lengths type is int instead of int[]), which is the last length among unique lengths. This may be a little non-obvious so Aggregate could be made to throw a run-time error if an aggregator publishes more than one result.

I see this as a major drawback. Aggregation methods returning the same type as selector methods is a serious flaw in the Rx API IMO. I wouldn't like to perpetuate that flaw in MoreLINQ (even if we catch it at runtime). Unfortunately, the only way out I see is to go back to Func<IObservable<TSource>, Task<>> as aggregator selectors, and require adding ToTask to all selectors:

Console.WriteLine(
    "1,2,3,4,5,6,7,8,9,10"
        .Split(',')
        .Select(s => new { Str = s, Num = int.Parse(s) })
        .Aggregate(
            s => s.Select(e => e.Num).Sum().ToTask(),
            s => s.Count().ToTask(),
            s => s.Select(e => e.Num).Min().ToTask(),
            s => s.Select(e => e.Num).Max().ToTask(),
            s => s.Select(e => e.Str.Length).Distinct().ToList().ToTask(),
            (sum, count, min, max, lengths) => new
            {
                Sum      = sum,
                Count    = count,
                Avgerage = (double) sum / count,
                Min      = min,
                Max      = max,
                UniqueLengths
                         = "[" + string.Join(", ", lengths) + "]",
            }
        ));

This makes invocations a bit cumbersome.

BTW, there is no need to have one subject per selector:

public static TResult Aggregate<TSource, TResult1, TResult2, TResult3, TResult4, TResult5, TResult>(
    this IEnumerable<TSource> source,
    Func<IObservable<TSource>, Task<TResult1>> aggregatorSelector1,
    Func<IObservable<TSource>, Task<TResult2>> aggregatorSelector2,
    Func<IObservable<TSource>, Task<TResult3>> aggregatorSelector3,
    Func<IObservable<TSource>, Task<TResult4>> aggregatorSelector4,
    Func<IObservable<TSource>, Task<TResult5>> aggregatorSelector5,
    Func<TResult1, TResult2, TResult3, TResult4, TResult5, TResult> resultSelector)
{
    var s = new Subject<TSource>();

    var c1 = aggregatorSelector1(s);
    var c2 = aggregatorSelector2(s);
    var c3 = aggregatorSelector3(s);
    var c4 = aggregatorSelector4(s);
    var c5 = aggregatorSelector5(s);

    foreach (var item in source)
    {
        s.OnNext(item);
    }

    s.OnCompleted();

    return resultSelector(c1.Result, c2.Result, c3.Result, c4.Result, c5.Result);
}
atifaziz commented 7 years ago

as long as Rx is installed, which you should otherwise you don't have Sum

True although it isn't very hard to implement at all if you really don't wish to take the dependency.

Aggregation methods returning the same type as selector methods is a serious flaw in the Rx API IMO. I wouldn't like to perpetuate that flaw in MoreLINQ (even if we catch it at runtime).

I agree it would be best if types could capture the expectation that a single result should come out at the other end of the chain and while Task<> does fit the bill (though somewhat feeling like a promise/future API of sorts), there's still one annoying issue. It requires that the aggregator must be combined with ToTask to satisfy the signature and by doing so, the subscription becomes the responsibility of the observable-to-task mapping function. We can then no longer make run-time behaviour checks for violation of the protocol, that for example the task never observes more than one result. The Rx implementation returns the last observation. If we take the case of Distinct, it means that you can get the wrong results if you forget to call ToList or ToArray. On the other hand, if Aggregate becomes the observer then it can make run-time checks, like the one you also do to make sure that the task was indeed completed:

        if (!c1.IsCompleted) {
               throw new InvalidOperationException("Should have completed!");
        }

A run-time check is better than a false result. A compile-time constrained API is even better but it requires introducing new types and providing our own implementation of all aggregators (plus filtering and projection).

Just for reference & discussion, here is what those run-time checks would look like:

public static TResult Aggregate<TSource, TResult1, TResult2, TResult3, TResult4, TResult5, TResult>(
    this IEnumerable<TSource> source,
    Func<IObservable<TSource>, IObservable<TResult1>> aggregatorSelector1,
    Func<IObservable<TSource>, IObservable<TResult2>> aggregatorSelector2,
    Func<IObservable<TSource>, IObservable<TResult3>> aggregatorSelector3,
    Func<IObservable<TSource>, IObservable<TResult4>> aggregatorSelector4,
    Func<IObservable<TSource>, IObservable<TResult5>> aggregatorSelector5,
    Func<TResult1, TResult2, TResult3, TResult4, TResult5, TResult> resultSelector)
{
    var r1 = new (bool, TResult1)[1];
    var r2 = new (bool, TResult2)[1];
    var r3 = new (bool, TResult3)[1];
    var r4 = new (bool, TResult4)[1];
    var r5 = new (bool, TResult5)[1];

    var subject = new Subject<TSource>();

    IDisposable Subscribe<T>(int n, IObservable<T> aggregator, (bool Defined, T)[] r, string paramName) =>
        ReferenceEquals(aggregator, subject)
        ? throw new ArgumentException($"Aggregator cannot be identical to the source.", paramName)
        : aggregator.Subscribe(s =>
              r[0] = r[0].Defined
                   ? throw new InvalidOperationException($"Aggregator #{n} produced multiple results when only one is allowed.")
                   : (true, s));

    using (Subscribe(1, aggregatorSelector1(subject), r1, nameof(aggregatorSelector1)))
    using (Subscribe(2, aggregatorSelector2(subject), r2, nameof(aggregatorSelector2)))
    using (Subscribe(3, aggregatorSelector3(subject), r3, nameof(aggregatorSelector3)))
    using (Subscribe(4, aggregatorSelector4(subject), r4, nameof(aggregatorSelector4)))
    using (Subscribe(5, aggregatorSelector5(subject), r5, nameof(aggregatorSelector5)))
    {
        foreach (var item in source)
            subject.OnNext(item);
        subject.OnCompleted();
    }

    T Get<T>(int n, (bool Defined, T Value) result) =>
        !result.Defined
        ? throw new InvalidOperationException($"Aggregator #{n} has an undefined result.")
        : result.Value;

    return resultSelector(Get(1, r1[0]), Get(2, r2[0]), Get(3, r3[0]), Get(4, r4[0]), Get(5, r5[0]));
}

Of course, the same can be done with tasks (which helps remove the by-reference/boxing array hack) but it would nonetheless remain an internal implementation detail:

public static TResult Aggregate<TSource, TResult1, TResult2, TResult3, TResult4, TResult5, TResult>(
    this IEnumerable<TSource> source,
    Func<IObservable<TSource>, IObservable<TResult1>> aggregatorSelector1,
    Func<IObservable<TSource>, IObservable<TResult2>> aggregatorSelector2,
    Func<IObservable<TSource>, IObservable<TResult3>> aggregatorSelector3,
    Func<IObservable<TSource>, IObservable<TResult4>> aggregatorSelector4,
    Func<IObservable<TSource>, IObservable<TResult5>> aggregatorSelector5,
    Func<TResult1, TResult2, TResult3, TResult4, TResult5, TResult> resultSelector)
{
    var r1 = new TaskCompletionSource<TResult1>();
    var r2 = new TaskCompletionSource<TResult2>();
    var r3 = new TaskCompletionSource<TResult3>();
    var r4 = new TaskCompletionSource<TResult4>();
    var r5 = new TaskCompletionSource<TResult5>();

    var subject = new Subject<TSource>();

    IDisposable Subscribe<T>(int n, IObservable<T> aggregator, TaskCompletionSource<T> r, string paramName) =>
        ReferenceEquals(aggregator, subject)
        ? throw new ArgumentException($"Aggregator cannot be identical to the source.", paramName)
        : aggregator.Subscribe(s =>
          {
              if (r.Task.IsCompleted)
                  throw new InvalidOperationException($"Aggregator #{n} produced multiple results when only one is allowed.");
              r.SetResult(s);
          });

    using (Subscribe(1, aggregatorSelector1(subject), r1, nameof(aggregatorSelector1)))
    using (Subscribe(2, aggregatorSelector2(subject), r2, nameof(aggregatorSelector2)))
    using (Subscribe(3, aggregatorSelector3(subject), r3, nameof(aggregatorSelector3)))
    using (Subscribe(4, aggregatorSelector4(subject), r4, nameof(aggregatorSelector4)))
    using (Subscribe(5, aggregatorSelector5(subject), r5, nameof(aggregatorSelector5)))
    {
        foreach (var item in source)
            subject.OnNext(item);
        subject.OnCompleted();
    }

    T Get<T>(int n, Task<T> result) =>
        !result.IsCompleted
        ? throw new InvalidOperationException($"Aggregator #{n} has an undefined result.")
        : result.Result;

    return resultSelector(Get(1, r1.Task), Get(2, r2.Task), Get(3, r3.Task), Get(4, r4.Task), Get(5, r5.Task));
}

We can externalize the checking by providing a ToTask extension for IObservable<> that allows for only a single result but this would come in conflict with a project using Rx and MoreLINQ (sigh). We could give it another name but then that doesn't guarantee that the aggregator selector function will call the right version!

BTW, there is no need to have one subject per selector

You're right. That was a leftover from a limitation of my own implementation of Subject<> that evolved out of AggregatorSource<> and which did not initially support multiple subscriptions. I wasn't using the one from Rx for my exploration.

Magnus12 commented 6 years ago

Did this ever make it into MoreLinq? Or anywhere else? Looks like a great feature @atifaziz

atifaziz commented 6 years ago

@Magnus12 Not yet although I'd still like to see that happen. I don't know if you followed the entire thread, but @fsateler and I were still pondering over the pros and cons of the API design, especially around signatures. I think once that is done (which frankly is and has been the hardest part), the rest is just a question of rolling up the sleeves and getting down to the implementation and tests.

Now that we have introduced an experimental namespace with #209, I can imagine throwing this out in the wild and let real-world field use drive the final decisions rather than leave it hanging here.

Meanwhile, I did some experimentation (see RxScans) that toyed with the idea of “accumulators for observable sources that emit their intermediate results.“ There's a runnable example in there too. The RxScans project shows that if you're willing to convert an enumerable into an observable then you can pretty much get the effect of “multiple aggregations in a single iteration” today.

atifaziz commented 5 years ago

Did this ever make it into MoreLinq?

@Magnus12 This is now shipping in 3.2. Check out also the examples.