BrighterCommand / Brighter

A framework for building messaging apps with .NET and C#.
https://www.goparamore.io/
MIT License
1.98k stars 258 forks source link

Feature: Support Multiple Brokers #3156

Open iancooper opened 1 week ago

iancooper commented 1 week ago

Is your feature request related to a problem? Please describe. The issue relates to the need to read from multiple brokers. Given the following scenario, how we would support the two brokers?

services.AddServiceActivator(options =>
{
    options.Subscriptions = subscriptions1;
    options.ChannelFactory = new AzureServiceBusChannelFactory(asb1);
    options.UseScoped = false;
}).AsyncHandlers(registry =>
{
    registry.RegisterAsync<Event1, EventHandler1Async>();
    registry.RegisterAsync<Event2, EventHandler2Async>();
}).MapperRegistry(registry =>
{
    registry.Register<Event1, EventHandler1Async>();
    registry.Register<Event2, EventHandler2Async>();
});

services.AddServiceActivator(options =>
{
    options.Subscriptions = subscriptions2;
    options.ChannelFactory = new AzureServiceBusChannelFactory(asb2);
    options.UseScoped = false;
}).AsyncHandlers(registry =>
{
    registry.RegisterAsync<Event3, EventHandler3Async>();
    registry.RegisterAsync<Event4, EventHandler4Async>();
}).MapperRegistry(registry =>
{
    registry.Register<Event3, EventHandler3Async>();
    registry.Register<Event4, EventHandler4Async>();
});

Under the existing serviceactivator model, we only read from the first ServiceActivator. At issue here is the channel factory. We have a single channel factory set on the Dispatcher, which forces you to have multiple Service Activators to have multiple Dispatchers, each with their own channel factory. Our hostbuilder support however does not run multiple service activators, just the one.

Describe the solution you'd like It should be possible to consume from multiple sources. There are two options here:

  1. Change the Dispatcher so that it can have multiple channel factories. We would either need a registry of channel factories to subscriptions or add the channel factory to the subscription. This is because we assume the channel type is the same for all subscriptions at this point.
  2. Change the Service Activator support in host builder to run many Dispatchers if there are.

Whilst 1 might seem more satisfying, 2 might not require a breaking change, and thus allow us to release in v9 over v10. We could make a different choice in v10, where we can make breaking changes, if the multi-dispatcher solution was determined to be a work around.

To implement 2 we would need wither to have ServiceActivatorHostedService depend on a collection of Dispatchers and run them all or run multiple ServiceActivatorHostedServices each with its own Dispatcher. There are some API requirements to note around multiple hosted services here. Whilst .NET 8 has a fix for this, we would need to determine if that was available on netstandard20, see Steve Gordon's blog.

Describe alternatives you've considered Generally folks would tend to have one ServiceActivator in a given app, it is a supervisor-worker model (dispatcher-performer) that runs a pump per subscription. You add the serviceactivator as a hosted service and it begins the Dispatcher loop.

If you want two different ASB channel factories for different asb instances then you would tend to use two different apps each with their own service activator).

We don't really support: I want to run two different service activators on background services via hostbuilder, because it's not a scenario we tend to see. It might be possible to run each Dispatcher yourself over using ServiceActivatorHostedService. That pushes you away from our HostBuilder extensions to the underlying APIs such as CommandProcessor, ExternalServiceBus and Dispatcher. It's possible but it gets you into the details of the API.

Another alternative here is just to have two apps, unless you have some overriding reason to have two different ASB connections in the same source application.

Additional context None

preardon commented 6 days ago

Looks like we do Support multiple Brokers,

When setting up the ServiceActivator you set a DefaultChannelFactorfy which is used if you do not set a ChannelFactory on each subscription.

preardon commented 6 days ago

Consider the following code

var rmqConnection = new RmqMessagingGatewayConnection
{
    AmpqUri = new AmqpUriSpecification(new Uri("amqp://guest:guest@localhost:5672")),
    Exchange = new Exchange("paramore.brighter.exchange")
};

var rmqConnection2 = new RmqMessagingGatewayConnection
{
    AmpqUri = new AmqpUriSpecification(new Uri("amqp://guest:guest@localhost:5673")),
    Exchange = new Exchange("paramore.brighter.exchange")
};

var rmqMessageConsumerFactory = new RmqMessageConsumerFactory(rmqConnection);
var rmqMessageConsumerFactory2 = new RmqMessageConsumerFactory(rmqConnection2);

builder.Services.AddServiceActivator(options =>
    {
        options.Subscriptions = new Subscription[]
        {
            new RmqSubscription<MyDistributedEvent>(
                new SubscriptionName("Consumer"),
                new ChannelName("MyDistributedEvent"),
                new RoutingKey("MyDistributedEvent")
            ),
            new RmqSubscription<UpdateProductCommand>(
                new SubscriptionName("Consumer"),
                new ChannelName("UpdateProductCommand"),
                new RoutingKey("UpdateProductCommand"),
                channelFactory: new ChannelFactory(rmqMessageConsumerFactory2)
            ),
            new RmqSubscription<ProductUpdatedEvent>(
                new SubscriptionName("Consumer"),
                new ChannelName("ProductUpdatedEvent"),
                new RoutingKey("ProductUpdatedEvent"),
                requeueCount: 5
            )
        };
        options.ChannelFactory = new ChannelFactory(rmqMessageConsumerFactory);
    })
    .UseExternalBus(Helpers.GetProducerRegistry(rmqConnection))
    .UseInMemoryOutbox()
    .MapperRegistry(r =>
    {
        r.Register<MyDistributedEvent, MessageMapper<MyDistributedEvent>>();
        r.Register<UpdateProductCommand, MessageMapper<UpdateProductCommand>>();
        r.Register<ProductUpdatedEvent, MessageMapper<ProductUpdatedEvent>>();
    })
    .Handlers(r =>
    {
        r.Register<MyDistributedEvent, MyDistributedEventHandler>();
        r.Register<UpdateProductCommand, UpdateProductCommandHandler>();
        r.Register<ProductUpdatedEvent, ProductUpdatedEventHandler>();
    })
    .UseOutboxSweeper(options =>
    {
        options.TimerInterval = 30;
        options.MinimumMessageAge = 500;
    });

This code has connects to 2 different RabbitMQ instances

rmqMessageConsumerFactory (amqp://guest:guest@localhost:5672) is set as the default and has

rmqMessageConsumerFactory2 (amqp://guest:guest@localhost:5673) is set specifically on