Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

[Proposal] Primitive or Channel that guarantees the delivery and processing of items #2886

Open manuelvicnt opened 3 years ago

manuelvicnt commented 3 years ago

Due to the prompt cancellation guarantee changes that landed in Coroutines 1.4, Channels cannot be used as a mechanism that guarantees delivery and acknowledges successful processing between producer and consumer, that is, one that guarantees an item was handled exactly once.

There's a need for another primitive (or improvements to Channel) that also guarantees that the item was successfully processed by the consumer.


Use case: UI Events in Android

In Android development, ViewModel-like classes sometimes need to tell the View to perform an action, for example, showing a Toast. This is needed when stateful Views don't expose their internal state, as is the case with most View APIs.

Channels cannot be used to communicate these types of events to the View because those events could be lost under certain circumstances. That is: 1) the producer (ViewModel) sends the event, 2) the consumer (View) receives the event, and its processing is scheduled for dispatch, but then 3) the consumer is cancelled before that dispatch happens. The event was received but never processed.

The onUndeliveredElement handler recently added to Channels doesn't work in this case because it messes up the ordering of events.
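The sequence above can be reproduced in a few lines. This is a minimal sketch, not code from the thread: `lostEventDemo` and the "show toast" payload are made up, `onUndeliveredElement` is only used to observe the loss, and `runBlocking` stands in for the main thread. The consumer is suspended in `receive()`, the producer hands it an event, and the consumer is cancelled before it gets to run. Assuming kotlinx.coroutines 1.4 or later (prompt cancellation), `receive()` then throws `CancellationException` and the event ends up in `onUndeliveredElement` instead of being processed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical demo: returns (processed, lost) for a single "view" that is
// destroyed right after an event has been handed to it.
fun lostEventDemo(): Pair<List<String>, List<String>> = runBlocking {
    val processed = mutableListOf<String>()
    val lost = mutableListOf<String>()
    // Rendezvous channel; the handler records events that were taken out of
    // the channel but never returned to the receiving code.
    val events = Channel<String>(onUndeliveredElement = { lost += it })

    val view = launch {
        processed += events.receive() // stands in for processEventInUI(event)
    }
    yield()                   // let the consumer suspend inside receive()
    events.send("show toast") // hands the event over and schedules the consumer to resume
    view.cancel()             // the view is destroyed before the consumer gets to run
    view.join()

    processed.toList() to lost.toList()
}
```

Giving the channel more capacity would not change the outcome here: the consumer was already waiting, so the event was handed to it directly and was lost after it had left the channel.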

elizarov commented 3 years ago

An important clarification. It did not become broken in kotlinx.coroutines 1.4 with the introduction of prompt cancellation. It never worked in the first place.

Before 1.4, we had atomic cancellation. So what happened with UI Events in Android with channels is that one piece of code sends an event to a channel that some UI is listening to. While this event is being dispatched, the corresponding view gets destroyed, but, because of atomic cancellation, the view still receives the event. This typically crashed the Android application when it tried to show some notification on an already-destroyed view.

After 1.4, with prompt cancellation, the same scenario leads to the event being ignored (lost) instead of crashing the app.

Neither is good, so we need a solution. I'll list a few different things we can do in the follow-up responses.

elizarov commented 3 years ago

The most trivial solution is to rely on the specifics of how the view lifecycle works in Android and on Dispatchers.Main.immediate. On top of plain channels, the solution needs a couple of tweaks:

  1. You send events to the channel from either Dispatchers.Main or Dispatchers.Main.immediate. That is, instead of a plain channel.send(event), you do Dispatchers.Main.immediate { channel.send(event) }.

  2. You receive events from Dispatchers.Main.immediate (sic!). That is, do something like:

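    channel.receiveAsFlow()
        .onEach { event -> processEventInUI(event) }
        // viewScope (hypothetical): a scope cancelled when the view is destroyed, e.g. lifecycleScope
        .launchIn(viewScope + Dispatchers.Main.immediate) // immediate is the key here!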

The idea behind this trick is that views can only be destroyed in between main-thread events, and, because of how Main.immediate works, an event that was posted from Main gets processed within that same main-thread event, so the unhappy path of losing an event can never happen.

elizarov commented 3 years ago

Another solution is to introduce a dedicated primitive (not a channel!) that can handle these kinds of "exactly once events" despite cancellation. The idea is that the receiver has to get a permit to receive an event first and, if it is not cancelled, retrieve the event from the underlying buffer. This idea can be directly implemented on top of a synchronized array deque and a semaphore (credit goes to @ndkoval). You can write a trivial implementation of such ExactlyOnceEventBus:

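import kotlinx.coroutines.sync.*

class ExactlyOnceEventBus<T> {
    private val buffer = ArrayDeque<T>()
    // Starts with all permits acquired, so receive() suspends until send() releases one.
    private val semaphore = Semaphore(Int.MAX_VALUE, Int.MAX_VALUE)

    fun send(event: T) {
        synchronized(buffer) { buffer.add(event) }
        semaphore.release() // one permit per buffered event
    }

    suspend fun receive(): T {
        // If the caller is cancelled while suspended here, no permit is consumed
        // and the event stays in the buffer.
        semaphore.acquire()
        // No suspension point after acquire(), so cancellation cannot lose the event.
        return synchronized(buffer) { buffer.removeFirst() }
    }
}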

With this implementation, eventBus.send(event) can be called from any thread, while eventBus.receive() consumes an event only if the receiving coroutine was not cancelled.
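To see why the permit-first approach survives the same race, here is a sketch that replays the cancellation scenario against the bus. The class is copied from above so the snippet is self-contained, and `survivesCancellationDemo` is a hypothetical helper. The first consumer is cancelled after the event was sent but before it resumed; a cancelled `acquire()` does not consume a permit, and nothing that can be cancelled runs between a successful `acquire()` and `removeFirst()`, so the event stays buffered for the next consumer.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore

// Same trivial bus as above, repeated so that this snippet is self-contained.
class ExactlyOnceEventBus<T> {
    private val buffer = ArrayDeque<T>()
    private val semaphore = Semaphore(Int.MAX_VALUE, Int.MAX_VALUE)

    fun send(event: T) {
        synchronized(buffer) { buffer.add(event) }
        semaphore.release()
    }

    suspend fun receive(): T {
        semaphore.acquire()
        return synchronized(buffer) { buffer.removeFirst() }
    }
}

// Hypothetical demo: the first "view" is destroyed right after send();
// a second one (e.g. the re-created view) still gets the event.
fun survivesCancellationDemo(): String = runBlocking {
    val bus = ExactlyOnceEventBus<String>()
    val firstView = launch { bus.receive() }
    yield()                // firstView suspends in semaphore.acquire()
    bus.send("show toast") // releases a permit and schedules firstView to resume
    firstView.cancel()     // firstView is cancelled before it runs
    firstView.join()       // the permit is not consumed; the event stays buffered

    val secondView = async { bus.receive() }
    secondView.await()
}
```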

The implementation is trivial. The challenge is that many developers might have to repeat it over and over, so it might make sense to provide a ready-to-use primitive. Since we expect people to want to use it as a Flow, providing some kind of special ExactlyOnceEventFlow (name TBD) in kotlinx.coroutines might be a good idea.
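One way such a Flow could be layered on top of the bus is sketched below. `asEventFlow` and `eventFlowDemo` are hypothetical names, not kotlinx.coroutines APIs, and this only illustrates the idea under discussion. Each event is received and emitted in the collecting coroutine without an intermediate buffer, so an event the flow never received stays in the bus. Once `emit` hands a value to the downstream operators and the collector, though, what happens to it is up to them.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Semaphore

// Same trivial bus as above, repeated so that this snippet is self-contained.
class ExactlyOnceEventBus<T> {
    private val buffer = ArrayDeque<T>()
    private val semaphore = Semaphore(Int.MAX_VALUE, Int.MAX_VALUE)

    fun send(event: T) {
        synchronized(buffer) { buffer.add(event) }
        semaphore.release()
    }

    suspend fun receive(): T {
        semaphore.acquire()
        return synchronized(buffer) { buffer.removeFirst() }
    }
}

// Hypothetical adapter: receive and emit in the collector's coroutine, no buffering.
fun <T> ExactlyOnceEventBus<T>.asEventFlow(): Flow<T> = flow {
    while (true) emit(receive())
}

fun eventFlowDemo(): Pair<List<String>, String> = runBlocking {
    val bus = ExactlyOnceEventBus<String>()
    listOf("a", "b", "c").forEach(bus::send)
    // take(2) stops collecting after "b", so "c" is never received by the flow...
    val firstTwo = bus.asEventFlow().take(2).toList()
    // ...and is still available in the bus.
    firstTwo to bus.receive()
}
```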

manuelvicnt commented 3 years ago

Thanks @elizarov! Something like ExactlyOnceEventBus would be awesome to have as a short-term solution (or even long-term, to make Channels simpler). Our team could even help maintain the code in case you're busy.

We didn't want Flow to be exposed as a type (e.g. ExactlyOnceEventFlow) since that interferes with the backing semantics of the EventBus or Channel. Imagine that a collector receives the event and then filters it out using the Flow.filter operator: that event would be lost.

Do you think this is something that could go to the next version of coroutines? Thank you!

elizarov commented 3 years ago

We'll need help with the design. If it's not a Flow, then what operators does it need to define? Just having a receive function (as in the trivial implementation above) seems very low-level. We'll need some examples of real-life code that has this exactly-once problem, to see what code patterns are used there, so that we can design a convenient set of operations.

PierluigiFimiano commented 1 year ago

Hi,

Just a question: does a SharedFlow with an infinite buffer solve the problem?

ivanbartsov commented 1 year ago

@PierluigiFimiano I don't think it does. https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-shared-flow/


If I read this correctly, this basically means that using an infinitely buffered SharedFlow as a queue of events only works if there's a subscriber at all times. If at any point all subscribers disconnect, the buffer does nothing and the next subscriber will only get the latest replay items -- so the delivery guarantee of each produced item is not met.
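This reading of the documentation can be checked with a small sketch (the helper `lateSubscriberSees` and the event names are hypothetical). Three events are emitted into a `MutableSharedFlow` with an effectively unlimited `extraBufferCapacity` while nobody is subscribed, and then a subscriber arrives. With `replay = 0` it receives nothing; with `replay = 1` it only receives the last event.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: emits three events while nobody is subscribed, then
// returns what a subscriber that arrives afterwards actually receives.
fun lateSubscriberSees(replay: Int): List<String> = runBlocking {
    val events = MutableSharedFlow<String>(
        replay = replay,
        extraBufferCapacity = Int.MAX_VALUE, // effectively "infinite"
    )
    // With no subscribers, tryEmit succeeds, but only the replay cache keeps values.
    listOf("a", "b", "c").forEach { check(events.tryEmit(it)) }

    val received = mutableListOf<String>()
    // A SharedFlow never completes, so stop collecting after a short while.
    withTimeoutOrNull(100) { events.toList(received) }
    received
}
```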

PierluigiFimiano commented 1 year ago

Yes, you are right, but what about a channel with conflated or unlimited capacity? In any case, the behaviour is a bit tricky.

gmk57 commented 1 year ago

Channels were considered a solution to this problem in the past, but they still may lose events in some cases, as explained in the first posts of this thread. And having more capacity does not help here: channels just don't have the mechanics to reliably know if the event was processed.

PierluigiFimiano commented 1 year ago

Got it! The problem is that the consumer can be cancelled while it is processing an event. OK, I think I'll adopt the solution proposed by @elizarov: executing send and receive on the main thread with Dispatchers.Main.immediate. Thank you!

volo-droid commented 1 year ago

"Imagine that a collector receives the event, and then filters it out using the Flow.filter operator, that event would be lost."

@manuelvicnt, what exactly is bad about a collector filtering out events it doesn't want to handle?

IMHO, Flow looks like a perfect replacement for code that is migrating away from the existing LiveData solutions for handling UI Events. If the Android team decides it's too flexible (or error-prone) for the discussed scenario, then some simpler androidx interface can be built on top of the proposed ExactlyOnceEventFlow.

tPl0ch commented 11 months ago

"If I read this correctly, this basically means that using an infinitely buffered SharedFlow as a queue of events only works if there's a subscriber at all times. If at any point all subscribers disconnect, the buffer does nothing and the next subscriber will only get the latest replay items -- so delivery guarantee of each produced item is not met."

@ivanbartsov, wouldn't it be enough to always attach something like a NullSubscriber, which is always active but does nothing with the messages, to make sure a subscriber is always present?

gmk57 commented 11 months ago

I guess this won't help: new ("real") subscribers only get items from the replay cache, not from the buffer. The buffer is used for subscribers that are present but "not ready to accept the new value" (suspended) at the time it is emitted.
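A sketch of the NullSubscriber idea and of why it does not help, assuming the "NullSubscriber" is simply a coroutine that collects and ignores every value (`lateSubscriberWithNullSubscriber` is a hypothetical helper). Even with that subscriber attached, a subscriber that arrives after the events were emitted starts from the replay position, which with `replay = 0` is past everything already emitted, so it receives none of them.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: returns what a late subscriber receives while an
// always-active "NullSubscriber" is attached to the flow.
fun lateSubscriberWithNullSubscriber(): List<String> = runBlocking {
    val events = MutableSharedFlow<String>(extraBufferCapacity = Int.MAX_VALUE)

    // The "NullSubscriber": collects every value and ignores it.
    val nullSubscriber = launch { events.collect { } }
    yield() // let it subscribe before anything is emitted

    // There is a subscriber now, so these values are buffered,
    // but only on behalf of nullSubscriber.
    listOf("a", "b", "c").forEach { events.tryEmit(it) }

    // A "real" subscriber arriving later starts at the replay position
    // (replay = 0), i.e. after "c".
    val received = mutableListOf<String>()
    withTimeoutOrNull(100) { events.toList(received) }

    nullSubscriber.cancel()
    received
}
```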