microsoft / durabletask-mssql

Microsoft SQL storage provider for Durable Functions and the Durable Task Framework
MIT License

NullReferenceException when calling an entity from a sub-orchestration after signaling #146

Closed: pgconreaux closed this issue 1 year ago

pgconreaux commented 1 year ago

Using a variation of the "Orchestration first signals, then calls entity" example (linked in the original issue), I moved the entity call into a sub-orchestration. During execution, the sub-orchestration aborts with the following error message:

[2023-01-26T16:31:19.001Z] @counter@myCounter: Orchestration execution was aborted: Session aborted because of NullReferenceException

The original example works without issue; the failure only appears once the entity call is moved into a sub-orchestration. It occurs when testing locally against either SQLEXPRESS or Azure SQL.

I have created a simple test case project here: https://github.com/pgconreaux/DurableEntityNRE
A full trace log from execution is here: https://github.com/pgconreaux/DurableEntityNRE/blob/main/tracelog.txt
Exception line in log: https://github.com/pgconreaux/DurableEntityNRE/blob/main/tracelog.txt#L361

pgconreaux commented 1 year ago

Another observation: I can also hit the issue if I create a critical section by locking the entity without performing any action, and then call the entity from a sub-orchestration. However, I've only hit this intermittently, and only against an Azure SQL DB.

Example: replacing the entity signal with an entity lock causes the issue intermittently with Azure SQL:

//context.SignalEntity(entityId, "Add", 1);
using (await context.LockAsync(entityId))
{
    // No-op: the lock is acquired and released without touching the entity
}

pgconreaux commented 1 year ago

I have not been able to reproduce the error while running the function app in Azure. It appears to occur only when running locally with Azure Functions Core Tools. As previously stated, I can get the error with a connection to either a local database (SQLEXPRESS) or a cloud database (Azure SQL). Here are the contents of my local.settings.json with a SQLEXPRESS connection:

{
  "IsEncrypted": false,
  "Values": {
    "SQLDB_Connection": "Server=localhost\\SQLEXPRESS;Database=DurableDB;Trusted_Connection=True;",
    "AzureWebJobsSecretStorageType": "Files",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet"
  }
}

cgillum commented 1 year ago

Thanks @pgconreaux for providing these details. I'm taking a look at this now.

cgillum commented 1 year ago

@sebastianburckhardt I was able to reproduce the problem, and the null-ref exception is happening here in the Durable Extension code. Specifically, the requestMessage variable is null (because eventRaisedEvent.Input is null). Any ideas about why?

cgillum commented 1 year ago

I'm wondering if this problem reproduces on Azure Storage or if it's specific to the MSSQL backend. I'm thinking it could easily be a problem specific to MSSQL since it's easy enough to reproduce that we'd have seen it by now if it were reproducible in Azure Storage. There are also some quirks about how sub-orchestrations are implemented in the MSSQL backend that could result in some unexpected behavior.

sebastianburckhardt commented 1 year ago

Based on the description, it appears to me that somehow the content of the event is always lost when a suborchestration sends an event to an entity instance.

Quoting @cgillum: "There are also some quirks about how sub-orchestrations are implemented in the MSSQL backend that could result in some unexpected behavior."

Could that explain why the event content is lost? The event that needs to be transmitted is passed to the backend in the orchestratorMessages list, when the work item for the sub-orchestration completes and IOrchestrationService.CompleteTaskOrchestrationWorkItemAsync is called.

(I did a quick check; this does not seem to happen on the Azure Storage or Netherite providers.)

cgillum commented 1 year ago

I spent some time debugging this and it appears to be some kind of race condition. I wrote an integration test to run this exact scenario and it fails as described when run normally but succeeds when I execute in a debugger with breakpoints. :(

Because the behavior is non-deterministic, I suspect it could be related to the fact that, in this example, the Add and Get operations are sent to the entity in a non-deterministic order: the parent orchestration sends an Add operation and, in the same batch, schedules a sub-orchestration that eventually sends a Get without first waiting to see if the Add succeeded.

If I change the parent orchestration to use CallEntityAsync instead of SignalEntity, then the example succeeds reliably, which also suggests the main issue is a race. I haven't yet figured out why this kind of race condition would effectively result in data loss on the Get operation, though. I'll need to spend more time on this.

cgillum commented 1 year ago

I figured it out after writing the above description. The implementation for ContinueAsNew in the SQL provider was overaggressive in how it cleaned up state and was purging data for external events that were still waiting to be processed. I was able to fix the stored procedure locally to resolve this.
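As a rough way to look for the symptom described above, the following sketch lists pending events that reference a payload row which no longer exists. It assumes the default dt schema and relies only on table and column names that appear in the _CheckpointOrchestration procedure (NewEvents, Payloads, TaskHub, InstanceID, SequenceNumber, PayloadID). Treating a dangling PayloadID as the signature of this bug is an assumption, not something confirmed against the provider's code.

```sql
-- Sketch: list pending events whose payload row is missing.
-- Assumes the default [dt] schema; adjust the schema name if yours differs.
DECLARE @TaskHub varchar(50) = dt.CurrentTaskHub();

SELECT
    E.[InstanceID],
    E.[SequenceNumber],
    E.[EventType],
    E.[Name],
    E.[PayloadID]
FROM dt.NewEvents AS E
    LEFT OUTER JOIN dt.Payloads AS P ON
        P.[TaskHub] = E.[TaskHub] AND
        P.[InstanceID] = E.[InstanceID] AND
        P.[PayloadID] = E.[PayloadID]
WHERE E.[TaskHub] = @TaskHub
    AND E.[PayloadID] IS NOT NULL   -- the event was written with a payload reference
    AND P.[PayloadID] IS NULL       -- but no matching payload row exists
ORDER BY E.[InstanceID], E.[SequenceNumber];
```

Rows in NewEvents are removed once an event has been processed, so an empty result does not rule the problem out. The query can only catch an affected event while it is still waiting in the queue.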

@pgconreaux if possible and practical, can you try running the following SQL script against your database to see if it resolves the issue for you? The script assumes you're using the default dt schema name (be sure to back up your database first if it contains important data):

ALTER PROCEDURE [dt].[_CheckpointOrchestration]
    @InstanceID varchar(100),
    @ExecutionID varchar(50),
    @RuntimeStatus varchar(30),
    @CustomStatusPayload varchar(MAX),
    @DeletedEvents MessageIDs READONLY,
    @NewHistoryEvents HistoryEvents READONLY,
    @NewOrchestrationEvents OrchestrationEvents READONLY,
    @NewTaskEvents TaskEvents READONLY
AS
BEGIN
    BEGIN TRANSACTION

    DECLARE @TaskHub varchar(50) = dt.CurrentTaskHub()

    DECLARE @InputPayloadID uniqueidentifier
    DECLARE @CustomStatusPayloadID uniqueidentifier
    DECLARE @ExistingOutputPayloadID uniqueidentifier
    DECLARE @ExistingCustomStatusPayload varchar(MAX)
    DECLARE @ExistingExecutionID varchar(50)

    -- Check for an existing custom status payload for this instance. If one exists, compare it
    -- to the new one to know if we need to update the existing entry or not.
    -- At the same time, grab the execution ID so we can learn if this is a ContinueAsNew.
    SELECT TOP 1 
        @InputPayloadID = I.[InputPayloadID],
        @CustomStatusPayloadID = I.[CustomStatusPayloadID],
        @ExistingOutputPayloadID = I.[OutputPayloadID],
        @ExistingCustomStatusPayload = P.[Text],
        @ExistingExecutionID = I.[ExecutionID]
    FROM Payloads P RIGHT OUTER JOIN Instances I ON
        P.[TaskHub] = @TaskHub AND
        P.[InstanceID] = I.[InstanceID] AND
        P.[PayloadID] = I.[CustomStatusPayloadID]
    WHERE I.[TaskHub] = @TaskHub AND I.[InstanceID] = @InstanceID

    -- ContinueAsNew case: delete all existing runtime state (history and payloads), but be careful
    -- not to delete payloads of unprocessed state, like new events.
    DECLARE @IsContinueAsNew BIT = 0
    IF @ExistingExecutionID IS NOT NULL AND @ExistingExecutionID <> @ExecutionID
    BEGIN
        DECLARE @PayloadIDsToDelete TABLE ([PayloadID] uniqueidentifier NULL)
        INSERT INTO @PayloadIDsToDelete
        VALUES (@InputPayloadID), (@CustomStatusPayloadID), (@ExistingOutputPayloadID)

        DELETE FROM History
        OUTPUT DELETED.[DataPayloadID] INTO @PayloadIDsToDelete
        WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID

        DELETE FROM Payloads
        WHERE
            [TaskHub] = @TaskHub AND
            [InstanceID] = @InstanceID AND
            [PayloadID] IN (SELECT [PayloadID] FROM @PayloadIDsToDelete)

        -- The existing payload got purged in the previous statement 
        SET @ExistingCustomStatusPayload = NULL
        SET @IsContinueAsNew = 1
    END

    -- Custom status case #1: Setting the custom status for the first time
    IF @ExistingCustomStatusPayload IS NULL AND @CustomStatusPayload IS NOT NULL
    BEGIN
        SET @CustomStatusPayloadID = NEWID()
        INSERT INTO Payloads ([TaskHub], [InstanceID], [PayloadID], [Text])
        VALUES (@TaskHub, @InstanceID, @CustomStatusPayloadID, @CustomStatusPayload)
    END

    -- Custom status case #2: Updating an existing custom status payload
    IF @ExistingCustomStatusPayload IS NOT NULL AND @ExistingCustomStatusPayload <> @CustomStatusPayload
    BEGIN
        UPDATE Payloads SET [Text] = @CustomStatusPayload WHERE 
            [TaskHub] = @TaskHub AND
            [InstanceID] = @InstanceID AND
            [PayloadID] = @CustomStatusPayloadID
    END

    -- Need to update the input payload ID if this is a ContinueAsNew
    IF @IsContinueAsNew = 1
    BEGIN
        SET @InputPayloadID = (
            SELECT TOP 1 [PayloadID]
            FROM @NewHistoryEvents
            WHERE [EventType] = 'ExecutionStarted'
            ORDER BY [SequenceNumber] DESC
        )
    END

    DECLARE @IsCompleted bit
    SET @IsCompleted = (CASE WHEN @RuntimeStatus IN ('Completed', 'Failed', 'Terminated') THEN 1 ELSE 0 END)

    -- The output payload will only exist when the orchestration has completed.
    -- Fetch its payload ID now so that we can update it in the Instances table further down.
    DECLARE @OutputPayloadID uniqueidentifier
    IF @IsCompleted = 1
    BEGIN
        SET @OutputPayloadID = (
            SELECT TOP 1 [PayloadID]
            FROM @NewHistoryEvents
            WHERE [EventType] = 'ExecutionCompleted' OR [EventType] = 'ExecutionTerminated'
            ORDER BY [SequenceNumber] DESC
        )
    END

    -- Insert data payloads into the Payloads table as a single statement.
    -- The [PayloadText] value will be NULL if there is no payload or if a payload is already known to exist in the DB.
    -- The [PayloadID] value might be set even if [PayloadText] and [Reason] are both NULL.
    -- This needs to be done before the UPDATE to Instances because the Instances table needs to reference the output payload.
    INSERT INTO Payloads ([TaskHub], [InstanceID], [PayloadID], [Text], [Reason])
        SELECT @TaskHub, [InstanceID], [PayloadID], [PayloadText], [Reason]
        FROM @NewHistoryEvents
        WHERE [PayloadText] IS NOT NULL OR [Reason] IS NOT NULL

    UPDATE Instances
    SET
        [ExecutionID] = @ExecutionID,
        [RuntimeStatus] = @RuntimeStatus,
        [LastUpdatedTime] = SYSUTCDATETIME(),
        [CompletedTime] = (CASE WHEN @IsCompleted = 1 THEN SYSUTCDATETIME() ELSE NULL END),
        [LockExpiration] = NULL, -- release the lock
        [CustomStatusPayloadID] = @CustomStatusPayloadID,
        [InputPayloadID] = @InputPayloadID,
        [OutputPayloadID] = @OutputPayloadID
    FROM Instances
    WHERE [TaskHub] = @TaskHub and [InstanceID] = @InstanceID

    IF @@ROWCOUNT = 0
        THROW 50000, 'The instance does not exist.', 1;

    -- External event messages can create new instances
    -- NOTE: There is a chance this could result in deadlocks if two 
    --       instances are sending events to each other at the same time
    INSERT INTO Instances (
        [TaskHub],
        [InstanceID],
        [ExecutionID],
        [Name],
        [Version],
        [RuntimeStatus])
    SELECT DISTINCT
        @TaskHub,
        E.[InstanceID],
        NEWID(),
        SUBSTRING(E.[InstanceID], 2, CHARINDEX('@', E.[InstanceID], 2) - 2),
        '',
        'Pending'
    FROM @NewOrchestrationEvents E
    WHERE LEFT(E.[InstanceID], 1) = '@'
        AND CHARINDEX('@', E.[InstanceID], 2) > 0
        AND NOT EXISTS (
            SELECT 1
            FROM Instances I
            WHERE [TaskHub] = @TaskHub AND I.[InstanceID] = E.[InstanceID])
    GROUP BY E.[InstanceID]
    ORDER BY E.[InstanceID] ASC

    -- Create sub-orchestration instances
    INSERT INTO Instances (
        [TaskHub],
        [InstanceID],
        [ExecutionID],
        [Name],
        [Version],
        [ParentInstanceID],
        [RuntimeStatus])
    SELECT DISTINCT
        @TaskHub,
        E.[InstanceID],
        E.[ExecutionID],
        E.[Name],
        E.[Version],
        E.[ParentInstanceID],
        'Pending'
    FROM @NewOrchestrationEvents E
    WHERE E.[EventType] IN ('ExecutionStarted')
        AND NOT EXISTS (
            SELECT 1
            FROM Instances I
            WHERE [TaskHub] = @TaskHub AND I.[InstanceID] = E.[InstanceID])
    ORDER BY E.[InstanceID] ASC

    -- Insert new event data payloads into the Payloads table in batches.
    -- PayloadID values are provided by the caller only if a payload exists.
    INSERT INTO Payloads ([TaskHub], [InstanceID], [PayloadID], [Text], [Reason])
        SELECT @TaskHub, [InstanceID], [PayloadID], [PayloadText], [Reason]
        FROM @NewOrchestrationEvents
        WHERE [PayloadID] IS NOT NULL

    INSERT INTO Payloads ([TaskHub], [InstanceID], [PayloadID], [Text])
        SELECT @TaskHub, [InstanceID], [PayloadID], [PayloadText]
        FROM @NewTaskEvents
        WHERE [PayloadID] IS NOT NULL

    -- Insert the new events with references to their payloads, if applicable
    INSERT INTO NewEvents (
        [TaskHub],
        [InstanceID],
        [ExecutionID],
        [EventType],
        [Name],
        [RuntimeStatus],
        [VisibleTime],
        [TaskID],
        [PayloadID]
    ) 
    SELECT 
        @TaskHub,
        [InstanceID],
        [ExecutionID],
        [EventType],
        [Name],
        [RuntimeStatus],
        [VisibleTime],
        [TaskID],
        [PayloadID]
    FROM @NewOrchestrationEvents

    -- We return the list of deleted messages so that the caller can issue a 
    -- warning about missing messages
    DELETE E
    OUTPUT DELETED.InstanceID, DELETED.SequenceNumber
    FROM NewEvents E WITH (FORCESEEK(PK_NewEvents(TaskHub, InstanceID, SequenceNumber)))
        INNER JOIN @DeletedEvents D ON 
            D.InstanceID = E.InstanceID AND
            D.SequenceNumber = E.SequenceNumber AND
            E.TaskHub = @TaskHub

    -- IMPORTANT: This insert is expected to fail with a primary key constraint violation in a
    --            split-brain situation where two instances try to execute the same orchestration
    --            at the same time. The SDK will check for this exact error condition.
    INSERT INTO History (
        [TaskHub],
        [InstanceID],
        [ExecutionID],
        [SequenceNumber],
        [EventType],
        [TaskID],
        [Timestamp],
        [IsPlayed],
        [Name],
        [RuntimeStatus],
        [VisibleTime],
        [DataPayloadID])
    SELECT
        @TaskHub,
        H.[InstanceID],
        H.[ExecutionID],
        H.[SequenceNumber],
        H.[EventType],
        H.[TaskID],
        H.[Timestamp],
        H.[IsPlayed],
        H.[Name],
        H.[RuntimeStatus],
        H.[VisibleTime],
        H.[PayloadID]
    FROM @NewHistoryEvents H

    -- TaskScheduled events
    INSERT INTO NewTasks (
        [TaskHub],
        [InstanceID],
        [ExecutionID],
        [Name],
        [TaskID],
        [VisibleTime],
        [LockedBy],
        [LockExpiration],
        [PayloadID],
        [Version]
    )
    OUTPUT
        INSERTED.[SequenceNumber],
        INSERTED.[TaskID]
    SELECT 
        @TaskHub,
        [InstanceID],
        [ExecutionID],
        [Name],
        [TaskID],
        [VisibleTime],
        [LockedBy],
        [LockExpiration],
        [PayloadID],
        [Version]
    FROM @NewTaskEvents

    COMMIT TRANSACTION
END

pgconreaux commented 1 year ago

@cgillum I have confirmed that the exception no longer occurs after running the provided SQL script.