badoo / Reaktive

Kotlin multi-platform implementation of Reactive Extensions
Apache License 2.0
1.18k stars 58 forks source link

ConditionLock.signal() doesn't wake up the thread. #769

Closed lemkoleg closed 11 months ago

lemkoleg commented 11 months ago

` class MyCondition {

@OptIn(InternalReaktiveApi::class)
val cond: ConditionLock = ConditionLock()

@OptIn(InternalReaktiveApi::class)
suspend fun cAwait(t: Long): Boolean{
    var isNegative: Boolean = false
    cond.synchronized { isNegative = cond.await(t.toDuration(DurationUnit.MILLISECONDS)).isNegative() }
    return !isNegative
}

@OptIn(InternalReaktiveApi::class)
fun cSignal(){
    cond!!.synchronized { cond.signal() }
}

} `

` suspend fun testMe(){

    val cond: MyCondition = MyCondition()

    println("Condition start")

    val time = DateTime.nowUnixMillisLong()

   if(cond.cAwait(10000)){
      println("Condition Wacke up")
   }else{
    println("Condition destroy")
   }

   println("time: ${DateTime.nowUnixMillisLong() - time}")
}

} `

Condition start Signal start Condition destroy time: 10010

arkivanov commented 11 months ago

Thanks for the report. We have tests for ConditionLock, seems to work fine on native and jvm targets.

What platform are you testing on? Also I don't see any thread switching in the provided code sample. And also cSignal seems like never called.

lemkoleg commented 11 months ago

Apple M1 mac OS Ventura testing for Android and IOS Sorry. I was cleaning the code and accidentally removed CoroutineScope(Dispatchers.Default).launch ` suspend fun testMe(){

val cond: MyCondition = MyCondition()

println("Condition start")

val time = DateTime.nowUnixMillisLong()
CoroutineScope(Dispatchers.Default).launch {
        val time = DateTime.nowUnixMillisLong()
        if(cond.cAwait(10000)){
            println("Condition Wacke up")
        }else{
            println("Condition destroy")
        }

        println("time: ${DateTime.nowUnixMillisLong() - time}")
    }

    CoroutineScope(Dispatchers.Default).launch {
        println("Signal start")
        cond.cSignal()
    }

println("time: ${DateTime.nowUnixMillisLong() - time}") } } `

arkivanov commented 11 months ago

Could you please provide a complete reproducer? I tried to reproduce on macOS using the provided code but couldn't.

class ConditionLockTest {

    @Test
    fun test() = runBlocking {
        testMe()
    }

    suspend fun testMe() {
        val cond: MyCondition = MyCondition()

        println("Condition start")

        val time = TimeSource.Monotonic.markNow()
        CoroutineScope(Dispatchers.Default).launch {
            println("Await")
            val time = TimeSource.Monotonic.markNow()
            if (cond.cAwait(1000)) {
                println("Condition Wacke up")
            } else {
                println("Condition destroy")
            }

            println("time: ${TimeSource.Monotonic.markNow() - time}")
        }

        CoroutineScope(Dispatchers.Default).launch {
            delay(500)
            println("Signal start")
            cond.cSignal()
        }
        println("time: ${TimeSource.Monotonic.markNow() - time}")
        delay(2000)
    }
}

class MyCondition {

    val cond: ConditionLock = ConditionLock()

    suspend fun cAwait(t: Long): Boolean {
        var isNegative: Boolean = false
        cond.synchronized { isNegative = cond.await(t.toDuration(DurationUnit.MILLISECONDS)).isNegative() }
        return !isNegative
    }

    fun cSignal() {
        cond!!.synchronized { cond.signal() }
    }
}

The output:

Condition start
time: 250.916us
Await
Signal start
Condition Wacke up
time: 503.406250ms
lemkoleg commented 11 months ago

I figured out what the problem was. The problem is that in my code, cond.cSignal() is called before cond.cAwait() because the first CoroutineScope(Dispatchers.Default).launch block does some extra work before cond.cAwait() is called. ( val time = DateTime.nowUnixMillisLong()). In your code, you call delay(500), thereby giving the opportunity to call cond.cAwait first. This is probably a problem if cond.cSignal() is called on an unlocked ConditionLock. because the streams will freeze.

arkivanov commented 11 months ago

Thanks for the update.

This is probably a problem if cond.cSignal() is called on an unlocked ConditionLock

I believe this is by design. ConditionLock#signal is basically equivalent to Java Condition#signalAll - it wakes up all currently waiting threads. Usually Condition is used together with checking some condition in a loop.

var sharedFlag = false
val lock = ConditionLock()

fun awaitFlag() {
    lock.synchronized {
        while (!sharedFlag) {
            lock.await()
        }
    }
}

fun setFlag() {
    lock.synchronized { 
        sharedFlag = true
        lock.signal()
    }
}

In this case, awaitFlag will only wait if the condition is false, and will wake up when the flag changes, without race conditions. It worth to note that ConditionLock (as well as Java Condition) automatically releases the synchronization lock after entering await, and acquires it again before returning from await.