emranttl closed this issue 8 months ago
Could you try registering your saga in the container?
services.AddRebusHandler<SalesCreateSaga>();
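A minimal sketch of what that registration could look like, assuming a recent Rebus.ServiceProvider package (the namespace that holds `AddRebusHandler` may differ between versions). The `SalesCreated` message, `SalesSagaData`, `SalesSaga`, and `AddSalesSaga` names are hypothetical, made up for illustration and not taken from this thread:

```csharp
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Rebus.Config;
using Rebus.Sagas;

// Hypothetical message and saga data types, for illustration only.
public record SalesCreated(string SalesId);

public class SalesSagaData : SagaData
{
    public string SalesId { get; set; }
}

public class SalesSaga : Saga<SalesSagaData>, IAmInitiatedBy<SalesCreated>
{
    // Tells Rebus which saga instance an incoming message belongs to.
    protected override void CorrelateMessages(ICorrelationConfig<SalesSagaData> config)
    {
        config.Correlate<SalesCreated>(m => m.SalesId, d => d.SalesId);
    }

    public Task Handle(SalesCreated message)
    {
        Data.SalesId = message.SalesId;
        return Task.CompletedTask;
    }
}

public static class SagaRegistration
{
    public static void AddSalesSaga(this IServiceCollection services)
    {
        // Registers the saga as a handler in the container, so the bus
        // can resolve it when a SalesCreated message arrives.
        services.AddRebusHandler<SalesSaga>();
    }
}
```

The bus itself would still be configured separately with `AddRebus(...)`; this sketch only covers the handler registration.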
I'm facing a very similar issue. I work at Visma e-conomic and am trying to set up a saga.
serviceCollection.AddRebus(c => c
        .Logging(l => l.Serilog())
        .Options(b =>
        {
            b.RetryStrategy(errorQueue, maxNumberRetries);
            b.SetNumberOfWorkers(maxNumberOfWorkers);
            b.SetMaxParallelism(maxParallelism);
            b.Decorate<IPipeline>(c =>
            {
                var pipeline = c.Get<IPipeline>();
                var step = new RebusSetTracingHeadersStep(c.Get<IRebusLoggerFactory>());
                return new PipelineStepInjector(pipeline)
                    .OnSend(step, PipelineRelativePosition.After, typeof(AssignDefaultHeadersStep));
            });
        })
        .Options(RegisterTracingStep)
        .Transport(t => t.UseRabbitMq(rabbitMqUri, inputQueue)
            .ExchangeNames(topicExchangeName: topicExchangeName))
        .Sagas(s => s.StoreInMemory())
        .Routing(r => r.TypeBased()),
    onCreated: async bus => await SubscribeToEvents(bus));
..........................
private static async Task SubscribeToEvents(IBus bus)
{
    await bus.Subscribe<FileToOffloadAction>();
    await bus.Subscribe<StartColdStorageSaga>();
    await bus.Subscribe<FileOffloaded>();
}
The saga looks like this:
public class ColdStorageSaga : Saga<ColdStorageSagaData>,
    IAmInitiatedBy<StartColdStorageSaga>,
    IHandleMessages<FileToOffloadAction>,
    IHandleMessages<FileOffloaded>
{
    private readonly IBus _bus;
    private readonly ILogger<ColdStorageSaga> _logger;

    protected override void CorrelateMessages(
        ICorrelationConfig<ColdStorageSagaData> config)
    {
        config.Correlate<StartColdStorageSaga>(m => m.Id, d => d.Id);
        config.Correlate<FileOffloaded>(m => m.SagaId, d => d.Id);
        config.Correlate<FileToOffloadAction>(m => m.SagaId, d => d.Id);
    }

    public ColdStorageSaga(IBus bus, ILogger<ColdStorageSaga> logger)
    {
        _bus = bus;
        _logger = logger;
    }

    public async Task Handle(FileToOffloadAction message)
    {
        if (!Data.OffloadingState.ContainsKey(message.FileName))
        {
            Data.OffloadingState.Add(message.FileName, false);
            await _bus.Publish(new FileToOffload {
                Content = message.Content,
                AgreementNumber = message.AgreementNumber,
                FileName = message.FileName,
                FilteringConfiguration = message.FilteringConfiguration,
                SagaId = message.SagaId,
            });
        }
    }

    public async Task Handle(FileOffloaded message)
    {
        Data.OffloadingState[message.FileName] = true;
        if (Data.OffloadingState.Values.All(value => value) && Data.OffloadingState.Count == Data.FilesCount)
        {
            _logger.LogInformation($"All files of Saga {message.SagaId} were offloaded to storage successfully. Sending command to delete agreement");
            await _bus.Publish(new AgreementDataToDelete()
            {
                AgreementNumber = message.AgreementNumber,
                FilterConfiguration = message.FilteringConfiguration
            });
            MarkAsComplete();
        }
    }

    public Task Handle(StartColdStorageSaga message)
    {
        Data.OffloadingState = new Dictionary<string, bool>();
        Data.FilesCount = message.FilesCount;
        return Task.CompletedTask;
    }
}
I verified that all bindings on the exchange and queue look correct. Consumers are up and running. The strange part is that I send 2 message types, and one is handled while the other is not:
{"@t":"2024-07-23T12:17:13.3555525Z","@m":"Unhandled exception 1 (FINAL) while handling message with ID \"a086691c-815d-4987-9c3e-0ba04761731e\"","@i":"c044095b","@l":"Warning","@x":"Rebus.Exceptions.MessageCouldNotBeDispatchedToAnyHandlersException: Message with ID a086691c-815d-4987-9c3e-0ba04761731e and type ColdStorage.Models.Messaging.FileToOffloadAction, ColdStorage.Models could not be dispatched to any handlers (and will not be retried under the default fail-fast settings)\r\n at Rebus.Pipeline.Receive.DispatchIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.Datadog.Tracing.RebusLinkTracingHeaderStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.Sagas.LoadSagaDataStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.Pipeline.Receive.ActivateHandlersStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.Pipeline.Receive.HandleRoutingSlipsStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.Pipeline.Receive.HandleDeferredMessagesStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.Retry.Simple.DefaultRetryStep.Process(IncomingStepContext context, Func`1 next)","errorNumber":1,"messageId":"a086691c-815d-4987-9c3e-0ba04761731e","SourceContext":"Rebus.Retry.Simple.DefaultExceptionLogger"}
{"@t":"2024-07-23T12:17:13.3690583Z","@m":"Moving message with ID \"a086691c-815d-4987-9c3e-0ba04761731e\" to error queue \"localErrorQueue\" - error details: \"1 unhandled exceptions: 7/23/2024 3:17:13 PM +03:00: Rebus.Exceptions.MessageCouldNotBeDispatchedToAnyHandlersException: Message with ID a086691c-815d-4987-9c3e-0ba04761731e and type ColdStorage.Models.Messaging.FileToOffloadAction, ColdStorage.Models could not be dispatched to any handlers (and will not be retried under the default fail-fast settings)\r\n at Rebus.Pipeline.Receive.DispatchIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.Datadog.Tracing.RebusLinkTracingHeaderStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.Sagas.LoadSagaDataStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.Pipeline.Receive.ActivateHandlersStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.Pipeline.Receive.HandleRoutingSlipsStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.Pipeline.Receive.HandleDeferredMessagesStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.Retry.Simple.DefaultRetryStep.Process(IncomingStepContext context, Func`1 next)\"","@i":"7c7a277e","@l":"Error","messageId":"a086691c-815d-4987-9c3e-0ba04761731e","queueName":"localErrorQueue","errorDetails":"1 unhandled exceptions: 7/23/2024 3:17:13 PM +03:00: Rebus.Exceptions.MessageCouldNotBeDispatchedToAnyHandlersException: Message with ID a086691c-815d-4987-9c3e-0ba04761731e and type ColdStorage.Models.Messaging.FileToOffloadAction, ColdStorage.Models could not be dispatched to any handlers (and will not be retried under the default fail-fast settings)\r\n at 
Rebus.Pipeline.Receive.DispatchIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.Datadog.Tracing.RebusLinkTracingHeaderStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.Sagas.LoadSagaDataStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.Pipeline.Receive.ActivateHandlersStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.Pipeline.Receive.HandleRoutingSlipsStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.DataBus.ClaimCheck.HydrateIncomingMessageStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.Pipeline.Receive.HandleDeferredMessagesStep.Process(IncomingStepContext context, Func`1 next)\r\n at Rebus.Retry.Simple.DefaultRetryStep.Process(IncomingStepContext context, Func`1 next)","SourceContext":"Rebus.Retry.PoisonQueues.DeadletterQueueErrorHandler"}
I sometimes get the same error for StartColdStorageSaga, too. That one is intermittent, but for FileToOffloadAction it always happens.
For context, the solution worked as expected when all message handlers and my main solution were in 1 application. I am now trying to extract the saga into a standalone solution.
I cannot figure out what I am missing.
I'd appreciate a look 🙏🏻
@Claker When people seem to have intermittent problems with messages that could not be dispatched to any handlers, it is often because they have two or more Rebus instances (i.e. DIFFERENT instances - instances of different apps) consuming messages from the same queue.
Could you maybe go and check, to verify that the queue name configured with inputQueue is not used by anyone else?
You are correct. It works if I change the queue. I'm trying to understand why, so please validate my reasoning: I expected that if 2 apps listen to the same queue and each subscribes to different events, each consumer would know which event types to take from that queue. Apparently, though, as soon as a message arrives in the queue, any of its consumers can take it and try to handle it. Since my 2 apps had different handlers, a message fetched by the wrong consumer has no handler there, hence the error.
When you're working with message queues, there's no such thing as receiving specific messages from the queue. A consumer will receive ALL the messages from its input queue, with the caveat that if MULTIPLE instances are consuming from the same queue, the messages will be distributed among them! This is called "competing consumers" and is how you distribute load when you're using message queues.
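To make the effect concrete, here is a small self-contained simulation. It does not use Rebus or RabbitMQ at all: it assumes a simplified round-robin distribution, and the `Consumer` type, the `saga-app` and `main-app` names, and the handler sets are hypothetical stand-ins for two different apps sharing one queue.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for one application: a name plus the
// message types it has handlers for.
public sealed record Consumer(string Name, HashSet<string> HandledTypes);

public static class CompetingConsumersDemo
{
    // Hands messages out round-robin (an assumed, simplified model of how
    // one queue's messages are spread across its consumers) and records
    // what happens to each message.
    public static List<string> Distribute(
        IReadOnlyList<string> messageTypes,
        IReadOnlyList<Consumer> consumers)
    {
        var outcomes = new List<string>();
        for (var i = 0; i < messageTypes.Count; i++)
        {
            var consumer = consumers[i % consumers.Count];
            var outcome = consumer.HandledTypes.Contains(messageTypes[i])
                ? "handled"
                : "no handler, moved to error queue";
            outcomes.Add($"{messageTypes[i]} -> {consumer.Name}: {outcome}");
        }
        return outcomes;
    }

    public static void Main()
    {
        var sagaApp = new Consumer("saga-app", new HashSet<string>
        {
            "StartColdStorageSaga", "FileToOffloadAction", "FileOffloaded"
        });
        // Assumption: the other app only handles a different message type.
        var mainApp = new Consumer("main-app", new HashSet<string> { "FileToOffload" });

        var messages = new[]
        {
            "StartColdStorageSaga", "FileToOffloadAction",
            "FileOffloaded", "FileToOffloadAction"
        };

        foreach (var line in Distribute(messages, new[] { sagaApp, mainApp }))
            Console.WriteLine(line);
    }
}
```

In this run the second and fourth messages land on `main-app`, which has no handler for them, so they end up in the error queue even though `saga-app` could have handled them.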
With message queues, you should ONLY have multiple consumers on the same queue if it's multiple instances of the same app.
In all other cases, a consumer should have its own input queue, and you should think of the queue as a mailbox for messages for a specific recipient.
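As a rough sketch of the "one mailbox per recipient" idea with the RabbitMQ transport: each app passes its own input queue name to `UseRabbitMq`. The queue names, class names, and method names below are hypothetical, and the two classes are meant to live in two separate applications:

```csharp
using Microsoft.Extensions.DependencyInjection;
using Rebus.Config;

// Lives in the saga application.
public static class SagaAppMessaging
{
    // Hypothetical queue name: only instances of the saga app consume from it.
    private const string InputQueue = "coldstorage-saga";

    public static void AddSagaAppBus(this IServiceCollection services, string rabbitMqUri)
    {
        services.AddRebus(configure => configure
            .Transport(t => t.UseRabbitMq(rabbitMqUri, InputQueue)));
    }
}

// Lives in the main application.
public static class MainAppMessaging
{
    // Hypothetical queue name: a different mailbox for a different recipient.
    private const string InputQueue = "coldstorage-main";

    public static void AddMainAppBus(this IServiceCollection services, string rabbitMqUri)
    {
        services.AddRebus(configure => configure
            .Transport(t => t.UseRabbitMq(rabbitMqUri, InputQueue)));
    }
}
```

Running several instances of the same app against its own queue is still fine; that is the competing-consumers case used for load distribution.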
I hope it makes more sense to you now 🙂
Yep, makes sense now, thanks!
I am new to Rebus and sagas. Can anyone help with this error message I am getting?
unhandled exceptions: 2/7/2024 12:35:52 PM +06:00: Rebus.Exceptions.MessageCouldNotBeDispatchedToAnyHandlersException: Message with ID cae72a91-c020-4cde-adce-f1896bafb1eb and type Sales.API.Sagas.SalesCreatedEvent, Sales.API could not be dispatched to any handlers (and will not be retried under the default fail-fast settings)
I am setting the configuration
builder.Services.AddRebus(rebus => rebus
        .Transport(t => t.UseRabbitMq(RabbitMqConnectionString, SalesQueueName))
        .Sagas(s => s.StoreInPostgres(DefaultConnection, SagaTableName, SagaIdexTableName)),
    onCreated: async bus =>
    {
        await bus.Subscribe<SalesCreatedEvent>();
        await bus.Subscribe<CreateItemLoad>();
        await bus.Subscribe<ItemLoadCreated>();
        await bus.Subscribe<StockUpdated>();
    });
And added the Saga
` public class SalesCreateSaga : Saga,
IAmInitiatedBy,
IHandleMessages,
IHandleMessages
{
private readonly IBus _bus;
} `
This is the SalesCreatedEvent class:
[Serializable]
public record SalesCreatedEvent(SalesDTO salesDto);
public record ItemLoadCreated(SalesDTO salesDto);
public record StockUpdated(SalesDTO salesDto);
public record CreateItemLoad(SalesDTO salesDto);
public record StockUpdate(SalesDTO salesDto);
I am sending the message in my SalesService:
await _bus.Send(new SalesCreatedEvent(salesDto));
Can anyone help with this? What is wrong with my approach?