microsoft / durabletask-java

Java SDK for Durable Functions and the Durable Task Framework
MIT License

Handling errors from activity functions when the fan-out/fan-in pattern is used #169

Closed: akshaykumars10 closed this 8 months ago

akshaykumars10 commented 8 months ago

We are implementing the fan-out/fan-in pattern in our use case. In this pattern, if one of the activity functions fails, the orchestrator function needs to identify the specific function that encountered the error.

Consider the following code block as an example: List<Integer> results = ctx.allOf(parallelTasks).await();

If any of the activity functions fails, the results variable is never populated. This means that even if some activity functions completed successfully, we cannot tell which ones they were or read their results.

Is there a way to address this issue and accurately identify the successful activity functions?

kaibocai commented 8 months ago

Hi @akshaykumars10, for allOf, if any of the activities throws an exception, we throw a CompositeTaskFailedException that contains a list of all the exceptions raised during the process. You can inspect that list to check which tasks failed. For example:

    @FunctionName("Parallel")
    public List<String> parallelOrchestratorSad(
            @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx,
            ExecutionContext context) {
        try {
            List<Task<String>> tasks = new ArrayList<>();
            tasks.add(ctx.callActivity("AppendSad", "Input1", String.class));
            tasks.add(ctx.callActivity("AppendHappy", "Input2", String.class));
            return ctx.allOf(tasks).await();
        } catch (CompositeTaskFailedException e) {
            for (Exception exception : e.getExceptions()) {
                if (exception instanceof TaskFailedException) {
                    TaskFailedException taskFailedException = (TaskFailedException) exception;
                    System.out.println("Task: " + taskFailedException.getTaskName() + " Failed for cause: " + taskFailedException.getErrorDetails().getErrorMessage());
                }
            }
        }
        return null;
    }

    @FunctionName("AppendHappy")
    public String appendHappy(
            @DurableActivityTrigger(name = "name") String name,
            final ExecutionContext context) {
        context.getLogger().info("AppendHappy: " + name);
        return name + "-test-happy";
    }

    @FunctionName("AppendSad")
    public String appendSad(
            @DurableActivityTrigger(name = "name") String name,
            final ExecutionContext context) {
        context.getLogger().info("Throw Test Exception: " + name);
        throw new RuntimeException("Test kaibocai exception");
    }

Let me know if this helps with your issue. Thanks.

akshaykumars10 commented 8 months ago

Hi @kaibocai, I'm facing an issue with CompositeTaskFailedException: it does not seem to be thrown in my case. Here's my code:

                List<String> parallelResult;
                try {
                    parallelResult = ctx.allOf(parallelTasks).await();
                } catch (CompositeTaskFailedException e) {
                    for (Exception exception : e.getExceptions()) {
                        if (exception instanceof TaskFailedException) {
                            TaskFailedException taskFailedException = (TaskFailedException) exception;
                            System.out.println("Task: " + taskFailedException.getTaskName() + " Failed for cause: " + taskFailedException.getErrorDetails().getErrorMessage());
                        }
                    }
                }

In my case, as soon as any activity function throws an exception, the orchestrator function goes into a failed state with this error: WARNING: The orchestrator failed with an unhandled exception: java.lang.RuntimeException: Unexpected failure in the task execution

Earlier, I was catching TaskFailedException in orchestrator function like this:

                List<String> parallelResult;
                try {
                    parallelResult = ctx.allOf(parallelTasks).await();
                } catch (TaskFailedException e) {
                    LOGGER.info("{}: Task failed after all the retries: {}", instanceId, e.getErrorDetails());
                }

This worked, but I could not tell which task had failed.

Am I missing something?

kaibocai commented 8 months ago

Hi @akshaykumars10, can you share the error details from the TaskFailedException caught in your function app? Also, could you provide simple repro code for this issue? Thanks.

akshaykumars10 commented 8 months ago

Hi @kaibocai I attempted to compare your sample code with my orchestrator function. The only distinction I found is that I also incorporate retries when calling activity functions. When I introduce retries into your code, it encounters the same error:

    @FunctionName("Parallel")
    public List<String> parallelOrchestratorSad(
            @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx,
            ExecutionContext context) {
        try {
            List<Task<String>> tasks = new ArrayList<>();
            RetryPolicy policy = new RetryPolicy(2, Duration.ofSeconds(15));
            TaskOptions options = new TaskOptions(policy);
            tasks.add(ctx.callActivity("AppendSad", "Input1", options, String.class));
            tasks.add(ctx.callActivity("AppendHappy", "Input2", options, String.class));
            return ctx.allOf(tasks).await();
        } catch (CompositeTaskFailedException e) {
            for (Exception exception : e.getExceptions()) {
                if (exception instanceof TaskFailedException) {
                    TaskFailedException taskFailedException = (TaskFailedException) exception;
                    System.out.println("Task: " + taskFailedException.getTaskName() + " Failed for cause: " + taskFailedException.getErrorDetails().getErrorMessage());
                }
            }
        }
        return null;
    }

    @FunctionName("AppendHappy")
    public String appendHappy(
            @DurableActivityTrigger(name = "name") String name,
            final ExecutionContext context) {
        context.getLogger().info("AppendHappy: " + name);
        return name + "-test-happy";
    }

    @FunctionName("AppendSad")
    public String appendSad(
            @DurableActivityTrigger(name = "name") String name,
            final ExecutionContext context) {
        context.getLogger().info("Throw Test Exception: " + name);
        throw new NullPointerException("Test kaibocai exception");
    }

Please inform me if you can replicate this issue or if you require additional details.

kaibocai commented 8 months ago

Hi @akshaykumars10, thanks for your reply. I can repro the issue and am currently working on a fix. Thanks.

akshaykumars10 commented 8 months ago

Hi @kaibocai, do you know when this fix is scheduled to be released?

kaibocai commented 8 months ago

Hi @akshaykumars10, we will have a release by the end of next week. There are a few more updates we are working on for this release. Thanks.

kaibocai commented 8 months ago

@akshaykumars10 the latest version, v1.5.0, which contains the fix, is out here: https://repo.maven.apache.org/maven2/com/microsoft/durabletask-azure-functions/

akshaykumars10 commented 8 months ago

Thank you, @kaibocai. It is working as expected.

I have a quick question:

Let's say we submit 2 tasks for execution using the fan-out/fan-in pattern. If one of these tasks fails with an exception, we can retrieve its details by catching a CompositeTaskFailedException. But what about the task that succeeded? How can I access its response in the orchestrator function's code?

kaibocai commented 8 months ago

Hi @akshaykumars10, based on the current implementation, I don't think there is a way to access the result of the successful task when another task fails. Can you please open a new issue describing this situation, so we can plan to support it in the future? Thanks.

akshaykumars10 commented 8 months ago

Hi @kaibocai, as suggested, I have created a new issue: https://github.com/microsoft/durabletask-java/issues/181
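
For anyone arriving here with the same follow-up question, the general idea of collecting per-task outcomes can be sketched in plain Java as below. This is only an illustration of the pattern, not the SDK's API: AwaitableTask, Outcome and PartialResults are hypothetical names introduced here, with AwaitableTask standing in for the SDK's Task<V>. Whether awaiting each Task individually behaves this way inside an orchestrator is an assumption that this thread does not confirm; issue #181 tracks proper support.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the SDK's Task<V>; NOT part of durabletask-java. */
interface AwaitableTask<V> {
    V await();
}

/** Hypothetical holder for one task's outcome: either a value or an error. */
final class Outcome<V> {
    final int index;               // position of the task in the input list
    final V value;                 // result when the task succeeded
    final RuntimeException error;  // non-null only when the task failed

    private Outcome(int index, V value, RuntimeException error) {
        this.index = index;
        this.value = value;
        this.error = error;
    }

    static <V> Outcome<V> success(int index, V value) {
        return new Outcome<>(index, value, null);
    }

    static <V> Outcome<V> failure(int index, RuntimeException error) {
        return new Outcome<>(index, null, error);
    }

    boolean succeeded() {
        return error == null;
    }
}

final class PartialResults {
    private PartialResults() {}

    /**
     * Awaits every task in list order and records each outcome,
     * instead of stopping at the first failure.
     */
    static <V> List<Outcome<V>> awaitEach(List<AwaitableTask<V>> tasks) {
        List<Outcome<V>> outcomes = new ArrayList<>();
        for (int i = 0; i < tasks.size(); i++) {
            try {
                outcomes.add(Outcome.success(i, tasks.get(i).await()));
            } catch (RuntimeException e) {
                // Assumption: in an orchestrator this would be narrowed
                // to the SDK's TaskFailedException.
                outcomes.add(Outcome.failure(i, e));
            }
        }
        return outcomes;
    }
}
```

Calling PartialResults.awaitEach with one failing and one succeeding task yields a two-element list in which succeeded() is false for the first entry and true for the second, so the caller can read the second entry's value while still seeing the first entry's error.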