Azure / azure-functions-durable-extension

Durable Task Framework extension for Azure Functions
MIT License

Events are raised but not processed #1410

Closed: wynandjordaan closed this issue 4 years ago

wynandjordaan commented 4 years ago

Description

We have a few durable functions that are based on a workflow process. Recently we became aware that some of the raised events are not processed, and our CustomStatus then ends up out of sync with what we were expecting. I have tried a few times already to reproduce it locally, but only managed to get it into this state every now and then. Although it's not easy to reproduce, it happens more often in our production environment. It feels to me like an internal race condition of some sort :). My best guess is that when a new event is raised, one of the other unprocessed events is cleared somewhere.

Our function can be simplified to the following example: a durable function with one external event, "TriggerRaised", and a timer task, and we do a WaitAny on them both. Each TriggerRaised event must complete 4 activities, and multiple TriggerRaised events can be raised at any point.

Expected behavior

All the events raised are processed and none of them are just ignored. So if we receive 5 events, the 4 activities run for each of them.

Actual behavior

I can clearly see in the CustomStatus that our processing has missed an event every now and then. When examining the History table, the EventRaised entries are there, but some of the corresponding activities are missing.

Relevant source code snippets

Orchestrator

    public class TestJobWorkflow
    {
        private readonly IFunctionLogger<TestJobWorkflow> _logger;
        private readonly List<string> _messages = new List<string>();
        private DurableOrchestrationContext _context;

        public TestJobWorkflow(IFunctionLogger<TestJobWorkflow> logger)
        {
            _logger = logger;
        }

        [FunctionName(nameof(TestJobWorkflow))]
        public async Task RunOrchestrator([OrchestrationTrigger] DurableOrchestrationContext context)
        {
            try
            {
                _logger.LogInformation("Starting workflow {WorkflowName}", nameof(TestJobWorkflow));

                _context = context;

                var input = context.GetInput<TestTriggerModel>();
                _logger.LogInformation("Input: {@Input}", input);

                _messages.Add($"{_context.CurrentUtcDateTime:yyyy-MM-dd HH:mm:ss}: Received external trigger with input: {input.Message}");
                _context.SetCustomStatus(_messages);

                const string endConditionMessage = "DONE";
                while (input?.Message != endConditionMessage)
                {
                    using (var cancellationTokenSource = new CancellationTokenSource())
                    {
                        var triggersToAwait = new List<Task>();

                        Task<TestTriggerModel> externalTriggerTask = _context.WaitForExternalEvent<TestTriggerModel>(Constants.Workflow.Triggers.TriggerRaised);
                        if (externalTriggerTask != null)
                            triggersToAwait.Add(externalTriggerTask);

                        Task timeOutTriggerTask = _context.CreateTimer(_context.CurrentUtcDateTime.AddMinutes(2), cancellationTokenSource.Token);
                        if (timeOutTriggerTask != null)
                            triggersToAwait.Add(timeOutTriggerTask);

                        var triggers = triggersToAwait.Where(x => x != null);
                        var winner = await Task.WhenAny(triggers);

                        cancellationTokenSource.Cancel();

                        if (winner is Task<TestTriggerModel> externalTrigger)
                        {
                            input = externalTrigger.Result;
                            _messages.Add($"{_context.CurrentUtcDateTime:yyyy-MM-dd HH:mm:ss}: Received external trigger with input: {input.Message}");

                        }
                        else if (winner == timeOutTriggerTask)
                        {
                            _messages.Add($"{_context.CurrentUtcDateTime:yyyy-MM-dd HH:mm:ss}: Time Out Trigger");
                        }
                        _context.SetCustomStatus(_messages);

                        await ProcessTrigger(input);
                    }

                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Workflow Error: {ErrorMessage}", ex.Message);
                throw new Exception($"Error processing generic job: {ex.Message}", ex);
            }
        }

        private async Task ProcessTrigger(TestTriggerModel input)
        {
            await PublishProcessTriggerStatusUpdate(input);

            await TransitionStatus(input);

            await PublishJobStatusChanges(input);

            await PublishProcessTriggerStatusUpdate(input);
        }

        private async Task PublishProcessTriggerStatusUpdate(TestTriggerModel input)
        {
            var command = new ActivityCommand<TestTriggerModel>() { Data = input };
            await _context.CallActivityAsync<ActivityCommand<TestTriggerModel>>(nameof(PublishTriggerStatusActivity), command);
        }

        private async Task TransitionStatus(TestTriggerModel input)
        {
            var command = new ActivityCommand<TestTriggerModel>() { Data = input };
            await _context.CallActivityAsync<ActivityCommand<TestTriggerModel>>(nameof(TransitionStatusActivity), command);
        }

        private async Task PublishJobStatusChanges(TestTriggerModel input)
        {
            var command = new ActivityCommand<TestTriggerModel>() { Data = input };
            await _context.CallActivityAsync<ActivityCommand<TestTriggerModel>>(nameof(PublishJobStatusChangesActivity), command);
        }

    }

Trigger

    public class TriggerTestWorkflow : IActivity
    {
        private readonly IFunctionLogger _logger;

        public TriggerTestWorkflow(IFunctionLogger<TriggerTestWorkflow> logger)
        {
            _logger = logger;
        }

        [FunctionName("TriggerTestWorkflow")]
        public async Task<HttpResponseMessage> Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "testworkflow/trigger/{instanceId:guid}")]
            HttpRequestMessage request,
            string instanceId,
            [OrchestrationClient] DurableOrchestrationClientBase orchestrationClient)
        {
            try
            {
                var parsedInstanceId = string.IsNullOrWhiteSpace(instanceId) ? Guid.NewGuid() : Guid.Parse(instanceId);
                if (parsedInstanceId == Guid.Empty)
                    parsedInstanceId = Guid.NewGuid();

                instanceId = parsedInstanceId.ToString("N");

                var input = await request.Content.ReadAsAsync<TestTriggerModel>();

                await EnsureRunningInstance.EnsureRunningInstanceExistsAsync(orchestrationClient, instanceId, _logger);

                await orchestrationClient.RaiseEventAsync(instanceId, Constants.Workflow.Triggers.TriggerRaised, input);

                var response = request.CreateResponse(HttpStatusCode.OK);
                return response;
            }
            catch (UnauthorizedAccessException noAuthEx)
            {
                _logger.LogWarning(noAuthEx, "Unauthorised access attempt for input: {Input}, {Message}", request, noAuthEx.Message);
                var response = request.CreateErrorResponse(HttpStatusCode.BadRequest, noAuthEx.Message, noAuthEx);
                return response;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error: {ErrorMessage}", ex.Message);
                var response = request.CreateErrorResponse(HttpStatusCode.InternalServerError, ex.Message, ex);
                return response;
            }
        }
    }

App Details

Screenshots

Here is the History table with the failed event. As you can see, one event's 4 activities are missing. (I am filtering Name > '') (screenshot: Annotation 2020-07-16 210243)

Here is the following instance that worked as expected: 3 events raised and 3 times the 4 activities. (Then there is a two-minute gap and the timer kicks in.) (screenshot: Annotation 2020-07-16 210508)

This is from our production environment: (screenshot: Annotation 2020-07-16 212424)

If deployed to Azure

This test instance is not deployed on Azure, but I have provided our production instance where we see this problem daily. My test example is very simplified.

olitomlinson commented 4 years ago

@wynandjordaan are you able to expand the orchestration code so we can see the entire function? Thanks.

wynandjordaan commented 4 years ago

I have updated the original post with the source code.

olitomlinson commented 4 years ago

@wynandjordaan I would definitely recommend upgrading to 1.8.6 before you go any further. There have been many improvements, particularly around race conditions, which may help.

Also, you might want to consider bringing the _messages variable into the scope of your orchestrator function. As it stands, it's currently outside the function, which means its state can be lost because it won't be tracked by the DF runtime.
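For example, a minimal sketch of what that could look like (only illustrating the idea of keeping the list local to the orchestrator method; the names come from your snippet and the rest of the body is elided):

[FunctionName(nameof(TestJobWorkflow))]
public async Task RunOrchestrator([OrchestrationTrigger] DurableOrchestrationContext context)
{
    // Keep the status messages local to this invocation so they are rebuilt
    // deterministically on every replay instead of living in instance fields.
    var messages = new List<string>();

    var input = context.GetInput<TestTriggerModel>();
    messages.Add($"{context.CurrentUtcDateTime:yyyy-MM-dd HH:mm:ss}: Received external trigger with input: {input.Message}");
    context.SetCustomStatus(messages);

    // ... the rest of the orchestration, passing `messages` and `context` along
    // instead of storing them in class fields ...
}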

I would also wrap calls to messages.Add with if (!context.IsReplaying) as you don't want to clutter your customStatus with messages about replays. (Ignore this; I think it's the wrong advice for what you are trying to do.)

Finally for now, I notice that the durable timer is never cancelled. Is this by design? I ask because it's possible that after many iterations of the while loop there may be several durable timers still scheduled. I'm not sure what implications this may have. If possible, cancel the durable timer using cancellationTokenSource.Cancel();

Something like..

if (winner is Task<TestTriggerModel> externalTrigger)
{
   input = externalTrigger.Result;
   _messages.Add($"{_context.CurrentUtcDateTime:yyyy-MM-dd HH:mm:ss}: Received external trigger with input: {input.Message}");
   cancellationTokenSource.Cancel();
}

Give these changes a try for now, see if you get more consistent results. If not, we can keep digging.

p.s. I think this kind of design, where you have a potentially infinite loop coupled with waiting for events, can be better represented using an eternal orchestration. It might be easier to debug what's happening if the code is in the eternal orchestration format, but I think that's just personal preference.
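For reference, a rough sketch of the eternal-orchestration shape I mean (this only illustrates ContinueAsNew using the names from your snippet; it is not a drop-in replacement and doesn't by itself address the dropped-event behaviour discussed later in this thread):

[FunctionName(nameof(TestJobWorkflow))]
public async Task RunOrchestrator([OrchestrationTrigger] DurableOrchestrationContext context)
{
    _context = context;
    var input = context.GetInput<TestTriggerModel>();

    if (input?.Message == "DONE")
    {
        // End condition reached: let the orchestration complete instead of restarting.
        return;
    }

    using (var cts = new CancellationTokenSource())
    {
        Task<TestTriggerModel> externalTriggerTask =
            context.WaitForExternalEvent<TestTriggerModel>(Constants.Workflow.Triggers.TriggerRaised);
        Task timeoutTask = context.CreateTimer(context.CurrentUtcDateTime.AddMinutes(2), cts.Token);

        Task winner = await Task.WhenAny(externalTriggerTask, timeoutTask);
        cts.Cancel();

        if (winner == externalTriggerTask)
        {
            input = externalTriggerTask.Result;
            await ProcessTrigger(input);
        }
    }

    // Restart with the latest input instead of looping forever inside one history.
    context.ContinueAsNew(input);
}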

wynandjordaan commented 4 years ago

@olitomlinson thanks for the reply. Will definitely upgrade - from what I saw in the commits, it looks like some race conditions have already been fixed since the version we are using.

Finally for now, I notice that the durable timer is never cancelled. Is this by design?

We do cancel it already just after our WhenAny.

Task timeOutTriggerTask = _context.CreateTimer(_context.CurrentUtcDateTime.AddMinutes(2), cancellationTokenSource.Token);
if (timeOutTriggerTask != null)
triggersToAwait.Add(timeOutTriggerTask);

var triggers = triggersToAwait.Where(x => x != null);
var winner = await Task.WhenAny(triggers);

cancellationTokenSource.Cancel();

if (winner is Task<TestTriggerModel> externalTrigger)
{
      input = externalTrigger.Result;
      _messages.Add($"{_context.CurrentUtcDateTime:yyyy-MM-dd HH:mm:ss}: Received external trigger with input: {input.Message}");
}

p.s. I think this kind of design, where you have a potentially infinite loop coupled with waiting for events, can be better represented using an eternal orchestration. It might be easier to debug what's happening if the code is in the eternal orchestration format, but I think that's just personal preference.

I will have a look at the eternal orchestration pattern also, might make our code less complex.

Thanks for the info. Will keep in touch.

wynandjordaan commented 4 years ago

I upgraded the project to 1.8.6, but I am still seeing this behavior in our production environment.

These have both received two events and only processed one of them. Orchestration IDs: 3cf3c9b1283246b380a1e50c4bb12a57, baae3d1066f24590bc5ba2fa524fb983. Timestamps: around 2020-07-17T15:01:32.439Z and 2020-07-17T15:46:13.735Z.

(screenshot)

I have also managed to recreate it locally again.

cgillum commented 4 years ago

This sounds similar to the problem being described in https://github.com/Azure/azure-functions-durable-extension/issues/1409. I'm currently investigating.

wynandjordaan commented 4 years ago

Yes I was following that problem also.

Thanks man. Good luck there.

This has been haunting us for the last two months..

Let me know if you need more info.

cgillum commented 4 years ago

Unfortunately I'm not able to reproduce this problem using some samples that I created by hand. I'm doing essentially the same thing:

static int MessageCount = 100;

[FunctionName(nameof(ExternalEventRaceOrchestrator))]
public static async Task ExternalEventRaceOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context,
    ILogger log)
{
    for (int i = 0; i < MessageCount; i++)
    {
        await WaitForEventAndDelay(context, log);
    }
}

static async Task WaitForEventAndDelay(DurableOrchestrationContext context, ILogger log)
{
    using (var cancellationTokenSource = new CancellationTokenSource())
    {
        Task t1 = context.WaitForExternalEvent("MyEvent");
        Task t2 = context.CreateTimer(context.CurrentUtcDateTime.AddSeconds(30), cancellationTokenSource.Token);
        if (t2 == await Task.WhenAny(t1, t2))
        {
            if (!context.IsReplaying)
            {
                log.LogWarning("Failed to receive one of the events!");
            }
        }

        cancellationTokenSource.Cancel();
    }
}

Any more hints you can provide that might help me reproduce this problem?

wynandjordaan commented 4 years ago

From what I saw in our logs, the problem must come in somewhere while one of the activities is being processed and a new external event is raised. So what your repro is missing is the CallActivity part - even with a slight delay in there.

In addition to the above, I also used an Azure Storage account, not the local developer emulator. In our workflows we have to wait quite a while before we get our first EventRaised, so it might be that the external event arrives while the Azure Function has gone idle. I also stop and start my function after sending the start request; I'm not sure if this brings out the race condition.

In my example I also have the ProcessTrigger part that calls 4 activities; maybe you can add something similar to your sample as well.

 await ProcessTrigger(input);

I use a LINQPad script to trigger the workflow as fast as I can, without waiting for the HTTP responses.
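Roughly the shape of that script (a LINQPad-style sketch, assuming its default namespace imports; the route comes from the trigger above, while the local host/port and the payload shape are placeholder assumptions):

// Fire a burst of trigger requests without awaiting each response individually,
// so several RaiseEventAsync calls can land close together on the same instance.
var http = new HttpClient { BaseAddress = new Uri("http://localhost:7071/api/") };
var instanceId = Guid.NewGuid();

var posts = Enumerable.Range(0, 5)
    .Select(i => http.PostAsync(
        $"testworkflow/trigger/{instanceId}",
        new StringContent($"{{ \"Message\": \"event-{i}\" }}", Encoding.UTF8, "application/json")))
    .ToList();

await Task.WhenAll(posts); // only observe the responses at the very end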

Check the timings on this instance: after both occasions where we got two external events, there should have been double the number of activities processed. (screenshot)

I am in the process of creating a test sample for you that I will share with you.

Hopefully this helps you. We are seeing this issue daily in our production environment.

Thanks for the help

olitomlinson commented 4 years ago

@wynandjordaan

We do cancel it already just after our WhenAny.

var winner = await Task.WhenAny(triggers);

 cancellationTokenSource.Cancel();

Apologies! I completely missed that!

This could be a red herring, but I do notice that you are calling Cancel immediately after the WhenAny.

In the docs for Timers, it suggests cancelling the timer only when the winner is the non-timer task. @cgillum could this cause a problem, or is it perfectly legal to cancel the timer immediately after the WhenAny?
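For context, the shape the Timers docs describe is roughly this, applied to the code in this thread (paraphrased, not a verbatim copy of the docs):

using (var cts = new CancellationTokenSource())
{
    DateTime deadline = context.CurrentUtcDateTime.AddMinutes(2);
    Task timeoutTask = context.CreateTimer(deadline, cts.Token);
    Task<TestTriggerModel> eventTask =
        context.WaitForExternalEvent<TestTriggerModel>(Constants.Workflow.Triggers.TriggerRaised);

    Task winner = await Task.WhenAny(eventTask, timeoutTask);
    if (winner == eventTask)
    {
        // The event arrived first: cancel the still-pending timer, then handle eventTask.Result.
        cts.Cancel();
    }
    else
    {
        // The timer fired first: nothing to cancel, handle the timeout path.
    }
}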

wynandjordaan commented 4 years ago

@olitomlinson I will give it a try to see if it changes anything.

cgillum commented 4 years ago

@olitomlinson I wondered the same thing, which is why I chose to copy the cancellation strategy here in my own repro. I can't think of a reason why it would be problematic, and it doesn't seem to cause any issues from what I've observed. It's just odd to "cancel" something that has already completed. :)

@wynandjordaan If you're able to provide a test sample, that would be really helpful. However, I'll still try some of the variations you suggested. The use of a real storage account is notable because a real storage account is massively faster, even from a local machine. If this is a race condition, then that certainly matters.

cgillum commented 4 years ago

I've improved my test app and configured it to run against Azure Storage directly, but I'm still not able to reproduce this. Here is my full code, including the HTTP trigger at the bottom. I'm sending 1,000 events to the orchestrator, which processes them in a loop and runs 3 activity functions after each processed event. In my tests, 100% of events are getting processed. I'm using v1.8.6 running on a Functions v3 host.

public static class ExternalEventRace
{
    const int MessageCount = 1000;

    [FunctionName(nameof(ExternalEventRaceOrchestrator))]
    public static async Task ExternalEventRaceOrchestrator(
        [OrchestrationTrigger] DurableOrchestrationContext context,
        ILogger log)
    {
        for (int i = 0; i < MessageCount; i++)
        {
            string result = await WaitForEventAndDelay(context, log);

            await context.CallActivityAsync(nameof(DoSomething), "A" + result);
            await context.CallActivityAsync(nameof(DoSomething), "B" + result);
            await context.CallActivityAsync(nameof(DoSomething), "C" + result);
        }
    }

    static async Task<string> WaitForEventAndDelay(DurableOrchestrationContext context, ILogger log)
    {
        string result;
        using (var cancellationTokenSource = new CancellationTokenSource())
        {
            Task<int> t1 = context.WaitForExternalEvent<int>("MyEvent");
            Task t2 = context.CreateTimer(context.CurrentUtcDateTime.AddSeconds(10), cancellationTokenSource.Token);
            if (t2 == await Task.WhenAny(t1, t2))
            {
                log.LogWarning("Failed to receive one of the events!");
                result = "FAIL";
            }
            else
            {
                result = t1.Result.ToString();
            }

            cancellationTokenSource.Cancel();
        }

        return result;
    }

    [FunctionName(nameof(DoSomething))]
    public static void DoSomething([ActivityTrigger] DurableActivityContext context, ILogger log)
    {
        string input = context.GetInput<string>();
        log.LogWarning($"Executing \"DoSomething({input})\"...");
        Thread.Sleep(10);
    }

    [FunctionName(nameof(ExternalEventRaceHttpTrigger))]
    public static async Task<IActionResult> ExternalEventRaceHttpTrigger(
        [HttpTrigger] HttpRequest request,
        [OrchestrationClient] DurableOrchestrationClient client,
        ILogger log)
    {
        string instanceId = await client.StartNewAsync(nameof(ExternalEventRaceOrchestrator), null);
        while (true)
        {
            var status = await client.GetStatusAsync(instanceId, showHistory: false);
            if (status?.RuntimeStatus != OrchestrationRuntimeStatus.Running)
            {
                // Not running yet - poll again before sending events.
                await Task.Delay(TimeSpan.FromSeconds(1));
                continue;
            }

            break;
        }

        log.LogWarning($"Sending {MessageCount} events in sequence...");

        int concurrency = 5;
        var tasks = new HashSet<Task>();
        for (int i = 0; i < MessageCount; i++)
        {
            tasks.Add(client.RaiseEventAsync(instanceId, "MyEvent", i));
            if (tasks.Count > concurrency)
            {
                Task completed = await Task.WhenAny(tasks);
                tasks.Remove(completed);
            }
        }

        HttpManagementPayload response = client.CreateHttpManagementPayload(instanceId);
        return new OkObjectResult(response);
    }
}

Let me know if you have other ideas.

cgillum commented 4 years ago

I'm wondering if the problem is actually different from what I think it is. I looked further back in time to see everything that happened to 3cf3c9b1283246b380a1e50c4bb12a57, and the fact that there is no ExternalEventSaved trace event is puzzling. It makes me believe the problem has to do with a previous loop iteration ending without receiving any event (because of a timeout), which causes a subsequent iteration to handle multiple concurrent events in a way that leaves one of them with no action taken. I'll need to look at this more closely to see if there is a more general design problem here related to this pattern (while loop + external events + timeouts).

There might be another way to author your orchestration logic that gets around this kind of problem. Let me think about this a bit more and get back to you.

cgillum commented 4 years ago

I managed to put together a sample which reproduces the problem 100% of the time:

class ExternalEventLoss
{
    [FunctionName(nameof(ExternalEventLossOrchestrator))]
    public static async Task<int> ExternalEventLossOrchestrator(
        [OrchestrationTrigger] DurableOrchestrationContext context,
        ILogger log)
    {
        int receivedCount = 0;
        for (int i = 0; i < 10; i++)
        {
            using (var cts = new CancellationTokenSource())
            {
                var t1 = context.CreateTimer(context.CurrentUtcDateTime.AddSeconds(5), cts.Token);
                var t2 = context.WaitForExternalEvent("TriggerEvent");
                if (t2 == await Task.WhenAny(t1, t2))
                {
                    if (!context.IsReplaying) log.LogWarning($"Received trigger event {++receivedCount}");
                    await context.CreateTimer(context.CurrentUtcDateTime.AddSeconds(10), cts.Token);
                    cts.Cancel();
                }
                else
                {
                    if (!context.IsReplaying) log.LogWarning("Timeout");
                }

                if (receivedCount == 2)
                {
                    log.LogWarning($"Received both events!");
                    break;
                }
            }
        }

        log.LogWarning($"Done. Received events: {receivedCount}");
        return receivedCount;
    }

    [FunctionName(nameof(ExternalEventLossHttpTrigger))]
    public static async Task<HttpResponseMessage> ExternalEventLossHttpTrigger(
        [HttpTrigger] HttpRequestMessage request,
        [OrchestrationClient] DurableOrchestrationClient client,
        ILogger log)
    {
        string instanceId = await client.StartNewAsync(nameof(ExternalEventLossOrchestrator), null);
        log.LogWarning($"*** Started orchestration with ID = {instanceId}. Waiting 10 seconds to send two events...");

        // Wait for at least one loop to complete in the orchestration
        await Task.Delay(TimeSpan.FromSeconds(8));

        log.LogWarning($"*** Sending two events to orchestration with ID = {instanceId}.");
        await Task.WhenAll(
            client.RaiseEventAsync(instanceId, "TriggerEvent"),
            client.RaiseEventAsync(instanceId, "TriggerEvent"));

        return client.CreateCheckStatusResponse(request, instanceId);
    }
}

The trick is that the two events have to arrive after at least one "timeout" occurs. This is why my HTTP trigger waits 8 seconds before sending the events.

I think the fundamental problem here is that the cancellation code path abandons the original WaitForExternalEvent task. When the external events arrive, one of them completes the "canceled" external event task, and since nobody is awaiting that task, no code executes as a result. This scenario is broken by design. The fact that it works when events arrive one at a time is actually an odd consequence of how we manage the underlying task completion sources. I would call this loop + external event + cancel pattern an anti-pattern.
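Spelling that out against the orchestrator pattern in this thread (a condensed illustration of the explanation above, with `context` standing in for the orchestration context):

// Iteration 1: no event arrives within the 2-minute window.
using (var ctsA = new CancellationTokenSource())
{
    Task<TestTriggerModel> eventTaskA = context.WaitForExternalEvent<TestTriggerModel>("TriggerRaised");
    Task timerA = context.CreateTimer(context.CurrentUtcDateTime.AddMinutes(2), ctsA.Token);
    await Task.WhenAny(eventTaskA, timerA); // timerA wins
    ctsA.Cancel();                          // eventTaskA is abandoned but still listening
}

// Iteration 2: a fresh external event task is created.
using (var ctsB = new CancellationTokenSource())
{
    Task<TestTriggerModel> eventTaskB = context.WaitForExternalEvent<TestTriggerModel>("TriggerRaised");
    Task timerB = context.CreateTimer(context.CurrentUtcDateTime.AddMinutes(2), ctsB.Token);

    // Two TriggerRaised events now arrive close together:
    //   the first completes the abandoned eventTaskA - nothing awaits it, so it is effectively lost;
    //   the second completes eventTaskB, and only that one is observed below.
    await Task.WhenAny(eventTaskB, timerB);
    ctsB.Cancel();
}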

@wynandjordaan The correct way to implement this is actually something like the following (note: I removed some of your unnecessary null-checking logic):

// This task must be reused
Task<TestTriggerModel> externalTriggerTask = null;

const string endConditionMessage = "DONE";
while (input?.Message != endConditionMessage)
{
    using (var cancellationTokenSource = new CancellationTokenSource())
    {
        var triggersToAwait = new List<Task>();

        if (externalTriggerTask == null || externalTriggerTask.IsCompleted)
        {
            // Only create a new external event task if the previous one completed
            externalTriggerTask = _context.WaitForExternalEvent<TestTriggerModel>(Constants.Workflow.Triggers.TriggerRaised);
        }

        Task timeOutTriggerTask = _context.CreateTimer(_context.CurrentUtcDateTime.AddMinutes(2), cancellationTokenSource.Token);

        var winner = await Task.WhenAny(externalTriggerTask, timeOutTriggerTask);
        cancellationTokenSource.Cancel();

        if (winner is Task<TestTriggerModel> externalTrigger)
        {
            input = externalTrigger.Result;
            _messages.Add($"{_context.CurrentUtcDateTime:yyyy-MM-dd HH:mm:ss}: Received external trigger with input: {input.Message}");
        }
        else if (winner == timeOutTriggerTask)
        {
            _messages.Add($"{_context.CurrentUtcDateTime:yyyy-MM-dd HH:mm:ss}: Time Out Trigger");
        }
        _context.SetCustomStatus(_messages);

        await ProcessTrigger(input);
    }
}

The difference in the new code is that we never abandon any externalTriggerTask objects - we keep the same one around until it completes.

At this point, I'm not sure if there is a way to actually change the behavior so that the existing code can work reliably, at least not without introducing a breaking change.

FYI @ConnorMcMahon in case you see similar issues in your investigations.

wynandjordaan commented 4 years ago

Wow man, I must say there are quite a few people with this problem then. I saw many examples of this exact pattern.

But thanks for the info on how to fix this problem. I will make the changes and let you know how it goes, but from what you have explained to me it all makes sense now.

Kind regards. I owe you a few beers :)

wynandjordaan commented 4 years ago

Hey all just an update.

We have made the suggested changes to our processes. It took me a while to verify, but we are still getting some events dropped. It does work better for most events now; the drops only happen once or twice a day, but they are definitely still happening.

Here is one example from today: (screenshot)

Does this make sense with the implementation we have now? Is this a side effect of not listening for the external event?

Thanks in advance.

wynandjordaan commented 4 years ago

This might be related to the Timeout event. I will look for another example.

wynandjordaan commented 4 years ago

Hey guys just an update again.

It looks like our issues are solved now.

Thanks for all the help.