dbrattli / aioreactive

Async/await reactive tools for Python 3.10+
MIT License
361 stars 25 forks source link

Cannot run filter example #12

Closed nmiculinic closed 3 years ago

nmiculinic commented 6 years ago
"""Example to show how to split a stream into two substreams."""
import asyncio

from aioreactive.core import subscribe, AsyncAnonymousObserver

from aioreactive.core import AsyncObservable
from aioreactive.operators import pipe as op

async def main():
    xs = AsyncObservable.from_iterable(range(10))

    # Split into odds and evens
    odds = xs | op.filter(lambda x: x % 2 == 1)
    evens = xs | op.filter(lambda x: x % 2 == 0)

    async def mysink(value):
        print(value)

    await subscribe(odds, AsyncAnonymousObserver(mysink))
    await subscribe(evens, AsyncAnonymousObserver(mysink))

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

upon running: AttributeError: module 'aioreactive.operators.pipe' has no attribute 'filter'

nmiculinic commented 6 years ago

If I change pipe source to something like this it works, but it's kinda hacky

from typing import Callable

from aioreactive.core import AsyncObservable
from functools import wraps, partial
import importlib
import sys
import aioreactive.operators
def pipe(source: AsyncObservable, *args: Callable[[AsyncObservable], AsyncObservable]) -> AsyncObservable:
    for op in args:
        source = op(source)
    return source

_this_module = sys.modules[__name__]

def setup(x):
    def fn(*args, **kwargs):
        ff = eval(f"aioreactive.operators.{x}")
        return partial(ff, *args, **kwargs)
    setattr(_this_module, x , fn)

for x in ["concat", "debounce", "delay", "filter", "map", ]:
    setup(x)
dbrattli commented 6 years ago

Sorry, have done some refactoring so the correct way to import partially applied operators for the example is:

from aioreactive.core import Operators as op

Have fixed the example code in master.