Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Flow.share operator #1261

Closed elizarov closed 4 years ago

elizarov commented 5 years ago

The share() operator operates on Flow<T> and returns Flow<T>. It shall have the following semantics. The resulting flow is cold, but when one collector starts collecting from it, it starts to collect from the upstream flow, activating the emitter upstream. The trick of the share operator is that when additional collectors appear downstream, they all "share" the same upstream emitter.

For example, consider the flow:

val flow = flow { 
    var i = 0
    while(true) { 
        delay(1000) 
        println("Emit $i")
        emit(i++)
    }
}

If you launch two collectors:

launch { flow.collect { println("A: got $it") } }
launch { flow.collect { println("B: got $it") } }

Then you shall see "Emit 0 / A: got 0 / Emit 0 / B: got 0 / Emit 1 / A: got 1 / Emit 1 / B: got 1 / ...".

However, if you change the flow to val flow = flow { /* same */ }.share(), then you shall see "Emit 0 / A: got 0 / B: got 0 / Emit 1 / A: got 1 / B: got 1 / ...", that is one emission gets delivered to both collectors.

Now if you need to artificially "start" the shared flow simply to keep it active, then you can always launch a dummy collector: launch { flow.collect {} } that works as a "reference" which is active until you cancel the resulting job.

TBD: The share operator might need some configuration with respect to how much "history" to keep in memory for "late collectors". So far it seems that one non-negative integer is enough (with zero, a new collector doesn't get any history; with one, only the most recent value; with more, the specified number of recent values). What is unclear is what the default value shall be (if any).
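To make the "history" knob concrete, here is a minimal sketch that assumes the SharedFlow API as it later shipped in kotlinx.coroutines 1.4 and newer, where the integer is called replay. The function name lateCollectorHistory is hypothetical and exists only for this illustration; it is not part of the proposal above.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits 1, 2, 3 before anyone collects and returns
// the history that a late collector would be handed on subscription.
fun lateCollectorHistory(replay: Int): List<Int> = runBlocking {
    val shared = MutableSharedFlow<Int>(replay = replay)
    // There are no subscribers yet, so emit() does not suspend here.
    for (i in 1..3) shared.emit(i)
    // replayCache is a snapshot of the values kept for late collectors.
    shared.replayCache
}

fun main() {
    println(lateCollectorHistory(0)) // zero: no history at all
    println(lateCollectorHistory(1)) // one: only the most recent value
    println(lateCollectorHistory(2)) // more: the last two values
}
```

With replay set to zero the values emitted before subscription are simply dropped, which matches the "no history" case described above.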

UPDATE: It will actually have to be a shareIn(scope) operator. Otherwise, the scope it works in will be bound to the first collector, and when this collector is cancelled it will not be able to maintain emissions to other collectors.
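A rough sketch of why the explicit scope matters, assuming the shareIn(scope, started) signature that eventually shipped in kotlinx.coroutines 1.4 and newer (the issue itself only proposes shareIn(scope)). The names sharedUpstreamDemo, sharingScope and upstreamStarts are made up for this example. Collector A stops after two values, while collector B keeps receiving because the upstream runs in sharingScope rather than in A's context.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo: returns (how often the upstream was started, values seen by B).
fun sharedUpstreamDemo(): Pair<Int, Int> = runBlocking {
    val upstreamStarts = AtomicInteger(0)
    val upstream = flow {
        upstreamStarts.incrementAndGet() // counts activations of the emitter
        var i = 0
        while (true) {
            delay(10)
            emit(i++)
        }
    }
    // The upstream is collected in this scope, not in any collector's context.
    val sharingScope = CoroutineScope(Dispatchers.Default)
    // Lazily: start on the first subscriber and never stop on its own.
    val shared = upstream.shareIn(sharingScope, SharingStarted.Lazily)

    var receivedByB = 0
    val a = launch { shared.take(2).collect { println("A: got $it") } }
    val b = launch { shared.take(4).collect { receivedByB++; println("B: got $it") } }
    a.join() // A is done after two values
    b.join() // B still gets its four values afterwards
    sharingScope.cancel() // stop the upstream explicitly
    upstreamStarts.get() to receivedByB
}

fun main() {
    println(sharedUpstreamDemo())
}
```

The upstream is activated once for both collectors, and it only stops when sharingScope is cancelled.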

matejdro commented 4 years ago

I see the purpose here. So we would need support for both explicit and implicit scopes?

streetsofboston commented 4 years ago

@zach-klippenstein Issue https://github.com/Kotlin/kotlinx.coroutines/issues/1086 discusses such a class for explicitly managing the scope: ConnectableFlow.

Instead of the fun connect(CoroutineScope): Connection shown there, though, it could just have fun launchIn(CoroutineScope): Job, like a regular Flow, to manage its lifecycle. For the share() operator, it is then a question of managing that 'shared' CoroutineScope by reference-counting the number of subscribers.

elizarov commented 4 years ago

Scoping by downstream is not enough. We need a real scope to provide the context for pulling upstream data. We cannot just take the context of the first downstream subscriber: it can get destroyed, and the other subscribers should not suffer for it. In a case where you don't really care, you can always use shareIn(GlobalScope).

elizarov commented 4 years ago

📣 Asking here a question to the community interested in this issue. There are tons of use-cases listed here and in #1086, but they all revolve around working with never-ending streams of incoming events or state updates. However, a Flow (just like other reactive streams) does not have to be infinite. It can complete. It can produce a number of events and stop. Disregard the question of failures for a while. Let's discuss just a regular completion. Assume that we don't retry either and are not just polling a non-streaming data source. So we have upstreamFlow.shareIn(scope) and the upstream flow completes.

Now, in Rx there is a replay operator that records all the upstream emissions and replays them to each downstream, completing the replay when the upstream has completed. I understand how replay works, but why would anyone ever need it? What are the actual use-cases for replay operator in real-life application? More generally, what are use-cases for having a stream that completes (so it is not an infinite stream of updates) and which you want to share among multiple downstream consumers at the same time?

matthiaswelz commented 4 years ago

What are the actual use-cases for replay operator in real-life application? More generally, what are use-cases for having a stream that completes (so it is not an infinite stream of updates) and which you want to share among multiple downstream consumers at the same time?

For me, having a share/replay operator would make it possible to implement some sort of if/else logic with flows, i.e. to distribute elements into multiple flows based on criteria.

For example, I need the first 100 items with foo == "Bar" in flow a and all other items in flow b.

With share/replay:

var flow = //...
flow = flow.shareReplay() //or just share()
val a = flow.filter { it.foo == "Bar" }.take(100)
val b = flow.filter { it.foo != "Bar" }

In this case, I don't want to execute the input Flow multiple times (which could be a file consisting of a couple of GB of data on a network drive etc.). So shareReplay should be able to transparently handle the "take" (i.e. don't block the other flow) and avoid evaluating the input flow multiple times.

matthiaswelz commented 4 years ago

For reference: This is the share implementation I ended up using, which seems to fulfill my requirements (in my case parsing a binary file with a dynamic schema and converting it to CSV; for this, I need to "split" the flow to "guess" the column headers based on the first X entries, but then need all entries again to actually do the parsing and writing):

(I wasn't able to get rid of the consumerCount parameter, because in my case I collect the first flow before starting the second one)

private val logger = KotlinLogging.logger {}

fun <T> Flow<T>.share(consumerCount: Int, bufferSize: Int = BUFFERED, scope: CoroutineContext? = null): Flow<T> {
    if (this is SharedFlow)
        return this

    logger.debug { "Sharing flow" }

    var broadcastChannel: BroadcastChannel<T>? = null
    var channels: List<ReceiveChannel<T>>? = null
    val lock = ReentrantLock()

    val pendingConsumers = AtomicInteger(consumerCount)
    val index = AtomicInteger(0)

    return SharedFlow(flow {
        lock.withLock {
            if (broadcastChannel == null) {
                logger.debug { "First consumer found. Creating channel and flows" }

                broadcastChannel = buffer(bufferSize).broadcastIn(CoroutineScope(scope ?: coroutineContext))
                channels =  0.until(consumerCount).map { broadcastChannel!!.openSubscription() }.toList()
            }
        }

        val curIndex = index.getAndIncrement()
        val receiveChannel = channels!![curIndex]
        val flow = receiveChannel.consumeAsFlow()
        logger.debug { "Flow $curIndex is now being consumed" }

        try {
            logger.debug { "Starting emitting (pendingConsumers is ${pendingConsumers.get()})" }
            emitAll(flow)
        } finally {
            receiveChannel.cancel()

            if (pendingConsumers.decrementAndGet() == 0)
                broadcastChannel!!.cancel()

            logger.debug { "Finished emitting (pendingConsumers is ${pendingConsumers.get()})" }
        }
    })
}

private class SharedFlow<T>(fl: Flow<T>) : Flow<T> by fl

fun <T, R> Flow<T>.flatMapItems(callback: (T) -> Iterable<R>): Flow<R> = flatMapConcat { callback(it).asFlow() }

inline fun <T> Flow<T>.parallelFilter(concurrencyLevel: Int = DEFAULT_CONCURRENCY, crossinline predicate: suspend (T) -> Boolean): Flow<T> {
    return flatMapMerge(concurrencyLevel) { value ->
        flow { if (predicate(value)) emit(value) }
    }
}

inline fun <T, R> Flow<T>.parallelMap(concurrencyLevel: Int = DEFAULT_CONCURRENCY, crossinline transform: suspend (T) -> R): Flow<R> {
    return flatMapMerge(concurrencyLevel) { value ->
        flow { emit(transform(value)) }
    }
}

zach-klippenstein commented 4 years ago

I work on a fairly large mobile app, and we use replay(1) (ie stateIn) all the time but I don't think I've ever seen an unbounded replay in our codebase.

elizarov commented 4 years ago

@matthiaswelz Thanks for your use-case. Note that in your use-case you are not actually "widely sharing" the flow. You don't publish the shared flow for later consumption by an unknown number of future collectors. On the contrary, you know in advance that there are going to be two downstream flows that process your upstream flow. In this case, Rx-like replay looks like a bit of overkill: it caches all the data forever, so you'll run out of memory if the data stream is very long, and you don't take advantage of the streaming nature of the data producer to minimize memory consumption.

I think that your use-case of "splitting the flow in a few other flows" should be covered by a separate operator which we can tentatively call replicate. See this comment with a strawman example: https://github.com/Kotlin/kotlinx.coroutines/issues/1086#issuecomment-585195388

elizarov commented 4 years ago

I work on a fairly large mobile app, and we use replay(1) (ie stateIn) all the time but I don't think I've ever seen an unbounded replay in our codebase.

@zach-klippenstein But do you ever call replay(1) on an upstream flow that is finite and completes at some moment of time?

elizarov commented 4 years ago

There is a design for SharedFlow that provides most of the underlying framework to implement the actual sharing operators. See #2034

elizarov commented 4 years ago

This issue is superseded by the worked-out design of sharing operators. See #2047

SzyQ commented 4 years ago

While it's not released, a good enough workaround is this simple extension:

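fun <T> Flow<T>.cache(scope: CoroutineScope, default: T): Flow<T> {
    // Holds the latest upstream value; collectors see `default` until the first emission.
    val stateFlow = MutableStateFlow(default)
    // Collect the upstream once in `scope`, until it completes or `scope` is cancelled.
    scope.launch {
        collect {
            stateFlow.value = it
        }
    }
    return stateFlow
}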
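For comparison, a minimal sketch of the same idea using the stateIn operator, assuming kotlinx.coroutines 1.4 or newer, where the operators designed in #2047 are available. The function latestValueFromStateIn and the sample values are hypothetical and only serve the illustration.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.runBlocking

// Hypothetical demo: caches the latest value of a small upstream flow.
fun latestValueFromStateIn(): Int = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)
    // Eagerly: start collecting the upstream immediately in `scope`;
    // `initialValue` plays the role of `default` in the workaround above.
    val state = flowOf(1, 2, 3).stateIn(scope, SharingStarted.Eagerly, initialValue = 0)
    // StateFlow is conflated, so intermediate values may be skipped,
    // but the final upstream value is eventually observed.
    val last = state.first { it == 3 }
    scope.cancel() // release the sharing coroutine
    last
}

fun main() {
    println(latestValueFromStateIn())
}
```

Unlike the workaround, stateIn returns a StateFlow<T>, so callers can also read the current value synchronously through its value property.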