Eventuous / eventuous

Event Sourcing library for .NET
https://eventuous.dev
Apache License 2.0

OptimisticConcurrencyException is not recognized #373

Closed wpavelev closed 4 days ago

wpavelev commented 3 weeks ago

Hi there!

I am having problems using the SQL Server EventStore. When inserting a heavy load of events, I get a SQL exception with error number 50000 instead of the OptimisticConcurrencyException that I expected.

I found out that StoreFunctions.cs checks the SQL exception for a specific message: "WrongExpectedVersion".

https://github.com/Eventuous/eventuous/blob/e5a84610a713ac43fe02f159a48030cf3fa41faa/src/Core/src/Eventuous.Persistence/EventStore/StoreFunctions.cs#L42

The message should be created in the stored procedure of the SQL-EventStore (but fails): https://github.com/Eventuous/eventuous/blob/e5a84610a713ac43fe02f159a48030cf3fa41faa/src/SqlServer/src/Eventuous.SqlServer/Scripts/3_CheckStream.sql#L27

This line does not seem to work correctly. To pass a custom message, Microsoft suggests building it with the FORMATMESSAGE function before calling THROW: https://learn.microsoft.com/en-us/sql/t-sql/language-elements/throw-transact-sql?view=sql-server-ver16#c-use-formatmessage-with-throw
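As a minimal sketch of that suggestion: THROW does not expand printf-style placeholders itself, so the message text is built with FORMATMESSAGE first and then passed as a variable. The variable values below are made up for illustration and are not taken from the Eventuous scripts; passing a literal format string to FORMATMESSAGE assumes SQL Server 2016 or later.

```sql
-- Illustrative values only; in a stored procedure these would be parameters.
DECLARE @expected_version INT = 3,
        @current_version  INT = 5,
        @msg NVARCHAR(200);

-- Build the final message text; %i is replaced by the integer arguments.
SET @msg = FORMATMESSAGE(N'WrongExpectedVersion %i, current version %i',
                         @expected_version, @current_version);

-- The statement before THROW must end with a semicolon.
-- Raises error 50000 with the text: WrongExpectedVersion 3, current version 5
THROW 50000, @msg, 1;
```

With the text assembled this way, a client that looks for "WrongExpectedVersion" in the exception message should find it.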

There is also a concurrency error that occurs when two events are written at the same time. We fixed it by overriding the append event and check stream stored procedures.

append_event:

BEGIN
    DECLARE @customErrorMessage NVARCHAR(200)

    SELECT @current_version = [Version], @stream_id = StreamId
        FROM eventuous.Streams 
        WHERE StreamName = @stream_name

    IF @stream_id is null
        BEGIN
        IF @expected_version = -2 -- Any
        OR @expected_version = -1 -- NoStream
            BEGIN
                BEGIN TRY
                    INSERT INTO eventuous.Streams (StreamName, Version) VALUES (@stream_name, -1);
                    SELECT @current_version = Version, @stream_id = StreamId
                        FROM eventuous.Streams
                        WHERE StreamName = @stream_name
                END TRY
                BEGIN CATCH             
                    IF (ERROR_NUMBER() = 2627 OR ERROR_NUMBER() = 2601) AND (SELECT CHARINDEX(N'UQ_StreamName', ERROR_MESSAGE())) > 0
                        BEGIN
                            SELECT @customErrorMessage = FORMATMESSAGE(N'WrongExpectedVersion %i, stream already created', @expected_version);
                            THROW 50000, @customErrorMessage, 1; 
                        END
                    ELSE
                        THROW
                END CATCH
            END
        ELSE
            THROW 50001, N'StreamNotFound', 1;
        END
    ELSE IF @expected_version != -2 and @expected_version != @current_version
        BEGIN
            SELECT @customErrorMessage = FORMATMESSAGE(N'WrongExpectedVersion %i, current version %i', @expected_version, @current_version);
            THROW 50000, @customErrorMessage, 1;
        END
END

... and check_stream:

BEGIN
    DECLARE @customErrorMessage NVARCHAR(200)

    SELECT @current_version = [Version], @stream_id = StreamId
        FROM eventuous.Streams 
        WHERE StreamName = @stream_name

    IF @stream_id is null
        BEGIN
        IF @expected_version = -2 -- Any
        OR @expected_version = -1 -- NoStream
            BEGIN
                BEGIN TRY
                    INSERT INTO eventuous.Streams (StreamName, Version) VALUES (@stream_name, -1);
                    SELECT @current_version = Version, @stream_id = StreamId
                        FROM eventuous.Streams
                        WHERE StreamName = @stream_name
                END TRY
                BEGIN CATCH             
                    IF (ERROR_NUMBER() = 2627 OR ERROR_NUMBER() = 2601) AND (SELECT CHARINDEX(N'UQ_StreamName', ERROR_MESSAGE())) > 0
                        BEGIN
                            SELECT @customErrorMessage = FORMATMESSAGE(N'WrongExpectedVersion %i, stream already created', @expected_version);
                            THROW 50000, @customErrorMessage, 1; 
                        END
                    ELSE
                        THROW
                END CATCH
            END
        ELSE
            THROW 50001, N'StreamNotFound', 1;
        END
    ELSE IF @expected_version != -2 and @expected_version != @current_version
        BEGIN
            SELECT @customErrorMessage = FORMATMESSAGE(N'WrongExpectedVersion %i, current version %i', @expected_version, @current_version);
            THROW 50000, @customErrorMessage, 1;
        END
END

best regards

linear[bot] commented 3 weeks ago

EVE-187 OptimisticConcurrencyException is not recognized

alexeyzimarev commented 5 days ago

I think you posted the code for check_stream twice.

alexeyzimarev commented 5 days ago

I believe it works by changing only check_stream. At least the test produces that exception, and I didn't change append_events.

alexeyzimarev commented 4 days ago

@wpavelev it would be great if you could clarify whether my PR fixes the issue and what changes you made to the append events procedure. I plan to merge it tomorrow.

wpavelev commented 4 days ago

Hi @alexeyzimarev,

For append_event, we added similar logic to check whether the event was written successfully. This is just for the rare case where two processes have both passed the stream check and want to write at the same time.

Here is the SQL code that we implemented:

BEGIN
    DECLARE @current_version INT,
        @stream_id INT,
        @position BIGINT,
        @customErrorMessage NVARCHAR(200),
        @newMessagesCount INT,
        @expected_StreamVersionAfterUpdate INT,
        @actual_StreamVersionAfterUpdate INT

    SELECT @newMessagesCount = COUNT(*) FROM @messages

    if @created is null
        BEGIN
            SET @created = SYSUTCDATETIME()
        END

    EXEC [eventuous].[check_stream] @stream_name, @expected_version, @current_version = @current_version OUTPUT, @stream_id = @stream_id OUTPUT

    BEGIN TRY
        INSERT INTO eventuous.Messages (MessageId, MessageType, StreamId, StreamPosition, JsonData, JsonMetadata, Created)
        SELECT message_id, message_type, @stream_id, @current_version + (ROW_NUMBER() OVER(ORDER BY (SELECT NULL))), json_data, json_metadata, @created
        FROM @messages
    END TRY
    BEGIN CATCH             
        IF (ERROR_NUMBER() = 2627 OR ERROR_NUMBER() = 2601) AND (SELECT CHARINDEX(N'UQ_StreamIdAndStreamPosition', ERROR_MESSAGE())) > 0
            BEGIN
                DECLARE @streamIdFromError nvarchar(20) = SUBSTRING(ERROR_MESSAGE(), PATINDEX(N'%[0-9]%,%', ERROR_MESSAGE()), PATINDEX(N'%, [0-9]%).', ERROR_MESSAGE()) - PATINDEX(N'%[0-9]%,%', ERROR_MESSAGE()))
                DECLARE @streamPositionFromError nvarchar(20) = SUBSTRING(ERROR_MESSAGE(), (PATINDEX(N'%, [0-9]%).', ERROR_MESSAGE())) + 2, PATINDEX(N'%).', ERROR_MESSAGE()) - (PATINDEX(N'%, [0-9]%).', ERROR_MESSAGE()) + 2))

                -- TODO: There are multiple causes of OptimisticConcurrencyExceptions, but current client code is hard-coded to check for 'WrongExpectedVersion' in message and 50000 as error number.
                SELECT @customErrorMessage = FORMATMESSAGE(N'WrongExpectedVersion, another message has already been written at stream position %s on stream %s.', @streamPositionFromError, @streamIdFromError);
                THROW 50000, @customErrorMessage, 1;
            END
        ELSE
            THROW
    END CATCH

    SELECT TOP 1 @current_version = StreamPosition, @position = GlobalPosition
    FROM eventuous.Messages
    WHERE StreamId = @stream_id
    ORDER BY GlobalPosition DESC

    UPDATE eventuous.Streams SET [Version] = @current_version WHERE StreamId = @stream_id AND [Version] = @expected_version

    IF @@ROWCOUNT = 0
    BEGIN
        --  No row with StreamId = @stream_id AND [Version] = @expected_version
        SELECT @customErrorMessage = FORMATMESSAGE(N'WrongExpectedVersion %i for stream %i.', @expected_version, @stream_id);
        THROW 50000, @customErrorMessage, 1;
    END

Sorry for the wrong code at the beginning.

Thank you very much!

alexeyzimarev commented 4 days ago

Ok, I am adding those changes to AppendEvents. Btw, I see the TODO about distinguishing reasons. Maybe you can elaborate on the other reasons and we can make it work. For example, we are not limited to throwing error 50000.

wpavelev commented 4 days ago

That sounds great! Right now we are good to go with a 'universal' concurrency exception. As soon as we see a suitable use case, I'll be glad to get back to you. Thanks again!