dotnet / runtime

.NET is a cross-platform runtime for cloud, mobile, desktop, and IoT apps.
https://docs.microsoft.com/dotnet/core/
MIT License

[API Proposal]: Support specifying global degree of parallelism for Parallel.ForEachAsync by providing a ConcurrencyLimiter #78753

Open bill-poole opened 1 year ago

bill-poole commented 1 year ago

Background and motivation

The maximum degree of parallelism of a Parallel.ForEachAsync invocation is constrained by setting the ParallelOptions.MaxDegreeOfParallelism property. However, this constrains the degree of parallelism per invocation, not globally.

If a method that invokes Parallel.ForEachAsync is itself invoked on multiple threads (e.g., per received HTTP request), then this has a multiplying effect and can overwhelm the server. For example, a web server may use a ConcurrencyLimiter to handle no more than 100 simultaneous requests, each of which invokes a method that calls Parallel.ForEachAsync with ParallelOptions.MaxDegreeOfParallelism also set to 100. As a result, the body function passed to Parallel.ForEachAsync can be invoked with a maximum effective concurrency of 100 × 100 = 10,000.
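The multiplying effect can be observed with a small sketch. The class and method names below are hypothetical, and the numbers are scaled down (4 simulated requests, each with MaxDegreeOfParallelism = 3) so that it runs quickly; the body is a stand-in that only waits.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo type; not part of the proposal.
public static class MultiplyingEffectDemo
{
    // Runs `requests` concurrent Parallel.ForEachAsync calls and returns the
    // highest number of body invocations observed running at the same time.
    public static async Task<int> MeasurePeakAsync(int requests, int perRequestDop, int itemsPerRequest)
    {
        int current = 0, peak = 0;
        object gate = new();

        async Task HandleRequestAsync()
        {
            var options = new ParallelOptions { MaxDegreeOfParallelism = perRequestDop };
            await Parallel.ForEachAsync(Enumerable.Range(0, itemsPerRequest), options, async (_, ct) =>
            {
                int now = Interlocked.Increment(ref current);
                lock (gate) { if (now > peak) peak = now; }
                await Task.Delay(50, ct); // stand-in for real work
                Interlocked.Decrement(ref current);
            });
        }

        // Each simulated request gets its own, independent per-invocation limit.
        await Task.WhenAll(Enumerable.Range(0, requests).Select(_ => HandleRequestAsync()));
        return peak;
    }
}
```

With 4 requests and a per-request limit of 3, the observed peak can exceed 3 and is bounded only by 4 × 3 = 12, because no limit is shared across invocations.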

Ideally, each invocation of Parallel.ForEachAsync could be constrained using a global ConcurrencyLimiter for that invocation. This could be achieved by adding a ConcurrencyLimiter property to the ParallelOptions class. In the above example, a static/global ConcurrencyLimiter could then be defined in the class that contains the method that invokes Parallel.ForEachAsync and assigned to the ParallelOptions instance passed to it, providing a global (process-wide) limit on the number of concurrent tasks started by that call site across all HTTP requests.

Furthermore, other invocations of Parallel.ForEachAsync in other places in the code can each set/use their own global limits, each governed by its own ConcurrencyLimiter instance.

Note that while the body function passed to Parallel.ForEachAsync can use a ConcurrencyLimiter internally to limit the global concurrency of that body function, Parallel.ForEachAsync would still continue to consume items from the given source enumerable up to the degree of parallelism permitted by the ParallelOptions.MaxDegreeOfParallelism property. Thus, if the source enumerable is an IAsyncEnumerable<T>, the items being enumerated are very large in number and not all in memory (i.e., streamed from network/storage), and Parallel.ForEachAsync consumes items from the source faster than the body function can process them, then the server could run out of memory.

However, if the Parallel.ForEachAsync method were ConcurrencyLimiter-aware, it would know to consume the next item from the given source enumerable only when there is sufficient capacity to process it.

API Proposal

namespace System.Threading.Tasks;

public class ParallelOptions
{
    public ConcurrencyLimiter ConcurrencyLimiter { get; set; }
}

API Usage

public class RequestHandler
{
  private static readonly ConcurrencyLimiter s_concurrencyLimiter = new(new() 
  { 
    PermitLimit = 100, 
    QueueLimit = 0, 
    QueueProcessingOrder = QueueProcessingOrder.OldestFirst
  });

  public async Task Handle(Request request, CancellationToken cancellationToken)
  {
    await Parallel.ForEachAsync(request.Items, new ParallelOptions
    {
      CancellationToken = cancellationToken,
      ConcurrencyLimiter = s_concurrencyLimiter
    }, async (item, cancellationToken) =>
    {
      // Handle each item
    });
  }
}

Alternative Designs

No response

Risks

No response

stephentoub commented 1 year ago

This is reasonable in theory; whereas the ParallelOptions.TaskScheduler can be used to coordinate how many tasks are allowed to execute across any number of consumers at a time, the proposed ParallelOptions.ConcurrencyLimiter would enable doing so even when there was no work to actually run.

That said, it could add non-trivial overhead to the processing (which might otherwise be very fine-grained). It would also add complexity in understanding the mind-bending relationship between what limits the scheduler might put in place, what limits the concurrency limiter might put in place, and MaxDegreeOfParallelism. And as you've outlined, someone can achieve the majority of the benefits just by using the concurrency limiter in the body of the loop; you mentioned a downside of large memory consumption, but worst case is you're holding onto MaxDegreeOfParallelism items from the source, and if that's too large for the scenario, lower the MaxDegreeOfParallelism. Finally, there's also a practical issue, which is that Parallel is in the netcoreapp shared framework and ConcurrencyLimiter is not; taking this dependency would require pulling it in.
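A minimal sketch of the body-level approach mentioned above, assuming a SemaphoreSlim as a stand-in for the ConcurrencyLimiter so that the example needs nothing outside the shared framework. The type name, the method name, and the limit of 100 are illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper type; names and the limit of 100 are illustrative.
public static class BodyLimitedWriter
{
    // Process-wide cap shared by every caller of WriteAllAsync.
    private static readonly SemaphoreSlim s_gate = new(initialCount: 100, maxCount: 100);

    public static Task WriteAllAsync<T>(
        IEnumerable<T> items,
        Func<T, CancellationToken, ValueTask> write,
        int maxDegreeOfParallelism,
        CancellationToken cancellationToken = default)
    {
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism,
            CancellationToken = cancellationToken
        };

        return Parallel.ForEachAsync(items, options, async (item, ct) =>
        {
            // The item has already been taken from the source at this point, so
            // each call can hold up to maxDegreeOfParallelism items while waiting.
            await s_gate.WaitAsync(ct);
            try
            {
                await write(item, ct);
            }
            finally
            {
                s_gate.Release();
            }
        });
    }
}
```

The shared semaphore bounds how many writes run at once across all callers, while MaxDegreeOfParallelism still bounds how many items each individual call holds.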

I'm not opposed to keeping this issue open to consider it again in the future, but I wouldn't rush to try to do something here now.

bill-poole commented 6 months ago

Thanks @stephentoub for considering this proposal.

> if that's too large for the scenario, lower the MaxDegreeOfParallelism

Lowering MaxDegreeOfParallelism isn't really a practical option in my case, because the system needs a certain degree of parallelism to meet its performance requirements.

For example, let's say the system writes records to a database, each write has a latency of ~10 ms, and the system needs to write 100,000 records per second. Therefore, the required degree of parallelism is 1,000. If we lower the max degree of parallelism, then the system won't meet its performance requirements.
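The figure of 1,000 follows from Little's law (items in flight = arrival rate × time in system), taking the ~10 ms write latency as the time each record spends in flight:

```latex
L = \lambda W = 100{,}000\ \text{records/s} \times 0.010\ \text{s} = 1{,}000\ \text{concurrent writes}
```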

Let's also say that the system performs many different types of operations, each of which writes a batch of an arbitrary number of records to the database, and that these operations are performed concurrently. If Parallel.ForEachAsync is used to write each batch of records to the database, then there is no MaxDegreeOfParallelism value that we can use that will:

That then leaves us with the option of rate limiting inside the body provided to Parallel.ForEachAsync, but that can cause the system to run out of memory as we previously discussed.

After some additional consideration, however, I have found a third option, which is to rate limit the source enumerable given to Parallel.ForEachAsync:

public static async Task ForEachAsync<TSource>(IAsyncEnumerable<TSource> source,
    Func<TSource, CancellationToken, ValueTask> body, RateLimiter rateLimiter,
    CancellationToken cancellationToken)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(body);
    ArgumentNullException.ThrowIfNull(rateLimiter);

    await Parallel.ForEachAsync(RateLimited(source, rateLimiter, cancellationToken),
        new ParallelOptions
        {
            MaxDegreeOfParallelism = int.MaxValue,
            CancellationToken = cancellationToken
        }, async (element, cancellationToken) =>
        {
            try
            {
                await body(element.Source, cancellationToken);
            }
            finally
            {
                element.Lease.Dispose();
            }
        });

    static async IAsyncEnumerable<(TSource Source, RateLimitLease Lease)> RateLimited(
        IAsyncEnumerable<TSource> source, RateLimiter rateLimiter,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        await foreach (var element in source.WithCancellation(cancellationToken))
        {
            // Wait until there is sufficient capacity and then yield the next element to
            // be processed. This ensures we don't consume any additional elements from the
            // given source enumerable until there is sufficient capacity to process the
            // current element. i.e., this transmits backpressure from the rate limiter to
            // the source enumerable.
            var lease = await rateLimiter.AcquireAsync(permitCount: 1, cancellationToken);
            yield return (element, lease);
        }
    }
}

However, this relies on:

This is necessary to ensure that every lease acquired from the rate limiter is disposed.

I have taken a look at the source code of the Parallel.ForEachAsync method and it seems to provide the required guarantee. However, that's an implementation detail that is not guaranteed to hold true in the future. But it seems this is my only option, other than to copy/modify the Parallel.ForEachAsync implementation.

bill-poole commented 6 months ago

> Parallel is in the netcoreapp shared framework and ConcurrencyLimiter is not; taking this dependency would require pulling it in

Understood. The requirements would also be met by allowing a SemaphoreSlim to be provided (as opposed to a ConcurrencyLimiter).
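As a rough illustration of the SemaphoreSlim variant, the source-gating helper shown earlier in the thread could be adapted as follows. This is a sketch under assumptions, not a proposed API: GatedParallel and Gated are hypothetical names, and it depends on the same behaviour discussed above, namely that the body runs for every element taken from the source (otherwise a semaphore slot is never released).

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper type; not an existing or proposed framework API.
public static class GatedParallel
{
    public static Task ForEachAsync<TSource>(
        IAsyncEnumerable<TSource> source,
        SemaphoreSlim gate,
        Func<TSource, CancellationToken, ValueTask> body,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(source);
        ArgumentNullException.ThrowIfNull(gate);
        ArgumentNullException.ThrowIfNull(body);

        var options = new ParallelOptions
        {
            // The shared semaphore, not this setting, bounds concurrency.
            MaxDegreeOfParallelism = int.MaxValue,
            CancellationToken = cancellationToken
        };

        return Parallel.ForEachAsync(Gated(source, gate, cancellationToken), options,
            async (item, ct) =>
            {
                try { await body(item, ct); }
                finally { gate.Release(); } // pairs with the WaitAsync in Gated
            });

        static async IAsyncEnumerable<TSource> Gated(
            IAsyncEnumerable<TSource> source, SemaphoreSlim gate,
            [EnumeratorCancellation] CancellationToken cancellationToken)
        {
            await foreach (var item in source.WithCancellation(cancellationToken))
            {
                // Hold the enumeration here until a slot is free, so at most one
                // unprocessed element is buffered while the semaphore is exhausted.
                await gate.WaitAsync(cancellationToken);
                yield return item;
            }
        }
    }
}
```

A single static SemaphoreSlim passed to every call then acts as the process-wide limit, and backpressure reaches the source because the next element is not requested while the enumeration is waiting for a slot.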