Azure / azure-functions-durable-extension

Durable Task Framework extension for Azure Functions
MIT License

[FunctionTimeoutException] Eternal OrchestrationTrigger timeout occurring | .NET 8 Isolated / Linux Flex Consumption Plan #2900

Open david-pw opened 1 month ago

david-pw commented 1 month ago

Description

We've been running an eternal orchestration pattern in Australia East for the last couple of weeks. We noticed that, with "functionTimeout": -1 set, the orchestration would occasionally never reach or execute context.ContinueAsNew.

To help diagnose the issue, we set "functionTimeout": "00:30:00", suspecting a runaway activity. However, I was surprised to see that the OrchestrationTrigger itself was timing out. Typically each run of this orchestration takes about 10-15 minutes, and it executes eternally for 5-8+ hours before hanging like this.

We're unsure how to diagnose, or address this issue further.

Interestingly, when we restarted the Flex Consumption Plan app with "functionTimeout": -1 set, the orchestration would continue as expected.

Expected behavior

I expect that the orchestration trigger shouldn't time out, as a timeout there bypasses any recovery logic we have to self-heal the process.

Actual behavior

All activities run to completion. However, when attempting to continue using context.ContinueAsNew, the orchestration trigger times out.

Relevant source code snippets

[Function(nameof(Orchestrator_SyncLogDataToDataLake))]
public async Task Orchestrator_SyncLogDataToDataLake(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
  ILogger logger = context.CreateReplaySafeLogger(nameof(Orchestrator_SyncLogDataToDataLake));
  var lastRunInputUtc = context.GetInput<DateTime?>();
  var importLogId = -1;
  var runFailed = false;

  var instanceId = context.InstanceId;
  var startTimeUtc = context.CurrentUtcDateTime;

  try {
    importLogId = await context.CallActivityAsync<int>(
        nameof(SharedActivities.InternalActivity_CreateImportLogForRun),
        new SharedActivities.CreateImportLogForRunRequest(
            $"{nameof(Orchestrator_SyncLogDataToDataLake)}_{instanceId}", lastRunUtc, startTimeUtc));

    // ***** SKIPPED CODE FOR BREVITY ***** 
    // ... Activities that create references to blob storage for the next phase ...
    // ***** SKIPPED CODE FOR BREVITY ***** 

    var exportActivities = new List<Task>();
    foreach (var batchReference in batchedSessionReferences)
    {
        exportActivities.Add(context.CallActivityAsync(nameof(SharedActivities.InternalActivity_ExportAppDataForSessions),
            new SharedActivities.ExportAppDataForSessionsRequest(importLogId, batchReference, lastRunUtc,
                startTimeUtc)));
    }

    foreach (var tenantBatch in batchedTenantReferences)
    {
        exportActivities.Add(context.CallActivityAsync(nameof(SharedActivities.InternalActivity_ExportApiDataForTenants),
            new SharedActivities.ExportApiDataForTenantsRequest(importLogId, tenantBatch, lastRunUtc, startTimeUtc)));
    }

    await Task.WhenAll(exportActivities);

    context.SetCustomStatus(new
    {
        instanceId,
        importLogId = importLogId,
        lastOperation = "Finished batch operation activities. Cleaning up now.",
        timeStartUtc = startTimeUtc,
        timeLookBackUtc = maxLookBackUtc,
        timeLastRunUtc = lastRunUtc,
        countNewSessions = newSessionsCount,
        countActiveTenants = activeTenantIds.Length,
        countSessionOperations = batchedSessionReferences.Length,
        countTenantOperations = batchedTenantReferences.Length,
        countExecutedActivities = exportActivities.Count
    });

    await context.CallActivityAsync(nameof(SharedActivities.InternalActivity_CleanupActivitiesForRun),
        new SharedActivities.UpdateImportLogForRunRequest(importLogId, true));

  } catch (Exception ex) {
    runFailed = true;
    try
    {
        await context.CallActivityAsync(nameof(SharedActivities.InternalActivity_CleanupActivitiesForRun),
            new SharedActivities.UpdateImportLogForRunRequest(importLogId, false,
                $"{ex.Message}. {ex.InnerException?.Message}".Trim()));
    }
    catch (Exception ex2)
    {
        logger.LogCritical(ex2,
            "{instanceId} :: {importId} failed to close import session. Restarting orchestration.",
            instanceId, importLogId);
    }
  } finally {
    // Restart the process with the start time of this run. If faulted, pass the input in again.
    context.ContinueAsNew(runFailed ? lastRunInputUtc : startTimeUtc, false);
  }
}

host.json

{
  "version": "2.0",
  "functionTimeout": "00:30:00",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request;Exception;Trace"
      },
      "enableLiveMetricsFilters": true
    }
  },
  "extensions": {
    "durableTask": {
      "hubName": "DataIntegrationHub"
    }
  }
}

Known workarounds

Setting "functionTimeout": -1 and restarting the Flex Consumption Plan app when we detect that the process has frozen. This is not really a workaround.
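
For reference, the workaround configuration differs from the host.json shown above only in the timeout value. A minimal sketch, assuming the rest of the file stays unchanged (the logging section is omitted here for brevity):

```json
{
  "version": "2.0",
  "functionTimeout": -1,
  "extensions": {
    "durableTask": {
      "hubName": "DataIntegrationHub"
    }
  }
}
```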

App Details

Note: Ranges added as we update day to day.

Screenshots

Instance History: Unsure if related, but I have consistently seen the sentinel RowKey right before the OrchestratorCompleted event when the process has frozen like this. [image]

david-pw commented 3 weeks ago

For anyone who comes across this issue: as an interim countermeasure, we've set up a timer trigger that monitors the status of the orchestration. It is configured to run every 15 minutes, which is half the host function timeout ("functionTimeout": "00:30:00").

If the orchestration instance is in a non-running state, it simply restarts the orchestration for us. We also had to switch the orchestration to a singleton (fixed instance ID) to ensure that no more than one orchestration instance was running at any given time. It's been running for a day now and has auto-recovered twice.

Timer Function

        [Function(nameof(Monitor_Orchestrator_SyncLogDataToDataLake))]
        public async Task Monitor_Orchestrator_SyncLogDataToDataLake(
            [TimerTrigger("0 */15 * * * *")] TimerInfo timerInfo, [DurableClient] DurableTaskClient client,
            FunctionContext executionContext)
        {
            var logger = executionContext.GetLogger(nameof(Monitor_Orchestrator_SyncLogDataToDataLake));
            var lastRunUtcIfNew = DateTime.UtcNow.AddHours(-6);
            await OrchestrationUtilities.ScheduleOrchestratorIfNotRunning(client,
                nameof(Orchestrator_SyncLogDataToDataLake), SingletonInstanceId, lastRunUtcIfNew, logger);
        }
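
The schedule "0 */15 * * * *" is a six-field NCRONTAB expression (second, minute, hour, day, month, day of week), so the monitor fires at second 0 of minutes 0, 15, 30 and 45 of every hour. As a rough illustration only, the sketch below computes the next such fire time for a given UTC instant. NextQuarterHour and the sample timestamp are hypothetical names and values, not part of the Functions runtime.

```csharp
using System;
using System.Globalization;

// Hypothetical sample time, chosen only for illustration.
var now = new DateTime(2024, 1, 1, 10, 7, 30, DateTimeKind.Utc);

// prints "10:15:00"
Console.WriteLine(NextQuarterHour(now).ToString("HH:mm:ss", CultureInfo.InvariantCulture));

// Returns the first quarter-hour boundary strictly after the given time,
// mirroring when a "0 */15 * * * *" schedule would next fire.
static DateTime NextQuarterHour(DateTime utc)
{
    // Round down to the start of the current 15-minute window.
    var quarterStart = new DateTime(
        utc.Year, utc.Month, utc.Day, utc.Hour, utc.Minute / 15 * 15, 0, utc.Kind);
    return quarterStart.AddMinutes(15);
}
```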

Helper

    public static async Task<string> ScheduleOrchestratorIfNotRunning(DurableTaskClient client,
        TaskName orchestratorName, string instanceId, object? input = null, ILogger? logger = null,
        CancellationToken cancellationToken = default)
    {
        var existingInstance = await client.GetInstanceAsync(instanceId, cancellation: cancellationToken);
        if (existingInstance is not
            {
                RuntimeStatus: not OrchestrationRuntimeStatus.Completed 
                and not OrchestrationRuntimeStatus.Failed
                and not OrchestrationRuntimeStatus.Terminated
            })
        {
            // An instance with the specified ID doesn't exist or an existing one stopped running, create one.
            instanceId = await client.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input,
                new StartOrchestrationOptions(instanceId), cancellationToken);

            logger?.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId);
        }
        else
        {
            instanceId = existingInstance.InstanceId;
            logger?.LogInformation("Orchestration '{instance}' status {status}.", instanceId,
                existingInstance.RuntimeStatus.ToString());
        }

        return instanceId;
    }
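
The negated property pattern in the helper is easy to misread, so here is a small standalone sketch of the decision it makes. Status, Instance and RestartPolicy are hypothetical stand-ins (so the snippet compiles without the Durable Task packages), and only a subset of runtime statuses is modelled.

```csharp
#nullable enable
using System;

// Walk through the cases the monitor can observe and print the decision.
Instance?[] cases =
{
    null, // no instance with this ID exists yet
    new Instance(Status.Pending),
    new Instance(Status.Running),
    new Instance(Status.Suspended),
    new Instance(Status.Completed),
    new Instance(Status.Failed),
    new Instance(Status.Terminated),
};

foreach (var existing in cases)
{
    var label = existing is null ? "(no instance)" : existing.RuntimeStatus.ToString();
    Console.WriteLine($"{label,-14} restart: {RestartPolicy.ShouldRestart(existing)}");
}

// Hypothetical stand-in for the client library's runtime status enum.
enum Status { Pending, Running, Suspended, Completed, Failed, Terminated }

// Hypothetical stand-in for the metadata object returned by GetInstanceAsync.
record Instance(Status RuntimeStatus);

static class RestartPolicy
{
    // Same shape as the condition in the helper above: true when there is
    // no instance, or when the instance is in a terminal state.
    public static bool ShouldRestart(Instance? existing) =>
        existing is not
        {
            RuntimeStatus: not Status.Completed
                and not Status.Failed
                and not Status.Terminated
        };

    // Equivalent form without the double negation.
    public static bool ShouldRestartExplicit(Instance? existing) =>
        existing is null
        || existing.RuntimeStatus is Status.Completed or Status.Failed or Status.Terminated;
}
```

Under this check, a missing, Completed, Failed or Terminated instance is rescheduled, while Pending, Running and Suspended instances are treated as alive. An instance that hangs while still reporting Running would therefore not be restarted by this condition alone.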