Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Proposal: Flow pause cooperatively #2223

Open Maartyl opened 3 years ago

Maartyl commented 3 years ago

Basic idea

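import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.flow.StateFlow

// Context element that cooperative code can consult to find out whether it should pause.
interface CoroutinePausing : CoroutineContext.Element {
    val isPaused: StateFlow<Boolean>

    override val key: CoroutineContext.Key<*> get() = Key
    companion object Key : CoroutineContext.Key<CoroutinePausing>
}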

The value of this comes from libraries using it, but it is not imposed on anyone.
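A minimal sketch of the producing side, under stated assumptions: `PausingController` is a hypothetical class (not part of kotlinx.coroutines) that owns a `MutableStateFlow<Boolean>` and exposes it read-only as the `CoroutinePausing` element, and this `awaitUnpaused` uses `first { !it }`, which should behave like the `takeWhile` version in the conceptual usage. The demo installs the element with `launch(controller)` and shows a nested child reading it from its own context without being handed a reference.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

interface CoroutinePausing : CoroutineContext.Element {
    val isPaused: StateFlow<Boolean>
    override val key: CoroutineContext.Key<*> get() = Key
    companion object Key : CoroutineContext.Key<CoroutinePausing>
}

// Hypothetical owner of the pause flag; only it can flip the state.
class PausingController(initiallyPaused: Boolean = false) : CoroutinePausing {
    private val state = MutableStateFlow(initiallyPaused)
    override val isPaused: StateFlow<Boolean> = state.asStateFlow()
    fun pause() { state.value = true }
    fun resume() { state.value = false }
}

// Suspends while the surrounding context is paused; returns at once if no element is present.
suspend fun awaitUnpaused() {
    val pausing = currentCoroutineContext()[CoroutinePausing] ?: return
    pausing.isPaused.first { paused -> !paused }
}

// Illustration only: records the order in which things happen.
suspend fun demoInheritedPausing(): List<String> = coroutineScope {
    val events = mutableListOf<String>()
    val controller = PausingController(initiallyPaused = true)
    val worker = launch(controller) {
        launch { // nested child: inherits the element, no reference passed
            awaitUnpaused()
            events += "resumed"
        }
    }
    delay(100)
    events += "paused=${controller.isPaused.value}, active=${worker.isActive}"
    controller.resume()
    worker.join()
    events
}
```

Like a `Job`, the element propagates to every child by default, which is what lets code deep inside a producer cooperate without explicit plumbing.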

Motivation 1: SharedFlow

Motivation 2: Lifecycle (Android)

Benefits

Downsides

Conceptual usage

(names are poor, ignore those)

import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.flow.*

// Suspends while the surrounding context is paused; returns at once if nothing provides CoroutinePausing.
suspend fun awaitUnpaused() {
    val p = coroutineContext[CoroutinePausing] ?: return
    if (!p.isPaused.value) return
    p.isPaused.takeWhile { it }.collect()
}

//to wrap a flow that does not support this mechanism:
fun <T> Flow<T>.awaitingUnpaused() = flow<T> {
    awaitUnpaused()
    collect {
        emit(it)
        //after emit, so an old value is not emitted later, once unpaused
        // instead immediately 'resumes' upstream
        awaitUnpaused()
    }
}

//to wrap a flow that does not support this mechanism:
fun <T> Flow<T>.recollectWhenUnpaused(timeoutMillis: Long = 0) = flow<T> {
    //this is, I believe, the currently intended way for SharedFlow to handle upstream 'pausing'
    //it could be separated from SharedFlow
    TODO("stop collecting when paused; start collecting when unpaused")
}
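One possible way to fill in the `recollectWhenUnpaused` stub, offered as a sketch rather than the proposal's own implementation: `mapLatest` turns `isPaused` into a "should run" signal (waiting `timeoutMillis` before reporting a pause), and `flatMapLatest` cancels the upstream collection on pause and starts a fresh one on unpause. Both operators are marked `@ExperimentalCoroutinesApi`, hence the opt-in. Because the result follows the never-ending `isPaused` StateFlow, it does not complete on its own, and unpausing restarts the upstream even if it had already finished, so it suits endless sources better than finite ones. The interface is repeated so the block compiles on its own; `demoRecollect` is illustrative only.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

interface CoroutinePausing : CoroutineContext.Element {
    val isPaused: StateFlow<Boolean>
    override val key: CoroutineContext.Key<*> get() = Key
    companion object Key : CoroutineContext.Key<CoroutinePausing>
}

@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.recollectWhenUnpaused(timeoutMillis: Long = 0): Flow<T> {
    val upstream = this
    return flow<T> {
        val pausing = currentCoroutineContext()[CoroutinePausing]
        if (pausing == null) {
            emitAll(upstream) // nothing can pause us: plain pass-through
            return@flow
        }
        val shouldRun: Flow<Boolean> = pausing.isPaused
            .mapLatest { paused ->
                if (paused) delay(timeoutMillis) // grace period before stopping
                !paused
            }
            .distinctUntilChanged()
        // Cancel the upstream when we stop running; collect it from scratch when we run again.
        emitAll(shouldRun.flatMapLatest { active -> if (active) upstream else emptyFlow<T>() })
    }
}

// Illustration only: counts how often the upstream is (re)started.
suspend fun demoRecollect(): Int = coroutineScope {
    val paused = MutableStateFlow(false)
    val pausing = object : CoroutinePausing {
        override val isPaused: StateFlow<Boolean> = paused.asStateFlow()
    }
    var starts = 0
    val endless = flow { starts++; while (true) { emit(starts); delay(10) } }
    val job = launch(pausing) { endless.recollectWhenUnpaused().collect { } }
    delay(50); paused.value = true   // upstream is cancelled
    delay(50); paused.value = false  // upstream is collected again from scratch
    delay(50); job.cancel()
    starts
}
```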
psteiger commented 3 years ago

+1 to this feature.

elizarov commented 3 years ago

Can you please, elaborate on what you are trying to achieve? Can you provide a specific problem that you are running with SharedFlow into and that you are trying to solve?

psteiger commented 3 years ago

@elizarov , not OP, but I think a use case is pausing the collection of flows when an Android LifecycleOwner (e.g. Activity) goes into the background (lifecycle in onPause() or onStop()). This is closely related to #2194. I'm not sure how subscriptionCount will work for SharedFlow, but I believe counters will not decrease if a collection job, launched in a LifecycleCoroutineScope with launchWhenStarted(), gets paused, so subscriptionCount in itself would not be enough to serve as a substitute for LiveData's onActive()/onInactive() as suggested in #2194.

elizarov commented 3 years ago

Why this would be really needed? Do you have any specific use-case in mind?

Maartyl commented 3 years ago

@elizarov

Stopped Lifecycle is still collecting.

This is my main problem. (I am already using CoroutinePausing for this.)

SharedFlow recollecting

Essentially: recollecting is something I can see one 'often' wanting to avoid.

Core point

By this 'interface' being in a common library, 3rd party libraries can use it to coordinate.

If SharedFlow implemented this, it would be even more useful. It doesn't strictly have to: One would always have to provide CoroutinePausing in context. It would just seem more 'correct' if it was integrated throughout the library.

(Pausing jobs was another request. Pausing jobs seems like a very complicated thing, and unsafe (and while I find pausing of coroutines useful too, I hope hard pausing of jobs will never be the chosen solution). This completely solves that. - In addition to being effectively needed for unwasteful hot flows.) ( as long as the code that needs to be paused cooperates)

(*1: After I started using CoroutinePausing, I am slowly refactoring away all LiveData, as it is no longer needed. For now, I still have to pass CoroutinePausing in the sharing context, though.) (Also, so as not to come off as deceitful: I am not using SharedFlow yet; I am using 1.3.9 at the moment. I am looking forward to SharedFlow, and think potentially integrating CoroutinePausing might even change the design a little before it's fully launched. It seems like an important thing to have with hot flows.)

(*2: I am already wrapping all Room queries in .awaitingUnpaused() and it works well, but I imagine it would work better, and be more efficient, if it was implemented in Room directly.)

(*3: In practice, it's better to replace SharedFlow with LiveData directly, as it will have less overhead. (this spreads and pollutes all with LiveData 'nodes' in the flow 'graph') - But then: using the LiveData as flow will again keep it running continuously.)

Maartyl commented 3 years ago

@psteiger Yes. This is the main point. subscriptionCount will not change when lifecycle is stopped. Even if it did, a flow far upstream would not have access to it. Even if it did, it would be tightly coupled with SharedFlow - all it cares about is 'if someone is interested in it running'.

(Hot) flows have no way to inform upstream of being Active/Inactive like LiveData has.
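For comparison, a sketch of the closest existing hook: `MutableSharedFlow.subscriptionCount` is a `StateFlow<Int>` that the owner of the shared flow can map to an active/inactive signal, roughly LiveData's `onActive()`/`onInactive()`. It illustrates the limitation described above: only code holding the `MutableSharedFlow` can see the count, and a subscriber that is merely suspended still counts. `demoActiveSignal` is an illustrative name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Records the "has at least one subscriber" transitions seen by the owner of the flow.
suspend fun demoActiveSignal(): List<Boolean> = coroutineScope {
    val shared = MutableSharedFlow<Int>()
    val transitions = mutableListOf<Boolean>()
    val watcher = launch {
        shared.subscriptionCount
            .map { count -> count > 0 }
            .distinctUntilChanged()
            .collect { active -> transitions += active }
    }
    delay(50)
    val subscriber = launch { shared.collect { } }
    delay(50)
    subscriber.cancel() // cancelling the collector frees its slot, so the count drops to 0
    delay(50)
    watcher.cancel()
    transitions
}
```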


Even if this were not considered enough "value / cost" to be in kotlinx.coroutines, I consider it an absolute must-have, and will use it regardless (*1). I just think others could benefit from it too. It would also be nicer for me if it were integrated with core library functions, and with other libraries (*2).

(*1: unless there is some better alternative I'm missing)

(*2: I have already written a simple alternative to SharedFlow (wrapping StateFlow) that I intend to use instead, until something like this is supported 'natively' . Also, my implementations are definitively not as efficient as properly integrated solutions would have been.)

fluidsonic commented 3 years ago

I've also just realized that pausing flows is kinda what I'm looking for. At least it looks like a viable solution.

Scenario

The following happens whenever my long Flow ~turns hot~ is being collected:

  1. Load lots of cached data from database (minutes)
  2. Periodically refresh data from server when cache is stale (seconds to minutes, against rate-limited API)
  3. Perform expensive transformations whenever new data is available (many minutes)
  4. Actually consume the final data (e.g. in my case print to console on demand) (quick)

1, 2 & 3 are basically states that are occasionally updated.

Actual behavior

If the entire Flow ~turns cold all the way to upstream~ is no longer collected and then ~hot~ collected again (same Flow instance), the expensive chain starts all over.

Desired behavior

I'd like to avoid all expensive work in 1, 2 & 3 as nothing has changed since the Flow ~has turned cold~ was previously collected. It's still the same Flow instance.
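The "actual behavior" above is simply how a cold flow works: every collection runs the builder block from the start. A small illustration, with counters standing in for the expensive steps 1-3 (names are illustrative). Sharing with `shareIn(..., SharingStarted.Eagerly, replay = 1)` avoids the restart, but the shared upstream then keeps running while nobody collects, which is the gap pausing is meant to fill.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns how many times the expensive block ran for (cold, shared) double collection.
suspend fun demoRecollection(): Pair<Int, Int> = coroutineScope {
    var coldRuns = 0
    val cold = flow { coldRuns++; emit("result") } // stands in for steps 1-3
    cold.first()
    cold.first() // collecting again runs the whole chain again

    var sharedRuns = 0
    val sharingJob = Job()
    val shared = flow { sharedRuns++; emit("result") }
        .shareIn(this + sharingJob, SharingStarted.Eagerly, replay = 1)
    shared.first()
    shared.first() // served from the replay cache; the block does not run again
    sharingJob.cancel()
    coldRuns to sharedRuns
}
```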

Thoughts

For me, controlling the state a Flow is in and whether or not a Flow is working on that state are two different things. I'd like to control them separately.

Not collecting -> Paused -> Collecting -> Paused -> Not collecting

No work and no state ~during Cold~ while not collected. No work during Paused. Has state and is working while ~Hot~ being collected.

fluidsonic commented 3 years ago

Here's a very stupid implementation to better explain the idea: https://gist.github.com/fluidsonic/01702dbab744595a8dbdd41befe6829c

I basically pause the endless loop in the Flow builder while there are no subscribers instead of canceling it and starting over later on.

pausableFlow { … } creates a SharedFlow. It keeps the passed suspendable block hot while there are no subscribers. The block however can call joinPause() at any time to suspend itself if the Flow is paused. It will automatically resume once the Flow has subscribers again and is unpaused. It will also automatically join pauses before and after emissions.

More functionality could probably be built on top of that, e.g. chaining pauses with upstream pausable Flows, or pausing by external means like a method in a PausableFlow.
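The following is not the gist's code but an independent minimal sketch of the idea as described, under assumptions: a hypothetical `pausableFlow` builder launches the block eagerly in a given scope, backs it with a `MutableSharedFlow`, treats `subscriptionCount == 0` as paused, and gives the block `joinPause()` plus an `emit` that joins pauses before and after each emission. As pointed out later in the thread, counting subscribers does not notice subscribers that are themselves paused.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical receiver for the producer block.
class PausableScope<T>(private val shared: MutableSharedFlow<T>) {
    // Suspends while nobody is collecting the shared flow.
    suspend fun joinPause() {
        shared.subscriptionCount.first { it > 0 }
    }

    // Joins pauses before and after each emission.
    suspend fun emit(value: T) {
        joinPause()
        shared.emit(value)
        joinPause()
    }
}

// Hypothetical builder: starts `block` eagerly in this scope and shares what it emits.
fun <T> CoroutineScope.pausableFlow(
    replay: Int = 1,
    block: suspend PausableScope<T>.() -> Unit
): SharedFlow<T> {
    val shared = MutableSharedFlow<T>(replay = replay)
    launch { PausableScope(shared).block() }
    return shared.asSharedFlow()
}

// Illustration only: (work before any collector, values taken, producer stopped afterwards?)
suspend fun demoPausable(): Triple<Int, List<Int>, Boolean> = coroutineScope {
    var produced = 0
    val producerJob = Job()
    val numbers = (this + producerJob).pausableFlow<Int> {
        while (true) {
            joinPause() // park until someone collects
            produced++  // stands in for expensive work
            emit(produced)
        }
    }
    delay(50)
    val before = produced
    val firstThree = numbers.take(3).toList()
    delay(50)
    val settled = produced
    delay(50)
    val stopped = produced == settled // no further work once the collector left
    producerJob.cancel()
    Triple(before, firstThree, stopped)
}
```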

Maartyl commented 3 years ago

@fluidsonic

Flow turns cold all the way to upstream and then hot again

I'm not sure what that means. In my understanding, flow cannot change between cold and hot.

... When a flow is hot, it cannot be cold anymore. I guess a flow can start sort of cold, and then become hot, but I don't see how a hot flow could become cold.

Are you using the words 'hot' and 'cold' to mean 'is being collected' and 'is not being collected' respectively ? That's not how they are used with flow. (as far as I know)

Maartyl commented 3 years ago

@fluidsonic

Here's a very stupid implementation to better explain the idea: https://gist.github.com/fluidsonic/01702dbab744595a8dbdd41befe6829c

From my understanding, You are proposing a sharedFlow, which instead of providing 'subscriptionCount' only provides 'subscriptionCount > 0' ...? (but through a special collector)

What are some benefits over my idea? To me, it just seems less generic, less orthogonal, and ... I don't see any advantage. (Sorry if rude. I don't mean to bash. I am legitimately interested in advantages of your proposal, because I think CoroutinePausing is very good, but I might be wrong, and would like to combine any good ideas if possible and useful.)

Your use-case seems to match what I described under "SharedFlow recollecting" and is one of the main reasons for this proposal, 'namely' "recollecting a flow might be expensive/impossible and it's better to keep collecting it and just inform it, that nobody is interested in emissions at the moment (=paused)" ( be it from a terminal collector, or from a SharedFlow etc.)

Here is my idea behind pausing upstream (or any coroutine, really):

For more, please read what I wrote in the posts above. I think you will like the idea. (If not, please tell me why.)

Here are some other points that I find in favor of CoroutinePausing:

*1: If my idea was integrated into the library, SharedFlow would already work as a pausingFlow (i.e. making the wrapped flow aware whether it has any (not paused again!) collectors), but unlike PausableFlowCollector or subscriptionCount, which are only available directly to the code that is using the sharedFlow, this will provide it in the coroutineContext of the entire upstream. Any coroutine anywhere in the producer can access it and suspend itself until the context it lives in becomes unpaused (simply by calling awaitUnpaused()). Even code in libraries that has no idea about you using it in some sharedFlow it has no reference to.

I have already provided essentially all necessary implementation in the first post, except for changes to the SharedFlow implementation and a few utility methods, like Flow.pausedWhen(isPaused:StateFlow<Boolean>) that would make upstream paused when either this isPaused is true, or the one provided from downstream context is - one needs unsafe flow to implement this, as it changes context, though. There are obviously a few more, but none too complicated. (I can share what I'm using (in production, btw.) but it's still very rough.)
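A rough sketch of `pausedWhen` under stated assumptions (this is not the code in the linked gists): it reads the downstream `CoroutinePausing` at collection time, ORs its flag with the local one into a merged `StateFlow` via `stateIn` (which needs a scope, hence the `coroutineScope` that is torn down afterwards), and runs the upstream with the merged element installed through `flowOn`. `flowOn` accepts any context without a `Job`; when the dispatcher does not change it should not insert a channel, but treat that as an implementation detail. `SimplePausing` and `demoPausedWhen` are hypothetical names.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

interface CoroutinePausing : CoroutineContext.Element {
    val isPaused: StateFlow<Boolean>
    override val key: CoroutineContext.Key<*> get() = Key
    companion object Key : CoroutineContext.Key<CoroutinePausing>
}

// Hypothetical holder used to install a given flag into a context.
class SimplePausing(override val isPaused: StateFlow<Boolean>) : CoroutinePausing

suspend fun awaitUnpaused() {
    val pausing = currentCoroutineContext()[CoroutinePausing] ?: return
    pausing.isPaused.first { paused -> !paused }
}

fun <T> Flow<T>.pausedWhen(local: StateFlow<Boolean>): Flow<T> {
    val upstream = this
    return flow<T> {
        val downstream = currentCoroutineContext()[CoroutinePausing]
        coroutineScope {
            // Upstream is paused if either the downstream context or the local flag says so.
            val merged: StateFlow<Boolean> =
                if (downstream == null) local
                else combine(downstream.isPaused, local) { d, l -> d || l }.stateIn(this)
            try {
                emitAll(upstream.flowOn(SimplePausing(merged)))
            } finally {
                coroutineContext.cancelChildren() // stop the stateIn collector
            }
        }
    }
}

// Illustration only.
suspend fun demoPausedWhen(): List<String> = coroutineScope {
    val events = mutableListOf<String>()
    val downstreamPaused = MutableStateFlow(true)
    val localPaused = MutableStateFlow(false)
    val job = launch(SimplePausing(downstreamPaused)) {
        flow { awaitUnpaused(); emit("value") }
            .pausedWhen(localPaused)
            .collect { events += it }
    }
    delay(50); events += "paused by downstream"
    localPaused.value = true
    delay(20); downstreamPaused.value = false
    delay(50); events += "paused by local"
    localPaused.value = false
    job.join()
    events
}
```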

---- Your code using my implementation (example of how it would look like if it was part of the library (and names were not changed)): https://gist.github.com/Maartyl/c44ea10fe04d6fcf924abe36099888b8

You also write "It will also automatically join pauses before and after emissions." - I think this is not always a good idea, but if you want it, you can use awaitingUnpaused() above (but nobody is forced to use it), or alternative that even awaits before the emit (but I explained in the comment, why I think it's a bad idea to generally pause before the emit). It does not use finally, as I think a failure should propagate immediately regardless.

Regarding the difference in outputs: The flow starts paused, because there are no collectors yet. pausingStateIn works as shareIn(...,started=Eagerly) and only starts paused, instead of starting lazily. If it did start lazily, the flow would start unpaused.

fluidsonic commented 3 years ago

@Maartyl ~the terminology doesn't seem to be 100% consistent.~

This is for example from Cold flows, hot channels:

Terminal operators on a flow collect all values emitted by the flow, activating the flow code only for the duration of the corresponding operation. It makes the flow cold - it is not active before the call to terminal operation, not active after, releasing all resources before returning from the call.

~According to that, a Flow becomes cold as soon as the last collector is gone.~

Nevermind. I had to read it like five times to see the nuance here. 😅 It does refer to the entire Flow and doesn't mean that collect makes it cold.

Looking at the documentation of SharedFlow for example that doesn't seem to always be the case:

A hot Flow that shares emitted values among all its collectors in a broadcast fashion, so that all collectors get all emitted values. A shared flow is called hot because its active instance exists independently of the presence of collectors. This is opposed to a regular Flow, such as defined by the flow { ... } function, which is cold and is started separately for each collector.

It is a little confusing in general.

In my case I do mean "is being collected" and "is not being collected".

fluidsonic commented 3 years ago

@Maartyl regarding your other points:

From my understanding, You are proposing a sharedFlow, which instead of providing 'subscriptionCount' only provides 'subscriptionCount > 0' ...? (but through a special collector)

No. My Flow's implementation merely uses subscriptionCount to detect if the Flow is being collected at least once or not.

What are some benefits over my idea? To me, it just seems less generic, less orthogonal, and ... I don't see any advantage. (Sorry if rude. I don't mean to bash. I am legitimately interested in advantages of your proposal, because I think CoroutinePausing is very good, but I might be wrong, and would like to combine any good ideas if possible and useful.)

Doesn't sound rude and is a valid question.

Potential issues with being able to pause coroutines

I have a few issues with being able to pause coroutines in general:

  1. ~It's shared mutable state. It can be very hard to reason about when a coroutine was paused and where because isPaused would be accessible everywhere. That's a likely source of bugs. It's already difficult enough to figure out why and where a coroutine was canceled.~ Nevermind, I've misread that.
  2. Just because a lifecycle is considered paused doesn't mean that all coroutines in that lifecycle's scope should be paused. That would effectively be the case here. Coroutines would not have the ability to differentiate between "is paused because X" and "is paused because Y". What if two parent contexts pause based on different scenarios? How do they cooperate?
  3. All examples seem to be Flow-related. Pausing can be designed much better and easier for Flows than for coroutines in general. Such a solution or something similar could be provided for Flows at first and if useful still be extended to all coroutines later on.
  4. There are ways to model Android lifecycles in a more reliable way. I've had the same problem in an app I was writing. See below.

Alternatives for Android lifecycles

I've created multiple coroutine scopes to account for different lifecycle stages. I have a scope "resumed", a scope "started" and the standard scope "created".

That does require recollecting the Flow afterwards, but in most cases that is fine. For the remaining cases, where I merely need a Flow to pause, I've created another Flow that emits Android lifecycle updates (created <-> started <-> resumed … -> destroyed). By combining my upstream Flow with that Flow I can influence downstream work as needed. It's not as flexible and easy as properly pausing Flows, but it can serve as a building block to decide when to pause.
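A sketch of the "combine with a lifecycle Flow" approach that runs without Android: `Stage` is a hypothetical enum ordered like AndroidX `Lifecycle.State`, and `gatedBy` is a hypothetical operator. Only downstream delivery is gated; the upstream keeps running, which is the limitation mentioned above. Because `combine` re-emits when the stage changes, the latest value is re-delivered once the stage is high enough again, somewhat like LiveData.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for Lifecycle.State, ordered the same way.
enum class Stage { DESTROYED, CREATED, STARTED, RESUMED }

// Passes values through only while the stage is at least minStage.
fun <T> Flow<T>.gatedBy(stages: Flow<Stage>, minStage: Stage = Stage.STARTED): Flow<T> =
    combine(stages) { value, stage -> value to stage }
        .filter { (_, stage) -> stage >= minStage }
        .map { (value, _) -> value }

// Illustration only.
suspend fun demoGated(): List<Int> = coroutineScope {
    val stage = MutableStateFlow(Stage.CREATED)
    val source = MutableStateFlow(0)
    val seen = mutableListOf<Int>()
    val job = launch { source.gatedBy(stage).collect { seen += it } }
    delay(20); source.value = 1            // CREATED: dropped
    delay(20); stage.value = Stage.STARTED // latest value (1) is delivered now
    delay(20); source.value = 2            // delivered
    delay(20); stage.value = Stage.CREATED
    delay(20); source.value = 3            // dropped
    delay(20); job.cancel()
    seen
}
```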

Pausing Flows

Here is my idea behind pausing upstream (or any coroutine, really):

  • They are not forced to pause, that's important.
  • It is also for collectors that will 'stay collecting' even as they are paused. (that includes SharedFlow - which is effectively collecting the wrapped flow *1) (but also a paused activity in android, or something you don't want to stop collecting, but only want to pause collecting temporarily)
  • All of upstream can use it, without explicitly passing a reference anywhere, that would be tightly coupled with it being used in a SharedFlow or pausingFlow.

All three points should be covered by my suggestion.

Your solution will not work with all(any) existing operators: the PausableFlowCollector will not propagate through. They will however propagate CoroutineContext automatically. Even across ChannelFlow (so, paused downstream can be easily observed from asynchronous producers).

If pause events downstream are properly propagated upstream then it can be used using flatMap as mentioned above. Theoretically we could add an extension to FlowCollector to allow something like joinPause in any operator that uses it.

I'd avoid that though. If joining pauses is readily available for every single Flow or even in any suspending function, then developers would assume that their code properly pauses if needed. However, pausing only works if a downstream Flow actually makes use of it. Therefore, I'd not make it too easy to use pausing functionality, so that developers have to think twice whether it works in their case.

We could still allow Flows to join pauses anywhere along the stream through a function that internally goes through the collector or the coroutine context. It should just make clear that it's cooperative with the downstream. And still no need to make it a generic coroutines feature.

Producer does not need to be tightly coupled with a PausableFlowCollector

Why not? See above for why I think that opt-in should be preferred.

You would need to pass a reference to all the code that might need it, instead of just deriving the scope.

Could you please give a use case here? I can't think of a scenario right now.

Most likely, it is much simpler to implement.

I'd argue the opposite :) Getting a feature right that's used across all coroutine functionality will likely need a lot of considerations. Starting with API, over how it interacts with all the other coroutine functionality, over how it behaves in many cases (like merging two pausable coroutine contexts when using withContext, launch, etc.) and so on. I see a lot of rabbit holes lurking there.

Only using subscriberCount is not enough, as the subscriber can be paused too: One needs 'unpaused subscriber count'. (I tend to forget about this too, but it's crucial for big flow graphs)

Yes, you're right. That's why I've said that it's just a stupid implementation for explaining the idea and that more functionality can be built on top then, for example properly communicating downstream pauses upstream.

If my idea was integrated into the library, SharedFlow would already work as a pausingFlow (i.e. making the wrapped flow aware whether it has any (not paused again!) collectors), but unlike PausableFlowCollector or subscriptionCount, which are only available directly to the code that is using the sharedFlow, this will provide it in the coroutineContext of the entire upstream. Any coroutine anywhere in the producer can access it and suspend itself until the context it lives in becomes unpaused (simply by calling awaitUnpaused()). Even code in libraries that has no idea about you using it in some sharedFlow it has no reference to.

~I thought about that too. But making SharedFlow pausing by default or even only allow for pausing is not a good idea. What if I do want upstream collections to be dropped instead of paused?~ I think I misunderstood you here. You merely write that a SharedFlow provides the ability to pause upstream. But how/where would I actually set isPaused? Where's the MutableStateFlow?

And as mentioned before, I consider it a good thing if pausing is opt-in rather than automagically there. Also, the likeliness that downstream supports/uses pausing is probably low.

I have already provided essentially all necessary implementation in the first post, except for changes to the SharedFlow implementation and a few utility methods, like Flow.pausedWhen(isPaused:StateFlow<Boolean>) that would make upstream paused when either this isPaused is true, or the one provided from downstream context is - one needs unsafe flow to implement this, as it changes context, though. There are obviously a few more, but none too complicated. (I can share what I'm using (in production, btw.) but it's still very rough.)

Yeah, something like pausedWhen or pausedWhile would definitely make sense.

You also write "It will also automatically join pauses before and after emissions." - I think this is not always a good idea, but if you want it, you can use awaitingUnpaused() above (but nobody is forced to use it), or alternative that even awaits before the emit (but I explained in the comment, why I think it's a bad idea to generally pause before the emit). It does not use finally, as I think a failure should propagate immediately regardless.

I'm open to not have emit automatically join pauses. I haven't thought deeply about what default behavior makes most sense: Join by default or don't join by default with opposite = opt-in. I don't know what you refer to by failure propagation. My code joins pauses before emissions to avoid emissions while paused and joins pauses after emissions to avoid work when the Flow was paused as a result of the emission. emit can block so there can be quite some time between before and after.

Regarding the difference in outputs: The flow starts paused, because there are no collectors yet. pausingStateIn works as shareIn(...,started=Eagerly) and only starts paused, instead of starting lazily. If it did start lazily, the flow would start unpaused.

You're right. With my approach it should also start paused as the shared Flow defaults to Eagerly. I haven't considered that yet.

How would, with your suggestion, intermediate collectors pause upstream collectors? They cannot set isPaused of CoroutinePausing.

I wonder if @elizarov is already getting gray hair from this discussion 😁

Maartyl commented 3 years ago

@fluidsonic

General point: CoroutinePausing is provided when building a CoroutineScope. It should propagate in a similar way a Job would.

Just because a lifecycle is considered paused doesn't mean that all coroutines in that lifecycle's scope should be paused.

This would be a very valid point, I thought, except, when a scope (lifecycle is just a special case) is paused, all 'child scopes' should also be paused by default. The same way cancellation works. I think it should pause everything under it. However, just like coroutines can do withContext(NonCancellable) they could do withContext(NonPausing) (in my current impl, I'm calling that val pausingAlwaysFalse ) As for a lifecycle, I think everything under a lifecycleScope should be paused when the lifecycle is stopped. If it should not be paused, it probably should not be a child of that scope. (Either it should use viewModelScope, or something even more removed from the LifecycleOwner)
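A sketch of that escape hatch, assuming the `CoroutinePausing` interface from the first post; `NonPausing` and `SimplePausing` are hypothetical names (the author's own is `pausingAlwaysFalse`). Because adding an element with the same key replaces the inherited one, an element whose flag is permanently `false` shields a block from an enclosing pause, much as `NonCancellable` shields from cancellation, while ordinary children still inherit the pause.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

interface CoroutinePausing : CoroutineContext.Element {
    val isPaused: StateFlow<Boolean>
    override val key: CoroutineContext.Key<*> get() = Key
    companion object Key : CoroutineContext.Key<CoroutinePausing>
}

// Hypothetical element that is never paused, analogous to NonCancellable.
object NonPausing : CoroutinePausing {
    override val isPaused: StateFlow<Boolean> = MutableStateFlow(false).asStateFlow()
}

// Hypothetical holder used to install a given flag into a context.
class SimplePausing(override val isPaused: StateFlow<Boolean>) : CoroutinePausing

suspend fun awaitUnpaused() {
    val pausing = currentCoroutineContext()[CoroutinePausing] ?: return
    pausing.isPaused.first { paused -> !paused }
}

// Illustration only.
suspend fun demoNonPausing(): List<String> = coroutineScope {
    val events = mutableListOf<String>()
    val scopePaused = MutableStateFlow(true)
    val scopeJob = launch(SimplePausing(scopePaused)) {
        launch { awaitUnpaused(); events.add("child resumed") } // inherits the pause
        withContext(NonPausing) { awaitUnpaused(); events.add("shielded block ran") }
    }
    delay(50)
    scopePaused.value = false
    scopeJob.join()
    events
}
```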

"is paused because X" and "is paused because Y".

I don't see how that would matter. It is running in a scope, that is paused.

What if two parent contexts pause based on different scenarios?

(like merging two pausable coroutine contexts when using withContext, launch, etc.)

Properly having 2 parent contexts (parent scopes) is not possible in Kotlin. This is a sad thing, but there is no support for it. I don't think we can, or even should, solve this for a single CoroutineContext.Element.

(Having no default merging of parent contexts has worse problems than this: For example, it makes ContinuationInterceptor nearly unusable for anything but Dispatchers, as any common Dispatcher switch will override it.) (Jobs do not combine at all, etc.)

[All Flow related.] Such a solution or something similar could be provided for Flows at first and if useful still be extended to all coroutines later on.

That tends to be a very hard thing to do, and pausing is such a general concept, that I can see it popping up in multiple places independently. I would definitely not want to end up with 4 different common pausing mechanisms in different places. It's probably better to take care and do the first one carefully, and orthogonal.

I am not completely against implementing pausing only for (hot) Flows. That is the main place where I need it too. But, I'm afraid, it will be the wrong long term decision.

There have been requests for pausing Jobs, for example. This would probably be enough for the people who needed that. (as an example) and Android already has a pausing Dispatcher (I think it's unusable, but it's another place where someone implemented unrelated, incompatible pausing.)

I don't know what you refer to by failure propagation.

Exceptions.

avoid emissions while paused

That is not really a problem. Pausing is just a 'hint' anyway. It is worse to emit stale data, once the flow is unpaused. (in my opinion anyway)

[simple to implement] Getting a feature right that's used across all coroutine functionality will likely need a lot of considerations

It definitely needs a lot of consideration. It will still be 'easier' than writing an overload for every flow operator, so it has a variant, that can propagate pausing; or rewriting the whole Flow codebase.

properly communicating downstream pauses upstream

So it must be cooperative across the entire Flow.

I agree this is necessary. Your way seems to require rewriting huge amounts of code (maybe not?). I think either the FlowCollector interface would need to be extended (as you suggested), or... not sure, but either overloading all operators, or changing all existing ones. I don't think the provided basic idea can well be built on... and I don't think that a feature that requires rewriting the whole codebase has much chance of being integrated...

How do you propose flatMap would fix things? The main issue is that different parts of the flow are completely unrelated, and I cannot put something deep 'inside' the flow (upstream) to pause it there. Any pausing would just be local, at the very 'end' of the flow. (without some propagation). (What is collectPausable ? Collect with extra argument isPaused:StateFlow ?)

I agree, that val isPaused:StateFlow<Boolean> could be placed on FlowCollector instead of in the context, but I think it will be a lot less versatile, and a lot more work to implement and use. (apart from thinking things through) It could be on FlowCollector as a @InternalCoroutinesApi. (but I think it's probably cleaner on context (i.e. runs in "paused context") and even cleaner as orthogonal)

Therefore, I'd not make it too easy to use pausing functionality, so that developers have to think twice whether it works in their case.

I don't know if I agree with this... I think it's quite obvious that a flow is paused only when... it's paused... (on the contrary, it would be confusing if flows were paused for no reason) Pausing only works when something sets it in the context, or 'in the scope' (which includes downstream). That seems perfectly sensible to me, and easy to learn. It's the same as cancelling (coroutines will not be cancelled if nothing cancels them, yet people use it freely).

You would need to pass a reference to all the code that might need it, instead of just deriving the scope.

Could you please give a use case here? I can't think of a scenario right now.

Originally, it seemed like you did not want any automatic propagation through the flow. If there is, then it's fine. (still extra work to get it from the flow to the place that does the expensive work, but works)

SharedFlow provides the ability to pause upstream. But how/where would I actually set isPaused? Where's the MutableStateFlow?

The MutableStateFlow would be private in the SharedFlow implementation, and then 'merged' with CoroutinePausing from the context passed to shareIn. (StateFlow, btw., should just pass through the normal context, as it is always considered 'unpaused'; no need to merge onto it.) Here is an example (untested, partial) implementation: https://gist.github.com/Maartyl/ee0d88b4f9ecc5f816deeaccc80bf4e2

And as mentioned before, I consider it a good thing if pausing is opt-in rather than automagically there.

It is only available, not forced. I agree that forcing pausing is a horrible idea. Code that does not use it will never be affected by it, unless it's using some function that does want to be aware of pausing, and I believe it should not be hidden from functions that they are running in a paused context.

How would, with your suggestion, intermediate collectors pause upstream collectors? They cannot set isPaused of CoroutinePausing.

They will change context of the upstream flow. (e.g. (incorrect, but the idea)) https://gist.github.com/Maartyl/b40a07e6cb265dd9037a8d5745ebc0bd

likeliness that downstream supports/uses pausing is probably low.

For now, but people will realize its usefulness over time. It can be provided by libraries, and downstream can be another SharedFlow too, etc.

I wonder if @elizarov is already getting gray hair from this discussion 😁

... Why? :D

((

I thought about that too. But making SharedFlow pausing by default or even only allow for pausing is not a good idea. What if I do want upstream collections to be dropped instead of paused?

'Dropped' ? Do you mean, cancelled, and recollected? I'm not saying SharedFlow should not support that at all, but, even if it only supported passing CoroutinePausing to wrapped flow, you can implement recollecting with recollectWhenUnpaused in my first post. Pausing by default will never hurt anything, as pausing-aware code should be paused, when nobody is subscribed to the hot flow (SharedFlow). )) (//response to android stuff will be in another comment)

Maartyl commented 3 years ago

difference between "is paused because X" and "is paused because Y".

This is an interesting idea... Can anyone think of a situation, where it would be useful, and actually change when code should or should not be paused?

My thoughts: The coroutine (or upstream, if limited to Flow) is already running in some context, the function called by someone for some reason. (It is not called by 2 'callers' - it would be 2 different 'contexts' and 'calls') - If it is a Flow collected in 2 places, they are either independent (cold) or it's a SharedFlow, where each is a different subscriber, and this coroutine must run when at least one subscriber is unpaused, regardless of 'reason'...

Maartyl commented 3 years ago

(this comment is about my beliefs and opinions)

Having two features, both good on their own, but that combine well, tends to be a sign of good design.

Flow already has this beautiful property, that downstream provides execution context for upstream.

It makes sense to "pause execution context". (to me, at least) - And it would combine very nicely, because Flow is already made for this.

(very immature thought, probably not a good idea) A "paused execution context" can even awaitUnpaused() automatically in some places, that are 'sure' to be safe, similar to how cancellation works today. (Main problems I have thought about are: code under a Mutex; and suspension points, that are expected to {not suspend / be fast} at least in some context known by the programmer (e.g. await() is probably not safe, but yield() likely is))

Making the CoroutineContext.Key private and specific to the flow library, and only allow interacting with it through some flow API ... I'm not saying it's bad by any means, but it looks less beautiful... I'm definitely not the right person to have the final say on what design will end up being the best. :D Changing the API is similar, but probably even more work than that.

This is definitely something that needs to be thought through properly. I'm not saying it will be easy to introduce, or that it should be easy. It must be considered in all scenarios. (and, sadly, I do not have enough insight) BUT I believe it's best to first think a lot about design, and then usually implement only something quite simple and abstract, that ends up solving a lot by combining well with the rest of the features. Coroutines and Flow are definitely beautiful features with such design. (Flow is literally just a higher order function, with some rules on how the 'callback' will be invoked, and all this is built on it. :D)

(By the way, I would not want to tie pausing to Jobs, i.e. make isPaused a property of a Job. Both live in the context, but they probably should not be bound together into 'one thing'. It feels like 'complecting' two things that don't need to be, even though I sometimes point out their similarities (e.g. with cancelling, above).)

psteiger commented 3 years ago

As I commented in #2194, to achieve the use case of Motivation 2: Lifecycle (Android), I ended up not relying on pausable coroutines such as the ones launched with LifecycleCoroutineScope.launchWhenStarted {}. Instead, I use coroutines that are cancelled in onStop() and recreated in onStart(), so that SharedFlow.subscriptionCount (or SharedFlows created with shareIn(CoroutineScope, SharingStarted.WhileSubscribed(), replay)) can be leveraged:

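import androidx.lifecycle.DefaultLifecycleObserver
import androidx.lifecycle.LifecycleOwner
import androidx.lifecycle.lifecycleScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch

// Collects `flow` while the owner is STARTED: a fresh collection on every
// onStart(), cancelled on onStop(), so a SharedFlow sees the subscriber leave
// (its subscriptionCount decreases) instead of a merely suspended collector.
class Observer<T>(
    lifecycleOwner: LifecycleOwner,
    private val flow: Flow<T>,
    private val collector: suspend (T) -> Unit
) : DefaultLifecycleObserver {

    private var job: Job? = null

    override fun onStart(owner: LifecycleOwner) {
        job = owner.lifecycleScope.launch {
            flow.collect { collector(it) }
        }
    }

    override fun onStop(owner: LifecycleOwner) {
        job?.cancel()
        job = null
    }

    init {
        // Declared after `job`, so `job` is already initialized if addObserver()
        // synchronously delivers onStart() (owner already STARTED).
        lifecycleOwner.lifecycle.addObserver(this)
    }
}

// No `inline`/`reified` needed: T is never inspected at runtime.
fun <T> Flow<T>.observe(
    lifecycleOwner: LifecycleOwner,
    collector: suspend (T) -> Unit
) = Observer(lifecycleOwner, this, collector)

fun <T> Flow<T>.observeIn(
    lifecycleOwner: LifecycleOwner
) = Observer(lifecycleOwner, this) {}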

Then I launch such a coroutine with:

sharedFlow.observe(lifecycleOwner) {
    // ...
}

In my opinion, however, it would be nice to have a built-in, more general, non-Android-specific concept of pausable coroutines that integrates well with the whole coroutines library, as proposed in this issue.

tom-pratt commented 3 years ago

@psteiger this solution makes good sense to me. It seems like this is more often what you want than the existing asLiveData() implementation. Are you aware of any edge cases where your solution is not desirable or causes unnecessary work? Does it cancel and restart on rotation? Does it work with a SingleLiveEvent type of SharedFlow (replay=0) without potentially missing events if they are emitted exactly while activity is being recreated? I was wondering if it's worth implementing something like observeWhileCreated (wrapper around asLiveData) and observeWhileStarted (your implementation).

Others: why is pausing preferable to cancelling and recreating? It feels like shareIn + MutableSharedFlow/subscriptionCount + psteiger's start/stop collection cover all the use cases I can think of.

psteiger commented 3 years ago

Hi @tom-pratt ,

Are you aware of any edge cases where your solution is not desirable or causes unnecessary work?

No edge cases that I know of so far.

Does it cancel and restart on rotation?

That depends solely on the SharingStarted parameter used in shareIn(). A delay, if any, can be configured there.
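The sketch below illustrates what the `stopTimeoutMillis` parameter of `SharingStarted.WhileSubscribed` does across a short gap with no subscribers. The gap stands in for the old Activity stopping and the new one starting during a rotation. It uses only kotlinx.coroutines (no Android). `subscribeBriefly` and `upstreamStarts` are hypothetical helpers, and the 50/100 ms timings are arbitrary.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Stays subscribed for a short while, like an Activity between onStart and onStop.
suspend fun subscribeBriefly(shared: Flow<Int>) = coroutineScope {
    val subscriber = launch { shared.collect { } }
    delay(50)
    subscriber.cancel()
}

// Counts how often the upstream is (re)started when there is a 100 ms gap
// with no subscribers between two short subscriptions.
fun upstreamStarts(stopTimeoutMillis: Long): Int = runBlocking {
    var starts = 0
    val upstream = flow<Int> { starts++; awaitCancellation() }
    val sharingJob = Job()
    val shared = upstream.shareIn(
        CoroutineScope(coroutineContext + sharingJob),
        SharingStarted.WhileSubscribed(stopTimeoutMillis),
    )
    subscribeBriefly(shared)
    delay(100) // the gap, e.g. during recreation
    subscribeBriefly(shared)
    sharingJob.cancel()
    starts
}
```

With `stopTimeoutMillis = 0` the upstream is stopped during the gap and restarted afterwards. With a timeout longer than the gap it simply keeps running. The catch is that the timeout has to be guessed.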

Does it work with a SingleLiveEvent type of SharedFlow (replay=0) without potentially missing events if they are emitted exactly while activity is being recreated?

Hmm... that's a tough one. I think the first collector of a shared flow with replay 0 is guaranteed, by design, to get all emitted values. Only subsequent collectors can miss events. So, if a shared flow with replay 0 emits some value during activity recreation and there is no collector, the first collector to (re)appear will receive it. But don't quote me on that, I'd need to do some testing to be 100% sure no events would be lost.
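For reference, the `MutableSharedFlow` documentation says how emissions behave with no subscribers. An emitted value is stored in the replay cache if one is configured; otherwise it is dropped. So with `replay = 0`, the first collector to (re)appear does not receive values emitted while nobody was subscribed. Below is a minimal check of that in plain kotlinx.coroutines; `firstValueSeenByLateSubscriber` is a hypothetical helper.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Emits 1 while nobody is subscribed (e.g. mid-recreation), then subscribes
// and emits 2. Returns the first value the late subscriber actually receives.
fun firstValueSeenByLateSubscriber(replay: Int): Int = runBlocking {
    val events = MutableSharedFlow<Int>(replay = replay)
    events.emit(1) // no subscribers yet
    // UNDISPATCHED: the subscription is registered before emit(2) below runs.
    val late = async(start = CoroutineStart.UNDISPATCHED) { events.first() }
    events.emit(2)
    late.await()
}
```

With `replay = 0` the late subscriber sees 2 (the 1 was lost). With `replay = 1` it sees the replayed 1.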

I was wondering if it's worth implementing something like observeWhileCreated (wrapper around asLiveData) and observeWhileStarted (your implementation).

I think having built-in extension functions would be nice as this is such a common use case on Android.

Others: why is pausing preferable to cancelling and recreating? It feels like shareIn + MutableSharedFlow/subscriptionCount + psteiger's start/stop collection cover all the use cases I can think of.

Maartyl commented 3 years ago

@tom-pratt , everyone

If a SharedFlow is observed (with @psteiger's method) from multiple places, it is possible that only one of them gets an event and the rest do not.

Does it cancel and restart on rotation?

If you mean a configuration change, then any solution will necessarily have to recollect, as the bound view is destroyed and a new one is created. Only the SharedFlow inside the ViewModel will survive (it replaces LiveData, which had to be stored in the ViewModel as well, even with asLiveData()).

A configuration change (= rotation, the Activity being recreated) is less related. (The lifecycle will be destroyed, and a new collect will run in a new scope anyway.)

Pausing, however, can deal with a configuration change (= rotation) better! Without timeouts, the whole SharedFlow 'graph' inside the ViewModel will be cancelled and recreated on a configuration change. This triggers recomputing everything, even if some replay is available (as the underlying flow was cancelled and must now be recollected).

I agree that CoroutinePausing is mostly a systemic "performance optimization" that also provides a nice 'API' for some problems. (Those could of course be solved in other ways, but that is true of everything, and of all libraries.)

I agree it's not outright needed for Android, but I consider the observe = recollect solution to be more of a workaround than a proper solution.

As for recollecting, I have a similar solution (closer to what @fluidsonic posted): Lifecycle.launchRestarting(atLeast: Lifecycle.State) {}. It works similarly to what @psteiger posted, but for any suspend block, not just a Flow.collect, and for any minimal State, not just STARTED. (It also takes a timeout, an identifying instance of my logger for unhandled exceptions, etc.)
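Here is a lifecycle-agnostic sketch of the same recollect idea. It assumes the "active" signal is exposed as a `StateFlow<Boolean>`; on Android it could be derived from the lifecycle state. This is not Maartyl's actual `launchRestarting`, whose code isn't shown here, and the signature below is hypothetical. `collectLatest` does the work: each new value cancels the block started for the previous one. For Android specifically, androidx.lifecycle's `repeatOnLifecycle(Lifecycle.State)` follows the same restart-from-scratch pattern.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical: runs [block] from scratch every time [active] turns true and
// cancels it as soon as [active] turns false (recollect-style, not pausing).
fun CoroutineScope.launchRestarting(
    active: StateFlow<Boolean>,
    block: suspend CoroutineScope.() -> Unit,
): Job = launch {
    active.collectLatest { isActiveNow ->
        if (isActiveNow) coroutineScope { block() }
    }
}

// Example: active -> inactive -> active starts the block twice.
fun countStarts(): Int = runBlocking {
    val active = MutableStateFlow(true)
    var starts = 0
    val job = launchRestarting(active) { starts++; awaitCancellation() }
    delay(20)
    active.value = false
    delay(20)
    active.value = true
    delay(20)
    job.cancel()
    starts
}
```

Any state the block had built up is lost on each restart. That is exactly the cost the pausing proposal tries to avoid.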

Examples where I think CoroutinePausing is useful

Room

Imagine the following (slightly extreme, to make it obvious) scenario: you have an expensive query that takes 20 seconds to compute. Then you have a large pipeline that depends on that result and computes many other things, collected from many places. Computing that takes another 40 seconds.

Scenario 1

The Activity only becomes Stopped for a second (the user steps outside and returns / another activity is shown over it / ...).

To see why a timeout is a problem (on top of complicating the design), imagine a source that emits a lot.

Scenario 2

Activity is rotated, forcing configuration change, and destroying and recreating the Activity.

... I don't have time to write out all the details, but you can imagine.

Animations

Another place where I needed pausing was animations (reacting to user actions). In the end, we removed most animations for budget reasons, but I think it would have worked well. They would have been controlled by coroutines, not even Flow. (They were not animations per se: stuff was moving around in non-trivial ways without the user directly causing it, but sometimes it had to be paused. Calling them animations is close enough, and I can imagine it being useful for other kinds of animations as well.)

I cannot fully vouch for this, though, as I did not end up using it. (I am using pausing with Room.)
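The sketch below shows a coroutine-driven "animation" with cooperative pause points. It assumes the pause signal is a plain `StateFlow<Boolean>` rather than the proposed `CoroutinePausing` element. `animate` and `pauseMidAnimation` are hypothetical names, and the frame timings are arbitrary.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// One step per frame, with a cooperative pause point before each step.
// A pause takes effect at the next frame boundary; progress lives in locals,
// so nothing has to be recomputed on resume.
suspend fun animate(frames: Int, paused: StateFlow<Boolean>, onFrame: (Int) -> Unit) {
    for (frame in 0 until frames) {
        paused.first { !it } // suspends here while paused
        onFrame(frame)
        delay(16)
    }
}

// Example: pause mid-animation, check that no frames advance, then resume.
fun pauseMidAnimation(): Pair<Boolean, List<Int>> = runBlocking {
    val paused = MutableStateFlow(false)
    val seen = mutableListOf<Int>()
    val job = launch { animate(frames = 5, paused = paused) { seen += it } }
    delay(40)
    paused.value = true
    delay(50) // let the in-flight frame reach its pause point
    val frozenAt = seen.size
    delay(100)
    val stayedFrozen = seen.size == frozenAt
    paused.value = false
    job.join()
    stayedFrozen to seen.toList()
}
```

Unlike the recollect approach, resuming continues from the current frame instead of starting over.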

Finally

Most importantly, I think the implementation is simple enough that it would definitely be worthwhile (but of course, there might be some crucial insight I'm missing). I think only the following is needed:

(((* Regarding sanitizing the Context, it might even be fine to 'leak' the merged CoroutinePausing downstream. It will always 'encompass' the pausing from outside (downstream), and can only be more paused. That might even be useful, considering it is running inside the flow's callback: that block could be seen as paused too. It would probably be weird to have this sole exception, though, and there could be problems with it that I haven't thought about.)))
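Below is a minimal sketch of the element from the top of this issue, together with a hypothetical merge. The merged element is paused whenever either side is paused, so it "can only be more paused". `ManualPausing`, `mergedWith` and `mergedPausingOrder` are illustrative names, not an existing API. `awaitUnpaused` is adapted from the conceptual usage above, using `first { !it }`. The merge assumes a scope in which to run `stateIn`.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.CoroutineContext

// The context element proposed at the top of this issue.
interface CoroutinePausing : CoroutineContext.Element {
    val isPaused: StateFlow<Boolean>

    override val key: CoroutineContext.Key<*> get() = Key
    companion object Key : CoroutineContext.Key<CoroutinePausing>
}

// Hypothetical: a pausing source that its owner toggles by hand.
class ManualPausing(initiallyPaused: Boolean = false) : CoroutinePausing {
    override val isPaused = MutableStateFlow(initiallyPaused)
}

// Suspends while the current context is paused; returns at once if the
// context carries no CoroutinePausing element.
suspend fun awaitUnpaused() {
    val pausing = currentCoroutineContext()[CoroutinePausing] ?: return
    pausing.isPaused.first { paused -> !paused }
}

// Hypothetical merge: paused whenever either side is paused.
fun CoroutinePausing.mergedWith(other: CoroutinePausing, scope: CoroutineScope): CoroutinePausing {
    val merged = combine(isPaused, other.isPaused) { a, b -> a || b }
        .stateIn(scope, SharingStarted.Eagerly, isPaused.value || other.isPaused.value)
    return object : CoroutinePausing {
        override val isPaused: StateFlow<Boolean> = merged
    }
}

// Example: upstream unpaused, downstream paused -> the worker waits for downstream.
fun mergedPausingOrder(): List<String> = runBlocking {
    val upstream = ManualPausing(initiallyPaused = false)
    val downstream = ManualPausing(initiallyPaused = true)
    val mergeJob = Job()
    val merged = upstream.mergedWith(downstream, CoroutineScope(this.coroutineContext + mergeJob))
    val log = mutableListOf<String>()
    val worker = launch(merged) { awaitUnpaused(); log += "worker ran" }
    delay(20)
    log += "downstream unpaused"
    downstream.isPaused.value = false
    worker.join()
    mergeJob.cancel()
    log
}
```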

cbeyls commented 2 years ago

I just discovered this proposal recently, after writing an article about issues and limitations when using Kotlin Flow on Android. I invite you to read it, I think it's a good introduction to the issues this proposal would help solving on Android.

sonphan12 commented 2 years ago

There is so much useful information in this discussion. If this proposal is somehow approved, I hope there will be a detailed article about it.

kapaseker commented 2 years ago

I actually need this feature, just like: Activity paused -> flow pauses collecting; Activity resumed -> flow resumes collecting.

EDIT: So sorry, "launch when start" fits my needs. But I also want to control the flow's pausing and resuming.

kworth commented 1 year ago

I just discovered this proposal recently, after writing an article about issues and limitations when using Kotlin Flow on Android. I invite you to read it, I think it's a good introduction to the issues this proposal would help solving on Android.

@cbeyls where do things now stand:

  1. Considering your part 2 (2022) https://bladecoder.medium.com/smarter-shared-kotlin-flows-d6b75fc66754
  2. Considering it's now 2023 (and therefore anything that has come after your part 2)