Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Getting "Flow invariant is violated" in versions above 1.4.2 #2944

Closed: buntupana closed this issue 2 years ago

buntupana commented 2 years ago

I'm executing this code on Android:

    return flow {
        .
        .
        .

        val flowContext = currentCoroutineContext()
        val loading: Job = coroutineScope {
            launch(flowContext) {
                databaseQuery().map {
                    if (it != null) {
                        Resource.Success<T>(it, true)
                    } else {
                        Resource.Loading()
                    }
                }.collect { withContext(flowContext) { emit(it) } }
            }
        }
       .
       .
       .
    }

When I use a coroutines version above 1.4.2 and a work-runtime-ktx version above 2.5.0, I get this exception:

java.lang.IllegalStateException: Flow invariant is violated:
        Flow was collected in [StandaloneCoroutine{Active}@d2397d6, Dispatchers.IO],
        but emission happened in [kotlinx.coroutines.UndispatchedMarker@5ef0a57, UndispatchedCoroutine{Active}@2381944, Dispatchers.IO].
        Please refer to 'flow' documentation or use 'flowOn' instead

Is this a bug, or is it expected behaviour since the update?

The code works with these library versions:

org.jetbrains.kotlin:kotlin-reflect:1.5.31
org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.5.31
org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.2
org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2
androidx.work:work-runtime-ktx:2.5.0

qwwdfsad commented 2 years ago

This is expected behaviour since the update.

"Please refer to 'flow' documentation or use 'flowOn' instead"

The section you need is "Context preservation"
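
As an illustration of the context preservation rule, here is a minimal sketch (not taken from the thread) in which every emission stays in the flow builder's own coroutine and flowOn decides where the upstream code runs. The function name numbers and the choice of Dispatchers.IO are assumptions made for the example.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical example flow: emit() is only called from the builder's own coroutine.
// flowOn moves the upstream (the builder block) to Dispatchers.IO without
// changing the context in which the collector runs.
fun numbers(): Flow<Int> = flow {
    for (i in 1..3) emit(i)
}.flowOn(Dispatchers.IO)

fun main() = runBlocking {
    // Collection happens in the runBlocking context; emission happens on IO.
    println(numbers().toList())
}
```

The point of the sketch is that the dispatcher is changed from outside the builder, so the body never needs withContext around emit.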

buntupana commented 2 years ago

@qwwdfsad I think I'm using the same context (val flowContext = currentCoroutineContext()) when I emit, so I don't know why it is throwing the exception.

I would appreciate it if you could give me an example of how that code could work in the new version of coroutines, thanks!

qwwdfsad commented 2 years ago

withContext creates its own scope with its own Job in it.
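
A minimal sketch of this point, assuming a plain runBlocking caller rather than the Android setup from the issue: even when withContext receives the caller's own context, its block runs in a new coroutine whose Job differs from the caller's. The context seen by emit is therefore no longer the one the flow was collected in, as the exception message reports. The helper name withContextHasOwnJob is hypothetical.

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: returns true when the Job visible inside withContext
// is a different object from the caller's Job.
suspend fun withContextHasOwnJob(): Boolean {
    val outerContext = currentCoroutineContext()
    val outerJob = outerContext[Job]
    // Passing the caller's own context back in still starts a new scoped coroutine.
    val innerJob = withContext(outerContext) { currentCoroutineContext()[Job] }
    return innerJob !== outerJob
}

fun main() = runBlocking {
    println(withContextHasOwnJob())
}
```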

"I would appreciate it if you could give me an example of how that code could work in the new version of coroutines, thanks!"

.collect { emit(it) } instead of .collect { withContext(flowContext) { emit(it) } }

buntupana commented 2 years ago

@qwwdfsad I've already tried and throws this error:

java.lang.IllegalStateException: Flow invariant is violated:
        Emission from another coroutine is detected.
        Child of StandaloneCoroutine{Active}@ef4b7c2, expected child of StandaloneCoroutine{Active}@3b89d3.
        FlowCollector is not thread-safe and concurrent emissions are prohibited.
        To mitigate this restriction please use 'channelFlow' builder instead of 'flow'

qwwdfsad commented 2 years ago

Oh, sorry, I misread the snippet.

The simplest option is to use channelFlow:

    return channelFlow {
        .
        .
        .

        val flowContext = currentCoroutineContext()
        val loading: Job = coroutineScope {
            launch(flowContext) {
                databaseQuery().map {
                    if (it != null) {
                        Resource.Success<T>(it, true)
                    } else {
                        Resource.Loading()
                    }
                }.collect { send(it) }
            }
        }
       .
       .
       .
    }
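
For reference, here is a self-contained sketch of the channelFlow approach that can be run outside Android. Resource and databaseQuery are hypothetical stand-ins for the types elided in the thread, and the lambda passed to collect assumes kotlinx.coroutines 1.6 or later (older versions need import kotlinx.coroutines.flow.collect).

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the Resource type used in the thread.
sealed class Resource<out T> {
    data class Success<T>(val data: T, val fromCache: Boolean) : Resource<T>()
    object Loading : Resource<Nothing>()
}

// Hypothetical stand-in for databaseQuery(): a missing row first, then a value.
fun databaseQuery(): Flow<String?> = flowOf(null, "row")

fun resources(): Flow<Resource<String>> = channelFlow {
    // Unlike emit() in flow { }, send() may be called from a child coroutine.
    launch {
        databaseQuery()
            .map { row ->
                val resource: Resource<String> =
                    if (row != null) Resource.Success(row, true) else Resource.Loading
                resource
            }
            .collect { send(it) }
    }
}

fun main() = runBlocking {
    println(resources().toList())
}
```

Because the channelFlow block is a ProducerScope, launch can be called on it directly, and the resulting flow completes once the launched coroutine finishes.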