Azure / azure-functions-durable-js

JavaScript library for using the Durable Functions bindings
https://www.npmjs.com/package/durable-functions
MIT License
128 stars 46 forks source link

context.df.Task.all throws as soon as one task fails so one can't get the result of other tasks #477

Open GP4cK opened 1 year ago

GP4cK commented 1 year ago

Describe the bug context.df.Task.all throws as soon as one task fails so one can't get the result of other tasks. I've made a minimum reproducible repo here: https://github.com/GP4cK/azure-durable-task-all-bug.

Investigative information

To Reproduce Steps to reproduce the behavior:

  1. Clone https://github.com/GP4cK/azure-durable-task-all-bug
  2. Follow the steps in the README

Expected behavior I'm expecting to be able to loop over the tasks and that for each task, task.isCompleted || task.isFaulted should be true and task.result should not be undefined (if the Activity/SubOrchestration is returning something).

Actual behavior When the first task fails, the second task is neither completed or faulted and its result is undefined.

Known workarounds I guess the workaround is to not use the fan-out fan-in pattern and use the chaining pattern instead.

davidmrdavid commented 1 year ago

@GP4K: I noticed that reproducer has what looks like an explicit storage account connection string. You may want to remove it if so.

Looking at this issue shortly

davidmrdavid commented 1 year ago

@GP4cK:

Ah, you've stumbled upon a bit of a low level detail in the implementation. Let me try to clarify what's going on and how to achieve the result you want.

When you schedule tasks with Task.all, all those task get scheduled and are guaranteed to produce a result. However, the semantics of Task.all only need a single failing task in order to error out. Therefore, as soon as single task results in an exception (and that exception becomes available to your orchestrator), your orchestrator will stop waiting for the other results and move on to the next line of your code.

That's pretty much what's happening here. Your orchestrator is no longer waiting for other results. It should be noted that this is the intended behavior. Put simply Task.all only waits for all results if all tasks succeed. However, those tasks are running in the background and will produce a result, so obtaining them is possible, but it requires a few extra steps.

Given today's API, to obtain the result of your ActivitySuccess task, you would need to explicitly yield it after the Task.all operation produces its exception. That tells the framework that you want to "materialize" the result of that task, and therefore the framework will do the work of fetching its result, if available, from the orchestrator History. If it's available, it will provide. If it's not available yet, the orchestrator will suspend until the task provides an output.

To summarize: you can use Task.all to schedule, in parallel, many tasks. However, if any of them fail, Task.all will short-circuit waiting for all results and produce an exception. After they've been scheduled, yu can force a task's result by yield'ing it explicitly. Since you have a collection of tasks whose results you want to obtain, then you could just iterate over them (assuming you have them saved in a list) via a .map operation or something equivalent.

Please let me know if that clarifies your question. Thanks!

FYI - @hossam-nasr. This is an interesting case. Perhaps something we can facilitate in future releases.

GP4cK commented 1 year ago

@GP4K: I noticed that reproducer has what looks like an explicit storage account connection string. You may want to remove it if so.

That's the default connection string to Azurite so it's ok.

Thanks for the explanation. So if I translate it to code, that should be it, right?

try {
  yield context.df.Task.all(tasks);
} catch (err) {
  // One task failed
}

for (let task of tasks) {
  try { // Could yield task throw? Do I need to use try/catch?
    yield task;
    // Do something with task.result
  } catch (err) {
  }
}

Or should I filter the tasks that haven't completed yet and call Task.all recursively like:

const pendingTasks = task.filter((task) => !task.isCompleted && !task.isFaulted);
if (pendingTasks.length === 0) return;
try {
  yield context.df.Task.all(pendingTasks);
} catch (err) {
  // check again if there are pending tasks
}
davidmrdavid commented 1 year ago

I recommend option 1 as it guarantees that, at the end of the loop, all your tasks have completed. With your second snippet, you have to check again and again on different subsets of "pendingTasks", as you noted.

GP4cK commented 1 year ago

I see. But regarding performances, it would be roughly the same since the tasks have been initially scheduled by the orchestrator, right?

davidmrdavid commented 1 year ago

Yes, that's correct. Only the first Task.all should be needed to schedule all tasks. Performance wise, this shouldn't be an issue.

GP4cK commented 1 year ago

Thank you very much for your help. It would be really nice if this was documented because there are other issues that are confusing. Ex: https://github.com/Azure/azure-functions-durable-js/issues/66#issuecomment-484984188

ejizba commented 1 year ago

It appears like Task.all is consistent with the Node.js Promise.all method in terms of throwing on the first error. However, Node.js also has a Promise.allSettled method which I believe is consistent with the requested behavior. Perhaps durable should add a Task.allSettled method to match

danieltskv commented 1 year ago

Hi @davidmrdavid,

I have a similar scenario where I want to schedule multiple parallel activity functions where some can fail or throw an error, and thats is fine, as I will only use the results of the ones that succeeded.

The issue I'm noticing with the proposed solution to @GP4cK's code is that some activity functions that eventually succeed are being called multiple times.

I was assuming the later loop that iterates over the tasks simply continues or fetches the result, but it seems that when the replay happens on the orchestrator function, some activity functions are ran again. I notice this by putting breakpoints and logs inside the activity functions. I can provide sample code if needed.

Is this expected behavior? Is there anything I can do to prevent re-calling the activity functions? I.e., only run each activity functions once, regardless of its result.

Thank you

davidmrdavid commented 1 year ago

Hi @danieltskv,

That is definitely not expected behavior. Do you think you could open a new issue, link this ticket in it, and provide some reproducer code? That would help us get to the bottom of this. Thanks!

danieltskv commented 1 year ago

@davidmrdavid Done: https://github.com/Azure/azure-functions-durable-js/issues/485 Thanks!

IDrumsey commented 1 year ago

+1 for the Task.allSettled function as mentioned by @ejizba. Would be helpful.

IDrumsey commented 1 year ago

I recommend option 1 as it guarantees that, at the end of the loop, all your tasks have completed. With your second snippet, you have to check again and again on different subsets of "pendingTasks", as you noted.

@davidmrdavid, I have attempted to apply solution 1 (from @GP4cK) in Python. It may work for Node.js, but doesn't seem to work for Python. Here is the code I'm running and I can move this to a new issue in https://github.com/Azure/azure-functions-durable-python/issues if requested.

orchestrator

def orchestrator_function(context: df.DurableOrchestrationContext):

    tasks = [
        context.call_activity('Hello1'),
        context.call_activity('Hello1'),
        context.call_activity('Hello1')
    ]

    try:

        allResults = yield context.task_all(tasks)

    except Exception as e:

        # one of the tasks threw an exception so the orchestrator is no longer waiting for the tasks to all complete
        # Need to re-attach the orchestrator to the tasks to wait for them
        # https://github.com/Azure/azure-functions-durable-js/issues/477#issuecomment-1428850454

        for task in tasks:

            try:
                result = yield task

            except Exception as e:

                pass

    return 'done'

main = df.Orchestrator.create(orchestrator_function)

Hello1 activity function

def main(name: str) -> str:

    raise Exception('test error')

After running the orchestrator function, I receive the following response

Orchestrator function 'test1' failed: 'AtomicTask' object has no attribute 'append'

Pretty sure this relates to this comment of yours. This issue may resolve this my issue as well? Looks like you had said it would take a couple of weeks awhile ago and it hasn't been resolved, is there a timeline for this?

Is there a workaround for this currently? I want to have the results for all successful tasks even if some of the tasks raised an exception. The only way I can think of currently is to run some non-orchestrator code that externally calls the activity function for each task and track each task individually. This way the exception handling will be for each call, not task_all and I likely wouldn't run into the AtomicTask bug. What are your thoughts on this whole situation? Again, I think @ejizba's solution would be a good long term solution.

davidmrdavid commented 1 year ago

Hi @IDrumsey:

Yes it would be great if you could re-open that request in the Python SDK repo (https://github.com/Azure/azure-functions-durable-python/issues ) and attach a .zip'ed reproducer that I can use the quickly identify the issue. Thanks!