wbarnha / kafka-python-ng

Fork for Python client for Apache Kafka
https://wbarnha.github.io/kafka-python-ng/
Apache License 2.0

KIP-345 Static membership implementation #137

Closed wbarnha closed 6 months ago

wbarnha commented 6 months ago

Hello!

I understand that the project is not actively maintained at the moment because of this issue, but I hope these changes will help if a new maintainer is found.

I tried to implement static consumer group membership as described in KIP-345, which was released in Kafka 2.3.0 and is mentioned in #2147.

This behaviour can now be enabled by setting the following two values:

'group_instance_id': 'name',
'leave_group_on_close': False,

After that, group rebalancing will not be triggered on consumer restarts or new deployments that reuse the same group_instance_id, but developers should set session_timeout_ms (and other related settings) high enough to cover the expected downtime of a restart.

The second parameter, leave_group_on_close, is required because of this issue.
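As a rough illustration of how the two settings fit together with the timeouts, here is a minimal sketch. The helper `static_member_config`, the 60-second default, and the topic and server names are hypothetical; `group_instance_id` is the option added by this PR, while `leave_group_on_close`, `session_timeout_ms`, and `heartbeat_interval_ms` are existing consumer options. The one-third heartbeat ratio follows the usual client guidance and is an assumption here, not something this PR enforces.

```python
def static_member_config(group_id, instance_id, session_timeout_ms=60_000):
    """Build consumer kwargs for static membership (hypothetical helper)."""
    if not instance_id:
        raise ValueError("group_instance_id must be a non-empty string")
    return {
        "group_id": group_id,
        # Stable identity that survives restarts (option added by this PR).
        "group_instance_id": instance_id,
        # Do not send LeaveGroup on close, so a restart does not rebalance.
        "leave_group_on_close": False,
        # Must be longer than the longest expected restart or deployment.
        "session_timeout_ms": session_timeout_ms,
        # Assumption: keep heartbeats at roughly 1/3 of the session timeout.
        "heartbeat_interval_ms": session_timeout_ms // 3,
    }


config = static_member_config("group-id", "instance-id")

# Usage against a real broker (requires a build that includes this PR):
# from kafka import KafkaConsumer
# consumer = KafkaConsumer("my-topic", bootstrap_servers="localhost:9092", **config)
```

Each consumer process needs its own distinct `group_instance_id`; reusing the same value across restarts of the same process is what lets the broker recognise it as the same member.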

With these settings, the broker's log shows something like this:

# On group join

INFO [GroupCoordinator 1001]: Static Member with unknown member id joins group group-id in Empty state. Created a new member id instance-id-9e63faa7-a270-456d-93fb-7236a0d47a34 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 1001]: Adding new static member Some(instance-id) to group group-id with member id instance-id-9e63faa7-a270-456d-93fb-7236a0d47a34. (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 1001]: Preparing to rebalance group group-id in state PreparingRebalance with old generation 17 (__consumer_offsets-47) (reason: Adding new member instance-id-9e63faa7-a270-456d-93fb-7236a0d47a34 with group instance id Some(instance-id)) (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 1001]: Stabilized group group-id generation 18 (__consumer_offsets-47) (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 1001]: Assignment received from leader for group group-id for generation 18 (kafka.coordinator.group.GroupCoordinator)

# After rejoining the group

INFO [GroupCoordinator 1001]: Static member Some(instance-id) of group group-id with unknown member id rejoins, assigning new member id instance-id-3a11b72d-b9c4-4919-adc2-0ebaf5adcfdf, while old member id instance-id-9e63faa7-a270-456d-93fb-7236a0d47a34 will be removed. (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 1001]: Static member which joins during Stable stage and doesnt affect selectProtocol will not trigger rebalance. (kafka.coordinator.group.GroupCoordinator)
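The two log excerpts above can be read as the following toy model. It is not broker code, and the class `ToyCoordinator` is purely hypothetical; it only mirrors what the log lines show: a member id is the instance id plus a random UUID, a first join bumps the group generation, and a rejoin with the same instance id swaps the member id without a rebalance.

```python
import uuid


class ToyCoordinator:
    """Toy model of static-member handling, inferred from the logs only."""

    def __init__(self):
        self.static_members = {}  # group_instance_id -> current member id
        self.generation = 0

    def join(self, group_instance_id):
        # The broker log shows member ids of the form "<instance id>-<uuid>".
        new_member_id = f"{group_instance_id}-{uuid.uuid4()}"
        is_rejoin = group_instance_id in self.static_members
        # On rejoin the old member id is replaced by the new one.
        self.static_members[group_instance_id] = new_member_id
        if not is_rejoin:
            # A previously unknown static member triggers a rebalance.
            self.generation += 1
        return new_member_id, self.generation


coordinator = ToyCoordinator()
first_id, first_generation = coordinator.join("instance-id")
second_id, second_generation = coordinator.join("instance-id")
```

In this model the second join returns a different member id but the same generation, which corresponds to the "will not trigger rebalance" line in the log.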

I also did not touch functionality that is outside the scope of this KIP, as you can see in JoinGroupRequest / SyncGroupRequest / LeaveGroupRequest.

I would be glad to hear what else I should do to get this PR merged.

