dotnet / orleans

Cloud Native application framework for .NET
https://docs.microsoft.com/dotnet/orleans
MIT License

Fan-out processing? #7676

Closed: turowicz closed this issue 2 years ago

turowicz commented 2 years ago

I have a UnitGrain that posts a message to a Stream, which is then picked up by a GeneratorGrain that needs to call N different SubGeneratorGrains. Currently the GeneratorGrain calls them asynchronously, one by one, inside a foreach statement. I would like the generation to run in parallel, i.e., every call to a SubGeneratorGrain should happen at the same time.

Can I achieve that with TPL or TPL Dataflow?

turowicz commented 2 years ago

cc @ReubenBond @benjaminpetit

turowicz commented 2 years ago

Should I look into the Dispatcher pattern below and make GeneratorGrain a StatelessWorker?

https://github.com/OrleansContrib/DesignPatterns/blob/master/Dispatcher.md

turowicz commented 2 years ago

Or maybe I can just add these Tasks to a list and await them all?
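Here is a minimal sketch of that idea outside Orleans. It shows the difference between awaiting inside the loop and collecting the tasks first. CallSubGeneratorAsync is a hypothetical stand-in for a SubGeneratorGrain call and simply waits 200 ms. In a real grain it would be the grain call itself.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for a SubGeneratorGrain call: waits 200 ms, returns its id.
static async Task<int> CallSubGeneratorAsync(int id)
{
    await Task.Delay(200);
    return id;
}

var ids = Enumerable.Range(0, 5).ToList();
var stopwatch = Stopwatch.StartNew();

// Sequential: each call is awaited before the next one starts (about 5 x 200 ms).
var sequentialResults = new List<int>();
foreach (var id in ids)
{
    sequentialResults.Add(await CallSubGeneratorAsync(id));
}
long sequentialMs = stopwatch.ElapsedMilliseconds;

// Concurrent: start every call, collect the tasks in a list, then await them
// together (about 200 ms in total, because the calls overlap).
stopwatch.Restart();
var tasks = new List<Task<int>>();
foreach (var id in ids)
{
    tasks.Add(CallSubGeneratorAsync(id));
}
int[] concurrentResults = await Task.WhenAll(tasks);
long concurrentMs = stopwatch.ElapsedMilliseconds;

Console.WriteLine($"sequential: {sequentialMs} ms, concurrent: {concurrentMs} ms");
```

The sequential loop takes roughly the sum of the call durations. The Task.WhenAll version takes roughly as long as the slowest single call, because every call is started before any of them is awaited.
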

JorgeCandeias commented 2 years ago

Regular TPL fan-out code works fine in Orleans and behaves the same way as elsewhere. You can use Task.WhenAll() on an enumerable of fan-out tasks to await the completion of all the work.

Because the work is distributed, there are a couple of extra things to watch out for, such as individual fan-out calls failing independently of each other.

TPL Dataflow in particular works fine with Orleans, although the usual Orleans timeout rules still apply to each call.
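Below is a rough sketch of a bounded fan-out with TPL Dataflow. It assumes the System.Threading.Tasks.Dataflow library is referenced (it is published as a NuGet package of that name). The ActionBlock runs at most MaxParallelCalls items at once. MaxParallelCalls, ItemCount and the Task.Delay stand-in for the sub-generator call are illustrative assumptions, not Orleans APIs.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

const int MaxParallelCalls = 4; // hypothetical concurrency cap
const int ItemCount = 20;       // hypothetical number of sub-generation items

var gate = new object();
int inFlight = 0;    // calls currently running
int maxObserved = 0; // highest number of simultaneous calls seen
int processed = 0;   // calls that have finished

// The block invokes this async delegate for each posted item and keeps at most
// MaxParallelCalls invocations running at the same time.
var block = new ActionBlock<int>(async id =>
{
    lock (gate)
    {
        inFlight++;
        maxObserved = Math.Max(maxObserved, inFlight);
    }

    await Task.Delay(50); // stand-in for the (hypothetical) sub-generator grain call

    lock (gate)
    {
        inFlight--;
        processed++;
    }
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = MaxParallelCalls });

for (int i = 0; i < ItemCount; i++)
{
    block.Post(i);
}

block.Complete();       // signal that no more items will be posted
await block.Completion; // completes once every posted item has been processed

Console.WriteLine($"processed {processed} items, at most {maxObserved} at once");
```

Task.WhenAll starts every call immediately. This version instead caps how many calls are in flight at any moment.
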

Here's some pseudo-code for a basic fan-out. It assumes a workload variable of type IEnumerable<WorkItem> already exists.

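```csharp
// Assumes this runs inside a grain method (so GrainFactory is available) and that
// `workload` is an IEnumerable<WorkItem>. <key> and <parameters> are placeholders.
Task DoWorkloadAsync(WorkItem item)
{
    // Returns a Task right away, without waiting for the grain call to finish.
    return GrainFactory.GetGrain<ISubGeneratorGrain>(<key>).DoSomeWork(<parameters>);
}

var tasks = new List<Task>();
var pending = new List<WorkItem>();  // for option 3
var failures = new List<WorkItem>(); // for option 3
foreach (var item in workload)
{
    // Start each call without awaiting it, so all calls are in flight at once.
    tasks.Add(DoWorkloadAsync(item));
    pending.Add(item); // for option 3
}

// The three options below are alternatives; use only one of them.

// option 1: await all of them - if any call fails, the fan-out fails
// (awaiting WhenAll rethrows the first exception; the other calls still run to completion)
await Task.WhenAll(tasks);

// option 2: await each fan-out call in the order it was started and decide what to do;
// all calls are already running, so awaiting them one by one does not serialize them
foreach (var task in tasks)
{
    try
    {
        await task;
    }
    catch (Exception ex)
    {
        // handle the exception here
    }
}

// option 3: as option 2, but keep each failed work item so the fan-out can be retried
for (var i = 0; i < tasks.Count; i++)
{
    try
    {
        await tasks[i];
    }
    catch (Exception ex)
    {
        // log the exception and keep the work item for retry
        failures.Add(pending[i]);
    }
}
```
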
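Options 2 and 3 await the calls in the order they were started. If each result should be handled as soon as it arrives, one possible sketch uses Task.WhenAny and then retries the failed items in further rounds, which is the idea behind option 3. A few parts of the sketch are assumptions for illustration:

- Work items are plain ints.
- DoWorkloadAsync is a hypothetical stand-in that fails the first attempt of every even-numbered item.
- MaxRounds is an assumed retry budget.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

const int MaxRounds = 3;                   // hypothetical retry budget
var attempts = new Dictionary<int, int>(); // work item -> number of attempts

// Hypothetical stand-in for DoWorkloadAsync. The attempt counter is updated
// before the first await, so only the launching code touches the dictionary.
async Task DoWorkloadAsync(int item)
{
    attempts.TryGetValue(item, out var previous);
    int attempt = previous + 1;
    attempts[item] = attempt;

    await Task.Delay(10 * (item % 3)); // vary how long each call takes
    if (item % 2 == 0 && attempt == 1)
    {
        throw new InvalidOperationException($"item {item} failed on attempt {attempt}");
    }
}

// Runs one fan-out round, handles each call in completion order, and
// returns the work items whose calls failed.
async Task<List<int>> FanOutAsync(IReadOnlyList<int> items)
{
    // Start every call first, mapping each task back to its work item.
    var inFlight = items.ToDictionary(x => DoWorkloadAsync(x));
    var failed = new List<int>();
    while (inFlight.Count > 0)
    {
        Task finished = await Task.WhenAny(inFlight.Keys);
        int item = inFlight[finished];
        inFlight.Remove(finished);
        try
        {
            await finished; // rethrows the call's exception, if it failed
        }
        catch (Exception ex)
        {
            Console.WriteLine($"will retry: {ex.Message}");
            failed.Add(item);
        }
    }
    return failed;
}

List<int> remaining = Enumerable.Range(1, 6).ToList();
for (int round = 1; round <= MaxRounds && remaining.Count > 0; round++)
{
    remaining = await FanOutAsync(remaining);
    Console.WriteLine($"round {round}: {remaining.Count} item(s) still failing");
}
```

Calling Task.WhenAny in a loop costs time quadratic in the number of tasks. That is fine for a modest fan-out, but it is worth keeping in mind for very large ones.
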
ghost commented 2 years ago

We're moving this issue to the 4.0-Planning milestone for future evaluation / consideration. Because it's not immediately obvious that this is a bug in our framework, we would like to keep this around to collect more feedback, which can later help us determine its impact. We will re-evaluate this issue during our next planning meeting(s). If we later determine that the issue has no community involvement, or that it is a very rare and low-impact issue, we will close it so that the team can focus on more important and high-impact issues.

turowicz commented 2 years ago

Thank you for the replies. In the end, the best option was to split the Stream by more IDs (each SubGenerated report has its own ID), so that all SubGenerations fire at the same time. I tried the TPL approach that @JorgeCandeias suggested, but for some reason it did not "parallelise" the workflow.