Azure / azure-functions-durable-extension

Durable Task Framework extension for Azure Functions
MIT License

Sub Orchestration stuck in Running state (one activity completed several times) #559

Closed. remote-specialist closed this issue 5 years ago.

remote-specialist commented 5 years ago

Durable Functions version: 1.7.0

Steps to reproduce: start a sub-orchestration that fans out activities.

Actual behavior: sometimes the sub-orchestration gets stuck in the Running state. It looks like completed tasks are processed incorrectly: one task ("12" in the screenshot) is completed 3 times according to the logs, and 2 tasks ("7" and "10") were not completed at all. (See attached screenshot.)

cgillum commented 5 years ago

That is very odd. Did you encounter this while running in Azure? If so, which region?

remote-specialist commented 5 years ago

Yep - I encountered this while running in Azure (Central US region). It is not reliably reproducible - it fails with roughly 10% probability.

cgillum commented 5 years ago

Thanks. I found the execution logs for your orchestration, but I didn't observe anything unusual: 13 activity functions were started and 13 finished, each with unique message IDs. No warnings or errors.

Unfortunately our telemetry doesn't show the EventIds (something I will need to add), so I can't confirm the behavior you're seeing in your storage account. The fact that I'm seeing unique message IDs and you're seeing duplicate Event IDs (with some missing), however, makes me think that any potential bug here would have to be in-memory.

I'll probably need to try reproducing this issue locally.

remote-specialist commented 5 years ago

Thanks a lot for the investigation! Let me know if you need any additional information.

remote-specialist commented 5 years ago

It happened one more time - task #6 was completed twice and the sub-orchestration is stuck in the Running state. (See attached screenshot.)

cgillum commented 5 years ago

Thanks. I tried reproducing your issue locally (sub orchestration doing fan-out / fan-in) but I wasn't able to reproduce the problem, even after 100+ attempts. Could you share a code snippet that gives an idea of what your orchestration looks like?

One thing I'm going to do is add some additional tracing in our next release (v1.7.1) which should help me understand this problem better. If you're still able to reproduce with that new release (due out by tomorrow), I might be able to track this down more effectively.

remote-specialist commented 5 years ago

Sure. I will try to reproduce it on a new build. Code snippets below.

host.json (I set the limit for activities to 4 - maybe it is important for reproducing):

```json
{
  "version": "2.0",
  "extensions": {
    "durableTask": {
      "maxConcurrentActivityFunctions": 4,
      "maxConcurrentOrchestratorFunctions": 1
    }
  }
}
```

Orchestrations (O_Worker gets stuck in the Running state - there were no problems with O_Searcher):

```csharp
namespace JiraReportsProcessor
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

using JiraAPI;
using JiraReportsProcessor.Models;

using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class Orchestrators
{
    [FunctionName("O_FlowProcessor")]
    public static async Task<object> O_FlowProcessor(
        [OrchestrationTrigger] DurableOrchestrationContext context,
        ILogger log)
    {
        if (!context.IsReplaying)
        {
            log.LogWarning("Start O_FlowProcessor");
        }

        try
        {
            // search
            var searchTaskModel = context.GetInput<SearcherTaskModel>();
            var keys = await context.CallSubOrchestratorAsync<List<string>>("O_Searcher", searchTaskModel);
            var workerTaskModel = new WorkerTaskModel { Keys = keys, Start = searchTaskModel.Start };

            // work
            var activities = await context.CallSubOrchestratorAsync<List<ActivityDataModel>>("O_Worker", workerTaskModel);

            // build report
            var reporterTaskModel = new ReporterTaskModel { Start = searchTaskModel.Start, Activities = activities };
            await context.CallActivityAsync<string>("A_Reporter", reporterTaskModel);

            // return
            if (!context.IsReplaying)
            {
                log.LogWarning("Finish O_FlowProcessor");
            }

            return new
            {
                Success = true,
                Error = string.Empty
            };
        }
        catch (Exception e)
        {
            // stop any other tasks
            log.LogError($"Error: {e.Message}!");
            await context.CallActivityAsync("A_Stopper", e.Message);
            return new
                       {
                           Success = false,
                           Error = e.Message
                       };
        }
    }

    [FunctionName("O_Searcher")]
    public static async Task<List<string>> O_Searcher(
        [OrchestrationTrigger] DurableOrchestrationContext context,
        ILogger log)
    {
        if (!context.IsReplaying)
        {
            log.LogWarning("Start O_Searcher");
        }

        var searchTaskModel = context.GetInput<SearcherTaskModel>();
        var jqls = searchTaskModel.GetJqlsWithStart();

        var searcherTasks = new List<Task<List<string>>>();
        foreach (var jql in jqls)
        {
            var task = context.CallActivityAsync<List<string>>("A_Searcher", jql);
            searcherTasks.Add(task);
        }

        var searcherResults = await Task.WhenAll(searcherTasks);

        var allKeys = new List<string>();
        foreach (var result in searcherResults)
        {
            if (result.Count > 0)
            {
                allKeys.AddRange(result);
            }
        }

        if (!context.IsReplaying)
        {
            log.LogWarning("Finish O_Searcher");
        }

        return allKeys.Distinct().ToList();
    }

    [FunctionName("O_Worker")]
    public static async Task<List<ActivityDataModel>> O_Worker(
        [OrchestrationTrigger] DurableOrchestrationContext context,
        ILogger log)
    {
        if (!context.IsReplaying)
        {
            log.LogWarning("Start O_Worker");
        }

        var input = context.GetInput<WorkerTaskModel>();
        var queues = new List<List<string>>();

        for (var i = 0; i < input.Keys.Count; i++)
        {
            if (i % 50 == 0)
            {
                var queue = new List<string>();
                queues.Add(queue);
            }

            queues.Last().Add(input.Keys[i]);
        }

        // prepare tasks. VSTS task: keys.count = 0. Jira task: keys.count > 0 
        var vstsTaskModel = new WorkerTaskModel { Keys = new List<string>(), Start = input.Start };
        var workerTaskModels = new List<WorkerTaskModel> { vstsTaskModel };
        foreach (var keysQueue in queues)
        {
            if (keysQueue.Count <= 0)
            {
                continue;
            }

            var workerTaskModel = new WorkerTaskModel { Start = input.Start, Keys = keysQueue };
            workerTaskModels.Add(workerTaskModel);
        }

        var workerTasks = new List<Task<List<ActivityDataModel>>>();
        foreach (var workerTaskModel in workerTaskModels)
        {
            var task = context.CallActivityAsync<List<ActivityDataModel>>("A_Worker", workerTaskModel);
            workerTasks.Add(task);
        }

        var workerResults = await Task.WhenAll(workerTasks);

        var activities = new List<ActivityDataModel>();
        foreach (var result in workerResults)
        {
            activities.AddRange(result);
        }

        if (!context.IsReplaying)
        {
            log.LogWarning("Finish O_Worker");
        }

        return activities;
    }
}

}
```

Activities:

```csharp
namespace JiraReportsProcessor
{
using System.Collections.Generic;
using System.Threading.Tasks;

using Configuration;
using JiraAPI;
using JiraReportsProcessor.Models;

using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

using Newtonsoft.Json;

using VstsApi;

public static class Activities
{
    [FunctionName("A_Searcher")]
    public static async Task<List<string>> A_Searcher([ActivityTrigger] string jql, ILogger log)
    {
        var client = new Client(Configuration.JiraUrl, Configuration.JiraUser, Configuration.JiraPassword);
        var keys = await client.GetKeysAsync(jql);
        return keys;
    }

    [FunctionName("A_Worker")]
    public static async Task<List<ActivityDataModel>> A_Worker([ActivityTrigger] WorkerTaskModel task, ILogger log)
    {
        var client = new Client(Configuration.JiraUrl, Configuration.JiraUser, Configuration.JiraPassword);
        if (task.Keys.Count > 0)
        {
            // get Jira Activity by keys
            var activities = await client.WorkAsync(task.Keys, task.Start);
            return activities;
        }
        else
        {
            // get vsts activities
            var vstsClient = new VstsClient(client, Configuration.VstsUrl, Configuration.VstsUser, Configuration.VstsToken);
            var start = task.Start;

            var activities = vstsClient.GetCommits(start);
            var activityDataModels = new List<ActivityDataModel>();
            foreach (var activity in activities)
            {
                activityDataModels.Add(new ActivityDataModel(activity));
            }

            return activityDataModels;
        }
    }

    [FunctionName("A_Reporter")]
    public static async Task<string> A_Reporter([ActivityTrigger] ReporterTaskModel task, ILogger log)
    {
        var template = Configuration.Read(
            Configuration.StorageTimeSheetContainer,
            Configuration.StorageTimeSheetTemplateName);
        var html = template.Replace(Configuration.DataContainer, JsonConvert.SerializeObject(task.Activities));

        // replace current report
        var blobName = $"{task.Start.Year}-{task.Start.Month}-{task.Start.Day}.html";
        Configuration.WriteHtml(
            Configuration.StorageTimeSheetContainer,
            blobName,
            html);

        // complete reporting. so next orchestration can be started by Starter
        log.LogWarning("Set Starter.json -> InProgress = False.");
        await Configuration.WriteJsonAsync(
             Configuration.StorageTimeSheetContainer,
             Configuration.StarterTaskBlob,
             JsonConvert.SerializeObject(new StarterTaskModel { InProgress = false }));

        return blobName;
    }

    [FunctionName("A_Stopper")]
    public static async Task A_Stopper([ActivityTrigger] string stopReason, ILogger log)
    {
        var reason = string.IsNullOrEmpty(stopReason)
                         ? "Error happened. No reason provided!"
                         : stopReason;

        await Configuration.WriteJsonAsync(
            Configuration.StorageTimeSheetContainer,
            Configuration.StopperTaskBlob,
            JsonConvert.SerializeObject(new { StopReason = reason }));
    }
}

}
```

remote-specialist commented 5 years ago

On build 1.7.1 everything is fine - the issue has not reproduced in the last two days.

cgillum commented 5 years ago

That's great to hear! This is the second issue that seems to have been mysteriously "fixed" by 1.7.1, so it seems one of the core issues I addressed was causing a variety of different symptoms. Thanks for checking and confirming.

remote-specialist commented 5 years ago

Can we reopen this one, or should I create a new issue? Unfortunately, the error was reproduced again - see the attached screenshot.

cgillum commented 5 years ago

Let's go ahead and reopen.

mark-r-cullen commented 5 years ago

I think we might have seen this same (or similar-sounding?) issue today, as well as a recurrence of #595, but unfortunately we didn't have the Application Insights logging on at the time, as we were unable to reproduce it.

We observed an activity - the only activity we call via WithRetry - complete successfully: no exceptions were thrown that would cause it to retry, we could see our own Application Insights log messages suggesting it completed successfully, and we can see a successful result in DurableFunctionsHubHistory. But for some reason the activity ran again, as if it had hit the retry logic.

I can see a TaskScheduleId that was scheduled once on the same partition key, but it actually ran the same activity with different input parameters (the correlation ID in the input)... looks a bit suspicious to me?

The input with the correlation ID ending in 1 is the activity that ran twice; it looks like the other one ran just once and everything was fine.

(Screenshot: duplicate task id)

cgillum commented 5 years ago

Hi @mark-r-cullen. Sorry for the delay. I took a look at your case by searching for the instance ID in your screenshot. It looks like there was some partition movement that occurred while your orchestration (7daaaf9bd80e414cbec6a381ea7864a8) was executing, causing the orchestration to move from one VM to another. Unfortunately this resulted in several activity functions (I see 36 in this batch) being re-executed.

One of the design challenges with our use of Azure Storage is that Azure Storage doesn't support transactions between Tables and Queues. As a result, this kind of duplication can happen. The alternative is that messages get lost, which we decided is a worse outcome compared to duplicate function execution. We unfortunately do not have a way to deal with this that does not involve the possibility of duplicate activity execution. Because we offer "at least once" messaging guarantees, much like with regular queue trigger functions, our recommendation is to make activity functions idempotent so that duplicate executions don't result in negative business outcomes.
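To make the idempotency suggestion concrete, here is a minimal sketch of one possible approach: an activity that records a per-item completion marker in blob storage and skips re-execution when the marker already exists. The container name, function name, and WorkItem type are invented for illustration, and the Azure.Storage.Blobs client is used here only as an example store; this is not code from the extension or from this thread.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Azure;
using Azure.Storage.Blobs;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class IdempotentActivityExample
{
    // Hypothetical input carrying a stable, caller-assigned id for the unit of work.
    public class WorkItem
    {
        public string Id { get; set; }
        public string Payload { get; set; }
    }

    [FunctionName("A_IdempotentWorker")]
    public static async Task A_IdempotentWorker([ActivityTrigger] WorkItem item, ILogger log)
    {
        // One marker blob per logical work item; its existence means the work already ran.
        var container = new BlobContainerClient(
            Environment.GetEnvironmentVariable("AzureWebJobsStorage"), "activity-markers");
        await container.CreateIfNotExistsAsync();
        var marker = container.GetBlobClient($"{item.Id}.done");

        if (await marker.ExistsAsync())
        {
            log.LogWarning($"Skipping duplicate execution for work item {item.Id}.");
            return;
        }

        // ... perform the real side effect here (write the report, call the external API, etc.) ...

        try
        {
            // Record completion. overwrite: false makes a racing duplicate fail here
            // instead of silently double-writing the marker.
            using (var content = new MemoryStream(Encoding.UTF8.GetBytes(DateTime.UtcNow.ToString("o"))))
            {
                await marker.UploadAsync(content, overwrite: false);
            }
        }
        catch (RequestFailedException ex) when (ex.ErrorCode == "BlobAlreadyExists")
        {
            log.LogWarning($"Another execution already recorded completion for {item.Id}.");
        }
    }
}
```

The overwrite: false upload means that if two duplicate executions race past the existence check, only one of them records completion; the other lands in the catch block.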

bensaundersnielsen commented 5 years ago

hi @cgillum,

Thanks for your response to @mark-r-cullen.

"At least once" - is it possible for this duplicate activity execution to run simultaneously? I'm thinking about what lengths we would need to go to in order to ensure idempotency, e.g. taking a 'lock' on some object within a transaction to notify other executions that this is already being worked on (rough sketch below).

Thanks
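For reference, the rough sketch mentioned above: a blob lease used as a cross-instance lock around the critical section. The container name and helper are hypothetical, and this is only an illustration of the idea, not something prescribed by the extension.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Specialized;

public static class BlobLeaseLockExample
{
    // Acquires a 60-second lease on a per-item lock blob; only the lease holder runs the work.
    // Returns false if another execution currently holds the lease.
    public static async Task<bool> TryDoProtectedWorkAsync(string itemId, Func<Task> work)
    {
        var container = new BlobContainerClient(
            Environment.GetEnvironmentVariable("AzureWebJobsStorage"), "lease-locks");
        await container.CreateIfNotExistsAsync();

        var lockBlob = container.GetBlobClient($"{itemId}.lock");
        if (!await lockBlob.ExistsAsync())
        {
            // A blob must exist before a lease can be taken on it.
            await lockBlob.UploadAsync(new MemoryStream(), overwrite: true);
        }

        var lease = lockBlob.GetBlobLeaseClient();
        try
        {
            await lease.AcquireAsync(TimeSpan.FromSeconds(60));
        }
        catch (RequestFailedException ex) when (ex.ErrorCode == "LeaseAlreadyPresent")
        {
            return false; // another execution is already working on this item
        }

        try
        {
            await work();
            return true;
        }
        finally
        {
            await lease.ReleaseAsync();
        }
    }
}
```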

mark-r-cullen commented 5 years ago

Hi @cgillum

We believe that this issue may very well be the cause of #595. Over the past few days we have seen more odd behaviour, which we can only explain by the inputs to activities / the results of activities being incorrect.

Looking at the DurableFunctionsHubHistory we can see that there are duplicate TaskScheduleIds. They correspond to the activities that we are seeing problems with.

In this particular example, here is what we saw:

- Activities 1-n -> do stuff.
- Activity 3 -> creates a result, iterates the "assets" property on the result (empty in this case, e.g. task id 1151), and returns the result with no modifications.
- Orchestrator -> checks the "isSuccessful" property (we had to do this for some exception-handling workarounds, IIRC); it's true, so we run Activity 4.
- Activity 4 -> gets passed the result of Activity 3, also tries to iterate the "assets" property, and throws an exception because the property is apparently null.

You can see in this screenshot that TaskScheduleId 1151 (and many others...) actually has two results, from different activities, both of which are successful. One of the highlighted results is the actual result from Activity 3 above, and the other is a result from Activity 1-n.

So what we are actually seeing appears to be the orchestrator trying to deserialize the result of Activity 1-n into the Activity 3 result class! In this case isSuccessful would still be true, so we would end up in Activity 4, but the assets property doesn't exist, so it's deserialized as null, and thus we end up throwing the exception we are seeing.

(Screenshot: durable function 559)

This seems like a pretty serious issue if we can't actually rely on an activity's input and output being correct. What do you propose we do in this situation? The only thing we can think of to try and handle this would be to include a kind of "type" property on our results (like JSON $type?), which we could then check to make sure it matches the actual object we have (see the sketch below)... though it doesn't really feel like this is something we should have to be doing...
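For concreteness, the kind of type-tag check we have in mind looks roughly like this; the class, property, and helper names are invented for the example:

```csharp
using System;
using System.Collections.Generic;
using Newtonsoft.Json;

// Hypothetical activity result that carries its own type tag so the caller can detect
// when some other activity's payload has been deserialized into this class by mistake.
public class Activity3Result
{
    public const string ExpectedKind = "Activity3Result";

    // Set explicitly by the activity when it builds the result. If the orchestrator ends up
    // deserializing a foreign payload, this stays null (or holds a different value).
    [JsonProperty("kind")]
    public string Kind { get; set; }

    [JsonProperty("isSuccessful")]
    public bool IsSuccessful { get; set; }

    [JsonProperty("assets")]
    public List<string> Assets { get; set; }
}

public static class ResultKindCheck
{
    // Called by the orchestrator right after awaiting the activity, e.g.:
    //   var result = await context.CallActivityAsync<Activity3Result>("Activity3", input);
    //   ResultKindCheck.EnsureExpectedKind(result);
    public static void EnsureExpectedKind(Activity3Result result)
    {
        if (result == null || result.Kind != Activity3Result.ExpectedKind)
        {
            throw new InvalidOperationException(
                $"Unexpected activity result payload (kind = '{result?.Kind ?? "null"}').");
        }
    }
}
```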

cgillum commented 5 years ago

Thanks @mark-r-cullen for the additional info. A few others have noticed this problem as well and it is indeed pretty serious. FYI @ConnorMcMahon

Are you able to reproduce the issue reliably? If so, would you be able to try out a private fix we created that includes some de-duplication logic? You can get an updated package here:

https://www.myget.org/feed/azure-appservice/package/nuget/Microsoft.Azure.WebJobs.Extensions.DurableTask/1.8.2-private

Note that this requires adding the "azure-appservice" MyGet feed to your Visual Studio / build settings.
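In case it saves someone a step, a nuget.config entry along these lines should pick the feed up; the exact v3 endpoint below is assumed from MyGet's usual URL pattern, so verify it against the feed page:

```xml
<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <packageSources>
    <add key="nuget.org" value="https://api.nuget.org/v3/index.json" />
    <!-- Assumed MyGet v3 endpoint for the azure-appservice feed; verify against the feed page -->
    <add key="azure-appservice" value="https://www.myget.org/F/azure-appservice/api/v3/index.json" />
  </packageSources>
</configuration>
```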

mark-r-cullen commented 5 years ago

Hi @cgillum

Thanks for your quick response.

We seem to be seeing the issue quite often at the moment, but we can't really reproduce it reliably. It seems like it usually happens after a period of inactivity followed by a sudden spike of activity? We hammered it this morning after it had been idle overnight and it happened straight away.

We will look at getting another dev instance of the application deployed that we can tear down, redeploy, and hammer, to see if we can reliably reproduce the issue. If we can get that, then we can try again with the fix package.

mark-r-cullen commented 5 years ago

Hi @cgillum,

I will double check things tomorrow, but it seems like we have just managed to reproduce this even with your test fix package.

The execution id is 23ea5025f7bb41958a61c21b8f5d1d1a; see e.g. task id 121.

edit - ignore this... just double checked things and had installed Microsoft.Azure.DurableTask.Core.2.1.2-private instead of Microsoft.Azure.WebJobs.Extensions.DurableTask.1.8.2-private ... whoops!!

mark-r-cullen commented 5 years ago

@cgillum

Looks like we have run in to the issue again. I have double checked the deployment package and it is using 1.8.2-private this time.

The execution id is a02cf3ae181b4eb4b38beb8f115e50f0; see e.g. task ids 63, 66, 72.

cgillum commented 5 years ago

Hmm...that's unfortunate. Others have had success with the new package, so I'll need to look and see what's different about your scenario.

cgillum commented 5 years ago

@mark-r-cullen I took a look at a02cf3ae181b4eb4b38beb8f115e50f0 and it seems to me that while there were some duplicate activity function executions caused by partition movement, they didn't actually break the orchestration instance. The 1.8.2 fix was intended to address the case where duplicate execution (and duplicate task IDs) resulted in a NonDeterministicWorkflowException and/or a stuck orchestration. Can you confirm that your orchestrations are completing successfully in spite of the duplicate activity calls?

mark-r-cullen commented 5 years ago

I don't think the orchestration hung, no - just that our activities are being called with incorrect inputs due to the wrong activity result being deserialized.

cgillum commented 5 years ago

We released the fix for the duplication problem that causes orchestrations to get stuck. The fix can be acquired by updating to v1.8.2.

There are still some duplicate execution issues that we're currently tracking in the DTFx repo: