rabbitmq / rabbitmq-dotnet-client

RabbitMQ .NET client for .NET Standard 2.0+ and .NET 4.6.2+
https://www.rabbitmq.com/dotnet.html

Using blocking publisher confirms with concurrent publishers #959

Closed. MichaelLogutov closed this issue 3 years ago.

MichaelLogutov commented 4 years ago

Hello. We're using version 5 of the RabbitMQ client library in web applications to publish messages from requests. As you may guess, it's a highly concurrent environment and it's quite common to have 100-500 requests per second, each sending a message to RabbitMQ.

Trying to update to version 6 of the library (6.2.1 at this time) shows a lot of timeout errors while sending messages. The problem can be reproduced locally and I've attached a simple solution with two projects.

I've also added a simple Docker Compose file to spin up a single local RabbitMQ node on 127.0.0.11.

Each project contains identical code which performs these steps:

  1. Opens single (static) connection
  2. Creates exchange "tests" (durable with type = topic)
  3. Starts 100 parallel tasks trying to publish a message to the "test" exchange with a publisher confirm

Step 3 is implemented in 3 different ways and can be switched by passing a specific argument when running the project.

Running all tests on v5 gives me very good performance - 7-9 ms per message (100 messages total in parallel). Running all tests on v6 never completes without timeout errors.

Can someone please look into this? Maybe I'm doing something wrong? How should RabbitMQ client v6 be used in a highly concurrent environment like a web application?

RabbitMQ.zip

michaelklishin commented 4 years ago

Thank you for your time.

Team RabbitMQ uses GitHub issues for specific actionable items engineers can work on. GitHub issues are not used for questions, investigations, root cause analysis, discussions of potential issues, etc (as defined by this team).

We get at least a dozen questions through various venues every single day, often light on details. At that rate GitHub issues can very quickly turn into something impossible to navigate and make sense of, even for our team. Because GitHub is a tool our team uses heavily nearly every day, the signal/noise ratio of issues is something we care about a lot.

Please post this to rabbitmq-users.

Thank you.

michaelklishin commented 4 years ago

Sounds like a different case of what was discussed in #876, #945.

michaelklishin commented 4 years ago

Some of these approaches cannot possibly work because concurrent publishing on a shared channel is explicitly not supported.

Opening a channel and enabling confirms mode on it in a critical section, but not for actual publishing, cannot possibly work either and will result in connection-level errors due to incorrect framing in RabbitMQ logs.

WaitForConfirmsOrDie is very inefficient and in this case, hardly even convenient. Streaming confirms with a handler make significantly more sense for medium or high-volume concurrent publishing.

In 6.x, library-level locking around network writes has been eliminated (or greatly reduced, in case of TLS sockets). This means such concurrent operation issues are much more visible.

SendMessageCreateChannelAlways seems safe but that's not how channels are supposed to be used.

This basic integration test is a good starting point. It does not use confirms or streaming confirms but it does demonstrate what kind of concurrent access scenarios are expected (or not).

MichaelLogutov commented 4 years ago

There is no concurrent publishing on a channel in any of the methods. Please read the code - I reuse channels exactly as stated in the documentation. Even in the "per thread" variant there is no path where the currently executing code could use another thread's channel.

6.x does not even work with SendMessageCreateChannelAlways - it simply times out almost all the time. Have you tried running my sample?

Does your suggested integration test assume that in order to use v6 an application should call SetMinThreads? You can't be serious about that. You can't demand that a modern microservice application constrain its threading just to use RabbitMQ v6. No other library we use in our microservices platform imposes such limitations - RabbitMQ 5, MSSQL, PostgreSQL, Cassandra, Redis, Consul.

There is something wrong with your library; multiple people are trying to prove it to you, and you just keep denying it and closing tickets.

lukebakken commented 4 years ago

@MichaelLogutov please be patient. If it seems like we don't understand your code, you should take the time to explain. It was not immediately obvious to me that you weren't sharing channels until I saw this line -

https://github.com/lukebakken/rabbitmq-dotnet-client-959/blob/master/RabbitMQ2/Program.cs#L119-L120

Version 6.0 of this library uses the .NET Thread Pool. You must ensure that enough threads are available in the thread pool when your application starts by doing something like this:

https://github.com/lukebakken/rabbitmq-dotnet-client-959/blob/master/RabbitMQ2/Program.cs#L15-L16
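
For illustration, the kind of startup adjustment being referred to might look roughly like this (a sketch only, not the exact contents of the linked lines; the 150/150 values are arbitrary):

```csharp
using System;
using System.Threading;

// Raise the thread pool minimums at application startup so that blocking confirm
// waits on pool threads do not starve the threads v6 needs for its offloaded
// network writes.
ThreadPool.GetMinThreads(out int workerThreads, out int completionPortThreads);
ThreadPool.SetMinThreads(Math.Max(workerThreads, 150), Math.Max(completionPortThreads, 150));
```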

With the above change, your code works fine using version 6.2.1 of this library. Similar changes fixed #860

> Does your suggested integration test assume that in order to use v6 an application should call SetMinThreads?

Please educate yourself as to what ThreadPool.SetMinThreads does: link.

@michaelklishin could you suggest a better title for this issue? Something like "Use of ThreadPool in 6.X should be more clearly documented".

@stebet now that the thread pool is being used how can we better guide users so that they do not make the mistake here and in #860? Can we detect if default ThreadPool settings are in place and throw an exception?

@MichaelLogutov here is the output of a run on my Arch Linux workstation. I have no idea why the RabbitMQ2 "avg time" totals are larger than those from RabbitMQ. I don't have time to diagnose further right now (cc @stebet ... ???)

$ ./run.sh 
Microsoft (R) Build Engine version 16.4.0+e901037fe for .NET Core
Copyright (C) Microsoft Corporation. All rights reserved.

  Restore completed in 23.92 ms for /home/lbakken/issues/rabbitmq/rabbitmq-dotnet-client/gh-959/repro/RabbitMQ2/RabbitMQ2.csproj.
  Restore completed in 23.92 ms for /home/lbakken/issues/rabbitmq/rabbitmq-dotnet-client/gh-959/repro/RabbitMQ/RabbitMQ.csproj.
  RabbitMQ2 -> /home/lbakken/issues/rabbitmq/rabbitmq-dotnet-client/gh-959/repro/RabbitMQ2/bin/Debug/netcoreapp3.1/RabbitMQ2.dll
  RabbitMQ -> /home/lbakken/issues/rabbitmq/rabbitmq-dotnet-client/gh-959/repro/RabbitMQ/bin/Debug/netcoreapp3.1/RabbitMQ.dll

Build succeeded.
    0 Warning(s)
    0 Error(s)

Time Elapsed 00:00:00.66
~/issues/rabbitmq/rabbitmq-dotnet-client/gh-959/repro/RabbitMQ ~/issues/rabbitmq/rabbitmq-dotnet-client/gh-959/repro
Act: SendMessageCreateChannelAlways
Sent 100 messages, avg time 2 ms, 0 errors (total_millis: 254)
Total time is 0.0459944 seconds

Act: SendMessageCreateChannelPerThread
Sent 100 messages, avg time 1 ms, 0 errors (total_millis: 101)
Total time is 0.0262683 seconds

Act: SendMessageCreateChannelPerThreadWithLock
Sent 100 messages, avg time 0 ms, 0 errors (total_millis: 71)
Total time is 0.0207761 seconds

Act: SendMessageSingleChannel
Sent 100 messages, avg time 2 ms, 0 errors (total_millis: 211)
Total time is 0.0385052 seconds

~/issues/rabbitmq/rabbitmq-dotnet-client/gh-959/repro
~/issues/rabbitmq/rabbitmq-dotnet-client/gh-959/repro/RabbitMQ2 ~/issues/rabbitmq/rabbitmq-dotnet-client/gh-959/repro
Act: SendMessageCreateChannelAlways
Sent 100 messages, avg time 56 ms, 0 errors (total_millis: 5683)
Total time is 0.0782001 seconds

Act: SendMessageCreateChannelPerThread
Sent 100 messages, avg time 24 ms, 0 errors (total_millis: 2432)
Total time is 0.0430014 seconds

Act: SendMessageCreateChannelPerThreadWithLock
Sent 100 messages, avg time 33 ms, 0 errors (total_millis: 3345)
Total time is 0.0699946 seconds

Act: SendMessageSingleChannel
Sent 100 messages, avg time 16 ms, 0 errors (total_millis: 1606)
Total time is 0.0295687 seconds

stebet commented 4 years ago

If highly concurrent publishing is desired, I'd suggest using channels (from System.Threading.Channels) as intermediate buffers, and creating worker tasks that consume from those channels and use batch publishing when enough messages are waiting. That way you'll also reduce I/O overhead by utilizing fewer connections/models more efficiently.

I'll write up a sample that could be added to the documentation.
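
In the meantime, a rough sketch of that buffering idea might look like the following (illustrative only, not the documentation sample; the class name, the unbounded channel, the exchange/routing key, and confirming once per drained batch are all assumptions):

```csharp
using System;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;
using RabbitMQ.Client;

public sealed class BufferedPublisher
{
    private readonly Channel<byte[]> _buffer = Channel.CreateUnbounded<byte[]>();
    private readonly IModel _channel; // owned exclusively by the worker task below

    public BufferedPublisher(IConnection connection)
    {
        _channel = connection.CreateModel();
        _channel.ConfirmSelect();
        _ = Task.Run(PublishLoopAsync); // single consumer draining the buffer
    }

    public ValueTask EnqueueAsync(string text) =>
        _buffer.Writer.WriteAsync(Encoding.UTF8.GetBytes(text));

    private async Task PublishLoopAsync()
    {
        while (await _buffer.Reader.WaitToReadAsync())
        {
            // Publish everything currently buffered, then wait for confirms once per batch.
            while (_buffer.Reader.TryRead(out var body))
            {
                _channel.BasicPublish("tests", "test-message", body: body);
            }
            _channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
        }
    }
}
```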

@michaelklishin @lukebakken would it make sense to remove the "WaitForConfirms*" methods from the 7.0 version and rather document how users could utilize streaming confirms? It'd simplify a lot of logic in the current client and probably prevent incorrect usage of confirms.

MichaelLogutov commented 4 years ago

@lukebakken Thank you for taking the time to explain and for reading through my code. About increasing SetMinThreads - maybe I didn't word it right, but there is a big warning on that same documentation page about increasing the minimum thread count and how it can affect highly concurrent applications - that's what I was talking about. I think it's the wrong design for a library to impose such requirements on the application.

@stebet It would be great to see a working prototype with the same throughput as v5.

You guys are doing a great job and I'm sure a lot of work went into v6. It's just that I'm responsible for all "platform" libraries in our company, and even though I'm a long-time user of the RabbitMQ library, I can't come up with a solution that uses v6 as effectively as v5 in highly concurrent web applications. And based on other tickets, I'm not the only one.

bollhals commented 4 years ago

I've analyzed the two programs to some degree.

Regarding the thread pool issue: it largely boils down to the way the parallelism is implemented => 100 items get queued on the thread pool to be executed.

In v5, all the functions were sync, single-threaded and blocking. => As many items as there were threads in the pool executed in parallel, and each execution was blocking.

In v6, all the network traffic is offloaded to the thread pool (still sync & blocking). => Due to the way the thread pool is used in this example, combined with that offloading, all pool threads end up blocked executing an item and there is no thread left in the pool to perform the offloaded work of writing to the network stream.

So in conclusion, it's a combination of us blocking the threads in our functions and the way the thread pool is used for parallelism.

PS: If you use Task.Factory.StartNew({...}, TaskCreationOptions.LongRunning), the v6 version runs just fine.
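
For illustration, that workaround might look like this (sketch only; SendMessage stands in for one of the repro's publish methods and connection for its shared connection):

```csharp
using System.Threading;
using System.Threading.Tasks;

// TaskCreationOptions.LongRunning gives each task a dedicated thread instead of a
// thread pool thread, so the blocking publish/confirm calls inside it can no longer
// starve the pool threads that v6 uses for its offloaded network writes.
var tasks = new Task[100];
for (int i = 0; i < tasks.Length; i++)
{
    tasks[i] = Task.Factory.StartNew(
        () => SendMessage(connection),
        CancellationToken.None,
        TaskCreationOptions.LongRunning,
        TaskScheduler.Default);
}
Task.WaitAll(tasks);
```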

Regarding @lukebakken's output: the total time (as well as the average, since it's calculated from the total time) is misleading. When you increase the thread count, more work executes in parallel, so many threads compete for the CPU cores. This means each individual execution takes longer (total_millis is summed across threads). For a fair comparison, a different setup with dedicated threads would be needed (to avoid the v6 thread pool blockage), so that we can be sure we're measuring the same thing.

bording commented 4 years ago

> In v6, all the network traffic is offloaded to the thread pool (still sync & blocking).

This is a really bad place to be. You should never block a threadpool thread.

bollhals commented 4 years ago

> In v6, all the network traffic is offloaded to the thread pool (still sync & blocking).
>
> This is a really bad place to be. You should never block a threadpool thread.

Hmm, I see now that my wording was chosen very poorly... What I meant to say is: the work offloaded to the thread pool (networking) is done in the normal async, non-blocking fashion. But the other functions on the public API are still as they used to be (sync & blocking).

But yes, it is still not optimal to block the threads calling the public API (since they could be, and in this case are, thread pool threads), which is why a fully async API is being investigated for 7.0.

MichaelLogutov commented 4 years ago

@bollhals Thanks, your investigation helps to shed some light on the problem.

Setting SetMinThreads(150, 150) helped, but I still get 10 times more latency (on average) for sending one message compared to v5.

Changing Task.Run to Task.Factory.StartNew with LongRunning, without setting SetMinThreads, works too, but it's still 10 times slower than v5.

I'd say it's not clear whether using v6 in web applications is even recommended, for these reasons:

  1. I don't think Kestrel starts the tasks that handle incoming requests using Task.Factory.StartNew with LongRunning. And in any case, we can't make assumptions about Kestrel's internal implementation details, which could change at any time.
  2. Using SetMinThreads in a web application could have a negative impact on overall application performance (see the official documentation).
  3. Even with workarounds like Task.Factory.StartNew or SetMinThreads, using RabbitMQ v6 in high-RPS async web request handling scenarios performs slower by an order of magnitude compared to v5.

bollhals commented 4 years ago

Just some quick input to think about. The main issue in the high-RPS web service case is that the call to wait for confirmation is blocking. (The other functions shouldn't be called in a high-RPS environment in my opinion, e.g. creating new channels every time.) (A second open question: should you wait for confirms at all in high-RPS cases?) So the question is: can we provide an API that does this but is not blocking? If so, we'd fix this issue. (I'm not familiar with the confirmation logic.)

MichaelLogutov commented 4 years ago

I've changed the way the test is performed so that each task publishes 100 messages - so 100 tasks in parallel, each sending 100 messages. The results are more reasonable: using per_thread I was able to achieve the same speed on v6 as on v5, so I stand corrected on point 3 of my previous message. Still, the two other points remain: to use v6, the only way currently seems to be raising the minimum thread count, which should be done very carefully in different web environments.

michaelklishin commented 4 years ago

@bollhals that should be possible assuming that the client and the calling code are async-oriented/aware. See Strategies for Using Publisher Confirms. Confirms coming from the server are inherently asynchronous. As is the publishing part.

MichaelLogutov commented 4 years ago

About streaming confirms. In our platform RabbitMQ plays a crucial role in delivering asynchronous business events between microservices. It's important that the application sends the event after the business operation has completed. So we use only durable queues and exchanges, mirroring, and persistent messages with publisher confirms. We even created a reserve channel - a microservice that can receive, via HTTP, the queue messages an application failed to send to the RabbitMQ server. We're even evaluating XA transactions at the moment. Switching to streaming confirms is impossible for us because we must be sure the message has been sent just before completing the DB transaction in the current web request. We can't just push a message into some internal library queue and hope that no one shuts down the container. So even with streaming confirms we would still need to wait for confirmation of the message we've just sent before leaving the current task - basically repeating what WaitForConfirmsOrDie does.

michaelklishin commented 4 years ago

@MichaelLogutov I'm not buying this argument. Switching to streaming confirms is always a possibility: the confirms arrive asynchronously and at an unknown moment in time anyway, the question is what the publisher does while it has to wait. This client is increasingly moving towards an async-oriented API and there is no reason why this "task" (used in the theoretical sense) cannot be performed without blocking.

Sticking to the blocking API and expecting good throughput and latency is a losing proposition: most users of the client want it to move in the exact opposite direction. So you either stick to the 5.x version, which will become unmaintained one day (sooner rather than later, in fact), or you find a way to adapt to the changes in this client, which are all about increasing throughput and efficiency for practical concurrent workloads.

"One-by-one-and-blocking" is a confirm use case that cannot be substantially optimized and will not be a priority for the maintainers and users of this client.

michaelklishin commented 4 years ago

> I'd say it's not clear whether using v6 in web applications is even recommended, for these reasons

This is a wild assumption: that all "web applications" look the same and can only use WaitForConfirmsOrDie when using publisher confirms. That's not the case.

100% of the major changes in this client were driven by two needs of its users (and both code and benchmarking/profiling data were contributed by said users).

@stebet @bollhals and others have contributed very meaningful improvements in some of these areas, and this has continued for 7.0. We did not refactor it the way we did for shits and giggles.

This seems to have had a negative effect on a specific workload that uses blocking publisher confirms in N threads. I guess no user of this (generally recommended against) workload showed up to try it out and report the issue when the changes were discussed months ago. This is how open source software works: those who participate in its development get their ideas in.

@MichaelLogutov please have some respect to the work that was contributed to this client for the needs of other users, or you will be banned from ever filing another issue in this project again. Yes, I am dead serious.

michaelklishin commented 4 years ago

Making a task publish a batch of messages before waiting is strategy number two outlined in the docs. It is a middle ground between the publish-one-and-block-waiting option and using streaming confirms.
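
For illustration, that middle ground might look roughly like this (sketch only; the exchange, routing key, and timeout are made up, and the channel must not be used from other threads concurrently):

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;

static void PublishBatch(IModel channel, IReadOnlyList<string> messages)
{
    // Publish the whole batch on a channel in confirm mode (ConfirmSelect was called
    // once when the channel was created), then block once per batch instead of once
    // per message.
    foreach (var message in messages)
    {
        channel.BasicPublish("tests", "test-message", body: Encoding.UTF8.GetBytes(message));
    }
    channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
}
```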

If there are specific suggestions about the defaults this library uses, we would consider them for a future 6.x release. They should ideally be backed by an executable benchmark and profiler data. 7.0 is in the process of switching to System.IO.Pipelines and is already quite different from 6.x. Larger changes can be adopted there to accommodate this use case better. Perhaps some users will choose to skip 6.x then; that's perfectly fine with me.

MichaelLogutov commented 4 years ago

@michaelklishin I never meant any disrespect toward anybody involved in this project. Being in charge of more than 20 platform libraries used in 350+ microservices, I know what it takes to make breaking changes. You don't have an army of internal client testers like Microsoft, so it's understandable (to me at least) that cases like this can happen. It's just that reading a couple of different tickets about the same root cause and seeing them closed led me to feel that you don't want to acknowledge the problem. I apologize if I misunderstood. I was actually very happy when I first saw what was being proposed for v6. And I'm 100% in favor of going fully async, even if it requires more breaking changes than strictly necessary (because that's sometimes what it takes when you have limited resources and a big goal).

Now, if I understand you correctly, what you're suggesting is that to get guaranteed delivery for a message, we (clients) have to implement our own logic for managing channels and organizing an awaitable TaskCompletionSource to track delivery of specific messages, as a workaround. OK then, I'll try to go this way, and also incorporate some logic for calculating minimum thread counts into applications using the new library and test it under heavy load - maybe it will be OK.

michaelklishin commented 4 years ago

Your understanding is largely correct in that right now publisher confirms is a mechanism that you integrate into your apps, not a feature flag you flip. We'd like to get there one day but right now channel pooling and confirm tracking are up to the publishing apps. It's not clear whether a library can provide One True Way™ of using publisher confirms. A lot of RabbitMQ team members would like to see something like that in most if not all clients.

In the meantime, there's a clear need to produce some examples that demonstrate how to address a few scenarios, such as a moderately high-RPS web application with publisher confirms enabled. We can also make the library less painful to use for those users with all defaults.

For 7.0, maybe there should be an async-only solution for the "publish-one-and-wait" scenario (which is attractive in its simplicity) that is a lot less punishing in terms of resources used or latency (I doubt we can do much about latency, but we can give it a shot). We need specific profiling data and suggestions to work towards that, however.

bollhals commented 4 years ago

> Just some quick input to think about. The main issue in the high-RPS web service case is that the call to wait for confirmation is blocking. (The other functions shouldn't be called in a high-RPS environment in my opinion, e.g. creating new channels every time.) (A second open question: should you wait for confirms at all in high-RPS cases?) So the question is: can we provide an API that does this but is not blocking? If so, we'd fix this issue. (I'm not familiar with the confirmation logic.)

I just double-checked the code, and if we implement our own logic for the CountdownEvent that incorporates async awaiting (and possibly the delivery tag tracking), we could implement an async wait for confirmations. (Or we could use something other than CountdownEvent.)

stebet commented 4 years ago

> Just some quick input to think about. The main issue in the high-RPS web service case is that the call to wait for confirmation is blocking. (The other functions shouldn't be called in a high-RPS environment in my opinion, e.g. creating new channels every time.) (A second open question: should you wait for confirms at all in high-RPS cases?) So the question is: can we provide an API that does this but is not blocking? If so, we'd fix this issue. (I'm not familiar with the confirmation logic.)
>
> I just double-checked the code, and if we implement our own logic for the CountdownEvent that incorporates async awaiting (and possibly the delivery tag tracking), we could implement an async wait for confirmations. (Or we could use something other than CountdownEvent.)

We already did at one point, but with a CountdownEvent you lose the ability to track which message IDs have not been confirmed, unless you add that alongside the linked list used to track individual IDs.

Ideally I'd propose removing the tracking feature from the main client and instead adding it as some sort of message handler (given that the client would expose a way to track message IDs sent/received).

bollhals commented 4 years ago

> I just double-checked the code, and if we implement our own logic for the CountdownEvent that incorporates async awaiting (and possibly the delivery tag tracking), we could implement an async wait for confirmations. (Or we could use something other than CountdownEvent.)
>
> We already did at one point, but with a CountdownEvent you lose the ability to track which message IDs have not been confirmed, unless you add that alongside the linked list used to track individual IDs.
>
> Ideally I'd propose removing the tracking feature from the main client and instead adding it as some sort of message handler (given that the client would expose a way to track message IDs sent/received).

We currently use a CountdownEvent plus the linked list to track them. We'd just have to make it async for now.

I like your proposal, let's keep that in mind!

bollhals commented 4 years ago

@michaelklishin How likely is it that we'd add a new async method for waiting on confirmations to the interface for v6? (If it actually solves this issue.)

michaelklishin commented 4 years ago

@bollhals I'd love to add a method for 6.3. The only objection to doing so could be a potential interface modification but given that this is a serious enough pain point I'd like to find a way to do this even if the Church of Strict Semantic Versioning™ curses me (I don't mind being a heretic when the cause is right).

Would an extension method do?

bollhals commented 4 years ago

> Would an extension method do?

Only (possibly) if we do some shady "cast IModel parameter to ModelBase and call internal method" workaround.

bollhals commented 4 years ago

OK, so I have a working prototype ready locally; there is good news and bad news...

Good news first: It works without modifying the min threads count.

Act: SendMessageSingleChannelNoLocking
Sent 100 messages, avg time 4 ms, 0 errors (total_millis: 425)
Total time is 0.0121535 seconds

Now the bad news: it only works for my newly added test case SendMessageSingleChannelNoLocking, which is basically just

        private static void SendMessageSingleChannelNoLocking(IConnection connection)
        {
            _ = connection;
            singleChannel.BasicPublish("test", "test-message", body: Encoding.UTF8.GetBytes("test"));

            singleChannel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(1));
        }

or in my case the async version of it.

Why? Well, my funny brain blanked out some other blocking operations (CreateModel and ConfirmSelect) that happen in the other methods. Obviously these block the pool as well, so execution never gets further down in the method. (It would work if we didn't have to create the model for each thread pool thread at the very beginning, or if they didn't all start at once.)

The other drawback is that, because of async, you can't put a lock around the publish anymore (which you don't need anymore since v6), and because you can't control the acknowledgements, the following WaitForConfirms might wait for more than just the one published message. (Not a huge deal, but it certainly waits longer than it needs to.)

Summary: v6 has an issue when the thread pool is starved, as it can't process messages anymore. The starvation is somewhat self-inflicted, since most of our public API is internally blocking. If too many thread pool threads are concurrently executing public API calls, we starve the thread pool and possibly deadlock ourselves.

Worth pointing out that the most important API call is non-blocking (BasicPublish) => as long as you already have a model, you can publish from as many threads / thread pool threads as you like.

stebet commented 4 years ago

Normally (YMMV) short-lived operations that finish in milliseconds aren't too much of a problem for the ThreadPool; it's once you get into things that can take upwards of hundreds of milliseconds that you quickly start starving the thread pool. It all depends on the load, though.

But again, this just really surfaces the reason why we need a proper async API :)

michaelklishin commented 4 years ago

@bollhals topology and channel setup operations such as channel.open and confirm.select can be done outside of the pool most of the time. We should not assume that they must run in a thread pool, unlike publishing.

michaelklishin commented 4 years ago

Well, we can always ship a 7.0 with a different scope (based on 6.x with some technically breaking interface changes) and the current master will continue as 8.0.

bollhals commented 4 years ago

> @bollhals topology and channel setup operations such as channel.open and confirm.select can be done outside of the pool most of the time. We should not assume that they must run in a thread pool, unlike publishing.

Yes, but in the case of a web call we'll most likely be on a pool thread. And if there is one channel per thread, it's still possible, with enough concurrent calls, to block all available thread pool threads with channel.open/confirm.select calls.

For them to run outside the pool they need to be pre-opened per pool/thread, which I'd argue is not that common (you'd mostly create only what's needed, not what you might eventually need).

sungam3r commented 4 years ago

Hi. I've been watching this conversation for a long time. Pretty interesting.

> topology and channel setup operations such as channel.open and confirm.select can be done outside of the pool most of the time.

Can be done or should be done?

Having a completely asynchronous design is a good goal. But it seems to me that the truth is somewhere in the middle, because for some operations it is more beneficial to have dedicated thread(s) to avoid the described problems. At the very least, this could be made configurable with a sensible default. In general, I find it odd to force the library's consumer to manipulate the size of the thread pool, because it is an application-wide resource. Yes, this can be considered a way to solve the problem, but it is not good to force the consumer to solve it only in this way.

michaelklishin commented 4 years ago

There is no reason to open channels frequently (assuming you don't have to go through connection recovery and such), so channel opening should not happen on the publishing threads.

michaelklishin commented 4 years ago

Opening a new channel to publish a message is inherently inefficient. Opening a channel per thread on application startup is optimal. With some Web frameworks that's perfectly possible. If that's not the case with ASP.NET or whatever is most commonly used today, then going 100% async is the only option and even then there will be a lot of time wasted waiting for those one-off channel.open/channel.open-ok round trips.

sungam3r commented 4 years ago

I understand all that. In theory this is so. The main thing is that the theory doesn't contradict practice.

MichaelLogutov commented 4 years ago

A quick update. I've released a channel-per-thread implementation on one of our production microservices (.NET Core 3.1, 35 containers, overall RPS ~2000, about 50 RPS per container with a 6k hard CPU limit) and it seems to work the same as v5 did - no improvement, but no degradation either. That's without using SetMinThreads. So, at least on one of our highly loaded microservices, it seems to be working fine for now.

sungam3r commented 4 years ago

Regarding channel-per-thread. Do you create dedicated threads?

MichaelLogutov commented 4 years ago

> Regarding channel-per-thread. Do you create dedicated threads?

No. I've used ThreadLocal, and a single publish connection for the application.

MichaelLogutov commented 4 years ago

OK, I had to roll back the version. In the middle of the night all publishing stopped due to this error:

RabbitMQ.Client.Exceptions.ChannelAllocationException: The connection cannot support any more channels. Consider creating a new connection
   at RabbitMQ.Client.Impl.SessionManager.Create()
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateNonRecoveringModel()
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateModel()

I have one connection per application, all channels are stored in a ThreadLocal, and they are accessed like this:

var channel = this.publishChannel.Value;
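// Recreate this thread's publish channel if it hasn't been created yet or its underlying channel is no longer open.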
if (channel == null || !channel.Channel.IsOpen)
{
    try
    {
        channel?.Dispose();
    }
    catch
    {
        // ignored
    }

    channel = this.CreatePublishChannel();
    this.publishChannel.Value = channel;
}

sungam3r commented 4 years ago

It seems to me you just kept creating new channels as threads with new identifiers were created in the thread pool, and the old channels remained open (not disposed).

MichaelLogutov commented 4 years ago

Apparently so. Anyway, this shows that using per-thread is not feasible in some applications.

bollhals commented 4 years ago

> Apparently so. Anyway, this shows that using per-thread is not feasible in some applications.

It's more about the ThreadLocal usage. Because of it, every thread that ever accesses it creates a new channel that is attached to the connection. In your case the connection eventually ran out of channels to create (I think the absolute maximum is ushort.MaxValue, but it can also be limited on the server side). Why are there so many channels? Well, when a thread ends, the ThreadLocal value becomes eligible for garbage collection, but Dispose is never called on it! Because of that, and the way the SessionManager stores references to channels, the channel for the terminated thread stays alive (not GC'd) and unclosed (blocking one slot in the connection). Over a few hours I imagine your application spawns quite a few threads, and they create undisposed channels until shit hits the fan...

To be fair, this is not an issue specific to this library, but yes, using ThreadLocal or ThreadStatic for this purpose is probably not a good idea if the application runs for a long time. Using a concurrent collection for pooling, or an ObjectPool, probably makes more sense for your application.
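
For illustration, a pooled alternative could look roughly like this (sketch only; no size cap, no health checks beyond IsOpen, and disposal of the pool itself is omitted):

```csharp
using System.Collections.Concurrent;
using RabbitMQ.Client;

public sealed class PublishChannelPool
{
    private readonly IConnection _connection;
    private readonly ConcurrentBag<IModel> _channels = new ConcurrentBag<IModel>();

    public PublishChannelPool(IConnection connection) => _connection = connection;

    public IModel Rent()
    {
        // Reuse an open channel if one is available; drop any that were closed.
        while (_channels.TryTake(out var channel))
        {
            if (channel.IsOpen) return channel;
            channel.Dispose();
        }
        var created = _connection.CreateModel();
        created.ConfirmSelect();
        return created;
    }

    public void Return(IModel channel)
    {
        if (channel.IsOpen) _channels.Add(channel);
        else channel.Dispose();
    }
}
```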

sungam3r commented 4 years ago

> To be fair, this is not an issue specific to this library, but yes, using ThreadLocal or ThreadStatic for this purpose is probably not a good idea if the application runs for a long time. Using a concurrent collection for pooling, or an ObjectPool, probably makes more sense for your application.

100% agree

michaelklishin commented 4 years ago

I have pushed a 7.x branch which can be used for a smaller set of breaking changes needed to address this without going full async (now for 8.0).

MichaelLogutov commented 4 years ago

Status update.

I've changed the code to "channel-per-publish" (create a channel for each publish, publish, wait for the confirm, dispose of the channel) and released again. It works without errors, but even the median time for publish+confirm is higher on v6 compared to v5. I released v6, then v5, then v6 again, and here are the results (our 5-node RabbitMQ cluster was heavily loaded today, but the difference is still visible at all percentiles):

[attached screenshot: publish+confirm latency percentiles for v6 vs v5]

bollhals commented 4 years ago

> I've changed the code to "channel-per-publish" (create a channel for each publish, publish, wait for the confirm, dispose of the channel) and released again. It works without errors, but even the median time for publish+confirm is higher on v6 compared to v5. I released v6, then v5, then v6 again, and here are the results (our 5-node RabbitMQ cluster was heavily loaded today, but the difference is still visible at all percentiles):

With the "channel-per-publish" approach you'll see decreased throughput due to the channel creation/close overhead. I really do recommend the pooling approach, as I said in my last comment.

Also, for a direct comparison of v5 to v6, it's possible that there is a negative difference because in v6 more threads are involved in sending and waiting for the confirmation. This is due to sending being offloaded to a worker thread, which brings limited benefit if you immediately wait for a single confirmation afterwards.

Do you know how many concurrent open channels you have? (In your initial post you said ~100?)

One alternative would be to handle waiting for confirmation differently. Is the method that sends the message async (or could it be modified to be)? In that case you could read NextPublishSeqNo and create a TaskCompletionSource / IValueTaskSource that is completed when the corresponding BasicAcks / BasicNacks event is received. That way you'd release the blocked thread back to the pool until the message is confirmed and therefore have more threads available in the pool.
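
A rough sketch of that idea (illustrative only; it assumes the channel is already in confirm mode, is not published to from other threads concurrently, and it ignores timeouts, shutdown, and channel recovery):

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;
using RabbitMQ.Client;

public sealed class AsyncConfirmPublisher
{
    private readonly IModel _channel;
    private readonly ConcurrentDictionary<ulong, TaskCompletionSource<bool>> _pending =
        new ConcurrentDictionary<ulong, TaskCompletionSource<bool>>();

    public AsyncConfirmPublisher(IModel channel)
    {
        _channel = channel;
        _channel.BasicAcks += (_, ea) => Complete(ea.DeliveryTag, ea.Multiple, confirmed: true);
        _channel.BasicNacks += (_, ea) => Complete(ea.DeliveryTag, ea.Multiple, confirmed: false);
    }

    // Returns a task that completes when the broker acks (true) or nacks (false) the message.
    public Task<bool> PublishAsync(string exchange, string routingKey, byte[] body)
    {
        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[_channel.NextPublishSeqNo] = tcs; // sequence number the broker will confirm
        _channel.BasicPublish(exchange, routingKey, body: body);
        return tcs.Task;
    }

    private void Complete(ulong deliveryTag, bool multiple, bool confirmed)
    {
        // A "multiple" ack/nack confirms every outstanding tag up to and including deliveryTag.
        foreach (var tag in _pending.Keys)
        {
            if (tag == deliveryTag || (multiple && tag <= deliveryTag))
            {
                if (_pending.TryRemove(tag, out var tcs))
                    tcs.TrySetResult(confirmed);
            }
        }
    }
}
```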

MichaelLogutov commented 4 years ago

With the "channel-per-publish"-approach you'll see decreased throughput due to the channel creation/close overhead. I really do recommend the pooling approach as said in my last comment.

Also for direct comparison of v5 to v6, it's possible that there is a negative difference due to in v6 there are more threads involved to send and wait for confirmation. This is due to the offload to a worker thread for sending which brings limited benefit if you immediately wait for a single confirmation afterwards.

Do you know how many concurrent open channels you have? (In your initial post you said ~100?)

I don't have telemetry for that right now on production. The first post was about sandbox app.

One alternative would be to make the waiting for confirmation different. Is your method that sends the message async? (or could it be modified) In this case you could read the NextPublishSeqNo and create an TaskCompletionSource / IValueTaskSource to complete them when an event from BasicAcks / BasicNacks is received. That way you'd release the blocked thread back to the pool until the message is confirmed and therefore have more threads available in the pool.

But for that to work with suggested channel pooling I need to implement some sort of cross channel ack/nack watching? Or ack/nack properly sent to channels that published the message in the first place?

bollhals commented 4 years ago

> One alternative would be to handle waiting for confirmation differently. Is the method that sends the message async (or could it be modified to be)? In that case you could read NextPublishSeqNo and create a TaskCompletionSource / IValueTaskSource that is completed when the corresponding BasicAcks / BasicNacks event is received. That way you'd release the blocked thread back to the pool until the message is confirmed and therefore have more threads available in the pool.
>
> But for that to work with the suggested channel pooling, do I need to implement some sort of cross-channel ack/nack tracking? Or are acks/nacks delivered to the channel that published the message in the first place?

AFAIK yes, they're sent back to the channel they were published on. (I assume that's why the WaitForConfirmsOrDie methods are placed on the channel and not on the connection.)

MichaelLogutov commented 3 years ago

Moving to channel pooling helped, but it's still slower than v5. Moving to async emulation didn't help as much - without proper async support from the library it's just spin-waiting, which performs on par with WaitForConfirmsOrDie (the CPU gain from unblocking threads is negligible).

michaelklishin commented 3 years ago

Thanks to @bollhals, #999 should be a decent stop-gap measure in addressing this before the API eventually switches to be fully async/await-oriented.

@MichaelLogutov would you have a chance to take a look at #999 and give it a try? Note that it has some limitations mentioned in the comments.