Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

throttleFirst Flow operator #1446

Open sum-elier opened 5 years ago

sum-elier commented 5 years ago

Is there a Flow equivalent for this Reactive stream operator? I guess it might be derived from the sample operator.

Is it possible to implement this operator using only the publicly exposed API? I tried to modify the sample operator implementation, but since some parts of it are internal, that isn't possible.

qwwdfsad commented 5 years ago

Is there a Flow equivalent for this Reactive stream operator?

No, but we have plans to implement its analogue (actually, the whole family of time-based sampling).

Is it possible to implement this operator using only the publicly exposed api?

Yes, modulo some cancellation details. For an ad-hoc solution, you can replace scopedFlow with flow { coroutineScope { ... } }.
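A minimal sketch of that ad-hoc replacement follows. Both `scopedFlowSketch` and `throttleFirstViaChannel` are hypothetical names, and neither is library API. The scope is opened inside the flow builder, so it lives exactly as long as one collection.

This sketch ignores the cancellation details that the internal version handles differently. The throttling logic works as follows:

- Upstream is collected in a child coroutine via `produce`.
- After each emitted value, anything arriving during the next `windowMillis` is received and dropped.
- The window starts only after `downstream.emit` returns.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for the internal scopedFlow: the scope is opened
// inside the flow builder, so it lives exactly as long as one collection.
fun <R> scopedFlowSketch(block: suspend CoroutineScope.(FlowCollector<R>) -> Unit): Flow<R> =
    flow { coroutineScope { block(this@flow) } }

// Hypothetical throttleFirst built on it. Upstream is collected in a child
// coroutine and handed over through a rendezvous channel.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.throttleFirstViaChannel(windowMillis: Long): Flow<T> {
    val upstream = this
    return scopedFlowSketch { downstream ->
        val values = produce<T> { upstream.collect { send(it) } }
        for (first in values) {
            downstream.emit(first)
            // Receive and drop values until the window times out or upstream ends.
            withTimeoutOrNull(windowMillis) {
                for (dropped in values) { /* discard */ }
            }
        }
    }
}
```

Cancelling the collector cancels the scope, and with it the producer coroutine, so nothing outlives the collection.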

sum-elier commented 5 years ago

I see, thanks. What are your plans regarding custom operator implementations? I am still learning the Flow API, but this makes me think that not every operator can be implemented because of the internal API. Is that right?

elizarov commented 5 years ago

Every operator should be functionally implementable via some public API. Internal API is only needed for highly efficient (optimized) implementations. In particular, throttleFirst is relatively easy to implement. Here is quite a straightforward implementation: https://pl.kotl.in/kKgI484X-
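The linked playground code is not reproduced in this thread. As a hedged sketch only, here is one window-aligned throttleFirst that is consistent with the `windowStartTime += delta / windowDuration * windowDuration` line quoted later on. The name `throttleFirstAligned` is hypothetical, and like the other snippets here it reads `System.currentTimeMillis()`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical window-aligned throttleFirst: at most one item per window,
// where windows start at collectionStart + k * windowDuration.
fun <T> Flow<T>.throttleFirstAligned(windowDuration: Long): Flow<T> {
    require(windowDuration > 0) { "windowDuration must be positive" }
    return flow {
        var windowStartTime = System.currentTimeMillis()
        var emitted = false
        collect { value ->
            val delta = System.currentTimeMillis() - windowStartTime
            if (delta >= windowDuration) {
                // Skip ahead by a whole number of windows so boundaries stay aligned.
                windowStartTime += delta / windowDuration * windowDuration
                emitted = false
            }
            if (!emitted) {
                emitted = true
                emit(value)
            }
        }
    }
}
```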

sum-elier commented 5 years ago

Thanks for the response. Your implementation seems to work alright, but if I were to use it with TestCoroutineScope, I wouldn't be able to advance time during tests. Is there a way to solve this?

PS: I also came up with this implementation of throttleFirst. Would it be enough, or is it missing something (e.g. coroutineScope)?

@FlowPreview
@ExperimentalCoroutinesApi
fun <T> Flow<T>.throttleFirst(windowDuration: Long): Flow<T> = flow {
   var lastEmissionTime = 0L
   collect { upstream ->
      val currentTime = System.currentTimeMillis()
      val mayEmit = currentTime - lastEmissionTime > windowDuration
      if (mayEmit)
      {
         lastEmissionTime = currentTime
         emit(upstream)
      }
   }
}
elizarov commented 5 years ago

@fjna You'll need additional code to support TestCoroutineScope. Instead of using System.currentTimeMillis(), you should use the following snippet:

val time = (coroutineContext[ContinuationInterceptor] as? DelayController)?.currentTime ?: System.currentTimeMillis()

As for your implementation, it looks fine apart from the code style (the extra newline before {). It has different behavior, though, which might be exactly what you wanted: its window restarts at every emitted item rather than at fixed multiples of windowDuration.

Btw, that's the key challenge in providing time-related operators out of the box, since there are so many subtly different ways to implement them.

zach-klippenstein commented 5 years ago

It seems like it would be appropriate for all the time-based operators to use the standard library's new Clock API, since it's already multiplatform and uses the correct time source for measuring elapsed time (currentTimeMillis can move backwards). It might even make sense for CoroutineDispatchers in general to be associated with a Clock, which (I think, if the testing support used the API as well) could eliminate the need for special-casing test context support:

val clock = (coroutineContext[ContinuationInterceptor] as? CoroutineDispatcher)?.clock ?: MonoClock

This would obviously be a big, ambitious change involving coroutine internals, but for now, at least, I think the time operators could still use Clock internally to abstract out the test context support.
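The experimental Clock/MonoClock API mentioned here was later renamed TimeSource/TimeSource.Monotonic in the Kotlin standard library. Below is a minimal sketch of the idea: it is sum-elier's emission-anchored throttleFirst from above, measuring elapsed time with a TimeMark.

It assumes the clock is passed in as a parameter rather than looked up from the dispatcher. The `timeSource` parameter is hypothetical, and kotlinx.coroutines has no such overload.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.time.*
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical overload: the clock is injected instead of reading
// System.currentTimeMillis(), which can move backwards.
@OptIn(ExperimentalTime::class)
fun <T> Flow<T>.throttleFirst(
    window: Duration,
    timeSource: TimeSource = TimeSource.Monotonic,
): Flow<T> = flow {
    // Mark taken at the last emission; null until the first item arrives.
    var lastEmission: TimeMark? = null
    collect { value ->
        val mark = lastEmission
        if (mark == null || mark.elapsedNow() >= window) {
            lastEmission = timeSource.markNow()
            emit(value)
        }
    }
}
```

In a test, a `TestTimeSource` can be passed in and advanced manually with `+=`. That avoids special-casing TestCoroutineScope.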

elizarov commented 5 years ago

@zach-klippenstein Yes. Created a separate issue for it: #1499

pavlospt commented 4 years ago

Is there any activity related to this issue going on? 😄

elizarov commented 4 years ago

Is there any activity related to this issue going on? 😄

Not yet.

lukas1 commented 4 years ago

@elizarov It looked a little difficult to prepare your code snippet for tests, or at least I didn't find an easy way, so I've tried to implement it a little differently. Maybe this can be useful for other folks too!

I'm worried that I may have overlooked something, though. Do you think this implementation could work? Are there any inherent disadvantages to it (such as leaking a scope or something similar)?

EDIT: One limitation I am aware of is that when collecting an indefinite flow (such as a BroadcastChannel), if nobody closes the channel after the first item is emitted, the internal coroutine needs time to finish. In tests, one then needs to advance the time of the TestCoroutineDispatcher; otherwise an unfinished-coroutine exception is thrown. That is definitely not ideal for the Coroutines library itself, but for my project it's an acceptable limitation.

fun <T> Flow<T>.throttleFirst(windowDuration: Long): Flow<T> {
    var job: Job = Job().apply { complete() }

    return onCompletion { job.cancel() }.run {
        flow {
            coroutineScope {
                collect { value ->
                    if (!job.isActive) {
                        emit(value)
                        job = launch { delay(windowDuration) }
                    }
                }
            }
        }
    }
}

And I have the following tests for it:

    @Test
    fun throttleFirst() = runBlockingTest {
        val testDispatcher = TestCoroutineDispatcher()
        val values = mutableListOf<Int>()
        val flow = (1..10).asFlow().onEach { delay(200) }
        flow
            .throttleFirst(500)
            .flowOn(testDispatcher)
            .onEach { values.add(it) }
            .launchIn(this)

        testDispatcher.advanceTimeBy(2000)

        assertEquals(listOf(1, 4, 7, 10), values)
    }

    @Test
    fun throttleFirstWithError() = runBlockingTest {
        val testDispatcher = TestCoroutineDispatcher()
        val values = mutableListOf<Int>()
        val flow = (1..10).asFlow()
            .onEach { delay(200) }
            .onEach { if (it == 2) throw IllegalStateException() }
        flow
            .throttleFirst(500)
            .flowOn(testDispatcher)
            .onEach { values.add(it) }
            .catch {  }
            .launchIn(this)

        testDispatcher.advanceTimeBy(400)

        assertEquals(listOf(1), values)
    }
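One detail worth noting about lukas1's throttleFirst above: `var job` is created once per call to throttleFirst. Every collection of the returned flow therefore shares the same window.

Below is a hedged variant of the same timer-job idea under a hypothetical name. It keeps the Job inside the flow builder, so each collection gets its own window. It also cancels the pending timer once upstream completes, so coroutineScope can return right away.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical variant of the timer-job approach with per-collection state.
fun <T> Flow<T>.throttleFirstWithTimer(windowMillis: Long): Flow<T> = flow {
    coroutineScope {
        var window: Job? = null
        collect { value ->
            if (window?.isActive != true) {
                // Start the window before emitting, so a slow downstream
                // does not stretch it.
                window = launch { delay(windowMillis) }
                emit(value)
            }
        }
        // Upstream has completed: cancel any pending timer so that
        // coroutineScope (and thus the collection) finishes immediately.
        window?.cancel()
    }
}
```

For an indefinite upstream, the collection still only ends when the collector is cancelled, which then cancels the timer together with the scope.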
hoc081098 commented 2 years ago

Please check out my library: https://github.com/hoc081098/FlowExt

thibseisel commented 2 years ago

To whoever is still looking for the Flow equivalent of throttleLatest, here is how to implement it by composing existing operators:

fun <T> Flow<T>.throttleLatest(delayMillis: Long): Flow<T> = this
    .conflate()
    .transform {
        emit(it)
        delay(delayMillis)
    }

Many thanks to the person who gave me this solution on the Kotlin Slack.

flamewave000 commented 1 year ago

Also to whoever is looking for this, I've made a simple helper method called throttle for creating an intermediate flow that essentially does the same as @thibseisel's solution.

fun <T> Flow<T>.throttle(periodMillis: Long): Flow<T> {
    if (periodMillis < 0) return this
    return flow {
        conflate().collect { value ->
            emit(value)
            delay(periodMillis)
        }
    }
}

Can be used as follows:

flow {
    for (num in 1..15) {
        emit(num)
        delay(25)
    }
}.throttle(100)
 .onEach { println(it) }
 .collect()
 // This will print 1, 5, 9, 13, 15

Not sure what difference it would make compared to the above solution, but meh.

PenzK commented 1 year ago

Every operator should be functionally implementable via some public API. Internal API is only needed for highly efficient (optimized) implementations. In particular, throttleFirst is relatively easy to implement. Here is quite a straightforward implementation: https://pl.kotl.in/kKgI484X-

Why windowStartTime += delta / windowDuration * windowDuration and not just windowStartTime += delta?

dkhalanskyjb commented 1 year ago

This way, the moments when the emissions are allowed are [startTime, startTime + windowDuration, startTime + windowDuration * 2, startTime + windowDuration * 3, ...]. If it was just windowStartTime += delta, the emission times would be [startTime, timeOfFirstEmission + windowDuration, timeOfSecondEmission + windowDuration, ...].
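Here is a worked example of the two update rules (the helper names below are hypothetical, for illustration only). Take startTime = 0 and windowDuration = 100, with an item arriving at time 250:

- The aligned rule moves the window start to 200, so the next emission is allowed from 300.
- windowStartTime += delta moves it to 250, so the next emission is allowed only from 350.

```kotlin
// Hypothetical helpers contrasting the two ways of updating windowStartTime
// when an item arrives at time `now`.
fun alignedWindowStart(windowStart: Long, now: Long, windowDuration: Long): Long {
    val delta = now - windowStart
    // Integer division rounds delta down to a whole number of windows.
    return windowStart + delta / windowDuration * windowDuration
}

// Plain `windowStartTime += delta`: the new window simply starts at `now`.
fun unalignedWindowStart(windowStart: Long, now: Long): Long =
    windowStart + (now - windowStart)
```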

Andrew0000 commented 1 year ago

I opened this topic in 2023 because there is still no throttleLatest in the standard kotlinx.coroutines library (1.6.4).

@thibseisel thank you for your concise implementation.

@elizarov, please consider changing your stance. Kotlin was advertised as a concise language, and I guess many developers came to Kotlin because they didn't like Java's verbosity. Also, many of them look at Coroutines/Flow as a replacement for RxJava (that doesn't mean it's better, but it is definitely promoted more). So it's really surprising when you have to write such basic operators every time, as if you were back in 2010. By the way, it's really convenient how the Kotlin team implemented hints about other Rx operators. Thank you for this.

P.S. For anyone who is not sure about the solution, I wrote a quick test (one more time sink; nobody would need to spend time testing it if it were in the standard kotlinx.coroutines library): https://gist.github.com/Andrew0000/6d4477642cfadc7822742bd7986e79f4

Mikkelet commented 3 months ago

To whoever is still looking for the Flow equivalent of throttleLatest, here is how to implement it by composing existing operators:

fun <T> Flow<T>.throttleLatest(delayMillis: Long): Flow<T> = this
    .conflate()
    .transform {
        emit(it)
        delay(delayMillis)
    }

Many thanks to the person who gave me this solution on the Kotlin Slack.

This approach will just delay subsequent emissions by the time limit, not filter them out.

If you want to throttle user input, such as double taps, this approach does NOT work, because the second tap will still register, albeit with a delay.

dkhalanskyjb commented 3 months ago

@Mikkelet, that's incorrect. Please run this code:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() {
    val flow = MutableSharedFlow<Int>()
    runBlocking {
        launch {
            repeat(12) {
                delay(100)
                flow.emit(it)
            }
        }
        launch {
            withTimeout(1500) {
                flow.throttleLatest(500).collect {
                    println(it)
                }
            }
        }
    }
}

fun <T> Flow<T>.throttleLatest(delayMillis: Long): Flow<T> = this
    .conflate()
    .transform {
        emit(it)
        delay(delayMillis)
    }

https://pl.kotl.in/sKI0zkHq9

Andrew0000 commented 3 months ago

@Mikkelet take a look at the test: https://gist.github.com/Andrew0000/6d4477642cfadc7822742bd7986e79f4

Mikkelet commented 3 months ago

I tried setting the repeat count to 2 and still got two events. Shouldn't I only get one? Or am I misunderstanding it? :S

dkhalanskyjb commented 3 months ago

@Mikkelet, I think I was misunderstanding your point; in that case, sorry. If you set the number to 2, or 3, or 4, you will get two events. Events do get filtered out: at most one event in the time window after an emission will be preserved. If you want it to be at most zero, then yes, that solution will not work. You probably want this one, then: https://github.com/Kotlin/kotlinx.coroutines/issues/1446#issuecomment-625244176

freaksgit commented 1 week ago

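@Mikkelet, to drop events that arrive during the time window, you can combine dropIfBusy() with delay(...):

// Rendezvous channel + trySend: a value is forwarded only if the downstream
// is currently suspended waiting for one; otherwise it is dropped.
fun <T> Flow<T>.dropIfBusy(): Flow<T> = channelFlow {
    collect { trySend(it) }
}.buffer(0)

// While transform is in delay(), the downstream is "busy", so dropIfBusy()
// discards everything that arrives during the window (i.e. the first item
// of each window is kept).
fun <T> Flow<T>.throttleLatest(delayMillis: Long): Flow<T> = this
    .dropIfBusy()
    .transform {
        emit(it)
        delay(delayMillis)
    }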