deephaven / deephaven-core

Deephaven Community Core

Kafka consumer will never be a member of a consumer group - The coordinator is not aware of this member #4317

Open lzwaan opened 11 months ago

lzwaan commented 11 months ago

Deephaven's Kafka consumer will never become a member of a consumer group. When Deephaven instances consume from the same Kafka topic with their group.id set, they never show up as active members. Consumer groups normally allow parallel processing of the data in those topics; see https://developer.confluent.io/courses/architecture/consumer-group-protocol/

In the Redpanda console you will see that the consumer group has no active members. So when you use two Deephaven instances to consume, both will read the same Kafka data, while they should divide the workload if they have the same group.id.

Deephaven consumer config. Setting group.instance.id, session.timeout.ms, and heartbeat.interval.ms doesn't fix the issue:

from deephaven import kafka_consumer as kc

# Topic name taken from the error message further below.
KAFKA_TOPIC = 'public.client.dda.cacti.log.avro'

kafka_config = {
    'bootstrap.servers': 'redpanda-0:9092',  # host:port pair; a URL scheme such as http:// is not valid here
    'group.id': 'cacti',
    'group.instance.id': 'consumer01',
    'session.timeout.ms': '90000',
    'heartbeat.interval.ms': '30000'
}

results = kc.consume(
    kafka_config,
    KAFKA_TOPIC,
    key_spec=kc.KeyValueSpec.IGNORE,
    value_spec=kc.simple_spec('value'),
    table_type=kc.TableType.append()
)
[Screenshot 2023-08-13 at 13:10]

When I started two consumers, one Deephaven instance and one Vector consumer, I saw the data on both consumers but received an error message on the Deephaven consumer:

"a.cacti.log.avro:ALL | c.c.i.ConsumerCoordinator | [Consumer instanceId=consumer01, clientId=consumer-cacti-consumer01, groupId=cacti] Offset commit failed on partition public.client.dda.cacti.log.avro-1 at offset 0: The coordinator is not aware of this member."

With a Deephaven consumer and a vector.dev consumer, you will see at least the latter, but the Deephaven consumer is still not an active member of this group, so you can never scale this way.

[Screenshot 2023-08-13 at 12:12]

With only Deephaven as a consumer there are no active consumer group members.

Expected results: both consumers visible in the Redpanda console.

I also tried this on our Confluent Kafka cluster with the same bad results. If I try a simple confluent-kafka Python consumer, the results are good: it shows up as part of the consumer group when group.id has been set.
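For comparison, a minimal sketch of such a confluent-kafka consumer; the broker address and group.id reuse the config above, the topic name is taken from the error message, and the processing is just a print:

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'redpanda-0:9092',
    'group.id': 'cacti',
    'auto.offset.reset': 'earliest',
})

# subscribe() joins the consumer group; the broker assigns partitions,
# and the member shows up as active in the Redpanda console.
consumer.subscribe(['public.client.dda.cacti.log.avro'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(msg.error())
            continue
        print(msg.value())  # stand-in for real processing
finally:
    consumer.close()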

So the problem has been isolated to Deephaven's Kafka consumer, as other types of consumers work fine. You will also find this information on the Slack channel, where we tried to fix the issue together.

[Image: deephaven-kafka-consumer]

lzwaan commented 11 months ago

The way consumers maintain membership in a consumer group and ownership of the partitions assigned to them is by sending heartbeats to a Kafka broker designated as the group coordinator (this broker can be different for different consumer groups). As long as the consumer is sending heartbeats at regular intervals, it is assumed to be alive, well, and processing messages from its partitions. Heartbeats are sent when the consumer polls (i.e., retrieves records) and when it commits records it has consumed. Please see this good article: https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html
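As background on those two settings: the usual guideline from the Kafka documentation is to keep heartbeat.interval.ms at no more than one-third of session.timeout.ms, so the coordinator only evicts a member after several missed heartbeats. The values in the config above already satisfy this:

# 3 * 30000 ms = 90000 ms: the heartbeat interval is exactly one-third
# of the session timeout, matching the recommendation.
timing = {
    'session.timeout.ms': '90000',    # member evicted after this much silence
    'heartbeat.interval.ms': '30000'  # one heartbeat every 30 seconds
}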

jcferretti commented 11 months ago

We believe the root cause of this issue is that DHC's implementation in KafkaIngester always uses the assign call on KafkaConsumer. To support consumer-group auto-balancing of partitions, it would have to use subscribe. This requires code changes in the implementation and some changes in the user-visible APIs of DHC's consumeToTable (Java/Groovy) and consume (Python), so that the way partitions are specified can be made to mean "let the broker pick them for me based on my consumer group". This should check that a consumer group is actually set in the Kafka properties being passed to the broker. Note that right now the consume Python call has a default value for the partitions argument that implies all partitions; this would have to change too (the Java/Groovy API is more explicit about not supporting this scenario, because it requires you to pass an ALL_PARTITIONS constant or a list explicitly).
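To make the assign-vs-subscribe distinction concrete, here is a sketch using the confluent-kafka Python client rather than the Java KafkaConsumer that KafkaIngester wraps (the semantics of the two calls are the same in both clients; broker, group, and topic names reuse the values from the report above):

from confluent_kafka import Consumer, TopicPartition

conf = {'bootstrap.servers': 'redpanda-0:9092', 'group.id': 'cacti'}

# Static assignment, which is what KafkaIngester effectively does today:
# the consumer reads the given partitions but never joins the group
# protocol, so it is not an active group member and never rebalances.
static = Consumer(conf)
static.assign([TopicPartition('public.client.dda.cacti.log.avro', p) for p in (0, 1)])

# Dynamic membership, which consumer-group load balancing requires:
# subscribe() lets the group coordinator assign partitions, makes this
# consumer an active member, and rebalances as members join or leave.
dynamic = Consumer(conf)
dynamic.subscribe(['public.client.dda.cacti.log.avro'])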

We will put this feature in the backlog and will get to it as soon as priorities and resources allow; we will make sure this issue is updated with any news in this area.

PS: Note that a user would need to use callbacks in the Kafka consume API to ensure Kafka offsets are committed, for this to work as intended "end to end"; the KafkaIngester/consumeToTable/consume implementation(s) can't possibly know when the use of the data from the table is "done", so they can't just commit after updating the table (but maybe that could be an option? @devinrsmith).
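As an illustration of that end-to-end pattern, a sketch with the confluent-kafka Python client, not Deephaven's API; handle() is a hypothetical stand-in for whatever the application does with the data:

from confluent_kafka import Consumer

def handle(msg):
    # Hypothetical application-level processing; replace with real work.
    print(msg.value())

consumer = Consumer({
    'bootstrap.servers': 'redpanda-0:9092',
    'group.id': 'cacti',
    'enable.auto.commit': 'false',  # the application decides when an offset is "done"
})
consumer.subscribe(['public.client.dda.cacti.log.avro'])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    handle(msg)
    # Commit only after processing is complete, so a crash before this
    # point means the record is redelivered rather than lost.
    consumer.commit(message=msg, asynchronous=False)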