faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/
Other
1.65k stars 183 forks source link

Fatal error on startup with documentation examples #559

Closed gegnew closed 1 year ago

gegnew commented 1 year ago

Checklist

Steps to reproduce

In a fresh project, using examples from the documentation, I get a fatal worker error on startup.

Expected behavior

Not this

Actual behavior

Error: TypeError("Queue.__init__() got an unexpected keyword argument 'loop'")

Full traceback

┌ƒaµS† v1.10.4┬──────────────────────────────────────────────────────┐
│ id          │ hello-app                                            │
│ transport   │ [URL('kafka://localhost')]                           │
│ store       │ memory:                                              │
│ web         │ http://localhost:6066/                               │
│ log         │ -stderr- (warn)                                      │
│ pid         │ 9698                                                 │
│ hostname    │ lia.local                                            │
│ platform    │ CPython 3.11.5 (Darwin arm64)                        │
│ drivers     │                                                      │
│   transport │ aiokafka=1.1.6                                       │
│   web       │ aiohttp=3.8.6                                        │
│ datadir     │ /Users/g/Desktop/zeit/athena/kafka/hello-app-data    │
│ appdir      │ /Users/g/Desktop/zeit/athena/kafka/hello-app-data/v1 │
└─────────────┴──────────────────────────────────────────────────────┘
starting➢ ◜[2023-10-26 23:53:36,104] [9698] [ERROR] [^Worker]: Error: TypeError("Queue.__init__() got an unexpected keyword argument 'loop'")
Traceback (most recent call last):
  File "/Users/g/Desktop/zeit/athena/kafka/.venv/lib/python3.11/site-packages/mode/worker.py", line 273, in execute_from_commandline
    self.loop.run_until_complete(self._starting_fut)
  File "/opt/homebrew/Cellar/python@3.11/3.11.5/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/g/Desktop/zeit/athena/kafka/.venv/lib/python3.11/site-packages/mode/services.py", line 736, in start
    await self._default_start()
  File "/Users/g/Desktop/zeit/athena/kafka/.venv/lib/python3.11/site-packages/mode/services.py", line 743, in _default_start
    await self._actually_start()
  File "/Users/g/Desktop/zeit/athena/kafka/.venv/lib/python3.11/site-packages/mode/services.py", line 767, in _actually_start
    await child.maybe_start()
  File "/Users/g/Desktop/zeit/athena/kafka/.venv/lib/python3.11/site-packages/mode/services.py", line 795, in maybe_start
    await self.start()
  File "/Users/g/Desktop/zeit/athena/kafka/.venv/lib/python3.11/site-packages/mode/services.py", line 736, in start
    await self._default_start()
  File "/Users/g/Desktop/zeit/athena/kafka/.venv/lib/python3.11/site-packages/mode/services.py", line 743, in _default_start
    await self._actually_start()
  File "/Users/g/Desktop/zeit/athena/kafka/.venv/lib/python3.11/site-packages/mode/services.py", line 749, in _actually_start
    for dep in self.on_init_dependencies():
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g/Desktop/zeit/athena/kafka/.venv/lib/python3.11/site-packages/faust/app/base.py", line 567, in on_init_dependencies
    return self.boot_strategy.server()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g/Desktop/zeit/athena/kafka/.venv/lib/python3.11/site-packages/faust/app/base.py", line 277, in server
    self.kafka_consumer(),
    ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g/Desktop/zeit/athena/kafka/.venv/lib/python3.11/site-packages/faust/app/base.py", line 328, in kafka_consumer
    self.app.consumer,
    ^^^^^^^^^^^^^^^^^
  File "/Users/g/Desktop/zeit/athena/kafka/.venv/lib/python3.11/site-packages/faust/app/base.py", line 1818, in consumer
    self._consumer = self._new_consumer()
                     ^^^^^^^^^^^^^^^^^^^^
  File "/Users/g/Desktop/zeit/athena/kafka/.venv/lib/python3.11/site-packages/faust/app/base.py", line 1674, in _new_consumer
    return self.transport.create_consumer(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g/Desktop/zeit/athena/kafka/.venv/lib/python3.11/site-packages/faust/transport/base.py", line 68, in create_consumer
    return self.Consumer(self, callback=callback,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g/Desktop/zeit/athena/kafka/.venv/lib/python3.11/site-packages/faust/transport/consumer.py", line 1231, in __init__
    self._method_queue = MethodQueue(loop=self.loop, beacon=self.beacon)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g/Desktop/zeit/athena/kafka/.venv/lib/python3.11/site-packages/mode/threads.py", line 331, in __init__
    self._queue = asyncio.Queue(loop=self.loop)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: Queue.__init__() got an unexpected keyword argument 'loop'
◠stopping➢◝%

Versions

dada-engineer commented 1 year ago

What examples did you use? Can you share?

gegnew commented 1 year ago

hmm, the error seems to have been doing pip install faust instead of pip install faust-streaming. my bad

gegnew commented 1 year ago

hmm, the error seems to have been doing pip install faust instead of pip install faust-streaming. my bad