Tinanuaa opened this issue 2 months ago.
Hi @Tinanuaa, thank you for reporting the issue. It's unclear from the report whether you have tried version 0.11.0. The symptom looks very much like #983, which was fixed in the latest release.
Yes, I tested on version 0.11.0 as well and added the test results to the issue description above. It seems that once the RequestTimedOutError happens, the consumer stops receiving messages, so "New Messages Received..." is never printed to the console. After changing the wait_for call to asyncio.wait_for, the consumer keeps receiving messages and no RequestTimedOutError is printed.
I'm also experiencing the same issue:
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 0)for group python-model-io-event-processor.
OffsetCommit failed for group python-model-io-event-processor due to group error ([Error 25] UnknownMemberIdError: python-model-io-event-processor), will rejoin
OffsetCommit failed for group python-model-io-event-processor due to group error ([Error 25] UnknownMemberIdError: python-model-io-event-processor), will rejoin
OffsetCommit failed for group python-model-io-event-processor due to group error ([Error 25] UnknownMemberIdError: python-model-io-event-processor), will rejoin
OffsetCommit failed for group python-model-io-event-processor due to group error ([Error 25] UnknownMemberIdError: python-model-io-event-processor), will rejoin
@Tinanuaa, any luck? I also tried upgrading to 0.11, with no luck.
For anyone else who ends up here: it was something embarrassingly simple. It turned out to be the timeout settings, so try those first. I added all the timeout parameters below client_id and it fixed everything:
```python
import uuid

from aiokafka import AIOKafkaConsumer

# KAFKA_URL is defined elsewhere in the application.
consumer = AIOKafkaConsumer(
    bootstrap_servers=[KAFKA_URL],
    group_id="something",
    client_id=f"something-{uuid.uuid4()}",
    # Increase session timeout (default is 10 seconds)
    session_timeout_ms=60000,
    # Increase heartbeat interval (default is 3 seconds)
    heartbeat_interval_ms=20000,
    # Increase max poll interval (default is 5 minutes)
    max_poll_interval_ms=600000,
    # Enable auto commit (True is already the default)
    enable_auto_commit=True,
    # Increase auto commit interval (default is 5 seconds)
    auto_commit_interval_ms=15000,
    # Increase fetch max wait time (default is 500 ms)
    fetch_max_wait_ms=2000,
    # Increase request timeout (default is 40 seconds)
    request_timeout_ms=65000,
)
```
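The session and heartbeat timeouts in that snippet are coupled. The aiokafka and Kafka consumer docs say the heartbeat interval must be lower than the session timeout and typically no higher than one third of it. The broker also only accepts session timeouts between group.min.session.timeout.ms and group.max.session.timeout.ms. Below is a minimal sketch of a hypothetical `check_timeouts` helper (not part of aiokafka) that sanity-checks values before they are passed to AIOKafkaConsumer. The broker bounds default to 6000 and 1800000 ms, which are assumed here to match an unmodified recent broker; your cluster may differ.

```python
# Assumed broker-side bounds (group.min.session.timeout.ms and
# group.max.session.timeout.ms); check your broker configuration.
BROKER_MIN_SESSION_MS = 6_000
BROKER_MAX_SESSION_MS = 1_800_000


def check_timeouts(session_timeout_ms, heartbeat_interval_ms,
                   broker_min_ms=BROKER_MIN_SESSION_MS,
                   broker_max_ms=BROKER_MAX_SESSION_MS):
    """Hypothetical helper: return a list of problems (empty if none found)."""
    problems = []
    # The broker rejects group joins whose session timeout is out of range.
    if not broker_min_ms <= session_timeout_ms <= broker_max_ms:
        problems.append("session_timeout_ms is outside the broker's allowed range")
    # The heartbeat must fire several times within one session timeout.
    if heartbeat_interval_ms >= session_timeout_ms:
        problems.append("heartbeat_interval_ms must be lower than session_timeout_ms")
    elif heartbeat_interval_ms * 3 > session_timeout_ms:
        problems.append("heartbeat_interval_ms is above 1/3 of session_timeout_ms")
    return problems


if __name__ == "__main__":
    # Values from the snippet above: 60 s session, 20 s heartbeat.
    print(check_timeouts(60_000, 20_000))  # prints []
    # A heartbeat at half the session timeout leaves little slack.
    print(check_timeouts(10_000, 5_000))
```

With the values posted above (60000 and 20000) it reports no problems; the 20-second heartbeat sits exactly at the one-third limit.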
Describe the bug
I have a long-running FastAPI app that starts a consumer on app startup and consumes messages from Kafka. The consumer works fine for about a week, then suddenly stops receiving messages even though it has not been closed. This has happened twice. Checking the logs, the common error before the consumer stopped consuming messages was:
Failed fetch messages from 1001: [Error 7] RequestTimedOutError
After I restarted the service, the consumer picked up from the offset where it had stopped and re-consumed all the missed messages.

Expected behaviour
A long-running consumer should keep running and consuming messages for a long time.
Environment (please complete the following information):
Reproducible example
I used the following script to reproduce the problem. The sleep(40) makes the request time out: conn.send() raises a timeout exception in versions 0.10.0 and 0.11.0, which makes client.send() raise RequestTimedOutError. This does not happen in aiokafka 0.7.2. I've gone through the related issues (#983 and #802) and found that if conn.send uses asyncio.wait_for instead of utils.wait_for, the consumer keeps working after the timeout. My guess is that the two functions differ in some way that causes the problem. I tried to find the root cause but failed, so I'm posting my findings here.
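The timeout path described above (a request whose response arrives later than the request timeout, surfacing as RequestTimedOutError) can be modeled with plain asyncio. This is a minimal sketch under stated assumptions: the broker response is simulated with `loop.call_later`, and `send_request` and the local `RequestTimedOutError` class are hypothetical stand-ins, not aiokafka's conn.py. It uses asyncio.wait_for, the variant the reporter found to keep the consumer working. It does not reproduce the bug or show how utils.wait_for differs.

```python
import asyncio


class RequestTimedOutError(Exception):
    """Hypothetical stand-in for aiokafka's RequestTimedOutError."""


def _resolve(fut, value):
    # A late "broker response" must not touch a future that was cancelled.
    if not fut.done():
        fut.set_result(value)


async def send_request(response_delay, request_timeout):
    """Await a simulated response, converting a timeout into RequestTimedOutError."""
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # Simulate the broker answering after `response_delay` seconds.
    handle = loop.call_later(response_delay, _resolve, fut, "response")
    try:
        return await asyncio.wait_for(fut, request_timeout)
    except asyncio.TimeoutError:
        # asyncio.wait_for has already cancelled `fut` at this point.
        raise RequestTimedOutError(f"no response within {request_timeout}s") from None
    finally:
        handle.cancel()


async def main():
    results = []
    # First request: the response arrives after the timeout expires.
    try:
        await send_request(response_delay=0.2, request_timeout=0.05)
    except RequestTimedOutError:
        results.append("timed out")
    # A second request afterwards still completes normally.
    results.append(await send_request(response_delay=0.01, request_timeout=0.5))
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints ['timed out', 'response']
```

The `_resolve` guard matters because the simulated response can still fire after the timeout; once wait_for has cancelled the future, calling set_result on it would raise InvalidStateError.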
Using the script above, I ran aiokafka 0.7.2, aiokafka 0.10.0, and aiokafka 0.10.0 with a patched conn.py (changing line 453 from `return wait_for(fut, self._request_timeout)` to `return asyncio.wait_for(fut, self._request_timeout)`). The output is below.
aiokafka 0.7.2
aiokafka 0.10.0
aiokafka 0.11.0
aiokafka 0.10.0 after changing wait_for to asyncio.wait_for() in conn.py::send()
aiokafka 0.11.0 after changing wait_for to asyncio.wait_for() in conn.py::send()