
[bitnami/kafka] kafka-console-consumer.sh does not work with SASL_SSL #67185

Closed hznuyyh closed 4 months ago

hznuyyh commented 4 months ago

Name and Version

bitnami/kafka:3.7

What architecture are you using?

amd64

What steps will reproduce the bug?

Using image bitnami/kafka:3.7 and docker compose

Docker compose configuration:

version: '2'

services:
  kafka:
    image: 'kafka:3.7'
    hostname: kafak.example.com
    ports:
      - '9092:9092'
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_LISTENERS=SASL_SSL://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
      - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://:9092
      - KAFKA_CLIENT_USERS=user
      - KAFKA_CLIENT_PASSWORDS=password
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      - KAFKA_CONTROLLER_USER=controller_user
      - KAFKA_CONTROLLER_PASSWORD=controller_password
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_SSL
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      - KAFKA_INTER_BROKER_USER=controller_user
      - KAFKA_INTER_BROKER_PASSWORD=controller_password
      - KAFKA_CERTIFICATE_PASSWORD=IQ65KHcr4VsS0TLO
      - KAFKA_TLS_TYPE=JKS # or PEM
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=SCRAM-SHA-512  
      - KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM= 
    volumes:
      # Both .jks and .pem files are supported
      # - './kafka.keystore.pem:/opt/bitnami/kafka/config/certs/kafka.keystore.pem:ro'
      # - './kafka.keystore.key:/opt/bitnami/kafka/config/certs/kafka.keystore.key:ro'
      # - './kafka.truststore.pem:/opt/bitnami/kafka/config/certs/kafka.truststore.pem:ro'
      - './keystore/kafka.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro'
      - './truststore/kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro'

Kafka is running.

I used the Sarama Go client to connect; it worked and returned a success message.

But when I use kafka-console-producer.sh

kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test --producer.config /opt/bitnami/kafka/config/producer.properties

it returns:

[2024-05-21 06:27:10,316] WARN [Consumer clientId=console-consumer, groupId=console-consumer-51739] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2024-05-21 06:27:10,477] WARN [Consumer clientId=console-consumer, groupId=console-consumer-51739] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)

Kafka Log:

[2024-05-21 06:26:21,576] INFO [SocketServer listenerType=BROKER, nodeId=0] Failed authentication with /127.0.0.1 (channelId=127.0.0.1:9092-127.0.0.1:42938-0) (SSL handshake failed) (org.apache.kafka.common.network.Selector)
[2024-05-21 06:26:22,069] INFO [SocketServer listenerType=BROKER, nodeId=0] Failed authentication with /127.0.0.1 (channelId=127.0.0.1:9092-127.0.0.1:42940-0) (SSL handshake failed) (org.apache.kafka.common.network.Selector)

Looking at the file /opt/bitnami/kafka/config/producer.properties, I'm not sure if the problem is caused by the absence of the KAFKA_CLIENT_USERS and KAFKA_CLIENT_PASSWORDS credentials there.
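For reference, this is roughly what I would expect that file to need for SASL_SSL. A minimal sketch only, reusing the user/password pair and the certificate password from my compose file above:

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="user" \
  password="password";
ssl.truststore.location=/opt/bitnami/kafka/config/certs/kafka.truststore.jks
ssl.truststore.password=IQ65KHcr4VsS0TLO
# an empty value disables hostname verification for the self-signed cert
ssl.endpoint.identification.algorithm=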

And here is my Sarama Go client demo:

package main

import (
    "crypto/tls"
    "crypto/x509"
    "flag"
    "log"
    "os"
    "os/signal"
    "strings"
    "time"

    "github.com/IBM/sarama"
)

func init() {
    sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
}

var (
    brokers       = flag.String("brokers", "x.x.x.x:9092", "The Kafka brokers to connect to, as a comma separated list")
    version       = flag.String("version", sarama.DefaultVersion.String(), "Kafka cluster version")
    userName      = flag.String("username", "user", "The SASL username")
    passwd        = flag.String("passwd", "password", "The SASL password")
    algorithm     = flag.String("algorithm", "sha512", "The SASL SCRAM SHA algorithm sha256 or sha512 as mechanism")
    topic         = flag.String("topic", "default_topic", "The Kafka topic to use")
    certFile      = flag.String("certificate", "", "The optional certificate file for client authentication")
    keyFile       = flag.String("key", "", "The optional key file for client authentication")
    caFile        = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
    tlsSkipVerify = flag.Bool("tls-skip-verify", true, "Whether to skip TLS server cert verification")
    useTLS        = flag.Bool("tls", true, "Use TLS to communicate with the cluster")
    mode          = flag.String("mode", "produce", "Mode to run in: \"produce\" to produce, \"consume\" to consume")
    logMsg        = flag.Bool("logmsg", true, "True to log consumed messages to console")

    logger = log.New(os.Stdout, "[Producer] ", log.LstdFlags)
)

func createTLSConfiguration() (t *tls.Config) {
    t = &tls.Config{
        InsecureSkipVerify: *tlsSkipVerify,
    }
    if *certFile != "" && *keyFile != "" && *caFile != "" {
        cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
        if err != nil {
            log.Fatal(err)
        }

        caCert, err := os.ReadFile(*caFile)
        if err != nil {
            log.Fatal(err)
        }

        caCertPool := x509.NewCertPool()
        caCertPool.AppendCertsFromPEM(caCert)

        t = &tls.Config{
            Certificates:       []tls.Certificate{cert},
            RootCAs:            caCertPool,
            InsecureSkipVerify: *tlsSkipVerify,
        }
    }
    return t
}

func main() {
    flag.Parse()

    if *brokers == "" {
        log.Fatalln("at least one broker is required")
    }
    splitBrokers := strings.Split(*brokers, ",")

    version, err := sarama.ParseKafkaVersion(*version)
    if err != nil {
        log.Panicf("Error parsing Kafka version: %v", err)
    }

    if *userName == "" {
        log.Fatalln("SASL username is required")
    }

    if *passwd == "" {
        log.Fatalln("SASL password is required")
    }

    conf := sarama.NewConfig()
    conf.Producer.Retry.Max = 1
    conf.Producer.RequiredAcks = sarama.WaitForAll
    conf.Producer.Return.Successes = true
    conf.Version = version
    conf.ClientID = "sasl_scram_client"
    conf.Metadata.Full = true
    conf.Net.SASL.Enable = true
    conf.Net.SASL.User = *userName
    conf.Net.SASL.Password = *passwd
    conf.Net.SASL.Handshake = true

    //if *algorithm == "sha512" {
    //  conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
    //  conf.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
    //} else if *algorithm == "sha256" {
    //  conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} }
    //  conf.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
    //
    //} else {
    //  log.Fatalf("invalid SHA algorithm \"%s\": can be either \"sha256\" or \"sha512\"", *algorithm)
    //}

    if *useTLS {
        conf.Net.TLS.Enable = true
        conf.Net.TLS.Config = createTLSConfiguration()
    }

    if *mode == "consume" {
        consumer, err := sarama.NewConsumer(splitBrokers, conf)
        if err != nil {
            panic(err)
        }
        log.Println("consumer created")
        defer func() {
            if err := consumer.Close(); err != nil {
                log.Fatalln(err)
            }
        }()
        log.Println("commence consuming")
        partitionConsumer, err := consumer.ConsumePartition(*topic, 0, sarama.OffsetOldest)
        if err != nil {
            panic(err)
        }

        defer func() {
            if err := partitionConsumer.Close(); err != nil {
                log.Fatalln(err)
            }
        }()

        // Trap SIGINT to trigger a shutdown.
        signals := make(chan os.Signal, 1)
        signal.Notify(signals, os.Interrupt)

        consumed := 0
    ConsumerLoop:
        for {
            log.Println("in the for")
            select {
            case msg := <-partitionConsumer.Messages():
                log.Printf("Consumed message offset %d\n", msg.Offset)
                if *logMsg {
                    log.Printf("KEY: %s VALUE: %s", msg.Key, msg.Value)
                }
                consumed++
            case <-signals:
                break ConsumerLoop
            }
        }

        log.Printf("Consumed: %d\n", consumed)

    } else {
        syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
        if err != nil {
            logger.Fatalln("failed to create producer: ", err)
        }
        partition, offset, err := syncProducer.SendMessage(&sarama.ProducerMessage{
            Topic: *topic,
            Value: sarama.StringEncoder("test_message" + time.Now().Format(time.RFC3339)),
        })
        if err != nil {
            logger.Fatalln("failed to send message to ", *topic, err)
        }
        logger.Printf("wrote message at partition: %d, offset: %d", partition, offset)
        _ = syncProducer.Close()
    }
    logger.Println("Bye now !")
}

Also, note the commented-out SCRAM block in the demo: the SCRAM-SHA-512 algorithm does not take effect. I have looked at other issues and others have encountered this problem.
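For context, that commented-out block expects an XDGSCRAMClient type that my demo omits. A minimal sketch of it, adapted from the Sarama SASL/SCRAM example and assuming the github.com/xdg-go/scram package:

package main

import (
    "crypto/sha256"
    "crypto/sha512"

    "github.com/xdg-go/scram"
)

// Hash generators matching the -algorithm flag values.
var (
    SHA256 scram.HashGeneratorFcn = sha256.New
    SHA512 scram.HashGeneratorFcn = sha512.New
)

// XDGSCRAMClient adapts xdg-go/scram to sarama's SCRAMClient interface.
type XDGSCRAMClient struct {
    *scram.Client
    *scram.ClientConversation
    scram.HashGeneratorFcn
}

// Begin starts a SCRAM conversation with the given credentials.
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
    x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
    if err != nil {
        return err
    }
    x.ClientConversation = x.Client.NewConversation()
    return nil
}

// Step feeds the server challenge in and returns the client response.
func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
    return x.ClientConversation.Step(challenge)
}

// Done reports whether the SCRAM conversation has completed.
func (x *XDGSCRAMClient) Done() bool {
    return x.ClientConversation.Done()
}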

What is the expected behavior?

The ability to connect directly to the kafka cluster using scripts within the container

What do you see instead?

Here is the full config:

[2024-05-21 06:25:48,533] INFO KafkaConfig values: 
    advertised.listeners = SASL_SSL://:9092
    alter.config.policy.class.name = null
    alter.log.dirs.replication.quota.window.num = 11
    alter.log.dirs.replication.quota.window.size.seconds = 1
    authorizer.class.name = 
    auto.create.topics.enable = true
    auto.include.jmx.reporter = true
    auto.leader.rebalance.enable = true
    background.threads = 10
    broker.heartbeat.interval.ms = 2000
    broker.id = 0
    broker.id.generation.enable = true
    broker.rack = null
    broker.session.timeout.ms = 9000
    client.quota.callback.class = null
    compression.type = producer
    connection.failed.authentication.delay.ms = 100
    connections.max.idle.ms = 600000
    connections.max.reauth.ms = 0
    control.plane.listener.name = null
    controlled.shutdown.enable = true
    controlled.shutdown.max.retries = 3
    controlled.shutdown.retry.backoff.ms = 5000
    controller.listener.names = CONTROLLER
    controller.quorum.append.linger.ms = 25
    controller.quorum.election.backoff.max.ms = 1000
    controller.quorum.election.timeout.ms = 1000
    controller.quorum.fetch.timeout.ms = 2000
    controller.quorum.request.timeout.ms = 2000
    controller.quorum.retry.backoff.ms = 20
    controller.quorum.voters = [0@kafka:9093]
    controller.quota.window.num = 11
    controller.quota.window.size.seconds = 1
    controller.socket.timeout.ms = 30000
    create.topic.policy.class.name = null
    default.replication.factor = 1
    delegation.token.expiry.check.interval.ms = 3600000
    delegation.token.expiry.time.ms = 86400000
    delegation.token.master.key = null
    delegation.token.max.lifetime.ms = 604800000
    delegation.token.secret.key = null
    delete.records.purgatory.purge.interval.requests = 1
    delete.topic.enable = true
    early.start.listeners = null
    eligible.leader.replicas.enable = false
    fetch.max.bytes = 57671680
    fetch.purgatory.purge.interval.requests = 1000
    group.consumer.assignors = [org.apache.kafka.coordinator.group.assignor.UniformAssignor, org.apache.kafka.coordinator.group.assignor.RangeAssignor]
    group.consumer.heartbeat.interval.ms = 5000
    group.consumer.max.heartbeat.interval.ms = 15000
    group.consumer.max.session.timeout.ms = 60000
    group.consumer.max.size = 2147483647
    group.consumer.min.heartbeat.interval.ms = 5000
    group.consumer.min.session.timeout.ms = 45000
    group.consumer.session.timeout.ms = 45000
    group.coordinator.new.enable = false
    group.coordinator.rebalance.protocols = [classic]
    group.coordinator.threads = 1
    group.initial.rebalance.delay.ms = 3000
    group.max.session.timeout.ms = 1800000
    group.max.size = 2147483647
    group.min.session.timeout.ms = 6000
    initial.broker.registration.timeout.ms = 60000
    inter.broker.listener.name = SASL_SSL
    inter.broker.protocol.version = 3.7-IV4
    kafka.metrics.polling.interval.secs = 10
    kafka.metrics.reporters = []
    leader.imbalance.check.interval.seconds = 300
    leader.imbalance.per.broker.percentage = 10
    listener.security.protocol.map = CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    listeners = SASL_SSL://:9092,CONTROLLER://:9093
    log.cleaner.backoff.ms = 15000
    log.cleaner.dedupe.buffer.size = 134217728
    log.cleaner.delete.retention.ms = 86400000
    log.cleaner.enable = true
    log.cleaner.io.buffer.load.factor = 0.9
    log.cleaner.io.buffer.size = 524288
    log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
    log.cleaner.max.compaction.lag.ms = 9223372036854775807
    log.cleaner.min.cleanable.ratio = 0.5
    log.cleaner.min.compaction.lag.ms = 0
    log.cleaner.threads = 1
    log.cleanup.policy = [delete]
    log.dir = /tmp/kafka-logs
    log.dirs = /bitnami/kafka/data
    log.flush.interval.messages = 9223372036854775807
    log.flush.interval.ms = null
    log.flush.offset.checkpoint.interval.ms = 60000
    log.flush.scheduler.interval.ms = 9223372036854775807
    log.flush.start.offset.checkpoint.interval.ms = 60000
    log.index.interval.bytes = 4096
    log.index.size.max.bytes = 10485760
    log.local.retention.bytes = -2
    log.local.retention.ms = -2
    log.message.downconversion.enable = true
    log.message.format.version = 3.0-IV1
    log.message.timestamp.after.max.ms = 9223372036854775807
    log.message.timestamp.before.max.ms = 9223372036854775807
    log.message.timestamp.difference.max.ms = 9223372036854775807
    log.message.timestamp.type = CreateTime
    log.preallocate = false
    log.retention.bytes = -1
    log.retention.check.interval.ms = 300000
    log.retention.hours = 168
    log.retention.minutes = null
    log.retention.ms = null
    log.roll.hours = 168
    log.roll.jitter.hours = 0
    log.roll.jitter.ms = null
    log.roll.ms = null
    log.segment.bytes = 1073741824
    log.segment.delete.delay.ms = 60000
    max.connection.creation.rate = 2147483647
    max.connections = 2147483647
    max.connections.per.ip = 2147483647
    max.connections.per.ip.overrides = 
    max.incremental.fetch.session.cache.slots = 1000
    message.max.bytes = 1048588
    metadata.log.dir = null
    metadata.log.max.record.bytes.between.snapshots = 20971520
    metadata.log.max.snapshot.interval.ms = 3600000
    metadata.log.segment.bytes = 1073741824
    metadata.log.segment.min.bytes = 8388608
    metadata.log.segment.ms = 604800000
    metadata.max.idle.interval.ms = 500
    metadata.max.retention.bytes = 104857600
    metadata.max.retention.ms = 604800000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    min.insync.replicas = 1
    node.id = 0
    num.io.threads = 8
    num.network.threads = 3
    num.partitions = 1
    num.recovery.threads.per.data.dir = 1
    num.replica.alter.log.dirs.threads = null
    num.replica.fetchers = 1
    offset.metadata.max.bytes = 4096
    offsets.commit.required.acks = -1
    offsets.commit.timeout.ms = 5000
    offsets.load.buffer.size = 5242880
    offsets.retention.check.interval.ms = 600000
    offsets.retention.minutes = 10080
    offsets.topic.compression.codec = 0
    offsets.topic.num.partitions = 50
    offsets.topic.replication.factor = 1
    offsets.topic.segment.bytes = 104857600
    password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
    password.encoder.iterations = 4096
    password.encoder.key.length = 128
    password.encoder.keyfactory.algorithm = null
    password.encoder.old.secret = null
    password.encoder.secret = null
    principal.builder.class = class org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
    process.roles = [controller, broker]
    producer.id.expiration.check.interval.ms = 600000
    producer.id.expiration.ms = 86400000
    producer.purgatory.purge.interval.requests = 1000
    queued.max.request.bytes = -1
    queued.max.requests = 500
    quota.window.num = 11
    quota.window.size.seconds = 1
    remote.log.index.file.cache.total.size.bytes = 1073741824
    remote.log.manager.task.interval.ms = 30000
    remote.log.manager.task.retry.backoff.max.ms = 30000
    remote.log.manager.task.retry.backoff.ms = 500
    remote.log.manager.task.retry.jitter = 0.2
    remote.log.manager.thread.pool.size = 10
    remote.log.metadata.custom.metadata.max.bytes = 128
    remote.log.metadata.manager.class.name = org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager
    remote.log.metadata.manager.class.path = null
    remote.log.metadata.manager.impl.prefix = rlmm.config.
    remote.log.metadata.manager.listener.name = null
    remote.log.reader.max.pending.tasks = 100
    remote.log.reader.threads = 10
    remote.log.storage.manager.class.name = null
    remote.log.storage.manager.class.path = null
    remote.log.storage.manager.impl.prefix = rsm.config.
    remote.log.storage.system.enable = false
    replica.fetch.backoff.ms = 1000
    replica.fetch.max.bytes = 1048576
    replica.fetch.min.bytes = 1
    replica.fetch.response.max.bytes = 10485760
    replica.fetch.wait.max.ms = 500
    replica.high.watermark.checkpoint.interval.ms = 5000
    replica.lag.time.max.ms = 30000
    replica.selector.class = null
    replica.socket.receive.buffer.bytes = 65536
    replica.socket.timeout.ms = 30000
    replication.quota.window.num = 11
    replication.quota.window.size.seconds = 1
    request.timeout.ms = 30000
    reserved.broker.max.id = 1000
    sasl.client.callback.handler.class = null
    sasl.enabled.mechanisms = [PLAIN, SCRAM-SHA-256, SCRAM-SHA-512]
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.principal.to.local.rules = [DEFAULT]
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism.controller.protocol = SCRAM-SHA-512
    sasl.mechanism.inter.broker.protocol = PLAIN
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    sasl.server.callback.handler.class = null
    sasl.server.max.receive.size = 524288
    security.inter.broker.protocol = PLAINTEXT
    security.providers = null
    server.max.startup.time.ms = 9223372036854775807
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    socket.listen.backlog.size = 50
    socket.receive.buffer.bytes = 102400
    socket.request.max.bytes = 104857600
    socket.send.buffer.bytes = 102400
    ssl.allow.dn.changes = false
    ssl.allow.san.changes = false
    ssl.cipher.suites = []
    ssl.client.auth = required
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = 
    ssl.engine.factory.class = null
    ssl.key.password = [hidden]
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = /opt/bitnami/kafka/config/certs/kafka.keystore.jks
    ssl.keystore.password = [hidden]
    ssl.keystore.type = JKS
    ssl.principal.mapping.rules = DEFAULT
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = /opt/bitnami/kafka/config/certs/kafka.truststore.jks
    ssl.truststore.password = [hidden]
    ssl.truststore.type = JKS
    telemetry.max.bytes = 1048576
    transaction.abort.timed.out.transaction.cleanup.interval.ms = 10000
    transaction.max.timeout.ms = 900000
    transaction.partition.verification.enable = true
    transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
    transaction.state.log.load.buffer.size = 5242880
    transaction.state.log.min.isr = 1
    transaction.state.log.num.partitions = 50
    transaction.state.log.replication.factor = 1
    transaction.state.log.segment.bytes = 104857600
    transactional.id.expiration.ms = 604800000
    unclean.leader.election.enable = false
    unstable.api.versions.enable = false
    unstable.metadata.versions.enable = false
    zookeeper.clientCnxnSocket = null
    zookeeper.connect = null
    zookeeper.connection.timeout.ms = null
    zookeeper.max.in.flight.requests = 10
    zookeeper.metadata.migration.enable = false
    zookeeper.metadata.migration.min.batch.size = 200
    zookeeper.session.timeout.ms = 18000
    zookeeper.set.acl = false
    zookeeper.ssl.cipher.suites = null
    zookeeper.ssl.client.enable = false
    zookeeper.ssl.crl.enable = false
    zookeeper.ssl.enabled.protocols = null
    zookeeper.ssl.endpoint.identification.algorithm = HTTPS
    zookeeper.ssl.keystore.location = null
    zookeeper.ssl.keystore.password = null
    zookeeper.ssl.keystore.type = null
    zookeeper.ssl.ocsp.enable = false
    zookeeper.ssl.protocol = TLSv1.2
    zookeeper.ssl.truststore.location = null
    zookeeper.ssl.truststore.password = null
    zookeeper.ssl.truststore.type = null
 (kafka.server.KafkaConfig)
javsalgar commented 4 months ago

Hi!

Normally a "TLS handshake error" has to do with incorrectly set or untrusted certificates. How did you create the certs?

hznuyyh commented 4 months ago

> Normally a "TLS handshake error" has to do with incorrectly set or untrusted certificates. How did you create the certs?

I used the kafka-generate-ssl.sh script mentioned in the README.

I set up the cluster according to the steps in the README, and the cluster now runs normally.

And when I used the demo provided by Sarama, I was able to connect correctly and complete produce and consume. But when I use kafka-console-producer.sh, Kafka returns the above error.

dgomezleon commented 4 months ago

Hi @hznuyyh ,

I can see 2 things in your docker-compose:

  1. There is a typo in the hostname --> hostname: kafak.example.com
  2. You are not using the Bitnami Kafka image.

If you are facing the issue with the upstream kafka image, you should probably ask for support there. In case it only happens with the Bitnami image, please let us know.

You could also take a look at this post in case it helps.

hznuyyh commented 4 months ago

Hi @dgomezleon, thank you for your reply.

  1. I have fixed the hostname typo.
  2. I am using a Bitnami image; I just re-tagged it as kafka:3.7, so it is:

    kafka:
      image: 'kafka:3.7'

Image labels:

"Labels": {
    "com.vmware.cp.artifact.flavor": "sha256:c50c90cfd9d12b445b011e6ad529f1ad3daea45c26d20b00732fae3cd71f6a83",
    "org.opencontainers.image.base.name": "docker.io/bitnami/minideb:bookworm",
    "org.opencontainers.image.created": "2024-05-04T09:25:23Z",
    "org.opencontainers.image.description": "Application packaged by VMware, Inc",
    "org.opencontainers.image.documentation": "https://github.com/bitnami/containers/tree/main/bitnami/kafka/README.md",
    "org.opencontainers.image.licenses": "Apache-2.0",
    "org.opencontainers.image.ref.name": "3.7.0-debian-12-r4",
    "org.opencontainers.image.source": "https://github.com/bitnami/containers/tree/main/bitnami/kafka",
    "org.opencontainers.image.title": "kafka",
    "org.opencontainers.image.vendor": "VMware, Inc.",
    "org.opencontainers.image.version": "3.7.0"
}


I re-generated the certificate and tried again. I used

openssl s_client -debug -connect kafka.example.com:9092 -tls1_2

to check the certificate. It returned:

Start Time: 1716953619
Timeout   : 7200 (sec)
Verify return code: 19 (self signed certificate in certificate chain)
Extended master secret: yes

It seems that I am using my own self-signed cert, and I think this is what causes the problem.
I tried to add ssl.endpoint.identification.algorithm= in consumer.properties:

I have no name!@kafka:/$ cat /opt/bitnami/kafka/config/consumer.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see org.apache.kafka.clients.consumer.ConsumerConfig for more details

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092

# consumer group id
group.id=test-consumer-group

# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
auto.offset.reset=

ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.key.password=IQ65KHcr4VsS0TLO
ssl.keystore.location=/opt/bitnami/kafka/config/certs/kafka.keystore.jks
ssl.truststore.location=/opt/bitnami/kafka/config/certs/kafka.truststore.jks
ssl.keystore.password=IQ65KHcr4VsS0TLO
ssl.truststore.password=IQ65KHcr4VsS0TLO
ssl.endpoint.identification.algorithm=

But it did not take effect. The Kafka log still shows the same errors when I run this command:

I have no name!@kafka:/$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config /opt/bitnami/kafka/config/consumer.properties
[2024-05-29 03:39:43,744] WARN [Consumer clientId=console-consumer, groupId=test-consumer-group] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2024-05-29 03:39:43,920] WARN [Consumer clientId=console-consumer, groupId=test-consumer-group] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2024-05-29 03:39:44,189] WARN [Consumer clientId=console-consumer, groupId=test-consumer-group] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2024-05-29 03:39:44,510] WARN [Consumer clientId=console-consumer, groupId=test-consumer-group] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2024-05-29 03:39:45,057] WARN [Consumer clientId=console-consumer, groupId=test-consumer-group] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)

[2024-05-29 03:28:05,010] INFO [SocketServer listenerType=BROKER, nodeId=0] Failed authentication with /127.0.0.1 (channelId=127.0.0.1:9092-127.0.0.1:49458-5) (SSL handshake failed) (org.apache.kafka.common.network.Selector)
[2024-05-29 03:28:06,024] INFO [SocketServer listenerType=BROKER, nodeId=0] Failed authentication with /127.0.0.1 (channelId=127.0.0.1:9092-127.0.0.1:49474-5) (SSL handshake failed) (org.apache.kafka.common.network.Selector)
[2024-05-29 03:28:07,050] INFO [SocketServer listenerType=BROKER, nodeId=0] Failed authentication with /127.0.0.1 (channelId=127.0.0.1:9092-127.0.0.1:49482-5) (SSL handshake failed) (org.apache.kafka.common.network.Selector)


The steps for generating my certificate are as follows:
1. Run bash kafka-generate-ssl.sh.
2. Enter *password* wherever a password is requested.
3. Type *y* at each yes/no prompt.
4. Press Enter at all other prompts.
5. The common name and first/last name are set to *kafka.example.com*.
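Given the "Verify return code: 19" above, one way to re-check the listener against the CA in the generated truststore might be the following sketch, assuming kafka-generate-ssl.sh imported the CA under the CARoot alias:

# export the CA certificate from the truststore, then verify the listener with it
keytool -exportcert -alias CARoot -keystore truststore/kafka.truststore.jks \
  -storepass IQ65KHcr4VsS0TLO -rfc -file ca.pem
openssl s_client -connect kafka.example.com:9092 -CAfile ca.pem -tls1_2 </dev/null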
dgomezleon commented 4 months ago

Hi @hznuyyh ,

I noticed that KAFKA_CLIENT_LISTENER_NAME=SASL_SSL is missing in your docker-compose.yaml (we will update the README.md with this). Also, note that KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=SCRAM-SHA-512 is duplicated (but this is not taking effect).

This docker-compose.yaml worked for me:

version: '2'

services:
  kafka:
    image: 'bitnami/kafka:3.7'
    hostname: kafka.example.com
    ports:
      - '9092:9092'
    environment:
      # KRaft
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka.example.com:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,CLIENT:SASL_SSL
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      # SSL
      - KAFKA_CERTIFICATE_PASSWORD=my_pass
      - KAFKA_TLS_TYPE=JKS
      - KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
      # SASL
      - KAFKA_CLIENT_USERS=user
      - KAFKA_CLIENT_PASSWORDS=password
      - KAFKA_CLIENT_LISTENER_NAME=SASL_SSL
      - KAFKA_CONTROLLER_USER=controller_user
      - KAFKA_CONTROLLER_PASSWORD=controller_password
      - KAFKA_INTER_BROKER_USER=controller_user
      - KAFKA_INTER_BROKER_PASSWORD=controller_password
    volumes:
      - './certs/kafka.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro'
      - './certs/kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro'

I hope it helps

hznuyyh commented 4 months ago

> I noticed that KAFKA_CLIENT_LISTENER_NAME=SASL_SSL is missing in your docker-compose.yaml (we will update the README.md with this). [...]

@dgomezleon

Thank you very much. It worked. Note that this parameter must be specified when kafka-console-consumer.sh is used:

--bootstrap-server kafka.example.com:9092

I think this is related to the CommonName specified when the certificate was generated. In any case, setting KAFKA_CLIENT_LISTENER_NAME=SASL_SSL is a valid practice.
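For completeness, the full invocation that now works from inside the container looks roughly like this (a sketch, assuming the test topic exists and that kafka.example.com resolves to the broker, e.g. via an /etc/hosts entry):

kafka-console-consumer.sh \
  --bootstrap-server kafka.example.com:9092 \
  --topic test \
  --consumer.config /opt/bitnami/kafka/config/consumer.properties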

Beyond this, I don't quite understand what KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=SCRAM-SHA-512 does. It seems to apply to port 9093, but when I set SCRAM-SHA-512 the Kafka cluster won't start properly; it has to be set to PLAIN.

I see there are other issues (#41415); I'm not sure if it is the same problem.

dgomezleon commented 4 months ago

I'm glad it worked for you.

As my mate @migruiz4 mentioned here, SCRAM is not supported.

migruiz4 commented 4 months ago

Hi @hznuyyh,

As mentioned in #41415 , controller-to-controller communications do not support SCRAM at the moment.

This issue has been reported to the Kafka upstream project, so meanwhile only the PLAIN mechanism can be used.

hznuyyh commented 4 months ago

@migruiz4 @dgomezleon Thank you very much. I understand. This issue can be closed.