antonyvorontsov / RabbitMQ.Client.Core.DependencyInjection

.Net Core library-wrapper of RabbitMQ.Client for Dependency Injection.
MIT License
111 stars 36 forks source link

Connection and channel management in a multi-threaded app #17

Closed rgvlee closed 4 years ago

rgvlee commented 4 years ago

Looking for some advice RE usage of your library (which is great) with regards to a multi threaded app.

Basically we have a headless .net core app which spins up a number of threads to consume/publish to rabbitmq. Using the service collection extensions provided will give us a singleton IQueueService which will persist a single connection/channel.

Persisting the connection is best practice and reusing channel/s is also best practice, however sharing channels between threads and using the same connection for consuming and publishing are not best practice hence my post.

The different connection aspect could probably be resolved using adding my own interfaces to inject separate consume and publish queue services. However that doesn't resolve the issue about sharing channels between threads. From what I can see I'd either need to inject a transient queue service to force the issue (not going to happen as we create a new connection/channel for each instance) or attempt to manually create/manage separate channels for each thread.

I have some thoughts as to how the current interface could be extended to cover these cases but thought it best to ask if there was some functionality I haven't spotted that can sort me out.

Thanks

Lee

antonyvorontsov commented 4 years ago

Hi, Lee!

I really appreciate your suggestion. Go ahead and write down what you have in your pocket :smiley:

But can we take a look at your code before changing anything in the library? Can you give me an example where you struggle the most? And why do you even need to share a channel between threads since QueueService provides all the functionality to produce and consume messages?

Best regards, Antony

rgvlee commented 4 years ago

Hey buddy

I'm not trying to share a channel between threads :smile: I'm suggesting that this is the QueueService default behaviour when using the provided extensions to add it to the DI container. By adding it as a singleton we persist the connection (good) and the channel (sort of good), but in a multi-threaded app this is not considered best practice.

We're also only using a single connection for consuming and producing which isn't best practice.

From what I can see; I'm just looking for either a workaround that I haven't spotted or to see if I can help out with these cases

In terms of a code example, I've created a fork and added a console app in the tests folder of the feature/multithreadedtesting branch. The synopsis is:

The result is we get a handler consuming in it's own thread and a series of producers, each in their own thread, producing/sending messages. Both scenarios hit the same rabbitmq server, in this example they are accessing the same exchange but I believe the library can work with multiple exchanges on the same host.

Each time a new Quartz job starts, we create a new producer service (intended, I want this to be short-lived) and inject the QueueService (so we get the persisted connection).

Sample output from the console app:

image

In this we can see confirmation that the connection/channel is the exact same instance between each Quartz job (expected given QueueService is a singleton) and that each job is running in it's own thread. Additionally we can see that the consumption handler is running in it's own thread (again expected).

What I would like to be able to do is have segregation between the consumption and production connections and potentially a distinct channel for each per thread to satisfy both of the best practice recommendations.

Segregating the connections could be done via separate consumption and production interfaces on the QueueService, adding each to the DI container as singletons instead of single IQueueService, then using each instance in their respective contexts. This will get me distinct consumption and production connections fairly easily, albiet with 2 different injectables. Not necessarily ideal but will work and would be easy to implement. A better way would be to introduce a layer of automatic connection management within the existing QueueService to retain the single injectable but a bit more work would be involved and could get interesting satisfying the existing Connection property.

Having a distinct channel per thread is harder and would probably require proxying over IModel and adding a layer of management to the QueueService to be able to say, 'Have I got a channel for the current thread? Yes, then use that one; No, add a new one to our channel pool and then allocate that'.

None of the above handles the case where you have 2 distinct, unrelated rabbitmq servers that you want to hit. From what I can see that is not a supported feature.

Before I get too carried away with suggestions though, can you see any way with the existing functionality that will allow me to inject a QueueService into multiple short-lived threads and have it persist the connection yet have separate channels. Making the QueueService transient is one way to achieve that but we then go against best practice about ensuring we have long-lived connections.

antonyvorontsov commented 4 years ago

Hi again, Lee!

Now I see what the problem is. But I will look at your example code this weekend, don't have enough free time to make a deep dive into it yet. I just wanted to make a reply, so we can continue our conversation.

Yes, you are absolutely right. QueueService is a singleton and there is not other way to register that service now. Adding new extension methods that will allow you to add it as a transient is easy actually. In addition, I have almost finished developing the matching pattern features and I will release a new library version soon. So I can add those extension methods in that release. Thus, we can solve your problem (without looking at best practices) and that solution obviously will be temporary. I'm also worried that using the transient QueueService, we can reach the maximum connection limit, but okay, let's say that this is the feature created for advanced users who know what they are doing 😄

Moving on best practices can be applied. The suggestion of the interface segregation is great, but I will do it in a slightly different way. A very important nuance to keep in mind is backward compatibility with previous versions and I don't think I want to make breaking changes. In this case we can avoid them creating two separate interfaces also keeping IQueueService interface like this:

public interface IProducingService
{
        IConnection ProducingConnection { get; }

        IModel ProducingChannel { get; }

       void Send<T>(T @object, string exchangeName, string routingKey) where T : class;
       // other Send methods.
}

public interface IConsumingService
{
        IConnection Connection { get; }

        IModel Channel { get; }

        void StartConsuming();
}

public interface IQueueService : IProducingService , IConsumingService
{
}

Nothing will change for people who don't care about channel management in a multi-threaded application.

There has to be some workaround with existing extension methods, but let's leave it out, it's not that important right now. So this is the next step and the next library version. I also want to write some integration tests which can cover my back while I am developing new features. And that also will take some time.

Unfortunately, this library suffers from design mistakes as it was created as a "pet" project without thinking about covering most user needs. But since programmers find it useful I have to deal with those mistakes. So I am very pleased that there is someone willing to help me. Thank you

rgvlee commented 4 years ago

If the potential is there to accommodate some of these best practices, consider doing the MVP at the moment. I'll run some tests on a transient configuration and see if it works as I expect it to.

I wouldn't necessarily add more interfaces until you're confident that is where you want your library to go. It was a suggestion as a temporary workaround more so for me to implement rather than take the library down a less than optimal path, once this sort of thing is in and being used it's hard to deprecate. I'd opt for a mid-term considered, well designed implementation over a short term fix.

I won't be running the consumption and production in the same thread, they will always be separate. I am also not concerned about connection limits as I won't run enough threads to even touch them. Provided that the consumption handler can persist the connection, which my hunch says it will, then it'll probably be fine for my application for the production threads to each spin up a new connection - there won't be many of them, they will unlikely overlap and there will be a decent interval between each new start - until further work can be considered.

If you're happy to take contributions, more than happy to help out.

antonyvorontsov commented 4 years ago

I have done the part with transient services. You can find it in the feature/transient-queue-service branch. The signature is almost the same (AddRabbitMqClientTransient). Check it out, I can't wait to see if this suits you or not. If it is then I will release a new version ASAP.

As for contributing, I would love you to participate! I will add you as a collaborator to the project as soon as I start making changes with the IQueueService interface segregation so we can handle it together.

rgvlee commented 4 years ago

Thanks dude, that's working like I'd expect; my hack integration test gives us

image

So the consumption handler is persisting the connection and each production thread is spinning up a new connection. As I mentioned for my usage this should be fine, for the most part there should be a decent interval between threads executing.

If these extensions are intended to be a short term implementation, may be worth noting it in the xml summary and/or the doco (if you want to expose the functionality). Just to ensure that people know what they're getting into if they want to use it :smile:

antonyvorontsov commented 4 years ago

Great!

I think I will keep those transient methods so people can use it if they want. I mentioned those methods in the documentation file and let them stay. It will be like an advanced option (a.k.a. "Use this if you understand how").

I have just released a new version of the library 3.1.1 and it is already available in NuGet.

In the future with upcoming changes I will also rewrite the "RabbitMQ configuration" doc to provide a proper coverage of available features.

rgvlee commented 4 years ago

Thanks man. I think you should convert the library to .NET standard as well; that'll allow usage in .NET standard class libraries. Looking at the dependencies this should be a straight switch.

antonyvorontsov commented 4 years ago

Netstandart switch will be added in the v3.1.2 version alongside with ordering feature https://github.com/AntonyVorontsov/RabbitMQ.Client.Core.DependencyInjection/pull/24

Closing this issue since we moved this chat to the gitter.