Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0
12.94k stars 1.84k forks source link

CancellationException in flatMapMerge/flatMapLatest operators #2942

Closed marcin-adamczewski closed 2 years ago

marcin-adamczewski commented 2 years ago

Hi. I've noticed strange behavior of CancellationException when using flatMapMerge and flatMapLatest.

Let's have a look at a simple example to visualize the problem:

val loginClick = MutableSharedFlow<Unit>()
loginClick
    .flatMapLatest {
        loginWithFacebook()
    }
    .collect()

When the loginWithFacebook() flow cancels then the whole flow cancels too. It's problematic because when a user cancels the first time then he can't log in again. I could catch the CancellationException in the inner flow but I guess it's not the best solution as you have to remember about it each time and adds some boilerplate. I would just expect the CancellationException in the inner flow (inside flatMap) to not propagate up. In RxJava it doesn't work that way. You could cancel (complete) the Observable inside the flatMap and both the upstream and observer would still work fine.

A simple test to reproduce the case:

 @Test
  fun simpleTest() = runBlockingTest {
      val actions = MutableSharedFlow<Int>()

      launch {
          actions
              .flatMapLatest {
                  flow { emit(getNumber(it)) }
              }
              .collect {
                  println("lol onEach $it")
              }
      }

      launch {
          actions.emit(1) // emmited 
          actions.emit(2) // not emitted
          actions.emit(3) // not emitted but should be
      }
  }

  suspend fun getNumber(number: Int): Int {
      return suspendCancellableCoroutine { cont ->
          if (number == 2) {
              cont.cancel()
          } else {
              cont.resume(number)
          }
      }
  }
qwwdfsad commented 2 years ago

It seems like your test has the race in it: all emissions happen prior to actual subscription.

E.g. in this code:

@Test
fun simpleTest() = runBlocking<Unit> {
    val actions = MutableSharedFlow<Int>()

    launch {
        actions
            .flatMapLatest {
                flow { emit(getNumber(it)) }
            }
            .collect {
                println("lol onEach $it")
            }
    }
    yield()

    launch {
        actions.emit(1) // emmited
        actions.emit(2) // not emitted
        actions.emit(3) // not emitted but should be
    }
}

suspend fun getNumber(number: Int): Int {
    return suspendCancellableCoroutine { cont ->
        if (number == 2) {
            cont.cancel()
        } else {
            cont.resumeWith(Result.success(number))
        }
    }
}

everything works as expected. Could you please ensure that you do not have such a bug in your code?

marcin-adamczewski commented 2 years ago

@qwwdfsad Thanks for looking into that. I ran your test and the result is the same: lol onEach1

but IMO should be lol onEach1 lol onEach3

when I catch the CancellationException like this flow { emit(getNumber(it)) }.catch { if (it !is CancellationException) throw it } The result is correct (onEach1 and onEach3)

I use version 1.5.2 of coroutines.

qwwdfsad commented 2 years ago

Thanks!

This is the expected behaviour of Flow -- the cancellation in the collect or in any intermediate operator cancels the flow. If you, as a developer, expect that intermediate operation can be cancelled, but you do not want to cancel the whole flow -- catch operator is the way to go. RxJava is different here, as you cancel a particular instance of subscription, while in coroutines you cancel the whole scope with flow attached to it.

The mental model is quite simple, let's start with the map:

  1. flow.map { if (...) cancel() else ... }.collect { println() }. It's quite expected that if map operator cancels the operation, then the whole flow is done.
  2. Next step is flatMapConcat. By definition, it's just a map(transform).flatten which has the very same properties as a regular flow chain -- cancellation cancels the whole flow, not a single element.
  3. flatMapMerge is a concurrent version of it, so it also has the very same properties.
  4. The last, flatMapLatest is flatMapMerge with conflation.

The keypoint here -- when you have catch operator, it's quite easy to emulate both cancellation "modes" using current API, while the opposite is not true

marcin-adamczewski commented 2 years ago

@qwwdfsad I see your point, however, let me try to convince you to change this behavior. Please have a look at two simple examples:

  1. A flow that downloads songs. We can download many songs at a time, and cancel the download of a specific song. Currently, canceling a single download would cancel the whole flow (all songs).
    
    val downloadSong = MutableSharedFlow<Song>()

downloadSong.flatMapMarge { downloadSongFlow() // cancelling any of download will cancel all of them! }.collect()


2. Handle UI actions in the MVI manner. We handle all UI actions and map them to some state or effect. Now, canceling any of the internal flows will block the whole UI forever as the whole flow would be canceled.

val action = MutableSharedFlow()

action.flatMapMerge { when (it) { LoginAction.EmailChanged -> updateEmail(it), LoginAction.PasswordChanged -> updatePassword(it), LoginAction.LoginClick -> logIn(), LoginAction.FacrbookLoginClick -> logInWithFacebook() // Cancelling here breaks UI } }.collect()



I could come up with many such examples. FlatMapMerge allows creating many flows concurrently. Why would canceling one flow break the others? Is there any argument behind it? In most cases I can think of, it shouldn't work that way. I think we could compare it to some extent to launching a coroutine with SupervisorJob or with a regular Job. With the former, we don't pass cancellation from child to the parent coroutine, right?
So maybe there should be an option to configure the behavior of flatMapMerge?

PS I'd like to also mention that it makes migration from RxJava harder.
qwwdfsad commented 2 years ago

Thanks for the detailed arguments! That indeed makes sense and worth rethinking, I'll come back with additional details a bit later

qwwdfsad commented 2 years ago

Fixed in 1.6.0-RC

MoonRiser commented 1 year ago

it seems like the problem still exists ,it waste me almost whole day off , my coroutine version is 1.6.3 @qwwdfsad