rebus-org / Rebus.ServiceProvider

:bus: Microsoft Extensions Dependency Injection container adapter for Rebus
https://mookid.dk/category/rebus

Registering multiple buses via Dependency Injection #81

Closed egmair closed 8 months ago

egmair commented 1 year ago

Hello,

I am currently investigating our Rebus setup, specifically around registering multiple buses within a single dependency injection container.

I am using RabbitMQ as my messaging service, with the intent of registering multiple buses that may freely publish/send messages, based on their configuration within the application. I am attempting to register a default bus within the container and additionally register a 'non-default' bus that has a tighter scope of operations.

I've created a simple console application for demonstration purposes. I've pasted my code below:

IHost host = Host.CreateDefaultBuilder(args)
    .UseWindowsService(cfg =>
    {
        cfg.ServiceName = "Rebus Worker";
    })
    .ConfigureServices((ctx, services) =>
    {
        services.AddOptions<MessageQueueOptions>()
            .BindConfiguration("RabbitMQ");

        services.AddRebusHandler<MessageHandler>();

        services.AddRebus((rebus, provider) =>
        {
            var options = provider.GetRequiredService<IOptions<MessageQueueOptions>>().Value.Default;
            var connectionString = ctx.Configuration.GetConnectionString(options.ConnectionStringKey);

            rebus.Transport(t =>
            {
                var sslSettings = new SslSettings(options.Ssl.UseSsl, options.Ssl.SslServerName,
                    options.Ssl.CertificatePath, options.Ssl.CertificatePath, options.Ssl.SslProtocols, options.Ssl.SslPolicyErrors);

                t.UseRabbitMq(connectionString, options.InputQueueName)
                    .Ssl(sslSettings)
                    .ExchangeNames(options.DirectExchangeName, options.TopicExchangeName);
            })
            .Routing(r =>
            {
                r.TypeBased()
                    .Map<Message>("Messages");
            })
            .Options(o =>
            {
                o.SetBusName(options.BusName);
                o.SimpleRetryStrategy(options.ErrorQueueName);
            })
            .Logging(l =>
            {
                if (options.EnableLogging)
                {
                    l.ColoredConsole();
                }
            })
            .Serialization(s =>
            {
                s.UseNewtonsoftJson(JsonInteroperabilityMode.PureJson);
                s.UseCustomMessageTypeNames()
                    .AddWithCustomName<Message>("Message")
                    .AllowFallbackToDefaultConvention();
            });

            return rebus;
        }, true, async bus =>
        {
            await bus.Subscribe<Message>();
        }, "bus-1");

        services.AddRebus((rebus, provider) =>
        {
            var options = provider.GetRequiredService<IOptions<MessageQueueOptions>>().Value.Sms;
            var connectionString = ctx.Configuration.GetConnectionString(options.ConnectionStringKey);

            rebus.Transport(t =>
            {
                var sslSettings = new SslSettings(options.Ssl.UseSsl, options.Ssl.SslServerName,
                    options.Ssl.CertificatePath, options.Ssl.CertificatePath, options.Ssl.SslProtocols, options.Ssl.SslPolicyErrors);

                t.UseRabbitMqAsOneWayClient(connectionString)
                    .Ssl(sslSettings)
                    .ExchangeNames(options.DirectExchangeName, options.TopicExchangeName);
            })
            .Routing(r =>
            {
                r.TypeBased()
                    .Map<NewEvent>("NewEvent");
            })
            .Options(o =>
            {
                o.SetBusName(options.BusName);
                o.SimpleRetryStrategy(options.ErrorQueueName);
            })
            .Logging(l =>
            {
                if (options.EnableLogging)
                {
                    l.ColoredConsole();
                }
            })
            .Serialization(s =>
            {
                s.UseNewtonsoftJson(JsonInteroperabilityMode.PureJson);
                s.UseCustomMessageTypeNames()
                    .AddWithCustomName<NewEvent>("NewEvent")
                    .AllowFallbackToDefaultConvention();
            });

            return rebus;
        }, false, async bus =>
        {

        }, "bus-2");

        services.AddHostedService<Worker.Worker>();
    })
    .Build();

await host.RunAsync();

In my Worker class, I inject the default IBus implementation alongside the IBusRegistry, to check whether my other non-default bus has been registered correctly. However, while debugging, I noticed that the second services.AddRebus(...) call does not run its configuration action, so only one bus is registered in the IBusRegistry at run time.

I feel like there's a chance I've missed something blatantly obvious here, but I've been scratching my head for hours 😆

Any information/assistance would be greatly appreciated - cheers!

mookid8000 commented 1 year ago

What happens if you resolve the one-way client from the bus registry?

egmair commented 1 year ago

The following exception is thrown:

Message: Registry did not contain a bus instance with key 'bus-2'. The key is configured by calling AddRebus(..., key: "your-key") when adding a bus to the container. Also, it's required that the host has been started (or that StartRebus() has been called on the service provider), because the bus instances do not exist before that.

Stack Trace:
   at Rebus.ServiceProvider.Internals.ServiceProviderBusRegistry.GetBus(String key)
   at RebusTest.NewEventWorker..ctor(ILogger`1 logger, IBus defaultBus, IMessageBusFactory messageBusFactory, IOptions`1 rabbitOptions, IBusRegistry busRegistry) in C:\Users\euan.mair\source\repos\RebusTest\RebusTest\NewEventWorker.cs:line 26
   at System.RuntimeMethodHandle.InvokeMethod(Object target, Span`1& arguments, Signature sig, Boolean constructor, Boolean wrapExceptions)
   at System.Reflection.RuntimeConstructorInfo.Invoke(BindingFlags invokeAttr, Binder binder, Object[] parameters, CultureInfo culture)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitConstructor(ConstructorCallSite constructorCallSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite callSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitIEnumerable(IEnumerableCallSite enumerableCallSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite callSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.Resolve(ServiceCallSite callSite, ServiceProviderEngineScope scope)
   at Microsoft.Extensions.DependencyInjection.ServiceProvider.CreateServiceAccessor(Type serviceType)
   at System.Collections.Concurrent.ConcurrentDictionary`2.GetOrAdd(TKey key, Func`2 valueFactory)
   at Microsoft.Extensions.DependencyInjection.ServiceProvider.GetService(Type serviceType, ServiceProviderEngineScope serviceProviderEngineScope)
   at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType)
   at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService[T](IServiceProvider provider)
   at Rebus.Config.ServiceProviderExtensions.<>c__DisplayClass0_0.<g__StartHostedServices|0>d.MoveNext()

mookid8000 commented 1 year ago

Ok I'm starting to suspect that it's because the second registration is a one-way client and that that makes its registration different somehow.

I am unable to do something about it right now. Unless it messes with your plans, I suggest you simply use the default IBus throughout.

glazkovalex commented 8 months ago

Hello! I also get an exception in a .NET 8 console app: "Registry did not contain a bus instance with key 'OneWayBus'...". In my case, though, the busRegistry from host.Services.GetRequiredService<IBusRegistry>() contains no buses at all. There is only the default bus from host.Services.GetRequiredService<IBus>().

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Rebus.Bus;
using Rebus.Config;
using Rebus.Handlers;
using Rebus.Logging;
using Rebus.Routing.TypeBased;
using Rebus.ServiceProvider;
using Rebus.Transport.InMem;
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace OneWayTransport
{
    class Program
    {
        static InMemNetwork network = new InMemNetwork();
        static async Task Main(string[] args)
        {
            using (var host = CreateHostBuilder(args).Build())
            {
                var bus = host.Services.GetRequiredService<IBus>();
                var busRegistry = host.Services.GetRequiredService<IBusRegistry>();
                IBus busOneWay = busRegistry.GetBus("OneWayBus");
                char key; 
                do
                {
                    var sendAmount = 0;
                    sw.Restart();
                    var messages = Enumerable.Range(1, MessageCount)
                        .Select(async i =>
                        {
                            Interlocked.Add(ref sendAmount, i);
                            await busOneWay.Send(new TestMessage { MessageNumber = i });
                        }).ToArray();
                    Task.WaitAll(messages);
                    Console.WriteLine($"Send: {sendAmount} for {sw.ElapsedMilliseconds / 1000f:N3}c");
                    Console.WriteLine("Press any key to exit or 'r' to repeat.");
                    key = Console.ReadKey().KeyChar;
                } while (key == 'r' || key == 'к');
                await host.StopAsync();
            }
            Console.ReadKey();
        }

        public static IHostBuilder CreateHostBuilder(string[] args)
        {
            var builder = Host.CreateDefaultBuilder(args);
            builder.ConfigureServices((hostContext, services) =>
            {
                var consoleLoggerFactory = new ConsoleLoggerFactory(true) { MinLevel = LogLevel.Debug };
                var queue = $"{nameof(OneWayTransport)}.queue";
                services.AddSingleton(consoleLoggerFactory);
                services.AddRebusHandler<TestMessageHandler>();
                services.AddRebus((configurer, serviceProvider) => configurer
                    .Logging(l => l.Use(consoleLoggerFactory))
                    .Transport(t => t.UseInMemoryTransport(network, queue))
                    .Options(o =>
                    {
                        o.SetNumberOfWorkers(10);
                        o.SetMaxParallelism(10);
                    }));
                services.AddRebus((configurer, serviceProvider) => configurer
                    .Logging(l => l.Use(consoleLoggerFactory))
                    .Transport(t => t.UseInMemoryTransportAsOneWayClient(network))
                    .Routing(r => r.TypeBased().Map<TestMessage>(queue))
                    , isDefaultBus: false, key: "OneWayBus");
            });
            return builder;
        }

        internal const int MessageCount = 1;
        internal static Stopwatch sw = Stopwatch.StartNew();
    }

    public class TestMessage
    {
        public int MessageNumber { get; set; }
    }   

    internal class TestMessageHandler : IHandleMessages<TestMessage>
    {
        public Task Handle(TestMessage message)
        {
            Interlocked.Add(ref amount, message.MessageNumber);
            Console.WriteLine($"Received : \"{message.MessageNumber}\" (Thread #{Thread.CurrentThread.ManagedThreadId})");
            if (message.MessageNumber == Program.MessageCount)
                Console.WriteLine($"Received {Program.MessageCount} messages for {Program.sw.ElapsedMilliseconds / 1000f:N3}s");
            return Task.CompletedTask;
        }

        static int amount = 0;
    }
}

egmair commented 8 months ago

Hi @glazkovalex,

I recently revisited this issue and went through how services.AddRebus(...) works: it adds an IHostedService registration to the service collection, with a callback that creates a new RebusBackgroundService: see here

What I didn't know before reading the source code was how .NET starts hosted services as your application starts: each hosted service is started sequentially. The background service I had used when I originally raised the issue was waiting indefinitely for the bus to be started (using loops :sweat_smile:), which meant its StartAsync override would never complete, blocking all other hosted services from starting... do you see where the problem is?
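To illustrate the sequential start-up described above, here is a minimal sketch that uses only the base class library. It is a simplified model, not the real host: FakeService, SequentialStartupDemo, and the service names are hypothetical, and the timeout exists only so that the demo terminates (the actual host would simply keep waiting).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a hosted service; this is NOT the real IHostedService interface.
public sealed class FakeService
{
    private readonly Func<Task> _start;
    public string Name { get; }

    public FakeService(string name, Func<Task> start)
    {
        Name = name;
        _start = start;
    }

    public Task StartAsync() => _start();
}

public static class SequentialStartupDemo
{
    // Starts the services one after another, awaiting each before moving on.
    // Stops at the first service that has not finished starting within the timeout.
    public static async Task<List<string>> StartAllAsync(IEnumerable<FakeService> services, TimeSpan timeout)
    {
        var started = new List<string>();
        var deadline = Task.Delay(timeout);

        foreach (var service in services)
        {
            var start = service.StartAsync();
            if (await Task.WhenAny(start, deadline) != start)
                break; // this service never finished starting, so nothing behind it runs

            await start; // propagate any exception from a completed start
            started.Add(service.Name);
        }

        return started;
    }

    public static async Task Main()
    {
        // Never completed: models a StartAsync that waits forever.
        var never = new TaskCompletionSource<bool>();

        var services = new[]
        {
            new FakeService("blocking-worker", () => never.Task),
            new FakeService("service-b", () => Task.CompletedTask),
            new FakeService("service-c", () => Task.CompletedTask),
        };

        var started = await StartAllAsync(services, TimeSpan.FromMilliseconds(200));
        Console.WriteLine($"Started: {started.Count}"); // prints "Started: 0"
    }
}
```

In this model, moving the blocking service to the end of the list would let the other two start, which is why registration order can hide or expose the problem.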

A much better way to implement a wait mechanism into your hosted service(s) is using the TaskCompletionSource Class, see my example below:

using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Rebus.Config;
using Rebus.ServiceProvider;

internal sealed class MyBackgroundService : BackgroundService
{
    private readonly IBusRegistry _busRegistry;

    // completed once the host signals that the application has started
    private readonly TaskCompletionSource _startupTaskCompletionSource = new();

    public MyBackgroundService(IBusRegistry busRegistry, IHostApplicationLifetime hostApplicationLifetime)
    {
        _busRegistry = busRegistry;
        hostApplicationLifetime.ApplicationStarted.Register(() => _startupTaskCompletionSource.SetResult());
    }

    // ExecuteAsync is protected in BackgroundService, so the override must be protected as well
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // create a task completion source for the stopping token, as token source may be canceled before app starts
        var taskCompletionSource = new TaskCompletionSource();
        stoppingToken.Register(() => taskCompletionSource.SetResult());

        // wait without blocking a thread until either the app has started or stopping was requested
        await Task.WhenAny(taskCompletionSource.Task, _startupTaskCompletionSource.Task)
            .ConfigureAwait(false);

        // check if the cancellation was requested and return if so
        if (stoppingToken.IsCancellationRequested)
            return;

        // do work here, e.g. resolve a bus with _busRegistry.GetBus(...)
    }
}

This will allow your background service(s) to start gracefully, but block the ExecuteAsync method until one of the task completion sources has been completed - either stopping token (likely something was canceled on startup), or the host application started successfully.

See the full article here.

Once I implemented this in our background services, all non-default buses we had set up using services.AddRebus(...) were in the registry as expected.

Hopefully you can apply this to your own example!

Edit: Not too sure how this affects one-way clients, as I've not tested it myself, but fingers-crossed it works for you!

glazkovalex commented 8 months ago

Hello, @egmair! Thanks for the article and recommendations! Yes, the article is good! It is convenient to wrap all sorts of callbacks and events in tasks.

I found the mistake in my case: I was missing a call to host.RunAsync() after var host = CreateHostBuilder(args).Build(); After adding host.RunAsync(), everything started working as it should. 🙂
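For readers landing here with the same exception, the fix can be sketched roughly as follows. It reuses the in-memory transport setup from the repro earlier in the thread, trimmed down, and assumes the same Rebus and Microsoft.Extensions.Hosting packages. The one deliberate difference is that it awaits host.StartAsync() rather than calling host.RunAsync(): both start the host, but StartAsync returns once start-up is finished, so the registry lookup can follow on the next line.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Rebus.Bus;
using Rebus.Config;
using Rebus.Handlers;
using Rebus.Routing.TypeBased;
using Rebus.ServiceProvider;
using Rebus.Transport.InMem;

public class TestMessage
{
    public int MessageNumber { get; set; }
}

public class TestMessageHandler : IHandleMessages<TestMessage>
{
    public Task Handle(TestMessage message)
    {
        Console.WriteLine($"Received: {message.MessageNumber}");
        return Task.CompletedTask;
    }
}

public static class Program
{
    public static async Task Main(string[] args)
    {
        var network = new InMemNetwork();
        const string queue = "OneWayTransport.queue";

        using var host = Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.AddRebusHandler<TestMessageHandler>();

                // default bus: receives from the in-memory queue
                services.AddRebus((configurer, serviceProvider) => configurer
                    .Transport(t => t.UseInMemoryTransport(network, queue)));

                // non-default one-way client, looked up by key
                services.AddRebus((configurer, serviceProvider) => configurer
                    .Transport(t => t.UseInMemoryTransportAsOneWayClient(network))
                    .Routing(r => r.TypeBased().Map<TestMessage>(queue)),
                    isDefaultBus: false, key: "OneWayBus");
            })
            .Build();

        // The bus instances do not exist until the host has been started,
        // so start it BEFORE asking the registry for a keyed bus.
        await host.StartAsync();

        var busRegistry = host.Services.GetRequiredService<IBusRegistry>();
        IBus busOneWay = busRegistry.GetBus("OneWayBus");
        await busOneWay.Send(new TestMessage { MessageNumber = 1 });

        await Task.Delay(500); // crude pause so the handler has a chance to run in this demo
        await host.StopAsync();
    }
}
```

This matches the hint in the exception message itself, which notes that the host must have been started before the bus instances exist.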