robinhood / faust

Python Stream Processing

module 'faust.worker' has no attribute 'Worker' #694

Closed mattsunsjf closed 3 years ago

mattsunsjf commented 3 years ago

Steps to reproduce

I just started exploring managed Kafka on Confluent Cloud together with Faust. The Kafka topics are already created in Confluent Cloud. In a new Python project, I used the sample program from the Faust docs (https://github.com/robinhood/faust#faust-is), but I'm not able to run it. The error is included below.

FYI, this is my code; it's essentially the same as the sample program in the docs.

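import faust

# Placeholder broker address (an assumption; the original post does not
# show how BROKER_URL was defined). Replace with your own broker URL.
BROKER_URL = 'kafka://localhost:9092'

app = faust.App(
    id='myapp',
    broker=BROKER_URL,
    store='rocksdb://',
)


class Order(faust.Record):
    # Field names match the attributes read in the agent below.
    account_id: str
    amount: int


@app.agent(value_type=Order)
async def order(orders):
    async for order in orders:
        # process infinite stream of orders.
        print(f'Order for {order.account_id}: {order.amount}')


if __name__ == '__main__':
    # Hand control to the Faust command-line interface.
    app.main()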

Expected behavior

I expect that the program is able to run.

Actual behavior

The program fails during startup with AttributeError: module 'faust.worker' has no attribute 'Worker'.

Full traceback

Traceback (most recent call last):
  File "/System/Volumes/Data/space/workspace/playground/project_util/kafka.py", line 29, in <module>
    topic = app.topic('topic1', value_type=Greeting)
  File "/Users/jifewo/.virtualenvs/playground/lib/python3.6/site-packages/faust/app/base.py", line 781, in topic
    return self.conf.Topic(
  File "/Users/jifewo/.virtualenvs/playground/lib/python3.6/site-packages/faust/app/base.py", line 1796, in conf
    self._configure()
  File "/Users/jifewo/.virtualenvs/playground/lib/python3.6/site-packages/faust/app/base.py", line 1740, in _configure
    conf = self._load_settings(silent=silent)
  File "/Users/jifewo/.virtualenvs/playground/lib/python3.6/site-packages/faust/app/base.py", line 1752, in _load_settings
    return self.Settings(appid, **self._prepare_compat_settings(conf))
  File "/Users/jifewo/.virtualenvs/playground/lib/python3.6/site-packages/faust/types/settings.py", line 768, in __init__
    self.Worker = cast(Type[_WorkerT], Worker or WORKER_TYPE)
  File "/Users/jifewo/.virtualenvs/playground/lib/python3.6/site-packages/faust/types/settings.py", line 796, in __setattr__
    object.__setattr__(self, key, value)
  File "/Users/jifewo/.virtualenvs/playground/lib/python3.6/site-packages/faust/types/settings.py", line 1204, in Worker
    self._Worker = symbol_by_name(Worker)
  File "/Users/jifewo/.virtualenvs/playground/lib/python3.6/site-packages/mode/utils/imports.py", line 269, in symbol_by_name
    **kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 994, in _gcd_import
  File "<frozen importlib._bootstrap>", line 971, in _find_and_load
  File "<frozen importlib._bootstrap>", line 955, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/Users/jifewo/.virtualenvs/playground/lib/python3.6/site-packages/faust/worker.py", line 19, in <module>
    from aiokafka.structs import TopicPartition
  File "/Users/jifewo/.virtualenvs/playground/lib/python3.6/site-packages/aiokafka/__init__.py", line 3, in <module>
    from .abc import ConsumerRebalanceListener
  File "/Users/jifewo/.virtualenvs/playground/lib/python3.6/site-packages/aiokafka/abc.py", line 2, in <module>
    from kafka import ConsumerRebalanceListener as BaseConsumerRebalanceListener
  File "/System/Volumes/Data/space/workspace/playground/project_util/kafka.py", line 29, in <module>
    topic = app.topic('topic1', value_type=Greeting)
  File "/Users/jifewo/.virtualenvs/playground/lib/python3.6/site-packages/faust/app/base.py", line 781, in topic
    return self.conf.Topic(
  File "/Users/jifewo/.virtualenvs/playground/lib/python3.6/site-packages/faust/app/base.py", line 1796, in conf
    self._configure()
  File "/Users/jifewo/.virtualenvs/playground/lib/python3.6/site-packages/faust/app/base.py", line 1740, in _configure
    conf = self._load_settings(silent=silent)
  File "/Users/jifewo/.virtualenvs/playground/lib/python3.6/site-packages/faust/app/base.py", line 1752, in _load_settings
    return self.Settings(appid, **self._prepare_compat_settings(conf))
  File "/Users/jifewo/.virtualenvs/playground/lib/python3.6/site-packages/faust/types/settings.py", line 768, in __init__
    self.Worker = cast(Type[_WorkerT], Worker or WORKER_TYPE)
  File "/Users/jifewo/.virtualenvs/playground/lib/python3.6/site-packages/faust/types/settings.py", line 796, in __setattr__
    object.__setattr__(self, key, value)
  File "/Users/jifewo/.virtualenvs/playground/lib/python3.6/site-packages/faust/types/settings.py", line 1204, in Worker
    self._Worker = symbol_by_name(Worker)
  File "/Users/jifewo/.virtualenvs/playground/lib/python3.6/site-packages/mode/utils/imports.py", line 275, in symbol_by_name
    return cast(_T, getattr(module, attribute_name))
AttributeError: module 'faust.worker' has no attribute 'Worker'
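
A reading of the traceback above, offered as a likely cause rather than a confirmed diagnosis: the script itself is named `kafka.py`, and the frame in `aiokafka/abc.py` that runs `from kafka import ConsumerRebalanceListener` lands back in `project_util/kafka.py` instead of the installed `kafka` package. The script is then executed a second time while `faust.worker` is only partially imported, which would explain why `Worker` is not yet defined on it. The sketch below checks a directory for a file or package that would shadow a given module name; `find_shadowing_path` is a hypothetical helper written for this illustration, not part of Faust or aiokafka.

```python
import sys
from pathlib import Path
from typing import Optional


def find_shadowing_path(module_name: str, directory: str) -> Optional[Path]:
    """Return a file or package in `directory` that shares its name with
    the top-level part of `module_name`, or None if nothing matches.

    Hypothetical helper for illustration only.
    """
    top_level = module_name.split(".")[0]
    base = Path(directory)

    # A regular package: <directory>/<name>/__init__.py
    package_dir = base / top_level
    if (package_dir / "__init__.py").is_file():
        return package_dir

    # A plain module: <directory>/<name>.py
    module_file = base / f"{top_level}.py"
    if module_file.is_file():
        return module_file

    return None


if __name__ == "__main__":
    # When a script is run directly, sys.path[0] is normally the script's
    # own directory, so it is searched before site-packages.
    script_dir = sys.path[0] or "."
    hit = find_shadowing_path("kafka", script_dir)
    if hit is not None:
        print(f"{hit} shadows the installed 'kafka' package")
    else:
        print("no local file shadows 'kafka'")
```

If this is the cause, renaming the script (for example to `kafka_app.py`, an arbitrary name) and removing any stale `__pycache__` entry for it should let `from kafka import ...` resolve to the installed package again.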

Versions