roveo / streamz_redis

A Redis plugin for Streamz

from_redis_consumer_group not registered as a streamz source #2

Open schwab opened 3 years ago

schwab commented 3 years ago

In Python 3.9, I'm getting the following error when trying to access .from_redis_consumer_group:

AttributeError: type object 'Source' has no attribute 'from_redis_consumer_group'

To reproduce, simply install streamz-redis using pip install git+https://github.com/roveo/streamz_redis.git

Which completes successfully with:

...
Requirement already satisfied: heapdict in /home/mcstar/mdmetrix_muzero/env_3.9/lib/python3.9/site-packages (from zict->streamz@ git+https://github.com/python-streamz/streamz.git->streamz-redis==0.1.0) (1.0.1)
Building wheels for collected packages: streamz-redis
  Building wheel for streamz-redis (setup.py) ... done
  Created wheel for streamz-redis: filename=streamz_redis-0.1.0-py3-none-any.whl size=18607 sha256=e0867c94e1c3c43cb5197057d0fe39969809bce6015a41483605f6fdfcaf1350
  Stored in directory: /tmp/pip-ephem-wheel-cache-31xmawmt/wheels/c1/a1/a6/25929d85f380ca9c92680c4f10d18615304dcecb68bd6fe25f
Successfully built streamz-redis

Then open python3 and run:

from streamz import Source
Source.from_redis_consumer_group(streams="x....

Traceback (most recent call last):
  File "", line 1, in
AttributeError: type object 'Source' has no attribute 'from_redis_consumer_group'

schwab commented 3 years ago

As a workaround, I also tried to create an instance of the Redis source directly by calling from_redis_consumer_group, since that should be a stream object, but this fails with the error:

  File "/home/mcstar/mdmetrix_muzero/env_3.9/lib/python3.9/site-packages/streamz_redis/sources/from_redis_consumer_group.py", line 68, in __init__
    super().__init__(client_params=client_params, **kwargs)
  File "/home/mcstar/mdmetrix_muzero/env_3.9/lib/python3.9/site-packages/streamz_redis/sources/base.py", line 22, in __init__
    super().__init__(ensure_io_loop=True, **kwargs)
  File "/home/mcstar/mdmetrix_muzero/env_3.9/lib/python3.9/site-packages/streamz/sources.py", line 39, in __init__
    super().__init__(ensure_io_loop=True, **kwargs)
TypeError: streamz_redis.base.RedisNode.__init__() got multiple values for keyword argument 'ensure_io_loop'

Code to repro...


from streamz_redis.sources import from_redis_consumer_group

source = from_redis_consumer_group(
    streams=[QUEUE_STREAM_NAME],
    group_name=COHORT_READ_GROUP,
    consumer_name=id,
)
(
    source.map(prepare)
    .partition(1)
    .map(lambda x: analyze_cohort(x))
    .sink_to_redis_list(COMPLETED)
)