Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0
12.94k stars 1.84k forks source link

Suspending broadcast flow emission #1901

Closed fluidsonic closed 4 years ago

fluidsonic commented 4 years ago

I propose to add a Flow operation that broadcasts upstream values downstream without involving a BroadcastChannel and thus suspending the emitter until the broadcast for a value is complete.

Status quo on suspending emitters

Example output

(1) Emitting two values [1, 2] to one collector looks like this:

1: before emit
1: processing started
1: processing completed
1: after emit
2: before emit
2: processing started
2: processing completed
2: after emit

(2) Emitting two values [1, 2] to multiple collectors through a BroadcastChannel looks like this:

1: before emit
1: after emit
2: before emit
2: after emit
1: processing A started
1: processing B started
1: processing A completed
1: processing B completed
2: processing A started
2: processing B started
2: processing A completed
2: processing B completed

(3) The desired behavior is that two values [1, 2] emitted to multiple collectors looks like this:

1: before emit
1: processing A started
1: processing A completed
1: processing B started
1: processing B completed
1: after emit
2: before emit
2: processing A started
2: processing A completed
2: processing B started
2: processing B completed
2: after emit

Use case

My use case for (3) are events where the event emitter has to wait for all event subscribers to fully process the event.

Example

.broadcast() example for (3)

The following code is very naive. It merely illustrates how the behavior in the example output for (3) can be achieved.

fun <T> Flow<T>.broadcast(): Flow<T> =
    BroadcastFlow(this)

@OptIn(FlowPreview::class)
class BroadcastFlow<T>(
    private val flow: Flow<T>
) : AbstractFlow<T>() {

    private val collectors: MutableList<Pair<FlowCollector<T>, CoroutineContext>> = mutableListOf()
    private var completion: CompletableDeferred<Unit>? = null
    private val mutex = Mutex()

    override suspend fun collectSafely(collector: FlowCollector<T>) {
        mutex.lock()

        val completion = completion

        collectors += collector to coroutineContext

        try {
            if (completion != null) {
                mutex.unlock()

                completion.await()
            } else {
                val completion = CompletableDeferred<Unit>()
                this@BroadcastFlow.completion = completion

                mutex.unlock()

                try {
                    flow.collect { value ->
                        mutex.withLock { collectors.toList() }
                            .forEach { (collector, coroutineContext) ->
                                withContext(coroutineContext) {
                                    collector.emit(value)
                                }
                            }
                    }

                    completion.complete(Unit)
                } catch (e: Throwable) {
                    completion.completeExceptionally(e)

                    throw e
                }
            }
        } finally {
            mutex.withLock {
                collectors.removeIf { it.first === collector }
            }
        }
    }
}

A fully working example which creates all of the output mentioned above can be found here: https://gist.github.com/fluidsonic/f7b2b0084f184932ea3be4cf0074496a

elizarov commented 4 years ago

What is your use-case? What kind of application scenario calls for this kind of behavior?

fluidsonic commented 4 years ago

At the moment I would be beneficial in the sign-in and sign-out processes of an Android app.

  1. An AuthenticationManager performs sign-in/sign-out operations (verify credentials with API, save token, etc.).
  2. When done, it sends respective events (UserSignedIn / UserSignedOut) to subscribers. They now have a chance for set-up work (e.g. fetch & save user object to DB on sign-in) or clean-up work (e.g. wipe all user data in DB, memory & disk on sign-out) before the AuthenticationManager completes the process and gives control back to the UI.
  3. Once everyone had their chance to set things up, the UI can safely transition to logged-in/out state.

If I write that without subscriptions, the AuthenticationManager (or an intermediate component) would have to know all components that need to perform some work synchronously upon sign-in/sign-out.

If I use a synchronous Flow, synchronous event bus or similar then the AuthenticationManager just needs to know that there are some components that need to do some related work and waits for them to finish.

I've heard that a Flow.share() operator is planned that I think covers such a use case already?

elizarov commented 4 years ago

I'm not sure that Flow (which is cold and reactive) is a good fit for this kind of use-case to start with. Have you tried just doing a regular suspending callbacks? E.g. define the property in your AuthenticationManager:

val authProcessors = ArrayList<suspend (AuthEvent) -> Unit>()

Here, I suppose that AuthEvent is a super class for UserSignedIn and UserSignedOut. Now, in your subscribers that should do some required processing of those events, write:

authManager.authProcessors += { event -> ... process event }

In the code of the AuthenticationManager itself, when you have an authentication event and need to wait until every component has processed it, you just write:

authProcessors.forEach { it(event) }

This looks pretty straigtforward and simple to me. Would it work for you?

fluidsonic commented 4 years ago

That for sure would work. But I'd have to build things on top which I have for free in Flow, like unsubscribing when the CoroutineScope is closed for example.

I just wonder why I can use Flow to

but not

It just feels weird that I have to make an exception here and use Flow for all the other cases.

Maybe Flows aren't a good idea for events in general? Or the synchronous single-consumer case isn't something I should rely on?

elizarov commented 4 years ago

Flow for events is a complicated story, indeed. There was a big discussion in #1082 and one outcome for that is that we might introduce some kind of EventFlow that would exactly cover your use-case of flowing to many consumers synchronously.

elizarov commented 4 years ago

Here is a design for SharedFlow that covers this use-case for suspending broadcast flow emission. See #2034

elizarov commented 4 years ago

It looks like it is now safe to close this issue, as its use-cases are fully accounted for by #2034.