Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0
13.01k stars 1.85k forks source link

ChildCancelledException when using Rx Observable.asFlow() extension within combine() #2104

Closed mhernand40 closed 3 years ago

mhernand40 commented 4 years ago

I apologize but I do not have a reduced repro for this at the moment. I am encountering a race condition that occurs in an Android app I am working on where we are slowly migrating from RxJava 2 to Coroutines. Essentially we have an upstream MutableStateFlow that we debounce and then combine with another MutableStateFlow and emit a Pair. From there, we apply flatMapLatest which involves a downstream Flow that is a combine against several Flows where some of these Flows are Rx Observables that are converted to Flows using the Observable.asFlow() extension, which btw, is still experimental.

The logic/pseudocode is as follows:

searchQueryFlow // MutableStateFlow
    .debounce(SEARCH_QUERY_DEBOUNCE_IN_MILLIS)
    .combine(isLoadingFlow /* MutableStateFlow */) { query, isLoading ->
        Pair(query, isLoading)
    }
    .flatMapLatest { (searchQuery, isLoading) ->
        combine(
            rxObservable1.asFlow().flowOn(Dispatchers.IO),
            rxObservable2.asFlow().flowOn(Dispatchers.IO),
            anActualFlow,
            rxObservable3.asFlow().flowOn(Dispatchers.IO)
        ) {
            ... // Process and build a list of items
        }.flowOn(Dispatchers.Default)
    }

Here is the stack trace I am seeing:

2020-06-23 18:23:11.885 ********: Undeliverable RxJava exception
    kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
2020-06-23 18:23:11.965 28425-28425/******* E/TimberLogger: error observing match list items provider
    java.lang.NullPointerException: Actually not, but can't throw other exceptions due to RS
        at io.reactivex.Observable.subscribe(Observable.java:12276)
        at io.reactivex.internal.operators.observable.ObservableConcatMap.subscribeActual(ObservableConcatMap.java:55)
        at io.reactivex.Observable.subscribe(Observable.java:12267)
        at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
        at io.reactivex.Observable.subscribe(Observable.java:12267)
        at io.reactivex.internal.operators.observable.ObservableCombineLatest$LatestCoordinator.subscribe(ObservableCombineLatest.java:117)
        at io.reactivex.internal.operators.observable.ObservableCombineLatest.subscribeActual(ObservableCombineLatest.java:71)
        at io.reactivex.Observable.subscribe(Observable.java:12267)
        at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
        at io.reactivex.Observable.subscribe(Observable.java:12267)
        at io.reactivex.internal.operators.observable.ObservableDistinctUntilChanged.subscribeActual(ObservableDistinctUntilChanged.java:35)
        at io.reactivex.Observable.subscribe(Observable.java:12267)
        at io.reactivex.internal.operators.observable.ObservableCombineLatest$LatestCoordinator.subscribe(ObservableCombineLatest.java:117)
        at io.reactivex.internal.operators.observable.ObservableCombineLatest.subscribeActual(ObservableCombineLatest.java:71)
        at io.reactivex.Observable.subscribe(Observable.java:12267)
        at io.reactivex.internal.operators.observable.ObservableDistinctUntilChanged.subscribeActual(ObservableDistinctUntilChanged.java:35)
        at io.reactivex.Observable.subscribe(Observable.java:12267)
        at kotlinx.coroutines.rx2.RxConvertKt$asFlow$1.invokeSuspend(RxConvert.kt:102)
        at kotlinx.coroutines.rx2.RxConvertKt$asFlow$1.invoke(Unknown Source:10)
        at kotlinx.coroutines.flow.ChannelFlowBuilder.collectTo$suspendImpl(Builders.kt:323)
        at kotlinx.coroutines.flow.ChannelFlowBuilder.collectTo(Unknown Source:0)
        at kotlinx.coroutines.flow.CallbackFlowBuilder.collectTo(Builders.kt:336)
        at kotlinx.coroutines.flow.internal.ChannelFlow$collectToFun$1.invokeSuspend(ChannelFlow.kt:53)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:56)
        at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:738)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665)
     Caused by: kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
mhernand40 commented 4 years ago

As a mitigation, I've switched from using Observable.asFlow() to Observable.toFlowable(BackPressureStrategy.LATEST).asFlow() which ultimately ends up using the stable Publisher.asFlow() extension instead. I am unable to reproduce this stack trace with this approach.

damianw commented 4 years ago

We ran into this as well... if I understand the problem correctly, it's more general than just ChildCancelledException as a number of internal coroutines exceptions seem to get accidentally propagated to the Observable which ends up in RxJava's global error handler.

I have created a custom asFlow using @mhernand40's suggestion above (although changing to use BackpressureStrategy.BUFFER) and added a test to our codebase something like the following:

import com.example.rx2.asFlow // via Publisher
import kotlinx.coroutines.rx2.asFlow as _asFlow

class ToFlowConversionsTest {

    private val uncaughtRxExceptions = CopyOnWriteArrayList<Throwable>()

    // Purposefully do not customize any schedulers - the tests will use Thread.sleep.
    // I can't seem to reproduce the bug any other way.
    @[JvmField Rule] val rxJavaPluginsRule = RxJavaPluginsRule(
        errorHandler = { uncaughtRxExceptions += it }
    )

    @[JvmField Rule] val coroutineRule = TestCoroutineRule {
        TestProvidedCoroutineScope(context = TestCoroutineExceptionHandler())
    }

    @Test
    fun testThatWorkaroundFixesTheProblem() {
        doTest { asFlow() }
        assertEquals<List<Throwable>>(emptyList(), uncaughtRxExceptions)
    }

    @Test
    fun testThatBuiltInObservableAsFlowIsStillBroken() {
        doTest { _asFlow() }
        val failureMessage = uncaughtRxExceptions.toString()
        // Sometimes there can be more than one!
        assertNotEquals(0, uncaughtRxExceptions.size, failureMessage)
        val expectedPrefix = "kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled"
        for (thrown in uncaughtRxExceptions) {
            assertTrue(thrown.toString().startsWith(expectedPrefix), failureMessage)
        }
    }

    private fun doTest(convert: Observable<Long>.() -> Flow<Long>) {
        val observable = Observable.interval(1L, TimeUnit.MILLISECONDS)
        val flow = observable.convert()
        val items = mutableListOf<Long>()
        val job = flow
            .onEach { items += it }
            .launchIn(coroutineRule)
        Thread.sleep(20L)
        coroutineRule.advanceUntilIdle()
        job.cancel()
    }
}

Although I was initially getting pretty good results from testThatBuiltInObservableAsFlowIsStillBroken, it turned out that it was flakier than I initially thought and ended up disabling it. YMMV if that test passes or fails when run multiple times.

Regardless, it seems like none of these exceptions should be propagating to Rx's global error handling.

mhernand40 commented 4 years ago

Seeing another crash with a slightly different stack trace that seems related:

Fatal Exception: java.lang.NullPointerException: subscribeActual failed
       at io.reactivex.Maybe.subscribe(Maybe.java:4295)
       at io.reactivex.internal.operators.maybe.MaybeToSingle.subscribeActual(MaybeToSingle.java:46)
       at io.reactivex.Single.subscribe(Single.java:3666)
       at io.reactivex.internal.operators.single.SingleToObservable.subscribeActual(SingleToObservable.java:35)
       at io.reactivex.Observable.subscribe(Observable.java:12284)
       at io.reactivex.internal.operators.observable.ObservableConcatMap$ConcatMapDelayErrorObserver.drain(ObservableConcatMap.java:475)
       at io.reactivex.internal.operators.observable.ObservableConcatMap$ConcatMapDelayErrorObserver.onSubscribe(ObservableConcatMap.java:330)
       at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java:31)
       at io.reactivex.Observable.subscribe(Observable.java:12284)
       at io.reactivex.internal.operators.observable.ObservableConcatMap.subscribeActual(ObservableConcatMap.java:55)
       at io.reactivex.Observable.subscribe(Observable.java:12284)
       at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
       at io.reactivex.Observable.subscribe(Observable.java:12284)
       at kotlinx.coroutines.rx2.RxConvertKt$asFlow$1.invokeSuspend(RxConvertKt.java:102)
       at kotlinx.coroutines.rx2.RxConvertKt$asFlow$1.invoke(RxConvertKt.java:10)
       at kotlinx.coroutines.flow.ChannelFlowBuilder.collectTo$suspendImpl(ChannelFlowBuilder.java:328)
       at kotlinx.coroutines.flow.ChannelFlowBuilder.collectTo(ChannelFlowBuilder.java)
       at kotlinx.coroutines.flow.CallbackFlowBuilder.collectTo(CallbackFlowBuilder.java:341)
       at kotlinx.coroutines.flow.internal.ChannelFlow$collectToFun$1.invokeSuspend(ChannelFlow.java:53)
       at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(BaseContinuationImpl.java:33)
       at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.java:56)
       at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.java:571)
       at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.java:738)
       at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.java:678)
       at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.java:665)
Caused by kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled

Unfortunately, given the depth of this stack trace, it is difficult to trace it down to the root source.

mhernand40 commented 4 years ago

Here is another one:

Fatal Exception: java.lang.NullPointerException: Actually not, but can't throw other exceptions due to RS
       at io.reactivex.Observable.subscribe(Observable.java:12293)
       at io.reactivex.internal.operators.observable.ObservableRepeatWhen$RepeatWhenObserver.subscribeNext(ObservableRepeatWhen.java:151)
       at io.reactivex.internal.operators.observable.ObservableRepeatWhen.subscribeActual(ObservableRepeatWhen.java:60)
       at io.reactivex.Observable.subscribe(Observable.java:12284)
       at io.reactivex.internal.operators.observable.ObservableDistinctUntilChanged.subscribeActual(ObservableDistinctUntilChanged.java:35)
       at io.reactivex.Observable.subscribe(Observable.java:12284)
       at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
       at io.reactivex.Observable.subscribe(Observable.java:12284)
       at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
       at io.reactivex.Observable.subscribe(Observable.java:12284)
       at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
       at io.reactivex.Observable.subscribe(Observable.java:12284)
       at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
       at io.reactivex.Observable.subscribe(Observable.java:12284)
       at kotlinx.coroutines.rx2.RxConvertKt$asFlow$1.invokeSuspend(RxConvertKt.java:102)
       at kotlinx.coroutines.rx2.RxConvertKt$asFlow$1.invoke(RxConvertKt.java:10)
       at kotlinx.coroutines.flow.ChannelFlowBuilder.collectTo$suspendImpl(ChannelFlowBuilder.java:328)
       at kotlinx.coroutines.flow.ChannelFlowBuilder.collectTo(ChannelFlowBuilder.java)
       at kotlinx.coroutines.flow.CallbackFlowBuilder.collectTo(CallbackFlowBuilder.java:341)
       at kotlinx.coroutines.flow.internal.ChannelFlow$collectToFun$1.invokeSuspend(ChannelFlow.java:53)
       at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(BaseContinuationImpl.java:33)
       at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.java:56)
       at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.java:571)
       at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.java:738)
       at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.java:678)
       at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.java:665)
Caused by kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled

This concerns me because RxJava 2 is used heavily in our code base and recently we have encouraged developers to prefer Coroutines instead. However the Rx2 <-> Coroutines interop is a must in order for us to move forward with Coroutines for new code.

drinkthestars commented 4 years ago

On a similar trace, turning on kotlinx.coroutines.DEBUG_PROPERTY_NAME yielded some more details:

Caused by: kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
    (Coroutine boundary)
    at kotlinx.coroutines.channels.AbstractSendChannel.offer(AbstractChannel.kt:156)
    at kotlinx.coroutines.channels.ChannelCoroutine.offer(Unknown Source:2)
    at kotlinx.coroutines.channels.ChannelsKt__ChannelsKt.sendBlocking(Channels.kt:21)
    at kotlinx.coroutines.channels.ChannelsKt.sendBlocking(Unknown Source:1)
    at kotlinx.coroutines.rx2.RxConvertKt$asFlow$1$observer$1.onNext(RxConvert.kt:98)

Where AbstractSendChannel.offer(AbstractChannel.kt:156) looks to be:

result is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(result))

This might suggest that the underlying issue is actually about offering to a closed Channel https://github.com/Kotlin/kotlinx.coroutines/issues/974. Looks like it either might already have been or will be fixed in 1.4+.

drinkthestars commented 4 years ago

The documentation in callbackFlow demonstrates a try/catch surrounding a sendBlocking: https://github.com/Kotlin/kotlinx.coroutines/blob/1b34e1c7dd6207d7683c307bae0b934a3dc18d09/kotlinx-coroutines-core/common/src/flow/Builders.kt#L302-L310

Looks like the implementation of ObservableSource.asFlow() violates that: https://github.com/Kotlin/kotlinx.coroutines/blob/1b34e1c7dd6207d7683c307bae0b934a3dc18d09/reactive/kotlinx-coroutines-rx2/src/RxConvert.kt#L78-L90

Within the callbackFlow, the awaitClose that disposes the Disposable only runs after the resulting Flow collection is cancelled. If there is an onNext happening before awaitClose can run, then it will try to call sendBlocking on a closed Channel.

The reason Publisher.asFlow() is stable in this regard is because:

So, this is related to both https://github.com/Kotlin/kotlinx.coroutines/pull/1826, and https://github.com/Kotlin/kotlinx.coroutines/issues/974.