rebus-org / Rebus

:bus: Simple and lean service bus implementation for .NET
https://mookid.dk/category/rebus

Question about "Could not find existing saga data for message" #1146

Closed dazinator closed 3 months ago

dazinator commented 4 months ago

I am running a test in memory. It consists of a "manager" app bus, and an "agent" app bus.

The manager app has a saga. When the saga is initiated, the initiating handler also dispatches some messages in a loop (we are talking 2 or 3 messages).

public class WorkflowSaga : Saga<WorkflowSagaData>,
    IAmInitiatedBy<ExecuteWorkflowCommandMessage>,
    IHandleMessages<ChangeStepStateCommandMessage>
{
    public async Task Handle(ExecuteWorkflowCommandMessage message)
    {
        // shortened for brevity
        foreach (var item in result.NextSteps!)
        {
            var integrationCommand = new ExecuteStepCommandMessage
            {
                WorkflowId = item.WorkflowId,
                StepId = item.StepId,
                Task = item.TaskDefinition, // etc
            };
            await _bus.Send(integrationCommand);
        }
    }

    // remaining members omitted
}

The agent receives this, does some small work, and sends back a ChangeStepStateCommandMessage, which is correlated to the same saga instance for handling.

    public async Task Handle(ChangeStepStateCommandMessage message)
    {
        _logger.LogInformation("Handling step state change message {stepid} {newstate} - worlkflow id {workflowid}.", message.StepId, message.NewState, message.WorkflowId);
    }
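For readers following along: the thread does not show how the two message types are correlated to the saga. A minimal sketch of what that declaration could look like follows, assuming correlation on a `WorkflowId` property. The class name `WorkflowSagaSketch`, the `Guid` type, and the property on the saga data are assumptions for illustration, not taken from the original code.

```csharp
using System;
using System.Threading.Tasks;
using Rebus.Handlers;
using Rebus.Sagas;

// Hypothetical message shapes; only the WorkflowId name appears in the thread.
public class ExecuteWorkflowCommandMessage { public Guid WorkflowId { get; set; } }
public class ChangeStepStateCommandMessage { public Guid WorkflowId { get; set; } }

// Hypothetical saga data; SagaData supplies the Id and Revision properties.
public class WorkflowSagaData : SagaData
{
    public Guid WorkflowId { get; set; }
}

public class WorkflowSagaSketch : Saga<WorkflowSagaData>,
    IAmInitiatedBy<ExecuteWorkflowCommandMessage>,
    IHandleMessages<ChangeStepStateCommandMessage>
{
    // Tells Rebus which message property identifies the saga instance.
    protected override void CorrelateMessages(ICorrelationConfig<WorkflowSagaData> config)
    {
        config.Correlate<ExecuteWorkflowCommandMessage>(m => m.WorkflowId, d => d.WorkflowId);
        config.Correlate<ChangeStepStateCommandMessage>(m => m.WorkflowId, d => d.WorkflowId);
    }

    public Task Handle(ExecuteWorkflowCommandMessage message)
    {
        // The correlation property must be stored on the saga data when the
        // saga is created, otherwise later messages cannot find the instance.
        if (IsNew) Data.WorkflowId = message.WorkflowId;
        return Task.CompletedTask;
    }

    public Task Handle(ChangeStepStateCommandMessage message) => Task.CompletedTask;
}
```

Because ChangeStepStateCommandMessage is handled through IHandleMessages rather than IAmInitiatedBy, a message that does not match an existing instance is not allowed to create one, which is when the correlation error handler logs its "Could not find existing saga data" line.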

The issue I see is that the agent dispatches several ChangeStepStateCommandMessage messages destined for this saga instance, but only one of them gets handled. The rest seem to be dropped, with the log complaining "Could not find existing saga data for message":

[2024-02-26 23:12:45Z] dbug: Rebus.Pipeline.Receive.DispatchIncomingMessageStep[0] Dispatching "Core.Infrastructure.Messages.ExecuteStepCommandMessage, Core" "3f22f222-9a18-49a0-875b-87362ba44518" to 1 handlers took 4091 ms
[2024-02-26 23:12:45Z] dbug: Rebus.Pipeline.Receive.DispatchIncomingMessageStep[0] Dispatching "Core.Infrastructure.Messages.ExecuteStepCommandMessage, Core" "db65b0e7-9012-4fbe-bd6c-8667f0ee08c4" to 1 handlers took 4146 ms
[2024-02-26 23:12:45Z] dbug: Rebus.Config.DefaultCorrelationErrorHandler[0] Could not find existing saga data for message "ChangeStepStateCommandMessage/3812431446d1402f9cfa12bf14c1cd4d-A-InProgress"
[2024-02-26 23:12:45Z] dbug: Rebus.Pipeline.Receive.DispatchIncomingMessageStep[0] Dispatching "Core.Infrastructure.Messages.ChangeStepStateCommandMessage, Core" "3812431446d1402f9cfa12bf14c1cd4d-A-InProgress" to 1 handlers took 0 ms
[2024-02-26 23:12:45Z] dbug: Rebus.Sagas.LoadSagaDataStep[0] Found existing saga data with ID 38124314-46d1-402f-9cfa-12bf14c1cd4d for message "ChangeStepStateCommandMessage/3812431446d1402f9cfa12bf14c1cd4d-B-InProgress"
[2024-02-26 23:12:45Z] dbug: Rebus.Config.DefaultCorrelationErrorHandler[0] Could not find existing saga data for message "ChangeStepStateCommandMessage/3812431446d1402f9cfa12bf14c1cd4d-A-Complete"
[2024-02-26 23:12:45Z] dbug: Rebus.Pipeline.Receive.DispatchIncomingMessageStep[0] Dispatching "Core.Infrastructure.Messages.ChangeStepStateCommandMessage, Core" "3812431446d1402f9cfa12bf14c1cd4d-A-Complete" to 1 handlers took 0 ms
[2024-02-26 23:12:45Z] dbug: Rebus.Config.DefaultCorrelationErrorHandler[0] Could not find existing saga data for message "ChangeStepStateCommandMessage/3812431446d1402f9cfa12bf14c1cd4d-B-Complete"
[2024-02-26 23:12:45Z] dbug: Rebus.Pipeline.Receive.DispatchIncomingMessageStep[0] Dispatching "Core.Infrastructure.Messages.ChangeStepStateCommandMessage, Core" "3812431446d1402f9cfa12bf14c1cd4d-B-Complete" to 1 handlers took 0 ms
[2024-02-26 23:12:45Z] info: Manager.Infrastructure.WorkflowSaga[0] Handling step state change message B InProgress - worlkflow id 38124314-46d1-402f-9cfa-12bf14c1cd4d.
[2024-02-26 23:12:45Z] info: Manager.Infrastructure.WorkflowSaga[0] Incrementing workflow.
[2024-02-26 23:12:45Z] warn: Manager.Infrastructure.WorkflowSaga[0] Workflow not complete but no next steps to dispatch.
[2024-02-26 23:12:45Z] dbug: Rebus.Pipeline.Receive.DispatchIncomingMessageStep[0] Dispatching "Core.Infrastructure.Messages.ChangeStepStateCommandMessage, Core" "3812431446d1402f9cfa12bf14c1cd4d-B-InProgress" to 1 handlers took 1 ms

My test is:

  1. Running in a single process (xunit)
  2. Uses a single InMemoryNetwork
  3. two buses (one for the manager, one for the agent)
  4. InMemoryTransport for both buses
  5. return routing.Sagas(s => { s.StoreInMemory(); s.EnforceExclusiveAccess(); });
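The setup listed above could be wired up roughly as follows. This is a sketch under assumptions, not the actual test: the queue names "manager" and "agent", the empty message classes, the inline agent handler, and the `TwoBusSetupSketch` wrapper are all hypothetical, and the `Rebus.Sagas.Exclusive` namespace for EnforceExclusiveAccess is assumed.

```csharp
using Rebus.Activation;
using Rebus.Config;
using Rebus.Persistence.InMem;
using Rebus.Routing.TypeBased;
using Rebus.Sagas.Exclusive; // assumed home of EnforceExclusiveAccess
using Rebus.Transport.InMem;

// Hypothetical empty message types standing in for the real ones.
public class ExecuteStepCommandMessage { }
public class ChangeStepStateCommandMessage { }

public static class TwoBusSetupSketch
{
    public static void Run()
    {
        // One in-memory network shared by both buses, as in the test.
        var network = new InMemNetwork();

        using var managerActivator = new BuiltinHandlerActivator();
        using var agentActivator = new BuiltinHandlerActivator();

        // The saga handler would be registered on managerActivator here.

        // Hypothetical agent handler: does its work, then sends a state change back.
        agentActivator.Handle<ExecuteStepCommandMessage>(
            async (bus, message) => await bus.Send(new ChangeStepStateCommandMessage()));

        // Manager bus: owns the saga and routes step commands to the agent queue.
        var managerBus = Configure.With(managerActivator)
            .Transport(t => t.UseInMemoryTransport(network, "manager"))
            .Routing(r => r.TypeBased().Map<ExecuteStepCommandMessage>("agent"))
            .Sagas(s => { s.StoreInMemory(); s.EnforceExclusiveAccess(); })
            .Start();

        // Agent bus: routes state changes back to the manager queue.
        var agentBus = Configure.With(agentActivator)
            .Transport(t => t.UseInMemoryTransport(network, "agent"))
            .Routing(r => r.TypeBased().Map<ChangeStepStateCommandMessage>("manager"))
            .Start();
    }
}
```

Disposing the two activators at the end of the test also shuts down the buses they host.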

I know that messages can be delivered out of order, so I have idempotency checks and a scheme to discard obsolete messages. However, I was not expecting messages to be discarded in this circumstance without my handler being given the chance to process them.

My gut feeling is that something like this could be happening:

  1. In the saga's IAmInitiatedBy handler, I await Bus.Send() to send some messages. Perhaps these are dispatched and handled by the agent, which then generates messages destined back to the saga instance, before the saga fully completes its SaveChanges() operations? If the saga doesn't persist its record until some time after its IAmInitiatedBy handler finishes (I am actually not sure about that), could this explain the "Could not find existing saga data for message" entries, because those messages happened to be picked up for the saga before the saga had even been persisted?
    • Although in this case sagas are persisted in memory, perhaps if that ends up as a task on the thread pool, and message sending/receiving and dispatch use a dedicated thread, the messages beat the persistence task?

mookid8000 commented 3 months ago

I just coded up this example: https://github.com/rebus-org/Rebus/blob/master/Rebus.Tests/Examples/SagaSendsMessagesToItself.cs

It demonstrates how an in-mem saga can send a bunch of messages to itself, which will not actually be sent until the saga data is saved. Moreover, it shows that all subsequent messages get dispatched with optimistic concurrency.

Could you maybe take a look at the example to see where your code differs?

dazinator commented 3 months ago

Thank you, I will investigate

dazinator commented 3 months ago

One thing at a glance, before I get into running the two examples: I use "bus.Send", not "bus.SendLocal". I'm not sure whether that makes any difference. The messages sent are routed to another bus's handler, which uses bus.Send() to route messages back to be handled by this saga.

mookid8000 commented 3 months ago

await bus.SendLocal(...);

just means that the bus will

await bus.Send(...);

with itself as the destination. If you were to explicitly configure the equivalent, it would be something like

services.AddRebus(
    configure => configure
        .Transport(t => t.Use(..., "my-queue"))
        .Routing(r => r.TypeBased().Map<SomeMessage>("my-queue"))
);

// (...)

await bus.Send(new SomeMessage());

mookid8000 commented 3 months ago

Hi @dazinator , did you figure out how your code differed from my example?

dazinator commented 3 months ago

> Hi @dazinator , did you figure out how your code differed from my example?

I haven't got back into the issue again just yet, sorry, but this does seem like a difference: your example uses SendLocal to route back to the same bus. My test has two buses, and my saga sends a message which is routed to the other bus (different input queue name), which handles it and sends a message back to my saga bus's input queue. Both buses share an in-memory network created at the start of the test.

mookid8000 commented 3 months ago

Yeah ok, but as mentioned in the previous comment, it doesn't affect how the message is sent, only where it is sent to.

All other things being equal, await bus.SendLocal(..) should not take as long as await bus.Send(..) followed by await bus.Reply(...) in the other end, so to try to reproduce your issue, await bus.SendLocal(..) would seem like a better bet.