Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0
13.06k stars 1.85k forks source link

rx Flowable that use Unconfined dispatcher result in a deadlock when using a runblocking() that execute a suspend function with (Semaphore, mutex or ReentrantLock) #4222

Open cchaouachi opened 1 month ago

cchaouachi commented 1 month ago

Hello,

This code result in a deadlock :

sharedFlow.asFlowable(Unconfined)
            .flatMap {
 runBlocking {
            suspend function with (Mutex , Semaphore or  ReentrantLock) --> This code deadlocks here
        }
}

when we change the Unconfined dispatcher to the Default the problem disappear? is it the intended behavior ? I found a similar issue but with blockingGet

dkhalanskyjb commented 1 month ago

Could you provide a complete reproducer? When I write the following, it works the same for Unconfined and Default:

        val sharedFlow = MutableSharedFlow<Int>()
        val mutex = Mutex()
        GlobalScope.launch {
            delay(100)
            repeat(5) {
                sharedFlow.emit(it)
            }
        }
        sharedFlow.asFlowable(Dispatchers.Unconfined)
            .flatMap<Int> {
                runBlocking {
                    mutex.withLock {
                        flow {
                            emit(it)
                        }.asFlowable()
                    }
                }
            }
            .blockingForEach {
                println("Got $it")
            }

It also works if I move withLock inside the inner flow.