rebus-org / Rebus.ServiceProvider

:bus: Microsoft Extensions Dependency Injection container adapter for Rebus
https://mookid.dk/category/rebus

Publish Subscribe with multiple buses #64

Closed sbyse closed 2 years ago

sbyse commented 2 years ago

We use Rebus along with RabbitMQ to support both pubsub handling and command sending, it's a great library and has really helped us get up and running quickly.

We are now keen for the pubsub routing to be able to support a "modular monolith" architecture. In particular, we want different modules to be able to handle specific event message types entirely independently of each other.

I've been playing with the latest setup extensions which seem very close to what we need but from what I can tell our only option is to use an entirely independent ServiceCollection per module calling AddRebusService:

builder.Host.AddRebusService(
    services => {
        services.AddMyEfDatabaseContext();

        services.AddMyRepositories();

        services.AddMyApplicationServices();

        services.AddRebus(
            configure => configure
                .Transport(t => t.UseAzureServiceBus(connectionString, "my-queue-name"))
        );

        services.AddRebusHandler<SomeMessageHandler>();
        services.AddRebusHandler<AnotherMessageHandler>();
    }
);

Conceptually this makes sense and arguably we should be using this approach. The problem is that it would be a major change from our perspective and require a considerable amount of refactoring and testing to ensure each "module" can function with an entirely separate ServiceCollection. What I'd really like is a way to achieve something similar while using a single ServiceCollection, registering handlers only for a specific bus.

Probably something like this:

services.AddRebus(
    configure => configure
        .Transport(t => t.UseRabbitMq(connectionString, "my-queue-name-1"))
        // Proposed API: neither Handlers nor AddRebusHandlerForBus exists today
        .Handlers(s =>
            {
                s.AddRebusHandlerForBus<Module1.SomethingInterestingHandler>();
                s.AddRebusHandlerForBus<Module1.AnotherInterestingHandler>();
            }),
    isDefaultBus: true,
    key: "bus-1",
    onCreated: async bus => {
        await bus.Subscribe<SomethingInterestingHappened>();
        await bus.Subscribe<AnotherInterestingThingHappened>();
    }
);

services.AddRebus(
    configure => configure
        .Transport(t => t.UseRabbitMq(connectionString, "my-queue-name-2"))
        .Handlers(s =>
            {
                s.AddRebusHandlerForBus<Module2.SomethingInterestingHandler>();
                s.AddRebusHandlerForBus<Module2.AnotherInterestingHandler>();
            }),
    isDefaultBus: false,
    key: "bus-2",
    onCreated: async bus => {
        await bus.Subscribe<SomethingInterestingHappened>();
        await bus.Subscribe<AnotherInterestingThingHappened>();
    }
);

Module1.SomethingInterestingHandler and Module2.SomethingInterestingHandler would then be entirely independent and allow for different error handling strategies etc. An error in one handler wouldn't affect whether or not the other handler gets invoked.

Is there currently any way of achieving this? If not can you recommend the best thing we can do to achieve the same end result? Happy to provide further details or test cases if it would help.

Many thanks!

mookid8000 commented 2 years ago

Is there currently any way of achieving this?

Sorry, no - the thing is: When a bus instance receives a message TMessage, it will look up handlers by resolving IHandleMessages<TMessage> from the container. So it's basically the container that gets to decide which messages can be handled, and by which handlers.
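The lookup described above can be illustrated with a minimal sketch that uses only Microsoft.Extensions.DependencyInjection. The `IHandleMessages<TMessage>` interface here is a local stand-in declared for the example (an assumption, not the Rebus type), and the two handler classes are hypothetical. It is not how Rebus is implemented; it only shows that a single shared container returns every registered handler for a message type, regardless of which bus asks.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;

// Local stand-in for Rebus' handler interface, so the sketch is self-contained.
public interface IHandleMessages<in TMessage>
{
    Task Handle(TMessage message);
}

public record SomethingInterestingHappened;

// Hypothetical handlers belonging to two different modules.
public class Module1Handler : IHandleMessages<SomethingInterestingHappened>
{
    public Task Handle(SomethingInterestingHappened message) => Task.CompletedTask;
}

public class Module2Handler : IHandleMessages<SomethingInterestingHappened>
{
    public Task Handle(SomethingInterestingHappened message) => Task.CompletedTask;
}

public static class Program
{
    public static void Main()
    {
        // One shared ServiceCollection, as in the single-container setup.
        var services = new ServiceCollection();
        services.AddTransient<IHandleMessages<SomethingInterestingHappened>, Module1Handler>();
        services.AddTransient<IHandleMessages<SomethingInterestingHappened>, Module2Handler>();

        using var provider = services.BuildServiceProvider();
        using var scope = provider.CreateScope();

        // Resolving by message type yields both handlers; the container has
        // no notion of which bus received the message.
        var handlers = scope.ServiceProvider
            .GetServices<IHandleMessages<SomethingInterestingHappened>>();

        foreach (var handler in handlers)
        {
            Console.WriteLine(handler.GetType().Name);
        }
    }
}
```

With separate container instances per module, each container would only ever see its own module's registrations, which is what gives the isolation asked about in the question.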

sbyse commented 2 years ago

Thanks for the response, that makes sense.

Looking through the code it feels like it should be possible to extend DependencyInjectionHandlerActivator to allow filtering by the owning bus (based on some additional wiring during setup). Ideally we'd like to allow for both globally-scoped and bus-scoped handlers.

Would you be interested in considering a PR which added that possibility? And if so do you have any pointers for implementation?

mookid8000 commented 2 years ago

Would you be interested in considering a PR which added that possibility?

Thanks for offering it, but I am not really convinced that this should be necessary.

I would encourage anyone using Rebus with Microsoft's host and Microsoft DI to either (a) Host multiple bus instances in the same container and accept that everything is basically capable of doing the same things, or (b) Host multiple bus instances in separate background services with isolated container instances(*)

So if this is something you would like to do, I suggest you fork the Rebus.ServiceProvider repo and maintain your own version of the container adapter in your fork. If you later find out that your feature is the best thing to have happened to Rebus.ServiceProvider ever 😁 then please let me know, and then we can pick up this discussion again.

I hope that's alright with you. 🙂 Closing this for now - feel free to pick it up again, if you believe I am missing something.


(*) where (b) in my opinion would be the best approach for building a modular monolith, since (a) would require much more discipline when holding things separate that should not be directly wired together

sbyse commented 2 years ago

Ok makes sense, thanks.

I don't want us to swim against the tide too much, so in the first instance I think we'll be taking another look at option (b) as you suggest. Maybe it won't be as bad as I think! 🙂

mookid8000 commented 2 years ago

(...) maybe it won't be as bad as I think!

I don't think it'll be bad. In fact, I think you can come up with an approach that is well-structured and easy to understand (and thus easy to pick apart later) if you apply C# extension methods and a bit of thought... I often see myself having stuff like this going on (taken from a codebase I am working on at the moment):

        services.AddDatabase(Configuration.GetConnectionStringOrThrow("CopperDb"));

        services.AddStorage(Configuration.GetConnectionStringOrThrow("Storage"));

        services.AddApplicationServices();

        services.AddChangeEventProcessor<PositionsMaterializer>((materializer, entries, token) => materializer.Process(entries, token), specificTypes: PositionsMaterializer.RelevantTypes);

        services.AddChangeEventProcessor<DailyProfitLossMaterializer>((materializer, entries, token) => materializer.Process(entries, token), specificTypes: DailyProfitLossMaterializer.RelevantTypes);

        services.AddScheduledTask<RemoveExpiredStateEntries>(interval: TimeSpan.FromMinutes(1), async (task, token) => await task.Execute(token));

        services.AddScheduledTask<SettlementPeriodRowMaterializer>(interval: TimeSpan.FromMinutes(15), async (task, token) => await task.Execute(token));

where all the Add(...) methods are extensions on IServiceCollection, and a bunch of them (AddDatabase, AddStorage, AddChangeEventProcessor, AddScheduledTask) would be good candidates to be called by multiple modules (with different arguments probably).

The AddApplicationServices method though, is the one that registers the relevant repository/factory/helper services in the container, so in this case it would belong to one specific module.

sbyse commented 2 years ago

Hi @mookid8000 ,

Just picking this up again, we've generally been able to use this approach but have found a couple of instances where we need the background service's container to access services from the host (generally something to do with secrets provision or app-wide permission management).

Currently AddRebusService hard codes IHostApplicationLifetime and ILoggerFactory to be forwarded but for our use case we've had to generalise this to allow us to pass in a (short) list of additional types to be forwarded. See https://github.com/sbyse/Rebus.ServiceProvider/commit/0f9b05f87fa492904c6d5ccd2f4bafcdf29a28e0 for details.

I guess it would be possible to abuse this mechanism, but it seems like a generally useful bit of configuration to me. Happy to set up a PR if you are interested.

mookid8000 commented 2 years ago

Neat! 🙂 I'd definitely accept the PR.

But I have one question though: How does the child container work when resolving forwarded services in a lifetime scope, when the resolved instance(s) implement IDisposable?

In general, I'd assume that it would only be safe to resolve singletons from the parent container, but I could be wrong.

sbyse commented 1 year ago

I've set up the PR: https://github.com/rebus-org/Rebus.ServiceProvider/pull/66

Good question about scoped services. We're only using it for singleton services, and I agree that using the mechanism for scoped services would need further testing.

sbyse commented 1 year ago

@mookid8000 further to this; I found that using the existing AddTransientForward could lead to forwarded singleton dependencies being disposed if they were used by an event handler.

To avoid this I've updated the PR to use services.AddSingleton, which I think is probably the correct thing to do. It means that calling code should only pass in types which are registered as singletons in the host container, but I'd say that was the case previously as well, and hopefully it's a bit clearer now. It would be nice to find a way to enforce this, but I'm not sure it's possible.
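The disposal problem described above can be reproduced with plain Microsoft.Extensions.DependencyInjection. This is a sketch under the assumption that a transient forward behaves like a transient factory registration that resolves from the host container; it does not use or reproduce the actual AddTransientForward code, and `SecretProvider` is a hypothetical host service invented for the example.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical host-level singleton that happens to be disposable.
public sealed class SecretProvider : IDisposable
{
    public bool Disposed { get; private set; }
    public void Dispose() => Disposed = true;
}

public static class Program
{
    public static void Main()
    {
        // Host container owns the singleton.
        var hostServices = new ServiceCollection();
        hostServices.AddSingleton<SecretProvider>();
        using var host = hostServices.BuildServiceProvider();

        // Child container forwards the type via a transient factory
        // (assumed stand-in for a transient forward).
        var childServices = new ServiceCollection();
        childServices.AddTransient(_ => host.GetRequiredService<SecretProvider>());
        using var child = childServices.BuildServiceProvider();

        // A scope is created and disposed, as would happen around a handled message.
        using (var scope = child.CreateScope())
        {
            scope.ServiceProvider.GetRequiredService<SecretProvider>();
        }

        // The child scope tracked the factory result as a disposable transient,
        // so the host's shared instance is affected by the scope's disposal.
        Console.WriteLine(host.GetRequiredService<SecretProvider>().Disposed);
    }
}
```

Registering the forward with a singleton factory instead (`childServices.AddSingleton(_ => host.GetRequiredService<SecretProvider>())`) moves the tracking from each scope to the child container's root, so the instance should survive individual message scopes. It would still be disposed when the child container itself is disposed, which may or may not matter depending on shutdown order.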