faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/
Other
1.65k stars 183 forks source link

'ThrowableQueue' object has no attribute 'put_nowait_enhanced' #268

Closed tigrus closed 2 years ago

tigrus commented 2 years ago

Checklist

Steps to reproduce

Put any invalid message to topic?

Expected behavior

Error and message skip..

Actual behavior

Critical crash

Full traceback

Traceback (most recent call last):
  File "/Users/Fine/Work/M/NotificationService/venv/lib/python3.8/site-packages/mode/services.py", line 779, in _execute_task
    await task
  File "/Users/Fine/Work/M/NotificationService/venv/lib/python3.8/site-packages/faust/transport/consumer.py", line 176, in _fetcher
    await consumer._drain_messages(self)
  File "/Users/Fine/Work/M/NotificationService/venv/lib/python3.8/site-packages/faust/transport/consumer.py", line 1170, in _drain_messages
    await self.wait_first(
  File "/Users/Fine/Work/M/NotificationService/venv/lib/python3.8/site-packages/mode/services.py", line 692, in wait_first
    f.result()  # propagate exceptions
  File "/Users/Fine/Work/M/NotificationService/venv/lib/python3.8/site-packages/faust/transport/conductor.py", line 276, in on_message
    return await get_callback_for_tp(message.tp)(message)
  File "/Users/Fine/Work/M/NotificationService/venv/lib/python3.8/site-packages/faust/transport/conductor.py", line 142, in on_message
    queue.put_nowait_enhanced(
AttributeError: 'ThrowableQueue' object has no attribute 'put_nowait_enhanced'

Versions

Cave-Johnson commented 2 years ago

Make sure you are using mode-streaming rather than mode. Its a forked version designed for faust-streaming afaik

tigrus commented 2 years ago

I'd to replace faust with aiokafka in the end.