Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Consider sharing a Flow through a ConnectableFlow #1086

Closed streetsofboston closed 4 years ago

streetsofboston commented 5 years ago

Enhancement: Add ConnectableFlow to the Flow API.

Each time an observer of a Flow starts collecting, the source of the Flow is executed, much like a call to subscribe of a Flowable in RxJava executes the Flowable's source.
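
As a minimal sketch of that behaviour (the function name `countSourceStarts` is made up for illustration), the source block of a cold flow runs once per `collect` call:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: counts how often the flow's source block is executed.
fun countSourceStarts(collectors: Int): Int = runBlocking {
    var starts = 0
    val cold = flow<Int> {
        starts++      // runs again for every collect call
        emit(1)
        emit(2)
    }
    // Collect the same flow several times, one collector after another.
    repeat(collectors) { cold.collect { /* values are ignored here */ } }
    starts
}
```

Calling `countSourceStarts(3)` returns 3: three collectors, three executions of the source.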

This change is to defer the execution of the source of the Flow until a specific point in time, possibly after one or more observers started collecting the 'shared' Flow.

The use-case for deferring the execution of the source of a Flow is for (cold) Flows whose data-source is a resource that should not be started/created or stopped/destroyed by each and every call to collect and should be explicitly managed by a call to a function (connect, for example) instead. It differs from using broadcastIn by the fact that publish will return a Flow, not a BroadcastChannel.

E.g.

val dataFlow = flowViaChannel<MyData> { channel ->
    val resource = getResource(channel)
    channel.invokeOnClose {
        resource.close()
    }
    resource.startReceivingData()
}

val sharedFlow = dataFlow.publish()
...
...

val observer1 = launch {
    sharedFlow.collect { ... }
}
...
...
val observer2 = launch {
    sharedFlow.collect { ... }
}
...
// Start the flow right now.
val connection = sharedFlow.connect(scope)
...
...
...
// Cancel the flow here.
// Note that when 'scope' is cancelled, this 'connection' would be canceled as well.
connection.close()
...

I propose creating these new classes and extension functions or something similar (they are modeled after RxJava's ConnectableObservable):

/**
 * A [Flow] of type [T] that only starts emitting values after its [connect] method is called.
 * 
 * If this flow's [Connection] is still connected, the current [Connection] will be returned when 
 * [connect] is called and the flow will not be restarted. 
 *
 * When its [collect] method is called, this flow will not immediately start collecting. Only after
 * [connect] is called, the emission and actual collecting of values starts.
 */
interface ConnectableFlow<out T> : Flow<T> {
    /**
     * Connects this shared [Flow] to start emitting values.
     *
     * @param scope The [CoroutineScope] in which the emissions will take place.
     * @return The [Connection] that can be closed to stop this shared [Flow].
     */
    fun connect(scope: CoroutineScope): Connection
}

and

/**
 * A connection returned by a call to [ConnectableFlow.connect].
 */
interface Connection {
    /**
     * Returns true if this connection is connected and active.
     */
    suspend fun isConnected(): Boolean

    /**
     * Closes this connection in an orderly fashion.
     */
    suspend fun close()
}

/**
 * Publishes and shares an upstream [Flow] of type [T] and returns a [ConnectableFlow] of type [T].
 *
 * The upstream [Flow] begins emissions only after the [ConnectableFlow.connect] has been called.
 *
 * @return The [ConnectableFlow] that represents the shared [Flow] of this receiver.
 */
fun <T> Flow<T>.publish(): ConnectableFlow<T> 

/**
 * Creates a [Flow] of type [T] from this [ConnectableFlow] that automatically connects (i.e. calls
 * [ConnectableFlow.connect]) when the first [numberOfCollectors] observers start collecting (i.e. call [Flow.collect])
 * and automatically cancels this [ConnectableFlow] when the last observer stops collecting.
 *
 * @param scope The scope in which this [ConnectableFlow] will be connected.
 * @param numberOfCollectors The number of observers that need to start collecting before the connection (re)starts.
 * @return The shared reference-counted [Flow].
 */
fun <T> ConnectableFlow<T>.refCount(scope: CoroutineScope, numberOfCollectors: Int = 1): Flow<T>

/**
 * Shares this [Flow] of type [T] with multiple observers without restarting when each observer starts
 * collecting. This is the same as calling [Flow.publish] and then [ConnectableFlow.refCount].
 *
 * @param scope The scope in which this [ConnectableFlow] will be connected.
 * @return The new [Flow] that shares this [Flow]
 */
fun <T> Flow<T>.share(scope: CoroutineScope): Flow<T> = publish().refCount(scope)

This is my first stab at an initial/draft/try-out implementation: https://gist.github.com/streetsofboston/39c3f8c882c9891b35e0ebc3cd812381

Update: I took autoConnect out: This is more for 'replay' and 'caching'. If needed, this should be addressed in a separate issue.

elizarov commented 5 years ago

Can you please add some actual use-cases, as in "here is what the application is trying to do and this is what it wants to achieve", without tying them to the actual solution in the form of ConnectableFlow? There are tons of methods here. All of them (refCounting, autoConnect, etc.) need a use-case.

streetsofboston commented 5 years ago

Use case for ConnectableFlow:

Cold Streams are often Unicast. When an observer/consumer starts observing, the source of the Stream is started again. The Flow API currently allows for cold unicasts, where its source is (re)started each time collect is called.

Hot Streams are often Multicast where observers/consumers can come and go without them restarting anything. The Channel and BroadcastChannel APIs support this and the method broadcastIn already exists.

Sometimes, it is desirable to have Cold Streams that are Multicast. The source of the stream may not always be active (it may be expensive to have the stream being active all the time or starting a new one each time), and starting the stream does not depend on whether any observers/consumers are actually observing. Starting and stopping the cold multicast stream needs to be managed explicitly.

The proposed ConnectableFlow would implement such Cold Multicast Flow, where the source of the Flow (re)starts each time when its connect method is called and where the source of the Flow stops when this connection is closed.

Examples of such cold multicast streams are BLE (Bluetooth Low Energy) characteristics that notify the observer of data changing on an external device, e.g. a BLE thermometer or any other continuous monitoring device. Starting a characteristic keeps a connection open between the observer and the BLE device and this can be somewhat expensive. It is best to manage this explicitly, e.g. have the user click a 'connect' and 'disconnect' button or to manage it implicitly by only starting the connection when observers on the UI are observing the device.

Make a cold unicast Flow a cold multicast ConnectableFlow: fun <T> Flow<T>.publish(): ConnectableFlow<T>

Implicitly manage the connection of a ConnectableFlow by reference-counting: This allows a cold multicast Flow to be active only when observers are listening/collecting. E.g. Keep a BLE Characteristic notification stream active if the user is looking at a UI that needs it. fun <T> ConnectableFlow<T>.refCount(scope: CoroutineScope, numberOfCollectors: Int = 1): Flow<T>

elizarov commented 5 years ago

> Sometimes, it is desirable to have Cold Streams that are Multicast. The source of the stream may not always be active (it may be expensive to have the stream being active all the time or starting a new one each time), and starting the stream does not depend on whether any observers/consumers are actually observing. Starting and stopping the cold multicast stream needs to be managed explicitly.

Can you, please, provide an example with an actual application scenario (as in "here is the actual application I'm writing and here is why I need it") where a cold stream needs to be multicast, but it cannot be always active (so you cannot just use an always active BroadcastChannel) and where, at the same time, starting/stopping the stream does not depend on the presence of observers, so that it needs to be managed explicitly?

LouisCAD commented 5 years ago

@streetsofboston To me, that Bluetooth GATT notifications use case is not a cold stream at all, but a hot one with manual start and stop, so effectively just a channel (possibly broadcast), and two custom functions to start/subscribe and stop/unsubscribe.

streetsofboston commented 5 years ago

@LouisCAD You're correct. But the two custom functions to start and stop the stream could also be handled by the ConnectableFlow's connect method and the close method on the returned Connection, much like a ConnectableObservable in Rx (connect and dispose). The connect call would make the underlying stream hot (i.e. it will start it and the stream will emit values until close on the returned connection is called).

I also do believe the addition of ConnectableFlow to the core Flow API is not required and stuff can be done manually (I implemented a ConnectableFlow myself in the gist I linked, using the public Flow api). But it can be convenient to other devs. Maybe ConnectableFlow can be part of an extension library?

I'm not working on any BLE app right now, but in the past I have worked on apps that used plain callbacks or the RxAndroidBLE library. There we made use of ConnectableObservables. But that has been a while and I need to dig into the past a little to get a good use-case. :-)

LouisCAD commented 5 years ago

@streetsofboston Actually, I made a library for Bluetooth Low Energy with coroutines a while ago (and I keep it updated). Notifications support works with channels, although I've not needed notifications support myself. If you think that part of the API may be improved, feel free to open an issue there!

matejdro commented 5 years ago

> To me, that Bluetooth GATT notifications use case is not a cold stream at all, but a hot one with manual start and stop, so effectively just a channel (possibly broadcast), and two custom functions to start/subscribe and stop/unsubscribe.

All that is true, but wrapping GATT into cold stream provides benefits of automatic state management. Programmer can easily forget to call manual start/stop method (especially stop). But with flow, this is done automatically (start on collect call, stop when coroutine is cancelled).

Another use case:

Developing a mobile app that uses GPS location at various points. Location is exposed as a flow. Whenever a part of the app needs access to the location, it starts collecting the flow, the GPS receiver activates, and the location is transmitted. If another part needs access to the location, it can collect the same flow. Since the stream is already active, it would just multicast new location data to all subscribers. When a part does not need access to the location anymore, it cancels the collecting coroutine. Once all collectors are cancelled, the GPS receiver shuts off, saving battery.

inorichi commented 5 years ago

This last example might be somewhat related to #1097, but using Flows instead. Not sure if they could share a common implementation, though.

Edit: although it's only the analogue of the .refCount() operator. .connect(N) must be cancelled manually, or with the scope (what OP has asked for).

elizarov commented 5 years ago

Let me summarize what I'm getting out of this thread so far. I see a bunch of use-cases here for an operator that automatically activates a flow on the first collector, shares the emitted events with all the other collectors, and cancels the flow instance as soon as the last collector is done. Easy, useful, no chance of resource leakage, no need to introduce any new types like ConnectableFlow -- it is just an operator. The only question is how do we name it. Can we name it just share()?

A kind of manual activation/deactivation of the flow sounds like a use-case for a channel to me. You can already do flow.produceIn(scope) to activate a flow and we might even provide a scope-less flow.produce() variant. produce activates a flow and returns a channel. You can cancel this channel when you no longer need it -- that is your manual activation even when you don't have any collectors.
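
A minimal sketch of that manual activation, assuming a hypothetical endless `ticks` flow as the source: `produceIn` starts the flow right away, and cancelling the returned channel stops it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.*

// Hypothetical helper: activates a flow via produceIn, reads two values, then deactivates it.
fun firstTwoTicks(): List<Int> = runBlocking {
    // Made-up endless source standing in for an expensive stream.
    val ticks = flow<Int> {
        var i = 0
        while (true) { emit(i++); delay(10) }
    }
    // produceIn launches the flow in this scope, whether or not anyone receives yet.
    val channel: ReceiveChannel<Int> = ticks.produceIn(this)
    val received = listOf(channel.receive(), channel.receive())
    // Manual deactivation: cancelling the channel cancels the upstream flow.
    channel.cancel()
    received
}
```

`firstTwoTicks()` returns `[0, 1]`. Note that receivers can only obtain the channel after the flow has been activated, which is the limitation raised later in this thread.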

Does this sound like a plan?

matejdro commented 5 years ago

For my use case, this sounds perfect.

nhaarman commented 5 years ago

Next to just share()ing the subscription, new subscribers often want to immediately receive the most recent value that was emitted. In Rx this is done with the replay(1) operator.

Take for example the way Firebase Database provides lists of items. Firebase emits events like 'item added', 'item removed', 'item moved', etc. Clients can construct the entire list from these events. A Flow can scan these events and construct the list, sharing the result:

databaseEvents
  .scan(emptyList()) { list, event ->
     list.apply(event)
  }
  .replay(1).refCount()

Replacing replay(1) with publish(), new subscribers potentially never receive any emissions.
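
For comparison, a rough sketch of the same `replay(1).refCount()` idea using the `shareIn` operator that later releases of kotlinx.coroutines provide. The event strings, the function name `latestListForLateCollector`, and the use of `awaitCancellation()` to keep the upstream open are assumptions made for this example, not part of the Firebase API.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: a late collector immediately receives the most recent list.
fun latestListForLateCollector(): List<String> = runBlocking {
    val sharingJob = Job()
    val sharingScope = CoroutineScope(coroutineContext + sharingJob)

    // Made-up stand-in for a live stream of database events.
    val databaseEvents = flow<String> {
        emit("added A")
        emit("added B")
        awaitCancellation() // stay open like a real listener would
    }
    val lists: SharedFlow<List<String>> = databaseEvents
        .scan(emptyList<String>()) { list, event -> list + event }
        // replay = 1 keeps the latest list; WhileSubscribed acts like refCount
        .shareIn(sharingScope, SharingStarted.WhileSubscribed(), replay = 1)

    val firstCollector = launch { lists.collect { } } // keeps the upstream active
    lists.first { it.size == 2 }                      // wait until both events were applied
    val late = lists.first()                          // late collector: served from the replay cache
    firstCollector.cancel()
    sharingJob.cancelAndJoin()
    late
}
```

`latestListForLateCollector()` returns `["added A", "added B"]`; with `replay = 0` the late collector would instead wait for the next event.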

streetsofboston commented 5 years ago

@elizarov I updated my gist with some examples, use-cases for ConnectableFlow and share(). https://gist.github.com/streetsofboston/39c3f8c882c9891b35e0ebc3cd812381 (see the 2nd file in this gist called Example.kt)

I also tried to implement the UseCasesForConnectableFlow.manage_expensive_start_and_stop_of_resource() use-case/example using the produceIn function instead, and even tried it using the broadcastIn function. I was not able to do this in a way that produced the same results when using ConnectableFlow from my implementation. In this use-case, the ExpensiveResource should be started about 3 seconds after the call to fun manage_expensive_start_and_stop_of_resource(), not earlier.

The main issue with using produceIn is that the listeners (collectors/consumers) need to be able to get a reference to a Flow (or a ReceiveChannel, BroadcastChannel, etc) before the cold stream is activated, but produceIn returns an activated ReceiveChannel...

Maybe I'm overlooking something and missing an implementation that works without the introduction of something new like a ConnectableFlow.

elizarov commented 5 years ago

@streetsofboston Indeed. You cannot easily emulate post-factum activation of the flow via existing APIs. But what is the use-case for that? Why would you need to get a reference to the flow that is not "active" yet and then activate it later?

streetsofboston commented 5 years ago

A use-case is a key-value data-store that is relatively expensive to start up (and shut down).

There is a UI (eg Android Activities/Fragments) that examines and shows key-value pairs from this data store. Using a share()-like method for the UI-screens to get the Flow of the key-value pairs would start up the data-store (again) as soon as there is at least one collector. It would shut it down when there are no more collectors left.

However, the moving of the user from Activity to Activity or from Fragment to Fragment should not dictate when the data-store starts (and shuts down), since this can be unpredictable and may cause expensive re-starts of the data-store.

Instead, a Service (or some other 'manager') can be used to explicitly (re)start and stop the data store, reducing the amount of re-starts by keeping the data-store alive a little longer.

The order of the appearance of the UI-screens, that need the key-value pairs from the data-store, and the start of the Service should be independent: The UI should be unaware of this Service and focus on just the key-value pairs Flow. The Service should not be bothered when the UI appears or disappears. The UI would receive a ConnectableFlow instance but exposed as a Flow. The Service would receive a ConnectableFlow instance as well, exposed as a ConnectableFlow allowing it to manage starting and stopping of the data-store.
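
A minimal sketch of that split, under stated assumptions: `ManagedFlow` and `connectAfterCollector` are hypothetical names, and this is neither the implementation from the gist nor an API of kotlinx.coroutines. The UI only sees a plain `Flow`; the manager holds the `connect` function.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper, not part of kotlinx.coroutines or of the linked gist.
class ManagedFlow<T>(private val upstream: Flow<T>) {
    private val hub = MutableSharedFlow<T>()

    // What the UI receives: collecting it never starts the upstream.
    val values: Flow<T> = hub

    // What the manager receives: starts the upstream; cancel the Job to stop it.
    // Unlike the proposed connect, calling this twice starts the upstream twice.
    fun connect(scope: CoroutineScope): Job =
        scope.launch { upstream.collect { hub.emit(it) } }
}

fun connectAfterCollector(): Pair<Int, List<Int>> = runBlocking {
    var starts = 0
    val store = ManagedFlow(flow<Int> {
        starts++                                      // the expensive start-up
        var i = 0
        while (true) { emit(i++); delay(10) }
    })
    val ui = async { store.values.take(3).toList() }  // the UI subscribes first
    delay(50)                                         // no manager has connected yet
    val startsBeforeConnect = starts
    val connection = store.connect(this)              // the manager starts the store
    val seen = ui.await()
    connection.cancelAndJoin()                        // the manager stops the store
    startsBeforeConnect to seen
}
```

`connectAfterCollector()` returns `(0, [0, 1, 2])`: nothing starts while only the UI is collecting, and the UI sees every value once the manager connects.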

matejdro commented 5 years ago

Wouldn't simple share() operator also fix this use case? You would just create dummy collect inside that service which should prevent your store from closing until service closes.

nhaarman commented 5 years ago

> Wouldn't simple share() operator also fix this use case? You would just create dummy collect inside that service which should prevent your store from closing until service closes.

'Dummy collect' shows a different intention than explicitly starting and stopping the data store, and has more potential to be removed by accident.

matejdro commented 5 years ago

Yes I see now that this would not look good in code. Even if some better function like keepOpen() would be added, it would be just infinitely-suspending function (until cancelled), which I feel does not work very well with coroutines style.

Also, now that you mentioned the activities, a timeout on the share() operator would also be beneficial.

Use case for this is that Android's screens often go through configuration change, which destroys the screen and creates new one. It goes like this:

  1. Activity/Fragment collects Flow
  2. User rotates device
  3. Activity gets destroyed - Collection gets cancelled, flow terminates and expensive resource behind it gets closed
  4. New activity gets created with new orientation - Flow is collected again and expensive resource reopens

This wastes resources, because resource behind Flow stream is closed for like one millisecond and then reopened again. Timeout (like RxJava's refCount operator) would be useful here - for example "only terminate the stream if there are no subscribers for 2 seconds."
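
A sketch of that timeout using `SharingStarted.WhileSubscribed(stopTimeoutMillis)` from later releases of kotlinx.coroutines. The function `opensAcrossGap`, the 100 ms gap, and the ticking source are assumptions chosen for the example; the two `first()` calls stand in for the old and the new screen.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: counts how often the "expensive resource" is opened
// when one collector leaves and another arrives 100 ms later.
fun opensAcrossGap(stopTimeoutMillis: Long): Int = runBlocking {
    var opens = 0
    val sharingJob = Job()
    val sharingScope = CoroutineScope(coroutineContext + sharingJob)

    val resource = flow<Unit> {
        opens++                                  // resource is (re)opened here
        while (true) { emit(Unit); delay(10) }
    }.shareIn(sharingScope, SharingStarted.WhileSubscribed(stopTimeoutMillis))

    resource.first()   // old screen collects one value, then goes away
    delay(100)         // short gap with no collectors, e.g. a rotation
    resource.first()   // new screen starts collecting
    sharingJob.cancelAndJoin()
    opens
}
```

With `opensAcrossGap(0)` the resource is opened twice; with `opensAcrossGap(2_000)` the upstream survives the gap and is opened only once.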

streetsofboston commented 5 years ago

@matejdro RxJava has such operators. It has overloaded refCount operators, and a few of them take a timeout param that does exactly that. Still, that could be 'flaky' in certain situations.

Having some code that acts as a manager to start and stop the stream exactly when it wants leaves it up to the exact needs of the use case. Maybe later we can add those overloaded refCount operators ( http://reactivex.io/RxJava/javadoc/io/reactivex/observables/ConnectableObservable.html#refCount-int-long-java.util.concurrent.TimeUnit- ) (and fun <T> Flow<T>.share() = publish().refCount())

matejdro commented 5 years ago

Yes, I know about refCount, I mentioned it in my comment above.

streetsofboston commented 5 years ago

Sorry @matejdro. Me reading and typing an answer/reply on a phone is not the smartest thing to do :)

matejdro commented 5 years ago

Why does this issue still have use-case-needed tag? Multiple use cases were provided.

streetsofboston commented 5 years ago

Does this issue need any additional use-cases?

LouisCAD commented 5 years ago

I don't think so. I made a formal proposal in #1221 that leaves a few open questions. They certainly need an answer to make an implementation possible.

elizarov commented 5 years ago

What open-source projects would you recommend to look at that use/need this kind of connectable flow (it is OK if they are using Rx now)?

streetsofboston commented 5 years ago

Right now, I'm not aware of any open-source project that may need it. But I'm not aware of all open-source projects out there :)

For our own private repos, for our clients, we're often using ConnectableObservable when we need BLE related functionality, for example. If we'd want to write this using Coroutines (not Rx), then a ConnectableFlow would come in really handy.

elizarov commented 5 years ago

Here is my proposal for share operator that, I think, should solve your use-cases without adding any new concepts like "ConnectableFlow": https://github.com/Kotlin/kotlinx.coroutines/issues/1261

streetsofboston commented 5 years ago

@elizarov Looks good.

Looking forward into the future, where I can imagine that folks would have the need for cache(): Maybe make ConnectableFlow an internal (or private) class to ease a possible implementation of fun Flow<T>.cache(scope: CoroutineScope) : Flow<T>. Something like this gist here, but without exposing the ConnectableFlowImpl publicly through the ConnectableFlow interface. https://gist.github.com/streetsofboston/39c3f8c882c9891b35e0ebc3cd812381

elizarov commented 5 years ago

I'm proposing that you can use share(n) (specifying a buffer size right there), so I don't see why a separate cache might be needed. It seems that a single share would be enough to cover all the use-cases from your gist.

elizarov commented 5 years ago

The only missing feature is autoConnect(numberOfCollectors), where numberOfCollectors > 1, because a proposed share is always autoConnect with numberOfCollectors==1. I don't see what is the use-case for the value of > 1.

streetsofboston commented 5 years ago

I see! Yep, adding a buffer-size param to share would do the trick. We could set its default size param to 0...?

An extension function fun <T> Flow<T>.cache(size: Int = 1) = share(size) could be added somewhere else, if need be, for providing a more semantic function name...

streetsofboston commented 5 years ago

The numberOfCollectors > 1 use-case is in my experience very rare. I can imagine use cases to have automatic shared resource handling when you know that you have at least X consumers. Still, I have never actually seen that use case.

zach-klippenstein commented 5 years ago

I know this issue has been all but superseded by #1261, but FYI RxJava is considering changing their Connectable API to make the state machine more explicit when reconnecting: https://github.com/ReactiveX/RxJava/issues/5628

akarnokd commented 5 years ago

If I understand suspension correctly, there could be some trouble with connecting and collecting because you have to launch { } them all to not get suspended (as connect() has to call collect on the upstream).


val connectable = ...

launch { connectable.collect { println(it) } }

launch { connectable.collect { println(it) } }

launch { connectable.connect() }

With a publish type of sharing, if there are no collectors, the upstream data may get dropped. In contrast, collectors may not even appear so waiting for some could be equally troubling. Could this be solved within the coroutine conceptual framework?

elizarov commented 5 years ago

@akarnokd I'm not sure I understand this question. Can you, please, clarify.

pacher commented 4 years ago

If a Flow.share discussed in #1261 automatically activates a flow on a first collector, then only that first collector is guaranteed to receive all the events. If I understand correctly, other collectors might miss a few events in the beginning of the sequence due to concurrency. As per example from #1261:

launch { flow.collect { println("A: got $it") } }
launch { flow.collect { println("B: got $it") } }

Collectors are launched concurrently. First one activates the flow. Second might see all of it if subscribed fast enough, but could miss a few if out of luck.

That is why, in my opinion, the ConnectableFlow story is different from the share operator. Here we want to share an expensive cold source of data, where losing data items is not acceptable, compared to hot sources like mouse clicks.

Here is an example: Imagine I have a huge log file, which is naturally represented as a cold Flow of lines/records. I need to read and analyze it in multiple ways. All these tasks are very convenient to code separately as reactive operator chains, like map{..}.filter{...}. E.g. collectorA is looking for apples, collectorB is looking for oranges, collectorC is counting ducks etc. So at some point in time I have a selected bunch of collectors and want to run them to get results. If I just collect my cold flow, every collector will initiate reading of a file from disk, which is slow and wasteful. I want to share it, so that the file is read only once, but I want to be sure that each collector gets log items from the beginning. I can't imagine how it can be achieved without some form of delayed explicit connect. Also, the file is huge but collectors are expected to find what they are looking for at some point, so the reading of the file should stop when there is no more need for it.

I would imagine some kind of StartableFlow (ColdFlow, heh) which is not started until an explicit start call. It would be reasonable to make it one-start-only and not accept any new collectors after it is started. Similar to how consumeAsFlow results in a flow which can be collected only once.

The closest solution from Rx world I found is publish operator with "selector" function https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#publish-java.util.function.Function- http://reactivex.io/RxJava/javadoc/io/reactivex/Flowable.html#publish-io.reactivex.functions.Function-

I use it quite a lot in many different circumstances and it would be great to have something like this for Flow. The key difference from share is that all subscriptions happen within provided lambda, so operator can safely connect to upstream after everybody is subscribed and no events are lost.
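
One possible way to sketch the "connect only after everybody is subscribed" idea with the `MutableSharedFlow` API from later releases of kotlinx.coroutines. This is an illustration under assumptions, not an equivalent of publish(Function): `countApplesAndOranges` is a made-up name, the number of collectors is hard-coded, `null` is used as an end-of-input marker, and the early stop when all collectors are done is not covered.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: two analyses share a single pass over a cold source.
fun countApplesAndOranges(lines: List<String>): Pair<Int, Int> = runBlocking {
    val source: Flow<String> = lines.asFlow()  // stand-in for the cold log-file flow
    val hub = MutableSharedFlow<String?>()     // no buffer: emit waits for every subscriber

    val apples = async {
        hub.takeWhile { it != null }.filterNotNull().count { "apple" in it }
    }
    val oranges = async {
        hub.takeWhile { it != null }.filterNotNull().count { "orange" in it }
    }

    hub.subscriptionCount.first { it >= 2 }    // wait until both collectors are subscribed
    source.collect { hub.emit(it) }            // the explicit "connect": read the source once
    hub.emit(null)                             // null marks the end of the input
    apples.await() to oranges.await()
}
```

For the input `listOf("apple", "orange", "apple pie", "duck")` this returns `(2, 1)`, and both collectors see the sequence from its first item.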

akarnokd commented 4 years ago

> The closest solution from Rx world I found is publish operator with "selector" function

The problem is that it is generally unclear (to me) when the collector(s) are all lined up to receive items. For example, the multicast operator equivalent to RxJava's publish(Function) has to collect the returned Flow asynchronously otherwise the scope wouldn't progress to the connect phase.

In RxJava, consumers are by default non-blocking and synchronous, thus, we generally know all the consumers to the subject have lined up and are ready to receive items the moment subscribe returns.

elizarov commented 4 years ago

@pacher This is quite a valid concern. There are two different use-cases here:

flow.replicate {
    replica { collect { println("A: got $it") } }
    replica { collect { println("B: got $it") } }
}

pacher commented 4 years ago

Another example of a similar feature is the shiny new teeing collector from Java 12

@elizarov Exactly! I would formulate the difference as follows:

My example of data processing is just one use-case. As I mentioned publish(Function) is actually very powerful and I use it really a lot. That's because it's not terminal, but an operator which returns Flowable/Flux. Couple of examples:

  1. Connection health check. I have a stream of events from a network connection. Then, inside of the publish lambda, I spawn a secondary stream with debounce. It detects that no events were received for a certain time so that it can ping/reconnect and so on. publish returns a stream of original events to process further, while this secondary stream silently keeps the connection healthy somewhere in the background
  2. Painless split and merge of streams. Something like this:
    flux.publish { flux ->
        val first = flux.filter { ... }
        val second = flux.filter { ... }
        combineLatest(first, second, BiFunction { ... })
    }

    It is amazing how far you can get with it

This is probably already another use-case territory, but as usual I just want you to keep it in mind while in the design and discussion phase. (maybe replicate should return a flow as well)

@akarnokd Totally agree. It is tricky and I don't see a simple solution either, otherwise would just code something for myself instead of bothering all of you. I am just trying to keep discussion and thinking going instead of dismissing it as resolved by #1261

elizarov commented 4 years ago

There is a design for SharedFlow that, I believe, covers most of the needs of the advanced use-cases of ConnectableFlow (full control over upstream, observability of the number of downstream connections, etc) and provides a framework to implement easy-to-use sharing operators for simpler use-cases. See #2034

elizarov commented 4 years ago

Basic use-cases described herein are now taken into account, too, in the design of sharing operators as described in #2047, so I'm closing this issue.