rebus-org / Rebus.MySql

:bus: MySQL integration for Rebus
https://mookid.dk/category/rebus

Rebus.Exceptions.ConcurrencyException on saga initiation #12

Closed dietermijle closed 3 years ago

dietermijle commented 3 years ago

Hi,

We have a saga that handles multiple batches of the same event (SaveItemsBatch). For example, when saving 1000 items, we send 10 messages that contain 100 items each. For this saga we tried to use only the IAmInitiatedBy interface (not IHandleMessages).

public class SaveItemsInBatchSaga : 
    Saga<SaveItemsInBatchSagaData>,
    IAmInitiatedBy<SaveItemsBatch>
{
}

As we are running multiple instances of our service, we sometimes see that the creation of the saga results in the following exception.

Unhandled exception {errorNumber} while handling message with ID {messageId}","@l":"Warning","@x":"Rebus.Exceptions.ConcurrencyException: Saga data Sag.OrderCenter.Messaging.CollectCreateTimeslotsSagaData with ID 1d7f2941-5c86-40ce-8192-3d3d58c9afa4 in table sagas could not be inserted! ---> MySql.Data.MySqlClient.MySqlException: Duplicate entry 'CorrelationId-faff4efe-e653-438e-8ee1-a73b9e0fdcba-Sag.OrderCent' for key 'PRIMARY'\n at MySql.Data.MySqlClient.MySqlStream.ReadPacket()\n at MySql.Data.MySqlClient.NativeDriver.GetResult(Int32& affectedRow, Int64& insertedId)\n at MySql.Data.MySqlClient.Driver.NextResult(Int32 statementId, Boolean force)\n at MySql.Data.MySqlClient.MySqlDataReader.NextResult()\n at MySql.Data.MySqlClient.MySqlCommand.ExecuteReader(CommandBehavior behavior)\n at MySql.Data.MySqlClient.MySqlCommand.ExecuteNonQuery()\n at System.Data.Common.DbCommand.ExecuteNonQueryAsync(CancellationToken cancellationToken)\n--- End of stack trace from previous location where exception was thrown ---\n at Rebus.MySql.Sagas.MySqlSagaStorage.CreateIndex(ISagaData sagaData, MySqlConnection connection, IEnumerable`1 propertiesToIndex)\n --- End of inner exception stack trace ---\n at Rebus.MySql.Sagas.MySqlSagaStorage.CreateIndex(ISagaData sagaData, MySqlConnection connection, IEnumerable`1 propertiesToIndex)\n at Rebus.MySql.Sagas.MySqlSagaStorage.Insert(ISagaData sagaData, IEnumerable`1 correlationProperties)\n at Rebus.Sagas.LoadSagaDataStep.SaveSagaData(RelevantSagaInfo sagaDataToUpdate, Boolean insert)\n at Rebus.Sagas.LoadSagaDataStep.SaveSagaData(RelevantSagaInfo sagaDataToUpdate, Boolean insert)\n at Rebus.Sagas.LoadSagaDataStep.Process(IncomingStepContext context, Func`1 next)\n at Rebus.Pipeline.Receive.ActivateHandlersStep.Process(IncomingStepContext context, Func`1 next)\n at Rebus.Pipeline.Receive.HandleRoutingSlipsStep.Process(IncomingStepContext context, Func`1 next)\n at SaasKit.Multitenancy.Rebus.TenantResolutionMiddleware`1.Process(IncomingStepContext context, Func`1 next)\n at Rebus.Retry.Simple.FailedMessageWrapperStep.Process(IncomingStepContext context, Func`1 next)\n at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)\n at Rebus.Retry.FailFast.FailFastStep.Process(IncomingStepContext context, Func`1 next)\n at Rebus.Retry.Simple.SimpleRetryStrategyStep.DispatchWithTrackerIdentifier(Func`1 next, String identifierToTrackMessageBy, ITransactionContext transactionContext, String messageId, String secondLevelMessageId)","errorNumber":1,"messageId":"f72909dc-43f9-4408-b81c-85892c8c2f29","SourceContext":"Rebus.Retry.ErrorTracking.InMemErrorTracker","TntId":1}

How should we handle this best? Can the ResolveConflict method also be used here?


[UPDATE] We did some further investigation and it seems that the above error occurs within the Rebus.MySql library after the message is marked as handled. So the message is also not retried. (Maybe this issue should be moved to the Rebus.MySql project, not sure?)

mookid8000 commented 3 years ago

Hmmm that sounds pretty weird. It looks like the exception occurs during a call to CreateIndex on MySqlSagaStorage, which tells me that this is probably because of a duplicated correlation property.

Correlation properties (i.e. the ones you configure Rebus to be able to retrieve your saga by) must have unique values, so it's probably a fair enough (transient) error, which should result in a successful lookup of the saga instance the second time around.

Although the part where you say

> the above error occurs within the Rebus.MySql library after the message is marked as handled. So the message is also not retried.

worries me.... which transport are you using?
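A minimal sketch of the transient race described above, assuming two workers that each receive a batch with the same correlation ID before either has saved the saga. `FakeSagaIndex`, `SagaRaceSketch` and the correlation value are hypothetical stand-ins, not Rebus or Rebus.MySql code; the dictionary only mimics a unique key on the correlation value.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a saga index table with a unique key on the
// correlation value. This is NOT Rebus or Rebus.MySql code.
public sealed class FakeSagaIndex
{
    private readonly Dictionary<string, Guid> _byCorrelationId =
        new Dictionary<string, Guid>();

    // Returns the saga ID stored for the correlation value, or null if none exists.
    public Guid? Find(string correlationId) =>
        _byCorrelationId.TryGetValue(correlationId, out var id) ? id : (Guid?)null;

    // Mimics an INSERT against a unique key: a second insert for the same value fails.
    public void Insert(string correlationId, Guid sagaId)
    {
        if (_byCorrelationId.ContainsKey(correlationId))
            throw new InvalidOperationException($"Duplicate entry '{correlationId}'");
        _byCorrelationId.Add(correlationId, sagaId);
    }
}

public static class SagaRaceSketch
{
    // Returns how many workers created the saga and how many found it on retry.
    public static (int Created, int FoundOnRetry) Run()
    {
        var index = new FakeSagaIndex();
        const string correlationId = "batch-run-1"; // hypothetical value
        int created = 0, foundOnRetry = 0;

        // Both workers look up the saga before either has inserted it.
        var seenByA = index.Find(correlationId);
        var seenByB = index.Find(correlationId);

        foreach (var seen in new[] { seenByA, seenByB })
        {
            if (seen != null) continue; // would update the existing saga instead

            try
            {
                index.Insert(correlationId, Guid.NewGuid());
                created++;
            }
            catch (InvalidOperationException)
            {
                // Retry: the lookup now finds the saga the other worker created.
                if (index.Find(correlationId) != null) foundOnRetry++;
            }
        }

        return (created, foundOnRetry);
    }
}
```

`SagaRaceSketch.Run()` returns `(1, 1)`: one worker creates the saga, the other fails once and finds the existing saga when it retries. That recovery only happens if the failed message is actually retried.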

dietermijle commented 3 years ago

First of all, thanks for the reply!

For correlating the messages, we are using a unique identifier (CorrelationId) which is the same in the different batches:

protected override void CorrelateMessages(ICorrelationConfig<SaveItemsInBatchSagaData> config)
{
  _logger.LogDebug("Configuring the correlation for SaveItemsBatch messages");
  config.Correlate<SaveItemsBatch>(m => m.CorrelationId, d => d.CorrelationId);
}

For transport we are using Rebus.AmazonSQS and as already mentioned for storage we are using Rebus.MySql.

Here is an extract of our logs in which you can see that the message is first marked as handled and the saga concurrency exception is thrown afterwards:

{"@t":"2021-02-12T13:56:42.6022797Z","@mt":"Sending {messageBody} -> {queueNames}","@l":"Debug","messageBody":"Sag.NextGen.Pigeon.Messages.Events.EventCenter.V2.SetTimeslotsForEventSucceededEvent","queueNames":"pratik-order-center-api","SourceContext":"Rebus.Pipeline.Send.SendOutgoingMessageStep","ActionId":"4fc5985e-ed7e-4025-b3c2-1f6ced4a69c7","ActionName":"Sag.OfferCenter.Api.Controllers.V2.Internal.InternalTestController.EventcenterSettimeslotsforeventsucceededevent (Sag.OrderCenter.Api)","RequestId":"0HM6FBKJFGKRB:00000002","RequestPath":"/internal/v2/test/eventcenter-settimeslotsforeventsucceededevent","CorrelationId":null,"ConnectionId":"0HM6FBKJFGKRB","TntKey":"wN+02/ArCESPzS7jiXI1xA==","TntId":1}
{"@t":"2021-02-12T13:56:42.7772384Z","@mt":"Sending {messageBody} -> {queueNames}","@l":"Debug","messageBody":"Sag.NextGen.Pigeon.Messages.Events.EventCenter.V2.SetTimeslotsForEventSucceededEvent","queueNames":"pratik-order-center-api","SourceContext":"Rebus.Pipeline.Send.SendOutgoingMessageStep","ActionId":"4fc5985e-ed7e-4025-b3c2-1f6ced4a69c7","ActionName":"Sag.OfferCenter.Api.Controllers.V2.Internal.InternalTestController.EventcenterSettimeslotsforeventsucceededevent (Sag.OrderCenter.Api)","RequestId":"0HM6FBKJFGKRB:00000002","RequestPath":"/internal/v2/test/eventcenter-settimeslotsforeventsucceededevent","CorrelationId":null,"ConnectionId":"0HM6FBKJFGKRB","TntKey":"wN+02/ArCESPzS7jiXI1xA==","TntId":1}
{"@t":"2021-02-12T13:56:42.9476493Z","@mt":"Sending {messageBody} -> {queueNames}","@l":"Debug","messageBody":"Sag.NextGen.Pigeon.Messages.Events.EventCenter.V2.SetTimeslotsForEventSucceededEvent","queueNames":"pratik-order-center-api","SourceContext":"Rebus.Pipeline.Send.SendOutgoingMessageStep","ActionId":"4fc5985e-ed7e-4025-b3c2-1f6ced4a69c7","ActionName":"Sag.OfferCenter.Api.Controllers.V2.Internal.InternalTestController.EventcenterSettimeslotsforeventsucceededevent (Sag.OrderCenter.Api)","RequestId":"0HM6FBKJFGKRB:00000002","RequestPath":"/internal/v2/test/eventcenter-settimeslotsforeventsucceededevent","CorrelationId":null,"ConnectionId":"0HM6FBKJFGKRB","TntKey":"wN+02/ArCESPzS7jiXI1xA==","TntId":1}
{"@t":"2021-02-12T13:56:42.9522851Z","@mt":"Starting periodic task {taskDescription} with interval {timerInterval}","@l":"Debug","taskDescription":"RenewPeekLock-281651eb-2ba7-4d88-90ab-975e6566edde","timerInterval":"00:04:00","SourceContext":"Rebus.Threading.TaskParallelLibrary.TplAsyncTask"}
TenantContext Resolved. Adding to IncomingStepContext.
{"@t":"2021-02-12T13:56:42.9866892Z","@mt":"Created new saga data with ID {sagaDataId} for message {messageLabel}","@l":"Debug","sagaDataId":"a212754c-8e3f-4ce9-8b1d-e9486bda6297","messageLabel":"SetTimeslotsForEventSucceededEvent/3bdd986a-852a-486b-a4c8-da9e96fce4fc","SourceContext":"Rebus.Sagas.LoadSagaDataStep","TntId":1}
{"@t":"2021-02-12T13:56:43.0664020Z","@mt":"Mounting message data for message with ID 3bdd986a-852a-486b-a4c8-da9e96fce4fc onto the transactioncontext.","SourceContext":"Rebus.Idempotency.LoadMessageDataStep","TntId":1}
{"@t":"2021-02-12T13:56:43.0664849Z","@mt":"Checking if message with ID 3bdd986a-852a-486b-a4c8-da9e96fce4fc has already been processed before.","SourceContext":"Rebus.Idempotency.IdempotentMessageIncomingStep","TntId":1}
{"@t":"2021-02-12T13:56:43.0665113Z","@mt":"Message with ID 3bdd986a-852a-486b-a4c8-da9e96fce4fc has not been handled yet.","SourceContext":"Rebus.Idempotency.IdempotentMessageIncomingStep","TntId":1}
{"@t":"2021-02-12T13:56:43.0706929Z","@mt":"Updating message storage for the message with ID 3bdd986a-852a-486b-a4c8-da9e96fce4fc as being processed by thread 10","SourceContext":"Rebus.Idempotency.IdempotentMessageIncomingStep","TntId":1}
{"@t":"2021-02-12T13:56:43.0834176Z","@mt":"Start handling EventCenter.SetTimeslotsForEventSucceededEvent for event: {eventId}, totalTimeSlots: {totalTimeSlots}, {CorrelationId}, {currentBatch}","eventId":1,"totalTimeSlots":23,"CorrelationId":"5b002c6f-c75c-4661-a0c0-28081f437552","currentBatch":2,"SourceContext":"Sag.OrderCenter.Messaging.Sagas.CreateTimeslotsInBatchSaga","TntId":1}
{"@t":"2021-02-12T13:56:54.9342503Z","@mt":"*****************saga id: a212754c-8e3f-4ce9-8b1d-e9486bda6297, correlation id: 5b002c6f-c75c-4661-a0c0-28081f437552","@l":"Debug","SourceContext":"Sag.OrderCenter.Messaging.Sagas.CreateTimeslotsInBatchSaga","TntId":1}
{"@t":"2021-02-12T13:56:54.9607505Z","@mt":"Sag.OrderCenter.Domain.Commands.CreateEventCommand#55606099: Started. \n","@l":"Debug","SourceContext":"Sag.NextGen.Raven.Commands.Handlers.Async.LoggingCommandHandler","TntId":1}
{"@t":"2021-02-12T13:56:54.9986760Z","@mt":"Transaction 19246507 was started.","@l":"Debug","SourceContext":"Sag.NextGen.Owl.Implementations.MySqlDapperContext","TntId":1}
{"@t":"2021-02-12T13:56:54.9987728Z","@mt":"Sag.OrderCenter.Domain.Commands.CreateEventCommand#55606099: Owns transaction 19246507","@l":"Debug","SourceContext":"Sag.NextGen.Raven.Commands.Handlers.MySql.Async.TransactionalCommandHandler","TntId":1}
{"@t":"2021-02-12T13:56:55.0075271Z","@mt":"Sending {messageBody} -> {queueNames}","@l":"Debug","messageBody":"Sag.NextGen.Pigeon.Messages.Events.EventCenter.V2.SetTimeslotsForEventSucceededEvent","queueNames":"pratik-order-center-api","SourceContext":"Rebus.Pipeline.Send.SendOutgoingMessageStep","ActionId":"4fc5985e-ed7e-4025-b3c2-1f6ced4a69c7","ActionName":"Sag.OfferCenter.Api.Controllers.V2.Internal.InternalTestController.EventcenterSettimeslotsforeventsucceededevent (Sag.OrderCenter.Api)","RequestId":"0HM6FBKJFGKRB:00000002","RequestPath":"/internal/v2/test/eventcenter-settimeslotsforeventsucceededevent","CorrelationId":null,"ConnectionId":"0HM6FBKJFGKRB","TntKey":"wN+02/ArCESPzS7jiXI1xA==","TntId":1}
{"@t":"2021-02-12T13:56:55.1855869Z","@mt":"Sag.OrderCenter.Domain.Commands.CreateEventCommand#55606099: committing transaction 19246507","@l":"Debug","SourceContext":"Sag.NextGen.Raven.Commands.Handlers.MySql.Async.TransactionalCommandHandler","TntId":1}
{"@t":"2021-02-12T13:56:55.1912201Z","@mt":"Transaction 19246507 was committed.","@l":"Debug","SourceContext":"Sag.NextGen.Owl.Implementations.MySqlDapperContext","TntId":1}
{"@t":"2021-02-12T13:56:55.1914030Z","@mt":"Sag.OrderCenter.Domain.Commands.CreateEventCommand#55606099: Completed","@l":"Debug","SourceContext":"Sag.NextGen.Raven.Commands.Handlers.Async.LoggingCommandHandler","TntId":1}
{"@t":"2021-02-12T13:56:55.1915207Z","@mt":"Sag.OrderCenter.Domain.Commands.AddNewTimeslotsCommand#9286666: Started. \n","@l":"Debug","SourceContext":"Sag.NextGen.Raven.Commands.Handlers.Async.LoggingCommandHandler","TntId":1}
{"@t":"2021-02-12T13:56:55.1919941Z","@mt":"Transaction 15246793 was started.","@l":"Debug","SourceContext":"Sag.NextGen.Owl.Implementations.MySqlDapperContext","TntId":1}
{"@t":"2021-02-12T13:56:55.1920438Z","@mt":"Sag.OrderCenter.Domain.Commands.AddNewTimeslotsCommand#9286666: Owns transaction 15246793","@l":"Debug","SourceContext":"Sag.NextGen.Raven.Commands.Handlers.MySql.Async.TransactionalCommandHandler","TntId":1}
{"@t":"2021-02-12T13:56:55.1920855Z","@mt":"Starting handling AddNewTimeslotsCommand for eventId {eventId}","eventId":1,"SourceContext":"Sag.OrderCenter.OfferManagement.BL.EventCommandHandler","TntId":1}
{"@t":"2021-02-12T13:56:55.1921364Z","@mt":"Start inserting time slots for event {eventId}, slot count {count}","eventId":1,"count":5,"SourceContext":"Sag.OrderCenter.OfferManagement.Dal.Repositories.EventTimeSlotRepository","TntId":1}
{"@t":"2021-02-12T13:56:55.1939252Z","@mt":"Time slots inserted successfully for event {eventId}","eventId":1,"SourceContext":"Sag.OrderCenter.OfferManagement.Dal.Repositories.EventTimeSlotRepository","TntId":1}
{"@t":"2021-02-12T13:56:55.1939908Z","@mt":"Ending handling AddNewTimeslotsCommand for eventId {eventId}","eventId":1,"SourceContext":"Sag.OrderCenter.OfferManagement.BL.EventCommandHandler","TntId":1}
{"@t":"2021-02-12T13:56:55.1940267Z","@mt":"Sag.OrderCenter.Domain.Commands.AddNewTimeslotsCommand#9286666: committing transaction 15246793","@l":"Debug","SourceContext":"Sag.NextGen.Raven.Commands.Handlers.MySql.Async.TransactionalCommandHandler","TntId":1}
{"@t":"2021-02-12T13:56:55.1947865Z","@mt":"Sending {messageBody} -> {queueNames}","@l":"Debug","messageBody":"Sag.NextGen.Pigeon.Messages.Events.EventCenter.V2.SetTimeslotsForEventSucceededEvent","queueNames":"pratik-order-center-api","SourceContext":"Rebus.Pipeline.Send.SendOutgoingMessageStep","ActionId":"4fc5985e-ed7e-4025-b3c2-1f6ced4a69c7","ActionName":"Sag.OfferCenter.Api.Controllers.V2.Internal.InternalTestController.EventcenterSettimeslotsforeventsucceededevent (Sag.OrderCenter.Api)","RequestId":"0HM6FBKJFGKRB:00000002","RequestPath":"/internal/v2/test/eventcenter-settimeslotsforeventsucceededevent","CorrelationId":null,"ConnectionId":"0HM6FBKJFGKRB","TntKey":"wN+02/ArCESPzS7jiXI1xA==","TntId":1}
{"@t":"2021-02-12T13:56:55.1995892Z","@mt":"Transaction 15246793 was committed.","@l":"Debug","SourceContext":"Sag.NextGen.Owl.Implementations.MySqlDapperContext","TntId":1}
{"@t":"2021-02-12T13:56:55.1997255Z","@mt":"Sag.OrderCenter.Domain.Commands.AddNewTimeslotsCommand#9286666: Completed","@l":"Debug","SourceContext":"Sag.NextGen.Raven.Commands.Handlers.Async.LoggingCommandHandler","TntId":1}
{"@t":"2021-02-12T13:56:55.1997744Z","@mt":"*****************saga id: a212754c-8e3f-4ce9-8b1d-e9486bda6297, correlation id: 5b002c6f-c75c-4661-a0c0-28081f437552, Batch: 2","@l":"Debug","SourceContext":"Sag.OrderCenter.Messaging.Sagas.CreateTimeslotsInBatchSaga","TntId":1}
{"@t":"2021-02-12T13:56:55.1998122Z","@mt":"Dispatching {messageType} {messageId} to {count} handlers took {elapsedMs:0} ms","@r":["12116"],"@l":"Debug","messageType":"Sag.NextGen.Pigeon.Messages.Events.EventCenter.V2.SetTimeslotsForEventSucceededEvent, Sag.NextGen.Pigeon.Messages","messageId":"3bdd986a-852a-486b-a4c8-da9e96fce4fc","count":1,"elapsedMs":12116.4451,"SourceContext":"Rebus.Pipeline.Receive.DispatchIncomingMessageStep","TntId":1}
{"@t":"2021-02-12T13:56:55.1998710Z","@mt":"Marking the message with ID 3bdd986a-852a-486b-a4c8-da9e96fce4fc as been handled.","SourceContext":"Rebus.Idempotency.IdempotentMessageIncomingStep","TntId":1}
{"@t":"2021-02-12T13:56:55.1999047Z","@mt":"Saving the message data for message with ID 3bdd986a-852a-486b-a4c8-da9e96fce4fc into the message store.","SourceContext":"Rebus.Idempotency.LoadMessageDataStep","TntId":1}
Loaded '/usr/share/dotnet/shared/Microsoft.NETCore.App/2.1.16/System.Diagnostics.StackTrace.dll'. Module was built without symbols.
Loaded '/usr/share/dotnet/shared/Microsoft.NETCore.App/2.1.16/System.Reflection.Metadata.dll'. Skipped loading symbols. Module is optimized and the debugger option 'Just My Code' is enabled.
{"@t":"2021-02-12T13:56:55.3345748Z","@mt":"Unhandled exception {errorNumber} while handling message with ID {messageId}","@l":"Warning","@x":"Rebus.Exceptions.ConcurrencyException: Saga data Sag.OrderCenter.Messaging.Sagas.CreateTimeslotsInBatchSagaData with ID a212754c-8e3f-4ce9-8b1d-e9486bda6297 in table sagas could not be inserted! ---> MySql.Data.MySqlClient.MySqlException: Duplicate entry 'CorrelationId-5b002c6f-c75c-4661-a0c0-28081f437552-Sag.OrderCent' for key 'PRIMARY'\n at MySql.Data.MySqlClient.MySqlStream.ReadPacket()\n at MySql.Data.MySqlClient.NativeDriver.GetResult(Int32& affectedRow, Int64& insertedId)\n at MySql.Data.MySqlClient.Driver.NextResult(Int32 statementId, Boolean force)\n at MySql.Data.MySqlClient.MySqlDataReader.NextResult()\n at MySql.Data.MySqlClient.MySqlCommand.ExecuteReader(CommandBehavior behavior)\n at MySql.Data.MySqlClient.MySqlCommand.ExecuteNonQuery()\n at System.Data.Common.DbCommand.ExecuteNonQueryAsync(CancellationToken cancellationToken)\n--- End of stack trace from previous location where exception was thrown ---\n at Rebus.MySql.Sagas.MySqlSagaStorage.CreateIndex(ISagaData sagaData, MySqlConnection connection, IEnumerable`1 propertiesToIndex)\n --- End of inner exception stack trace ---\n at Rebus.MySql.Sagas.MySqlSagaStorage.CreateIndex(ISagaData sagaData, MySqlConnection connection, IEnumerable`1 propertiesToIndex)\n at Rebus.MySql.Sagas.MySqlSagaStorage.Insert(ISagaData sagaData, IEnumerable`1 correlationProperties)\n at Rebus.Sagas.LoadSagaDataStep.SaveSagaData(RelevantSagaInfo sagaDataToUpdate, Boolean insert)\n at Rebus.Sagas.LoadSagaDataStep.SaveSagaData(RelevantSagaInfo sagaDataToUpdate, Boolean insert)\n at Rebus.Sagas.LoadSagaDataStep.Process(IncomingStepContext context, Func`1 next)\n at Rebus.Pipeline.Receive.ActivateHandlersStep.Process(IncomingStepContext context, Func`1 next)\n at Rebus.Pipeline.Receive.HandleRoutingSlipsStep.Process(IncomingStepContext context, Func`1 next)\n at SaasKit.Multitenancy.Rebus.TenantResolutionMiddleware`1.Process(IncomingStepContext context, Func`1 next)\n at Rebus.Retry.Simple.FailedMessageWrapperStep.Process(IncomingStepContext context, Func`1 next)\n at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)\n at Rebus.Retry.FailFast.FailFastStep.Process(IncomingStepContext context, Func`1 next)\n at Rebus.Retry.Simple.SimpleRetryStrategyStep.DispatchWithTrackerIdentifier(Func`1 next, String identifierToTrackMessageBy, ITransactionContext transactionContext, String messageId, String secondLevelMessageId)","errorNumber":1,"messageId":"3bdd986a-852a-486b-a4c8-da9e96fce4fc","SourceContext":"Rebus.Retry.ErrorTracking.InMemErrorTracker","TntId":1}
{"@t":"2021-02-12T13:56:55.4171284Z","@mt":"Stopping periodic task {taskDescription}","@l":"Debug","taskDescription":"RenewPeekLock-281651eb-2ba7-4d88-90ab-975e6566edde","SourceContext":"Rebus.Threading.TaskParallelLibrary.TplAsyncTask","TntId":1}
{"@t":"2021-02-12T13:56:55.8478754Z","@mt":"Starting periodic task {taskDescription} with interval {timerInterval}","@l":"Debug","taskDescription":"RenewPeekLock-1d111be8-9b7d-4831-b4a6-79b6f98a72ae","timerInterval":"00:04:00","SourceContext":"Rebus.Threading.TaskParallelLibrary.TplAsyncTask"}
TenantContext Resolved. Adding to IncomingStepContext.
{"@t":"2021-02-12T13:56:55.8748617Z","@mt":"Found existing saga data with ID {sagaDataId} for message {messageLabel}","@l":"Debug","sagaDataId":"58545e2f-ba94-4759-a26d-795acd501689","messageLabel":"SetTimeslotsForEventSucceededEvent/c953f8c2-709b-4ef4-a6a8-d86dba2ccbbb","SourceContext":"Rebus.Sagas.LoadSagaDataStep","TntId":1}
{"@t":"2021-02-12T13:56:55.8815715Z","@mt":"Mounting message data for message with ID c953f8c2-709b-4ef4-a6a8-d86dba2ccbbb onto the transactioncontext.","SourceContext":"Rebus.Idempotency.LoadMessageDataStep","TntId":1}
{"@t":"2021-02-12T13:56:55.8816701Z","@mt":"Checking if message with ID c953f8c2-709b-4ef4-a6a8-d86dba2ccbbb has already been processed before.","SourceContext":"Rebus.Idempotency.IdempotentMessageIncomingStep","TntId":1}
{"@t":"2021-02-12T13:56:55.8817055Z","@mt":"Message with ID c953f8c2-709b-4ef4-a6a8-d86dba2ccbbb has not been handled yet.","SourceContext":"Rebus.Idempotency.IdempotentMessageIncomingStep","TntId":1}
{"@t":"2021-02-12T13:56:55.8850421Z","@mt":"Updating message storage for the message with ID c953f8c2-709b-4ef4-a6a8-d86dba2ccbbb as being processed by thread 10","SourceContext":"Rebus.Idempotency.IdempotentMessageIncomingStep","TntId":1}
{"@t":"2021-02-12T13:56:55.8896886Z","@mt":"Start handling EventCenter.SetTimeslotsForEventSucceededEvent for event: {eventId}, totalTimeSlots: {totalTimeSlots}, {CorrelationId}, {currentBatch}","eventId":1,"totalTimeSlots":23,"CorrelationId":"5b002c6f-c75c-4661-a0c0-28081f437552","currentBatch":5,"SourceContext":"Sag.OrderCenter.Messaging.Sagas.CreateTimeslotsInBatchSaga","TntId":1}

mookid8000 commented 3 years ago

Hi @dietermijle , sorry for being so slow to reply 🙂

It's hard for me to diagnose what's tricking you here, sorry....

Could you try and update Rebus.MySql to 3.0.0-b1 and see if it makes a difference?

The Rebus.MySql package has been fully re-ported from Rebus.SqlServer, which should bring it up-to-date with various things.

mookid8000 commented 3 years ago

Hi again, I am taking the liberty to close this issue for now, under the assumption that the issue is "stale" in the sense that it applies to an older version of Rebus.MySql.

Please don't hesitate to make some noise if that is not the case 🙂

dietermijle commented 3 years ago

Hi, we've updated to the new beta version but we see the duplicate entry exception still occurs.

{
  "@t": "2021-02-24T10:13:54.1682044Z",
  "@mt": "Unhandled exception {errorNumber} while handling message with ID {messageId}",
  "@l": "Warning",
  "@x": "MySqlConnector.MySqlException (0x80004005): Duplicate entry 'CorrelationId-e7e85a51-f9ff-4c1c-bdc3-2b013451afc9-CreateTimeslo' for key 'PRIMARY' ---> MySqlConnector.MySqlException (0x80004005): Duplicate entry 'CorrelationId-e7e85a51-f9ff-4c1c-bdc3-2b013451afc9-CreateTimeslo' for key 'PRIMARY'\n   at MySqlConnector.Core.ServerSession.ReceiveReplyAsyncAwaited(ValueTask`1 task) in /_/src/MySqlConnector/Core/ServerSession.cs:line 814\n   at MySqlConnector.Core.ResultSet.ReadResultSetHeaderAsync(IOBehavior ioBehavior) in /_/src/MySqlConnector/Core/ResultSet.cs:line 49\n   at MySqlConnector.MySqlDataReader.ActivateResultSet(CancellationToken cancellationToken) in /_/src/MySqlConnector/MySqlDataReader.cs:line 133\n   at MySqlConnector.MySqlDataReader.CreateAsync(CommandListPosition commandListPosition, ICommandPayloadCreator payloadCreator, IDictionary`2 cachedProcedures, IMySqlCommand command, CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken) in /_/src/MySqlConnector/MySqlDataReader.cs:line 437\n   at MySqlConnector.Core.CommandExecutor.ExecuteReaderAsync(IReadOnlyList`1 commands, ICommandPayloadCreator payloadCreator, CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken) in /_/src/MySqlConnector/Core/CommandExecutor.cs:line 60\n   at MySqlConnector.MySqlCommand.ExecuteNonQueryAsync(IOBehavior ioBehavior, CancellationToken cancellationToken) in /_/src/MySqlConnector/MySqlCommand.cs:line 264\n   at Rebus.MySql.Sagas.MySqlSagaStorage.CreateIndex(IDbConnection connection, ISagaData sagaData, IEnumerable`1 propertiesToIndex)\n   at Rebus.MySql.Sagas.MySqlSagaStorage.Insert(ISagaData sagaData, IEnumerable`1 correlationProperties)\n   at Rebus.Sagas.LoadSagaDataStep.SaveSagaData(RelevantSagaInfo sagaDataToUpdate, Boolean insert) in C:\\projects-rebus\\Rebus\\Rebus\\Sagas\\LoadSagaDataStep.cs:line 198\n   at Rebus.Sagas.LoadSagaDataStep.Process(IncomingStepContext context, Func`1 next) in C:\\projects-rebus\\Rebus\\Rebus\\Sagas\\LoadSagaDataStep.cs:line 95\n   at Rebus.Pipeline.Receive.ActivateHandlersStep.Process(IncomingStepContext context, Func`1 next) in C:\\projects-rebus\\Rebus\\Rebus\\Pipeline\\Receive\\ActivateHandlersStep.cs:line 47\n   at Rebus.Pipeline.Receive.HandleRoutingSlipsStep.Process(IncomingStepContext context, Func`1 next) in C:\\projects-rebus\\Rebus\\Rebus\\Pipeline\\Receive\\HandleRoutingSlipsStep.cs:line 42\n   at SaasKit.Multitenancy.Rebus.TenantResolutionMiddleware`1.Process(IncomingStepContext context, Func`1 next)\n   at Rebus.Retry.Simple.FailedMessageWrapperStep.Process(IncomingStepContext context, Func`1 next) in C:\\projects-rebus\\Rebus\\Rebus\\Retry\\Simple\\FailedMessageWrapperStep.cs:line 42\n   at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func`1 next) in C:\\projects-rebus\\Rebus\\Rebus\\Pipeline\\Receive\\DeserializeIncomingMessageStep.cs:line 34\n   at Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep.Process(IncomingStepContext context, Func`1 next) in C:\\projects-rebus\\Rebus\\Rebus\\DataBus\\ClaimCheck\\HydrateIncomingMessageStep.cs:line 51\n   at Rebus.Retry.FailFast.FailFastStep.Process(IncomingStepContext context, Func`1 next) in C:\\projects-rebus\\Rebus\\Rebus\\Retry\\FailFast\\FailFastStep.cs:line 49\n   at Rebus.Retry.Simple.SimpleRetryStrategyStep.DispatchWithTrackerIdentifier(Func`1 next, String identifierToTrackMessageBy, ITransactionContext transactionContext, String messageId, String secondLevelMessageId) in C:\\projects-rebus\\Rebus\\Rebus\\Retry\\Simple\\SimpleRetryStrategyStep.cs:line 123",
  "errorNumber": 1,
  "messageId": "8dcc03e6-71f1-48e0-911e-caf57ab4791a",
  "SourceContext": "Rebus.Retry.ErrorTracking.InMemErrorTracker",
  "TntId": 9
}

mookid8000 commented 3 years ago

Hi @dietermijle , after having had your issue in the back of my head for a long time, I suddenly recall having seen an error like this before - in that case, it was caused by an IoC container configuration error that caused the saga handler to be registered twice... in turn, this caused two instances of the same saga handler to handle the message, thus consistently failing when trying to save saga instance number two with the same error message as the one you're getting...

Could you try and attach a debugger and see if your saga handler code is run twice?
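If attaching a debugger is awkward in a multi-instance deployment, a counter called from the handler can give the same signal. This is only a sketch: `HandlerInvocationTracker` is a hypothetical helper that is not part of Rebus, and the message key is whatever uniquely identifies one message (the `BatchNumber` property in the comment is assumed, not taken from the thread).

```csharp
using System;
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;

// Hypothetical diagnostic helper, not part of Rebus.
public static class HandlerInvocationTracker
{
    private static readonly ConcurrentDictionary<string, int> Counts =
        new ConcurrentDictionary<string, int>();

    // Call at the top of the saga's Handle method, for example:
    //   HandlerInvocationTracker.Record($"{message.CorrelationId}/{message.BatchNumber}", this);
    // (BatchNumber is an assumed property name.)
    // Returns how many handler invocations this process has seen for the key.
    public static int Record(string messageKey, object handler)
    {
        var count = Counts.AddOrUpdate(messageKey, 1, (_, current) => current + 1);

        // The identity hash usually differs between handler instances, which
        // helps tell "two handlers, one delivery" apart from a retry.
        Console.WriteLine(
            $"{messageKey}: invocation {count} by handler instance {RuntimeHelpers.GetHashCode(handler)}");

        return count;
    }
}
```

A count of 2 for the same key within a single delivery attempt would point at the handler being registered twice. A retried message also increments the count, so the log lines need to be read together with the retry warnings.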