dbrattli / aioreactive

Async/await reactive tools for Python 3.11+
MIT License
350 stars 24 forks source link

rx.delay frequently destroys tasks #33

Closed shawnkoh closed 6 months ago

shawnkoh commented 1 year ago

Hi, i'm unsure if this is the intended behaviour, but rx.delay causes asyncio to frequently throw the following error.

if i'm not mistaken, rx.delay(x: seconds) is supposed to be an operator that delays the parent for x seconds.

this is how i think rx.delay should behave:

def setup():
    return pipe(
        rx.interval(0, 2),
        rx.flat_map(many),
        rx.subscribe_async(observer),
    )

async def observer(x):
    print(x)
async def delayed(x, y):
    await asyncio.sleep(y * 0.1)
    return (x, y)

def many(x: int):
    return rx.merge_seq(
        [
            pipe(
                delayed(x, 1),
                rx.from_async,
            ),
            pipe(
                delayed(x, 2),
                rx.from_async,
            ),
        ]
    )

however, when we use rx.delay instead of calling asyncio.sleep

async def delayed(x, y):
    return (x, y)

def many(x: int):
    return rx.merge_seq(
        [
            pipe(
                delayed(x, 1),
                rx.from_async,
                rx.delay(0.1),
            ),
            pipe(
                delayed(x, 2),
                rx.from_async,
                rx.delay(0.2),
            ),
        ]
    )

the exceptions are thrown

(0, 1)
(0, 2)
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-17' coro=<delay.<locals>.subscribe_async.<locals>.worker.<locals>.loop() done, defined at /Users/shawnkoh/repos/ninjacado/.venv/lib/python3.10/site-packages/expression/core/fn.py:59> wait_for=<Future pending cb=[Task.task_wakeup()]>>
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-18' coro=<delay.<locals>.subscribe_async.<locals>.worker.<locals>.loop() done, defined at /Users/shawnkoh/repos/ninjacado/.venv/lib/python3.10/site-packages/expression/core/fn.py:59> wait_for=<Future pending cb=[Task.task_wakeup()]>>
(1, 1)
(1, 2)
(2, 1)
(2, 2)
(3, 1)
(3, 2)
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-43' coro=<delay.<locals>.subscribe_async.<locals>.worker.<locals>.loop() done, defined at /Users/shawnkoh/repos/ninjacado/.venv/lib/python3.10/site-packages/expression/core/fn.py:59> wait_for=<Future pending cb=[Task.task_wakeup()]>>
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-44' coro=<delay.<locals>.subscribe_async.<locals>.worker.<locals>.loop() done, defined at /Users/shawnkoh/repos/ninjacado/.venv/lib/python3.10/site-packages/expression/core/fn.py:59> wait_for=<Future pending cb=[Task.task_wakeup()]>>
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-56' coro=<delay.<locals>.subscribe_async.<locals>.worker.<locals>.loop() done, defined at /Users/shawnkoh/repos/ninjacado/.venv/lib/python3.10/site-packages/expression/core/fn.py:59> wait_for=<Future pending cb=[Task.task_wakeup()]>>
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending name='Task-57' coro=<delay.<locals>.subscribe_async.<locals>.worker.<locals>.loop() done, defined at /Users/shawnkoh/repos/ninjacado/.venv/lib/python3.10/site-packages/expression/core/fn.py:59> wait_for=<Future pending cb=[Task.task_wakeup()]>>
(4, 1)
(4, 2)
shawnkoh commented 1 year ago

A workaround i've been using is this function

@curry_flipped(1)
def sleep(
    source: AsyncObservable[_TSource],
    seconds: float,
    sleep_on_start: bool = False,
) -> AsyncObservable[_TSource]:
    started = sleep_on_start

    @curry_flipped(1)
    async def _sleep(source, seconds: float):
        nonlocal started
        if started:
            await asyncio.sleep(seconds)
        else:
            started = True
        return source

    return pipe(
        source,
        rx.map_async(_sleep(seconds)),
    )

example:

def setup(token: str):
    session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=None))

    return pipe(
        rx.interval(0, 2),
        rx.flat_map(
            lambda _: pipe(
                range(14),
                rx.from_iterable,
                rx_helpers.sleep(0.1),
                rx.map(lambda y: arrow.now().shift(days=y)),
                rx.map_async(
                    schedules(token, session),
                ),
            )
        ),
        rx.subscribe_async(rx.AsyncAnonymousObserver()),
    )