aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

UnknownMemberIdError causes consumer to lock up #727

Closed. patkivikram closed this issue 3 years ago

patkivikram commented 3 years ago

Describe the bug

When we have multiple consumers running in Kubernetes join a group using faust (manual commits), the group gets stuck in a constant rebalance with the following logs. The logs are tailed from all the consumers. As you can see, 2 consumers get -1 as the generation and the other 2 get the correct value. The ones that get -1 are the ones that receive the following error during rebalance:

February 25th 2021, 12:19:40.754 OffsetCommit failed for group app due to group error ([Error 25] UnknownMemberIdError: app), will rejoin

February 25th 2021, 12:19:40.770 Successfully synced group app with generation -1
February 25th 2021, 12:19:40.770 Successfully synced group app with generation -1
February 25th 2021, 12:19:40.770 Successfully synced group app with generation 372
February 25th 2021, 12:19:40.770 Successfully synced group app with generation 372
February 25th 2021, 12:19:40.757 OffsetCommit failed for group app due to group error ([Error 25] UnknownMemberIdError: app), will rejoin
February 25th 2021, 12:19:40.754 OffsetCommit failed for group app due to group error ([Error 25] UnknownMemberIdError: app), will rejoin
February 25th 2021, 12:19:40.753 OffsetCommit failed for group app due to group error ([Error 25] UnknownMemberIdError: app), will rejoin
February 25th 2021, 12:19:40.752 Elected group leader -- performing partition assignments using faust
February 25th 2021, 12:19:40.752 OffsetCommit failed for group app due to group error ([Error 25] UnknownMemberIdError: app), will rejoin
February 25th 2021, 12:19:40.751 Joined group 'app' (generation 372) with member_id faust-0.4.8rc6-30b24070-fbc9-4acd-9a45-9c1e05ce992e
February 25th 2021, 12:19:40.748 Joined group 'app' (generation 372) with member_id faust-0.4.8rc6-f5f75be5-1b2c-431f-8d63-b198069eb0a5
February 25th 2021, 12:19:40.748 Joined group 'app' (generation 372) with member_id faust-0.4.8rc6-72990a08-0a25-43e8-89a1-27175c9ebcbe
February 25th 2021, 12:19:40.748 Joined group 'app' (generation 372) with member_id faust-0.4.8rc6-90829dc4-4bc5-4d31-bf7b-1e5bc6a71aa5

Expected behaviour

The error

February 25th 2021, 12:19:40.752 OffsetCommit failed for group app due to group error ([Error 25] UnknownMemberIdError: app), will rejoin

should not happen, and a generation of -1 should not happen either.

Environment:

Reproducible example

# Add a short Python script or Docker configuration that can reproduce the issue.
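For illustration only, a minimal aiokafka consumer with manual commits, roughly matching the setup described above, might look like the sketch below. The topic name, group id, and broker address are hypothetical; this is not the original faust application.

import asyncio

from aiokafka import AIOKafkaConsumer

async def consume():
    # All names here (topic, group id, broker address) are made up.
    consumer = AIOKafkaConsumer(
        "some-topic",
        bootstrap_servers="kafka:9092",
        group_id="app",
        enable_auto_commit=False,  # manual commits, as with the faust setup
    )
    await consumer.start()
    try:
        async for msg in consumer:
            # ... process msg ...
            # A rebalance between receiving the record and this commit is
            # where [Error 25] UnknownMemberIdError shows up, after which
            # the consumer rejoins the group.
            await consumer.commit()
    finally:
        await consumer.stop()

asyncio.run(consume())

Running several replicas of this (e.g. as Kubernetes pods) against the same group_id forces rebalances similar to the log above.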
patkivikram commented 3 years ago

@tvoinarovskyi just to clarify, the log is from 4 consumers tailed together. 2 of them get generation 372 and the other 2 get -1. The ones that get -1 are the ones which run into:

February 25th 2021, 12:19:40.752 OffsetCommit failed for group app due to group error ([Error 25] UnknownMemberIdError: app), will rejoin

vangheem commented 3 years ago

@patkivikram fwiw, it seemed like I was facing the same issue as you.

This change has been tested on an active cluster and seems to fix the problem: https://github.com/aio-libs/aiokafka/pull/747

patkivikram commented 3 years ago

That's awesome!

patkivikram commented 3 years ago

@ods is the new release in pypi?

ods commented 3 years ago

@ods is the new release in pypi?

Sorry for the delay, I'm working on it

ods commented 3 years ago

Release is ready

ostetsenko commented 2 years ago

@ods @pwilczynskiclearcode

Hello, I've caught this error again.

aiokafka==0.7.2 python==3.9.12

We use 2 pods for consumption. There is load only at 9 am, for 1 hour, every day. enable_auto_commit=False (set by the faust-streaming framework)

Heartbeat called _maybe_leave_group()

# If consumer is idle (no records consumed) for too long we need
# to leave the group
idle_time = self._subscription.fetcher_idle_time
if idle_time < self._max_poll_interval:
    sleep_time = min(
        sleep_time,
        self._max_poll_interval - idle_time)
else:
    await self._maybe_leave_group()
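If the idle check above is indeed what triggers the leave, one possible mitigation (an assumption on my part, not a confirmed fix for this issue) is to configure max_poll_interval_ms to be longer than the longest window in which the consumer sits idle, so fetcher_idle_time never exceeds it and the heartbeat routine does not call _maybe_leave_group(). A sketch of doing this with aiokafka directly; all names and the 23-hour value are made up:

from aiokafka import AIOKafkaConsumer

# Hypothetical values: the point is that max_poll_interval_ms exceeds the
# longest expected idle period (here, a once-a-day load window), so the
# idle-time branch above is never taken.
consumer = AIOKafkaConsumer(
    "some-topic",
    bootstrap_servers="kafka:9092",
    group_id="some-group",
    enable_auto_commit=False,
    max_poll_interval_ms=23 * 60 * 60 * 1000,  # 23 hours
)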

Logs from pod1 (pod1 continued to process messages):

 {"name": "aiokafka.consumer.group_coordinator", "levelno": 20, "pathname": "/app/.local/lib/python3.9/site-packages/aiokafka/consumer/group_coordinator.py", "filename": "group_coordinator.py", "exc_info": null, "exc_text": null, "lineno": 359, "message": "LeaveGroup request succeeded", "@timestamp": "2022-06-07T15:35:02.033104", "level": "INFO", "@version": "1"}
{"name": "aiokafka.consumer.group_coordinator", "levelno": 20, "pathname": "/app/.local/lib/python3.9/site-packages/aiokafka/consumer/group_coordinator.py", "filename": "group_coordinator.py", "exc_info": null, "exc_text": null, "lineno": 384, "message": "Revoking previously assigned partitions frozenset({TopicPartition(topic='topic1', partition=0), TopicPartition(topic='topic2', partition=0), ....}) for group group-id-07-06-22", "@timestamp": "2022-06-07T15:35:02.033347", "level": "INFO", "@version": "1"}

Logs from pod2 (pod2 locked up):

 {"name": "aiokafka.consumer.group_coordinator", "levelno": 20, "pathname": "/app/.local/lib/python3.9/site-packages/aiokafka/consumer/group_coordinator.py", "filename": "group_coordinator.py", "exc_info": null, "exc_text": null, "lineno": 359, "message": "LeaveGroup request succeeded", "@timestamp": "2022-06-07T15:35:02.033104", "level": "INFO", "@version": "1"}
{"name": "aiokafka.consumer.group_coordinator", "levelno": 20, "pathname": "/app/.local/lib/python3.9/site-packages/aiokafka/consumer/group_coordinator.py", "filename": "group_coordinator.py", "exc_info": null, "exc_text": null, "lineno": 384, "message": "Revoking previously assigned partitions frozenset({TopicPartition(topic='topic1', partition=0), TopicPartition(topic='topic2', partition=0), ....}) for group group-id-07-06-22", "@timestamp": "2022-06-07T15:35:02.033347", "level": "INFO", "@version": "1"}
{"name": "aiokafka.consumer.group_coordinator", "levelno": 40, "pathname": "/app/.local/lib/python3.9/site-packages/aiokafka/consumer/group_coordinator.py", "filename": "group_coordinator.py", "exc_info": null, "exc_text": null, "lineno": 1043, "message": "OffsetCommit failed for group group-id-07-06-22 due to group error ([Error 25] UnknownMemberIdError: group-id-07-06-22), will rejoin", "@timestamp": "2022-06-07T15:35:02.150061", "level": "ERROR", "@version": "1"}
{"name": "aiokafka.consumer.group_coordinator", "levelno": 40, "pathname": "/app/.local/lib/python3.9/site-packages/aiokafka/consumer/group_coordinator.py", "filename": "group_coordinator.py", "exc_info": null, "exc_text": null, "lineno": 1052, "message": "OffsetCommit failed for group group-id-07-06-22 due to group error ([Error 25] UnknownMemberIdError: group-id-07-06-22), will rejoin", "@timestamp": "2022-06-07T15:35:02.150212", "level": "ERROR", "@version": "1"}
Logs from 07/06/2022.
ostetsenko commented 2 years ago

Update:

I see the same lock-up, but this time UnknownMemberIdError didn't happen (all offsets were committed successfully). The last logs for the 2 pods are:

{"name": "aiokafka.consumer.group_coordinator", "levelno": 20, "pathname": "/app/.local/lib/python3.9/site-packages/aiokafka/consumer/group_coordinator.py", "filename": "group_coordinator.py", "exc_info": null, "exc_text": null, "lineno": 359, "message": "LeaveGroup request succeeded", "@timestamp": "2022-06-08T07:16:39.423066", "level": "INFO", "@version": "1"}
{"name": "aiokafka.consumer.group_coordinator", "levelno": 20, "pathname": "/app/.local/lib/python3.9/site-packages/aiokafka/consumer/group_coordinator.py", "filename": "group_coordinator.py", "exc_info": null, "exc_text": null, "lineno": 384, "message": "Revoking previously assigned partitions frozenset({TopicPartition(topic='topic1', partition=0), TopicPartition(topic='topic2', partition=0), ....}) for group group-id-07-06-22", "@timestamp": "2022-06-08T07:16:39.423410", "level": "INFO", "@version": "1"}

Looks like the cause is here, or in some code after this call.

sam-orca commented 2 years ago

@ostetsenko: have you got a fix? Or has the error gone away for you?

ostetsenko commented 2 years ago

@sam-orca: Workaround: a parallel thread monitors a periodic task and restarts the service when the task hasn't run within a TTL (i.e. the main thread is locked up).
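A rough sketch of such a watchdog (my own illustration with assumed names and TTL, not the actual code behind that workaround): a coroutine on the event loop keeps refreshing a timestamp, and a plain thread exits the process when the timestamp goes stale, so the orchestrator restarts the pod.

import asyncio
import os
import threading
import time

TTL = 60  # seconds without a heartbeat before restarting; hypothetical value
_last_beat = time.monotonic()

async def heartbeat():
    # Runs on the event loop; if the loop locks up, this stops updating.
    global _last_beat
    while True:
        _last_beat = time.monotonic()
        await asyncio.sleep(5)

def watchdog():
    # Runs in a separate thread, so it keeps working even if the loop hangs.
    while True:
        time.sleep(5)
        if time.monotonic() - _last_beat > TTL:
            # Hard exit; Kubernetes (or another supervisor) restarts the pod.
            os._exit(1)

threading.Thread(target=watchdog, daemon=True).start()
# asyncio.create_task(heartbeat()) is then scheduled alongside the consumer.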