rebus-org / Rebus.Autofac

:bus: Autofac container adapter for Rebus
https://mookid.dk/category/rebus

Using RegisterHandlersFromAssemblyOf, it subscribes to all messages #24

Closed Claker closed 2 years ago

Claker commented 2 years ago

Hi, thanks for implementing Rebus and all its goodies!

I am using RegisterHandlersFromAssemblyOf to register my handlers. However, based on my understanding, even though I use it, I still need to subscribe to particular messages.

My scenario: I want to register all handlers in an assembly (let's say I have 3 handlers) and, based on a config value, subscribe only to some messages (not all). So I call RegisterHandlersFromAssemblyOf, build the container, and subscribe to only 1 message type, but when I run the application, my app handles all 3 types of messages.

If I use builder.RegisterHandler and register only the handler I need, it then handles only that type of message after I subscribe to it.

Is it a bug, or have I misunderstood how to use RegisterHandlersFromAssemblyOf + Subscribe?

Thank you!

Claker commented 2 years ago

With the example above, even if I remove all .Subscribe calls, the app still handles all messages for all handlers in the assembly.

Claker commented 2 years ago

{  
     // other code

     var builder = new ContainerBuilder();

     // other code

     builder.RegisterHandlersFromAssemblyOf<Topic1Handler>(); 

     builder.RegisterRebus((configurer, context) => /* other code*/ );

     _container = builder.Build();

     SubscribeToRabbitMqTopics();
}

private void SubscribeToRabbitMqTopics()
{
     var bus = _container.Resolve<IBus>();

     ....... // WHATEVER I DO BELOW, HAS NO EFFECT... BOTH HANDLERS OF TOPIC1 AND TOPIC2 ARE CALLED AT RUNTIME
     switch (topic)
     {
          case Topics.Topic1:
               bus.Subscribe<Topic1>().Wait();
               break;

          case Topics.Topic2:
               bus.Subscribe<Topic2>().Wait();
               break;

          default:
               Task.WaitAll(
                    bus.Subscribe<Topic1>(),
                    bus.Subscribe<Topic2>());
               break;
     }
}
mookid8000 commented 2 years ago

This is most likely because you're sharing the input queue with something else, or because the subscriptions have already been established.

The way it works is this:

You will receive whatever goes into your input queue, regardless of what you have handlers for. Rebus doesn't know which handlers it has, only the IoC container knows this. Rebus just receives a message, deserializes it, and then it asks the container for everything that implements IHandleMessages<SomeEvent> (throwing an exception if there's 0, pipelining the handlers if there's more than 1).
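The dispatch step described above can be sketched as a toy model. This is not Rebus code: `IHandler<T>`, `ToyContainer`, `CountingHandler<T>` and `ToyDispatcher` are hypothetical names invented for illustration, and real Rebus handlers are asynchronous. The sketch only shows that dispatch looks up handlers by the type of the message that arrived and never consults any list of subscriptions.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a handler interface (not the Rebus IHandleMessages<T>).
public interface IHandler<T>
{
    void Handle(T message);
}

// Trivial handler that only counts how often it was invoked.
public class CountingHandler<T> : IHandler<T>
{
    public int Calls;
    public void Handle(T message) => Calls++;
}

// Toy container: maps a message type to the handlers registered for it.
public class ToyContainer
{
    private readonly Dictionary<Type, List<Action<object>>> _handlers =
        new Dictionary<Type, List<Action<object>>>();

    public void Register<T>(IHandler<T> handler)
    {
        if (!_handlers.TryGetValue(typeof(T), out var list))
        {
            list = new List<Action<object>>();
            _handlers[typeof(T)] = list;
        }
        list.Add(message => handler.Handle((T)message));
    }

    public IReadOnlyList<Action<object>> Resolve(Type messageType) =>
        _handlers.TryGetValue(messageType, out var list)
            ? list
            : new List<Action<object>>();
}

public static class ToyDispatcher
{
    // Dispatches whatever arrived on the queue; no subscription check happens here.
    // Returns the number of handlers that were invoked.
    public static int Dispatch(ToyContainer container, object message)
    {
        var handlers = container.Resolve(message.GetType());
        if (handlers.Count == 0)
            throw new InvalidOperationException(
                "No handlers registered for " + message.GetType().Name);

        // More than one handler: run them one after another.
        foreach (var handle in handlers)
            handle(message);

        return handlers.Count;
    }
}
```

In this model, registering every handler in an assembly means every message type found in the queue gets handled, which matches the behaviour reported at the top of the thread.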

You "subscribe" by calling

await bus.Subscribe<SomeEvent>();
// or bus.Advanced.SyncBus.Subscribe<SomeEvent>(); if you're not in an async context

and from that point in time the input queue will start receiving all published events of type SomeEvent - no need to subscribe again, events will queue up, even when the subscriber is not running.

I hope the information above gave you enough to understand why your bus instance behaves the way it does. 🙂

Claker commented 2 years ago

This is most likely because you're sharing the input queue with something else

Yes, that is the case, but why would this be a problem? Each handler subscribes to a different topic, so I expect I can share a queue for multiple topics.

because the subscriptions have already been established.

That code is run every time an instance spins up and that happens daily for me, so this cannot be the case for me as I see it, right?

The issue I see is that registering all handlers somehow already subscribes to the topics, so my subsequent Subscribe calls have no effect. This happens only at VM instance startup, so the subscription cannot exist. Or do you refer to the fact that the subscription exists in the infrastructure already (not the fact that I call Subscribe)?

mookid8000 commented 2 years ago

Yes, that is the case, but why would this be a problem? Each handler subscribes to a different topic, so I expect I can share a queue for multiple topics.

It's a problem, because there's no way of "filtering" messages received from a queue.

What you should understand is that "topics" are used as a means of routing events into their respective destination queues, but once they're in a queue, they're going to be received by whoever consumes messages off of that queue.

Different subscribers should each have their own individual queue and make their own individual subscriptions.
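To illustrate why a shared queue cannot be filtered per consumer, here is a minimal in-memory sketch. `ToyBroker` and the queue names `shared`, `service-a` and `service-b` are hypothetical; no real transport is involved, and the model assumes only that a topic copies each published event into every queue bound to it.

```csharp
using System;
using System.Collections.Generic;

// Toy broker: topics route events into bound queues; queues know nothing about topics.
public class ToyBroker
{
    // topic name -> names of the queues bound to it
    private readonly Dictionary<string, HashSet<string>> _bindings =
        new Dictionary<string, HashSet<string>>();

    // queue name -> messages waiting in that queue
    private readonly Dictionary<string, List<string>> _queues =
        new Dictionary<string, List<string>>();

    public void Bind(string topic, string queue)
    {
        if (!_bindings.TryGetValue(topic, out var queues))
        {
            queues = new HashSet<string>();
            _bindings[topic] = queues;
        }
        queues.Add(queue);

        if (!_queues.ContainsKey(queue))
            _queues[queue] = new List<string>();
    }

    // Copies the payload into every queue bound to the topic.
    public void Publish(string topic, string payload)
    {
        if (!_bindings.TryGetValue(topic, out var queues)) return;
        foreach (var queue in queues)
            _queues[queue].Add(payload);
    }

    // Whoever consumes a queue receives everything in it, whatever topic it came from.
    public List<string> Drain(string queue)
    {
        if (!_queues.TryGetValue(queue, out var messages))
            return new List<string>();
        var received = new List<string>(messages);
        messages.Clear();
        return received;
    }
}
```

If two topics are both bound to `shared`, draining `shared` yields events from both topics. If each subscriber binds its own queue (`service-a` to one topic, `service-b` to the other), each receives only the events it asked for.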

(...) do you refer to the fact that the subscription exists in the infrastructure already (...)

YES the subscription exists in the infrastructure! When you

await bus.Subscribe<Something>();

a "binding" will be created between the topic for Something and the input queue of the bus instance that called this method.

The binding is persistent, so subsequent calls to await bus.Subscribe<Something>(); will simply verify that yes, the subscription is still there.

If you're using Azure Service Bus, the binding comes in the form of a topic entity that represents Something with a subscription entity beneath it, which will forward all received events to the input queue of the subscriber.
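The persistence of bindings can be modelled with a small sketch, assuming the broker stores (topic, queue) pairs independently of any running subscriber. `ToyBindingStore` is a hypothetical name and does not correspond to any Rebus or broker API.

```csharp
using System;
using System.Collections.Generic;

// Toy model of broker-side bindings that outlive the subscriber process.
public class ToyBindingStore
{
    private readonly HashSet<(string Topic, string Queue)> _bindings =
        new HashSet<(string Topic, string Queue)>();

    // Returns true if a new binding was created, false if it already existed.
    public bool Subscribe(string topic, string queue) =>
        _bindings.Add((topic, queue));

    // Returns true if a binding was removed, false if there was none.
    public bool Unsubscribe(string topic, string queue) =>
        _bindings.Remove((topic, queue));

    public bool IsBound(string topic, string queue) =>
        _bindings.Contains((topic, queue));

    public int Count => _bindings.Count;
}
```

In this model, a subscriber that subscribed to two topics yesterday and restarts today calling Subscribe for only one of them is still bound to both, because nothing removed the second binding. In Rebus, the corresponding cleanup would presumably be an explicit `await bus.Unsubscribe<Something>();` (or removing the binding in the broker itself), which is worth verifying against your transport.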

I hope that clears it up. 🙂

Claker commented 2 years ago

Yes, it does.

Thanks!