apple / swift-async-algorithms

Async Algorithms for Swift
Apache License 2.0

`throttle` does not emit the latest element at interval occurrence when latest is `true` #266

Open tarbaiev-smg opened 1 year ago

tarbaiev-smg commented 1 year ago

Unlike the similar operator in Combine, throttle's latest parameter is confusing: it does not emit the latest element, but the next element instead.

When the latest parameter is set to true:

Actual behavior:

throttle emits the next element from the base sequence, if any, after the interval elapses. This means that if no new elements arrive after the interval elapses, the latest element is simply skipped, so only the first element is emitted.

Expected behavior:

throttle emits the latest cached element as soon as the interval elapses.

notcome commented 1 year ago

I think the semantics are very unclear on this one. Let's say we publish 0...99 every 100ms with a throttling window of 1 sec.

That feels very strange. I agree that at least there should be an option to produce the last element, if this element was initially skipped in the throttling window. But are there clear semantics for this concept?

tarbaiev-smg commented 1 year ago

@notcome given the interoperability between Combine and Swift Concurrency, and the ongoing switch from Publisher to AsyncSequence, I think AsyncAlgorithms's behavior should be as consistent with Combine as possible. I had to switch to Combine's throttle because of this flaw. If I had an AsyncSequence instead of a Publisher, I would need to convert it to a Publisher, apply throttle, and convert it back to an AsyncSequence, which is annoying and inefficient.

notcome commented 1 year ago

@tarbaiev-smg So I did a little search on RxSwift, and it seems that something as simple as throttle actually has lots of subtle variations.

Anyway, it seems that adding a clock task, which is required to publish the latest element after the signal stabilizes, is very tricky. The current throttle implementation has fewer than a hundred lines of code, whereas debounce spans nearly 2k LOC. Stunning…

I can't use Combine here because I am moving to concurrency infrastructure for testing. So I ended up focusing on my particular case (infinite demand, no error handling) and got away with a very simple actor. It's quite tricky though: Task.init captures your actor without warning (you don't need to write explicit self), and getting cancellation right is hard.

Kolos65 commented 1 year ago

One would indeed expect to receive the last element when using throttle. A very common use case for example is to throttle a sequence of progress values where receiving the last element (i.e. 100%) is crucial.

In this example:

var numbers = Array((0...100).reversed())
let stream = AsyncStream {
    try! await Task.sleep(nanoseconds: 50_000_000)
    return numbers.popLast()
}

Task {
    let sequence = stream
        .throttle(for: .seconds(1))
    for try await num in sequence {
        print(num)
    }
}

The output is:

0 19 38 58 77 96

Which is unexpected as the latest element is 100.

The problem is that in the AsyncThrottleSequence implementation we can wait for the next element longer than the interval, which can result in the latest element being 'stuck'. Wouldn't it make sense to add a timeout when awaiting the base sequence's next() so that the latest element is always emitted once the interval has passed?

public mutating func next() async rethrows -> Reduced? {
    var reduced: Reduced?
    let start = last ?? clock.now
    repeat {
        // Proposed: return reduced if we wait longer than interval (and reduced was not returned previously)
        guard let element = try await base.next() else {
            return nil
        }
        let reduction = await reducing(reduced, element)
        let now = clock.now
        if start.duration(to: now) >= interval || last == nil {
            last = now
            return reduction
        } else {
            reduced = reduction
        }
    } while true
}

kdubb commented 1 year ago

I also find throttle's implementation confusing. In addition to the behavior of 'latest', given infinite demand, the loop consumes all available processing. Consider, for example, an AsyncSequence that is simply a counter and returns the next iteration's value immediately.

Maybe I am expecting something I shouldn't but throttling by rapidly consuming a stream's elements, only to discard them, seems incorrect. Throttling should handle the case of over and under producing streams without a severe performance penalty in either case.

kdubb commented 1 year ago

Just as a use case, I attempted to use throttle to sample the audio level from an AVAudioRecorder, which immediately returns the current value. During a simple test I realized it was consuming values at an unchecked pace.

FranzBusch commented 1 year ago

@kdubb I think that is the expected behaviour from throttle though. It will apply infinite demand as long as it has demand on itself. If you have a root asynchronous sequence that produces infinite values instantly then throttle will produce exactly one value at every interval.

I think you want another algorithm for what you describe here.

kdubb commented 1 year ago

The issue for me is that I was switching from a slow producing stream to an infinite producing stream, and you need something that can deal with both cases. I attempted to use a timer sequence and withLatest (from AsyncExtensions), but that has the same issue with infinite streams (it loops in a similar way to throttle).

Finally, I flatMapped the timer sequence to the infinite stream to get a reasonable implementation, but this all required knowing exactly the production timing of each stream.

To me the name throttle is expressly saying that it is reducing the production of the source stream; but currently it's not. If that's just not how it's ever going to work, then you are correct that we need another algorithm.

Maybe we create one called 'polling' that uses a Clock to sleep until it acquires the next element from the source stream; and if calling next takes too long it immediately pulls. But if we have polling that works as I described, what use would there be for throttle?

kdubb commented 1 year ago

Also, adding a doc `- Note:` to this, and to any other algorithms that will run away with an infinitely supplying stream, would help set users' expectations and avoid unexpected performance issues when using these algorithms.

FranzBusch commented 1 year ago

@phausler just put up a PR https://github.com/apple/swift-async-algorithms/pull/292 to change the behaviour. We would appreciate it if you all could check that it now does what you expected it to do.

tarbaiev-smg commented 1 year ago

@FranzBusch Thanks for the update! However, the PR does not seem to address this issue. AsyncThrottleSequence.next() is still awaiting the next element of its base sequence even after the interval ends. This means it won't emit the actual last element of the previous interval, but will emit the first element of the subsequent interval instead.

Kolos65 commented 1 year ago

Yes, the PR addresses issues in cases where the sequence finishes (by returning nil), but as @tarbaiev-smg said, we can still await the next element for longer than the interval, which means that the last element won't be emitted.

Here is an updated example of the issue, using a sequence that waits longer for the next element and does not finish:

var numbers = Array((0...200).reversed())
let stream = AsyncStream<Int> {
    guard let number = numbers.popLast() else { return nil }
    if number == 101 {
        try! await Task.sleep(nanoseconds: 1_000_000_000_000)
    } else {
        try! await Task.sleep(nanoseconds: 50_000_000)
    }
    return number
}

Task {
    let sequence = stream
        .throttle(for: .seconds(1))
    for try await num in sequence {
        print(num)
    }
}

prints:

0
20
39
58
78
97
<long break>
<remaining numbers>

instead of:

0
20
39
58
78
97
100
<long break>
<remaining numbers>

FranzBusch commented 1 year ago

@Kolos65 Thanks for the example. Could you do me a favour and try to write a validation test that represents what currently is not working the way you expect it to? Validation tests look like this:

    validate {
      "abcdefghijk|"
      $0.inputs[0].throttle(for: .steps(3), clock: $0.clock, latest: false)
      "a--b--e--h--[k|]"
    }

I am just trying to nail down if we have a problem and what it exactly is and that we are not mixing semantics of debounce with throttle here.

pyrtsa commented 1 year ago

> Could you do me a favour and try to write a validation test that represents what currently is not working like you expect it to work.

Let me help there. I believe this validation test should pass but it doesn't:

    validate {
      "ab--cd-e-f---gh|"
      $0.inputs[0].throttle(for: .steps(3), clock: $0.clock, latest: true)
      "a--b--d--f---g--[h|]"
    }

Kolos65 commented 1 year ago

@FranzBusch Here is one I think matches my example:

    validate {
      "-a-b-c-d-e-f-----h-i-j-k-|"
      $0.inputs[0].throttle(for: .steps(3), clock: $0.clock, latest: true)
      "-a---c---e---f---h---j---|"
    }

actual: -a---c---e-------h---j---|
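
For reference, the pull-based loop quoted earlier in the thread can be replayed on a marble string. This is only a sketch under stated assumptions: `simulateCurrentThrottle` is a hypothetical helper (not part of swift-async-algorithms), each character is one clock step, `-` means no element, `|` marks the end of the sequence, and `latest` is `true`. Under those assumptions it reproduces the `actual:` line above.

```swift
/// Hypothetical helper: replays the "emit only when an element arrives
/// at least `interval` steps after the previous output" rule.
func simulateCurrentThrottle(_ input: String, interval: Int) -> String {
    var output = ""
    var last: Int? = nil  // step of the previous emission, nil before the first one

    for (step, symbol) in input.enumerated() {
        if symbol == "-" {
            // Nothing arrives, and nothing can be emitted without a new element.
            output.append("-")
        } else if symbol == "|" {
            // End of the base sequence: any held element is lost in this model.
            output.append("|")
            break
        } else if let previous = last, step - previous < interval {
            // Arrived too early: held, then overwritten by the next arrival.
            output.append("-")
        } else {
            output.append(symbol)
            last = step
        }
    }
    return output
}

print(simulateCurrentThrottle("-a-b-c-d-e-f-----h-i-j-k-|", interval: 3))
// prints "-a---c---e-------h---j---|"
```

Note how `f` is held at step 11 and never emitted, because the next arrival (`h`) replaces it.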

pyrtsa commented 1 year ago

I'm sorry @Kolos65, but I think there is a bug in that. Throttle should reset its timer for each output it yields, not input. If the timer lapses without further input, then the next input after that should be yielded immediately just like the initial input. Like so:

    validate {
      "-a-b-c-d-e-f-----h-i-j-k-|"
      $0.inputs[0].throttle(for: .steps(3), clock: $0.clock, latest: true)
      "-a--b--d--e--f---h--i--k-|"
    }

Notice how there's an output every 3 steps for as long as there's input at least as often; otherwise the gap between outputs can be longer than 3 but no more than necessary.
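
The rule described above (restart the timer on each output, emit the latest held element when the timer fires, and emit immediately once the timer has lapsed) can be sketched as a small simulation over marble strings. `simulateThrottleLatest` is a hypothetical name, and the model assumes one character per clock step, `-` for no element, and `|` for the end of the sequence. It is an illustration of the proposed semantics, not the library's implementation.

```swift
/// Hypothetical helper: simulates a throttle whose timer restarts on each output.
func simulateThrottleLatest(_ input: String, interval: Int) -> String {
    var output = ""
    var pending: Character? = nil  // latest element not yet emitted
    var nextAllowed = 0            // earliest step at which the next output may occur
    var step = 0

    for symbol in input {
        if symbol == "|" {
            guard let value = pending else {
                output.append("|")
                break
            }
            // A held element is flushed when its timer fires, together with the end.
            while step < nextAllowed {
                output.append("-")
                step += 1
            }
            output.append("[\(value)|]")
            break
        }
        if symbol != "-" { pending = symbol }
        if step >= nextAllowed, let value = pending {
            output.append(value)
            pending = nil
            nextAllowed = step + interval
        } else {
            output.append("-")
        }
        step += 1
    }
    return output
}

print(simulateThrottleLatest("-a-b-c-d-e-f-----h-i-j-k-|", interval: 3))
// prints "-a--b--d--e--f---h--i--k-|"
print(simulateThrottleLatest("ab--cd-e-f---gh|", interval: 3))
// prints "a--b--d--f---g--[h|]"
```

Both outputs match the expected lines of the two validation tests proposed in this thread.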

phausler commented 1 year ago

After looking into this pretty extensively, the outcome is this: I am currently in favor of removing throttle and deferring it out of the 1.0, since that is the only real remaining task at hand. And to be quite honest, there are a lot of discordant expectations for what it ought to do.

I agree that the last element was definitely a bug, but the other issues stated have to do with how the interval is counted and for what context it is used. One confusing factor is that all AsyncSequences are applied as a demand of 1; in that each call to next is what drives the production of values. This comes in conflict when measuring time to the next value or the time to the previous value. Quite honestly; it doesn't seem like we have a good consensus on how that should operate.

The current implementation is rather straightforward but definitely exposes some behavioral aspects that might be surprising. But it definitely clashes easily on: "is this really a variation on a lower level thing that conjoins the debounce and throttle machinery?" or "how does this affect sendability?" or "does this secretly have a buffer under the hood?".

I welcome more clarity here; but the murkier it gets the more likely it seems that this operation (albeit currently rather simple) is not ready.

FranzBusch commented 1 year ago

I agree with this. Let's remove throttle from 1.0.0 and brainstorm again exactly what semantics we want to implement here.

tarbaiev-smg commented 1 year ago

A side topic, but I was somewhat surprised to see a 1.0.0 release already. There is no description or changelog for it. Just wondering, are there some breaking changes to justify the major increment, or was it more of a milestone thing? I'd personally like to see 1.0.0 when we already have feature parity with Combine (at least its first version as of the iOS 13 SDK), and throttle is one thing which is currently missing and is in high demand among UI app developers.

FranzBusch commented 1 year ago

We have been working on a 1.0.0 for quite some time now and are just going through the final open issues that are blocking the release. The goal for 1.0.0 is not feature parity or providing all possible algorithms but to have a baseline of algorithms that are working and functional. There are many more algorithms that we want to support but the 1.0.0 is important for us to settle the foundation. Furthermore, it is important for the ecosystem since nobody can really depend on this package before the 1.0.0 is out.

tarbaiev-smg commented 1 year ago

> One confusing factor is that all AsyncSequences are applied as a demand of 1; in that each call to next is what drives the production of values. This comes in conflict when measuring time to the next value or the time to the previous value. Quite honestly; it doesn't seem like we have a good consensus on how that should operate.

As I see it, based on Combine's throttle, the demand of 1 should be fine. We need to:

  1. await the next value and the interval timeout in two parallel tasks
  2. when receiving a new value, just store it in a variable and override with new values if the latest is true
  3. when the interval ends, return the stored value or start over, if there's no value to return

Maybe we don't even need to measure time this way 🤔, I don't think we need to be very precise here fighting a race between the timeout and next value. I could try to craft a possible implementation.
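
The three steps above could be sketched roughly as follows. This is a minimal sketch, not a proposed implementation: `sampleLatest` and `LatestBox` are hypothetical names, errors from the base sequence simply end the output, the first element is not emitted early, and the code assumes a deployment target where `Duration` and `Task.sleep(for:)` are available (macOS 13 / iOS 16 or later).

```swift
/// Hypothetical storage for step 2: keeps only the newest element.
actor LatestBox<Element: Sendable> {
    private var latest: Element?
    private var isFinished = false

    func store(_ element: Element) { latest = element }
    func markFinished() { isFinished = true }

    /// Takes the stored element and reads the finished flag in one isolated call,
    /// so an element stored just before the end cannot be missed.
    func poll() -> (element: Element?, isFinished: Bool) {
        defer { latest = nil }
        return (latest, isFinished)
    }
}

/// Hypothetical operator: emits the latest stored element once per interval.
func sampleLatest<Base: AsyncSequence & Sendable>(
    _ base: Base,
    every interval: Duration
) -> AsyncStream<Base.Element> where Base.Element: Sendable {
    AsyncStream { continuation in
        let box = LatestBox<Base.Element>()

        // Steps 1 and 2: read the base sequence, overwriting the stored value.
        let reader = Task {
            do {
                for try await element in base {
                    await box.store(element)
                }
            } catch {
                // Assumption: this sketch treats an error as the end of the sequence.
            }
            await box.markFinished()
        }

        // Steps 1 and 3: when the interval ends, emit the stored value or start over.
        let ticker = Task {
            while !Task.isCancelled {
                try? await Task.sleep(for: interval)
                let (element, isFinished) = await box.poll()
                if let element { continuation.yield(element) }
                if isFinished { break }
            }
            continuation.finish()
        }

        continuation.onTermination = { _ in
            reader.cancel()
            ticker.cancel()
        }
    }
}
```

Because the reader task drains the base sequence as fast as it produces, this sketch still applies infinite demand upstream, and it trades timing precision for simplicity, in line with the remark above about not fighting the race between the timeout and the next value.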

mipstian commented 9 months ago

I too was/am expecting Combine's behavior!

If all I do with the base sequence is send two values immediately, I'd expect the throttled sequence to return the first immediately, and the second after one interval has elapsed. The implementation in 1.0.0 returns the first immediately and effectively drops the second.

bobspryn commented 9 months ago

Is there any update on throttle? I was just surprised by missing our last element as well when attempting to use it in a non-terminating stream.

fanwgwg commented 4 months ago

+1 Any updates on this issue so far?

drewmccormack commented 4 months ago

I think part of the problem is the overloading of terms like "throttle". I see it used to mean very different things, and the discussion above demonstrates it quite well. I think it would probably be best, given how loaded the term is, to not use it at all, coming up with new functions that are clearer.

To me, in the real world, throttling is about changing the rate of processing. In an engine, you can increase or decrease the revs by changing the throttle. What you never do is "drop" revs (unless your car has issues).

What I expected from "throttle" was that it would never drop anything in the sequence, but simply delay it, in order to maintain a maximum density of iteration. This apparently is not what it does. It seems that it indeed drops items.

What I would like to see are three different operators...

debounce - which waits for a pause in the stream before emitting the latest value, dropping anything in between.

spread - which never drops anything, but ensures that they are emitted with a maximum time density. It effectively queues things up, but slows down processing.

slice - which is currently throttle, but would need to produce the latest value at the end of the interval as discussed earlier.

tarbaiev-smg commented 4 months ago

@drewmccormack

> debounce - which waits for a pause in the stream before emitting the latest value, dropping anything in between.
> spread - which never drops anything, but ensures that they are emitted with a maximum time density. It effectively queues things up, but slows down processing.
> slice - which is currently throttle, but would need to produce the latest value at the end of the interval as discussed earlier

There are only two hard problems in computer science: naming and cache invalidation 😉. As confusing as the term throttle is, it's a well-established term in reactive frameworks on Apple platforms. Replacing it with something else would make things even more confusing. And as I mentioned, Combine.Publisher and AsyncSequence essentially represent the same concept in the same ecosystem, so it makes sense to keep the naming aligned. Of the similar operators, throttle with the last element is the most in demand, as it's often used in UI applications.

drewmccormack commented 4 months ago

I'm afraid I think this is part of the problem. Apple should have come up with their own naming, instead of adopting things from half baked Facebook frameworks and academic work half a century old. They had an opportunity, and they missed it.

The naming is quite alienating to most app-level developers. Sure, if you are right into functional or reactive programming, it probably makes sense, but that is not where Apple comes from. Terms like 'debounce' mean very little to someone who has been using UIKit or AppKit (ie most developers).

The discussion above demonstrates it quite well IMO. It is pretty clear people have different ideas about what throttle means, and the accepted terminology actually makes no sense. It was a poor choice to begin with.

But even if "throttle" is the accepted term, it should be very clear that it drops items in the stream. That should at least be stated in the docs, and not left up to how much experience the reader has with ReactNative or whatever 3rd party framework is using it.

And as far as I can tell, there is still another type of function — a true throttle that doesn't drop items — which is missing. Perhaps it is there, but I can't find it. I assumed "throttle" would be it, but seems it is missing.

Gernot commented 1 month ago

Is there any roadmap for when there'll be a solution for this? I hoped that there was a 1.1 release planned in sync with Swift 6, but it seems it doesn't work like this. I don't really care how it's named, I just want an async function that lets me limit the number of values per time and lets me choose which one it filters, without omitting the last value. I can't understand why this issue has been open so long if the code is done but the naming isn't.

phausler commented 1 month ago

It isn't per se a naming issue but rather a matter of focus for the folks working on it; if you want to take this up and drive a discussion on the forums to resolve it, I'd welcome that!