confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

GroupCoordinator: SASL authentication error: SaslAuthenticateRequest failed: Local: Broker transport failure (after 0ms in state DOWN) #1780

Open ronchler opened 2 months ago

ronchler commented 2 months ago

Description

This message spams our log. Is it a real error, or can this be ignored?

%3|1720719719.448|FAIL|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: SASL authentication error: SaslAuthenticateRequest failed: Local: Broker transport failure (after 0ms in state DOWN)

All operations on the consumer group work and succeed, but then, when closing it, we get that "error" message.

I don't quite get it, but it seems that, for some reason, when closing the consumer group the library tries to (re-)authenticate the connection, even though it is in the middle of terminating it.

I would gladly take any suggestions on how to avoid this message.
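
One workaround I am considering is routing the librdkafka log lines through Python logging so that this particular line can be filtered out. A sketch only: the logger keyword argument and the error_cb configuration property do exist in confluent-kafka, but this hides the message rather than preventing the re-authentication attempt during termination.

import logging

import confluent_kafka


class DropCoordinatorFail(logging.Filter):
    # Drop only the noisy GroupCoordinator FAIL line; keep all other logs.
    def filter(self, record):
        return "SaslAuthenticateRequest failed" not in record.getMessage()


# Route librdkafka log output through Python logging instead of stderr.
rk_logger = logging.getLogger("librdkafka")
rk_logger.addHandler(logging.StreamHandler())
rk_logger.addFilter(DropCoordinatorFail())


def on_error(err):
    # The same condition is also delivered here as a KafkaError;
    # _TRANSPORT corresponds to "Local: Broker transport failure".
    if err.code() != confluent_kafka.KafkaError._TRANSPORT:
        print("kafka error:", err)


consumer = confluent_kafka.Consumer(
    {
        "bootstrap.servers": "kafka.cloud:9092",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanism": "SCRAM-SHA-512",
        "sasl.username": "client",
        "sasl.password": "client-password",
        "group.id": "MyGroup",
        "error_cb": on_error,
    },
    logger=rk_logger,
)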

How to reproduce

# docker-compose.yaml
---
# docker compose -f ./docker-compose.yaml run --build test
# docker compose -f ./docker-compose.yaml run --build test
# docker compose -f ./docker-compose.yaml run --build --env "DEBUG_LEVEL=broker,security" test > security.log 2>&1
# docker compose -f ./docker-compose.yaml run --build --env "DEBUG_LEVEL=all" test > all.log 2>&1
# %3|1720719719.448|FAIL|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: SASL authentication error: SaslAuthenticateRequest failed: Local: Broker transport failure (after 0ms in state DOWN)
name: bug

services:
  test:
    environment:
      - DEBUG_LEVEL=
      # - DEBUG_LEVEL=broker,security
      # - DEBUG_LEVEL=all
    build:
      dockerfile_inline: |
        FROM alpine:3.20

        RUN apk add --no-cache gcc musl-dev librdkafka-dev~=2.4 python3-dev py3-pip

        WORKDIR /app
        RUN pip install confluent-kafka==2.4.0 --break-system-packages

        # COPY test.py test.py

        CMD python test.py
    configs:
      - source: "test.py"
        target: /app/test.py
    depends_on:
      kafka:
        condition: service_healthy

  kafka:
    hostname: kafka.cloud
    image: "confluentinc/cp-kafka:7.6.1"
    environment:
      - KAFKA_NODE_ID=0
      - KAFKA_PROCESS_ROLES=controller,broker
      - KAFKA_LISTENERS=INTERNAL://:9092,CONTROLLER://:9093
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,INTERNAL:SASL_PLAINTEXT
      - KAFKA_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
      # confluent
      - CLUSTER_ID=YONsFVF4e6Jm0dNNxf9PP0
      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka.cloud:9092
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      # sasl support
      - KAFKA_SASL_ENABLED_MECHANISMS=SCRAM-SHA-512,PLAIN
      - KAFKA_SASL_MECHANISM_PLAINTEXT_PROTOCOL=SCRAM-SHA-512
      - KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL=SCRAM-SHA-512
      - KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
    healthcheck:
      test: [ "CMD", "nc", "-z", "kafka.cloud", "9092" ]
      interval: 10s
      retries: 5
      start_period: 20s
      timeout: 10s
    configs:
      - source: "kafka_server_jaas.conf"
        target: /etc/kafka/kafka_server_jaas.conf
    command:
      - /bin/bash
      - -c
      - |
        /etc/confluent/docker/configure
        kafka-storage format --config /etc/kafka/kafka.properties --cluster-id YONsFVF4e6Jm0dNNxf9PP0 \
          --add-scram 'SCRAM-SHA-512=[name=admin,password=admin-password]' \
          --add-scram 'SCRAM-SHA-512=[name=client,password=client-password]' \
          --ignore-formatted
        /etc/confluent/docker/run

configs:
  kafka_server_jaas.conf:
    content: |
      KafkaServer {
          org.apache.kafka.common.security.scram.ScramLoginModule required
          username="admin"
          password="admin-password";
      };

  test.py:
    content: |
      import os
      import confluent_kafka, confluent_kafka.admin

      print("confluent_kafka.version:", confluent_kafka.version())
      print("confluent_kafka.libversion:", confluent_kafka.libversion())

      cfg = {
          "bootstrap.servers": "kafka.cloud:9092",
          "sasl.mechanism": "SCRAM-SHA-512",
          "security.protocol": "SASL_PLAINTEXT",
          "sasl.username": "client",
          "sasl.password": "client-password",
      }

      if os.environ.get("DEBUG_LEVEL"):
          cfg["debug"] = os.environ["DEBUG_LEVEL"]

      kafka_admin = confluent_kafka.admin.AdminClient(cfg)

      # create topics
      for topic_name, future in kafka_admin.create_topics(
          new_topics=[confluent_kafka.admin.NewTopic("topic1", num_partitions=1)]
      ).items():
          try:
              future.result()
          except Exception:
              # topic may already exist from a previous run
              pass

      # create consumer
      consumer = confluent_kafka.Consumer(
          cfg.copy()
          | {
              "group.id": "MyGroup",
              "enable.auto.commit": False,
          }
      )

      topic_partition = confluent_kafka.TopicPartition(topic="topic1", partition=0)
      low_offset, high_offset = consumer.get_watermark_offsets(topic_partition)

      assert low_offset == 0
      assert high_offset == 0
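
Note that test.py above never calls consumer.close(); the consumer is torn down when the interpreter exits. A minimal standalone variant (my sketch, reusing the broker and credentials from the compose file above) that closes explicitly, to pin down when the line is emitted:

# close_repro.py - sketch only; same broker and credentials as above
import confluent_kafka

consumer = confluent_kafka.Consumer(
    {
        "bootstrap.servers": "kafka.cloud:9092",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanism": "SCRAM-SHA-512",
        "sasl.username": "client",
        "sasl.password": "client-password",
        "group.id": "MyGroup",
        "enable.auto.commit": False,
    }
)

# Any call that talks to the cluster; mirrors the watermark query in test.py.
tp = confluent_kafka.TopicPartition(topic="topic1", partition=0)
consumer.get_watermark_offsets(tp)

# The FAIL line is reported around consumer shutdown.
consumer.close()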

Run

docker compose -f ./docker-compose.yaml run --build test
docker compose -f ./docker-compose.yaml run --build test
docker compose -f ./docker-compose.yaml run --build --env "DEBUG_LEVEL=broker,security" test > security.log 2>&1
docker compose -f ./docker-compose.yaml run --build --env "DEBUG_LEVEL=all" test > all.log 2>&1
docker compose -f ./docker-compose.yaml down

After the second "run", you should see the log message:

%3|1720719719.448|FAIL|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: SASL authentication error: SaslAuthenticateRequest failed: Local: Broker transport failure (after 0ms in state DOWN)

See security.log and all.log

Checklist

Please provide the following information: