rebus-org / Rebus.ServiceProvider

:bus: Microsoft Extensions Dependency Injection container adapter for Rebus
https://mookid.dk/category/rebus

Handling multiple endpoints #32

Closed wizhi closed 4 years ago

wizhi commented 4 years ago

Is this at all possible using this ServiceProvider extension?

Skimming the code, it doesn't seem possible, since we can only have 1 IBus implementation, which makes perfect sense when you think about it.

Basically, I'm looking for a way to split internal background jobs from incoming messages. I'd like for these to run in parallel, likely having incoming messages potentially queuing up background jobs.

This could in theory be solved by utilizing more workers, which would increase the throughput, but I'd prefer the separation, since handling the incoming messages doesn't necessarily need to be thread safe.

Aside from this, I also mean to use it to prioritize certain messages, since I'm using the Rebus.Async extension for a few synchronous actions. This is a bit of an issue since, from what I can tell, the reply is sent to the input queue of the bus that sent the request. So far this hasn't been a problem, but there are huge spikes in my message intake, and the request/response would time out if enough messages were received during the exchange window. I've seen that RabbitMQ supports prioritizing messages, but I'm not sure that I like relying on that too much.

This all seemed like a good case for splitting things up into different endpoints. I could probably solve it by making things more parallel and crossing my fingers that it'll process fast enough not to time out the synchronous exchanges, but I'd be more confident splitting it up instead.

I really like the ServiceCollection integration, so I'd love it if that magic could be preserved. Any suggestions as to how I might accomplish any/all of this?

Also, thanks for the library, it's really nice. :)

mookid8000 commented 4 years ago

Yes, there is a way: One service provider per bus instance. 🙂

You could e.g. use the background service model from Microsoft.Extensions.Hosting to host each instance, and then you add each background worker (e.g. MyBackgroundService) like this:

public static IHostBuilder CreateHostBuilder(string[] args) =>
    Host.CreateDefaultBuilder(args)
        .ConfigureServices((hostContext, services) =>
        {
            services.AddHostedService<MyBackgroundService>();
        });

where a background service could look like this:

public class MyBackgroundService : BackgroundService
{
    readonly IServiceCollection _services = new ServiceCollection();

    public MyBackgroundService()
    {
        // configure the bus

        _services.AddRebus(
            configure => configure
                .Transport(t => t.Use(...))
        );
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var provider = _services.BuildServiceProvider();

        // start the bus
        provider.UseRebus();

        while (!stoppingToken.IsCancellationRequested)
        {
            await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
        }
    }
}

vegar commented 4 years ago

How would you deal with the stuff coming from CreateDefaultBuilder() and things generally added on the host builder? Like configuration?

mookid8000 commented 4 years ago

In this case, MyBackgroundService can get things injected from the main container (the one behind the host builder), which means that configuration bound to and registered in that container can be injected via IOptions<YourConfiguration>.

And then, because I personally dislike having to muck around with IOptions<> in my services, I like to provide an extra bit of convenience for myself like this:

services.AddSingleton(p => p.GetRequiredService<IOptions<MyConfiguration>>().Value);

which means that MyBackgroundService can look nice like this:

public class MyBackgroundService : BackgroundService
{
    readonly IServiceCollection _services = new ServiceCollection();
    readonly MyConfiguration _configuration;

    public MyBackgroundService(MyConfiguration configuration)
    {
        _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));

        // configure the bus

        _services.AddRebus(
            configure => configure
                .Transport(t => t.Use(...))
        );
    }
}
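
A minimal, self-contained sketch of the convenience registration above, using only Microsoft.Extensions.DependencyInjection and Microsoft.Extensions.Options. The InputQueueName property and its value are hypothetical, and the inline Configure call merely stands in for however the real configuration gets bound in the host.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;

// Hypothetical configuration class; the property is made up for illustration.
public class MyConfiguration
{
    public string InputQueueName { get; set; } = "";
}

public static class Program
{
    public static void Main()
    {
        var services = new ServiceCollection();

        // Stand-in for binding configuration in the host (assumption).
        services.Configure<MyConfiguration>(o => o.InputQueueName = "incoming");

        // The convenience registration: expose the options value directly.
        services.AddSingleton(p => p.GetRequiredService<IOptions<MyConfiguration>>().Value);

        using var provider = services.BuildServiceProvider();

        // Consumers can now ask for MyConfiguration without touching IOptions<>.
        var configuration = provider.GetRequiredService<MyConfiguration>();
        Console.WriteLine(configuration.InputQueueName); // prints "incoming"
    }
}
```

With that registration in the main container, a hosted service can take MyConfiguration as a plain constructor parameter.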

vegar commented 4 years ago

And ILogger<> - just AddLogging() on each container instance?

mookid8000 commented 4 years ago

Umm well maybe...

I was never a fan of injected loggers, because that only makes logging available to things pulled from a container... so I haven’t thought about that.

vegar commented 4 years ago

So, resorting to a static instance instead?

mookid8000 commented 4 years ago

Yes, I've gotten into the habit of simply having something like

static readonly ILogger Logger = Log.ForContext<KafkaesqueStorageEngine>();

at the top of each class that needs a logger.

All sensible logging libraries have so much abstraction and indirection built in already, that it doesn't bring any value to use an IoC container IMO.
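
For anyone who does want ILogger<> inside each per-bus container, as asked earlier in the thread, one option might be to hand the host's ILoggerFactory to the private container before calling AddLogging(). The sketch below is not from the thread. It assumes AddLogging() leaves an already registered ILoggerFactory in place, uses two plain ServiceCollection instances as stand-ins for the host container and a per-bus container, and HandlerDependency is a hypothetical class.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

// Hypothetical class that would be resolved from the per-bus container.
public class HandlerDependency
{
    public ILogger<HandlerDependency> Logger { get; }

    public HandlerDependency(ILogger<HandlerDependency> logger) => Logger = logger;
}

public static class Program
{
    public static void Main()
    {
        // Stand-in for the main container behind the host builder.
        using var hostProvider = new ServiceCollection().AddLogging().BuildServiceProvider();
        var hostLoggerFactory = hostProvider.GetRequiredService<ILoggerFactory>();

        // Stand-in for the private container of one bus instance.
        var busServices = new ServiceCollection();
        busServices.AddSingleton(hostLoggerFactory); // share the host's factory
        busServices.AddLogging();                    // adds ILogger<> on top of it
        busServices.AddSingleton<HandlerDependency>();

        using var busProvider = busServices.BuildServiceProvider();
        var dependency = busProvider.GetRequiredService<HandlerDependency>();
        dependency.Logger.LogInformation("Resolved from the per-bus container");

        // Check that both containers hand out the same factory instance.
        Console.WriteLine(ReferenceEquals(
            hostLoggerFactory,
            busProvider.GetRequiredService<ILoggerFactory>()));
    }
}
```

In a real background service the ILoggerFactory would arrive as a constructor parameter from the main container instead of being resolved by hand.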

joaosb commented 2 years ago

Hi @mookid8000,

I'm having the same problem. Since we cannot register multiple buses in Autofac, I have created a background worker with a bus inside for each queue, but it throws an error when consuming a message. Like in your code, the provider is local and is not initialized the same way as the application's, so it cannot resolve any of the handlers or any other registration. Is there any way to pass the initial Autofac registration for the background worker ?

Thanks :)

mookid8000 commented 2 years ago

Is there any way to pass the initial Autofac registration for the background worker ?

I don't know ... I would probably just go and register all of the necessary things in each container instance myself, and then factor repetitive registrations out into their own extensions (e.g. like services.AddDomainServices();, etc.)
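
A rough sketch of what such a shared extension could look like, assuming plain Microsoft.Extensions.DependencyInjection. IOrderRepository and OrderRepository are hypothetical stand-ins for whatever the handlers depend on, and the two collections stand in for the private containers of two bus instances.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical domain types, invented for this illustration.
public interface IOrderRepository { }
public class OrderRepository : IOrderRepository { }

public static class DomainServiceCollectionExtensions
{
    // Registrations shared by every container that hosts a bus.
    public static IServiceCollection AddDomainServices(this IServiceCollection services)
    {
        services.AddSingleton<IOrderRepository, OrderRepository>();
        return services;
    }
}

public static class Program
{
    public static void Main()
    {
        // One service collection per bus instance, each with the same registrations.
        var incomingServices = new ServiceCollection().AddDomainServices();
        var backgroundServices = new ServiceCollection().AddDomainServices();

        using var incomingProvider = incomingServices.BuildServiceProvider();
        using var backgroundProvider = backgroundServices.BuildServiceProvider();

        var first = incomingProvider.GetRequiredService<IOrderRepository>();
        var second = backgroundProvider.GetRequiredService<IOrderRepository>();

        // Singletons are per container, so the two instances are distinct.
        Console.WriteLine(ReferenceEquals(first, second)); // prints "False"
    }
}
```

Note that singletons registered this way exist once per container, so anything that must be shared across bus instances would need to be passed in as an existing instance instead.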