Closed edubrovski closed 1 year ago
CPU starvation causes problems with scheduling fairness
Indeed! This is essentially the definition of CPU starvation.
An important first step is to understand the definition of starvation in this context. "Starvation" can refer to any scenario in which a task is unable to obtain access to a resource it needs to continue.
https://typelevel.org/cats-effect/docs/core/starvation-and-tuning#starvation
I assume the example is a bit contrived 😅 and highly recommend following the starvation and tuning guide for your deployed applications.
https://typelevel.org/cats-effect/docs/core/starvation-and-tuning
Hope that helps!
@armanbilge thanks for the answer, but this example is not contrived: it emulates an actual app that started to hang when I migrated it from CE2 to CE3. One way to solve the problem was to give the app 4 CPUs instead of 2. Another way is to change my code to spawn fewer fibers, but that is a code change, and I think it's at least worth mentioning in the migration guide (CE2 to CE3).
I've read the doc and I understand the reasoning for giving more CPUs to the app, but it doesn't explain how an app that worked fine on 2 CPUs suddenly needs 4 after migrating to CE3. I guess it wasn't efficient because it was CPU starving, but at least it wasn't hanging.
CPU starvation causes problems with scheduling fairness
Indeed! This is essentially the definition of CPU starvation.
I guess I should've called the issue "CE3 performs 50 times worse than CE2 when thread starving"
I see! You may find this section of the documentation relevant.
Fairness is a form of prioritization. You are prioritizing the responsiveness of your application over its raw performance.
When compute-bound work is so expensive that it cannot be organically prioritized together with other work without affecting the responsiveness of the application (i.e. triggering starvation issues), the only remaining option is to prevent that work from consuming all available compute resources. This can be accomplished in several ways, but the two most common strategies are `Semaphore` and `Queue`.
https://typelevel.org/cats-effect/docs/core/starvation-and-tuning#restrict-work
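To make the restriction concrete, here is a minimal sketch of gating expensive work behind a `Semaphore` (not taken from the guide; the `limited` helper and the task list are hypothetical names):

```scala
import cats.effect.IO
import cats.effect.std.Semaphore
import cats.syntax.all._

// Run `tasks` with at most `n` executing concurrently. Expensive
// compute-bound work queues behind the semaphore instead of
// saturating every compute thread at once.
def limited[A](n: Long)(tasks: List[IO[A]]): IO[List[A]] =
  Semaphore[IO](n).flatMap { sem =>
    tasks.parTraverse(task => sem.permit.surround(task))
  }
```

`parTraverseN` from cats packages up essentially this same pattern in a single combinator.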
Do I understand correctly that prioritization in CE2 and CE3 is different? So I guess CE3 prioritizes responsiveness/fairness over raw performance/throughput?
Do I understand correctly that prioritization in CE2 and CE3 is different? So I guess CE3 prioritizes responsiveness/fairness over raw performance/throughput?
I personally don't know much about CE2 internals. But based on the comparison in the docs, I believe your statement is a good summary. Edit: although to be clear, overall performance/throughput is absolutely a priority for CE3, but it is implemented in a way that does not compromise on fairness.
Specifically it says:
CE3 also has a very exciting custom work-stealing threadpool implementation. This has numerous benefits over the FixedThreadpool used in CE2:
- ...
- Consequently we can support auto-yielding where a fiber will insert an `IO.cede` every fixed number of iterations of the runloop
This "auto-yielding" feature is specifically designed to improve fairness.
https://typelevel.org/cats-effect/docs/thread-model#work-stealing-pool
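For intuition, auto-yielding does automatically what one would otherwise write by hand. A hypothetical hot loop with a manual `IO.cede` every 1024 iterations might look like this (a sketch; the `countdown` name and the constant are illustrative):

```scala
import cats.effect.IO

// A long CPU-bound loop that manually yields every 1024 iterations so
// other fibers get a chance to run. CE3's runtime inserts an equivalent
// cede automatically; without it, a loop like this could monopolize a
// worker thread indefinitely.
def countdown(n: Long): IO[Unit] =
  if (n <= 0L) IO.unit
  else if (n % 1024L == 0L) IO.cede.flatMap(_ => countdown(n - 1))
  else IO.defer(countdown(n - 1))
```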
I'll dig into this more when I get a moment. At first glance it certainly doesn't look like this should be hitting anything truly pathological aside from a heavy reliance on the external queue. I suspect the integrated timers branch would make this snippet run hilariously fast, but then it might not accurately reproduce the issue in the original application.
Thank you for looking into this!
For context: the actual app processes 2 Kafka topics and writes the results into another Kafka topic. It processes messages in batches. Under heavy load (lots of messages in a Kafka topic) it's acceptable for the app to lag in message processing for several minutes, but it shouldn't take forever: if a message gets processed 30 minutes after it gets ingested into the Kafka topic, that's too much.
So I guess in terms of prioritization/fairness this app is the opposite of web server. If we have a web server we don't want one heavy request affecting all other users too much. In our app it's the opposite: if we get a heavy batch from one topic we want it to complete, we don't care if processing of another topic slows down.
I'd love to get some understanding of the difference in scheduling between CE2 and CE3. I don't wanna study the internals of scheduling in CE2, so a doc describing the difference would be very helpful.
So I spent some time thinking about this. I'm pretty sure it's illustrating a slowness in the external queue itself. That would be the same as what the original Kafka reproducer would be doing. The arrival of #3219 will render this particular reproduction entirely obsolete (since `IO.sleep` will bypass the external queue in 3.5), but I think the underlying problem remains. I'm pretty sure I can capture this in a benchmark and we can go from there.
One additional bit of information which would help, @edubrovski, is how many physical threads (i.e. CPUs, multiplied by two if hyperthreading) are used in this reproduction? My strong hunch is that the difference is a lot less noticeable with higher thread counts, and it's likely something that is showing up with smaller instances. If true, this still suggests that there's a performance issue in the external queue (relative to `ConcurrentLinkedQueue`, which sits inside of the fixed thread pool executor in CE2), but the contention overhead would eventually dominate.
I'd love to get some understanding of the difference in scheduling between CE2 and CE3. I don't wanna study the internals of scheduling in CE2, so a doc describing the difference would be very helpful.
There are a lot of differences. :-) I think the most concise description is probably the following:

- In CE3, `async` always shifts back to the compute pool, and `cede`s are automatically inserted when a fiber hasn't yielded for 1024 stages. (terminology note: each contiguous unyielding batch of stages will be called a task from here on)
- CE2's default scheduler is a `ThreadPoolExecutor` with a fixed number of threads. This is a very naive disruptor-style scheduler with a single concurrent work queue that every worker thread contends on. Tasks are inserted into the queue and observed in strictly FIFO order. Any thread which is not executing a task blocks on that queue, and they race to dequeue any available tasks. This creates a quadratic contention pattern as the number of workers increases, as well as some other very negative effects, such as an effectively randomized mapping from fiber to carrier thread (defeating cache optimizations).
- CE3's default scheduler is the `WorkStealingThreadPool` (the comments are very good). This pool is based on Tokio's scheduler, which in turn is based on Go's, though all three have evolved since the respective fork points.
- When a fiber `start`s a new fiber, that fiber goes onto the local queue for its worker, meaning that related fibers tend to cluster together on a single carrier thread, which in turn means that they have a high probability of sharing working sets within the L2 and L3 cache of the underlying CPU. (note that, in a 1:1 configuration of active `Thread`s and physical threads, kernel-level scheduling algorithms will converge to a perfect 1:1 mapping across the underlying hardware, making those compute `Thread`s an excellent proxy for the physical processors) This same optimization also applies to `async` completions from worker threads, meaning that abstractions like `Deferred` and `Queue` end up keeping tasks within the local queues.
- If fibers never `start` or `async`, then the external queue is never used and the workers almost never interfere with each other.
- If a worker's local queue overflows (e.g. when a fiber `start`s very quickly, such as in a `parTraverse` type situation), the back half of the local queue is spilled to the external queue by splitting it into multiple task batches and enqueuing them individually. In turn, whenever a worker dequeues from the external queue, if it receives a batch it must inflate that into its own local queue. This can in turn cause cascading spillage, but the sizing is such that this converges immediately.

There are more details here, and I would highly recommend reading the comments in WSTP and `WorkerThread` if you want to understand more deeply. The intuition, though, is that CE3 doesn't guarantee FIFO priority on tasks. It usually will get something very close to FIFO, but any form of work stealing breaks that type of guarantee, and WSTP takes this even further with the spilling trick as well as the delays on reading from the external queue.
Your example flows entirely through the external queue, because (prior to 3.5) all `sleep`s are implemented as an `async` which enqueues externally, similar to how any asynchronous I/O callbacks would work. When all the work is on the external queue, the worker threads will end up checking that queue after every task, but there is a certain amount of overhead which is imposed by the fact that they will first attempt to (unsuccessfully) steal from each other before jumping on the external bandwagon. This will happen with each task iteration, due to how this program works, since all work (except the `parTraverse` itself) is external. So that alone is already slower than the `ConcurrentLinkedQueue` inside of `ThreadPoolExecutor`. Once the threads attempt to hit the external queue, my expectation is that it should be just as fast as CLQ (which is to say, very slow), but my guess is that we have a performance issue here which is hidden by the fact that we already try very hard to avoid touching the external queue.
Hello @djspiewak
Thanks for looking into this.
The case we have at Evolution: it was an instance configured with 2 CPUs, using the defaults from `IOApp` and no blocking calls. It's a pretty standard variant of the tiny apps we use for stateful stream processing. We have a lot of apps like that here.
2 CPUs used to be enough. However, we had to double to 4 CPUs after the `ce2` -> `ce3` migration in order to suppress this problem.
Would you be able to explain in a bit more detail where this problem originates? Is it related to the thread pool being used?
cats-effect version: 3.4.4
@t3hnar Sorry I ninja-edited my previous comment and added a ton of details. That might help. I think what we're seeing here is the following three effects:
In a worst case scenario, you can always override the default scheduler inside of CE3 and replace it with a fixed thread pool. This can be done by overriding the `runtime` method of `IOApp` and creating a custom `IORuntime` which uses something like `Executors.newFixedThreadPool`. This would basically make CE3 behave almost identically to CE2.
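A sketch of what that override might look like (assuming the CE3 `IORuntimeBuilder` API; the `FixedPoolApp` name is made up and shutdown handling is simplified):

```scala
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import cats.effect.{IO, IOApp}
import cats.effect.unsafe.IORuntime

object FixedPoolApp extends IOApp.Simple {
  // Swap the default work-stealing compute pool for a fixed thread pool,
  // roughly approximating CE2's scheduling behaviour.
  override protected def runtime: IORuntime = {
    val pool    = Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors())
    val compute = ExecutionContext.fromExecutor(pool)
    IORuntime.builder().setCompute(compute, () => pool.shutdown()).build()
  }

  def run: IO[Unit] = IO.println("running on a fixed thread pool")
}
```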
@djspiewak thanks for explanation
@djspiewak does it make sense to have some recommendation, based on the number of CPUs, for which thread pool to use? Something like `if (cpus <= 4) FixedThreadPool else WorkStealingThreadPool`?
another question: should `parTraverse` and the like be re-implemented with the thread pool implementation in mind?
does it make sense to have some recommendation based on number of cpus which thread pool to use?
Definitely not. :-) I think I may have miscommunicated. While the number of CPUs is a problem and you should absolutely go taller here, WSTP is faster than the fixed thread pool regardless of the number of CPUs. The thing that's making this workflow slower is primarily the fact that your specific workflow, combined with how small your containers are, violates the WSTP optimization assumptions, which makes the fixed pool faster.
another question: should parTraverse and alike be re-implemented with ThreadPool implementation in mind?
Well, this kind of goes to the optimization assumptions. There are no possible implementations of `parTraverse` which are better for the fixed thread pool. The fixed thread pool is just… really bad for those cases. The bottleneck in your reproducer is not `parTraverse` (which WSTP is almost certainly shredding) but rather the `sleep`s.
@djspiewak I don't think that we have `sleep`s in the original app; rather, pure computations inside of `parTraverse`.
@t3hnar Do you have pure CPU-bound computations or are you pulling things from Kafka? If this is a CPU-bound situation then the reproducer isn't really reproducing the symptom. :-(
I've confirmed that the WSTP external queue is actually faster (significantly so) than `ConcurrentLinkedQueue`, so the source of the performance issues in the OP reproduction is almost certainly the failed stealing.
[info] Benchmark (size) (threads) Mode Cnt Score Error Units
[info] ScalQueueBenchmark.clqConcurrentEnqueueDequeue 131072 4 thrpt 20 5795.490 ± 11.456 ops/min
[info] ScalQueueBenchmark.scalConcurrentEnqueueDequeue 131072 4 thrpt 20 9107.226 ± 131.260 ops/min
However, it sounds like the OP snippet doesn't actually reproduce the original issue, since the OP snippet is bound by timer execution, while the original issue sounds like the computation is CPU-bound?
@djspiewak we will come back to you on this. It took some time to reduce the complex app to a small code snippet :)
I definitely get it! :-) I think what I would encourage is trying to keep the ratio of asynchronous stuff vs CPU-bound stuff the same in the reproducer as it is in the original. So if you're doing 100ms of CPU-bound work, rather than replacing that with an `IO.sleep(100.millis)`, try to do a hot `while` loop or something checking `System.nanoTime`. That type of thing.
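A minimal sketch of such a busy-wait (the `spin` helper is a made-up name):

```scala
// Simulate `millis` of CPU-bound work with a hot loop polling
// System.nanoTime, instead of IO.sleep, which suspends the fiber
// asynchronously and routes through the timer machinery.
def spin(millis: Long): Unit = {
  val deadline = System.nanoTime() + millis * 1000000L
  while (System.nanoTime() < deadline) () // burns CPU, never yields
}
```

In a reproducer this would be wrapped as `IO(spin(100))` wherever the original does 100ms of real compute work.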
@djspiewak thanks for the thorough explanation!
One additional bit of information which would help, @edubrovski, is how many physical threads (i.e. CPUs, multiplied by two if hyperthreading) are used in this reproduction?
To be honest I didn't even limit the number of cpus, I ran it on my machine with M1 processor, so I guess 8 cpus / threads?
However, it sounds like the OP snippet doesn't actually reproduce the original issue, since the OP snippet is bound by timer execution, while the original issue sounds like the computation is CPU-bound?
Indeed, the snippet that I posted doesn't reproduce the actual problem we have with kafka-flow when we run it on CE3. I came up with it just to confirm that there exist pathological cases when CE3 performs "worse" than CE2. Right now we're trying to come up with a better snippet that would reproduce the actual problem and at the same time be as minimalistic as possible. We'll get back to you when we have it.
@djspiewak , please see https://github.com/edubrovski/reproducing-kafka-flow-bug/ It uses OSS libraries and fake Kafka consumer, so you can build and run it locally without Kafka.
This code is not just slow, it completely hangs with CE3. At the same time it works fine and never hangs with CE2. The details are in the README.
It's probably a lot of code, but minimizing this reproduction is hard.
@edubrovski I see you are using `-Dcats.effect.tracing.mode=full`, is this just for testing/debug? Full tracing adds a massive performance hit.
@armanbilge , yes, this is for full fiber dumps.
Even without `tracing.mode=full` it will also hang at some point. If you change it to `cached`, you also need to change `val uniqueKeys = 65000` to something like 250000 to reproduce the CPU starvation.
A fiber dump from a stuck run looks interesting:
cats.effect.IOFiber@8732f37 WAITING
cats.effect.IOFiber@11ac1ba1 WAITING
cats.effect.IOFiber@78126f1d WAITING
├ flatMap @ example.Test$.$anonfun$run$17(Test.scala:132)
├ background @ example.Test$.$anonfun$run$16(Test.scala:131)
├ background @ example.Test$.$anonfun$run$16(Test.scala:131)
├ background @ example.Test$.printStatsInBackround(Test.scala:146)
╰ background @ example.Test$.printStatsInBackround(Test.scala:146)
cats.effect.IOFiber@8e29f17 WAITING
├ parTraverse$extension @ com.evolutiongaming.kafka.flow.TopicFlow$$anon$1.$anonfun$apply$3(TopicFlow.scala:91)
├ parTraverse$extension @ com.evolutiongaming.kafka.flow.TopicFlow$$anon$1.$anonfun$apply$3(TopicFlow.scala:91)
├ parTraverse$extension @ com.evolutiongaming.kafka.flow.TopicFlow$$anon$1.$anonfun$apply$3(TopicFlow.scala:91)
├ flatMap @ com.evolutiongaming.kafka.flow.TopicFlow$$anon$1.$anonfun$apply$3(TopicFlow.scala:91)
├ delay @ com.evolutiongaming.catshelper.Log$$anon$1.debug(Log.scala:102)
├ flatMap @ com.evolutiongaming.kafka.flow.TopicFlow$$anon$1.$anonfun$apply$1(TopicFlow.scala:88)
├ get @ com.evolutiongaming.scache.LoadingCache$EntryRefOps$.value$extension(LoadingCache.scala:574)
├ map @ com.evolutiongaming.scache.LoadingCache$EntryRefOps$.value$extension(LoadingCache.scala:575)
├ map @ com.evolutiongaming.scache.LoadingCache$$anon$1.$anonfun$values$3(LoadingCache.scala:370)
├ get @ com.evolutiongaming.scache.LoadingCache$EntryRefOps$.value$extension(LoadingCache.scala:574)
├ map @ com.evolutiongaming.scache.LoadingCache$EntryRefOps$.value$extension(LoadingCache.scala:575)
├ map @ com.evolutiongaming.scache.LoadingCache$$anon$1.$anonfun$values$3(LoadingCache.scala:370)
├ flatMap @ com.evolutiongaming.scache.LoadingCache$$anon$1.$anonfun$values$2(LoadingCache.scala:367)
├ flatMap @ com.evolutiongaming.scache.LoadingCache$$anon$1.$anonfun$values$2(LoadingCache.scala:367)
├ get @ com.evolutiongaming.scache.LoadingCache$$anon$1.values(LoadingCache.scala:358)
╰ flatMap @ com.evolutiongaming.scache.LoadingCache$$anon$1.values(LoadingCache.scala:360)
cats.effect.IOFiber@72c1fc4a WAITING
cats.effect.IOFiber@635a7fb2 WAITING
cats.effect.IOFiber@19c002e8 WAITING
cats.effect.IOFiber@6c729672 WAITING
cats.effect.IOFiber@5675843 ACTIVE
├ >> @ example.Test$.printStatsInBackround(Test.scala:146)
├ foreverM @ example.Test$.printStatsInBackround(Test.scala:146)
├ println @ example.Test$.$anonfun$printStatsInBackround$1(Test.scala:146)
├ >> @ example.Test$.printStatsInBackround(Test.scala:146)
├ foreverM @ example.Test$.printStatsInBackround(Test.scala:146)
├ println @ example.Test$.$anonfun$printStatsInBackround$1(Test.scala:146)
├ >> @ example.Test$.printStatsInBackround(Test.scala:146)
├ foreverM @ example.Test$.printStatsInBackround(Test.scala:146)
├ println @ example.Test$.$anonfun$printStatsInBackround$1(Test.scala:146)
├ >> @ example.Test$.printStatsInBackround(Test.scala:146)
├ foreverM @ example.Test$.printStatsInBackround(Test.scala:146)
╰ println @ example.Test$.$anonfun$printStatsInBackround$1(Test.scala:146)
As you can see, we have very few fibers. I don't understand why we don't have stack traces for the 6 WAITING fibers, but I doubt that they're deadlocked, because we investigated the possibility of a deadlock very thoroughly and added debug logs to almost every line.
Thank you for all the work in narrowing down a reproducer! I'll have a look this weekend.
Okay, already found one source of problems. Potentially the problem. After running this for just a few minutes, the number of `io-blocker` threads was already over 600. That's, uh… a lot.
Dug deeper with some help from @vasilmkd…
For starters, the amount of blocking going on here is a pretty serious problem. This is going to create performance issues on any system and with any version of Cats Effect. If this is genuinely what your production system is doing, then I would strongly recommend gating the blocking with a `Semaphore` which limits it to a much smaller amount of resources, probably something around 2-3x the number of CPUs at most. `parTraverseN` is a convenient way of doing this.
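As a rough sketch of that suggestion (`readRecord` and `processBatch` are hypothetical names; the real blocking call would be whatever the app actually does):

```scala
import cats.effect.IO
import cats.syntax.all._

// Hypothetical stand-in for a genuinely blocking call in the app.
def readRecord(key: String): IO[String] =
  IO.blocking(s"value-for-$key")

// Gate the blocking work so that at most ~2x the CPU count runs at
// once, rather than letting every element spawn its own blocker thread.
def processBatch(keys: List[String]): IO[List[String]] = {
  val limit = Runtime.getRuntime.availableProcessors() * 2
  keys.parTraverseN(limit)(readRecord)
}
```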
Ironically, Cats Effect 2 had fewer problems here because it's slower. Due to the lower performance of `IO` in CE2 and the extreme differences in how `parTraverse` (and really everything) is implemented, CE2 isn't able to put as much pressure on the blocking pool, which in turn means that it stabilizes its cached thread count at a lower level (which in turn means that the underlying kernel has less contention to deal with).
The thread pools do stabilize eventually though. On my system, with the integrated blocking in the WSTP, I stabilized around 675-700 threads. Vasil was testing on older hardware and it stabilized at a lower thread count.
I think none of this is related to the fact that the program hangs nondeterministically.
The hang appears to be a classic instance of livelock when asynchronous callbacks are simply never invoked. There are a number of different ways to track this down, but unfortunately it appears to be quite a rare issue. I haven't been able to reproduce it on my hardware, so I spun up a few throwaway EC2 instances to attempt in a more controlled environment. Vasil was able to reproduce the hang once and we're attempting to harvest some telemetry.
In summary, I think what is happening here is a couple things:
`IO` has a very rare bug here which may be causing it to lose continuations. Still very much in the early stages of narrowing this down, but all the signs point in that direction. This bug is almost certainly entirely independent of all these other factors.

Thanks for looking into this! The reproduction is big and the code is pretty bad, so I really appreciate you guys spending time on this.
I haven't been able to reproduce it on my hardware
In case you still want to reproduce it, you might want to tweak the `uniqueKeys` variable and set it to something bigger, like 100000 for example. Also, it can take multiple runs to reproduce.
number of io-blocker threads was already over 600
Wow, I didn't notice that. I've made a thread dump of a running app and I see that indeed we have too many blocking threads. In the CE2 version we were using a fixed thread pool as a custom blocker, and when I was migrating to CE3 I decided to use the default blocker, which is a cached thread pool. I guess the most straightforward fix would be to build a custom runtime with `IORuntime.builder.setBlocking(my fixed thread pool)`. Is there any benefit in using a semaphore or `parTraverseN` instead of using a custom blocker?
Is there any benefit in using a semaphore or `parTraverseN` instead of using a custom blocker?
Yes, the work-stealing thread pool has various optimizations for running blocking ops that give it better performance compared to a separate blocking pool.
See: https://typelevel.org/cats-effect/docs/faq#why-is-my-io-running-on-a-blocking-thread
Edit: in fact, even if you override the blocking pool, it may still use the WSTP for blocking anyway 🤔
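For reference, the usual way to get work onto those blocker threads is `IO.blocking`, rather than wiring up a pool by hand (a sketch; the `readFile` helper is made up):

```scala
import cats.effect.IO

// Mark hard-blocking calls with IO.blocking so the runtime can shift
// them off the compute workers; on the WSTP the worker itself can
// transition into blocking mode instead of handing off to another pool.
def readFile(path: String): IO[String] =
  IO.blocking {
    val src = scala.io.Source.fromFile(path)
    try src.mkString finally src.close()
  }
```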
Wow, I didn't notice that. I've made a thread dump of a running app and I see that indeed we have too many blocking threads. In CE2 version we were using a fixed thread pool as a custom blocker, and when I was migrating to CE3 I decided to use the default blocker, which is a cached thread pool. I guess the most straightforward fix would be to build a custom runtime with IORuntime.builder.setBlocking(my fixed thread pool). Is there any benefit in using a semaphore or parTraverseN instead of using a custom blocker?
A custom blocker ends up queueing up all the tasks within an unbounded(!) internal work queue. The `Semaphore` (or, equivalently, `parTraverseN`) puts a bound on that and reflects the backpressure up into the semantic fiber layer. This is helpful because it will then slow down your Kafka consumption, which in turn leaves batches on the queue where other instances can pick them up (rather than slurping batches fully into memory where they just sit on the queue).
As for the blocking pool itself though, definitely relying on the default strategy is much better in CE3 rather than rolling your own. Also it's important to question whether this is truly blocking work or if it's just compute-bound and slow. If it's truly blocking, then a fixed thread pool defeats the purpose, since you're trying to shunt to avoid starvation. If it's just compute bound and not actually hard-blocking threads, then leaving it on the compute pool is actually better since allocating new threads doesn't magic new CPUs into existence, and you're really bounded by that physical resource availability. This is part of why the trick to really solve this is to make sure the backpressure reflects all the way through the Kafka consumer, leaving batches on the topic and ideally triggering scaling out your consumer cluster (which is to say, buying yourself more CPUs).
I'm facing similar symptoms in my ZIO application (it hangs randomly under heavy concurrent pressure with clean thread dumps), so I looked into this case out of curiosity.
And I have 2 concerns/hypotheses about why it could hang:

- You have `cached.indexTransfer.offer(idx)`, which theoretically could fail to offer and return false, which is not checked. I'm not sure if it is a bug, or if it is guaranteed never to happen. But a quick look at how this single-item queue is offered and taken reveals that there is a possibility that 2 simultaneous workers will try to offer themselves and only one succeeds. But maybe I missed something.

@JustSokol Thanks for looking into this! Some quick thoughts:
you have cached.indexTransfer.offer(idx) which theoretically could fail to offer and return false
This is subtle! Take a look a few lines above. `cached` itself comes from `pool.cachedThreads.pollFirst()`, which gets an exclusive handle on the thread. This in turn means that you can never be in a situation where two workers simultaneously offer their index to one cached thread, because we already made that access exclusive. In turn, the only reason the `indexTransfer` mechanism itself is a queue in the first place is because Java doesn't have `MVar`. :-) We basically need the ability to hard-block on the non-transfer of the index as a way of parking the cached thread, which we do up in `run()`.
the global runtime has a blocking executor, based on `ThreadPoolExecutor`, which could fail to execute with a `RejectedExecutionException`
Confusingly, that particular executor is only used as a fallback if the `WorkStealingThreadPool` returns `false` from `canExecuteBlockingCode`. This conditional is a bit tangled and I want to revise some of this, but in general it's really just a fallback. I was initially confused by this as well and thought we were generating a bunch of threads on this pool, but there's a naming difference (`io-compute-blocker` vs `io-blocker`) which proves that we are indeed using the blocking mechanism on WSTP.
As for the exceptions… that's honestly a thing that I do worry about. Lost exceptions are super common in this type of problem, so I wouldn't be surprised if this ultimately turns out to be something a bit like that. We definitely don't see anything of this sort printed, but that doesn't mean it isn't happening.
Possibly related (or maybe not): #3444.
For posterity, #3444 is indeed related to this issue.
0001-How-to-lose-a-fiber-after-blocking.patch
This patch demonstrates that the added println (when a cached worker thread expires after 60 seconds of not being picked up) can indeed refer to a `cedeBypass` which is not null, thus losing a fiber in the process. Running the reproducer in this issue with this patch included causes something like this to be printed to the console:
I JUST LOST A FIBER cats.effect.IOFiber@644fa7a2 RUNNING: get @ example.DebugTopicFlow$$anon$2.$anonfun$add$3(DebugTopicFlow.scala:156) SCREAM!!!!
The whole issue can probably be more reliably reproduced if the default expiration time of 60 seconds is reduced. As far as I know, this can only be done in code, not in configuration. I also had more success in reproducing it by running several instances of the program in parallel (as separate processes).
I ran three tests overnight on EC2: the negative test, @vasilmkd's PR, and @durban's PR. The negative test and @vasilmkd's PR both reproduced the issue; @durban's PR did not. In other words, I believe we have fixed this.
I just want to pause and say I'm absolutely floored at the work here. Amazing job, all around. The reproduction was incredibly helpful, and then all of the work to track this down and find the origination point from Vasil and Daniel (other Daniel 😃)… Marvelously done.
How to reproduce: run this code on CE2 and CE3 multiple times and measure how long it takes to complete:
On my machine it takes 20-30 seconds to complete with CE2 and 5 minutes with CE3.