houseofcat / RabbitMQ.Dataflows

A set of libraries for rapidly developing RabbitMQ workflows/pipelines.
MIT License

Should ConsumerPipeline use Task.Factory.StartNew with TaskCreationOptions.LongRunning? #27

Closed tiggerite closed 3 years ago

tiggerite commented 3 years ago

This is more of a discussion point than an issue really, as the functionality is ok as it is.

However, because ConsumerPipeline uses Task.Run to create the FeedPipelineWithDataTasks, it blocks one of the thread pool threads, and even using SetMinThreads doesn't seem to really alleviate the impact this has when running in Kestrel - it starts to struggle badly when there are more than around 70 of them.

https://github.com/houseofcat/Tesseract/blob/master/src/HouseofCat.RabbitMQ.Pipelines/ConsumerPipeline.cs#L73 https://github.com/houseofcat/Tesseract/blob/master/src/HouseofCat.RabbitMQ.Pipelines/ConsumerPipeline.cs#L82

Should it instead use Task.Factory.StartNew with TaskCreationOptions.LongRunning? As far as I understand it, this would force the creation of a new thread, SetMinThreads would then have more of an impact, and the thread pool wouldn't unexpectedly lose a thread.
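
Something like this is what I have in mind (a sketch only; the delegate name is hypothetical, not the library's actual code):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LongRunningSketch
{
    // LongRunning hints the default scheduler to create a dedicated thread instead
    // of borrowing a ThreadPool thread. With an async delegate, StartNew returns
    // Task<Task>, so Unwrap() is needed to observe the inner task.
    public static Task StartFeedLoop(Func<CancellationToken, Task> feedLoop, CancellationToken token) =>
        Task.Factory.StartNew(
                () => feedLoop(token),
                token,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default)
            .Unwrap();
}
```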

To get around this at the moment I simply switch to using Dataflow instead when the number is sufficiently large, which is okay but does use a bit more memory; I guess that's to be expected, as it uses BufferBlock (per https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/#relationship-to-the-rest-of-net-core), and it's not a huge problem for now.

houseofcat commented 3 years ago

So, a couple of things. Task.Run is basically syntactic sugar for StartNew with default parameters. One StartNew overload does let you pass a custom TaskScheduler, which would achieve what you're describing.

https://devblogs.microsoft.com/pfxteam/task-run-vs-task-factory-startnew/
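
Per that post, Task.Run(action) is effectively (a sketch of the documented expansion):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class TaskRunEquivalence
{
    static Task Demo(Action action)
    {
        // Task.Run(action) expands to StartNew with these defaults:
        return Task.Factory.StartNew(
            action,
            CancellationToken.None,
            TaskCreationOptions.DenyChildAttach,
            TaskScheduler.Default);
    }
}
```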

LongRunning doesn't work the way I think a lot of people think it does. As far as I recall, it's not a dedicated thread. LongRunning means "hey task scheduler, don't expect me back for a while." This affects hysteresis, the threshold for generating new threads (by default, new threads - up to SetMaxThreads() - are only injected once every 0.5 seconds). So if these were the last 2 threads in the pool, all you would lose is about a 1.0 second wait of CPU time to get replacement threads, assuming the pool was returned to optimum levels (this is a whole hard-to-explain thing, sorry).

Even then, if you use async and await, those aren't fully committed threads (i.e. "I took two threads"); the thread used will eventually yield because of the code. So for a very hot, synchronous calculation loop - an is-prime check, say - LongRunning is the right choice because the thread really won't be back for a while. For a properly async/await method it's virtually unnecessary, in my opinion, because the mechanism returns that very thread. In instances where you have a bit of hot looping before an await occurs, you can avoid a "lock in" by calling await Task.Yield(); to force background processing immediately.

You also have to understand that async/await allows the shuffling of threads in and out of that Task for the lifetime of the application, with lower overhead because no SynchronizationContext is in use. The only time a thread is "assigned" is when the task is not "awaiting" - essentially between await steps - make sense? So creating an infinitely running task/loop doesn't mean I stole a thread; it only uses a thread while it's invoking code, and otherwise the ThreadPool gets it back for other things.
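
To illustrate the Task.Yield() point, a quick sketch (illustrative only, not code from the library):

```csharp
using System;
using System.Threading.Tasks;

public static class YieldSketch
{
    public static async Task ProcessAsync(int[] values)
    {
        // With no SynchronizationContext, this queues the rest of the method to the
        // ThreadPool immediately, instead of locking in the caller's thread for the hot loop.
        await Task.Yield();

        long checksum = 0;
        for (var i = 0; i < values.Length; i++) // hot synchronous work before any real await
            checksum += values[i];

        await Task.Delay(10); // first real await; the thread shuffles back to the pool here
        Console.WriteLine(checksum);
    }
}
```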

I think 77 pipeline consumers, for the most part, sounds ridiculous for a single application... but then I remember we have 40 in a single Kestrel app ourselves. The CPU utilization at idle is maybe 0-1%. The problem is curation of active consumers.

The top 3 queues - our ingestion queues - when pegged at 100% siphon off all performance from the application (i.e. ThreadPool starvation). CPU usage at that point is 80-95% in a single application.

There's nothing to actually fix, though. It's a knowable quantity through observation, metrics, benchmarking, etc. I had back-and-forths with a developer who implemented his design with the library, and while it didn't alleviate his frustration, I kept explaining that it's doing what you're telling it to do: if you let everything burst simultaneously in a highly threaded application, it will shit its pants, because we don't have 1,000,000 green threads like Go's goroutines. It's that simple, I'm afraid.

I was still able to improve the design with some hotfixes by observing how the developer used the application. I made some quality-of-life improvements -> Global Consumer Options is one of those features. This allowed him to mass-tweak the entire performance profile during periods of high traffic to self-throttle, versus... going back to the drawing board and rethinking his "monolithic" one-application-does-everything design - which he still should do.

This software scales up to the hardware, just like RabbitMQ. It won't hold your hand in this regard, which sounds like what you were expecting and got bitten by a little. Splitting traffic off by horizontal scaling is one way of solving this; partitioning queues between applications is the best way, because you are dedicating a box to your hottest queues, etc.

Now, for performance reasons, and from working with Golang, I had a few ideas I researched as a kind of version 2 of Pipelines. Naturally, I was quite pleased with Pipelines, but I did see an inherent weakness in ThreadPool contention on very large apps that couldn't readily be solved without custom TaskSchedulers/ThreadPools - which is why that code never existed until recently; I don't like that as a solve.

I spent some time in the GCP Dataflow documentation, the TensorFlow documentation, and reading OLD .NET documents around Dataflows. I lacked the familiarity to design a viable example, so I spent a bit of time learning each and every dataflow block and how it could be molded into something performant. It turns out raw parallelism is the key to allowing some insulation from thread contention; the problem is, you need parallel hardware to do it. You won't be able to run 77 Dataflows with degrees of parallelism greater than 1, but they would run happily at 1. So it scales to the hardware a lot better than Pipelines, and I definitely encourage you to consider using the Dataflow RabbitMQ structure/pattern if you really want to melt your CPU.
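
As a rough illustration of scaling degrees of parallelism (a sketch, not the library's ConsumerDataflow; the names and numbers are made up):

```csharp
using System.Threading.Tasks.Dataflow;

public static class DataflowSketch
{
    // A bounded processing step whose parallelism can be tuned to the hardware.
    public static TransformBlock<string, int> BuildStep(int maxParallelism) =>
        new TransformBlock<string, int>(
            msg => msg.Length, // stand-in for real message processing
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 100,                  // backpressure instead of unbounded buffering
                MaxDegreeOfParallelism = maxParallelism // 1 on small pods, higher on real cores
            });
}
```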

Eventually, any heavily threaded app with no upper limits will still starve the ThreadPool, which sounds exactly like what you ran into.

Regarding the BufferBlock: while Channels' GC behavior has improved in .NET 5.0, I don't know exactly what Stephen is showing. You really shouldn't use unbounded anything, in my opinion. It's possible BufferBlock is making a copy. BufferBlock is chained to the next block; in isolation it may not make sense, but in an entire flow it's vastly more efficient and necessary for predictable, smooth performance.
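
For what it's worth, bounding a Channel looks like this (a sketch only; the capacity is an arbitrary example):

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BoundedChannelSketch
{
    // A bounded channel applies backpressure instead of growing without limit.
    private static readonly Channel<byte[]> _channel =
        Channel.CreateBounded<byte[]>(new BoundedChannelOptions(1000)
        {
            FullMode = BoundedChannelFullMode.Wait // writers wait when the buffer is full
        });

    public static ValueTask EnqueueAsync(byte[] payload) =>
        _channel.Writer.WriteAsync(payload);

    public static ValueTask<byte[]> DequeueAsync() =>
        _channel.Reader.ReadAsync();
}
```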

tiggerite commented 3 years ago

Thanks for the reply, it gives me lots of food for thought.

One thing to mention is that the services with so many consumers are being deprecated/replaced over time, so that everything becomes a microservice with only one queue to worry about, on two connections, with a customizable number of consumers per handler. So really I only need something as a stopgap until those services can be fully replaced.

I could even make them single-consumer with a higher prefetch in the interim, which would bring the total down to around 40 handlers (20 per connection). That's probably what I will try next, as Dataflow is great but seems to take around 10 minutes to "settle" after deployment (not really sure why) before it consumes messages in a really timely manner.

For the microservices I love the pipeline model (I actually create many ConsumerPipelines against one Pipeline, to mimic the consumer count on ConsumerDataflow, and this works really well). I fully appreciate what you are saying about the async/await nature as well, albeit eventually it has to Ack or Nack, which of course isn't async (yet) in RabbitMQ.Client, and it needs to do some publishes too, for which I use QueueMessageAsync to prevent sync-over-async in the hot path.

tiggerite commented 3 years ago

Almost forgot. These services run in Kubernetes too, with a max CPU of <=1.0 (depending on instance size). This causes its own issues (until .NET 6 comes around, when it can be customized), as it always makes ProcessorCount 1 as well... not great for parallelism!

It's particularly frustrating as the underlying container is able to "see" 8 processors. I briefly tested with .NET 6 using the new DOTNET_PROCESSOR_COUNT variable, but it's not really viable in production until they release it in November, of course.

tiggerite commented 3 years ago

I have a quick question about Task.Yield, by the way - is it needed when processing and acking in the pipeline, as in examples/RabbitMQ/ConsumerPipelineMicroservice/Program.cs? I use it in those places too but am not sure if it's doing more harm than good, especially as the steps are shared with Dataflow too.

houseofcat commented 3 years ago

> Thanks for the reply, it gives me lots of food for thought.
>
> One thing to mention is that the services with so many consumers are being deprecated/replaced over time, so that everything becomes a microservice with only one queue to worry about, on two connections, with a customizable number of consumers per handler. So really I only need something as a stopgap until those services can be fully replaced.
>
> I could even make them single-consumer with a higher prefetch in the interim, which would bring the total down to around 40 handlers (20 per connection). That's probably what I will try next, as Dataflow is great but seems to take around 10 minutes to "settle" after deployment (not really sure why) before it consumes messages in a really timely manner.
>
> For the microservices I love the pipeline model (I actually create many ConsumerPipelines against one Pipeline, to mimic the consumer count on ConsumerDataflow, and this works really well). I fully appreciate what you are saying about the async/await nature as well, albeit eventually it has to Ack or Nack, which of course isn't async (yet) in RabbitMQ.Client, and it needs to do some publishes too, for which I use QueueMessageAsync to prevent sync-over-async in the hot path.

I encourage you not to have 20 consumers on 2 connections; you are getting blocked on threads by the Client library -> I could nearly promise that. They have some... interesting lock/socket code in that son of a gun. It may look like it's working until you get a huge dump in the queue.

If you are curious, look at the "BlockingCell" code in RabbitMQ's .NET client.

tiggerite commented 3 years ago

> I encourage you not to have 20 consumers on 2 connections; you are getting blocked on threads by the Client library -> I could nearly promise that. They have some... interesting lock/socket code in that son of a gun. It may look like it's working until you get a huge dump in the queue.
>
> If you are curious, look at the "BlockingCell" code in RabbitMQ's .NET client.

Sorry, I probably wasn't clear: I use your excellent ConnectionPool, so it's more than 2 connections per rabbit, and each consumer is for a specific queue, using transient channels from your ChannelPool/Host.

I meant there are physically two rabbit instances it connects to for all 20 consumers, and each has its own RabbitService, ConnectionPool, ChannelPool, etc.

houseofcat commented 3 years ago

> I encourage you not to have 20 consumers on 2 connections; you are getting blocked on threads by the Client library -> I could nearly promise that. They have some... interesting lock/socket code in that son of a gun. It may look like it's working until you get a huge dump in the queue. If you are curious, look at the "BlockingCell" code in RabbitMQ's .NET client.
>
> Sorry, I probably wasn't clear: I use your excellent ConnectionPool, so it's more than 2 connections per rabbit, and each consumer is for a specific queue, using transient channels from your ChannelPool/Host.
>
> I meant there are physically two rabbit instances it connects to for all 20 consumers, and each has its own RabbitService, ConnectionPool, ChannelPool, etc.

That makes more sense to me!

houseofcat commented 3 years ago

A follow-up on the Channel/BufferBlock: it may warrant an improvement. I am a bit surprised, but it's mostly GC churn - not runtime heap size, thank goodness. I guess Stephen was aware of this being an issue previously.

I have been prepping for a V2 of Dataflows now that it's in a good state. Incorporate lessons learned, try to make it a little easier to use, etc. Add an assembly loader - that's going to be pretty slick. It's also missing non-queue-based ingestion blocks. Might make a ChannelBlock in lieu of a BufferBlock.
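
Roughly the shape I have in mind for that (purely hypothetical, nothing like this exists in the library yet):

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ChannelBlockSketch
{
    // Pump a Channel into a downstream dataflow block, so a Channel replaces
    // the BufferBlock at the head of the flow.
    public static async Task PumpAsync<T>(ChannelReader<T> reader, ITargetBlock<T> target)
    {
        await foreach (var item in reader.ReadAllAsync())
            await target.SendAsync(item); // honors the target's bounded capacity

        target.Complete();
    }
}
```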

```
// * Summary *

BenchmarkDotNet=v0.13.0, OS=Windows 10.0.19042.1110 (20H2/October2020Update)
Intel Core i9-10900KF CPU 3.70GHz, 1 CPU, 20 logical and 10 physical cores
.NET SDK=5.0.302
  [Host]   : .NET 5.0.8 (5.0.821.31504), X64 RyuJIT
  .NET 5.0 : .NET 5.0.8 (5.0.821.31504), X64 RyuJIT

Job=.NET 5.0  Runtime=.NET 5.0
```

| Method | x | Mean | Error | StdDev | Ratio | RatioSD | Gen 0 | Gen 1 | Gen 2 | Allocated |
|---|---:|---:|---:|---:|---:|---:|---:|---:|---:|---:|
| Channel_ReadThenWrite | 100 | 6.935 us | 0.0264 us | 0.0247 us | 1.00 | 0.00 | - | - | - | - |
| BufferBlock_ReadThenWrite | 100 | 184.490 us | 2.0835 us | 1.9489 us | 26.60 | 0.33 | 7.0801 | - | - | 73,792 B |
| BoundedChannel_ReadThenWrite | 100 | 7.741 us | 0.0309 us | 0.0289 us | 1.12 | 0.00 | - | - | - | - |
| BoundedBufferBlock_ReadThenWrite | 100 | 187.746 us | 1.8869 us | 1.6727 us | 27.07 | 0.26 | 8.5449 | - | - | 89,792 B |
| Channel_ReadThenWrite | 1000 | 70.891 us | 0.2761 us | 0.2582 us | 1.00 | 0.00 | - | - | - | - |
| BufferBlock_ReadThenWrite | 1000 | 1,786.943 us | 27.0370 us | 25.2904 us | 25.21 | 0.39 | 70.3125 | - | - | 736,192 B |
| BoundedChannel_ReadThenWrite | 1000 | 76.491 us | 0.3263 us | 0.3052 us | 1.08 | 0.01 | - | - | - | - |
| BoundedBufferBlock_ReadThenWrite | 1000 | 1,863.811 us | 10.7330 us | 10.0397 us | 26.29 | 0.17 | 85.9375 | - | - | 896,192 B |
| Channel_ReadThenWrite | 10000 | 720.339 us | 4.3929 us | 3.6683 us | 1.00 | 0.00 | - | - | - | - |
| BufferBlock_ReadThenWrite | 10000 | 17,831.783 us | 255.4254 us | 238.9251 us | 24.83 | 0.32 | 687.5000 | - | - | 7,360,192 B |
| BoundedChannel_ReadThenWrite | 10000 | 768.538 us | 3.2062 us | 2.6773 us | 1.07 | 0.01 | - | - | - | - |
| BoundedBufferBlock_ReadThenWrite | 10000 | 18,411.066 us | 156.0664 us | 145.9846 us | 25.55 | 0.23 | 843.7500 | - | - | 8,960,192 B |
| Channel_ReadThenWrite | 100000 | 6,876.418 us | 23.5272 us | 19.6463 us | 1.00 | 0.00 | - | - | - | - |
| BufferBlock_ReadThenWrite | 100000 | 175,117.662 us | 1,982.8357 us | 1,655.7576 us | 25.47 | 0.22 | 7000.0000 | - | - | 73,600,192 B |
| BoundedChannel_ReadThenWrite | 100000 | 7,664.456 us | 21.2327 us | 19.8611 us | 1.11 | 0.01 | - | - | - | - |
| BoundedBufferBlock_ReadThenWrite | 100000 | 182,905.710 us | 1,279.4499 us | 1,134.1990 us | 26.61 | 0.18 | 8333.3333 | - | - | 89,600,637 B |
| Channel_ReadThenWrite | 1000000 | 68,838.648 us | 294.2990 us | 275.2875 us | 1.00 | 0.00 | - | - | - | 1,132 B |
| BufferBlock_ReadThenWrite | 1000000 | 1,771,063.280 us | 12,479.0496 us | 11,672.9107 us | 25.73 | 0.22 | 71000.0000 | - | - | 736,000,192 B |
| BoundedChannel_ReadThenWrite | 1000000 | 76,467.524 us | 314.1487 us | 293.8549 us | 1.11 | 0.01 | - | - | - | 191 B |
| BoundedBufferBlock_ReadThenWrite | 1000000 | 1,825,513.067 us | 9,725.0610 us | 9,096.8280 us | 26.52 | 0.16 | 86000.0000 | - | - | 896,000,192 B |
tiggerite commented 3 years ago

That sounds great! I currently use https://github.com/natemcmaster/DotNetCorePlugins for loading assemblies, which works really well, albeit unloading is nigh-on impossible - but that's just a "feature" of how assemblies are handled in .NET Core/5, I guess. It allows for mismatches in NuGet package versions, which is a lifesaver, loads .NET Standard plus .NET Core assemblies despite targeting .NET 5 in the host, and shares specific types even if the assembly version isn't an exact match.

tiggerite commented 3 years ago

I managed to get ConsumerPipeline to work with the 20-consumer service (each having 4 consumers per rabbit instance, so 20*8=160) by upping the minimum I/O completion threads to that value. I couldn't increase minimum worker threads to more than 64, because of memory pressure on the pods we can deploy, but those don't actually seem to change a lot anyway, except for allowing a thread per publisher channel.

houseofcat commented 3 years ago

> I managed to get ConsumerPipeline to work with the 20-consumer service (each having 4 consumers per rabbit instance, so 20*8=160) by upping the minimum I/O completion threads to that value. I couldn't increase minimum worker threads to more than 64, because of memory pressure on the pods we can deploy, but those don't actually seem to change a lot anyway, except for allowing a thread per publisher channel.

Feel free to share your experiences or an example should you get the urge.

tiggerite commented 3 years ago

> I managed to get ConsumerPipeline to work with the 20-consumer service (each having 4 consumers per rabbit instance, so 20*8=160) by upping the minimum I/O completion threads to that value. I couldn't increase minimum worker threads to more than 64, because of memory pressure on the pods we can deploy, but those don't actually seem to change a lot anyway, except for allowing a thread per publisher channel.
>
> Feel free to share your experiences or an example should you get the urge.

I would love to share the example and code, but I don't think I'm allowed, as it's in a private company repo. Basically, I calculate the sum of ConsumerPipelines across all connections, and as long as that is not greater than 256 (I may be able to bump this even higher, but it seemed a good starting point), it is used in a SetMinThreads call to update completionPortThreads, the second argument. If it is greater than 256, they are all switched to use ConsumerDataflow instead, and completionPortThreads is mostly left alone - just bumped up to a minimum of 16 if necessary.
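
In rough pseudo-form it's something like this (a sketch; the 256 cutoff and the 16 floor are just the numbers I settled on, and the method name is made up):

```csharp
using System;
using System.Threading;

public static class ThreadTuning
{
    public static void Configure(int totalConsumerPipelines)
    {
        ThreadPool.GetMinThreads(out var workerThreads, out var completionPortThreads);

        if (totalConsumerPipelines <= 256)
        {
            // Pipelines path: raise the completion-port floor to the pipeline count.
            ThreadPool.SetMinThreads(
                workerThreads,
                Math.Max(completionPortThreads, totalConsumerPipelines));
        }
        else
        {
            // Dataflow path: leave the pool mostly alone, just ensure a small floor.
            ThreadPool.SetMinThreads(
                workerThreads,
                Math.Max(completionPortThreads, 16));
        }
    }
}
```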

Now, it may be that it's a unique use case, as a lot of the time the services are waiting on an external endpoint to return before publishing various messages and then finally acking the original message. There's also a bit of CPU time in there for various reasons. Not the ideal way to use I/O completion ports, I know, but it's out of my control, unfortunately.

One other minor implementation detail to mention is I use one pipeline per queue, but several consumers per pipeline, something like

```csharp
var pipeline = BuildPipeline();
ConsumerPipelines = new List<ConsumerPipeline<WorkState>>(
    Consumers.Select(consumer => new ConsumerPipeline<WorkState>(consumer, pipeline)));
```

And then start them along the lines of:

```csharp
ConsumerPipelines.ParallelForEachAsync(
    consumerPipeline => consumerPipeline.StartAsync(false),
    Environment.ProcessorCount * 12)
```

Same kind of pattern for awaiting the completion and stopping them all.

houseofcat commented 3 years ago

That's understandable. Do you have throughput numbers?

Consuming/publishing etc.

It took Google an insane amount of hardware to hit 1 million messages a second, and I found that bizarre.

https://dzone.com/articles/deploying-rabbitmq-cluster-one

tiggerite commented 3 years ago

It's nowhere near 1 million a second, that's for sure :) Exact throughput is a tricky one; I think our load test, which has some headroom, allows for 6 "operations" a second. I don't believe that's a one-to-one ratio to messages, but it's probably close. However, that is per queue, so for this 20-consumer service I guess you are looking at 120 a second or thereabouts. We do use horizontal scaling (although this is far from perfect in terms of load balancing, unhelpfully), which aims to spread this number, and for the 20-consumer ones there are 6 pods. So maybe 20 a second on these?

For publishing, it isn't so crucial for these to be done instantly. We use an equivalent of MaxChannels of 25 per connection, so in theory we could queue 25 at once (per pod) before the others have to wait. That seems adequate so far but might need tweaking at some later date; the fact that we can use QueueMessageAsync helps, as it offloads publishing from the actual hot path of consuming messages (and also gives some redundancy, as the publisher isn't stopped until everything is finished).

I do wonder, though, if it may be worth separating channels from ack channels in the Publisher (maybe MaxChannels and MaxAckChannels in options?), as at the moment you get double for your money, but in my use case I'll never use the non-ack channels because the consumers create transient ones. It's hardly a massive overhead, though.

houseofcat commented 3 years ago

> 6 "operations" a second

Are these operations very slow O.O?

tiggerite commented 3 years ago

> 6 "operations" a second
>
> Are these operations very slow O.O?

They shouldn't be, no, although another service takes them from one queue and moves them onto another. However, they do have to be acted on very quickly, as there is a timeout of 30s for a message to be consumed and the subsequent message to be published - bearing in mind the CPU time needed to process the messages before sending them off to the external services, some of which can be really slow, especially at peak times, so that 30s can quickly be eaten up. There's also a TTL on the queues of 5000ms, so if messages sit on the queues any longer than that, they will dead-letter.

houseofcat commented 3 years ago

In fact, why don't we close out this issue/discussion and open a new one as a discussion on how to achieve optimal performance.