dapr / dotnet-sdk

Dapr SDK for .NET
Apache License 2.0

Cannot use Task.Run to execute multiple activities in fan-in-out scenario in a workflow #1356

Closed. tmarkovski closed this issue 1 month ago.

tmarkovski commented 1 month ago

Expected Behavior

I should be able to run and compose tasks in a fan-out/fan-in scenario.

Actual Behavior

A fan-out/fan-in scenario built from composite tasks does not run: the workflow never completes, and the activities are never called.

Steps to Reproduce the Problem

With the following code, the workflow never completes:

// In a Workflow implementation
// Hangs: the activities called inside Task.Run are never executed.

var tasks = new List<Task<string>>();

foreach (var name in input)
{
    // Task.Run(Func<Task<string>>) returns Task<string>, so the lambda must return a value.
    tasks.Add(Task.Run(async () =>
    {
        await context.CallActivityAsync<string>(nameof(CreateGreetingActivity), name);
        return await context.CallActivityAsync<string>(nameof(CreateGreetingActivity), name);
    }));
}

var messages = await Task.WhenAll(tasks);

Note that the following works: the activities run in parallel and are awaited correctly.

// In a Workflow implementation

var tasks = new List<Task<string>>();

foreach (var name in input)
{
    tasks.Add(context.CallActivityAsync<string>(nameof(CreateGreetingActivity), name));
}

var messages = await Task.WhenAll(tasks);

Release Note

I'm not sure whether this is a bug, but one potential fix would be to use a custom TaskScheduler.

WhitWaldo commented 1 month ago

I wonder whether your use of Task.Run to wrap the activities before awaiting them externally is what's interfering here. Have you tried implementing it more in line with the example from the docs (at the bottom of the section)?

//Revisiting the earlier example...
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("GetWorkBatch", null);

const int MaxParallelism = 5;
var results = new List<int>();
var inFlightTasks = new HashSet<Task<int>>();
foreach(var workItem in workBatch)
{
  if (inFlightTasks.Count >= MaxParallelism)
  {
    var finishedTask = await Task.WhenAny(inFlightTasks);
    results.Add(finishedTask.Result);
    inFlightTasks.Remove(finishedTask);
  }

  inFlightTasks.Add(context.CallActivityAsync<int>("ProcessWorkItem", workItem));
}
results.AddRange(await Task.WhenAll(inFlightTasks));

var sum = results.Sum(t => t);
await context.CallActivityAsync("PostResults", sum);

tmarkovski commented 1 month ago

@WhitWaldo I edited my original post to note that the scenario you describe works fine. My use case is running groups of activities in parallel, where the activities within each group still execute in order and pass inputs and outputs between them. A workaround would be to move each group of activities into a separate workflow and then run those child workflows in parallel. That achieves the same goal, but it makes the code less readable, because my tasks end up spread across different workflows.

You might be right that Task.Run is interfering, although outside of a Dapr context this is a perfectly fine way to compose tasks.

tmarkovski commented 1 month ago

Alright, I tried a different approach, and it seems to work fine. I don't know why Task.Run behaves like this.

// In a Workflow implementation

var tasks = new List<Task<string>>();

foreach (var name in input)
{
    tasks.Add(ProcessTasks(name));
}

var messages = await Task.WhenAll(tasks);

// Runs one group of activities in order; the groups themselves run in parallel.
async Task<string> ProcessTasks(string name)
{
    await context.CallActivityAsync<string>(nameof(CreateGreetingActivity), name);
    return await context.CallActivityAsync<string>(nameof(CreateGreetingActivity), name);
}

WhitWaldo commented 1 month ago

I expect it has to do with event sourcing and how the workflow scopes activities by task. Perhaps the workflow can easily replay tasks when they are awaited directly, whether as an individual activity or via Task.WhenAll, but my bet is that by wrapping the async activities in Task.Run, you're effectively hiding these operations from the workflow's scheduling.
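The scheduling point can be illustrated without Dapr. The sketch below assumes, hypothetically, that orchestration code runs on a dedicated TaskScheduler; a ConcurrentExclusiveSchedulerPair's ExclusiveScheduler stands in for it here, and it is not claimed to be the scheduler the Dapr SDK actually uses. The program checks TaskScheduler.Current in three places: directly in the "orchestration" body, after an await inside an ordinary async method (the shape of ProcessTasks above), and inside Task.Run. The names WorkflowScheduler, SequentialGroupAsync and ProbeAsync are illustrative only.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Program
{
    // Hypothetical stand-in for the scheduler a workflow engine might run orchestration code on.
    static readonly TaskScheduler WorkflowScheduler =
        new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;

    static bool OnWorkflowScheduler() => TaskScheduler.Current == WorkflowScheduler;

    // Same shape as an async local function: an ordinary async method.
    static async Task<bool> SequentialGroupAsync()
    {
        // With no SynchronizationContext, Task.Yield queues the continuation
        // to TaskScheduler.Current, so execution resumes on WorkflowScheduler.
        await Task.Yield();
        return OnWorkflowScheduler();
    }

    // Runs an "orchestration body" on WorkflowScheduler and reports where each style executes.
    public static Task<(bool Direct, bool AsyncMethod, bool TaskRun)> ProbeAsync() =>
        Task.Factory.StartNew(async () =>
        {
            bool direct = OnWorkflowScheduler();
            bool asyncMethod = await SequentialGroupAsync();
            // Task.Run always queues its delegate to TaskScheduler.Default (the thread pool).
            bool taskRun = await Task.Run(() => OnWorkflowScheduler());
            return (direct, asyncMethod, taskRun);
        }, CancellationToken.None, TaskCreationOptions.None, WorkflowScheduler).Unwrap();

    public static async Task Main()
    {
        var (direct, asyncMethod, taskRun) = await ProbeAsync();
        Console.WriteLine($"direct: {direct}, async method: {asyncMethod}, Task.Run: {taskRun}");
    }
}
```

Running it prints `direct: True, async method: True, Task.Run: False`. Awaits inside plain async methods resume on the captured scheduler, while Task.Run deliberately escapes to the thread pool. If a workflow engine only observes work started on its own scheduler, this would be consistent with the hang reported in this issue and with why the async local function works.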

tmarkovski commented 1 month ago

That makes sense. Thank you for your thoughts. Closing this, as I have a solution.