dotnet / reactive

The Reactive Extensions for .NET
http://reactivex.io
MIT License

Observable completion exhibits O(n^2) behavior in GroupBy+[SelectMany/Merge] in the number of groups #2005

Closed: alewisfm closed this issue 5 months ago.

alewisfm commented 1 year ago

Which library version?

System.Reactive 6.0.0

What are the platform(s), environment(s) and related component version(s)?

dotnet 7.0.203

What is the use case or problem?

We are processing an observable that contains sensor samples taken from many different sensors. We want to gather statistics for each individual sensor, so we perform a GroupBy() on the observable to create per-sensor observables. After the source observable completes, there is a long delay, with 100% CPU usage, before the final subscriber completes.

A simple reproducer, based on an observable created from an array of ints, looks like this:

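```csharp
using System;
using System.Reactive.Linq;

// numberOfGroups is passed in here so that the snippet compiles on its own.
void Test(int[] data, int numberOfGroups)
{
    data.ToObservable()
        .GroupBy(value => value % numberOfGroups)
        .SelectMany(groupOfInts => groupOfInts)
        .Subscribe(intValue => { });
}
```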

or

```csharp
void Test(int[] data, int numberOfGroups)
{
    data.ToObservable()
        .GroupBy(value => value % numberOfGroups)
        .Merge() // flattens the stream of groups back into one stream of ints
        .Subscribe(intValue => { });
}
```

What is the expected outcome?

The time taken to complete is O(m), where m is the number of elements in the array.

What is the actual outcome?

The time taken to complete grows as O(n^2), where n is the number of groups that were created (numberOfGroups in the example above).

What is the stacktrace of the exception(s) if any?

Running under a profiler, the problem appears to be that when each IGroupedObservable completes, the subscriber created by SelectMany() or Merge() is individually removed from a CompositeDisposable. Each removal performs a linear search of an IList inside the CompositeDisposable that holds one entry per group. Because all the subscribers are removed one after another, the removal process as a whole is O(n^2) in the number of groups.
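To see why this adds up to quadratic work, here is a simplified, hypothetical model (not Rx's actual CompositeDisposable code) of a list-backed container whose Remove does a linear search from the start and clears the matching slot instead of compacting the list. Assuming the groups complete in the order they were created, removing n items costs 1 + 2 + ... + n comparisons. The type names are invented for this illustration.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical, simplified model of a list-backed composite (not Rx's actual code).
// Remove scans from the start of the list and clears the matching slot to null
// instead of shifting later items down, so the holes stay in the scan path.
public sealed class ListBackedComposite
{
    private readonly List<IDisposable?> _items = new();

    // Total number of slot comparisons performed by all Remove calls.
    public long Comparisons { get; private set; }

    public void Add(IDisposable item) => _items.Add(item);

    public bool Remove(IDisposable item)
    {
        for (var i = 0; i < _items.Count; i++)
        {
            Comparisons++;
            if (ReferenceEquals(_items[i], item))
            {
                _items[i] = null;
                return true;
            }
        }
        return false;
    }
}

public sealed class NoOpDisposable : IDisposable
{
    public void Dispose() { }
}

public static class RemovalCostDemo
{
    // Adds n items, then removes them in the order they were added
    // (standing in for groups completing in creation order).
    public static long CountComparisons(int n)
    {
        var composite = new ListBackedComposite();
        var items = new IDisposable[n];
        for (var i = 0; i < n; i++)
        {
            items[i] = new NoOpDisposable();
            composite.Add(items[i]);
        }
        foreach (var item in items)
        {
            composite.Remove(item);
        }
        return composite.Comparisons;
    }
}
```

In this model, 1,000 items cost 500,500 comparisons and 2,000 items cost 2,001,000: doubling the number of groups roughly quadruples the removal work.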

Profiler output:

```
  23.2%   <Main>$  •  1,659 ms  •  Program.<Main>$(String[])
    22.7%   GroupByMerge  •  1,624 ms  •  RxPerf.RxGroupByMicroBenchmark.GroupByMerge()
      22.6%   Subscribe  •  1,617 ms  •  System.ObservableExtensions.Subscribe(IObservable, Action)
        22.6%   Subscribe  •  1,617 ms  •  System.Reactive.Producer`2.Subscribe(IObserver)
          22.6%   SubscribeRaw  •  1,617 ms  •  System.Reactive.Producer`2.SubscribeRaw(IObserver, Boolean)
            22.6%   ScheduleAction  •  1,617 ms  •  System.Reactive.Concurrency.Scheduler.ScheduleAction(IScheduler, TState, Action)
              22.6%   Schedule  •  1,617 ms  •  System.Reactive.Concurrency.LocalScheduler.Schedule(TState, Func)
                22.6%   Schedule  •  1,617 ms  •  System.Reactive.Concurrency.CurrentThreadScheduler.Schedule(TState, TimeSpan, Func)
                  22.5%   Run  •  1,610 ms  •  System.Reactive.Concurrency.CurrentThreadScheduler+Trampoline.Run(SchedulerQueue)
                    22.1%   InvokeCore  •  1,585 ms  •  System.Reactive.Concurrency.ScheduledItem`2.InvokeCore()
                      22.1%   <LoopRec>b__5_0  •  1,585 ms  •  System.Reactive.Linq.ObservableImpl.ToObservableRecursive`1+_+<>c.<LoopRec>b__5_0(IScheduler, _)
                        22.1%   LoopRec  •  1,579 ms  •  System.Reactive.Linq.ObservableImpl.ToObservableRecursive`1+_.LoopRec(IScheduler)
                          20.4%   ForwardOnCompleted  •  1,459 ms  •  System.Reactive.Sink`1.ForwardOnCompleted()
                            20.4%   OnCompleted  •  1,459 ms  •  System.Reactive.Linq.ObservableImpl.GroupBy`3+_.OnCompleted()
                              20.4%   OnCompleted  •  1,459 ms  •  System.Reactive.Subjects.Subject`1.OnCompleted()
                                18.2%   OnCompletedCore  •  1,302 ms  •  System.Reactive.AutoDetachObserver`1.OnCompletedCore()
                                  18.2%   OnCompleted  •  1,302 ms  •  System.Reactive.Linq.ObservableImpl.Merge`1+Observables+_+InnerObserver.OnCompleted()
                                    18.2%   Remove  •  1,302 ms  •  System.Reactive.Disposables.CompositeDisposable.Remove(IDisposable)
                                      17.8%   IndexOf  •  1,273 ms  •  System.Array.IndexOf(T[], T, Int32, Int32)
```

Do you have a code snippet or project that reproduces the problem?

The following class runs under BenchmarkDotNet to exhibit the issue:

```csharp
using System;
using System.Reactive.Linq;
using BenchmarkDotNet.Attributes;

namespace RxPerf;

// Run from a console app with, for example:
// BenchmarkDotNet.Running.BenchmarkRunner.Run<RxPerf.RxGroupByMicroBenchmark>();
[MemoryDiagnoser]
public class RxGroupByMicroBenchmark
{
    [Params(200_000, 1_000_000)]
    public int NumberOfSamples { get; set; }

    [Params(10, 100, 1_000, 10_000, 100_000, 150_000, 200_000)]
    public int NumberOfGroups { get; set; }

    private int[] data = Array.Empty<int>();

    [GlobalSetup]
    public void GlobalSetup()
    {
        data = new int[NumberOfSamples];
        for (var i = 0; i < data.Length; ++i)
        {
            data[i] = i;
        }
    }

    private IObservable<int>? observable;

    [IterationSetup]
    public void IterationSetup()
    {
        observable = data.ToObservable();
    }

    [Benchmark]
    public void GroupBySelectMany()
    {
        var numberOfGroups = NumberOfGroups;

        // Subscribe drives the whole pipeline on the current thread
        // (see CurrentThreadScheduler in the profiler trace above).
        observable!.GroupBy(value => value % numberOfGroups)
            .SelectMany(groupOfInts => groupOfInts)
            .Subscribe(intValue => { });
    }

    [Benchmark]
    public void GroupByMerge()
    {
        var numberOfGroups = NumberOfGroups;

        // Same shape, but the groups are flattened with Merge().
        observable!.GroupBy(value => value % numberOfGroups)
            .Merge()
            .Subscribe(intValue => { });
    }
}
```

Sample output is:

| Method            | NumberOfSamples | NumberOfGroups | Mean        | Error     | StdDev    | Median      | Gen0       | Gen1      | Gen2      | Allocated |
|------------------ |---------------- |--------------- |------------:|----------:|----------:|------------:|-----------:|----------:|----------:|----------:|
| GroupBySelectMany | 200000          | 10             |    26.77 ms |  0.534 ms |  1.378 ms |    26.29 ms |  2000.0000 |         - |         - |  12.21 MB |
| GroupByMerge      | 200000          | 10             |    26.46 ms |  0.523 ms |  1.020 ms |    26.20 ms |  2000.0000 |         - |         - |  12.21 MB |
| GroupBySelectMany | 200000          | 100            |    27.71 ms |  0.548 ms |  1.030 ms |    27.48 ms |  2000.0000 |         - |         - |  12.25 MB |
| GroupByMerge      | 200000          | 100            |    26.99 ms |  0.530 ms |  0.589 ms |    26.82 ms |  2000.0000 |         - |         - |  12.25 MB |
| GroupBySelectMany | 200000          | 1000           |    28.64 ms |  0.556 ms |  0.882 ms |    28.42 ms |  2000.0000 | 1000.0000 |         - |  12.58 MB |
| GroupByMerge      | 200000          | 1000           |    28.04 ms |  0.547 ms |  0.692 ms |    28.05 ms |  2000.0000 | 1000.0000 |         - |  12.58 MB |
| GroupBySelectMany | 200000          | 10000          |    41.60 ms |  0.823 ms |  1.375 ms |    41.43 ms |  2000.0000 | 1000.0000 |         - |     16 MB |
| GroupByMerge      | 200000          | 10000          |    42.33 ms |  0.835 ms |  0.928 ms |    42.16 ms |  2000.0000 | 1000.0000 |         - |     16 MB |
| GroupBySelectMany | 200000          | 100000         |   475.65 ms |  9.385 ms | 13.157 ms |   469.82 ms |  8000.0000 | 5000.0000 | 2000.0000 |  48.45 MB |
| GroupByMerge      | 200000          | 100000         |   468.15 ms |  5.728 ms |  5.078 ms |   467.44 ms |  8000.0000 | 5000.0000 | 2000.0000 |  48.45 MB |
| GroupBySelectMany | 200000          | 150000         |   929.67 ms | 10.310 ms |  9.140 ms |   928.65 ms | 10000.0000 | 6000.0000 | 2000.0000 |  64.04 MB |
| GroupByMerge      | 200000          | 150000         |   920.18 ms |  5.883 ms |  4.593 ms |   921.05 ms | 10000.0000 | 6000.0000 | 2000.0000 |  64.04 MB |
| GroupBySelectMany | 200000          | 200000         | 1,502.65 ms | 15.967 ms | 14.936 ms | 1,494.68 ms | 13000.0000 | 8000.0000 | 3000.0000 |  85.29 MB |
| GroupByMerge      | 200000          | 200000         | 1,490.90 ms | 12.406 ms | 11.604 ms | 1,488.47 ms | 13000.0000 | 8000.0000 | 3000.0000 |  85.29 MB |
| GroupBySelectMany | 1000000         | 10             |   128.24 ms |  0.648 ms |  0.506 ms |   128.35 ms | 10000.0000 |         - |         - |  61.04 MB |
| GroupByMerge      | 1000000         | 10             |   138.52 ms |  2.745 ms |  6.471 ms |   136.22 ms | 10000.0000 |         - |         - |  61.04 MB |
| GroupBySelectMany | 1000000         | 100            |   136.67 ms |  2.585 ms |  2.654 ms |   135.71 ms | 10000.0000 |         - |         - |  61.07 MB |
| GroupByMerge      | 1000000         | 100            |   140.28 ms |  1.412 ms |  1.179 ms |   140.16 ms | 10000.0000 |         - |         - |  61.07 MB |
| GroupBySelectMany | 1000000         | 1000           |   141.69 ms |  2.704 ms |  2.893 ms |   141.58 ms | 10000.0000 | 1000.0000 |         - |  61.41 MB |
| GroupByMerge      | 1000000         | 1000           |   134.36 ms |  2.269 ms |  2.123 ms |   133.69 ms | 10000.0000 | 1000.0000 |         - |  61.41 MB |
| GroupBySelectMany | 1000000         | 10000          |   152.31 ms |  2.960 ms |  2.769 ms |   153.28 ms | 10000.0000 | 1000.0000 |         - |  64.83 MB |
| GroupByMerge      | 1000000         | 10000          |   150.45 ms |  2.716 ms |  2.668 ms |   150.68 ms | 10000.0000 | 1000.0000 |         - |  64.83 MB |
| GroupBySelectMany | 1000000         | 100000         |   578.28 ms | 11.323 ms | 14.722 ms |   572.63 ms | 15000.0000 | 5000.0000 | 1000.0000 |  97.28 MB |
| GroupByMerge      | 1000000         | 100000         |   576.79 ms | 10.564 ms |  9.365 ms |   573.79 ms | 15000.0000 | 5000.0000 | 1000.0000 |  97.28 MB |
| GroupBySelectMany | 1000000         | 150000         | 1,031.99 ms |  7.762 ms |  6.482 ms | 1,029.68 ms | 18000.0000 | 7000.0000 | 2000.0000 | 112.87 MB |
| GroupByMerge      | 1000000         | 150000         | 1,053.82 ms | 15.343 ms | 15.069 ms | 1,047.47 ms | 18000.0000 | 7000.0000 | 2000.0000 | 112.88 MB |
| GroupBySelectMany | 1000000         | 200000         | 1,632.03 ms | 18.536 ms | 17.339 ms | 1,633.38 ms | 20000.0000 | 8000.0000 | 2000.0000 | 134.12 MB |
| GroupByMerge      | 1000000         | 200000         | 1,673.68 ms | 27.066 ms | 25.318 ms | 1,667.54 ms | 20000.0000 | 8000.0000 | 2000.0000 | 134.12 MB |

Note that increasing the number of samples from 200,000 to 1,000,000 adds a fairly consistent 100 ms or so to the total time, whereas the total time grows much faster, roughly quadratically, with the number of groups, which becomes the dominant factor at around 100,000 groups.
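One rough way to put a number on "roughly quadratically" is the log-log slope between two rows of the table: if t ~ c * n^k, then k ~ ln(t2/t1) / ln(n2/n1). The helper below is a hypothetical illustration, not part of the benchmark project.

```csharp
using System;

public static class GrowthEstimate
{
    // If t ~ c * n^k, the slope of log(t) against log(n) between two
    // measurements (n1, t1) and (n2, t2) estimates the exponent k.
    public static double EmpiricalExponent(double n1, double t1, double n2, double t2)
    {
        if (n1 <= 0 || n2 <= 0 || t1 <= 0 || t2 <= 0 || n1 == n2)
        {
            throw new ArgumentException("Need two distinct positive measurements.");
        }
        return Math.Log(t2 / t1) / Math.Log(n2 / n1);
    }
}
```

For GroupBySelectMany with 200,000 samples, going from 100,000 groups (475.65 ms) to 200,000 groups (1,502.65 ms) gives k of about 1.66, well above the 1.0 a linear relationship would give. It stays below 2, presumably partly because some of each measurement does not grow quadratically with the number of groups (with only 10 groups the run already takes about 27 ms).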

HowardvanRooijen commented 1 year ago

Thanks for all the details!

idg10 commented 9 months ago

I think there are two possible strategies we could use to fix this:

  1. detect when the completion occurs because the source has finished, and use some bulk mode to dispose everything more efficiently
  2. make CompositeDisposable.Remove more efficient when it is tracking large numbers of disposables

I think option 1 might be tricky in this case, because it's the GroupBy that knows it is shutting down all its downstream observers, but it's the SelectMany that would need to process the shutdown in bulk. (The CompositeDisposable is owned by the SelectMany in this case.) It's not inconceivable that there could be some sort of protocol by which an upstream source notifies something downstream that it's about to call OnCompleted on all of its downstream subscribers. We already have some cases where we detect that multiple System.Reactive operators are chained together, and in those cases they can behave differently than they would when dealing with an arbitrary IObserver<T>; for example, we use that to avoid redundant safe wrappers. But this would be a significant change, because at the moment nothing enables a set of downstream observers in a "fan out" configuration to be handled as a single logical group.

Also, option 1 seems less general: it would be too easy to end up solving this only for certain specific combinations of known operators, and it would definitely break if someone inserted their own operator into the mix. There are also cases where you could have thousands of groups that do not all complete at once. Ideally we'd make that case more efficient too, instead of fixing only the particular case in which everything shuts down at once.

So approach 2 seems more promising.

I think the way to approach this would be to make CompositeDisposable operate in one of two modes. For collections below some threshold, we would continue to use a List<IDisposable> internally. I believe many CompositeDisposables contain very small lists in practice, and for small collections a linear search is usually more efficient than a more scalable algorithm. Also, this type is used very widely, so changing its internal makeup could have a significant effect on Rx.NET's memory usage.

So I'm thinking we'd change the field that is currently of type List<IDisposable?> to be of type object, holding a reference either to a List<IDisposable> as now, or to some hash-based type once the number of items exceeds a threshold. (Around 1000 seems like a reasonable threshold, based on the numbers in the benchmark.)

The alternate representation couldn't be a HashSet<IDisposable>, because at the moment nothing stops you from adding the same entry to a CompositeDisposable twice, in which case it will be disposed twice. Although it would be a bad idea for anything to depend on that, it's possible that something does. So I'm thinking we could use a Dictionary<IDisposable, int>, where the int counts how many times the relevant disposable has been added.
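A minimal sketch of the two-mode idea, assuming a threshold of about 1000 (a constructor parameter here): a field of type object holds either a List<IDisposable> or a Dictionary<IDisposable, int> of add counts. This is hypothetical illustration code, not the implementation from the Rx.NET pull request. Unlike CompositeDisposable it does no locking, it never switches back to list mode, and it relies on the items' GetHashCode/Equals behaving sensibly. TwoModeDisposableBag and CountingDisposable are names invented for the example.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical two-mode container: a List below the threshold, a counting Dictionary above it.
public sealed class TwoModeDisposableBag : IDisposable
{
    private readonly int _threshold;
    // Either List<IDisposable> (small mode) or Dictionary<IDisposable, int> (large mode),
    // where the int records how many times that item was added.
    private object _store = new List<IDisposable>();
    private int _count;

    public TwoModeDisposableBag(int threshold = 1000) => _threshold = threshold;

    public int Count => _count;
    public bool IsLargeMode => _store is Dictionary<IDisposable, int>;

    public void Add(IDisposable item)
    {
        if (_store is List<IDisposable> list)
        {
            list.Add(item);
            if (list.Count > _threshold)
            {
                // Switch modes, folding duplicates into counts so an item added twice
                // is still disposed twice later.
                var dict = new Dictionary<IDisposable, int>();
                foreach (var d in list)
                {
                    dict[d] = dict.TryGetValue(d, out var n) ? n + 1 : 1;
                }
                _store = dict;
            }
        }
        else
        {
            var dict = (Dictionary<IDisposable, int>)_store;
            dict[item] = dict.TryGetValue(item, out var n) ? n + 1 : 1;
        }
        _count++;
    }

    // Removes one occurrence of item and disposes it (this sketch's chosen behaviour).
    public bool Remove(IDisposable item)
    {
        bool removed;
        if (_store is List<IDisposable> list)
        {
            removed = list.Remove(item); // linear search: O(n)
        }
        else
        {
            var dict = (Dictionary<IDisposable, int>)_store;
            removed = dict.TryGetValue(item, out var n); // hash lookup: expected O(1)
            if (removed)
            {
                if (n == 1) dict.Remove(item); else dict[item] = n - 1;
            }
        }
        if (removed)
        {
            _count--;
            item.Dispose();
        }
        return removed;
    }

    // Disposes every remaining item (once per add) and empties the bag.
    public void Dispose()
    {
        var store = _store;
        _store = new List<IDisposable>();
        _count = 0;
        if (store is List<IDisposable> list)
        {
            foreach (var d in list) d.Dispose();
        }
        else
        {
            foreach (var pair in (Dictionary<IDisposable, int>)store)
            {
                for (var i = 0; i < pair.Value; i++) pair.Key.Dispose();
            }
        }
    }
}

// Test helper: counts how many times Dispose was called.
public sealed class CountingDisposable : IDisposable
{
    public int DisposeCount { get; private set; }
    public void Dispose() => DisposeCount++;
}
```

Adding the same CountingDisposable twice and then disposing the bag leaves its DisposeCount at 2 in either mode, which is the duplicate-add behaviour the counts are there to preserve.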

In cases where the IDisposable has a reasonable hash code, this means individual removals would no longer have $O(N)$ cost. As the docs for Dictionary say, locating items typically has $O(1)$ cost, so the total shutdown cost would become $O(N)$ instead of $O(N^2)$.
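Spelling out that arithmetic, under the simplifying assumption that the k-th list-mode removal scans on the order of k entries at a cost of c per entry (the total is the same if it scans N − k + 1 instead), while a dictionary removal costs a roughly constant c′ (expected, not worst case):

$$
T_{\text{list}}(N) \approx \sum_{k=1}^{N} c\,k = \frac{c\,N(N+1)}{2} \in O(N^2),
\qquad
T_{\text{dict}}(N) \approx \sum_{k=1}^{N} c' = c'\,N \in O(N).
$$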

idg10 commented 8 months ago

I've prototyped approach 2 described above, making CompositeDisposable operate in two different modes according to how many disposables it contains. Below a threshold (currently 1024) it uses a list exactly like it did before. But once this threshold is exceeded, it switches into a mode where it uses a dictionary instead.

See https://github.com/dotnet/reactive/pull/2092

Here is how your benchmarks look on my machine (which is quite old now, and significantly slower than yours) when running against the current CompositeDisposable:

[Chart: benchmark timings against the current CompositeDisposable]

(The dark blue line lines up almost exactly with the green one, which is why it appears to be missing. Likewise, the orange line is hidden behind the pale blue one.)

You can see the lines curving upwards in a way that looks consistent with the $O(N^2)$ behaviour predicted by the analysis.

Here's how it looks with the prototype:

[Chart: benchmark timings with the prototype CompositeDisposable]

Note that the vertical scale is different: this goes up only to 600, whereas the first went up to 3500. (Not quite sure why it went for that instead of 3000, but there we are.) Because we're effectively zoomed in a bit more, it's now possible to see that there really are 4 lines, thanks to minor variations between the runs. For comparison, if I show it at the same vertical scale as the first graph, you can see that it's a significant overall improvement for larger numbers of groups:

[Chart: prototype timings plotted at the same vertical scale as the first chart]

Perhaps more importantly, we no longer see the upward curve. It looks consistent with the predicted $O(N)$ behaviour.

My one concern was that CompositeDisposable is used very widely and commonly contains only a few items, so adding an extra mode to improve performance for large collections might have a net negative effect: it is likely to slow things down at least slightly in every scenario that doesn't need the large mode. So I ran the full benchmark suite before and after the change, to visualize the overall effect:

[Scatter plot: each benchmark's time before the change (X) against its time after the change (Y)]

Each point represents one particular ([Benchmark] method, parameter set) combination. Its X position indicates how long the benchmark took to run before I made these changes to CompositeDisposable, and its Y position shows how long the same benchmark took after the change.

The red line marks X=Y. Any points falling on this line ran at the same speed before and after the change. Any falling above it got slower, and any falling beneath it got faster.
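The reading rule for that plot can be written down directly: a point's after/before ratio is 1 on the X=Y line, below 1 under it (faster) and above 1 over it (slower). The sketch below is a hypothetical helper, and its 5% noise tolerance is an assumed value, not one used in the thread.

```csharp
using System;

public enum PerfChange { Faster, Slower, Unchanged }

public static class BeforeAfter
{
    // Classifies one (benchmark method, parameter set) point from a before/after plot.
    // ratio == 1 is the X = Y line; ratio < 1 lies below it (faster), ratio > 1 above it (slower).
    // The default tolerance (5%) is an assumed noise band, not a value from the thread.
    public static PerfChange Classify(double beforeMs, double afterMs, double tolerance = 0.05)
    {
        if (beforeMs <= 0) throw new ArgumentOutOfRangeException(nameof(beforeMs));
        var ratio = afterMs / beforeMs;
        if (ratio < 1 - tolerance) return PerfChange.Faster;
        if (ratio > 1 + tolerance) return PerfChange.Slower;
        return PerfChange.Unchanged;
    }
}
```

With illustrative numbers, Classify(1500, 300) returns PerfChange.Faster, while Classify(100, 103) returns PerfChange.Unchanged under the default tolerance, which is how run-to-run variability near the line would be treated.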

The positive effect of this change is easy to see: the points far below the line all represent the new benchmarks added to capture this problem, and their position on the plot shows that they all got significantly faster.

We can also see that nothing got dramatically slower. There's a bit of variation, but I think that's just variability between runs. (I was running this suite on my desktop machine and since it takes several hours, I was doing other things at the same time.) We are aiming to introduce some more consistent benchmark analysis, with execution procedures that should produce less variable results, and we also want to run the benchmarks over some older versions of Rx to see if any performance regressions were introduced. So at this stage, with respect to this particular problem, our main concern is to determine that we didn't cause any major new problems. It's looking OK from that perspective.

idg10 commented 5 months ago

Note, this change went into System.Reactive 6.0.1.