Azure / durabletask

Durable Task Framework allows users to write long running persistent workflows in C# using the async/await capabilities.

Question: Patterns for Parallelization? #533

Open jimmyzimms opened 3 years ago

jimmyzimms commented 3 years ago

It's not fully clear from reading the wiki or the examples whether scatter-gather scenarios are supported inside orchestrations, so I'm bringing it up here. We have a set of work items that all need to be completed, but there's no requirement that they be processed serially. The orchestration just has to have them all complete before it finishes.

So let's say we have this naive orchestration that schedules a task per input (the specifics don't really matter) and completes when they are all done.

public class Orchestration : TaskOrchestration<String, List<String>>
{
    public override async Task<String> RunTask(OrchestrationContext context, List<String> input)
    {
        foreach (var thingToDo in input)
        {
            await context.ScheduleTask<Boolean>(typeof(DoWorkTask), thingToDo);
        }

        return "done";
    }
}

With a short execution time for DoWorkTask and a small enough set of inputs, serialized execution doesn't matter. However, let's postulate that DoWorkTask averages, say, 1 hour of execution time, and that we regularly have 20 or 30 "DoWorks" we need to perform together. If each DoWork doesn't care about the others and they won't interfere with each other, can we schedule multiple tasks and then await them?
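
For clarity, here's a sketch of the fully unbounded version of that idea: fan out by scheduling every task up front, then fan in with a single await. This is only to show intent (the class name is made up and we haven't run this exact code); it uses the same DoWorkTask and ScheduleTask call as above.

public class UnboundedOrchestration : TaskOrchestration<String, List<String>>
{
    public override async Task<String> RunTask(OrchestrationContext context, List<String> input)
    {
        // Fan out: schedule every work item without awaiting yet
        var tasks = new List<Task<Boolean>>();
        foreach (var thingToDo in input)
        {
            tasks.Add(context.ScheduleTask<Boolean>(typeof(DoWorkTask), thingToDo));
        }

        // Fan in: wait for all of them to complete before finishing the orchestration
        await Task.WhenAll(tasks);

        return "done";
    }
}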

In practice we'd want to cap the parallelism, so basically something along the lines of this naive example (for brevity, assume there's an extension method that builds the parallelized, partitioned work):

public class Orchestration : TaskOrchestration<String, List<String>>
{
    public override async Task<String> RunTask(OrchestrationContext context, List<String> input)
    {
        foreach (IEnumerable<String> partitionedWork in input.Partition(5)) // Do up to 5 things in parallel
        {
            var tasks = new List<Task>();
            foreach (var thingToDo in partitionedWork)
            {
                tasks.Add(context.ScheduleTask<Boolean>(typeof(DoWorkTask), thingToDo));
            }

            await Task.WhenAll(tasks);
        }

        return "done";
    }
}

So this is the type of thing we're trying to confirm is a supported use.

cgillum commented 3 years ago

At a high level I'd say anything you can do with Tasks in C# you can do with orchestrations in this framework.

As far as how to limit concurrency, I think one way you could achieve this is by doing the following:

class Orchestration : TaskOrchestration<string, List<string>>
{
    public override async Task<string> RunTask(OrchestrationContext context, List<string> input)
    {
        var concurrentTasks = new HashSet<Task<bool>>();
        foreach (string thingTodo in input)
        {
            if (concurrentTasks.Count >= 5) // keep at most 5 tasks in flight
            {
                // Wait for some outstanding task to finish first
                Task<bool> finished = await Task.WhenAny(concurrentTasks);
                concurrentTasks.Remove(finished);
            }

            concurrentTasks.Add(context.ScheduleTask<bool>(typeof(DoWorkTask), thingTodo));
        }

        // Wait for all the rest to finish
        await Task.WhenAll(concurrentTasks);

        return "done";
    }
}
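
Side note: since these are Task<bool>, that last Task.WhenAll can also hand back the results if you need them, roughly:

bool[] results = await Task.WhenAll(concurrentTasks);

The example above just ignores them and returns "done".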

OR - and this seems pretty interesting to me - you could go with the code you already wrote and just implement a Partition(count) method. I imagine it would look something like this:

public class Orchestration : TaskOrchestration<String, List<String>>
{
    public override async Task<String> RunTask(OrchestrationContext context, List<String> input)
    {
        foreach (IEnumerable<String> partitionedWork in input.Partition(5)) // Do up to 5 things in parallel
        {
            var tasks = new List<Task>();
            foreach (var thingToDo in partitionedWork)
            {
                tasks.Add(context.ScheduleTask<Boolean>(typeof(DoWorkTask), thingToDo));
            }

            await Task.WhenAll(tasks);
        }

        return "done";
    }
}

public static class ExtensionMethods
{
    public static IEnumerable<IEnumerable<T>> Partition<T>(this IEnumerable<T> input, int size)
    {
        var partition = new List<T>(size);
        foreach (T item in input)
        {
            partition.Add(item);
            if (partition.Count == size)
            {
                yield return partition;
                partition = new List<T>(size);
            }
        }

        if (partition.Count > 0)
        {
            yield return partition;
        }
    }
}
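
To illustrate how that helper behaves, here's a quick sketch (the sample values are made up): it yields fixed-size batches plus a smaller final batch.

var work = new List<string> { "a", "b", "c", "d", "e", "f", "g" };
foreach (IEnumerable<string> batch in work.Partition(5))
{
    // Prints "a, b, c, d, e" then "f, g"
    Console.WriteLine(string.Join(", ", batch));
}
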
jimmyzimms commented 3 years ago

Ok, thanks for confirming this is kosher and supported. I've actually run the above concept code repeatedly in a POC over a few hours and it appeared to work just fine in all the tests (other than one); basically I just had a task with Task.Delay(n) in it to simulate execution time. However, if it weren't actually a supported scenario, it could suddenly stop working at any time, so I needed to check explicitly :) I know there's some very fancy scheduler code happening internally behind the scenes that might not take kindly to the above.
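
For reference, the POC activity was roughly something like this sketch (the base class and delay value here are just illustrative, assuming DurableTask.Core's AsyncTaskActivity):

public class DoWorkTask : AsyncTaskActivity<string, bool>
{
    protected override async Task<bool> ExecuteAsync(TaskContext context, string input)
    {
        // Stand-in for the real ~1 hour work item; the POC just delayed for a bit
        await Task.Delay(TimeSpan.FromSeconds(30));
        return true;
    }
}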

  1. Does anyone take issue with editing the wiki to add this as an example?
  2. Your example Partition extension above is almost identical to the one we used. Great minds.