Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Create way to get notified when emit or tryEmit is called on a SharedFlow whose buffer is already full #2526

Open zjuhasz opened 3 years ago

zjuhasz commented 3 years ago

I'm working on a library for sensor monitoring and using SharedFlows for the data streams from the sensors. In general it's not OK to miss sensor updates, and the program should be configured so the buffer never fills up. Occasionally suspending on buffer overflow can be OK, but often even that is problematic, especially if it happens frequently. Basically, the buffer filling up means something in the hardware setup needs to be adjusted or the update rate needs to be reduced so the program can keep up. So there are many situations where I am OK with occasionally suspending or occasionally dropping a value, but I need to be able to monitor when this is happening so either:

The problem is that there is only a single situation in which you get any feedback about whether the buffer is full when attempting to emit: when onBufferOverflow is set to BufferOverflow.SUSPEND and tryEmit is called. I need to always have access to this feedback in some way, whether through the return value of tryEmit, by registering a callback on the SharedFlow, or by watching another Flow that emits notifications. Of course, a return value on emit wouldn't work because the caller would be suspended.
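To make the limitation concrete, here is a minimal sketch (not part of the original report) that fills a one-slot buffer and looks at what the second tryEmit returns under each overflow strategy. The helper name secondTryEmitResult is hypothetical, and the sketch assumes a single-threaded runBlocking so the subscriber cannot drain the buffer between the two calls.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the result of a tryEmit made while the buffer is already full.
fun secondTryEmitResult(strategy: BufferOverflow): Boolean = runBlocking {
    val flow = MutableSharedFlow<Int>(extraBufferCapacity = 1, onBufferOverflow = strategy)
    // Overflow handling only applies while there is a subscriber, so register one.
    // UNDISPATCHED subscribes immediately; the collector then waits for its turn on this thread.
    val collector = launch(start = CoroutineStart.UNDISPATCHED) { flow.collect { } }
    flow.tryEmit(1)              // fills the single buffer slot
    val result = flow.tryEmit(2) // buffer is still full here
    collector.cancel()
    result
}

fun main() {
    for (strategy in BufferOverflow.values()) {
        println("$strategy -> ${secondTryEmitResult(strategy)}")
    }
}
```

Only the SUSPEND case reports the full buffer by returning false. With DROP_OLDEST and DROP_LATEST the call returns true and a value is discarded without any signal to the emitter.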

elizarov commented 3 years ago

Can you please clarify why tryEmit is not working for you? E.g. you can write your own version of emit that does all the monitoring you need:

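// tryEmit and emit are declared on MutableSharedFlow, so that is the receiver
suspend fun <T> MutableSharedFlow<T>.monitoredEmit(value: T) {
    if (tryEmit(value)) return // ok, no problem
    callTheValueWasBufferedCallbackHere() // buffer was full, report it
    emit(value) // suspends until there is room in the buffer
}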

Will it work for you?
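As a runnable illustration of the suggestion above, the following sketch passes the callback in as a parameter and counts how often the buffer was full. The onBufferFull parameter and the countOverflows helper are hypothetical names introduced for this example, and the flow is assumed to use the default BufferOverflow.SUSPEND strategy.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical variant of monitoredEmit that takes the monitoring callback as an argument.
suspend fun <T> MutableSharedFlow<T>.monitoredEmit(value: T, onBufferFull: (T) -> Unit) {
    if (tryEmit(value)) return // buffered without suspending
    onBufferFull(value)        // buffer was full, report it
    emit(value)                // suspend until a slot frees up
}

// Emits `values` items to a deliberately slow subscriber and returns the number of overflows seen.
fun countOverflows(values: Int): Int = runBlocking {
    val flow = MutableSharedFlow<Int>(extraBufferCapacity = 1) // default strategy is SUSPEND
    var overflows = 0
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        flow.collect { delay(10) } // slow subscriber
    }
    repeat(values) { i -> flow.monitoredEmit(i) { overflows++ } }
    collector.cancel()
    overflows
}

fun main() {
    println("overflow notifications: ${countOverflows(3)}")
}
```

This only works because tryEmit returns false when the buffer is full, which happens solely under BufferOverflow.SUSPEND.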

zjuhasz commented 3 years ago

This wouldn't work in situations where BufferOverflow is set to BufferOverflow.DROP_OLDEST or BufferOverflow.DROP_LATEST. I'm looking for a universal method of notifying when the send rate is overflowing the buffer regardless of the buffer overflow strategy.

herriojr commented 2 years ago

+1

AmsterdamFilho commented 1 year ago

+1

rnett commented 1 month ago

Having a way to get notified of rejected or hanging emits would be nice. However, for the use case of tracking metrics of shared flows to monitor your app, I think it would be even nicer to have a way to track the size of the buffer, as mentioned here. Something analogous to SharedFlow.subscriptionCount but for the number of buffered items would be awesome.

Honestly, this might be useful for buffers in general, even beyond shared flows.
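For reference, this is roughly how the existing subscriptionCount property (a StateFlow<Int> on MutableSharedFlow) can be read today. The sketch only illustrates the shape of the API that the proposed buffered-item counter would mirror; no such counter exists in the library, and the helper name observedSubscriptionCounts is hypothetical.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: samples subscriptionCount before, during and after a subscription.
fun observedSubscriptionCounts(): List<Int> = runBlocking {
    val flow = MutableSharedFlow<Int>(extraBufferCapacity = 1)
    val before = flow.subscriptionCount.value
    // UNDISPATCHED makes the collector subscribe before launch returns.
    val collector = launch(start = CoroutineStart.UNDISPATCHED) { flow.collect { } }
    val during = flow.subscriptionCount.value
    collector.cancelAndJoin() // wait until the subscription is released
    val after = flow.subscriptionCount.value
    listOf(before, during, after)
}

fun main() {
    println(observedSubscriptionCounts())
}
```

Because subscriptionCount is itself a StateFlow, it can be collected for metrics like any other flow, which is the kind of ergonomics the comment above asks for on the buffer size.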