dbrattli / aioreactive

Async/await reactive tools for Python 3.11+
MIT License
350 stars, 24 forks

Observable from an async_generator? #27

Closed: austinnichols101 closed this issue 3 years ago

austinnichols101 commented 3 years ago

How can I create an AsyncObservable from an async_generator using aioreactive?

async def records():
    for i in range(0, 3):
        yield i
source = rx.from_(records())
source.subscribe(
    on_next=lambda x: logger.info(f"on_next: {x=}"),
    on_error=lambda x: logger.error(f"error: {x=}"),
    on_completed=lambda: logger.info("completed")
)

This raises an exception: TypeError("'async_generator' object is not iterable")
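For anyone hitting the same error: the message itself comes from plain Python, not from any reactive library. An async generator does not implement the synchronous iteration protocol, so anything that tries to iterate it with a regular `for` loop (or `iter()`) fails this way. The following is a minimal standard-library sketch; the helper names `try_sync_iteration` and `collect` are made up for illustration, and it makes no claim about how `rx.from_` is implemented internally.

```python
import asyncio


async def records():
    # Same async generator as in the question.
    for i in range(3):
        yield i


def try_sync_iteration() -> str:
    """Attempt synchronous iteration and return the error message."""
    try:
        iter(records())  # the first step a plain `for` loop performs
    except TypeError as exc:
        return str(exc)
    return "no error"


async def collect() -> list:
    """Consume the generator the supported way, with `async for`."""
    return [i async for i in records()]


if __name__ == "__main__":
    print(try_sync_iteration())
    print(asyncio.run(collect()))
```

The generator has to be consumed with `async for` from inside a coroutine, which is why a dedicated async-aware source operator or a manual loop is needed.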

austinnichols101 commented 3 years ago

I ended up creating an AsyncSubject() -> AsyncAnonymousObserver() stream and then looping to "publish" the records:

async for record in records():
    await stream.asend(value=record)

It would be nice to have a from_async_generator operation :) but the approach above works just fine and is very readable.

After adding some logging, I was also able to see "Implicit synchronous back-pressure ™" in action. Very nice!
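To illustrate the pattern and the back-pressure effect without depending on a particular aioreactive version, here is a rough standard-library sketch. `MiniSubject` is a hypothetical stand-in written for this example, not aioreactive's `AsyncSubject`, and `pump` is likewise an invented helper name. The point it tries to show is that because the producer awaits `asend`, it cannot yield the next record until the observer has finished with the current one.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, List


class MiniSubject:
    """Hypothetical stand-in for a subject (NOT aioreactive's AsyncSubject)."""

    def __init__(self) -> None:
        self._observers: List[Callable[[int], Awaitable[None]]] = []

    def subscribe(self, asend: Callable[[int], Awaitable[None]]) -> None:
        self._observers.append(asend)

    async def asend(self, value: int) -> None:
        # Awaiting each observer keeps the producer from running ahead.
        for observer in self._observers:
            await observer(value)


async def records(log: List[str]) -> AsyncIterator[int]:
    for i in range(3):
        log.append(f"produced {i}")
        yield i


async def pump(source: AsyncIterator[int], subject: MiniSubject) -> None:
    """Forward every item of an async generator into the subject."""
    async for record in source:
        await subject.asend(record)


async def main() -> List[str]:
    log: List[str] = []
    stream = MiniSubject()

    async def on_next(x: int) -> None:
        log.append(f"handling {x}")
        await asyncio.sleep(0)  # simulate asynchronous work in the observer
        log.append(f"handled {x}")

    stream.subscribe(on_next)
    await pump(records(log), stream)
    return log


if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```

In the resulting log, each "produced" entry is followed by the matching "handling" and "handled" entries before the next record is produced, which is the interleaving described above.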