apple / swift-async-algorithms

Async Algorithms for Swift
Apache License 2.0
3.03k stars 151 forks source link

Buffering Throttle #330

Open thecoolwinter opened 1 month ago

thecoolwinter commented 1 month ago

I know there's ongoing discussion about throttle semantics, but this is an API I can see being useful even while the final throttle API is being nailed down. Also correct me if this is already possible with the existing API.

One use case for throttle is to limit the reception of new items in the stream, while still receiving all items produced by the stream. For instance, receiving a stream of bytes and only processing the accumulated bytes for every time interval.

This is different from both the existing stream.buffer and stream._throttle apis because the elements of the resulting stream would be an array of the original elements. Existing buffering streams produce a single element in each iteration, and throttle only produces the last(ish) element received. This would produce all elements received in the time period since the last produced value.

A potential API might look like:

extension AsyncSequence {
    public func throttleBuffering<C: Clock>(for interval: C.Instant.Duration, clock: C, bufferPolicy: BufferPolicy) -> _BufferingThrottledStream<Element>
}

Where the _BufferingThrottledStream produces arrays of Element at each time interval if there are any elements to produce. It would also need to have a buffering policy similar to the existing buffering stream iterators.

This could maybe use some of the already implemented buffering streams and every time interval grab all buffered elements and send them downstream.

thecoolwinter commented 1 month ago

I suppose a workaround right now is to use a chunked stream with a timer. However this still sends events when there's no items being sent from the producer:

for await events in stream.chunked(by: .repeating(every: .milliseconds(250), clock: .continuous)) {
    // Accumulated data
}