Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0
13.01k stars 1.85k forks source link

Question: Should variable in coroutine body be synchronized with AtomicReference or Channel if only one inner coroutine changes it? #1363

Closed gmarchelos closed 5 years ago

gmarchelos commented 5 years ago

Having simple variable in coroutine block and only one producer coroutine changes the state of the variable and another coroutine consumes it, should this variable be synchronized somehow with AtomicReference or Channel to be visible in reader coroutine or reader coroutine automatically fetches new data from the memory?

fun main() = runBlocking<Unit>(Dispatchers.Default) {
    var state: State? = null

    launch {
        data0Flow.collect { newState ->
            state = newState
        }
    }

    launch {
        data1Flow.collect { someData ->
            // Will the state be visible immediately when first coroutine updated it ?
            if (state != null) {
                // do something
            }
        }
    }
}
fvasco commented 5 years ago

should this variable be synchronized somehow

Yes, absolutely.

Local variables are not thread safe.

elizarov commented 5 years ago

I would recommend to use something like combineLatest to combine two flows data0Flow and data1Flow into a single flow that would let you avoid having to use a local variable in the first place.

gmarchelos commented 5 years ago

@fvasco, @elizarov thanks for the replies but I am not sure I get the final answer.

Let me explain my point:

In Shared mutable state and concurrency section from Kotlin documentation you have this example

// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

Here, as you can see, we have counter variable without any sync wrappers. This counter variable can be modified by different threads but not concurrently. Just like in my example

launch {
        data0Flow.collect { newState ->
            state = newState
        }
}

And the question: Why don't we wrap counter to AtomicReference ? Is it automatically visible to other threads ? (I mean if this local variable does not have volatile marker, it can be cached by the thread)

I agree that variable must be wrapped to AtomicReference and special method should be used like increment and get.. but in this case I am worrying more about thread cache.

And in my example I have second coroutine that just reads common state and I wonder whether this coroutine will read state variable from the memory or from the thread cache.

launch {
        data1Flow.collect { someData ->
            // Will the state be visible immediately when first coroutine updated it ?
            if (state != null) {
                // do something
            }
        }
}

Does kotlin coroutine read state from the main memory or coroutine (thread that executes coroutine) can cache state ?

Thanks in advance!

fvasco commented 5 years ago

Hi @gmarchelos, a reliable answer depends by two factors: current library implementation and effective target platform. Without consider tricky behaviors subject to change, my personal opinion is to stay conform with official, public documentation.

Why don't we wrap counter to AtomicReference ?

Coroutine cannot run concurrently with itself.

Is it automatically visible to other threads?

It is visible inside the coroutine, indifferently by thread. When counter is published to other thread is unspecified, but when actor resumes than the counter value is acquired.

I mean if this local variable does not have volatile marker, it can be cached by the thread

@Volatile is not applicable on local variable, and yes, it can be cached by the thread, however previous statement remains valid.

Does kotlin coroutine read state from the main memory or coroutine (thread that executes coroutine) can cache state?

The behavior is unspecified, please consider some already present data structures, as @elizarov suggested.

gmarchelos commented 5 years ago

Hi @fvasco and thanks for detailed response! So, looking at my initial example where one coroutine changes state and another reads the state, you are saying that in this case behaviour is unspecified (read coroutine can cache the state and it will not have fresh values) and some data structures must be used. Did I understand that correctly ? But changing state within the same coroutine (launched by launch, async, flow, ...) is safe, correct ?

fvasco commented 5 years ago

read coroutine can cache the state and it will not have fresh values

Yes, on JVM you can get worse than you expect (see JVM Memory Model).

But changing state within the same coroutine (launched by launch, async, flow, ...) is safe, correct ?

Yes, this is my experience.

gmarchelos commented 5 years ago

Understood. Thanks again @fvasco