typelevel / fs2

Compositional, streaming I/O library for Scala
https://fs2.io

reactivestreams `StreamSubscriber` blocks (creates thread) each time it is called by upstream #2628

Closed: bastewart closed this issue 3 years ago

bastewart commented 3 years ago

I haven't had time to prove this with a minimised reproduction, but it came out of a discussion on Discord and @djspiewak thinks it is correct.

StreamSubscriber, which implements the Reactive Streams Subscriber interface, calls dispatcher.unsafeRunSync each time the upstream source invokes it. Under cats-effect 3 this creates a new thread (a HelperThread), because unsafeRunSync ends in a scala.concurrent.blocking call.

This means at least 2 new threads for each stream instance (one for onSubscribe and one for onComplete), plus potentially one per element (onNext), i.e. up to n more for an n-element stream.

In real terms, this leads to a large number of threads being created in applications that use StreamSubscriber to convert a reactive stream to an fs2.Stream. For example, the http4s JDK client will create 2+n threads for each response body it handles.
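To make the 2+n arithmetic concrete, here is a minimal stdlib-only Scala sketch. It does not use fs2 or cats-effect: `SyncSubscriber` and `runSyncLike` are hypothetical stand-ins for a subscriber that calls `dispatcher.unsafeRunSync` from every callback, and a counting `BlockContext` records each time a callback enters `scala.concurrent.blocking`, which is the signal that makes a blocking-aware pool add a thread.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ListBuffer
import scala.concurrent.{BlockContext, CanAwait, blocking}

object BlockingPerCallback {
  // A BlockContext that only counts how often code enters `blocking { ... }`.
  final class CountingBlockContext extends BlockContext {
    val entered = new AtomicInteger(0)
    override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
      entered.incrementAndGet()
      thunk
    }
  }

  // Hypothetical stand-in for dispatcher.unsafeRunSync(effect). The real call
  // waits with Await.result, which wraps the wait in scala.concurrent.blocking;
  // this stand-in always takes that blocking path.
  def runSyncLike[A](effect: => A): A = blocking(effect)

  // Hypothetical subscriber shaped like a Reactive Streams Subscriber, in which
  // every callback runs an effect synchronously.
  final class SyncSubscriber[A] {
    val received = ListBuffer.empty[A]
    def onSubscribe(): Unit = runSyncLike(())
    def onNext(a: A): Unit = runSyncLike { received += a; () }
    def onComplete(): Unit = runSyncLike(())
  }

  // Drives one stream of n elements and returns how many blocking regions it entered.
  def blockingRegionsFor(n: Int): Int = {
    val ctx = new CountingBlockContext
    BlockContext.withBlockContext(ctx) {
      val sub = new SyncSubscriber[Int]
      sub.onSubscribe()
      (1 to n).foreach(sub.onNext)
      sub.onComplete()
    }
    ctx.entered.get()
  }

  def main(args: Array[String]): Unit =
    println(s"blocking regions for a 10-element stream: ${blockingRegionsFor(10)}")
}
```

For a 10-element stream the counter reads 12: one blocking region each for onSubscribe and onComplete, plus one per onNext. On a pool that respects `blocking`, each of those regions is an opportunity to spawn a thread.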

This was identified on fs2 3.1.2, cats-effect 3.2.8, cats 2.6.1 and Scala 2.13.6.

vasilmkd commented 3 years ago

This is indeed correct and I'm working on resolving this. It is not exactly a bug, since not spawning threads can lead to a deadlock of the compute pool, but maybe it is worth being slightly less pedantic about it. For now, you can revert to cats-effect 3.2.7 which doesn't contain the change.

In terms of a better remedy, Cats Effect 3.3.0 will be the release with a comprehensive overhaul of the blocking support, but maybe I can backport some changes to 3.2.10 for example. I cannot promise anything though.

bastewart commented 3 years ago

Yeah, agreed on it not being a bug per se; it's more a regression (I don't think I can choose the label though!).

To confirm what I said on Discord: CE 3.2.7 still shows the same high rate of thread creation, so presumably the behaviour has been around for longer than #2312.

There's no pressing need for any remedy on my end. It does seem to be something that's under consideration at the moment, so I'm happy to wait for CE 3.3.0!

vasilmkd commented 3 years ago

The PR that you linked is not related to Cats Effect 3.2.7, and it predates that release. On the other hand, CE2 did not show this behavior because there was no effort to respect scala.concurrent.blocking in CE2 at all. If you want to restore that behavior immediately, you can create a FixedThreadPoolExecutor and use it as the compute pool.

https://github.com/typelevel/cats-effect/blob/series/3.x/benchmarks/src/main/scala/cats/effect/benchmarks/WorkStealingBenchmark.scala

That executor is used by CE2.

bastewart commented 3 years ago

Sorry, I linked the wrong PR. I forgot this issue is in fs2 rather than cats-effect, so I linked just the number without the repo. I meant to link this one, which was introduced in 3.2.8 but doesn't seem to change the behaviour on my end: https://github.com/typelevel/cats-effect/pull/2312

> If you want to restore that behavior immediately, you can create a FixedThreadPoolExecutor and use it as the compute pool.

Thanks, but I don't need to restore the behaviour at all. I'm more worried about this because I suspect it's the cause of a significant performance hit (50%-100% higher CPU use) in an application since updating to CE3. It creates ~600 threads per second, which seems like the most likely cause (under CE2 it was < 10). I have no need for a short-term fix, though (higher CPU is fine, or we can roll back); I'm more concerned about the longer-term issue, since this feels like a regression that could be fixed in fs2 or CE 👍

FWIW, I just tested 3.2.3 and it shows the same behaviour as well (that's the version I happened to start investigating this on, before trying 3.2.8).

bastewart commented 3 years ago

Oh, I suppose I could try that executor to see whether it restores performance to what it was before (as confirmation of the cause of the performance hit). I'll try that tomorrow, actually, thanks!

bastewart commented 3 years ago

@vasilmkd As discussed on Discord, I experimented with the code above, replacing the compute pool with either the Scala global pool or a Java fixed thread pool:

    val compute = scala.concurrent.ExecutionContext.global

or

    val compute =
      scala.concurrent.ExecutionContext.fromExecutorService(
        java.util.concurrent.Executors.newFixedThreadPool(Math.max(2, Runtime.getRuntime.availableProcessors()))
      )

Interestingly, the global pool did not show excess thread creation. I think you were expecting it to create lots of threads? Unfortunately, it didn't seem to help with the CE3 performance regression (which I've re-tested and which actually "only" seems to be 25-50%, not 50-100%). Perhaps the threads aren't the cause after all.

The Java pool unfortunately led to a non-functional application. I wondered whether that was because the app runs in Kubernetes, so I set the fixed pool size to exactly the requested CPU count (2), and it still didn't work. Unfortunately, I didn't have time to dig into why.

vasilmkd commented 3 years ago

The Java pool gets blocked because all threads are blocked on unsafeRunSync(). The Scala global pool spawns fewer backup threads to address the blocking which probably improves the performance slightly.
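The starvation described here can be reproduced without cats-effect. In this stdlib-only sketch (`PoolStarvation` and `outerTasksComplete` are hypothetical names), every outer task occupies a pool thread and then waits synchronously for an inner task submitted to the same pool, roughly the shape of calling unsafeRunSync() from a compute thread. A fixed pool of 2 threads has nowhere to run the inner tasks, so the waits time out; `ExecutionContext.global` honours `scala.concurrent.blocking` and adds threads, so the same workload completes. It was written against a recent Scala 2.13 / Scala 3 standard library, and the timeouts are arbitrary.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object PoolStarvation {
  // Runs n outer tasks on `ec`. Each waits until all n are running (so every
  // thread of a size-n fixed pool is busy), then submits an inner task to the
  // same pool and waits for it synchronously.
  def outerTasksComplete(ec: ExecutionContext, n: Int): Boolean = {
    implicit val pool: ExecutionContext = ec
    val allStarted = new CountDownLatch(n)
    val outers = (1 to n).map { _ =>
      Future {
        allStarted.countDown()
        blocking(allStarted.await())  // lets a blocking-aware pool add threads
        val inner = Future(1)         // needs a free thread on the same pool
        Await.result(inner, 1.second) // like unsafeRunSync(): parks this thread
      }
    }
    try { Await.result(Future.sequence(outers), 5.seconds); true }
    catch { case _: TimeoutException => false }
  }

  def main(args: Array[String]): Unit = {
    val fixed = Executors.newFixedThreadPool(2)
    try println(s"fixed pool of 2: ${outerTasksComplete(ExecutionContext.fromExecutorService(fixed), 2)}")
    finally fixed.shutdownNow()
    println(s"global pool: ${outerTasksComplete(ExecutionContext.global, 2)}")
  }
}
```

With the fixed pool the call returns false once the inner waits time out; with the global pool the blocked outer tasks are compensated with extra threads and it returns true.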

All in all, the reactive streams interop will require changes. That's my opinion.

bastewart commented 3 years ago

> The Java pool gets blocked because all threads are blocked on unsafeRunSync(). The Scala global pool spawns fewer backup threads to address the blocking which probably improves the performance slightly.

Ah, that makes sense, thanks.

> All in all, the reactive streams interop will require changes. That's my opinion.

@SystemFw also wondered on Discord whether it needed a rethink (before you joined the conversation).

Thanks for your help!

notxcain commented 3 years ago

This should be fixed in #2632 on fs2 side.

vasilmkd commented 3 years ago

@notxcain @bastewart Would you like me to release a snapshot version with these changes so that you can test whether this resolves the issues discussed here?

mpilquist commented 3 years ago

> All in all, the reactive streams interop will require changes. That's my opinion.

@vasilmkd Can you elaborate on this? The rx-stream interop was changed to not use a dispatcher, but I'm concerned that folks will have this same issue in other places where dispatchers are needed -- like in the output stream interop.
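As a rough illustration of what avoiding a dispatcher can look like, the sketch below is hypothetical and is not the code from #2632: the upstream callbacks complete a `Promise` the consumer is waiting on and then return immediately, so no callback ever parks a thread. `HandoffSubscriber` and `pull` are invented names, and the sketch assumes the upstream respects demand (at most one onNext per outstanding pull) and signals serially, as the Reactive Streams spec requires.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.concurrent.{Future, Promise}

// Hypothetical sketch, not fs2's implementation: callbacks hand values off
// through a Promise instead of running an effect synchronously.
final class HandoffSubscriber[A] {
  private sealed trait State
  private case object Idle extends State
  private final case class Waiting(p: Promise[Option[A]]) extends State
  private case object Done extends State

  private val state = new AtomicReference[State](Idle)

  // Consumer side: a Future for the next element (None once complete).
  // A real subscriber would also call subscription.request(1) here.
  @tailrec def pull(): Future[Option[A]] = state.get() match {
    case Done       => Future.successful(None)
    case Waiting(p) => p.future
    case Idle =>
      val p = Promise[Option[A]]()
      if (state.compareAndSet(Idle, Waiting(p))) p.future else pull()
  }

  // Upstream callbacks: complete the pending Promise and return at once.
  def onNext(a: A): Unit = state.get() match {
    case w @ Waiting(p) => if (state.compareAndSet(w, Idle)) p.success(Some(a))
    case _              => () // element without demand: ignored in this sketch
  }

  def onComplete(): Unit = state.getAndSet(Done) match {
    case Waiting(p) => p.success(None)
    case _          => ()
  }
}
```

The design choice is that the waiting moves to the consumer as an asynchronous Future instead of happening on the thread that delivered the callback, so nothing signals `blocking` to the pool.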

vasilmkd commented 3 years ago

Dispatcher#unsafeRunSync() is implemented in terms of Await.result, which is wrapped in scala.concurrent.blocking. This signals any pool that respects it to spawn new threads in preparation; both the default CE3 pool and EC.global do so. There's not much we can do here. The solution is to avoid unsafeRunSync() on the compute pool, which is the age-old recommendation. It solves both the safety and the bad performance issues.
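The effect of Await.result's blocking wrapper on `ExecutionContext.global` can be observed directly with the standard library. In this hypothetical sketch, n tasks each wait with Await.result until all n have started. Since a waiting task keeps its thread, the run can only finish if n threads exist at once, which the global pool provides because the waits go through scala.concurrent.blocking. The timeouts are arbitrary safety limits.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object AwaitSpawnsThreads {
  // Starts n tasks on the global pool; each records its thread name and then
  // waits with Await.result until all n have arrived. Returns the number of
  // distinct threads that ran the tasks.
  def distinctThreads(n: Int): Int = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val threads = ConcurrentHashMap.newKeySet[String]()
    val arrived = new AtomicInteger(0)
    val allArrived = Promise[Unit]()
    val tasks = (1 to n).map { _ =>
      Future {
        threads.add(Thread.currentThread().getName)
        if (arrived.incrementAndGet() == n) allArrived.success(())
        // Await.result enters `blocking` while it waits, so the pool compensates.
        Await.result(allArrived.future, 5.seconds)
      }
    }
    Await.result(Future.sequence(tasks), 10.seconds)
    threads.size()
  }

  def main(args: Array[String]): Unit = {
    val cores = Runtime.getRuntime.availableProcessors()
    println(s"cores = $cores, threads used for 32 waiting tasks = ${distinctThreads(32)}")
  }
}
```

On a machine with fewer than 32 cores, the reported thread count exceeds the core count, i.e. the global pool spawned extra threads purely because of the blocking waits; that thread creation and the context switching that follows are the costs discussed here.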

vasilmkd commented 3 years ago

scala.concurrent.blocking works fine in these cases; it just seems that people think it has no performance overhead, when it is in fact mainly a safety construct, and spawning threads, context switching, and everything that entails are very expensive operations.

bastewart commented 3 years ago

> @notxcain @bastewart Would you like me to release a snapshot version with these changes so that you can test whether this resolves the issues discussed here?

If you don't mind that would be really helpful. If it's a pain I can probably build from source myself though. Thanks!

vasilmkd commented 3 years ago

It should be this one "co.fs2" %% "fs2-core" % "3.1-11-d8073c7".

bastewart commented 3 years ago

> It should be this one "co.fs2" %% "fs2-core" % "3.1-11-d8073c7".

@vasilmkd really good news! This completely fixes the CPU penalty we were seeing in our application, halving CPU use compared to the current release. Thank you!

Comparing cats-effect 2 with cats-effect 3, CE3 now needs only 75% of the CPU that CE2 did (a 35% performance uplift), so that's pretty nice! With the bug it was (as mentioned previously) 50%-100% worse, hah.

Edit: It should go without saying that it no longer spawns so many threads, either.

vasilmkd commented 3 years ago

That is awesome news!

vasilmkd commented 3 years ago

Addressed in #2632.

SystemFw commented 3 years ago

> The Java pool gets blocked because all threads are blocked on unsafeRunSync(). The Scala global pool spawns fewer backup threads to address the blocking which probably improves the performance slightly.

> All in all, the reactive streams interop will require changes. That's my opinion.

First of all, awesome to hear that this is fixed :) Now, I need to look at the reactive spec again, but IIRC their test suite fails without unsafeRunSync, since there are cases like cancel; complete which assert that complete shouldn't execute (or something along those lines), and if cancel isn't synchronous, you can't guarantee that. It's been a few years since I've looked at it, though.

vasilmkd commented 3 years ago

I assume since the PRs were merged for both 2.5.x and main, they passed the CI. 😅