Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

`ObservableSource.asFlow()` blocks the singleThreadContext/Dispatchers.Main on Android if the default buffer is exceeded #3282

Open Motoaleks opened 2 years ago

Motoaleks commented 2 years ago

I found that the following block of code unexpectedly blocks the dispatcher. It happens only in scopes backed by a single thread (such as one created with newSingleThreadContext("SingleThread#1"), or Dispatchers.Main on Android). I understand that the default buffer is 64 items and that, when no backpressure operator is applied, sending should suspend and collecting should resume once the buffer is full. But why does this not happen in this case? It seems that the runBlocking under the hood of the asFlow operator does not let collect resume. Is there any way to preserve the context and handle this situation?

case 1: JVM, with newSingleThreadContext

code:

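import io.reactivex.rxjava3.core.Observable
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.rx3.asFlow

fun main() {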
    val context = newSingleThreadContext("SingleThread#1")
    val scope = CoroutineScope(context)

    scope.launch {
        Observable.create<Int> {
            for (i in 0..1000) {
                println("${Thread.currentThread().name}-provide: $i")
                it.onNext(i)
            }
        }
            .asFlow()
            .collect { println("${Thread.currentThread().name}-collect $it") }

        println("finished")
    }

    readLine()
}

result:

SingleThread#1-provide: 0
SingleThread#1-provide: 1
...
SingleThread#1-provide: 64
SingleThread#1-provide: 65

deps:

implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.1-native-mt'
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-rx3:1.6.1-native-mt'
implementation 'io.reactivex.rxjava3:rxjava:3.1.4'
implementation 'io.reactivex.rxjava3:rxkotlin:3.0.1'

case 2: Android, with Dispatchers.Main

code:

val context = Dispatchers.Main
val scope = CoroutineScope(context)
... // same

deps:

implementation 'io.reactivex.rxjava3:rxkotlin:3.0.1'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.1"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.1"
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-rx3:1.6.1'

result is the same.

dkhalanskyjb commented 2 years ago

tl;dr: try Dispatchers.Main.immediate instead of Dispatchers.Main.

it.onNext is a blocking operation, not a suspend function. If there is no more space left in the buffer, the operation will hang until there is.

Most dispatchers, when asked to resume a coroutine, will not execute it immediately, but will schedule it. This is what happens here: no one is collecting anything, because the only thread in use is busy creating new elements and notifying the dispatcher that someone should process them.

Some dispatchers, like Dispatchers.Unconfined or Dispatchers.Main.immediate, when being notified that a coroutine should be resumed, instead of processing this notification in some elaborate manner, will just run the code in that coroutine until the next suspension point (in case of Dispatchers.Main.immediate, only if the thread in use is already the correct one).
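The difference between scheduling and running immediately can be seen in a small sketch that uses only kotlinx-coroutines-core. It is an illustration, not the code path taken by asFlow: the function name dispatchOrder and the log strings are made up for this example, and runBlocking's own event loop stands in for a single-threaded dispatcher.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: records the order in which code runs on one thread.
fun dispatchOrder(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    // Inherits runBlocking's confined dispatcher: the child is only scheduled here.
    launch { log += "confined child" }
    log += "after confined launch"

    // Dispatchers.Unconfined: the child body runs right away, inside the launch call.
    launch(Dispatchers.Unconfined) { log += "unconfined child" }
    log += "after unconfined launch"

    // Suspend so the single thread is free to run the scheduled child.
    yield()
    log
}

fun main() {
    println(dispatchOrder())
}
```

The confined child runs only once the parent reaches yield(). If the parent blocked the thread instead of suspending, as it.onNext does when the buffer is full, the scheduled child would never get a chance to run.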

On a separate note, it is incorrect to create newSingleThreadContext and not close it, as this will lead to threads leaking. You should employ the pattern newSingleThreadContext(...).use { context -> ... } instead.
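A minimal sketch of that pattern, assuming a JVM target where the dispatcher returned by newSingleThreadContext is Closeable. The function name runOnDedicatedThread is hypothetical, and depending on the library version the compiler may warn that newSingleThreadContext is a delicate or obsolete API.

```kotlin
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.runBlocking

// Hypothetical helper: runs a block on a dedicated thread and returns that thread's name.
fun runOnDedicatedThread(): String =
    newSingleThreadContext("SingleThread#1").use { context ->
        // All coroutine work happens inside use { }, so the dispatcher is
        // closed (and its thread released) once the work is done.
        runBlocking(context) { Thread.currentThread().name }
    }

fun main() {
    println(runOnDedicatedThread())
}
```

Because runBlocking waits for the block to finish, the dispatcher is not closed while work is still running on it.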

vganin commented 2 years ago

> it.onNext is a blocking operation, not a suspend function. If there is no more space left in the buffer, the operation will hang until there is.

Isn't this a common case of missing backpressure? RxJava commonly throws MissingBackpressureException on buffer overflow. Flow has its own instruments to deal with backpressure - suspending by default.

In this case, backpressure results in a deadlock, which is common in neither RxJava nor Flow and is therefore unexpected for the user. Maybe asFlow could throw MissingBackpressureException somehow, to be more consistent with the RxJava model?
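For contrast, here is a minimal sketch of what "suspending by default" looks like when the producer is a plain Flow rather than an Rx source. It assumes only kotlinx-coroutines-core; the function name collectOnSingleThread is made up for this example. Producer and collector share one thread, and the item count exceeds the 64-item default buffer, yet nothing deadlocks because emit suspends instead of blocking.

```kotlin
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits `items` values through a default-sized buffer
// and returns how many were collected, all on the calling thread.
fun collectOnSingleThread(items: Int): Int = runBlocking {
    var collected = 0
    flow { repeat(items) { emit(it) } }
        .buffer() // default capacity; emit suspends when the buffer is full
        .collect { collected++ }
    collected
}

fun main() {
    println(collectOnSingleThread(200))
}
```

When the buffer fills up, the producer coroutine suspends and hands the thread to the collector, which is the step that cannot happen when a blocking onNext holds the only thread.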

Motoaleks commented 2 years ago

Oh, I see the issue, thank you. But this behaviour is completely unexpected for the caller of collect. In some cases (when the user forgets to use an immediate or unconfined dispatcher) it leads to a dispatcher deadlock with no notification (crash, lint warning, or log); in others it works fine. This makes the operator not universal but dependent on the flow's dispatcher. Also, the method documentation says nothing about this behaviour.

It would be much more informative to have an exception, like the default Flowable behaviour, or some other way to prevent these situations. Even a mention in the method documentation would make this issue easier to investigate.

dkhalanskyjb commented 2 years ago

@vganin this suggestion has some pros and cons.

@Motoaleks not sure what a good place for mentioning this issue would be, since this knowledge is just about suspend functions and dispatchers in general and not flow-specific: https://kotlinlang.org/docs/coroutine-context-and-dispatchers.html#unconfined-vs-confined-dispatcher