Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Forced parallel downstream processing #1187

Open circusmagnus opened 5 years ago

circusmagnus commented 5 years ago

It seems quite easy to force parallel downstream processing by accident. One can intuitively launch parallel coroutines inside a flow builder (instead of a producer), and suddenly all downstream operators, along with collect, run in parallel, without any warning to the caller or to the writer of such a function. Like this:

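```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch

fun surpriseConcurrent() = flow {
    coroutineScope {
        repeat(10) {
            launch {
                delay(500) // some processing
                // emit() is called from a child coroutine, concurrently with its siblings
                emit((Int.MIN_VALUE..Int.MAX_VALUE).random())
            }
        }
    }
}
```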

If I call surpriseConcurrent().collect { ... }, I would expect the code in the block to execute sequentially, but it does not.

Or is this not a concern?
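Why does the collect block end up running in parallel? A minimal sketch, assuming kotlinx.coroutines 1.6 or newer (where collect accepts a lambda directly): a flow {} builder invokes the collector's block directly from inside emit(), in the emitting coroutine. The hypothetical helper emitOrder() records the order of events to show this; it follows that emitting from several coroutines at once runs the downstream code concurrently.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Records the order of events: the collect {} block runs inside emit(),
// in the coroutine that called emit().
fun emitOrder(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    flow {
        events += "before emit"
        emit(1)
        events += "after emit"
    }.collect { value ->
        events += "collected $value"
    }
    events
}

fun main() {
    println(emitOrder()) // [before emit, collected 1, after emit]
}
```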

fvasco commented 5 years ago

FlowCollector isn't thread-safe, so you cannot invoke emit concurrently.

Have you considered flowViaChannel?
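For reference, flowViaChannel was later deprecated in favor of channelFlow. Below is a minimal sketch of the opening example rewritten with channelFlow, assuming kotlinx.coroutines 1.6 or newer; concurrentButSerialized is a hypothetical name, and the delay is shortened to 50 ms. Child coroutines call send() instead of emit(), and the channel hands values to the collector one at a time.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// channelFlow provides a ProducerScope: send() may be called from concurrent
// child coroutines, while the collector still receives values sequentially.
fun concurrentButSerialized(): Flow<Int> = channelFlow {
    repeat(10) {
        launch {
            delay(50) // some processing, done concurrently
            send((Int.MIN_VALUE..Int.MAX_VALUE).random())
        }
    }
    // The flow completes only after this block and all its children complete.
}

fun main() = runBlocking {
    val values = concurrentButSerialized().toList()
    println(values.size) // 10
}
```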

circusmagnus commented 5 years ago

Yes, now I understand that using a channel to serialize emissions is the way to go. But that was not obvious from the start. Since I can launch{} from inside flow{}, I did just that: to parallelise work and emit results, as I would from inside a producer{} coroutine.

Perhaps an IDE warning when trying to emit() from inside launch{} might save somebody from an unpleasant surprise.

Or perhaps we could have a thread-safe flow builder (serializedFlow{}?): inside it, you could emit from concurrent coroutines, and it would use a Channel underneath to serialize emissions properly.

circusmagnus commented 5 years ago

OK, sorry, I realize that this is what flowViaChannel is for.

An IDE warning might still be in order :)

elizarov commented 5 years ago

It is not clear what code pattern we should add a warning for. What we can do is somehow detect concurrent emission (maybe in debug mode only?) and throw an ISE (IllegalStateException). That would be "best effort", "fail fast" behavior, similar to how collection iterators detect concurrent modification during iteration.
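The fail-fast idea can be illustrated with a hypothetical wrapper around a FlowCollector that throws an ISE when a second emit() starts while a previous one is still running. This is only a sketch of the concept (FailFastCollector and detectsOverlap are made-up names, and kotlinx.coroutines 1.6 or newer is assumed); per the later comments in this thread, the check the library actually adopted is based on which coroutine emits, not on overlapping calls.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical best-effort, fail-fast guard (not the library's implementation):
// throws IllegalStateException if emit() is entered while another emit() is in progress.
class FailFastCollector<T>(private val downstream: FlowCollector<T>) : FlowCollector<T> {
    private val busy = AtomicBoolean(false)

    override suspend fun emit(value: T) {
        check(busy.compareAndSet(false, true)) { "Concurrent emission detected" }
        try {
            downstream.emit(value)
        } finally {
            busy.set(false)
        }
    }
}

// Returns true if the guard rejected an overlapping emit() call.
fun detectsOverlap(): Boolean = runBlocking {
    val slowDownstream = object : FlowCollector<Int> {
        override suspend fun emit(value: Int) = delay(100) // simulate slow downstream work
    }
    val guarded = FailFastCollector(slowDownstream)
    var rejected = false
    launch { guarded.emit(1) } // suspends inside the slow downstream
    launch {
        try {
            guarded.emit(2) // overlaps with the first emit()
        } catch (e: IllegalStateException) {
            rejected = true
        }
    }.join()
    rejected
}

fun main() {
    println(detectsOverlap()) // true
}
```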

circusmagnus commented 5 years ago

I have little knowledge of code pattern matching, but couldn't it be this pattern?

```kotlin
launch {
    emit(..)
}
```

launch{} is (or should be) always concurrent with "something", while the flow collector you are emitting to must have come from that external "something", so it is quite possibly a resource shared between concurrent coroutines.

I know that

```kotlin
flow {
    coroutineScope {
        launch {
            emit(x)
        }
    }
}
```

is OK as far as concurrency is concerned, but it does not make much sense either.

```kotlin
flow {
    coroutineScope {
        launch {
            emit(x)
        }
        launch {
            emit(x)
        }
    }
}
```

is downright dangerous.

circusmagnus commented 5 years ago

Yay, I see that we now throw an ISE when emission happens from another coroutine, that is, from any coroutine other than the one in which the suspending function collect was called. So the above example throws an ISE, while

```kotlin
flow {
    coroutineScope {
        val one = async {
            get(x)
        }
        val two = async {
            get(x)
        }
        emit(one.await() + two.await())
    }
}
```

works all right.
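A runnable sketch of both cases, assuming kotlinx.coroutines 1.6 or newer. The get function is a hypothetical stand-in for the call in the comment above, and emitFromChild, emitAfterAwait and childEmissionRejected are illustrative names. Emitting from a launched child fails with IllegalStateException, while emitting from the flow's own coroutine after awaiting the async results works.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the get(x) call in the comment above.
suspend fun get(x: Int): Int {
    delay(100) // some processing
    return x * 2
}

// Emits from a launched child coroutine: rejected with IllegalStateException.
fun emitFromChild() = flow<Int> {
    coroutineScope {
        launch { emit(get(1)) }
    }
}

// The work runs concurrently in async children; only the flow's own coroutine emits.
fun emitAfterAwait() = flow<Int> {
    coroutineScope {
        val one = async { get(1) }
        val two = async { get(2) }
        emit(one.await() + two.await())
    }
}

// Returns true if collecting emitFromChild() fails with IllegalStateException.
fun childEmissionRejected(): Boolean = runBlocking {
    try {
        emitFromChild().collect { }
        false
    } catch (e: IllegalStateException) {
        true
    }
}

fun main() {
    println(runBlocking { emitAfterAwait().toList() }) // [6]
    println(childEmissionRejected()) // true
}
```

With this check in place, even the single-launch example earlier in the thread is rejected, because the emission still comes from a coroutine other than the collecting one.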

Thank you!

I guess we can close the issue now? Or are you planning to add some compile-time check for this, or maybe a syntax that prevents such things from happening?

elizarov commented 5 years ago

We are thinking about how it might be possible to check this at compile time in the future. There are no specific solutions in sight right now, though.