temporalio / sdk-core

Core Temporal SDK that can be used as a base for language specific Temporal SDKs
MIT License
266 stars 70 forks source link

[Bug] Local activity combined with cancellation of something that doesn't wait can get out of order on replay #803

Closed cretz closed 1 week ago

cretz commented 3 weeks ago

Describe the bug

Given a workflow that does the following:

The activation on non-replay after that last part will be:

But during replay it is:

Any pieces that rely on this ordering therefore will have a non-determinism error. Here is a workflow in .NET that demonstrates the issue:

[Workflow]
public class AttemptWorkflow
{
    [Activity]
    public static Task WaitAsync(int waitMs) =>
        Task.Delay(waitMs, ActivityExecutionContext.Current.CancellationToken);

    [WorkflowRun]
    public async Task RunAsync()
    {
        // Start a long activity that does not wait for cancel
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(Workflow.CancellationToken);
        var activityTask = Workflow.RunTaskAsync(() => Workflow.ExecuteActivityAsync(
            () => WaitAsync(50000),
            new()
            {
                StartToCloseTimeout = TimeSpan.FromHours(1),
                CancellationToken = cts.Token,
            }));
        // Roll over the task
        await Workflow.DelayAsync(1);

        // Run local activity and cancel the activity
        var localActivityTask = Workflow.RunTaskAsync(() => Workflow.ExecuteLocalActivityAsync(
            () => WaitAsync(459),
            new() { StartToCloseTimeout = TimeSpan.FromHours(1) }));
        await Workflow.RunTaskAsync(async () => cts.Cancel());

        // Wait on both
        await Workflow.WhenAnyAsync(activityTask, localActivityTask);
        // Only if local activity status is done will we start a timer
        Console.WriteLine("ACT DONE: {0}, LOCAL ACT DONE: {1}", activityTask.Status, localActivityTask.Status);
        if (localActivityTask.Status == TaskStatus.RanToCompletion)
        {
            await Workflow.DelayAsync(1);
        }
    }
}

Relevant portion of non-replay log:

[12:44:24] trce: Temporalio.Worker.WorkflowWorker[0]
      Received workflow activation: { "runId": "07207c7b-8e3b-4686-8eb5-7422b6499694", "timestamp": "2024-08-23T17:44:24.324682100Z", "historyLength": 9, "jobs": [ { "fireTimer": { "seq": 1 } } ], "historySizeBytes": "916", "buildIdForCurrentTask": "f7b88e8d-444e-4e8e-b8fd-5b651fea24f8" }
[12:44:24] trce: Temporalio.Worker.WorkflowWorker[0]
      Sending workflow completion: { "runId": "07207c7b-8e3b-4686-8eb5-7422b6499694", "successful": { "commands": [ { "scheduleLocalActivity": { "seq": 2, "activityId": "2", "activityType": "Wait", "arguments": [ { "metadata": { "encoding": "anNvbi9wbGFpbg==" }, "data": "NDU5" } ], "startToCloseTimeout": "3600s" } }, { "requestCancelActivity": { "seq": 1 } } ] } }
[12:44:24] trce: Temporalio.Worker.WorkflowWorker[0]
      Received workflow activation: { "runId": "07207c7b-8e3b-4686-8eb5-7422b6499694", "timestamp": "2024-08-23T17:44:24.805477100Z", "historyLength": 9, "jobs": [ { "resolveActivity": { "seq": 1, "result": { "cancelled": { "failure": { "message": "Activity cancelled", "cause": { "canceledFailureInfo": { } }, "activityFailureInfo": { "activityType": { "name": "Wait" }, "activityId": "1", "retryState": "RETRY_STATE_CANCEL_REQUESTED" } } } } } }, { "resolveActivity": { "seq": 2, "result": { "completed": { "result": { "metadata": { "encoding": "YmluYXJ5L251bGw=" } } } } } } ], "historySizeBytes": "916", "buildIdForCurrentTask": "f7b88e8d-444e-4e8e-b8fd-5b651fea24f8" }
ACT DONE: Faulted, LOCAL ACT DONE: WaitingForActivation

Relevant portion of replay log:

[12:44:24] trce: Temporalio.Worker.WorkflowWorker[0]
      Received workflow activation: { "runId": "07207c7b-8e3b-4686-8eb5-7422b6499694", "timestamp": "2024-08-23T17:44:24.324682100Z", "isReplaying": true, "historyLength": 9, "jobs": [ { "fireTimer": { "seq": 1 } } ], "historySizeBytes": "916", "buildIdForCurrentTask": "f7b88e8d-444e-4e8e-b8fd-5b651fea24f8" }
[12:44:24] trce: Temporalio.Worker.WorkflowWorker[0]
      Sending workflow completion: { "runId": "07207c7b-8e3b-4686-8eb5-7422b6499694", "successful": { "commands": [ { "scheduleLocalActivity": { "seq": 2, "activityId": "2", "activityType": "Wait", "arguments": [ { "metadata": { "encoding": "anNvbi9wbGFpbg==" }, "data": "NDU5" } ], "startToCloseTimeout": "3600s" } }, { "requestCancelActivity": { "seq": 1 } } ] } }
[12:44:24] trce: Temporalio.Worker.WorkflowWorker[0]
      Received workflow activation: { "runId": "07207c7b-8e3b-4686-8eb5-7422b6499694", "timestamp": "2024-08-23T17:44:24.805477100Z", "isReplaying": true, "historyLength": 9, "jobs": [ { "resolveActivity": { "seq": 2, "result": { "completed": { "result": { "metadata": { "encoding": "YmluYXJ5L251bGw=" } } } } } }, { "resolveActivity": { "seq": 1, "result": { "cancelled": { "failure": { "message": "Activity cancelled", "cause": { "canceledFailureInfo": { } }, "activityFailureInfo": { "activityType": { "name": "Wait" }, "activityId": "1", "retryState": "RETRY_STATE_CANCEL_REQUESTED" } } } } } } ], "historySizeBytes": "916", "buildIdForCurrentTask": "f7b88e8d-444e-4e8e-b8fd-5b651fea24f8" }
ACT DONE: WaitingForActivation, LOCAL ACT DONE: RanToCompletion