Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

High CPU usage in SharedFlowImpl #4030

Open fvasco opened 7 months ago

fvasco commented 7 months ago

We detected high CPU usage in kotlinx.coroutines.flow.SharedFlowImpl using Java Flight Recorder on a 2-CPU machine. The CPU consumed there was two orders of magnitude higher than anywhere else, and no other code appears to be causing this usage.


JFR's thread dump:

"DefaultDispatcher-worker-1" #46 [66] daemon prio=5 os_prio=0 cpu=1192711.97ms elapsed=723096.39s tid=0x00007fe79081eb00 nid=66 waiting for monitor entry  [0x00007fe7a9bfe000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at kotlinx.coroutines.flow.SharedFlowImpl.tryTakeValue(SharedFlow.kt:783)
    - waiting to lock <0x00000007f20bf098> (a kotlinx.coroutines.flow.SharedFlowImpl)
    at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl(SharedFlow.kt:377)
    at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend(SharedFlow.kt)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)

"DefaultDispatcher-worker-6" #326 [300] daemon prio=5 os_prio=0 cpu=1192775.65ms elapsed=723053.16s tid=0x00007fe79c32e4d0 nid=300 runnable  [0x00007fe7652e3000]
   java.lang.Thread.State: RUNNABLE
    at kotlinx.coroutines.flow.SharedFlowImpl.updateCollectorIndexLocked$kotlinx_coroutines_core(SharedFlow.kt:774)
    at kotlinx.coroutines.flow.SharedFlowImpl.tryTakeValue(SharedFlow.kt:634)
    - locked <0x00000007f20bf098> (a kotlinx.coroutines.flow.SharedFlowImpl)
    at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl(SharedFlow.kt:377)
    at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend(SharedFlow.kt)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)

a bit later:

"DefaultDispatcher-worker-1" #46 [66] daemon prio=5 os_prio=0 cpu=1192715.16ms elapsed=723096.42s tid=0x00007fe79081eb00 nid=66 runnable  [0x00007fe7a9bfd000]
   java.lang.Thread.State: RUNNABLE
    at kotlinx.coroutines.scheduling.CoroutineScheduler.signalCpuWork(CoroutineScheduler.kt:439)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.dispatch(CoroutineScheduler.kt:415)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.dispatch$default(CoroutineScheduler.kt:392)
    at kotlinx.coroutines.scheduling.SchedulerCoroutineDispatcher.dispatch(Dispatcher.kt:112)
    at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:161)
    at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:474)
    at kotlinx.coroutines.CancellableContinuationImpl.completeResume(CancellableContinuationImpl.kt:590)
    at kotlinx.coroutines.selects.SelectKt.tryResume(Select.kt:842)
    at kotlinx.coroutines.selects.SelectKt.access$tryResume(Select.kt:1)
    at kotlinx.coroutines.selects.SelectImplementation.trySelectInternal(Select.kt:623)
    at kotlinx.coroutines.selects.SelectImplementation.trySelect(Select.kt:600)
    at kotlinx.coroutines.channels.BufferedChannel.tryResumeReceiver(BufferedChannel.kt:634)
    at kotlinx.coroutines.channels.BufferedChannel.updateCellSend(BufferedChannel.kt:458)
    at kotlinx.coroutines.channels.BufferedChannel.access$updateCellSend(BufferedChannel.kt:36)
    at kotlinx.coroutines.channels.BufferedChannel.send$suspendImpl(BufferedChannel.kt:3089)
    at kotlinx.coroutines.channels.BufferedChannel.send(BufferedChannel.kt)
    at kotlinx.coroutines.channels.ChannelCoroutine.send(ChannelCoroutine.kt)
    at kotlinx.coroutines.flow.internal.SendingCollector.emit(SendingCollector.kt:19)
    at com.now4real.server.backend.LsMessageUserListItemBeAdapter$consumeChannel$$inlined$filterIsInstance$1$2.emit(Emitters.kt:223)
    at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl(SharedFlow.kt:382)
    at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend(SharedFlow.kt)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)

"DefaultDispatcher-worker-6" #326 [300] daemon prio=5 os_prio=0 cpu=1192787.63ms elapsed=723053.19s tid=0x00007fe79c32e4d0 nid=300 runnable  [0x00007fe7652e3000]
   java.lang.Thread.State: RUNNABLE
    at kotlinx.coroutines.flow.SharedFlowImpl.updateCollectorIndexLocked$kotlinx_coroutines_core(SharedFlow.kt:774)
    at kotlinx.coroutines.flow.SharedFlowImpl.tryTakeValue(SharedFlow.kt:634)
    - locked <0x00000007f20bf098> (a kotlinx.coroutines.flow.SharedFlowImpl)
    at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl(SharedFlow.kt:377)
    at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend(SharedFlow.kt)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
    at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
    at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)

Unfortunately I am not able to provide a reproducer; we have no idea what in our code could cause this issue. Also, this version of our server has been running since Jan 8, 2024 without issues.

Our code uses less than 1% of CPU (SharedFlowImpl.emit); should we check for some API usage that could cause this issue?

fvasco commented 7 months ago

Hi @qwwdfsad, I sent you the JFR recording privately via email for more details:

$ sha256sum recording.zip 
c2224744906842a45f71dc588481100f74b1bf014fad02bd93d6af2b288a3c69  recording.zip

qwwdfsad commented 7 months ago

Thanks!

I'm on vacation right now, so expect a bit of radio silence from me; I've got what you sent and will return to it later

fvasco commented 7 months ago

Thank you @qwwdfsad. I found some other details in another JFR, which I sent you for further reference:

$ sha256sum recording-0e09ce990f4cb86c2-regular.zip
e13570bcc543eb8803279089b330120c8b607211dea2dc6c164aeb3f84e9f338  recording-0e09ce990f4cb86c2-regular.zip

This time, the profiler points to the updateCollectorIndexLocked method.

Happy holidays

fvasco commented 7 months ago

I tried to create a reproducer.

The code is:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.produceIn
import java.text.NumberFormat
import kotlin.time.Duration.Companion.seconds

fun main() {
    runBlocking {
        val consumerCount = 1_000
        val messageCount = 1_000

        repeat(50) {
            val mutableSharedFlow = MutableSharedFlow<Int>()
            val sharedFlow = mutableSharedFlow.asSharedFlow()
            val nanos: Long
            coroutineScope {
                // Register all consumers before emitting starts.
                repeat(consumerCount) {
                    launch(start = CoroutineStart.UNDISPATCHED) {
                        val channel = sharedFlow.produceIn(this)
                        repeat(messageCount) { channel.receive() }
                        cancel()
                    }
                }

                delay(1.seconds)
                nanos = System.nanoTime()
                launch(Dispatchers.Default) {
                    repeat(messageCount) { mutableSharedFlow.emit(it) }
                }
            }

            // Average nanoseconds per value delivered to one consumer.
            val delta = System.nanoTime() - nanos
            println(NumberFormat.getIntegerInstance().format(delta / consumerCount / messageCount))
        }
    }
}

I ran some benchmarks with different counts; each number is the average nanoseconds per message per consumer, as printed by the reproducer (the "." is a locale-dependent thousands separator):

consumerCount = 1_000,  messageCount = 100:    824, 794, 819, 823
consumerCount = 10_000, messageCount = 100:    11.109, 11.145, 11.019
consumerCount = 1_000,  messageCount = 1_000:  773, 784, 758
consumerCount = 10_000, messageCount = 1_000:  6.010, 10.349, 10.427, 10.556
consumerCount = 10_000, messageCount = 10_000: 10.623

I have attached the JFR: reproducer.zip

Maybe a large number of subscribers can cause this behavior, so these measurements may be normal. At the same time, performance changes greatly depending on the number of subscribers.

fvasco commented 7 months ago

I confirm that our issue was caused by code similar to my reproducer; we updated our code to reduce the subscriber count.

If the above benchmarks are OK for you, feel free to close this issue.
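
A workaround along these lines is to keep the SharedFlow subscriber count at one: a single collector fans values out to per-consumer channels, so the per-take scan over SharedFlow collector slots runs once per value instead of once per consumer per value. This is only a sketch of the pattern, not code from the issue; all names (`fanOut`, `channels`, the counts) are illustrative.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableSharedFlow

fun main() = runBlocking {
    val consumerCount = 100
    val messageCount = 100
    val flow = MutableSharedFlow<Int>()
    val channels = List(consumerCount) { Channel<Int>(Channel.UNLIMITED) }
    val received = IntArray(consumerCount)

    // The only SharedFlow subscriber: SharedFlowImpl's linear work per take
    // now happens once per emitted value, not once per consumer.
    val fanOut = launch(start = CoroutineStart.UNDISPATCHED) {
        flow.collect { value -> channels.forEach { it.send(value) } }
    }
    val consumers = List(consumerCount) { i ->
        launch {
            repeat(messageCount) { channels[i].receive(); received[i]++ }
        }
    }

    repeat(messageCount) { flow.emit(it) }
    consumers.joinAll()
    fanOut.cancel()
    println(received.all { it == messageCount })  // true: every consumer saw every value
}
```

The trade-off is an extra hop through a channel per consumer, but channel send/receive is O(1) per operation, so the total work becomes linear in consumers × messages instead of quadratic in consumers.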

qwwdfsad commented 6 months ago

Thanks for the self-contained reproducer and all the profiles, it made my investigation so much easier 🙇

You hit the weakest spot of the SharedFlow collector algorithm -- unfortunately, a single collect scales linearly with the number of existing collectors, which makes it quadratic for any reasonable use-case (each collector scales linearly -> the total CPU burnt is quadratic).
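
A toy cost model of this claim (to be clear, this is not the real SharedFlowImpl code, just an illustration of the scaling): if each take scans all n collector slots, then n collectors consuming m values visit n * n * m slots in total, so the cost per message per consumer grows linearly with the collector count.

```kotlin
// Toy model: each of n collectors takes each of m values once, and every take
// scans all n collector slots, giving n * n * m total slot visits.
fun totalSlotVisits(collectors: Int, messages: Int): Long =
    1L * messages * collectors * collectors

fun main() {
    println(totalSlotVisits(1_000, 100))   // 100000000
    println(totalSlotVisits(10_000, 100))  // 10000000000: 10x collectors -> 100x total work
}
```

This is consistent with the benchmark above: going from 1_000 to 10_000 consumers raised the reported per-message-per-consumer time from roughly 800 ns to roughly 11,000 ns, close to the 10x predicted by the linear per-collector term.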

I have a draft idea of how to fix it -- for each unique update (value/index/version) we can fallback to concurrent helping for the linear part (which still might be quadratic if you are unlucky enough and all collectors get OS-scheduled at the same time), but should be much better and eliminate the issue for a single-threaded usages. Yet it requires a proper investigation and thoughtful testing. I'll keep the issue open, as it's clearly a performance bottleneck