ReactiveX / RxPY

ReactiveX for Python
https://rxpy.rtfd.io
MIT License

Rolling throttle operator #696

Status: Closed (matiboy closed this issue 5 months ago)

matiboy commented 11 months ago

Consider an API with a rolling rate limit, for example "50 calls per 10 seconds, rolling".

I would therefore like an operator that does the following:

  1. Items that arrive within the time frame and do not exceed the maximum count should be emitted immediately: e.g. -1-2-3 with max 3 items in time frame --------- should be -1-2-3-
  2. Overflow items should be emitted as soon as the limit allows: -1-2-3----4 with max 2 in time frame ------- is -1-2---3-4, meaning that 3 and 4 are emitted as early as possible without ever having more than 2 items within 7 ticks (my example time frame).
  3. "Double overflow" should wait as long as needed: e.g. -1-2-3-4-5 with max 2 and time frame ------- should be -1-2---3-4----5, so that within any time frame of 7 ticks there are never more than 2 items.
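
To make the three rules concrete, here is a minimal pure-Python sketch that computes the emission time of each item from its arrival time. It is an illustration only, not part of RxPY: the name `rolling_emit_times` is hypothetical, times are plain numeric ticks, and the window is assumed to be half-open (an item emitted exactly `timespan` after another no longer counts against it), which matches `emit_time = n_values_ago + timespan` in my solution below.

```python
from collections import deque
from typing import Deque, Iterable, List


def rolling_emit_times(
    arrivals: Iterable[float], max_number: int, timespan: float
) -> List[float]:
    """Earliest emission time for each arrival under a rolling limit.

    Hypothetical helper for illustration; assumes arrivals are sorted and
    that the window is half-open, i.e. [t, t + timespan).
    """
    # Emission times of the last max_number items (oldest first)
    recent: Deque[float] = deque(maxlen=max_number)
    emitted: List[float] = []
    for arrival in arrivals:
        emit = arrival
        if len(recent) == max_number:
            # Window is full: wait until the oldest of the last
            # max_number emissions is at least timespan in the past
            emit = max(emit, recent[0] + timespan)
        recent.append(emit)
        emitted.append(emit)
    return emitted


# Rule 1: under the limit, items pass through unchanged
print(rolling_emit_times([1, 3, 5], max_number=3, timespan=9))
# Rules 2 and 3: overflow items are pushed back just far enough
print(rolling_emit_times([1, 3, 5, 7, 9], max_number=2, timespan=7))
```

With max 2 per 7 ticks, the third item waits for the first one to leave the window, the fourth waits for the second, and the fifth ("double overflow") waits for the already delayed third.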

I've actually written the operator and tests and would be happy to add it to the library (if the authors show interest), but mostly I'd like to know whether it's already there and I missed it. Can this be achieved with one of the "throttle" or "window" operators?

My solution for reference:

from threading import Lock
from typing import TypeVar, Callable
from reactivex import Observable
from reactivex.scheduler import TimeoutScheduler
from datetime import timedelta, datetime

_T = TypeVar("_T")

def throttle_rolling(
    max_number: int, timespan: timedelta
) -> Callable[[Observable[_T]], Observable[_T]]:
    """Throttle the source Observable to at most max_number items per rolling timespan.

    A value is emitted immediately if that keeps the limit; otherwise it is
    delayed until emitting it no longer violates the max_number constraint.
    """

    def _throttle_rolling(source: Observable[_T]) -> Observable[_T]:
        mutex = Lock()
        emitted_timestamps: list[datetime] = []

        def subscribe(observer, scheduler=None):
            _scheduler = scheduler or TimeoutScheduler()

            def on_next(value):
                nonlocal emitted_timestamps
                n_values_ago = None
                with mutex:
                    now = _scheduler.now
                    # Drop emissions that have already left the rolling window
                    emitted_timestamps = [
                        x for x in emitted_timestamps if x > (now - timespan)
                    ]
                    # Window is full: the emission "max_number" places from the
                    # end decides the earliest free slot, timespan after it
                    if len(emitted_timestamps) >= max_number:
                        n_values_ago = emitted_timestamps[
                            len(emitted_timestamps) - max_number
                        ]
                    emit_time = now if n_values_ago is None else n_values_ago + timespan
                    # Keep the real emission time, not when the source emitted.
                    # Appending under the lock stops concurrent on_next calls
                    # from claiming the same slot.
                    emitted_timestamps.append(emit_time)
                if n_values_ago is None:
                    _scheduler.invoke_action(lambda *_: observer.on_next(value))
                else:
                    _scheduler.schedule_absolute(
                        emit_time, lambda *_: observer.on_next(value)
                    )

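            # Caveat: on_completed is forwarded immediately, so values that are
            # still waiting in the scheduler may be dropped once the source completes
            return source.subscribe(
                on_next=on_next,
                on_error=observer.on_error,
                on_completed=observer.on_completed,
            )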

        return Observable(subscribe)

    return _throttle_rolling
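
One way to test an operator like this is to record the time of each emission and check that no window ever holds more than the allowed number of items. The helper below is a rough sketch of such a check; `max_in_any_window` is a hypothetical name (not an RxPY function), and it assumes numeric timestamps and the same half-open window convention as the operator.

```python
from typing import List, Sequence


def max_in_any_window(times: Sequence[float], timespan: float) -> int:
    """Largest number of emissions inside any half-open window of length timespan.

    Hypothetical test helper; uses a sliding window over the sorted timestamps.
    """
    ordered: List[float] = sorted(times)
    best = 0
    start = 0
    for end, t in enumerate(ordered):
        # Advance the window start past emissions that are timespan or more old
        while t - ordered[start] >= timespan:
            start += 1
        best = max(best, end - start + 1)
    return best


# Unthrottled source: four items fall within 7 ticks of each other
print(max_in_any_window([1, 3, 5, 7, 9], timespan=7))  # 4
# Emission times that respect "max 2 per 7 ticks"
print(max_in_any_window([1, 3, 8, 10, 15], timespan=7))  # 2
```

A test would then assert `max_in_any_window(recorded_times, timespan) <= max_number` on the timestamps collected from the throttled stream.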