faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/
Other
1.65k stars 183 forks source link

LeaveGroup request after broker_max_poll_interval hangs app #138

Open PJ-Schulz opened 3 years ago

PJ-Schulz commented 3 years ago

Hello

I have an error after changing the package faust to faust-streaming. I have a timer that sends a message to a channel at a cyclic interval. An agent iterates over this channel and puts the message in a table. After a certain time, the assigned partititon for the table is revoked. This causes the app to hang and nothing is executed, neither the timer nor the agent iterating over the channel.

The error occurs because the app does not make a poll request in the specified time interval. The only poll requests made by the app are those made at startup to restore the Table from changelog topic. The time interval specified here refers to the setting: broker_max_poll_interval(default 1000s). If I set this to 120 seconds, the operation will occur after exactly 120 seconds.

If I iterate over with another agent over a different topic at the same time, this error never occurs because a poll is called cyclically by the agent and the app runs indefinitely. However, I would like to run my app without the second agent. From my point of view, it can't be right that a LeaveGroupRequest is executed for the app and the app hangs.

To fix this Error the chngelog-topic for the table would have to poll cyclically. The time for this cycle must be less than the time set for broker_max_poll_interval.

This error only occurs with faust-streaming package, not with faust!

Unfortunately, I don't know how to implement this without an agent in my code. Does anyone have the same usecase or the same Error? It is possible to change the behavior of the poll stratety for the table-changelog Topic in the faust source Code

My Code:

import faust
from random import random

app = faust.App(
    'xyz.app03', 
    broker='xxx',
    broker_credentials={...},
    topic_disable_leader=True,
    topic_replication_factor=3)

table = app.Table('table', partitions=1, default=str)
topic = app.topic('xyz.topic')
queue = app.channel()

@app.timer(10.0)
async def job():
    await queue.send(key="Hello World", value=str(random()))

@app.agent(queue)
async def iterrate(stream):
    async for key, value in stream.items():
        table[key] = value
        print(table[key])

Versions

patkivikram commented 3 years ago

I think this is because of aiokafka and could be related to https://github.com/aio-libs/aiokafka/issues/720

oneryalcin commented 2 years ago

hi @PJ-Schulz , @patkivikram were you able to find a better solution for this issue? Latest version of aiokafka doesn't seem to be fixed

wbarnha commented 1 year ago

This error only occurs with faust-streaming package, not with faust!

This may be due to https://github.com/robinhood/aiokafka having different functionality from https://github.com/aio-libs/aiokafka