faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

self._committed_offset.update(committable_offsets) if did_commit==False #316

Open ostetsenko opened 2 years ago

ostetsenko commented 2 years ago

Checklist

We run 2 pods as consumers. Load arrives only at 9 am, for 1 hour every day. enable_auto_commit=False (set by the faust-streaming framework).

The heartbeat called _maybe_leave_group():

# If consumer is idle (no records consumed) for too long we need
# to leave the group
idle_time = self._subscription.fetcher_idle_time
if idle_time < self._max_poll_interval:
    sleep_time = min(
        sleep_time,
        self._max_poll_interval - idle_time)
else:
    await self._maybe_leave_group()
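
To make the branch above concrete, here is a minimal standalone sketch of the same decision. The function name `heartbeat_step` and the numbers used are illustrative assumptions, not part of aiokafka; only the comparison itself mirrors the quoted snippet.

```python
def heartbeat_step(idle_time: float, max_poll_interval: float, sleep_time: float):
    """Mirror the quoted branch: stay in the group or leave it.

    Hypothetical helper for illustration only; all values are in seconds.
    """
    if idle_time < max_poll_interval:
        # Still within the allowed idle window: sleep no longer than
        # the time remaining before the window expires.
        return "stay", min(sleep_time, max_poll_interval - idle_time)
    # Idle for too long: this is where _maybe_leave_group() would be awaited.
    return "leave", sleep_time


# A consumer that was idle for an hour with a 300 s limit would leave the group.
print(heartbeat_step(idle_time=3600, max_poll_interval=300, sleep_time=3))
```

With load arriving only once a day, the idle time between load windows would exceed any poll interval shorter than the gap, which is consistent with the LeaveGroup messages in the logs below.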

Logs from pod1 (pod1 continued to process messages):

{"name": "aiokafka.consumer.group_coordinator", "levelno": 20, "pathname": "/app/.local/lib/python3.9/site-packages/aiokafka/consumer/group_coordinator.py", "filename": "group_coordinator.py", "exc_info": null, "exc_text": null, "lineno": 359, "message": "LeaveGroup request succeeded", "@timestamp": "2022-06-07T15:35:02.033104", "level": "INFO", "@version": "1"}
{"name": "aiokafka.consumer.group_coordinator", "levelno": 20, "pathname": "/app/.local/lib/python3.9/site-packages/aiokafka/consumer/group_coordinator.py", "filename": "group_coordinator.py", "exc_info": null, "exc_text": null, "lineno": 384, "message": "Revoking previously assigned partitions frozenset({TopicPartition(topic='topic1', partition=0), TopicPartition(topic='topic2', partition=0), ....}) for group group-id-07-06-22", "@timestamp": "2022-06-07T15:35:02.033347", "level": "INFO", "@version": "1"}

Logs from pod2 (pod2 locked up):

{"name": "aiokafka.consumer.group_coordinator", "levelno": 20, "pathname": "/app/.local/lib/python3.9/site-packages/aiokafka/consumer/group_coordinator.py", "filename": "group_coordinator.py", "exc_info": null, "exc_text": null, "lineno": 359, "message": "LeaveGroup request succeeded", "@timestamp": "2022-06-07T15:35:02.033104", "level": "INFO", "@version": "1"}
{"name": "aiokafka.consumer.group_coordinator", "levelno": 20, "pathname": "/app/.local/lib/python3.9/site-packages/aiokafka/consumer/group_coordinator.py", "filename": "group_coordinator.py", "exc_info": null, "exc_text": null, "lineno": 384, "message": "Revoking previously assigned partitions frozenset({TopicPartition(topic='topic1', partition=0), TopicPartition(topic='topic2', partition=0), ....}) for group group-id-07-06-22", "@timestamp": "2022-06-07T15:35:02.033347", "level": "INFO", "@version": "1"}
{"name": "aiokafka.consumer.group_coordinator", "levelno": 40, "pathname": "/app/.local/lib/python3.9/site-packages/aiokafka/consumer/group_coordinator.py", "filename": "group_coordinator.py", "exc_info": null, "exc_text": null, "lineno": 1043, "message": "OffsetCommit failed for group group-id-07-06-22 due to group error ([Error 25] UnknownMemberIdError: group-id-07-06-22), will rejoin", "@timestamp": "2022-06-07T15:35:02.150061", "level": "ERROR", "@version": "1"}
{"name": "aiokafka.consumer.group_coordinator", "levelno": 40, "pathname": "/app/.local/lib/python3.9/site-packages/aiokafka/consumer/group_coordinator.py", "filename": "group_coordinator.py", "exc_info": null, "exc_text": null, "lineno": 1052, "message": "OffsetCommit failed for group group-id-07-06-22 due to group error ([Error 25] UnknownMemberIdError: group-id-07-06-22), will rejoin", "@timestamp": "2022-06-07T15:35:02.150212", "level": "ERROR", "@version": "1"}
Logs from 07/06/202

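After some research, I see this happens when did_commit is False because of UnknownMemberIdError. It looks like the lines from the title (self._committed_offset.update(committable_offsets)) need one more level of indentation, so that they run only when did_commit is True.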
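
The suggested change, guarding the offset bookkeeping on did_commit, might look roughly like the sketch below. `SketchConsumer` and its methods are hypothetical stand-ins written for this illustration; they are not the actual faust consumer code, and only the names `did_commit`, `committable_offsets`, and `_committed_offset` come from the issue.

```python
import asyncio


class SketchConsumer:
    """Hypothetical stand-in for the consumer; not the real faust class."""

    def __init__(self, commit_succeeds: bool) -> None:
        self._commit_succeeds = commit_succeeds
        self._committed_offset = {}

    async def _commit_offsets(self, offsets: dict) -> bool:
        # Stand-in for the broker round trip. Assumed to return False
        # on a group error such as UnknownMemberIdError.
        return self._commit_succeeds

    async def commit(self, committable_offsets: dict) -> bool:
        did_commit = await self._commit_offsets(committable_offsets)
        if did_commit:
            # Record offsets only when the commit was actually accepted.
            self._committed_offset.update(committable_offsets)
        return did_commit


async def demo():
    offsets = {("topic1", 0): 42}
    ok = SketchConsumer(commit_succeeds=True)
    failed = SketchConsumer(commit_succeeds=False)
    await ok.commit(offsets)
    await failed.commit(offsets)
    return ok._committed_offset, failed._committed_offset


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With the update inside the `if did_commit:` branch, a failed commit leaves the locally tracked offsets untouched, so a later commit attempt can retry the same offsets.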

Versions

wbarnha commented 2 years ago

If you have a solution for this, could you please file a PR?