faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/
Other
1.65k stars 183 forks source link

'ThrowableQueue' object has no attribute 'put_nowait_enhanced' #170

Closed PJ-Schulz closed 3 years ago

PJ-Schulz commented 3 years ago

Hello,

I get a strange Error when I start my app twice. On the first execution the app runs correctly. But when I restart, it crashes with an AttributeError: 'ThrowableQueue' object has no attribute 'put_nowait_enhanced'.
It occurs when restoring state from changelog Topic.

Does anyone have an idea what this could be due to?

Steps to reproduce

table = app.Table("Table", partitions=1, default=tuple, key_type=int)
trigger = app.channel()

@app.task()
async def on_start():
    print("App startet")
    await trigger.send(key="Hello", value="World")

@app.agent(trigger)
async def bar(stream):
    async for _ in stream:
        for x in range(10):
            table[x] = (555, 100000, False)
        print(f"{len(table)}   {table.keys()}")

Full traceback

[2021-07-12 10:31:45,574] [30156] [INFO] [^---Recovery]: Highwater for active changelog partitions:
┌Highwater - Active────────┬───────────┬───────────┐
│ topic                    │ partition │ highwater │
├──────────────────────────┼───────────┼───────────┤
│ foo.test-Table-changelog │ 0         │ 9         │
└──────────────────────────┴───────────┴───────────┘ 
[2021-07-12 10:31:45,605] [30156] [INFO] [^---Recovery]: active offsets at start of reading:
┌Reading Starts At - Active┬───────────┬────────┐
│ topic                    │ partition │ offset │
├──────────────────────────┼───────────┼────────┤
│ foo.test-Table-changelog │ 0         │ -1     │
└──────────────────────────┴───────────┴────────┘ 
[2021-07-12 10:31:45,609] [30156] [INFO] [^---Recovery]: Restoring state from changelog topics... 
[2021-07-12 10:31:45,610] [30156] [INFO] [^---Recovery]: Resuming flow... 
[2021-07-12 10:31:45,611] [30156] [INFO] [^---Fetcher]: Starting... 
[2021-07-12 10:31:45,617] [30156] [ERROR] [^--Consumer]: Drain messages raised: AttributeError("'ThrowableQueue' object has no attribute 'put_nowait_enhanced'") 
Traceback (most recent call last):
  File "/home/philipp/repos/tide2use/.venv/lib/python3.7/site-packages/faust/transport/consumer.py", line 1124, in _drain_messages
    await callback(message)
  File "/home/philipp/repos/tide2use/.venv/lib/python3.7/site-packages/faust/transport/conductor.py", line 274, in on_message
    return await get_callback_for_tp(message.tp)(message)
  File "/home/philipp/repos/tide2use/.venv/lib/python3.7/site-packages/faust/transport/conductor.py", line 140, in on_message
    queue.put_nowait_enhanced(
AttributeError: 'ThrowableQueue' object has no attribute 'put_nowait_enhanced'
[2021-07-12 10:31:45,619] [30156] [ERROR] [^---Fetcher]: Crashed reason=AttributeError("'ThrowableQueue' object has no attribute 'put_nowait_enhanced'") 
Traceback (most recent call last):
  File "/home/philipp/repos/tide2use/.venv/lib/python3.7/site-packages/mode/services.py", line 779, in _execute_task
    await task
  File "/home/philipp/repos/tide2use/.venv/lib/python3.7/site-packages/faust/transport/consumer.py", line 176, in _fetcher
    await consumer._drain_messages(self)
  File "/home/philipp/repos/tide2use/.venv/lib/python3.7/site-packages/faust/transport/consumer.py", line 1124, in _drain_messages
    await callback(message)
  File "/home/philipp/repos/tide2use/.venv/lib/python3.7/site-packages/faust/transport/conductor.py", line 274, in on_message
    return await get_callback_for_tp(message.tp)(message)
  File "/home/philipp/repos/tide2use/.venv/lib/python3.7/site-packages/faust/transport/conductor.py", line 140, in on_message
    queue.put_nowait_enhanced(
AttributeError: 'ThrowableQueue' object has no attribute 'put_nowait_enhanced'
[2021-07-12 10:31:45,621] [30156] [INFO] [^Worker]: Stopping... 
[2021-07-12 10:31:45,622] [30156] [INFO] [^-App]: Stopping... 
[2021-07-12 10:31:45,622] [30156] [INFO] [^---Fetcher]: Stopping... 
[2021-07-12 10:31:45,622] [30156] [INFO] [^-App]: Wait for streams... 
[2021-07-12 10:31:45,623] [30156] [INFO] [^--TableManager]: Stopping... 
[2021-07-12 10:31:45,623] [30156] [INFO] [^---Fetcher]: Stopping... 
[2021-07-12 10:31:45,623] [30156] [INFO] [^---Recovery]: Stopping... 
[2021-07-12 10:31:45,624] [30156] [INFO] [^---Recovery]: Done reading from changelog topics 
[2021-07-12 10:31:45,624] [30156] [INFO] [^---Recovery]: Recovery complete 
[2021-07-12 10:31:45,624] [30156] [WARNING] Error in recovery  
[2021-07-12 10:31:45,625] [30156] [INFO] [^---Recovery]: Restore complete! 
[2021-07-12 10:31:45,625] [30156] [INFO] [^---Table: Table]: Stopping... 
[2021-07-12 10:31:45,626] [30156] [INFO] [^----Store: memory:]: Stopping... 
[2021-07-12 10:31:45,626] [30156] [INFO] [^-App]: Flush producer buffer... 
[2021-07-12 10:31:45,627] [30156] [INFO] [^---Conductor]: Stopping... 
[2021-07-12 10:31:45,628] [30156] [INFO] [^--AgentManager]: Stopping... 
[2021-07-12 10:31:45,628] [30156] [INFO] [^---Agent: __main__.bar]: Stopping... 
[2021-07-12 10:31:45,629] [30156] [INFO] [^----OneForOneSupervisor: (1@0x7f6200745d10)]: Stopping... 
[2021-07-12 10:31:45,631] [30156] [INFO] [^--ReplyConsumer]: Stopping... 
[2021-07-12 10:31:45,631] [30156] [INFO] [^--LeaderAssignor]: Stopping... 
[2021-07-12 10:31:45,632] [30156] [INFO] [^--Consumer]: Stopping... 
[2021-07-12 10:31:45,634] [30156] [INFO] [^---AIOKafkaConsumerThread]: Stopping... 
[2021-07-12 10:31:45,656] [30156] [INFO] LeaveGroup request succeeded 
[2021-07-12 10:31:47,155] [30156] [INFO] [^--Web]: Stopping... 
[2021-07-12 10:31:47,156] [30156] [INFO] [^---Server]: Stopping... 
[2021-07-12 10:31:47,157] [30156] [INFO] [^--Web]: Cleanup 
[2021-07-12 10:31:47,159] [30156] [INFO] [^--CacheBackend]: Stopping... 
[2021-07-12 10:31:47,161] [30156] [INFO] [^--Producer]: Stopping... 
[2021-07-12 10:31:47,182] [30156] [INFO] [^---ProducerBuffer]: Stopping... 
[2021-07-12 10:31:47,183] [30156] [INFO] [^--Monitor]: Stopping... 
[2021-07-12 10:31:47,184] [30156] [INFO] [^Worker]: Gathering service tasks... 
[2021-07-12 10:31:47,184] [30156] [INFO] [^Worker]: Gathering all futures... 
[2021-07-12 10:31:48,186] [30156] [INFO] [^Worker]: Closing event loop 
[2021-07-12 10:31:48,187] [30156] [CRITICAL] [^Worker]: We experienced a crash! Reraising original exception...

Versions

The error also occurs with version 0.6.9

PJ-Schulz commented 3 years ago

If I use Faust==1.10.4. The application also runs the first time I start it. But when I restart, the application hangs and does nothing. No exception is thrown either.

diazharizky commented 3 years ago

I faced the same problem here, the error description was exactly as I had. Do you have your problem solved?

bobh66 commented 3 years ago

If you have faust 1.10.4 then you are running the original code base and not this fork.

You want to pip install "faust-streaming" which will pull in the latest version of this fork and all of the patches associated with it.

One of those changes is to use "mode-streaming" instead of "mode" so you won't see this problem.

Hope this helps