confluentinc / librdkafka

The Apache Kafka C/C++ library

Kafka Consumer has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator) [executor-Heartbeat] #4457

Open ginwakeup opened 1 year ago

ginwakeup commented 1 year ago

Description

From today, Kafka started removing consumers because of a heartbeat error.

The error specifically is:

2023-10-05 15:07:20,881 INFO [GroupCoordinator 3]: Dynamic Member with unknown member id joins group *** in Empty state. Created a new member id ******* for this member and add to the group. (kafka.coordinator.group.GroupCoordinator) [data-plane-kafka-request-handler-10]
2023-10-05 15:07:20,881 INFO [GroupCoordinator 3]: Preparing to rebalance group *** in state PreparingRebalance with old generation 2 (__consumer_offsets-29) (reason: Adding new member **** with group instance id None; client reason: not provided) (kafka.coordinator.group.GroupCoordinator) [data-plane-kafka-request-handler-10]
2023-10-05 15:07:23,881 INFO [GroupCoordinator 3]: Stabilized group *** generation 3 (__consumer_offsets-29) with 1 members (kafka.coordinator.group.GroupCoordinator) [executor-Rebalance]
2023-10-05 15:07:33,882 INFO [GroupCoordinator 3]: Member *** in group*** has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator) [executor-Heartbeat]
2023-10-05 15:07:33,883 INFO [GroupCoordinator 3]: Preparing to rebalance group *** in state PreparingRebalance with old generation 3 (__consumer_offsets-29) (reason: removing member **** on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator) [executor-Heartbeat]
2023-10-05 15:07:33,883 INFO [GroupCoordinator 3]: Group ****2 with generation 4 is now empty (__consumer_offsets-29) (kafka.coordinator.group.GroupCoordinator) [executor-Heartbeat]
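The timestamps in the log above can be checked with a short script. This is only a sketch: it parses two of the lines (message text shortened) and measures how long the member stayed in the group after the rebalance completed. The helper names `parse_timestamp` and `seconds_between` are made up for illustration. The gap comes out at about 10 seconds, which, together with the "heartbeat expiration" reason, would be consistent with an effective session timeout of roughly 10 s during which the coordinator saw no heartbeat; that reading is an inference from the log, not something the log states.

```python
from datetime import datetime

# Two lines copied from the coordinator log above (message text shortened).
LOG_LINES = [
    "2023-10-05 15:07:23,881 INFO [GroupCoordinator 3]: Stabilized group *** generation 3",
    "2023-10-05 15:07:33,882 INFO [GroupCoordinator 3]: Member *** in group*** has failed, removing it from the group",
]


def parse_timestamp(line):
    """Parse the leading 'YYYY-MM-DD HH:MM:SS,mmm' timestamp (first 23 characters)."""
    return datetime.strptime(line[:23], "%Y-%m-%d %H:%M:%S,%f")


def seconds_between(lines, start_marker, end_marker):
    """Seconds between the first line containing start_marker and the first containing end_marker."""
    start = next(parse_timestamp(line) for line in lines if start_marker in line)
    end = next(parse_timestamp(line) for line in lines if end_marker in line)
    return (end - start).total_seconds()


gap = seconds_between(LOG_LINES, "Stabilized group", "has failed")
print(f"Member was removed {gap:.3f} s after the group stabilized")
```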

I tried setting session.timeout.ms: 30000, with no luck.

We use Strimzi for deployment, here's the full k8s deployment file:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: ***
spec:
  kafka:
    version: 3.4.0 #Apache Kafka version
    replicas: 4
    resources:
      requests:
        cpu: 1000m
        memory: 3Gi
      limits:
        cpu: 2000m
        memory: 6Gi
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
        authentication:
          type: scram-sha-512
      - name: tls
        port: 9093
        type: internal
        tls: false
      - name: external
        port: 9094
        type: loadbalancer
        tls: false
        authentication:
          type: scram-sha-512
        configuration:
          bootstrap:
            loadBalancerIP: ****
          brokers:
          - broker: 0
            loadBalancerIP: ***
          - broker: 1
            loadBalancerIP: ***
          - broker: 2
            loadBalancerIP: ***
          - broker: 3
            loadBalancerIP: ***
    authorization:
      type: simple
    config:
      auto.create.topics.enable: "true"
      offsets.topic.replication.factor: 4
      transaction.state.log.replication.factor: 4
      transaction.state.log.min.isr: 2
      default.replication.factor: 4
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.4.0"
      num.recovery.threads.per.data.dir: 12
      num.network.threads: 4
      num.io.threads: 12
      session.timeout.ms: 30000
    readinessProbe:
      initialDelaySeconds: 30
      timeoutSeconds: 30
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
        class: ***-retain
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
      class: ***-retain
  entityOperator:
    topicOperator: {}
    userOperator: {}

How to reproduce

I am not entirely sure. We are just spawning a new Consumer using Kafka Python, on Python 3.7.3. The Consumer uses an account which has access to all topics via ACLs:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: myuser
  labels:
    strimzi.io/cluster: ***
spec:
  authentication:
    type: scram-sha-512
    password:
      valueFrom:
        secretKeyRef:
          name: myuser-secret
          key: password
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: "*"
          patternType: literal
        operations:
          - All
        host: "*"
      - resource:
          type: group
          name: "*"
          patternType: literal
        operations:
          - All
        host: "*"

      - resource:
          type: cluster
          name: "*"
          patternType: literal
        operations:
          - All
        host: "*"

I can see the Consumer Group being created through Kafka UI, but after the rebalance is completed we see the error and the group is empty.

The Python script simply hangs and there's no polling.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

ginwakeup commented 1 year ago

Some more info: we noticed that this does not happen with other topics, only with a topic that has 179963 messages and 361722 total offsets. Could its size be the cause? It currently takes up 2 GB of space. Topic configuration: segment.bytes: 1073741824, max.message.bytes: 1048588, delete.retention.ms: 86400000, retention.bytes: -1, segment.ms: 604800000.

After setting:

heartbeat_interval_ms=1000,
session_timeout_ms=20000
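For anyone tuning the same two settings, here is a minimal sketch of a sanity check to run before passing them to kafka-python's `KafkaConsumer`. It encodes two rules from the Kafka consumer documentation: the heartbeat interval must be lower than the session timeout (and is typically kept at no more than one third of it), and the session timeout requested by the client must fall within the broker's `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. The function name `build_consumer_timeouts` is hypothetical, and the broker bounds used as defaults (6000 and 1800000 ms) are assumed values; check them against your own broker configuration.

```python
def build_consumer_timeouts(session_timeout_ms, heartbeat_interval_ms,
                            broker_min_session_ms=6000,       # assumed broker bound
                            broker_max_session_ms=1800000):   # assumed broker bound
    """Validate the timeouts and return (kwargs, notes) for a KafkaConsumer.

    Raises ValueError for combinations that cannot work; `notes` lists
    softer recommendations that were not followed.
    """
    if not broker_min_session_ms <= session_timeout_ms <= broker_max_session_ms:
        raise ValueError(
            f"session_timeout_ms={session_timeout_ms} is outside the broker range "
            f"[{broker_min_session_ms}, {broker_max_session_ms}]"
        )
    if heartbeat_interval_ms >= session_timeout_ms:
        raise ValueError("heartbeat_interval_ms must be lower than session_timeout_ms")

    notes = []
    if heartbeat_interval_ms * 3 > session_timeout_ms:
        notes.append("heartbeat_interval_ms is usually at most 1/3 of session_timeout_ms")

    kwargs = {
        "session_timeout_ms": session_timeout_ms,
        "heartbeat_interval_ms": heartbeat_interval_ms,
    }
    return kwargs, notes


# The values from the comment above pass both checks.
kwargs, notes = build_consumer_timeouts(session_timeout_ms=20000, heartbeat_interval_ms=1000)
print(kwargs, notes)

# Hypothetical usage with kafka-python (topic, servers and group are placeholders):
# consumer = KafkaConsumer("my-topic", bootstrap_servers="...", group_id="...", **kwargs)
```

Note that these are client-side settings; in the thread above they were applied on the consumer, which is where kafka-python reads them.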

on the consumer, the group is now live and the consumer is connected, but the consumer does not receive any messages from Kafka.

gchicoye commented 10 months ago

I've got the same kind of issues... Any news?

Akshayjain60165 commented 5 months ago

Hey, I am getting the same issue, but I am able to poll after 30 mins. Did you find any solution that worked for you?

helloHKTK commented 5 months ago

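Hello: I am very glad to have received your message, and I will read and reply to it as soon as possible! Wishing you success at work and a happy life!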

Akshayjain60165 commented 5 months ago

Thanks for your reply. Let me explain my exact issue: I have multiple topics which I consume from with a single groupId. Whenever I do a rollout restart of my service, consumption stops for 30 mins, and after 30 mins it automatically resumes. In the logs I see that a dynamic member joins the group, and after some time I see the log about removing it from the group (kafka.coordinator.group.GroupCoordinator) [executor-Heartbeat]. Please help me with this.

HNitzanH commented 5 months ago

I am having the same issue with only 1 topic that contains 1 message. I am using the exact consumer code from the example you provided.

Akshayjain60165 commented 5 months ago

I am getting the message that topic has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator) [executor-Heartbeat], and then it joins again and I can see it rejoin the group. What can be the issue?

progiri commented 5 months ago

I had the same issue after taking the consumers' container down and bringing it back up. But after 15 mins it started working. I don't know why this error started appearing or why it began working after 15 mins.