rebus-org / Rebus

:bus: Simple and lean service bus implementation for .NET
https://mookid.dk/category/rebus
Other
2.26k stars 353 forks source link

SagaData is not blocked when changing in parallel #1129

Closed AfanasievAndrew closed 6 months ago

AfanasievAndrew commented 6 months ago

If you run two hosts in parallel and send two messages for processing for one saga, then the saga data can be overwritten, judging by the behavior, the saga data is read at the time of entering the handler and there is no lock when changing.

My example:

public class SampleSaga : Saga<SampleSagaData>,
        IAmInitiatedBy<StartCommand>,
        IHandleMessages<SampleCommand>
    {
        private readonly ILogger<SampleScheduleSaga> _logger;

        public SampleSaga(ILogger<SampleScheduleSaga> logger)
        {
            _logger = logger;
        }

        protected override void CorrelateMessages(ICorrelationConfig<SampleScheduleSagaData> config)
        {
            config.Correlate<StartCommand>(m => m.Id, d => d.MessageId);
            config.Correlate<SampleCommand>(m => m.Id, d => d.MessageId);
        }

        public async Task Handle(StartCommand message)
        {
            _logger.LogInformation($"Handle StartCommand '{message.Id}'");

            Data.MessageId = message.Id;
            Data.InProcess = false;

            _logger.LogInformation($"End StartCommand '{message.Id}'");
        }

        public async Task Handle(SampleCommand message)
        {
            _logger.LogInformation($"1. Handle SampleCommand '{message.Id}' WaitTimeout '{message.WaitTimeout}'. Data.MessageId = {Data.MessageId}; Data.InProcess = {Data.InProcess}");

            await Task.Delay(message.WaitTimeout);

            if (!Data.InProcess)
            {
                _logger.LogInformation($"2. in if (!Data.InProcess). Data.InProcess = {Data.InProcess}");

                Data.InProcess = true;

                _logger.LogInformation($"3. Data.InProcess = {Data.InProcess}");
            }

            _logger.LogInformation($"4. End SampleCommand. Data.InProcess = {Data.InProcess}");
        }
    }

public class SampleSagaData : ISagaData
        {
            public Guid Id { get; set; }
            public int MessageId { get; set; }
            public int Revision { get; set; }
            public bool InProcess { get; set; }
        }

Saga storage SqlServer (StoreInSqlServer) and transport RabbitMq (UseRabbitMq).

Sending messages:

bus.SendAsync(new SampleCommand { Id = 77, WaitTimeout = TimeSpan.FromSeconds(1) }),
bus.SendAsync(new SampleCommand { Id = 77, WaitTimeout = TimeSpan.FromSeconds(10) }) 

And the logs of the hosts:

2023-12-21 20:04:43.939 [INF] 1. Handle SampleCommand '77' WaitTimeout '00:00:01'. Data.MessageId = 77; Data.InProcess = False
2023-12-21 20:04:45.117 [INF] 2. in if (!Data.InProcess). Data.InProcess = False
2023-12-21 20:04:46.122 [INF] 3. Data.InProcess = True
2023-12-21 20:04:46.127 [INF] 4. End SampleCommand. Data.InProcess = True
2023-12-21 20:04:44.037 [INF] 1. Handle SampleCommand '77' WaitTimeout '00:00:10'. Data.MessageId = 77; Data.InProcess = False
2023-12-21 20:04:54.054 [INF] 2. in if (!Data.InProcess). Data.InProcess = False
2023-12-21 20:04:55.073 [INF] 3. Data.InProcess = True
2023-12-21 20:04:55.075 [INF] 4. End SampleCommand. Data.InProcess = True

Rebus 8.0.2 Rebus.RebbitMq 9.0.1 Rebus.SqlServer 8.0.2

It looks like the second host should be set to 'true' at the time of reading the saga data, or in that case there should be sequential message processing.

mookid8000 commented 6 months ago

Rebus uses optimistic concurrency control around saga updates, so the two messages will seemingly be processed in parallel, but only the first one's update will go through ā€“ the other one will get an ConcurrencyException and be rolled back and retried, this time executing on an up-to-date saga data. šŸ™‚

I've taken your code to demonstrate the principle here: TestSagaOptimisticConcurrency

Please let me know if it isn't clear how it works from the example.