rebus-org / Rebus

:bus: Simple and lean service bus implementation for .NET
https://mookid.dk/category/rebus
Other
2.26k stars 355 forks source link

Random MessageCouldNotBeDispatchedToAnyHandlersException error #770

Closed GrzegorzBlok closed 5 years ago

GrzegorzBlok commented 5 years ago

Recently, after adding some new messages and moving them to different assembly we noticed random errors breaking our system. Some messages are moved to poison queue with exception of MessageCouldNotBeDispatchedToAnyHandlersException even though that the very same message type are being handled properly just before and just after the incident. We use Simple Injector container and we tested if it resolves the message handler in isolated UT and just after the rebus configuration in live environment. In both situations the handler was resolved by the container.

We observe this on Rebus.SimpleInjector 5.0.0-b02 and 4.0.0. Any idea what could be the cause? I can provide examples (i.e. json) of properly handled message and "broken" message via some private channel if it could be of any help.

Rebus configuration:


...
container.Collection.Register(typeof(IHandleMessages<>), typeof(PatchAppliedHandler).Assembly);
...

container.ConfigureRebus(configurer =>
    configurer
    .Logging(log => log.Log4Net())
    .Routing(r => r.TypeBased()
        .MapAssemblyOf<EuropaBusMessagesAssemblyLookup>("worker-input-queue")
        .MapAssemblyOf<EuropaBusResponsesAssemblyLookup>("data-input-queue"))
    .Options(o =>
    {
        o.SetNumberOfWorkers(1);
        o.SetMaxParallelism(1);
        o.EnableMessageAuditing("data-audit-queue");
        o.LogPipeline();
        o.SetWorkerShutdownTimeout(TimeSpan.FromMinutes(5));
    })
    .Transport(t => t.UseAzureServiceBus(connectionString, "data-input-queue").AutomaticallyRenewPeekLock()).Start());

// ... some more configuration - web api, DB etc.

container.StartBus();

var bus = container.GetInstance<IBus>();

Global.OnApplicationEnd += (sender, args) =>
{
        Log.Info("App is being disposed. Waiting for rebus workers to finish...");
         bus.Dispose();
         Log.Info("Rebus workers disposed. Exiting.");
};

Exception and stack trace:

System.AggregateException: 1 unhandled exceptions ---> Rebus.Exceptions.MessageCouldNotBeDispatchedToAnyHandlersException: Message with ID a84db7ab-a0e0-4872-9a4e-0ad6065e9860 and type Welop.Contractor.Europa.Bus.Responses.Data.PatchAppliedResponse, Welop.Contractor.Europa.Bus.Responses could not be dispatched to any handlers (and will not be retried under the default fail-fast settings)\r\n   at Rebus.Pipeline.Receive.DispatchIncomingMessageStep.<Process>d__3.MoveNext()\r\n--- End of stack trace from previous location where exception was thrown ---\r\n   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n   at Rebus.Sagas.LoadSagaDataStep.<Process>d__6.MoveNext()\r\n--- End of stack trace from previous location where exception was thrown ---\r\n   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n   at Rebus.Pipeline.Receive.ActivateHandlersStep.<Process>d__3.MoveNext()\r\n--- End of stack trace from previous location where exception was thrown ---\r\n   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n   at Rebus.Pipeline.Receive.HandleRoutingSlipsStep.<Process>d__3.MoveNext()\r\n--- End of stack trace from previous location where exception was thrown ---\r\n   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n   at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.<Process>d__2.MoveNext()\r\n--- End of stack trace from previous location where exception was thrown ---\r\n   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n   at Rebus.Auditing.Messages.IncomingAuditingStep.<Process>d__4.MoveNext()\r\n--- End of stack trace from previous location where exception was thrown ---\r\n   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n   at Rebus.Retry.FailFast.FailFastStep.<Process>d__3.MoveNext()\r\n--- End of stack trace from previous location where exception was thrown ---\r\n   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n   at Rebus.Retry.Simple.SimpleRetryStrategyStep.<DispatchWithTrackerIdentifier>d__8.MoveNext()\r\n   --- End of inner exception stack trace ---\r\n---> (Inner Exception #0) Rebus.Exceptions.MessageCouldNotBeDispatchedToAnyHandlersException: Message with ID a84db7ab-a0e0-4872-9a4e-0ad6065e9860 and type Welop.Contractor.Europa.Bus.Responses.Data.PatchAppliedResponse, Welop.Contractor.Europa.Bus.Responses could not be dispatched to any handlers (and will not be retried under the default fail-fast settings)\r\n   at Rebus.Pipeline.Receive.DispatchIncomingMessageStep.<Process>d__3.MoveNext()\r\n--- End of stack trace from previous location where exception was thrown ---\r\n   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n   at Rebus.Sagas.LoadSagaDataStep.<Process>d__6.MoveNext()\r\n--- End of stack trace from previous location where exception was thrown ---\r\n   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n   at Rebus.Pipeline.Receive.ActivateHandlersStep.<Process>d__3.MoveNext()\r\n--- End of stack trace from previous location where exception was thrown ---\r\n   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n   at Rebus.Pipeline.Receive.HandleRoutingSlipsStep.<Process>d__3.MoveNext()\r\n--- End of stack trace from previous location where exception was thrown ---\r\n   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n   at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.<Process>d__2.MoveNext()\r\n--- End of stack trace from previous location where exception was thrown ---\r\n   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n   at Rebus.Auditing.Messages.IncomingAuditingStep.<Process>d__4.MoveNext()\r\n--- End of stack trace from previous location where exception was thrown ---\r\n   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n   at Rebus.Retry.FailFast.FailFastStep.<Process>d__3.MoveNext()\r\n--- End of stack trace from previous location where exception was thrown ---\r\n   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)\r\n   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)\r\n   at Rebus.Retry.Simple.SimpleRetryStrategyStep.<DispatchWithTrackerIdentifier>d__8.MoveNext()<---\r\n
mookid8000 commented 5 years ago

I haven't heard about any issues like this. It would be cool if you could provide some more information, e.g. the message type definition, some JSON, etc.

GrzegorzBlok commented 5 years ago

Here are the two messages (see attachment):

As you can see, they are the same type and there are just few minutes apart. This is how this message is being sent from another server:

var headers = new Dictionary<string, string>
{
    {Headers.CorrelationId, customCorrelationId}
};

await _bus.Send(new PatchAppliedResponse
{
    TenantId = message.TenantId,
    UserUniqueId = message.UserUniqueId,
    ProjectExternalId = message.ProjectExternalId,
    FileId = message.FileId,
    Metadata = Mapper.Map<FileMetada>(file),
    Result = PatchAppliedResponse.CommandResult.Success
}, headers).ConfigureAwait(false);

messages.zip

mookid8000 commented 5 years ago

Hmm, there's nothing odd that strikes me at this point. I'm going skiing the next couple of days, so it'll probably be Wednesday/Thursday before I get back to you.

Please post in here if you figure something out 😐 also, if there's any way you can reproduce this behavior in a separate project, it would be awesome. 👍

MatthewDavidCampbell commented 5 years ago

Just starting looking into the same problem here which only occurs when a handler fails in it's invoke and after the max retries the poison queue handler sends the MessageCouldNotBeDispatchedToAnyHandlersException to the general error queue. The handler for the message type is being called and failing from the final step in the default pipeline (DispatchIncomingMessageStep). What I don't grasp is the order of things. The poison handler is called by the SimpleRetryStrategyStep (the first in the pipeline). How can the handlersInvoked counter not get incremented and the message whip around back to the start of the pipeline when it obviously when throw the same pipeline 2 times before?

Like @GrzegorzBlok we are getting this with message types that derive from base types on 5.0.2 of Rebus.

mookid8000 commented 5 years ago

Hmmm... anyone who can reproduce this odd behavior in a little Console Application or something? Then I'd be happy do debug it....

MatthewDavidCampbell commented 5 years ago

Posted some test code that we could build on to recall the error. Haven't reproduced the problem with a minimal pipeline. Would be great if somebody else could double check that the pipeline setup is fine. Then we can try to adjust the input afterwards.

Update 2019-02-15 Added bus test cases with bombs (throws an exception) and duds. Still can't reproduce the Dispatch error (i.e. no invokers).

MatthewDavidCampbell commented 5 years ago

@GrzegorzBlok Which framework are you targeting? What version of Rebus are you using? We are on Asp Net classic 4.6.1.

GrzegorzBlok commented 5 years ago

@MatthewDavidCampbell We are targeting .net 4.7.2. We always use latest stable Rebus packages. What DI container do you use?

mookid8000 commented 5 years ago

Hey there, any news about this issue?

mkoziel2000 commented 5 years ago

I am running into the same exception getting thrown. My scenario is that I have two processes, each instantiating their own instance of IBus using the BuiltinHandlerActivator and SQLServer for transport & persistence. I have each of them registering a different Saga definition and corresponding messages. When I have them both share the same queue in SQLServer, I get the MessageCouldNotBeDispatchedToAnyHandlersException all the time. It seems like the bus instances do not honor the message topic as part of its decision for whether it should pull the message off the queue and attempt processing it. Is this true of the architecture as a whole or just with the SQLServer package being used as the transport? If its a behavior of the architecture as a whole, then what is the best path forward for supporting horizontal scaling when I want to add additional processes to handle just a subset of all the possible messages that might end up on a given queue?

mookid8000 commented 5 years ago

(...) When I have them both share the same queue (...)

I think that's the problem! 🤔

(...) It seems like the bus instances do not honor the message topic as part of its decision for whether it should pull the message off the queue and attempt processing it (...)

You are absolutely right in your observation. 😄 Queues generally cannot be filtered that way, and if it was possible, it would raise all sorts of questions – e.g. like how do you even distribute a message to two subscribers, if there's one queue, and messages are deleted once they're processed?

THEREFORE: One bus per queue! Use publish/subscribe to distribute multiple copies to multiple subscribers. Use await bus.Subscribe<TEvent>() to bind a queue to a "topic" (in this case an event type).

Remember, you CAN have multiple buses per queue... but ONLY if they're the same! (i.e. multiple instances of the same bus configuration, probably on different machines – that's how you implement competing consumers)

GrzegorzBlok commented 5 years ago

I didn't notice any new occurrences of the issue. It seems to come and go...

mookid8000 commented 5 years ago

Hi @GrzegorzBlok , I'm closing this issue for now. Please don't hesitate to re-open it, if you run into the same symptoms again, and then we'll take it from there.

pelin commented 5 years ago

@GrzegorzBlok @mookid8000 @MatthewDavidCampbell We have the same issue on Rebus 5.2.1 with Rebus.Autofac 5.2.0. It comes and go, and can't be reproduced with simple testcode. The only common thing I noticed is that it occurs only when an exception is thrown (from the application code). We injects a logger, that logs out the "MessageCouldNotBeDispatchedToAnyHandlersException", from inside of Rebus. When the poisoned message is sent to the Error-queue, we get the original exception in the rbs2-error-details. Would be great if someone could dig into this...

mookid8000 commented 5 years ago

@pelin that sounds really weird..... I'll try and see if I can reproduce it

mookid8000 commented 5 years ago

Hey there, I've coded this little program that runs with Rebus 5.2.1 and Rebus.Autofac 5.2.0.

It quickly handles lots of messages, randomly throwing an exception to spice things up.

So far, it has not been able to reproduce any random MessageCouldNotBeDispatchedToAnyHandlersException.

Maybe someone, who has experienced these problems, would look at my code and see if they're doing anything differently, compared to how I do it.

davidxcheng commented 4 years ago

@mookid8000 Seems like the problem occurs because Rebus expects to find handler invokers.

We can consistently reproduce the problem in this scenario:

  1. We are using Azure Service Bus (which doesn't support transactions so that "...behavior is emulated by deferring the actual sending of the outgoing messages until after your code has finished executing")
  2. We are in a message handler (let's call it MyCommandHandler)
  3. We await _bus.Publish(new MyEvent()) inside MyCommandHandler
  4. We don't have a IHandleMessages<MyEvent> handler since we want our event to be consumed by another service (which I believe is a common scenario)

Outcome: given that no exception occurs within MyCommandHandler, the event is published but the MessageCouldNotBeDispatchedToAnyHandlersException shows up in our logs.

We're using Rebus 5.2.1.

p.s. The problem can't be reproduced in your little program because it happens within an IIncomingStep that doesn't seem to be run when using UseInMemoryTransport.

Shoutout to @dnt123 who helped debugging this issue

mookid8000 commented 4 years ago

@davidxcheng It sounds to me then that the problem is that you subscribe to an event you do not intend to handle.

That, or you're using the same input queue name in two different bus instances.

davidxcheng commented 4 years ago

@mookid8000 That was fast - and accurate!

I'd created a subscription while debugging why no events was reaching the subscribers (turns out it was because of the transaction emulation) and forgot to remove the subscription in Azure. Removed the subscription queue and verified that no error occurs.

Thx 👍

mookid8000 commented 4 years ago

@davidxcheng that's good to hear! 😁