vxgmichel / aiostream

Generator-based operators for asynchronous iteration
http://aiostream.readthedocs.io
GNU General Public License v3.0

Rate limit / throttle operator #70

Open Jc2k opened 3 years ago

Jc2k commented 3 years ago

I am combining multiple streams with a merge. The output of this stream is a summary "state" of a system, i.e. each item from the stream is a summary of the state of the system at that point in time. The resulting output is fed into a websocket and used to drive a React app. This React app then shows a handy live-updating dashboard. This is working really well, but in the more extreme cases I don't really want to send hundreds of events a second down the websocket, so I thought a throttle operator would be a handy tool to have in my belt.

The requirement is to start a timer when an item is received from the source, carry on iterating, and when that timer expires, send only the most recently received item from the source. The timer should not be running unless there is at least one event waiting to be sent.
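As a rough timeline (an informal sketch of the intended behavior: the timer starts at a, fires delay later and emits the latest item, then stays idle until d arrives):

items in:    a    b    c                     d
timer:       |---- delay ----|               |---- delay ----|
items out:                   c                               d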

My proof of concept looks like this:

import asyncio

import aiostream.aiter_utils
from aiostream import operator, streamcontext

@operator(pipable=True)
async def throttle(source, delay=0.5):
    async with streamcontext(source) as streamer:
        cur = None
        next = asyncio.create_task(aiostream.aiter_utils.anext(streamer))
        waiter = None
        aws = {next}

        while True:
            done, aws = await asyncio.wait(aws, return_when=asyncio.FIRST_COMPLETED)

            if next in done:
                cur = await next
                next = asyncio.create_task(aiostream.aiter_utils.anext(streamer))
                aws.add(next)

                if not waiter:
                    waiter = asyncio.create_task(asyncio.sleep(delay))
                    aws.add(waiter)

            if waiter and waiter in done:
                yield cur

                waiter = None

It works, but it feels like it's relying on asyncio primitives too much, and that maybe there is a more idiomatic way to do it with aiostream. Can you think of any cleaner ways to implement this? The calls to anext() especially make me feel like I've overlooked something.

vxgmichel commented 3 years ago

Right, the debounce operator in ReactiveX!

Here's my attempt at implementing it; can you confirm it works as you expect?


import asyncio
from collections import deque
from aiostream import streamcontext, aiter_utils, operator

@operator(pipable=True)
async def debounce(source, delay=0.5):
    loop = asyncio.get_event_loop()

    # Stream context
    async with streamcontext(source) as streamer:

        # Get first item
        try:
            item = await aiter_utils.anext(streamer)
        except StopAsyncIteration:
            return

        # Set of pending tasks, its length should never exceed 1
        pending = set()

        # Loop over items
        while True:

            # Compute future deadline THEN yield the item
            deadline = loop.time() + delay
            yield item

            # Run until deadline
            queue = deque(maxlen=1)
            timeout = deadline - loop.time()
            while timeout > 0:

                # Wait for next item with timeout
                # (passing a bare coroutine to asyncio.wait relies on the implicit
                # task wrapping that Python 3.11 removed; see further down-thread)
                if not pending:
                    pending = {aiter_utils.anext(streamer)}
                done, pending = await asyncio.wait(pending, timeout=timeout)

                # Add the current item to a deque with capacity 1
                if done:
                    try:
                        queue.append(done.pop().result())
                    except StopAsyncIteration:
                        return

                # Recompute the timeout
                timeout = deadline - loop.time()

            # Get next item
            if queue:
                item = queue.popleft()
            else:
                assert pending
                try:
                    item = await pending.pop()
                except StopAsyncIteration:
                    return

And the corresponding test module:

from aiostream import stream, pipe

import pytest

@pytest.mark.asyncio
async def test():
    xs = stream.empty() | debounce.pipe(0.1) | pipe.list()
    assert await xs == []

    xs = stream.just(0) | debounce.pipe(0.1) | pipe.list()
    assert await xs == [0]

    xs = stream.range(3) | debounce.pipe(0.1) | pipe.list()
    assert await xs == [0]

    xs = (
        stream.chain(
            stream.empty() | pipe.delay(0.3),
            stream.range(3),
            stream.empty() | pipe.delay(0.3),
            stream.range(10, 13),
            stream.empty() | pipe.delay(0.3),
            stream.range(20, 23),
            stream.empty() | pipe.delay(0.3),
        )
        | debounce.pipe(0.1)
        | pipe.list()
    )
    assert await xs == [0, 2, 10, 12, 20, 22]
Jc2k commented 3 years ago

Yep, can confirm it's working a treat! Thanks a lot! I hadn't even noticed the timeout option on wait(); that makes things quite a lot nicer (I was having to call create_task so that I could compare the contents of done to what I had pushed into wait(), which auto-wraps things in tasks).
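For reference, a minimal sketch of that wait() behavior (names here are illustrative):

import asyncio

async def main():
    # Explicit tasks keep their identity, so membership tests on `done`
    # compare the same objects that were passed to wait().
    fast = asyncio.create_task(asyncio.sleep(0.1, result="fast"))
    slow = asyncio.create_task(asyncio.sleep(1.0, result="slow"))

    # `timeout` makes wait() return after 0.5s even though `slow`
    # is still pending.
    done, pending = await asyncio.wait({fast, slow}, timeout=0.5)

    assert fast in done and slow in pending
    print(fast.result())  # "fast"
    slow.cancel()

asyncio.run(main())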

I'm relieved to see your approach still uses anext and asyncio.wait. I was half wondering if you were going to do something like a merge(source, delay(just(TIMEOUT_SENTINEL))) and then iterate till you hit the sentinel. I tried something like that but struggled as the stream context had already been entered or something similar.

Is this something that you think belongs in aiostream, or is there a place for recipes it could go?

vxgmichel commented 3 years ago

I'm relieved to see your approach still uses anext and asyncio.wait. I was half wondering if you were going to do something like a merge(source, delay(just(TIMEOUT_SENTINEL))) and then iterate till you hit the sentinel.

Ouch, that hurts my brain just thinking about it :sweat_smile:

Is this something that you think belongs in aiostream, or is there a place for recipes it could go?

I think it does belong in aiostream, especially since there is a direct ReactiveX equivalent. I think it would fit nicely in the timing category. Feel free to make a PR if you feel like contributing, otherwise I'll do it next week.

An extra question: should the debounce stream yield the last cached item before closing? In other words, should we add the guarantee that the last produced item is always forwarded? And if so, do we want the same behavior when the producer raises an exception?

Jc2k commented 3 years ago

An extra question: should the debounce stream yield the last cached item before closing? In other words, should we add the guarantee that the last produced item is always forwarded? And if so, do we want the same behavior when the producer raises an exception?

Oooh, now that is hurting my brain 😆

For my use case, neither are that important as my stream is "infinite" it should only close if it is cancelled, probably because the websocket went away. And if there was an exception there are various places where i recover and reconnect, so missing items is not important.

But for more general use cases, to me the following feel natural:

The last of those was the most problematic, I think. An error is an error, and I think some sort of partial recovery approach could be surprising and confusing. But if we did yield the last item before an error, and a user was using this operator to e.g. rate-limit their output to a third-party API, then that final yield would have to respect the timeout. So we would catch the error, wait until the end of the timeout, yield the last item, then immediately re-raise the error.

Jc2k commented 3 years ago

I've tried it on a larger dataset and with a smaller delay and I think I've found a corner case. I'm hitting the assert pending. The only explanation I have is: if the consumer takes longer than delay to process a yielded item, timeout is already negative when we reach the inner while loop, so the loop is skipped entirely and we hit the assert with both queue and pending empty.

I haven't had a chance to try it with the test case above yet, but maybe something like this?

import asyncio
from collections import deque

from aiostream import streamcontext, aiter_utils, operator

@operator(pipable=True)
async def debounce(source, delay=0.1):
    """
    https://github.com/vxgmichel/aiostream/issues/70
    """

    loop = asyncio.get_event_loop()

    # Stream context
    async with streamcontext(source) as streamer:

        # Get first item
        try:
            item = await aiter_utils.anext(streamer)
        except StopAsyncIteration:
            return

        # Set of pending tasks, its length should never exceed 1
        pending = set()

        # Loop over items
        while True:

            # Compute future deadline THEN yield the item
            deadline = loop.time() + delay
            yield item

            if not pending:
                pending = {aiter_utils.anext(streamer)}

            # Run until deadline
            queue = deque(maxlen=1)
            timeout = deadline - loop.time()
            while timeout > 0:
                # Wait for next item with timeout
                done, pending = await asyncio.wait(pending, timeout=timeout)

                # Add the current item to a deque with capacity 1
                if done:
                    try:
                        queue.append(done.pop().result())
                    except StopAsyncIteration:
                        return

                if not pending:
                    pending = {aiter_utils.anext(streamer)}

                # Recompute the timeout
                timeout = deadline - loop.time()

            # Get next item
            if queue:
                item = queue.popleft()
            else:
                assert pending
                try:
                    item = await pending.pop()
                except StopAsyncIteration:
                    return

Basically the same, but ensure that pending always contains something, even if the inner while loop is skipped.

vxgmichel commented 3 years ago

But for more general use cases, to me the following feel natural: [...]

Good points! I agree with most of it, except for delaying the last item. I don't think it's a problem to send two items one right after the other and ignore the debounce window if the stream is closed right after that.

I've tried it on a larger dataset and with a smaller delay and I think I've found a corner case. I'm hitting the assert pending. The only explanation I have is: [...]

Good catch, I totally missed it. Here's another version of debounce which should address both issues:

import asyncio
from collections import deque
from aiostream import streamcontext, aiter_utils, operator

@operator(pipable=True)
async def debounce(source, delay=0.5):
    loop = asyncio.get_event_loop()

    # Stream context
    async with streamcontext(source) as streamer:

        # Set of pending `anext` task, length should never exceed 1
        pending = set()

        # Cache for the last value while waiting for the deadline
        queue = deque(maxlen=1)

        # Loop over items
        while True:

            # Get next item
            try:
                if queue:
                    item = queue.popleft()
                elif pending:
                    item = await pending.pop()
                else:
                    item = await aiter_utils.anext(streamer)
            # Streamer is exhausted
            except StopAsyncIteration:
                return

            # Compute future deadline THEN yield the item
            deadline = loop.time() + delay
            yield item

            # Run until deadline
            timeout = deadline - loop.time()
            while timeout > 0:

                # Wait for next item with timeout
                if not pending:
                    pending = {aiter_utils.anext(streamer)}
                done, pending = await asyncio.wait(pending, timeout=timeout)

                # The `anext` task has ended before the timeout
                if done:
                    # Add the current item to a deque with capacity 1
                    try:
                        queue.append(done.pop().result())
                    # Simply break out of the loop
                    # (and let the next `anext` call raise a new `StopAsyncIteration`)
                    except StopAsyncIteration:
                        break

                # Recompute the timeout
                timeout = deadline - loop.time()

# Testing

from aiostream import stream, pipe

import pytest

@pytest.mark.asyncio
async def test():
    xs = stream.empty() | debounce.pipe(0.1) | pipe.list()
    assert await xs == []

    xs = stream.just(0) | debounce.pipe(0.1) | pipe.list()
    assert await xs == [0]

    xs = stream.range(3) | debounce.pipe(0.1) | pipe.list()
    assert await xs == [0, 2]

    async def slow_consume(x):
        await asyncio.sleep(0.2)
        return x

    xs = (
        stream.range(3)
        | debounce.pipe(0.1)
        | pipe.map(slow_consume, task_limit=1)
        | pipe.list()
    )
    assert await xs == [0, 1, 2]

    xs = (
        stream.chain(
            stream.empty() | pipe.delay(0.3),
            stream.range(3),
            stream.empty() | pipe.delay(0.3),
            stream.range(10, 13),
            stream.empty() | pipe.delay(0.3),
            stream.range(20, 23),
            stream.empty() | pipe.delay(0.3),
        )
        | debounce.pipe(0.1)
        | pipe.list()
    )
    assert await xs == [0, 2, 10, 12, 20, 22]
Masynchin commented 2 years ago

Feel free to make a PR if you feel like contributing, otherwise I'll do it next week.

I see that there was some activity on the project a month ago. What is the current state of the library?

vxgmichel commented 2 years ago

What is the current state of the library?

I'm maintaining it (e.g. supporting new Python releases) but I'm not actively developing it. Do you need the debounce operator? I guess it can easily be added to the lib since the implementation here already has tests.

I'll do it next week.

Eh, must have slipped my mind :sweat_smile:

Masynchin commented 2 years ago

I subscribed to updates for this library about a year ago. Last month I got a notification about a new release. Since my first look at this project I have gained a lot of experience (I was a student then and still am now), and I'm currently looking at how I can use it, or what projects I can build around this library.

I am exploring the functionality that can be used now (via the docs) and the functionality that could be added (via branches and issues).

So,

Do you need the debounce operator?

No, just curious about what else this library can potentially do.

But I hope I can answer "yes" after some time 😃

vxgmichel commented 2 years ago

Oh great to hear that!

Well, feel free to make the PR for the debounce operator if you're interested, otherwise I'll do it ~~next week~~ later :grin:

mikevb3 commented 2 years ago

I'm coming to confirm this is a great add-on.

Was having issues with spaceout blocking some iterations, so I used @vxgmichel's debounce, and it worked like a charm! Thank you!

nirvana-msu commented 1 year ago

I second that it's an extremely valuable addition to the library!

I do have a related feature request. At the moment the debounce implementation forwards only the latest value, which is great e.g. for a stream where each new value completely supersedes the previous ones. But what if I want a live-updating chart? Or what if it's a live-updating table, where each item only updates a subset of the rows? I have the same problem of having to batch updates to improve UI performance, but I don't just need the latest item; I need to aggregate all pending items in some custom way. It would be nice if debounce provided a hook to customize what is being yielded and/or how the pending item(s) are aggregated.

e.g. there could be an optional reduce argument (function taking two items and combining them) that would be invoked whenever an item arrives and there is one already pending.
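For the table use case above, a hypothetical call might look like this (row_updates and the dict-shaped items are assumptions for illustration):

# Hypothetical: each item is a dict mapping row id -> row data. Merging
# pending updates keeps the newest value per row while still limiting
# the output rate.
xs = row_updates | debounce.pipe(0.1, reduce=lambda pending, new: {**pending, **new})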

vxgmichel commented 1 year ago

e.g. there could be an optional reduce argument (function taking two items and combining them) that would be invoked whenever an item arrives and there is one already pending.

Sounds like a great idea! I guess you'd also need an optional initial argument like in functools.reduce. Sadly I don't have the bandwidth to work on this at the moment, but I'd gladly review a pull request if you were to create one. Thanks :)
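For comparison, the optional third argument of functools.reduce seeds the fold:

from functools import reduce

# The third argument seeds the accumulator, so the first item is combined
# with it instead of being used as the initial result.
assert reduce(lambda acc, x: acc + [x], [1, 2, 3], []) == [1, 2, 3]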

nirvana-msu commented 1 year ago

@vxgmichel indeed, another optional argument is needed to produce the initial value, although unlike in functools.reduce it should be a callable rather than the value itself, as it needs to be produced for each new batch of items (and the value could be mutable, such as an empty list that reduce would accumulate updates into).
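A small illustration of why a callable is needed, using plain functools.reduce for contrast:

from functools import reduce

def append(acc, x):
    acc.append(x)
    return acc

# A mutable value as the seed is shared across batches, so state leaks:
seed = []
assert reduce(append, [1, 2], seed) == [1, 2]
assert reduce(append, [3], seed) == [1, 2, 3]  # previous batch leaked in

# Calling an initializer per batch gives each batch a fresh accumulator:
initializer = list
assert reduce(append, [1, 2], initializer()) == [1, 2]
assert reduce(append, [3], initializer()) == [3]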

I'm also short on time for a proper PR, but here's what I ended up with. It's still nearly the same as your previous version; the only changes are:

- the optional reduce / initializer arguments for custom aggregation
- wrapping each anext call in asyncio.create_task before passing it to asyncio.wait

The latter is because without it, an exception like the following is thrown in Python 3.11:

  File "/<...>/lib/python3.11/site-packages/aiostream/aiter_utils.py", line 175, in __aexit__
    await self._aiterator.athrow(typ, value, traceback)
  <...>
  File "<...>/aiostream.py", line 58, in debounce
    done, pending = await asyncio.wait(pending, timeout=timeout)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<...>/.pyenv/versions/3.11.0/lib/python3.11/asyncio/tasks.py", line 415, in wait
    raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")
TypeError: Passing coroutines is forbidden, use tasks explicitly.

Implicit wrapping into a task was removed in 3.11 in this commit.

Code:

import asyncio
from collections import deque

from aiostream import streamcontext, aiter_utils, operator

@operator(pipable=True)
async def debounce(source, delay=0.5, reduce=None, initializer=None):
    """
    Ensure a given delay passes between any two consecutive items yielded from the stream.

    :param source: Source stream
    :param delay: Minimum delay to enforce between any two consecutive items, seconds
    :param reduce: Optional callable to aggregate current result with new item. If not provided, the latest item is kept
    :param initializer: Optional callable to produce the initial value for the reduce operation. If not provided,
             the first item itself is used as the first result.
    """
    loop = asyncio.get_event_loop()

    # Stream context
    async with streamcontext(source) as streamer:

        # Set of pending `anext` task, length should never exceed 1
        pending = set()

        # Cache for the last value while waiting for the deadline
        queue = deque(maxlen=1)

        # Loop over items
        while True:

            # Get next item
            try:
                if queue:
                    item = queue.popleft()
                else:
                    if pending:
                        item = await pending.pop()
                    else:
                        item = await aiter_utils.anext(streamer)

                    if reduce is not None and initializer is not None:
                        item = reduce(initializer(), item)
            # Streamer is exhausted
            except StopAsyncIteration:
                return

            # Compute future deadline THEN yield the item
            deadline = loop.time() + delay
            yield item

            # Run until deadline
            timeout = deadline - loop.time()
            while timeout > 0:

                # Wait for next item with timeout
                if not pending:
                    pending = {asyncio.create_task(aiter_utils.anext(streamer))}
                done, pending = await asyncio.wait(pending, timeout=timeout)

                # The `anext` task has ended before the timeout
                if done:
                    # Add the current item to a deque with capacity 1, aggregating with previous result if needed
                    try:
                        item = done.pop().result()
                        if reduce is None or (initializer is None and not queue):
                            queue.append(item)  # no custom aggregation requested, or first item and no initializer
                        elif queue:
                            queue.append(reduce(queue.popleft(), item))  # aggregate with previous result
                        else:
                            queue.append(reduce(initializer(), item))  # first item, and initializer is given

                    # Simply break out of the loop
                    # (and let the next `anext` call raise a new `StopAsyncIteration`)
                    except StopAsyncIteration:
                        break

                # Recompute the timeout
                timeout = deadline - loop.time()
nirvana-msu commented 1 year ago

Actually, the overhead of asyncio.create_task is enormous... my code became an order of magnitude slower after adding the above debounce conflation, and after some time debugging I figured out it is all down to those extra asyncio.create_task wrappers. @vxgmichel is this expected? There must be a more efficient way to await with a timeout, right?

I haven't tried this on Python 3.10, but I would think it's just as slow there, since asyncio.wait was effectively wrapping the coroutine in a task internally anyway.

UPD: I should probably have used ensure_future instead of create_task, but it seems just as slow...

nirvana-msu commented 1 year ago

To give some numbers: my old code (without wrapping each awaitable with ensure_future) was able to process a few hundred thousand items per second. With the ensure_future wrappers the same workload takes closer to a minute, which makes it at least 50x slower.

Fundamentally, there is no need to add a timeout for awaiting each single item. We only need one asyncio timeout per batch (i.e. until the following deadline); the coroutine that we wrap with ensure_future / asyncio.wait shouldn't just await a single item, it should await as many as possible until a timeout. I'm going to try rewriting this and will post the result.
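A rough micro-benchmark sketch of the per-item overhead (numbers vary by machine; anext here is the Python 3.10+ builtin):

import asyncio
import time

async def produce(n):
    for i in range(n):
        yield i

async def consume_plain(n):
    # Baseline: plain async iteration, no tasks created.
    async for _ in produce(n):
        pass

async def consume_wrapped(n):
    # One task per item, as in the debounce versions above.
    it = produce(n)
    while True:
        task = asyncio.ensure_future(anext(it))
        done, _ = await asyncio.wait({task})
        try:
            done.pop().result()
        except StopAsyncIteration:
            break

async def main():
    n = 100_000
    t0 = time.perf_counter()
    await consume_plain(n)
    t1 = time.perf_counter()
    await consume_wrapped(n)
    t2 = time.perf_counter()
    print(f"plain:   {t1 - t0:.3f}s")
    print(f"wrapped: {t2 - t1:.3f}s")

asyncio.run(main())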

Jc2k commented 1 year ago

In Home Assistant we tend to prefer asyncio.timeout over asyncio.wait, but it's only available in 3.11 (we use the async_timeout library on <=3.10). This doesn't need to create futures at all.
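A minimal sketch of that pattern (Python 3.11+):

import asyncio

async def main():
    # asyncio.timeout cancels the enclosed await when the deadline passes,
    # without wrapping anything in an extra task.
    try:
        async with asyncio.timeout(0.1):
            await asyncio.sleep(1.0)
    except TimeoutError:
        print("timed out")

asyncio.run(main())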

nirvana-msu commented 1 year ago

@Jc2k while asyncio.timeout is nice and can trigger a timeout without much overhead, I don't think it can replace asyncio.wait here. The problem is that if the timeout is triggered while awaiting the next anext, you won't have the item that it should have yielded. While you can keep a reference to that anext awaitable, you still can't get the item, because awaiting it a second time would throw RuntimeError: cannot reuse already awaited __anext__()/asend(). I don't think that can be solved without coming back to asyncio.wait, or am I missing something here?

Come to think of it, there is no way to avoid the slowness of wrapping each anext in a task, is there? Because it's just not possible to await a coroutine twice.
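A quick demonstration of that restriction:

import asyncio

async def gen():
    yield 1
    yield 2

async def main():
    it = gen()
    step = it.__anext__()
    print(await step)   # 1
    try:
        await step      # awaiting the same awaitable a second time
    except RuntimeError as exc:
        print(exc)      # cannot reuse already awaited __anext__()/asend()

asyncio.run(main())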

nirvana-msu commented 1 year ago

For anyone who still needs this, here's what I ended up with. It doesn't create a task per item (only a single one for the producer), so for my use case with a very large throughput it's at least an order of magnitude faster. ensure_future / create_task really are ridiculously slow.

The core of triggering a timeout is scheduling a callback with loop.call_at, but the callback has to be separated from the main loop that iterates over the item stream (otherwise we'd need tasks that can effectively 'pause' and 'resume' awaiting the next stream item). This means we need two places that may produce conflated data (the main item-stream loop and the callback), hence the design with an asyncio.Queue (kind of like an mpsc channel):

import asyncio
from collections import deque

from aiostream import streamcontext, operator

@operator(pipable=True)
async def debounce(source, delay=0.5, reduce=None, initializer=None):
    """
    Ensure a given delay passes between any two consecutive items yielded from the stream.

    :param source: Source stream
    :param delay: Minimum delay to enforce between any two consecutive items, seconds
    :param reduce: Optional callable to aggregate current result with new item. If not provided, the latest item is kept
    :param initializer: Optional callable to produce the initial value for the reduce operation. If not provided,
             the first item itself is used as the first result.
    """
    # Queue of items to yield
    queue = asyncio.Queue()

    # Sentinel indicating that iteration has completed.
    # asyncio.Queue unfortunately provides no way to mark queue as closed, so using a sentinel item instead.
    close_sentinel = object()

    task = asyncio.ensure_future(producer(source, queue, close_sentinel, delay, reduce=reduce, initializer=initializer))

    try:
        while True:
            item = await queue.get()

            if item is not close_sentinel:
                yield item
            else:
                return  # finished
    finally:
        task.cancel()  # in case an exception was raised and task is not done yet
        try:
            await task
        except asyncio.CancelledError:
            pass

async def producer(source, queue: asyncio.Queue, close_sentinel, delay, reduce=None, initializer=None):
    timeout = Timeout(delay, close_sentinel)

    # Cache for the accumulated result while waiting for next timeout
    result = deque(maxlen=1)

    try:
        # Stream context
        async with streamcontext(source) as streamer:
            async for item in streamer:
                # Add the current item to a deque with capacity 1, accumulating as necessary
                if reduce is None or (initializer is None and not result):
                    result.append(item)  # no custom aggregation requested, or first item and no initializer
                elif result:
                    result.append(reduce(result.popleft(), item))  # aggregate with previous result
                else:
                    result.append(reduce(initializer(), item))  # first item, and initializer is given

                if timeout.handler is None:  # reschedule timeout & yield current result
                    timeout.reschedule(result, queue)
                    queue.put_nowait(result.popleft())

    finally:
        if timeout.handler is not None:
            timeout.is_last = True  # Close sentinel will be sent on next timeout

        else:
            if result:
                queue.put_nowait(result.popleft())
            queue.put_nowait(close_sentinel)

class Timeout:
    def __init__(self, delay, close_sentinel):
        self.delay = delay
        self.close_sentinel = close_sentinel

        self.loop = asyncio.get_event_loop()
        self.handler = None
        self.is_last = False

    def reschedule(self, result, queue):
        self.handler = self.loop.call_at(self.loop.time() + self.delay, self._on_timeout, result, queue)

    def _on_timeout(self, result, queue):
        if result:  # if there's a pending result
            if not self.is_last:
                self.reschedule(result, queue)
            queue.put_nowait(result.popleft())
        else:
            self.handler = None

        if self.is_last:
            queue.put_nowait(self.close_sentinel)
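A usage sketch for the list-batching case (the timings and exact batching below are illustrative, not guaranteed):

import asyncio
from aiostream import pipe, stream

async def main():
    xs = (
        stream.range(100, interval=0.001)
        | debounce.pipe(0.05, reduce=lambda acc, x: acc + [x], initializer=list)
        | pipe.list()
    )
    batches = await xs
    # Expect far fewer than 100 yields, each a list of conflated items,
    # e.g. batches[0] == [0] (the first item is forwarded immediately).
    print(len(batches), batches[0])

asyncio.run(main())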