Actually, the code you mentioned in this repository is already running tasks in parallel. Although the for loop could make you think it's running handlers sequentially, it is not. More on that in this SO answer, from which the code was probably taken.
The difference between your link and this is that the SO method takes an array of ValueTask. Those would have already been started (in a hot state) when ValueTask<T[]> WhenAll<T> is called, so calling await is fine there. In our case the ValueTasks haven't started yet, and are currently being awaited immediately after creation, resulting in sequential execution.
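For illustration (a hypothetical sketch using plain delegates instead of real handlers, assuming a top-level C# program with implicit usings; this is not the repository's code), the difference looks roughly like this:

// Two stand-in "handlers" that each take about a second.
Func<Task>[] handlers =
{
    async () => { await Task.Delay(1000); Console.WriteLine("A done"); },
    async () => { await Task.Delay(1000); Console.WriteLine("B done"); },
};

// Sequential: each handler is awaited right after it is invoked,
// so the second one only starts after the first has finished (~2 s total).
foreach (var handler in handlers)
    await handler();

// Concurrent: invoke all handlers first (each runs until its first await),
// then await the already-running tasks (~1 s total).
var pending = handlers.Select(h => h()).ToArray();
await Task.WhenAll(pending);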
Sorry if I'm wrong, I'm unable to do a benchmark rn.
Hmm, that's correct! Seems I looked over it too quickly!
There is no ValueTask<T[]> WhenAll<T>, so the ValueTask would have to be boxed with AsTask like in the PR, and we would pay more in terms of allocation. That said, as long as it's configurable/opt-in then I think it's OK. One of the key goals of this library is to keep overhead low (close to zero). Ideally, only the users that require parallelism in notification handlers should pay the cost for it, which is how MediatR landed on its current API, I'm assuming.
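For context, the shape being referred to is roughly the following; this is a sketch only, where handlers, notification and cancellationToken are placeholders rather than the PR's actual code:

// Each AsTask() call may box the ValueTask into a Task, which is the
// extra allocation being discussed.
Task[] tasks = handlers
    .Select(h => h.Handle(notification, cancellationToken).AsTask())
    .ToArray();
await Task.WhenAll(tasks);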
There's a little more nuance required to decide whether something actually runs in parallel or not, though. For example, this code:
var tasks = new Task[4];
for (int i = 0; i < 4; i++)
    tasks[i] = Work(i);

await Task.WhenAll(tasks);

async Task Work(int i)
{
    const int SIZE = 100_000_000;
    var n = new float[SIZE];
    var rng = Random.Shared;
    for (int j = 0; j < SIZE; j++)
    {
        n[j] = rng.NextSingle();
    }
    Console.WriteLine($"{i} done!");
}
This code does not actually run in parallel, because Work never actually yields the thread. So in this case "N done!" would be printed sequentially. If we insert Task.Yield at the beginning of the function, it will yield the thread and each task will (presumably) be scheduled to different threads by the runtime. Another way to ensure parallelism would be to wrap Work(i) in Task.Run or similar.
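A sketch of that last suggestion applied to the snippet above (not code from the thread): wrapping each call in Task.Run forces the work onto thread-pool threads.

var tasks = new Task[4];
for (int i = 0; i < 4; i++)
{
    int captured = i;                           // copy the loop variable before capturing it
    tasks[i] = Task.Run(() => Work(captured));  // Work now starts on a thread-pool thread
}
await Task.WhenAll(tasks);

Alternatively, putting await Task.Yield(); as the first statement of Work has a similar effect, as described above.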
The current TaskWhenAllPublisher in MediatR does not use something like Task.Run AFAICT, and therefore handlers might run sequentially: https://github.com/jbogard/MediatR/blob/c295291c4e8105d11a453004b42609cbf490c1cf/src/MediatR/NotificationPublishers/TaskWhenAllPublisher.cs
@martinothamar
This API proposal outlines the implementation of an optional parallel notification execution feature within our service. This feature allows us to handle notifications concurrently, enhancing the system's performance.
We will introduce a new configuration option for our Mediator setup, as shown in the code snippet below:
builder.Services.AddMediator(options =>
{
    options.ServiceLifetime = ServiceLifetime.Transient;

    // NotificationHandlingStrategy can be configured here
    options.NotificationHandlingStrategy = NotificationHandlingStrategy.PARALLEL;
});
We introduce a NotificationHandlingStrategy to specify how notifications are processed. NotificationHandlingStrategy is an enumeration with two possible values: PARALLEL and SEQUENTIAL. The default value of options.NotificationHandlingStrategy would be NotificationHandlingStrategy.SEQUENTIAL, to maintain the old behavior.
NotificationHandlingStrategy Enumeration
We define the NotificationHandlingStrategy enumeration in this section:
public enum NotificationHandlingStrategy
{
    PARALLEL,  // Notifications are executed concurrently
    SEQUENTIAL // Notifications are executed one after another
}
This enumeration allows users to choose between parallel and sequential notification processing based on their specific requirements.
Regarding NotificationHandlingStrategy.PARALLEL: I think a more appropriate name would be Concurrent execution, right?
Task.WhenAll alternative
There is no ValueTask<T[]> WhenAll, so the ValueTask would have to be boxed with AsTask like in the PR and we would pay more in terms of allocation.
I don't think it's necessary to use Task.WhenAll, we could write an awaiter like so:
global::System.Collections.Generic.List<global::System.Exception>? exceptions = null;

// Start each handler; ToArray() materializes the Select, so every handler has
// been invoked (is "hot") before we begin awaiting.
ValueTask[] valueTasks = handlers
    .Select(handler => handler.Handle(notification, cancellationToken))
    .ToArray();

foreach (var task in valueTasks)
{
    try
    {
        // We await each task in order; it doesn't matter if we await the slowest one
        // first as we'd have to wait for it anyway - by then the others have completed too.
        await task.ConfigureAwait(false);
    }
    catch (global::System.Exception ex)
    {
        exceptions ??= new global::System.Collections.Generic.List<global::System.Exception>();
        exceptions.Add(ex);
    }
}
Would this be an appropriate place to use IsCompletedSuccessfully? I can't remember the downsides or benefits of this, the blog post didn't elaborate much. I'll benchmark/play around with it later.
try
{
    if (!task.IsCompletedSuccessfully)
    {
        await task;
    }
}
One of the key goals of this library is to keep overhead low (close to zero)
My suggested implementation causes an array allocation; this could be avoided by using the .NET 8 inline array feature. Mediator could add Buffer2<T>, Buffer3<T> types up to a max number and use the appropriate collection depending on the handler count (IIRC Mediator knows the number of handlers for each notification AOT, so we generate a buffer type only when it is needed). A Publish method would have to be created for each buffer length due to the lack of a convenient shared type.
I suspect most projects only use a small number of notification handlers, so this would be used in most cases; otherwise an array will be allocated (could look at ThreadStaticLocal as well).
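For reference, a rough sketch of what such a buffer type could look like with the .NET 8 inline array feature; the Buffer2<T> name and shape here are hypothetical, not an existing Mediator type:

using System.Runtime.CompilerServices;

// Hypothetical fixed-size buffer: the runtime expands the single field into
// two inline elements, indexable like an array/span, with no heap allocation.
[InlineArray(2)]
public struct Buffer2<T>
{
    private T _element0;
}

// Usage sketch:
// var buffer = new Buffer2<ValueTask>();
// buffer[0] = handler1.Handle(notification, cancellationToken);
// buffer[1] = handler2.Handle(notification, cancellationToken);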
This is probably redundant, as if any of the handlers yield, the ValueTask would have to allocate a Task on the heap, moving the inline array to the heap.
This seems to work as well. But I do not understand why you think AsTask() would have more overhead (be it allocation or anything); the 'As' prefix usually means little to no overhead when converting the source to the target type.
ValueTask is a struct capable of wrapping either a TResult or a Task<TResult>. Meaning it can be returned from an async method, and if that method completes synchronously and successfully, nothing needs to be allocated. Only if the method completes asynchronously does a Task<TResult> need to be allocated.
Calling AsTask returns the Task<TResult> object that is wrapped in this ValueTask<TResult> if one exists, or a new Task<TResult> object that represents the result. Meaning that if a given ValueTask would run synchronously, calling AsTask will cause an unnecessary allocation of a Task.
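A small sketch of the behaviour being described, assuming the default (non-pooled) async method builder; the method name is illustrative only:

static async ValueTask<int> GetValueAsync(bool cached)
{
    if (cached)
        return 42;        // synchronous completion: the result is stored inline, no heap allocation
    await Task.Yield();   // asynchronous completion: a Task<int> is allocated
    return 42;
}

int fast = await GetValueAsync(cached: true);            // fast path, no allocation
Task<int> boxed = GetValueAsync(cached: true).AsTask();  // AsTask forces a Task<int> even on the fast path
int slow = await GetValueAsync(cached: false);           // the Task<int> was allocated anyway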
Benchmarks clearly show that AsTask always causes a task to be allocated, even for methods which do not await. Observe how all methods allocate for AsyncAwaitTask: this is because awaiting Task.Delay makes the method complete asynchronously, requiring a Task to be allocated on the heap.
It looks like IsCompletedSuccessfullyResult is faster than awaiting a synchronous method, perhaps this should be used?
Method | Mean | Error | StdDev | Gen0 | Allocated |
---|---|---|---|---|---|
AwaitResult | 33.13 ns | 0.707 ns | 0.757 ns | - | - |
AsTaskResult | 32.72 ns | 0.451 ns | 0.352 ns | 0.0459 | 72 B |
IsCompletedSuccessfullyResult | 22.86 ns | 0.343 ns | 0.304 ns | - | - |
Method | Mean | Error | StdDev | Gen0 | Allocated |
---|---|---|---|---|---|
AwaitResult | 46.34 ns | 0.970 ns | 1.328 ns | - | - |
AsTaskResult | 52.49 ns | 0.336 ns | 0.280 ns | 0.0459 | 72 B |
IsCompletedSuccessfullyResult | 39.43 ns | 0.817 ns | 1.146 ns | - | - |
I awaited Task.Yield(), hence why it is so much slower. It still demonstrates that awaiting causes all variations to allocate a Task.
Method | Mean | Error | StdDev | Gen0 | Allocated |
---|---|---|---|---|---|
AwaitResult | 1.108 us | 0.0120 us | 0.0113 us | 0.1564 | 248 B |
AsTaskResult | 1.108 us | 0.0161 us | 0.0150 us | 0.1526 | 240 B |
IsCompletedSuccessfullyResult | 1.131 us | 0.0116 us | 0.0108 us | 0.1564 | 248 B |
Hey, sorry for the late reply. I'll be trying to catch up this week :)
I think a more appropriate name would be Concurrent execution, right?
I agree! Also, you used PascalCase, which I think is the idiomatic casing for enums in .NET. Apart from this I think the API and naming is great @Neo-vortex :+1:
I'd be interested to see more benchmarks running notification publishing workloads, testing various optimisations. Optimizations such as being opportunistic about IsCompletedSuccessfully might drown in the noise of the rest of the code since the function is already async; I've mostly seen that optimization being useful when you can avoid the async state machine altogether.
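To make that last point concrete, "avoiding the state machine altogether" looks roughly like this; the Publish helper and its shape here are hypothetical, not the library's code:

static ValueTask Publish(Func<ValueTask> handler)
{
    ValueTask task = handler();
    if (task.IsCompletedSuccessfully)
    {
        // Fast path: consume the already-completed ValueTask and return
        // without ever building an async state machine.
        task.GetAwaiter().GetResult();
        return default;
    }

    // Slow path: only now do we pay for the async state machine.
    return Awaited(task);

    static async ValueTask Awaited(ValueTask task) => await task;
}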
Maybe we should agree on what the common case is. I would guess that you are right @TimothyMakkison in that there usually is a single or few handlers. I also think that most handlers will be async. Do you agree?
I'd be interested to see more benchmarks running notification publishing workloads, testing various optimisations. Optimizations such as being opportunistic about IsCompletedSuccessfully might drown in the noise of the rest of the code since the function is already async; I've mostly seen that optimization being useful when you can avoid the async state machine altogether.
I agree, IsCompletedSuccessfully is best used to avoid allocations.
I did run some benchmarks to experiment with different methods; I tried to use small numbers of handlers and varied between all-async, all-sync and mixed handlers. I can't find the benchmark code right now, but this is what I found:
- Task.WhenAll(Task[]) will copy the array doubling duplications.
- Task.WhenAll(IEnumerable<Task>) internally uses ToArray so it won't duplicate an array. Unfortunately var tasks = array.Select(static x => x()).Where(static x => !x.IsCompletedSuccessfully); will allocate a LINQ state machine of (IIRC) 112 bytes, making any saving redundant.
- AsTask will allocate state machines for async methods; using AsTask on ValueTask.CompletedTask won't allocate.

I found a variant of the following worked quite well: it could run synchronously with no allocations if all the handlers were synchronous. If an async handler was called, an array large enough to fit all the remaining handlers would be allocated. (List has a large overhead.)
Task?[]? tasks = null;
var pos = 0;
for (var i = 0; i < array.Length; i++)
{
    var task = array[i]();
    if (task.IsCompletedSuccessfully)
        continue;

    tasks ??= new Task[array.Length - i];
    tasks[pos] = task.AsTask();
    pos++;
}

if (tasks == null)
    return;

foreach (var t in tasks)
{
    if (t == null)
        break;

    await t.ConfigureAwait(false);
}
Maybe we should agree on what the common case is. I would guess that you are right @TimothyMakkison in that there usually is a single or few handlers. I also think that most handlers will be async. Do you agree?
Not sure how it's used in the wild, but I suspect 95% of uses have one or two async handlers. I wonder if anyone has ever had more than 10 handlers for one notification.
That code looks great :+1:
I was wondering if awaiting tasks in a loop causes more "scheduling churn"/CPU overhead than using Task.WhenAll, but from my interpretation of the code in WhenAllPromise it seems unlikely to be cheaper in any scenario, as there is quite a lot of bookkeeping code and atomic operations in there.
Task.WhenAll(Task[]) will copy the array doubling duplications
Doesn't look like that to me? If passed as an array it's just cast to a span. The array can't contain nulls though, so any remaining length would have to be populated by Task.CompletedTask.
Doesn't look like that to me? If passed as an array it's just cast to a span. The array can't contain nulls though, so any remaining length would have to be populated by Task.CompletedTask.
Nice find, looks like they removed the duplication in .NET 8. I get the following in .NET 7 and below:
Task[] tasksCopy = new Task[taskCount];
for (int i = 0; i < taskCount; i++)
{
    Task task = tasks[i];
    if (task == null) ThrowHelper.ThrowArgumentException(ExceptionResource.Task_MultiTaskContinuation_NullTask, ExceptionArgument.tasks);
    tasksCopy[i] = task;
}
The array can't contain nulls though, so any remaining length would have to be populated by Task.CompletedTask
My bad, this version didn't need Task?:
Task[]? tasks = null;
for (var i = 0; i < array.Length; i++)
{
    var task = array[i]();
    if (tasks == null && task.IsCompletedSuccessfully)
        continue;

    tasks ??= new Task[array.Length - i];
    tasks[i - (array.Length - tasks.Length)] = task.AsTask();
}

if (tasks == null)
    return;

foreach (var t in tasks)
{
    await t.ConfigureAwait(false);
}
Is it better to use Parallel.ForEachAsync than Task.WhenAll? It's already optimized to deal with CPU cores. Also, checking that the number of handlers is not too low would be a good idea; if you have a single handler there is no point running it in parallel, wasting resources.
Is it better to use Parallel.ForEachAsync than Task.WhenAll? It's already optimized to deal with CPU cores
It varies from task to task, but Parallel.ForEachAsync uses more memory, always allocates a Task (I think) and has the overhead of managing multiple threads. I suspect it is overkill for 99% of scenarios, but that's only my gut feeling.
It might be worth creating NotificationHandlingStrategy.Parallel, NotificationHandlingStrategy.Sequential and NotificationHandlingStrategy.Concurrent. It might be possible to let the user define their own strategies using IServiceCollection.
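As a sketch of that last idea (the interface and registration below are invented for illustration and are not Mediator's actual API), a user-defined strategy could simply be a type registered through DI:

// Hypothetical extension point: a strategy decides how the handler
// invocations for one notification are awaited.
public interface INotificationPublishStrategy
{
    ValueTask Publish(
        IReadOnlyList<Func<CancellationToken, ValueTask>> handlerInvocations,
        CancellationToken cancellationToken);
}

// Hypothetical registration: the user supplies their own implementation.
// builder.Services.AddSingleton<INotificationPublishStrategy, MyCustomStrategy>();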
Method | Mean | Error | StdDev | Allocated |
---|---|---|---|---|
Concurrent | 15.73 ms | 0.101 ms | 0.095 ms | 1.06 KB |
ParallelForEach | 15.74 ms | 0.087 ms | 0.081 ms | 1.98 KB |
Feel free to play around with the number of threads/ the delay.
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;

BenchmarkRunner.Run<Benchmark>();

[MemoryDiagnoser]
public class Benchmark
{
    private Func<Task>[] _tasks = Enumerable.Range(0, 3).Select<int, Func<Task>>(x => async () => { await Task.Delay(1); }).ToArray();

    [Benchmark]
    public async Task Concurrent()
    {
        var tasks = _tasks.Select(x => x()).ToArray();
        foreach (var task in tasks)
        {
            await task;
        }
    }

    [Benchmark]
    public async Task ParallelForEach()
    {
        await Parallel.ForEachAsync(_tasks, async (x, _) => { await x(); });
    }
}
Yeah, for a small number of tasks maybe it's not worth it. If you put 100 or more we could probably see better results, depending on the work done in the tasks (IO, CPU), but I'm not sure anyone is using 100 handlers for a single type of message.
I agree with the point about letting users define their own execution strategy implementation; I think MediatR has that option.
I kind of like the configuration API from MediatR for this, just having the user specify a type. That will allow users to hook into more stuff in the notification pipeline as well (for example distributed tracing or any custom instrumentation)
https://github.com/jbogard/MediatR/wiki#custom-notification-publishers
I agree, specifying a type for a custom INotificationPublisher has worked well for me in MediatR. Some examples would include using it to do something like MediatR's TaskWhenAllPublisher, using Parallel.ForEachAsync() (which does return a ValueTask for a delegate), having a sequential foreach loop, or fire-and-forget. I use the last case in an app to make notifications fire-and-forget.
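As an example of that last case, a fire-and-forget publisher for MediatR would look roughly like this (based on MediatR 12's INotificationPublisher; exact member names may differ between versions):

using MediatR;

// Rough sketch: start every handler but don't await any of them.
// A real implementation should observe/log exceptions from the started tasks.
public sealed class FireAndForgetPublisher : INotificationPublisher
{
    public Task Publish(
        IEnumerable<NotificationHandlerExecutor> handlerExecutors,
        INotification notification,
        CancellationToken cancellationToken)
    {
        foreach (var executor in handlerExecutors)
        {
            _ = executor.HandlerCallback(notification, cancellationToken);
        }

        return Task.CompletedTask;
    }
}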
Started work on this here: #145. Would appreciate reviews!
I did a bunch of experimentation trying to figure out the most lightweight way of awaiting an array of value tasks that complete asynchronously: https://gist.github.com/martinothamar/ed6c4bec532cbc9ca9311d15a382a3a6
Can you spot any errors or weaknesses in the code? Essentially my interpretation is that the Loop variants are pretty much on par latency-wise, but have fewer allocations. So in a high throughput/concurrent situation (unlike these microbenchmarks), I think those should do well.
Here's the current impl in my PR: https://github.com/martinothamar/Mediator/blob/9196821326f61fdd5962c50f783f716282a6fc4c/src/Mediator/INotificationPublisher.cs#L70-L86
I might try to use the PoolingAsyncValueTaskMethodBuilder here too, but that might not work as well outside of microbenchmarks like these..
I forgot to address this, but I incorrectly responded to @Neo-vortex's comment with my reply.
I do not understand why you think AsTask() would have more overhead (be it allocation or anything).
My response only looked at the generic ValueTask<TResult> and neglected the case of the relevant non-generic ValueTask. In this case he is spot on and I completely ignored his point.
Calling AsTask on a synchronously completed ValueTask will return the static Task.CompletedTask, using zero memory. Calling AsTask on an asynchronously completing ValueTask will return the underlying allocated Task object; that allocation would have occurred regardless of whether the ValueTask was awaited or AsTask was called.
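A quick sketch of that distinction, assuming the default async method builder:

// Synchronously completed, stateless ValueTask: AsTask returns the cached Task.CompletedTask.
Task t1 = ValueTask.CompletedTask.AsTask();
Console.WriteLine(ReferenceEquals(t1, Task.CompletedTask)); // True

// Asynchronously completing ValueTask: the backing Task gets allocated regardless,
// and AsTask just hands back that already-allocated object.
Task t2 = YieldingAsync().AsTask();
await t2;

static async ValueTask YieldingAsync() => await Task.Yield();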
Feeling pretty good about #145 now, also added a sample and some docs, would appreciate reviews as always. The PR has gotten pretty large but a big part of it is just snapshot-files changing. I'll merge tomorrow probably unless there are any big concerns
Changes published in 3.0.0-preview.25
Please still feel free to test and provide feedback
This is very exciting! I'll put it to the test. Thank you, @martinothamar!
Good to hear! Some people are reporting issues. I'm actively looking into it, so feedback and reports are great: https://github.com/martinothamar/Mediator/issues/146
One of the philosophies of notifications is to notify the different handlers at the same time (hopefully). Currently what we do is:
MediatR 12 has implemented an option to define whether notification handlers should run in parallel or not. Here is a link to a video demonstrating it: https://www.youtube.com/watch?v=hNxVjNO6RX4. The Mediator library should add an option for these execution strategies.
Imagine these two notification handlers (see the illustrative sketch below):
With the current implementation the handling time for all the handlers would be about ~12000 ms, but with the new implementation it would be about ~10000 ms.
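The handler snippet from the original issue isn't reproduced above; an illustrative pair consistent with those timings could look like this, assuming Mediator's ValueTask-based INotificationHandler and hypothetical handler names:

public sealed record SomethingHappened : INotification;

public sealed class SlowHandler : INotificationHandler<SomethingHappened>
{
    public async ValueTask Handle(SomethingHappened notification, CancellationToken cancellationToken)
        => await Task.Delay(10_000, cancellationToken); // ~10 000 ms of work
}

public sealed class FastHandler : INotificationHandler<SomethingHappened>
{
    public async ValueTask Handle(SomethingHappened notification, CancellationToken cancellationToken)
        => await Task.Delay(2_000, cancellationToken);  // ~2 000 ms of work
}

Run one after another these take roughly 10 000 + 2 000 = 12 000 ms; started together and then awaited, roughly max(10 000, 2 000) = 10 000 ms.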