rebus-org / Rebus.ServiceProvider

:bus: Microsoft Extensions Dependency Injection container adapter for Rebus
https://mookid.dk/category/rebus
Other

Rebus.Config.ServiceCollectionExtensions+BusStop hosted service causes Bus to attempt to start before being told #57

Closed zlepper closed 2 years ago

zlepper commented 2 years ago

Sample app: ConsoleApp1.zip

It seems that the BusStop hosted service, which is added here: https://github.com/rebus-org/Rebus.ServiceProvider/blob/06ff0520f80383e9a42917fed7435c67c601fa1b/Rebus.ServiceProvider/Config/ServiceCollectionExtensions.Bus.cs#L99-L99, somehow causes the bus to attempt to start before it is supposed to. If you look at the sample I attached above, I have some commented-out lines that remove that specific service from the service collection, and removing it makes the bus defer the start until it is actually told to. This is a problem for us because we have some retry logic around starting Rebus, so that we can wait for RabbitMQ to be ready before we start the rest of the application, and, in the same vein, so that Rebus failing to connect to RabbitMQ doesn't crash the entire application.

Stacktrace:

[INF] Rebus.Internals.ConnectionManager (Thread #1): Initializing RabbitMQ connection manager for transport with input queue "q"
[WRN] Rebus.Internals.ConnectionManager (Thread #1): Could not establish connection: "None of the specified endpoints were reachable"
[INF] Rebus.Bus.RebusBus (Thread #1): Bus "Rebus 1" stopped
Unhandled exception. Rebus.Injection.ResolutionException: Could not resolve Rebus.Bus.IBus with decorator depth 0 - registrations: Rebus.Injection.Injectionist+Handler
 ---> RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable
 ---> System.AggregateException: One or more errors occurred. (Connection failed)
 ---> RabbitMQ.Client.Exceptions.ConnectFailureException: Connection failed
 ---> System.Net.Sockets.SocketException (11001): No such host is known.
   at System.Net.NameResolutionPal.ProcessResult(SocketError errorCode, GetAddrInfoExContext* context)
   at System.Net.NameResolutionPal.GetAddressInfoExCallback(Int32 error, Int32 bytes, NativeOverlapped* overlapped)
--- End of stack trace from previous location ---
   at RabbitMQ.Client.Impl.TcpClientAdapter.ConnectAsync(String host, Int32 port)
   at RabbitMQ.Client.Impl.TaskExtensions.TimeoutAfter(Task task, TimeSpan timeout)
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, TimeSpan timeout)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectOrFail(ITcpClient socket, AmqpTcpEndpoint endpoint, TimeSpan timeout)
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingAddressFamily(AmqpTcpEndpoint endpoint, Func`2 socketFactory, TimeSpan timeout, AddressFamily family)
   at RabbitMQ.Client.Impl.SocketFrameHandler.ConnectUsingIPv4(AmqpTcpEndpoint endpoint, Func`2 socketFactory, TimeSpan timeout)
   at RabbitMQ.Client.Impl.SocketFrameHandler..ctor(AmqpTcpEndpoint endpoint, Func`2 socketFactory, TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
   at RabbitMQ.Client.Framing.Impl.IProtocolExtensions.CreateFrameHandler(IProtocol protocol, AmqpTcpEndpoint endpoint, Func`2 socketFactory, TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
   at RabbitMQ.Client.ConnectionFactory.CreateFrameHandler(AmqpTcpEndpoint endpoint)
   at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.EndpointResolverExtensions.SelectOne[T](IEndpointResolver resolver, Func`2 selector)
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.Init(IEndpointResolver endpoints)
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
   --- End of inner exception stack trace ---
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IEndpointResolver endpointResolver, String clientProvidedName)
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IList`1 endpoints, String clientProvidedName)
   at RabbitMQ.Client.ConnectionFactory.CreateConnection(IList`1 endpoints)
   at Rebus.Internals.ConnectionManager.GetConnection()
   at Rebus.RabbitMq.RabbitMqTransport.CreateQueue(String address)
   at Rebus.RabbitMq.RabbitMqTransport.Initialize()
   at Rebus.Config.RebusConfigurer.<>c__DisplayClass13_0.<Start>b__28(IResolutionContext c)
   at Rebus.Injection.Injectionist.Resolver`1.InvokeResolver(IResolutionContext context)
   at Rebus.Injection.Injectionist.ResolutionContext.Get[TService]()
   --- End of inner exception stack trace ---
   at Rebus.Injection.Injectionist.ResolutionContext.Get[TService]()
   at Rebus.Injection.Injectionist.Get[TService]()
   at Rebus.Config.RebusConfigurer.Start()
   at Rebus.Config.DelayedStartupConfigurationExtensions.Create(RebusConfigurer configurer)
   at Rebus.Config.ServiceCollectionExtensions.<>c__DisplayClass1_0.<AddRebus>b__3(IServiceProvider provider)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitFactory(FactoryCallSite factoryCallSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite callSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.Resolve(ServiceCallSite callSite, ServiceProviderEngineScope scope)
   at Microsoft.Extensions.DependencyInjection.ServiceProvider.CreateServiceAccessor(Type serviceType)
   at System.Collections.Concurrent.ConcurrentDictionary`2.GetOrAdd(TKey key, Func`2 valueFactory)
   at Microsoft.Extensions.DependencyInjection.ServiceProvider.GetService(Type serviceType, ServiceProviderEngineScope serviceProviderEngineScope)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.GetService(Type serviceType)
   at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType)
   at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService[T](IServiceProvider provider)
   at Rebus.Config.ServiceCollectionExtensions.<>c.<AddRebus>b__1_5(IServiceProvider provider)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitFactory(FactoryCallSite factoryCallSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite callSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitConstructor(ConstructorCallSite constructorCallSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite callSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitIEnumerable(IEnumerableCallSite enumerableCallSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite callSite, RuntimeResolverContext context)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
   at Microsoft.Extensions.DependencyInjection.ServiceLookup.CallSiteRuntimeResolver.Resolve(ServiceCallSite callSite, ServiceProviderEngineScope scope)
   at Microsoft.Extensions.DependencyInjection.ServiceProvider.CreateServiceAccessor(Type serviceType)
   at System.Collections.Concurrent.ConcurrentDictionary`2.GetOrAdd(TKey key, Func`2 valueFactory)
   at Microsoft.Extensions.DependencyInjection.ServiceProvider.GetService(Type serviceType, ServiceProviderEngineScope serviceProviderEngineScope)
   at Microsoft.Extensions.DependencyInjection.ServiceProvider.GetService(Type serviceType)
   at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType)
   at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService[T](IServiceProvider provider)
   at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetServices[T](IServiceProvider provider)
   at Program.<Main>$(String[] args) in C:\Users\rhdh\RiderProjects\ConsoleApp1\ConsoleApp1\Program.cs:line 28
   at Program.<Main>$(String[] args) in C:\Users\rhdh\RiderProjects\ConsoleApp1\ConsoleApp1\Program.cs:line 46
   at Program.<Main>(String[] args)
mookid8000 commented 2 years ago

Hi there, I've tried the sample now... the problem is that Rebus actually "starts" when it's resolved the first time (into BusStop), but it starts with the number of workers set to 0, so it does not process any messages. Then, when you call UseRebus() on the service provider, it sets the number of workers to 1 (or whatever you've configured it to).

So... if RabbitMQ has not been started, then it's not possible for Rebus to start up... in those cases, I recommend you simply let the app crash and let whatever hosting environment you're running it in (Windows Service, K8s, Azure, whatever) restart the app.

Alternatively, wrap the entire app bootstrapping code in a retry loop that makes a couple of attempts before letting the process crash.
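A minimal sketch of such a retry loop, assuming the bootstrapping (or just the part that starts Rebus) can be expressed as a single async delegate. `Bootstrap.RetryAsync` and its parameters are hypothetical names used for illustration; they are not part of Rebus or Rebus.ServiceProvider:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not a Rebus API): runs an async bootstrapping step and
// retries it a fixed number of times before letting the last exception escape.
public static class Bootstrap
{
    public static async Task<T> RetryAsync<T>(
        Func<CancellationToken, Task<T>> attempt,
        int maxAttempts,
        TimeSpan delay,
        CancellationToken cancellationToken = default)
    {
        if (attempt == null) throw new ArgumentNullException(nameof(attempt));
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (var attemptNumber = 1; ; attemptNumber++)
        {
            try
            {
                return await attempt(cancellationToken).ConfigureAwait(false);
            }
            // Retry only while attempts remain, and never retry cancellation.
            catch (Exception exception) when (attemptNumber < maxAttempts && !(exception is OperationCanceledException))
            {
                Console.Error.WriteLine($"Bootstrap attempt {attemptNumber} failed: {exception.Message}");

                // Give the dependency (e.g. RabbitMQ) some time to come up before the next attempt.
                await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
            }
            // On the final attempt the filter is false, so the exception propagates
            // and the process can still crash (and be restarted) as before.
        }
    }
}
```

With `maxAttempts: 5` and `delay: TimeSpan.FromSeconds(10)`, the app waits roughly 40 seconds in total (four delays between five attempts) before the last exception is allowed to crash the process.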

Btw. the BusStop thing is there to ensure that the bus is properly disposed when the app shuts down, so removing it will simply disable this part. That means that you should be able to safely replace it with e.g.

serviceProvider.GetRequiredService<IHostApplicationLifetime>()
    .ApplicationStopping.Register(() => serviceProvider.GetRequiredService<IBus>().Dispose());

and this way ensure that Rebus handlers finish processing what they have already received, without any new messages being received.

Sorry for taking so long to answer btw. 🙄

zlepper commented 2 years ago

So... if RabbitMQ has not been started, then it's not possible for Rebus to start up... in those cases, I recommend you simply let the app crash and let whatever hosting environment you're running it in (Windows Service, K8s, Azure, whatever) restart the app.

I would love to do this, but IIS is a bitch and doesn't always start the app again, especially not if it fails rapidly; then it just lets it stay dead until a human looks at it :(

We do have code that wraps the Rebus startup and makes sure to retry it. However, that doesn't work when ASP.NET Core builds the service provider (which causes the IHostedServices to start). Then the only place I can add the try-catch is all the way out in the Program file, which is a bit problematic, because it means we have to rerun the entire setup logic (which sets up considerably more than just Rebus). In the same vein, we already have working shutdown code that works exactly like you describe :)

But fair enough, if you recommend just replacing the service instance in the ServiceCollection; it's not that much of a pain. I would just have preferred a way to avoid having to do that.

mookid8000 commented 2 years ago

Bummer dude, I can see you're in a pickle there. If you have any other ideas on how this can be fixed, I'd be happy to hear about it.

I have sometimes been thinking about Rebus' service provider integration, which I think could be much better if it were more tightly integrated with Microsoft.Extensions.Hosting somehow. Slightly related is also the discussion of whether it should be supported that one could host multiple bus instances in the same container instance.

Again, if you have some nifty ideas, please let me know 🙂

zlepper commented 2 years ago

Slightly related is also the discussion of whether it should be supported that one could host multiple bus instances in the same container instance.

Honestly, I would love this! We have some cases where Rebus' transactional integration is fantastic, but also other cases where we want true "broadcasting" behavior, for example for busting caches across multiple instances of the same service. Wrapping something like NATS is easy enough with Rebus, but then I cannot use RabbitMQ at the same time (at least not in the same ServiceProvider).

Again, if you have some nifty ideas, please let me know 🙂

I remember having a problem like this with NServiceBus back when we used it, and the solution ended up being roughly the same: we removed all of their bootstrap logic and replaced it with our own, so we could retry on startup. What we did there (and what we are still doing with Rebus) is to have a BackgroundService that starts the bus, plus a SafeMessageSession wrapper where all the methods essentially just block (optionally with a timeout) until the bus has actually been started by that background service. That way we can retry, but we can also start serving some traffic even before the bus has been started. The same background service also listens for the shutdown event and stops Rebus again.
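The "block until started" wrapper can be sketched with a `TaskCompletionSource`. This is a minimal sketch under assumptions: `StartupGate<T>`, `SetStarted` and `WaitAsync` are hypothetical names (not Rebus or NServiceBus APIs). The idea is that the BackgroundService calls `SetStarted` once its retry loop has managed to start the bus, and the session wrapper awaits `WaitAsync` before each send or publish:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical gate (not a Rebus or NServiceBus API): callers wait until a
// background service reports that the bus has been started.
public sealed class StartupGate<T> where T : class
{
    // RunContinuationsAsynchronously keeps waiters' continuations off the thread
    // that calls SetStarted.
    private readonly TaskCompletionSource<T> _started =
        new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);

    public bool IsStarted => _started.Task.IsCompleted;

    // Called once by the background service after it managed to start the bus.
    public void SetStarted(T instance) =>
        _started.TrySetResult(instance ?? throw new ArgumentNullException(nameof(instance)));

    // Called by the session wrapper before each operation. Throws TimeoutException if the
    // bus is not started in time; Timeout.InfiniteTimeSpan waits until it is.
    public async Task<T> WaitAsync(TimeSpan timeout, CancellationToken cancellationToken = default)
    {
        var started = _started.Task;
        if (started.IsCompleted) return await started.ConfigureAwait(false);

        using (var timer = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
        {
            var delay = Task.Delay(timeout, timer.Token);
            var first = await Task.WhenAny(started, delay).ConfigureAwait(false);

            if (first == started)
            {
                timer.Cancel(); // release the pending Task.Delay timer
                return await started.ConfigureAwait(false);
            }
        }

        // The delay finished first: either the caller cancelled or the timeout elapsed.
        cancellationToken.ThrowIfCancellationRequested();
        throw new TimeoutException($"The bus was not started within {timeout}.");
    }
}
```

Because callers only wait on the gate, the rest of the app can start serving requests that don't need the bus while the background service keeps retrying.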

mookid8000 commented 2 years ago

OK I've gone ahead and added the ability to host multiple Rebus instances!

The code is currently residing on the feature/hosting branch. You can read about the features here: https://github.com/rebus-org/Rebus.ServiceProvider/blob/feature/hosting/README.md

It's available now on NuGet.org as Rebus.ServiceProvider 8.0.0-b01

Please try it and see how it works out for you. 😁

zlepper commented 2 years ago

Damn, that was fast! I skimmed through the code and read the readme, and it looks really good (and nicely transparent, so things generally don't have to care about how many bus instances they are working with)!

The only thing that comes to mind is actually related to my use case of cache busting mentioned above: can I request a specific bus? I can see that a given handler can request the bus instance that a given message was received from. However, in my case I would be receiving some rather large "transactional" messages from a RabbitMQ bus, and would then want to massage the data a bit and forward it on a NATS bus, which would broadcast the message.

I'll see if I can find some time to play around with it during this week (hopefully even tomorrow!)

mookid8000 commented 2 years ago

OK, it seems I have to figure something out around disposal of the bus, because the container will dispose it immediately after startup as it is now... stick around, I'll post back here when I have a solution 🙄

mookid8000 commented 2 years ago

OK I've released Rebus.ServiceProvider 8.0.0-b02 now where the premature disposal thing has been fixed.

The only thing that comes to mind is actually related to my use case of cache busting mentioned above: can I request a specific bus? I can see that a given handler can request the bus instance that a given message was received from. However, in my case I would be receiving some rather large "transactional" messages from a RabbitMQ bus, and would then want to massage the data a bit and forward it on a NATS bus, which would broadcast the message.

No, unfortunately you cannot request a specific bus. I could figure out a way to resolve all of the available buses, but you would not be able to readily distinguish them from each other in code, so I am not sure it would be that useful.

zlepper commented 2 years ago

Could we maybe do a trick where a "marked" version is registered using a generic type parameter? That way I could request IBus<MyMarker>. While not quite as ergonomic, I believe it would be good enough, and .NET would verify, when building the service provider (at least with ValidateOnBuild enabled), that a bus with the given marker exists.
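A minimal sketch of the marker-type trick, assuming a trimmed-down stand-in interface rather than Rebus' real `IBus` (which has many more members). `IMessageBus`, `IBus<TMarker>`, `MarkedBus<TMarker>`, the marker classes, `CacheBuster` and `RecordingBus` are all hypothetical names:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical, trimmed-down stand-in for a bus abstraction (NOT Rebus' IBus).
public interface IMessageBus
{
    Task Send(object message);
}

// TMarker only exists to make each bus a distinct service type,
// so a container can tell IBus<RabbitMqBus> and IBus<NatsBus> apart.
public interface IBus<TMarker> : IMessageBus { }

// Empty marker types, one per bus instance (hypothetical names).
public sealed class RabbitMqBus { }
public sealed class NatsBus { }

// Wraps an untyped bus instance and tags it with a marker type.
public sealed class MarkedBus<TMarker> : IBus<TMarker>
{
    private readonly IMessageBus _inner;

    public MarkedBus(IMessageBus inner) => _inner = inner ?? throw new ArgumentNullException(nameof(inner));

    public Task Send(object message) => _inner.Send(message);
}

// Example consumer: asks for the broadcasting bus by marker type and forwards a small message to it.
public sealed class CacheBuster
{
    private readonly IBus<NatsBus> _broadcast;

    public CacheBuster(IBus<NatsBus> broadcast) => _broadcast = broadcast;

    public Task Forward(string cacheKey) => _broadcast.Send($"invalidate:{cacheKey}");
}

// In-memory bus used only to demonstrate the wiring.
public sealed class RecordingBus : IMessageBus
{
    public List<object> Sent { get; } = new List<object>();

    public Task Send(object message)
    {
        Sent.Add(message);
        return Task.CompletedTask;
    }
}
```

With Microsoft.Extensions.DependencyInjection, each marked bus would then be registered as its own service type (e.g. `services.AddSingleton<IBus<NatsBus>>(...)`), so a missing registration shows up as a resolution error instead of the wrong bus being injected silently.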

mookid8000 commented 2 years ago

Having thought about your comment for a couple of days, I decided to crank it up a notch and add an IBusRegistry to the container.

Now you can go

services.AddRebus(
    configure => configure
        .Transport(t => t.UseAzureServiceBus(connectionString, queueName)),

    key: "my-favorite-bus"
);

and then later on get this particular bus instance by going

var registry = provider.GetRequiredService<IBusRegistry>(); //< or have this injected

var myFave = registry.GetBus("my-favorite-bus");

// 🤠

It's available in Rebus.ServiceProvider 8.0.0-b03, which is on NuGet.org now 🙂

This feature also gave me a good way to implement delayed start of the bus. You can read more about it in the readme: 👉 https://github.com/rebus-org/Rebus.ServiceProvider/blob/feature/hosting/README.md