Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

SharedFlow that emits once (but buffers if there are no collectors) #3002

Open matejdro opened 2 years ago

matejdro commented 2 years ago

Is there a way to achieve a SharedFlow that will only emit once (replay = 0 behavior), BUT it would also buffer events until at least one collector is collecting them?

The use case is sending one-off events to the UI (for example, to display a one-off popup to the user). I want events to be ephemeral (if the flow is collected again, they should not be replayed, so replay needs to stay at zero), but I also want events delivered even if nobody is collecting at the moment (for example, the UI is in the background, so flows are not collected, but collection would resume as soon as the UI returns to the foreground).

In the LiveData world, this would be similar to SingleLiveEvent.

My current solution is to enable replay, but manually clear the replay cache after every item is emitted downstream:

private val _eventFlow = MutableSharedFlow<T>(replay = 999)
val eventFlow = _eventFlow.onEach { _eventFlow.resetReplayCache() }

This works, but I do not really like this solution, since it performs clearing on every item and it's probably not that efficient.

Another solution is using channels:

private val _eventChannel = Channel<T>(Channel.BUFFERED)
val eventFlow get() = _eventChannel.receiveAsFlow()

While the above works, it only emits to one collector (if multiple collectors are waiting for the UI events, only one of them will get each event), which is likely fine in the UI scenario I described above (only one screen is listening for events).
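
For illustration, here is a minimal sketch of how the channel variant behaves when events are sent before anyone collects. The function name bufferedChannelDemo and the event strings are made up for this example; it only uses Channel, receiveAsFlow, take and toList from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo: events sent while nobody collects stay in the channel buffer.
fun bufferedChannelDemo(): Pair<List<String>, String> = runBlocking {
    val eventChannel = Channel<String>(Channel.BUFFERED)
    val eventFlow = eventChannel.receiveAsFlow()

    // No collector yet: both sends return without suspending because the buffer has room.
    eventChannel.send("popup-1")
    eventChannel.send("popup-2")

    // The first collector drains the buffered events in order.
    val firstCollector = eventFlow.take(2).toList()

    // A later collector only sees events sent afterwards; nothing is replayed.
    eventChannel.send("popup-3")
    val secondCollector = eventFlow.take(1).toList().single()

    firstCollector to secondCollector
}
```

Each event is received by exactly one collector, which is the limitation described above once several collectors are active at the same time.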

Is there a better way to achieve this that I am missing? Would it make sense to add a parameter for this behavior to MutableSharedFlow?

psteiger commented 2 years ago

What you can also do is wait (suspend) until there is at least one collector before emitting to such a flow.

I haven't put much thought into it, but beware of race conditions. E.g. what if a subscription appears before the check and disappears before the emit?

suspend fun <T> MutableSharedFlow<T>.emitWhenSubscribedTo(value: T) {
    // fast path
    if (subscriptionCount.value > 0) {
        emit(value)
        return
    }
    // slow path
    subscriptionCount.first { it > 0 } // suspend until true
    emit(value)
}
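
A possible way to exercise this idea, as a sketch only: the producer below starts before any collector exists and still delivers both values. The helper is repeated in a shortened form so the snippet is self-contained, and emitWhenSubscribedDemo is a hypothetical name. The race mentioned above (a subscriber leaving between the check and the emit) is not addressed here.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Shortened variant of the helper: first() returns immediately if a subscriber already exists.
suspend fun <T> MutableSharedFlow<T>.emitWhenSubscribedTo(value: T) {
    subscriptionCount.first { it > 0 }
    emit(value)
}

// Hypothetical demo function.
fun emitWhenSubscribedDemo(): List<Int> = runBlocking {
    val events = MutableSharedFlow<Int>() // replay = 0, no extra buffer
    val received = mutableListOf<Int>()

    // The producer starts first and suspends because nobody is subscribed yet.
    val producer = launch {
        events.emitWhenSubscribedTo(1)
        events.emitWhenSubscribedTo(2)
    }

    // The collector shows up later and still receives both values.
    val collector = launch {
        events.take(2).collect { received.add(it) }
    }

    producer.join()
    collector.join()
    received
}
```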

jemshit commented 2 years ago

I need this too

elizarov commented 2 years ago

The solution with the channel is how you are supposed to do it.

circusmagnus commented 2 years ago

"The solution with the channel is how you are supposed to do it."

Channel will not work in case of multiple subscribers - only the first subscriber will receive an event.

In such a case (an event emitter that multiple Android Fragments / Composables / Activities subscribe to) I use an event-wrapper construct.

Event wrapper:

import java.util.concurrent.CopyOnWriteArraySet

/**
 * Event designed to be delivered only once to a concrete entity,
 * but it can also be delivered to multiple different entities.
 *
 * Keeps track of who has already handled its content.
 */
class OneTimeEvent<out T>(private val content: T) {

    private val handlers = CopyOnWriteArraySet<String>()

    /**
     * @param asker Identifies the caller, so the event can tell whether this "asker" has already handled it.
     *
     * @return Event content, or null if it has already been handled by asker
     */
    fun getIfNotHandled(asker: String): T? = if (handlers.add(asker)) content else null
}

Entities are identified by a String. So if your fragment disappears after rotation and a new fragment instance subscribes, it will identify itself by the same String id and will not receive already handled events again.

Such wrapped events can be emitted by an ordinary SharedFlow with a non-zero replay capacity, so the SharedFlow will buffer events even if no one is listening to it.

It is helpful to use a custom Flow operator to easily unwrap such OneTimeEvents:

fun <T> Flow<OneTimeEvent<T>>.filterNotHandledBy(consumerId: String): Flow<T> = transform { event ->
    event.getIfNotHandled(consumerId)?.let { emit(it) }
}

It filters a Flow of OneTimeEvent, passing only unhandled events downstream and unwrapping the "real" events from the OneTimeEvent wrapper.
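
To make the behavior concrete, here is a small self-contained sketch. The wrapper and operator are repeated in compact form, while oneTimeEventDemo, the replay size of 8 and the consumer ids are assumptions made only for this illustration.

```kotlin
import java.util.concurrent.CopyOnWriteArraySet
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

class OneTimeEvent<out T>(private val content: T) {
    private val handlers = CopyOnWriteArraySet<String>()
    fun getIfNotHandled(asker: String): T? = if (handlers.add(asker)) content else null
}

fun <T> Flow<OneTimeEvent<T>>.filterNotHandledBy(consumerId: String): Flow<T> =
    transform { event -> event.getIfNotHandled(consumerId)?.let { emit(it) } }

// Hypothetical demo function.
fun oneTimeEventDemo(): List<List<String>> = runBlocking {
    val events = MutableSharedFlow<OneTimeEvent<String>>(replay = 8)
    events.emit(OneTimeEvent("logged-in")) // kept in the replay cache, nobody collects yet

    // Each consumer id sees the replayed event once.
    val fragmentFirst = events.filterNotHandledBy("SomeFragment").take(1).toList()
    val activityFirst = events.filterNotHandledBy("SomeActivity").take(1).toList()

    // A re-created fragment with the same id skips what it already handled.
    events.emit(OneTimeEvent("logged-out"))
    val fragmentAgain = events.filterNotHandledBy("SomeFragment").take(1).toList()

    listOf(fragmentFirst, activityFirst, fragmentAgain)
}
```

The third collection is replayed both events, but "logged-in" is filtered out because "SomeFragment" is already recorded as a handler.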


Still, if such an event flow does not need multiple subscribers, it is much easier and less error-prone to just use a Channel.

zeeshanrasool91 commented 2 years ago

@circusmagnus can you share a usage example for your proposed solution?

circusmagnus commented 2 years ago

Sure thing. Interface to expose to clients:

interface SharedEvents<out T> {

    fun getFor(consumerId: String): Flow<T>
}

Class into which one can push new events:

class EventQueue<T>(capacity: Int) : SharedEvents<T> {

    private val innerQueue = MutableSharedFlow<OneTimeEvent<T>>(replay = capacity)

    suspend fun push(event: T) {
        innerQueue.emit(OneTimeEvent(event))
    }

    override fun getFor(consumerId: String): Flow<T> = innerQueue.filterNotHandledBy(consumerId)
}

Usage on the event origin side:

private val eventQueue = EventQueue<Event>(capacity = 64)
val events: SharedEvents<Event>
    get() = eventQueue

...

eventQueue.push(someEvent)

Usage on the client side (multiple subscribers allowed):

companion object {
    private const val TAG = "SomeFragment"
}

someViewModel.events
    .getFor(TAG) // TAG is this consumer's id
    .onEach { event -> event.handle() }
    .launchWhileStarted(lifecycle)

I have used it most often when we have an Android activity with a collection of fragments attached to it. One fragment lets the user log in (say, after 2FA is done and auth tokens are received), and both the activity and the other fragments should do something in response to the login, but each should react only once.

Nek-12 commented 1 year ago

Looks like I have exactly this problem too, but the conversation seems to be progressing slowly. Is there any workaround for this, or has anyone found a solution? I see this issue has a lot of upvotes too.

danielesegato commented 1 year ago

Hi everyone, I've written this gist with a SharedFlow that emits to multiple subscribers but buffers if there are no collectors at a given time. I called it ChannelSharedFlow; not the best name, but it does what it promises.

https://gist.github.com/danielesegato/160fabdcba5f563f1a171012377ea041

I'm not sure if this is what OP intended. I kinda wish this was part of the kotlinx.coroutines library.

circusmagnus commented 1 year ago

I think it is difficult to incorporate such a solution into a public library. No matter what you do, emitting events to multiple subscribers that may disappear and reappear has some fundamental problems, which cannot be solved for all cases.

Say we have two scenarios:
A. Android orientation change: several subscribing activities / fragments stop collecting and, after a small delay, new instances of them start collecting again.
B. After the user clicks a button, a completely new view appears on the screen and starts listening for shared events.

Solution with EventWrapper (SharedFlow with a non-zero replay buffer underneath):
Scenario A: New instances of activities/fragments will receive all events that were produced while they were "absent" (each one gets all of them), but only those they had not already handled while collecting earlier. Sounds good, works well.
Scenario B: A completely new View may receive events that were produced far back in time and are completely irrelevant now. This new View has obviously not handled them yet, so they will be delivered to it now. Not good. And how large a buffer should we have? How much history should we replay? Hard to guess.

Solution with ChannelSharedFlow (SharedFlow with zero replay, but collecting from a Channel):
Scenario A: The first instance of an Activity/Fragment to appear will cause the Channel to be drained of the events that arrived during the "no-collector time", and those events will be published. However, other Activities/Fragments are likely to appear just a few milliseconds later, and they will not receive those events. That is not good: they will miss events that they were meant to handle.
Scenario B: A completely new View will only receive new events (unless it is the first collector to appear). That should be OK. This View is new, so it should not get those old events anyway.
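
The "zero replay, but collecting from a Channel" idea can be approximated with standard operators. The sketch below is not the code from the gist; BufferingEvents and bufferingEventsDemo are hypothetical names, and the approach relies on shareIn with SharingStarted.WhileSubscribed() so that the channel is only drained while at least one subscriber exists.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper, not the gist's implementation.
class BufferingEvents<T>(scope: CoroutineScope) {
    private val channel = Channel<T>(Channel.BUFFERED)

    // replay = 0: late subscribers get nothing old. The channel is only
    // collected while there is at least one subscriber.
    val events: SharedFlow<T> = channel.receiveAsFlow()
        .shareIn(scope, SharingStarted.WhileSubscribed(), replay = 0)

    suspend fun push(event: T) = channel.send(event)
}

// Hypothetical demo function.
fun bufferingEventsDemo(): List<String> = runBlocking {
    val sharingJob = Job()
    val source = BufferingEvents<String>(CoroutineScope(this.coroutineContext + sharingJob))

    source.push("error-1") // no subscriber yet: waits in the channel buffer
    source.push("error-2")

    val received = source.events.take(2).toList()
    sharingJob.cancelAndJoin() // stop the sharing coroutine before leaving runBlocking
    received
}
```

As described in scenario A, only the subscribers present when the channel is drained see the buffered events. An event taken from the channel just as the last subscriber leaves may also be dropped, so this should be treated as a sketch of the trade-off rather than a complete solution.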

So IMHO either solution works fine in some use cases, but not in all of them.

Check out Manuel Vivo's article on one-off event antipatterns: https://medium.com/androiddevelopers/viewmodel-one-off-event-antipatterns-16a1da869b95

I think he is right. One cannot guarantee only-once delivery and no missed events while also handling multiple subscribers that disappear and reappear. Or at least not without timestamps, event versioning and similar machinery, which is already handled by specialized solutions like Spark or Kafka.

Either we have events with one producer and one consumer, implemented via a buffered Channel, with all the Channel guarantees (and gotchas).

Or, if we want to handle multiple subscribers that may disappear and reappear, then publishing a State (via StateFlow) rather than one-off events is the solution. State, unlike an Event, is always valid until it is replaced by a newer State. So there is no problem with delivering it twice or delivering it too late.
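
One way to read the state-based suggestion, sketched under assumptions: pending messages live in a StateFlow and are removed only when the UI acknowledges them. All names here (UserMessage, UiState, MessagesStateHolder, messageShown) are hypothetical and chosen for this example.

```kotlin
import java.util.concurrent.atomic.AtomicLong
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.update

// Hypothetical state model: messages stay pending until acknowledged.
data class UserMessage(val id: Long, val text: String)
data class UiState(val pendingMessages: List<UserMessage> = emptyList())

class MessagesStateHolder {
    private val nextId = AtomicLong()
    private val _uiState = MutableStateFlow(UiState())
    val uiState: StateFlow<UiState> = _uiState.asStateFlow()

    fun showMessage(text: String) {
        // Create the message outside update {}, since the lambda may be retried.
        val message = UserMessage(nextId.getAndIncrement(), text)
        _uiState.update { it.copy(pendingMessages = it.pendingMessages + message) }
    }

    // Called by the UI once the message has actually been displayed.
    fun messageShown(id: Long) {
        _uiState.update { state ->
            state.copy(pendingMessages = state.pendingMessages.filterNot { it.id == id })
        }
    }
}

// Hypothetical demo function.
fun stateInsteadOfEventsDemo(): Pair<Int, Int> {
    val holder = MessagesStateHolder()
    holder.showMessage("Login failed")
    // A UI that re-subscribes after rotation still sees the pending message...
    val pendingBeforeAck = holder.uiState.value.pendingMessages.size
    // ...until whoever displays it acknowledges it.
    holder.uiState.value.pendingMessages.forEach { holder.messageShown(it.id) }
    val pendingAfterAck = holder.uiState.value.pendingMessages.size
    return pendingBeforeAck to pendingAfterAck
}
```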

danielesegato commented 1 year ago

Thanks for your answer @circusmagnus It is indeed a tricky problem.

I believe scenario A and B are intrinsically separate. I cannot think of a scenario where a fragment recreating and a single Button would want to receive the same event flow.

In most cases where I needed to receive events rather than state, it was to show errors or something that had just happened (e.g. someone joining a session).

And for these cases it is important not to lose data while there's no subscription. But it is also important to have a zero-replay. You don't want to show an error shown minutes ago just because the user rotated the app.

Is this where resetReplayCache should be used? (That is, replay = 1, but clearing the cache a few seconds after delivery?)

I think it would be nice if this was automatic. The scenario is common enough in my opinion to be part of the coroutine library.

The ChannelSharedFlow tries to cover this use case. And of course it is very specific.

As you noticed, it doesn't guarantee that all reattaching subscribers will get the events emitted while they were detached; only the first subscriber will. That means it is probably not suited for multiple subscribers unless you accept this limitation.

There is something common to all these scenarios: the subscriber, rather than the producer, knows what it wants to receive. But I don't think there's a way to communicate that to the producer.

Would it be easier to keep data for a grace period in the replay cache during which it is considered "fresh" and replayed to all reattaching subscribers if the event is still fresh?

I'm not sure; it seems like you'll never be able to find the perfect grace period duration.
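
As a rough sketch of the grace-period idea, one option is to timestamp events and let the collecting side drop stale ones, instead of clearing the replay cache on a timer. Timestamped, onlyFresh and the injected clock are hypothetical and exist only for this illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// Hypothetical wrapper: stamps each event with its emission time.
data class Timestamped<T>(val value: T, val emittedAtMillis: Long)

// Hypothetical operator: drops events older than the grace period at delivery time.
fun <T> Flow<Timestamped<T>>.onlyFresh(
    graceMillis: Long,
    now: () -> Long = { System.currentTimeMillis() },
): Flow<T> = transform { event ->
    if (now() - event.emittedAtMillis <= graceMillis) emit(event.value)
}

// Hypothetical demo function using a fake clock so the result is deterministic.
fun freshnessDemo(): List<String> = runBlocking {
    var clock = 0L
    val events = MutableSharedFlow<Timestamped<String>>(replay = 16)

    events.emit(Timestamped("stale-error", emittedAtMillis = clock))
    clock = 10_000
    events.emit(Timestamped("recent-error", emittedAtMillis = clock))
    clock = 11_000

    // A subscriber attaching now is replayed both events but keeps only the fresh one.
    events.onlyFresh(graceMillis = 3_000, now = { clock }).take(1).toList()
}
```

This only addresses staleness. A subscriber that reattaches within the grace period would still see the same event again, and the choice of 3 seconds here is arbitrary, which is exactly the difficulty mentioned above.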

I feel like my implementation covers a need, and it would be nice if it could remember which subscribers have already received an item and avoid resending it to them, while still providing it to others that have not yet received it.

Would be even better if the SharedFlow could somehow distinguish between a returning subscription and a new one and decide if it should be replaying events based on that.

I wouldn't know how to implement such a SharedFlow to be honest.

mo0rti commented 1 year ago

I had a problem like this with SharedFlow. The workaround was adding a delay(1). Update: adding yield() instead works for me, thanks to @pablichjenkov.

suspend fun mainSharedFlow(
    coroutineScope: CoroutineScope
) {
    val dataFlow = MutableSharedFlow<Int>()

    // First consumer
    coroutineScope.launch {
        dataFlow.collect {
            println("Consumer 1: $it")
        }
    }

    // Second consumer
    coroutineScope.launch {
        dataFlow.collect {
            println("Consumer 2: $it")
        }
    }

    yield()  // Added this to fix the issue

    // Emit data
    dataFlow.emit(1)
    dataFlow.emit(2)
    dataFlow.emit(3)
}

and the calling code:

override fun onCreate(savedInstanceState: Bundle?) {

    // ....

    lifecycleScope.launch {
        repeatOnLifecycle(Lifecycle.State.STARTED) {
            mainSharedFlow(this)
        }
    }
}

pablichjenkov commented 1 year ago

Instead of using delay, you can try calling yield.

dkhalanskyjb commented 1 year ago

Note that the workaround will only work reliably for single-threaded dispatchers.
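
A possible alternative that does not depend on the dispatcher, sketched under the assumption that the producer knows how many collectors to expect (two here): suspend on subscriptionCount until both have subscribed, then emit. The name awaitSubscribersDemo is hypothetical.

```kotlin
import java.util.Collections
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo function.
fun awaitSubscribersDemo(): List<String> = runBlocking {
    val dataFlow = MutableSharedFlow<Int>()
    val received = Collections.synchronizedList(mutableListOf<String>())

    // Collectors run on a multi-threaded dispatcher, where yield() gives no ordering guarantee.
    val consumers = (1..2).map { id ->
        launch(Dispatchers.Default) {
            dataFlow.take(3).collect { received.add("Consumer $id: $it") }
        }
    }

    // Suspend until both collectors have actually subscribed.
    dataFlow.subscriptionCount.first { it >= 2 }

    dataFlow.emit(1)
    dataFlow.emit(2)
    dataFlow.emit(3)

    consumers.joinAll()
    received.sorted()
}
```

With no replay and no extra buffer, each emit suspends until both subscribers have received the value, so every consumer ends up with all three items.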