[bitnami/kafka] Cannot enable ACL authorization at KRAFT cluster #23237

Closed: levissfuture closed this issue 1 year ago

levissfuture commented 1 year ago

Name and Version

bitnami/kafka:3.3.2-debian-11-r4

What steps will reproduce the bug?

  1. Docker Swarm, 3-node KRaft cluster
  2. Config:

version: "3.9"

x-kafka-deploy: &kafka-deploy
  mode: replicated
  replicas: 1
  update_config:
    parallelism: 1
    order: stop-first
    failure_action: rollback
    delay: 10s
  rollback_config:
    parallelism: 1
    order: stop-first
  restart_policy:
    condition: any
    delay: 10s
    window: 30s

x-kafka-service: &kafka-service
  networks:

x-kafka-env: &kafka-env
  ALLOW_PLAINTEXT_LISTENER: "yes"
  BITNAMI_DEBUG: "true"
  KAFKA_ENABLE_KRAFT: "yes"
  KAFKA_KRAFT_CLUSTER_ID: "MFdlOAZlYmM9YzE2NDQxZA"
  KAFKA_CFG_PROCESS_ROLES: "broker,controller"
  KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
  KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,EXTERNAL:SASL_SSL,INTERNAL:PLAINTEXT"
  KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "INTERNAL"
  KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093"
  KAFKA_CFG_METADATA_LOG_DIR: "/bitnami/kafka/metadata/"
  KAFKA_CFG_LISTENERS: "CONTROLLER://:9093,EXTERNAL://:9092,INTERNAL://:29092"
  KAFKA_CFG_SECURITY_PROTOCOL: "SASL_SSL"
  KAFKA_CFG_SASL_ENABLED_MECHANISMS: "PLAIN"
  KAFKA_CFG_SASL_INTER_BROKER_PROTOCOL: "PLAIN"
  KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: "PLAIN"
  KAFKA_CFG_SUPER_USERS: 'User:CN=user;'
  KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
  KAFKA_CFG_AUTHORIZER_CLASS_NAME: "org.apache.kafka.metadata.authorizer.StandardAuthorizer"
  KAFKA_CFG_EARLY_START_LISTENERS: "CONTROLLER"
  KAFKA_CFG_SSL_KEYSTORE_TYPE: "PEM"
  KAFKA_TLS_TYPE: "PEM"
  KAFKA_TLS_CLIENT_AUTH: "requested"

services:

  kafka-1:
    image: bitnami/kafka:3.3.2-debian-11-r4
    deploy:
      <<: *kafka-deploy
      placement:
        constraints:

What is the expected behavior?

I expect the cluster to start so that I can create ACLs for topics.

What do you see instead?

The cluster does not start, due to the error specified in paragraph 4.

Additional information

I think the problem is that this option has no effect: KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true". With bitnami/kafka:3.3.2-debian-11-r5 the problem is the same; with bitnami/kafka:3.2.3-debian-11-r48 everything works well.

javsalgar commented 1 year ago

Hi,

Could you enter one of the containers and check the rendered configuration? To do so, you could override the entrypoint and cmd with a sleep, and then run inside the container: /opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh
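A minimal sketch of that debugging flow (the service/container name is illustrative):

```bash
# In the compose/stack file, keep the container idle instead of starting Kafka:
#   entrypoint: ["sleep"]
#   command: ["infinity"]

# Then open a shell in the idle container and start Kafka manually:
docker exec -it kafka-1 bash
/opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh

# The rendered configuration can then be inspected at:
cat /opt/bitnami/kafka/config/server.properties
```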

levissfuture commented 1 year ago

Do you mean checking server.properties? Here it is:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults
#

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=CONTROLLER://:9093,EXTERNAL://:9092,INTERNAL://:29092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=INTERNAL://kafka-1:29092,EXTERNAL://kafka-01.domain.local:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT,EXTERNAL:SASL_SSL,INTERNAL:PLAINTEXT

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/bitnami/kafka/data

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
log.retention.bytes=-1

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

allow.everyone.if.no.acl.found=true
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
auto.create.topics.enable=True
auto.leader.rebalance.enable=True

broker.rack=DC1
compression.type=producer
controller.listener.names=CONTROLLER
controller.quorum.voters=1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
default.replication.factor=1
early.start.listeners=CONTROLLER
inter.broker.listener.name=INTERNAL
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=10

log.roll.hours=168

max.partition.fetch.bytes=1048576
max.request.size=1048576
metadata.log.dir=/bitnami/kafka/metadata/
min.insync.replicas=1

process.roles=broker,controller
sasl.enabled.mechanisms=PLAIN
sasl.inter.broker.protocol=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
security.protocol=SASL_SSL
ssl.keystore.type=PEM
ssl.key.password=somepassword
super.users=User:CN=user;

ssl.truststore.type=PEM
ssl.keystore.key=keyinformation

dgomezleon commented 1 year ago

Hi @levissfuture ,

I can see allow.everyone.if.no.acl.found=true is set.

Did you try other versions of the 3.3.x branch? The property still seems to be valid (docs).

artur9010 commented 1 year ago

Can confirm that the issue happens on all 3.3.x releases and on 3.4.0. On 3.2.3 everything works fine.

edit: as a workaround, set KAFKA_CFG_SUPER_USERS: "User:ANONYMOUS"
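A plausible reading of this workaround (my assumption, consistent with migruiz4's finding further down): the OP maps the CONTROLLER listener to PLAINTEXT, and peers on an unauthenticated listener are assigned the principal User:ANONYMOUS, so listing that principal as a super user lets the controller quorum requests through the StandardAuthorizer:

```properties
# Rendered property for the workaround; User:ANONYMOUS is the principal Kafka
# assigns to connections on unauthenticated (PLAINTEXT) listeners.
super.users=User:ANONYMOUS
```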

dgomezleon commented 1 year ago

Thanks for the feedback. I forwarded this to the team, since we plan to work on KRaft support now that the feature is becoming production-ready. We will update this ticket when there are news.

hamartin commented 1 year ago

I can confirm this issue with 3.4.0 using KRaft mode. The setup works perfectly fine until StandardAuthorizer is enabled and super.users is set with the CN of the hosts.

I tried two ways of getting the CN:

  1. Looking in the logs and copy-pasting the principal I found there (unescaped, with spaces).
  2. Running "openssl x509 -noout -in <certificate> -subject" and copy-pasting the escaped version of the CN from there (the file I used to populate the keystore and truststore).
xsten commented 1 year ago

> I can confirm this issue with 3.4.0 using KRaft mode. The setup works perfectly fine until StandardAuthorizer is enabled [...] 2. running "openssl x509 -noout -in <certificate> -subject" and copy-pasting the escaped version of the CN from there (the file I used to populate the keystore and truststore).

Maybe try with "openssl x509 -in /path/to/your/certificate.pem -noout -subject -nameopt RFC2253", which prints the subject in RFC 2253 format, the format Kafka uses for TLS principals by default.
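An illustrative run (the certificate path and the subject values are made up):

```bash
$ openssl x509 -in /path/to/your/certificate.pem -noout -subject -nameopt RFC2253
subject=CN=kafka-1,OU=Kafka Dev,O=Kafka,L=San Francisco,C=US

# The matching super-users entry would then be:
# super.users=User:CN=kafka-1,OU=Kafka Dev,O=Kafka,L=San Francisco,C=US
```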

post-human-world commented 1 year ago

I got the same error in Kubernetes after I set authorizerClassName: "org.apache.kafka.metadata.authorizer.StandardAuthorizer". If I set it to empty, there are no errors anymore.

image:
  registry: docker.io
  repository: bitnami/kafka
  tag: 3.4.1-debian-11-r4
  pullPolicy: IfNotPresent
  debug: true
authorizerClassName: "org.apache.kafka.metadata.authorizer.StandardAuthorizer"
allowEveryoneIfNoAclFound: true
auth:
  clientProtocol: tls
  externalClientProtocol: tls
  interBrokerProtocol: tls
  controllerProtocol: tls
  sasl:
    mechanisms: plain,scram-sha-256,scram-sha-512
    interBrokerMechanism: plain
    jaas:
      clientUsers:
        - user
      clientPasswords: []
      interBrokerUser: admin
      interBrokerPassword: ""
      zookeeperUser: ""
      zookeeperPassword: ""
      existingSecret: ""
  tls:
    type: pem
    pemChainIncluded: false
    existingSecrets: []
    autoGenerated: true
    endpointIdentificationAlgorithm: https
migruiz4 commented 1 year ago

Hi there!

I have been investigating this issue, and I think I have been able to narrow it down to controller_user having to be included in the super.users list. In my case, I was able to fix the issue by setting super.users to something similar to:

super.users=User:controller_user;User:admin;

Otherwise, the controller process does not have permission to write to __cluster_metadata, even though allow.everyone.if.no.acl.found=true is set.

out-kafka-2-1  | [2023-06-21 08:59:40,044] ERROR [ControllerApis nodeId=2] Unexpected error handling request RequestHeader(apiKey=VOTE, apiVersion=0, clientId=raft-client-1, correlationId=68, headerVersion=2) -- VoteRequestData(clusterId='abcdefghijklmnopqrstug', topics=[TopicData(topicName='__cluster_metadata', partitions=[PartitionData(partitionIndex=0, candidateEpoch=1, candidateId=1, lastOffsetEpoch=0, lastOffset=0)])]) with context RequestContext(header=RequestHeader(apiKey=VOTE, apiVersion=0, clientId=raft-client-1, correlationId=68, headerVersion=2), connectionId='172.29.0.3:9093-172.29.0.4:54942-0', clientAddress=/172.29.0.4, principal=User:controller_user, listenerName=ListenerName(CONTROLLER), securityProtocol=SASL_SSL, clientInformation=ClientInformation(softwareName=apache-kafka-java, softwareVersion=3.4.1), fromPrivilegedListener=false, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@762e532c]) (kafka.server.ControllerApis)

This may be a bug with Kafka itself and not the container image.

I'm aware that the current bitnami/kafka image does not have built-in support for SASL on the controller listener; this was tested with a current WIP version of the image, which includes some major changes and new features.

github-actions[bot] commented 1 year ago

This Issue has been automatically marked as "stale" because it has not had recent activity (for 15 days). It will be closed if no further activity occurs. Thanks for the feedback.

github-actions[bot] commented 1 year ago

Due to the lack of activity in the last 5 days since it was marked as "stale", we proceed to close this Issue. Do not hesitate to reopen it later if necessary.

post-human-world commented 1 year ago

It is not a Kafka issue. I switched to the confluentinc Kafka image and enabled KRaft ACLs. I did not get any errors, and the cluster started successfully.

migruiz4 commented 1 year ago

Hi @post-human-world,

Could you please provide more details? What properties is your Kafka server running with?

Our image does not contain any logic related to ACLs; env variables such as KAFKA_CFG_SUPER_USERS are directly mapped to the corresponding Kafka server properties (KAFKA_CFG_SUPER_USERS becomes super.users).
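For illustration, the mapping the image performs looks like this (property values taken from the compose file below):

```properties
# KAFKA_CFG_AUTHORIZER_CLASS_NAME          -> authorizer.class.name
# KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND -> allow.everyone.if.no.acl.found
# KAFKA_CFG_SUPER_USERS                    -> super.users
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
allow.everyone.if.no.acl.found=true
super.users=User:user;
```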

I don't have deep knowledge of Kafka, but when using the following deployment:

version: "2"

services:
  kafka-0:
    image: bitnami/kafka:3.5
    ports:
      - "9092"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
      - KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
      # Listeners
      - KAFKA_CFG_LISTENERS=SASL://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=SASL://:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=SASL:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL
      # SASL
      - KAFKA_CLIENT_USERS=user
      - KAFKA_CLIENT_PASSWORDS=password
      - KAFKA_CLIENT_LISTENER_NAME=SASL
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      - KAFKA_CONTROLLER_USER=controller_user
      - KAFKA_CONTROLLER_PASSWORD=controller_password
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      - KAFKA_INTER_BROKER_USER=inter_broker_user
      - KAFKA_INTER_BROKER_PASSWORD=inter_broker_password
      # ACL
      - KAFKA_CFG_SUPER_USERS=User:user;
      - KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND="true"
      - KAFKA_CFG_AUTHORIZER_CLASS_NAME=org.apache.kafka.metadata.authorizer.StandardAuthorizer
      - KAFKA_CFG_EARLY_START_LISTENERS=CONTROLLER
    volumes:
      - kafka_0_data:/bitnami/kafka
  kafka-1:
    image: bitnami/kafka:3.5
    ports:
      - "9092"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
      - KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
      # Listeners
      - KAFKA_CFG_LISTENERS=SASL://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=SASL://:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=SASL:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL
      # SASL
      - KAFKA_CLIENT_USERS=user
      - KAFKA_CLIENT_PASSWORDS=password
      - KAFKA_CLIENT_LISTENER_NAME=SASL
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      - KAFKA_CONTROLLER_USER=controller_user
      - KAFKA_CONTROLLER_PASSWORD=controller_password
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      - KAFKA_INTER_BROKER_USER=inter_broker_user
      - KAFKA_INTER_BROKER_PASSWORD=inter_broker_password
      # ACL
      - KAFKA_CFG_SUPER_USERS=User:user;
      - KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND="true"
      - KAFKA_CFG_AUTHORIZER_CLASS_NAME=org.apache.kafka.metadata.authorizer.StandardAuthorizer
      - KAFKA_CFG_EARLY_START_LISTENERS=CONTROLLER
    volumes:
      - kafka_1_data:/bitnami/kafka
  kafka-2:
    image: bitnami/kafka:3.5
    ports:
      - "9092"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=2
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
      - KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
      # Listeners
      - KAFKA_CFG_LISTENERS=SASL://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=SASL://:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=SASL:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL
      # SASL
      - KAFKA_CLIENT_USERS=user
      - KAFKA_CLIENT_PASSWORDS=password
      - KAFKA_CLIENT_LISTENER_NAME=SASL
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      - KAFKA_CONTROLLER_USER=controller_user
      - KAFKA_CONTROLLER_PASSWORD=controller_password
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      - KAFKA_INTER_BROKER_USER=inter_broker_user
      - KAFKA_INTER_BROKER_PASSWORD=inter_broker_password
      # ACL
      - KAFKA_CFG_SUPER_USERS=User:user;
      - KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND="true"
      - KAFKA_CFG_AUTHORIZER_CLASS_NAME=org.apache.kafka.metadata.authorizer.StandardAuthorizer
      - KAFKA_CFG_EARLY_START_LISTENERS=CONTROLLER
    volumes:
      - kafka_2_data:/bitnami/kafka

volumes:
  kafka_0_data:
    driver: local
  kafka_1_data:
    driver: local
  kafka_2_data:
    driver: local

The Kafka KRaft cluster will fail with the following error:

[2023-08-01 07:06:22,946] ERROR [ControllerApis nodeId=0] Unexpected error handling request RequestHeader(apiKey=VOTE, apiVersion=0, clientId=raft-client-2, correlationId=2879, headerVersion=2) -- VoteRequestData(clusterId='abcdefghijklmnopqrstug', topics=[TopicData(topicName='__cluster_metadata', partitions=[PartitionData(partitionIndex=0, candidateEpoch=23, candidateId=2, lastOffsetEpoch=0, lastOffset=0)])]) with context RequestContext(header=RequestHeader(apiKey=VOTE, apiVersion=0, clientId=raft-client-2, correlationId=2879, headerVersion=2), connectionId='172.25.0.2:9093-172.25.0.4:53688-0', clientAddress=/172.25.0.4, principal=User:controller_user, listenerName=ListenerName(CONTROLLER), securityProtocol=SASL_PLAINTEXT, clientInformation=ClientInformation(softwareName=apache-kafka-java, softwareVersion=3.5.0), fromPrivilegedListener=false, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@68ee6739]) (kafka.server.ControllerApis)

But, as previously mentioned, by adding the Controller SASL user to the list of super.users the cluster is deployed successfully:

Docker compose with super.users=User:user;User:controller_user; (identical to the compose file above, except that each broker's environment also lists the controller user as a super user):

```yaml
      # ACL
      - KAFKA_CFG_SUPER_USERS=User:user;User:controller_user;
```
post-human-world commented 1 year ago

Actually, I did not fully use the confluentinc Kafka image; I rewrote the startup script so I can pass some unique settings or files. It may be a bit complicated, but this way I avoided several bugs that occur with the default startup script.

I successfully ran this manifest under minikube.

Servers

3 Kafka + Kafka SVC + Kafka headless SVC

```yml
apiVersion: v1
kind: Service
metadata:
  name: kafka-headless
  namespace: kafka
spec:
  clusterIP: None
  selector:
    app: kafka
  ports:
    - port: 9092
      targetPort: 9092
---
apiVersion: v1
kind: Service
metadata:
  name: kafka
  namespace: kafka
spec:
  selector:
    app: kafka
  ports:
    - port: 9092
      targetPort: 9092
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: kafka
spec:
  selector:
    matchLabels:
      app: kafka
  serviceName: kafka-headless
  replicas: 3
  template:
    metadata:
      labels:
        app: kafka
    spec:
      initContainers:
        - name: init-kafka-config
          image: confluentinc/cp-kafka:7.4.1
          securityContext:
            runAsUser: 0
          command:
            - bash
            - "-c"
            - |
              set -ex
              ordinal=${HOSTNAME##*-}
              cp /etc/kafkaconfig/* /etc/kafka/kraft/
              chown -R appuser:appuser /etc/kafka/kraft/
              chmod -R 777 /etc/kafka/kraft/
              cp /etc/kafkasecret/* /etc/kafka/certs/
              chown -R appuser:appuser /etc/kafka/certs/
              chmod -R 777 /etc/kafka/certs/
              FILENAME=/etc/kafka/kraft/server.properties
              # this is for the init container
              function set_id() {
                # replace if it exists, otherwise add to the tail
                if grep -P "node.id=\d+" $FILENAME
                then
                  replace_string="node.id=$ordinal"
                  sed -i -E "s/node.id=[0-9]+/$replace_string/" $FILENAME
                else
                  echo "node.id=$ordinal" >> $FILENAME
                fi
                echo "node.id=$ordinal"
              }
              set_id
          volumeMounts:
            - name: config
              mountPath: /etc/kafka/kraft/
            - name: certs
              mountPath: /etc/kafka/certs/
            - name: kafkaconfig
              mountPath: /etc/kafkaconfig/
              readOnly: true
            - name: kafkasecret
              mountPath: /etc/kafkasecret/
              readOnly: true
      containers:
        - name: kafka
          image: confluentinc/cp-kafka:7.4.1 # kafka-3.5
          command:
            - bash
            - "-c"
            - |
              FILENAME=/etc/kafka/kraft/server.properties
              # this is for the kafka container
              function set_kafka_config() {
                # test with `export KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER`
                printenv | grep KAFKA | while read -r line ; do
                  IFS='=' read -r first second <<< "$line"
                  first=$(echo ${first:6} | awk '{print tolower($0)}')
                  first=${first//_/\.}
                  second=${second//\//\\\/}
                  replace_string="${first//_/\.}=$second"
                  if grep -Pq "$first=.+" $FILENAME; then
                    sed -i -E "s/^$first=.+/$replace_string/" $FILENAME
                  else
                    echo "$replace_string" >> $FILENAME
                  fi
                done
              }
              set_kafka_config
              /usr/bin/kafka-storage format -t $CLUSTER_ID -c /etc/kafka/kraft/server.properties
              /usr/bin/kafka-server-start /etc/kafka/kraft/server.properties
          ports:
            - containerPort: 9092
          env:
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: KAFKA_PROCESS_ROLES
              value: broker,controller
            - name: KAFKA_LISTENERS
              value: BROKER://:9092,CONTROLLER://:9093,INTER_BROKER://:9094
            - name: KAFKA_ADVERTISED_LISTENERS
              value: BROKER://:9092,INTER_BROKER://:9094
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: BROKER:SASL_SSL,INTER_BROKER:SASL_SSL,CONTROLLER:SASL_SSL
            - name: KAFKA_CONTROLLER_QUORUM_VOTERS
              value: 0@kafka-0.kafka-headless.kafka.svc.cluster.local:9093,1@kafka-1.kafka-headless.kafka.svc.cluster.local:9093,2@kafka-2.kafka-headless.kafka.svc.cluster.local:9093
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: INTER_BROKER
            - name: KAFKA_CONTROLLER_LISTENER_NAMES
              value: CONTROLLER
            - name: CLUSTER_ID
              value: MkU3OEVBNTcwNTJENDM2Qk
            - name: KAFKA_JMX_PORT
              value: "9101"
            - name: KAFKA_JMX_HOSTNAME
              value: localhost
            - name: KAFKA_OPTS
              value: -Djava.security.auth.login.config=/etc/kafka/kraft/kafka_jaas.conf
            - name: KAFKA_LOG_DIRS
              value: /var/log/kafka/
          resources:
            limits:
              cpu: 1500m
              memory: 2Gi
          volumeMounts:
            - name: config
              mountPath: /etc/kafka/kraft/
            - name: certs
              mountPath: /etc/kafka/certs/
      volumes:
        - name: config
          emptyDir: {}
        - name: certs
          emptyDir: {}
        - name: kafkaconfig
          configMap:
            name: kafka-config
        - name: kafkasecret
          secret:
            secretName: kafka-secret
```

Configs

Kafka configuration which contains server.properties, TLS, and JAAS.

```yml
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-config
  namespace: kafka
data:
  server.properties: |
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.

    #
    # This configuration file is intended for use in KRaft mode, where
    # Apache ZooKeeper is not present. See config/kraft/README.md for details.
    #

    ############################# Server Basics #############################

    # The role of this server. Setting this puts us in KRaft mode
    process.roles=broker,controller

    # The node id associated with this instance's roles
    node.id=999

    # The connect string for the controller quorum
    controller.quorum.voters=0@kafka-0.kafka-headless.kafka.svc.cluster.local:9093,1@kafka-1.kafka-headless.kafka.svc.cluster.local:9093

    ############################# Socket Server Settings #############################

    # The address the socket server listens on.
    # Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum.
    # If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(),
    # with PLAINTEXT listener name, and port 9092.
    #   FORMAT:
    #     listeners = listener_name://host_name:port
    #   EXAMPLE:
    #     listeners = PLAINTEXT://your.host.name:9092
    listeners=PLAINTEXT://:9092,CONTROLLER://:9093

    # Listener name, hostname and port the broker will advertise to clients.
    # If not set, it uses the value for "listeners".
    advertised.listeners=PLAINTEXT://kafka-0.kafka-headless.kafka.svc.cluster.local:9092

    # A comma-separated list of the names of the listeners used by the controller.
    # If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
    # This is required if running in KRaft mode.
    controller.listener.names=CONTROLLER

    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3

    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8

    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400

    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400

    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600

    ############################# Log Basics #############################

    # A comma separated list of directories under which to store log files
    log.dirs=/tmp/kraft-combined-logs

    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1

    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1

    ############################# Internal Topic Settings  #############################
    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
    # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1

    ############################# Log Flush Policy #############################

    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    #    1. Durability: Unflushed data may be lost if you are not using replication.
    #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.

    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000

    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000

    ############################# Log Retention Policy #############################

    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.

    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168

    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824

    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824

    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000

    ############################# Group Coordinator Settings #############################

    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    # The default value for this is 3 seconds.
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    # group.initial.rebalance.delay.ms=0

    message.max.bytes=1000012
    auto.create.topics.enable=true
    delete.topic.enable=false
    default.replication.factor=1
    inter.broker.listener.name=BROKER

    # remember to configure protocol correctly, otherwise it will raise `No serviceName defined in either JAAS or Kafka config` error
    # Kraft does not support `SCRAM-SHA-256`, if you insist on it you have to choose zookeeper
    sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256
    sasl.mechanism.inter.broker.protocol=PLAIN
    sasl.mechanism.controller.protocol=PLAIN

    ############################# ACL #############################
    allow.everyone.if.no.acl.found=false
    authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
    super.users=User:admin;User:interbroker;User:controller;User:superuser

    ############################# SSL #############################
    # Server host name verification may be disabled by setting ssl.endpoint.identification.algorithm to an empty string.
    ssl.endpoint.identification.algorithm=https
    ssl.client.auth=requested
    ssl.keystore.type=PEM
    ssl.truststore.type=PEM
    ssl.truststore.location=/etc/kafka/certs/truststore.pem
    ssl.keystore.location=/etc/kafka/certs/keystore.pem

    ############################# sasl jaas config #############################
    # listener.name...sasl.jaas.config
    listener.name.inter_broker.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="interbroker" password="k3o2hAtv1B5Q4KPP" user_interbroker="k3o2hAtv1B5Q4KPP";
    listener.name.controller.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="controller" password="ePMgJmv34x3KufGB" user_controller="ePMgJmv34x3KufGB";

    ############################# My Config #############################
    # The KafkaServer section are used by the broker to initiate connections to other brokers.
    # KafkaClient configuration is for Kafka clients, Server side does not need this part.
  kafka_jaas.conf: |
    KafkaServer {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="admin"
      password="zuMCS0l0m8LPMr8m"
      user_interbroker="k3o2hAtv1B5Q4KPP"
      user_controller="ePMgJmv34x3KufGB"
      user_superuser="lr6bVW5oA4dm65AK"
      user_tom="AlhUFaGt3o8GTz9f"
      user_kelvin="7XBGNp8ePke4o9NF";
    };
    KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="superuser"
      password="lr6bVW5oA4dm65AK"
      user_tom="AlhUFaGt3o8GTz9f"
      user_kelvin="7XBGNp8ePke4o9NF";
    };
  superuser.conf: |
    ssl.keystore.type=PEM
    ssl.truststore.type=PEM
    ssl.truststore.location=/etc/kafka/certs/truststore.pem
    ssl.keystore.location=/etc/kafka/certs/keystore.pem
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="superuser" \
      password="lr6bVW5oA4dm65AK";
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
---
apiVersion: v1
kind: Secret
metadata:
  name: kafka-secret
  namespace: kafka
type: Opaque
stringData:
  truststore.pem: |
    -----BEGIN CERTIFICATE-----
    MIIDbzCCAlegAwIBAgIUE+eSW2VLLNJptIdBq5wRic9kAQwwDQYJKoZIhvcNAQEL
    BQAwRzEgMB4GA1UEAwwXa2Fma2Euc3ZjLmNsdXN0ZXIubG9jYWwxCzAJBgNVBAYT
    AlVTMRYwFAYDVQQHDA1TYW4gRnJhbnNpc2NvMB4XDTIzMDcyMzEyMzg0NFoXDTMz
    MDcyMDEyMzg0NFowRzEgMB4GA1UEAwwXa2Fma2Euc3ZjLmNsdXN0ZXIubG9jYWwx
    CzAJBgNVBAYTAlVTMRYwFAYDVQQHDA1TYW4gRnJhbnNpc2NvMIIBIjANBgkqhkiG
    9w0BAQEFAAOCAQ8AMIIBCgKCAQEAxFUxSvK+Es5JeHjUBr+wMPszfC34dka2cAXu
    xxvld35D2hv6BEj+163L8udbwFmZbAXRbhxii6MuIZmCtF513AKlL4vE1HnE3fl3
    aw6MArbJUwxKs4m+L+WJZwdN7HMBIIdhYGy5VuJSCMfDYZwODYfwpC49Kr2SImsf
    HWT0tTq/XNZVE4CG2HgAjtA2jKx8jz8DlXE/gS+Q5r8bXRlFjy9xat/k1iygvlWM
    JWBydMD7IvbSGmFroxgjTzj6INl8AdNCE0JELkGtPdn0+Afgwv/FQz55HerqYAUK
    v/JvJidi5sP8VhqcEhCiDKBLsO9ZLLdkVd19mX5C32hkcva4EwIDAQABo1MwUTAd
    BgNVHQ4EFgQUQgeSajUubkJvOtACE3LRXPzImY4wHwYDVR0jBBgwFoAUQgeSajUu
    bkJvOtACE3LRXPzImY4wDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOC
    AQEAVGYePgBeE7TSMl0KmioyfxFtA/1D8KMCwjPcEtd+28iDXBvFu1ZKF9SMVLv0
    9m1EvsLS/TbLdjzrgK3A+8msJqNYR783JGi5DNzvHcXawVuY9Rs/2jt+/+JRZuEZ
    p3ZpDsoJhiEIkjTu3oIc06hl7OOT3MYEO8K22cr3rhz6x9GlREQiinfJn8ZwifXX
    uRY2gr1DGPeGD8JA+wHHo9R7Vl+c2/34+AIzbNJn1CBc+Oc0slWMtBdnEx3EAf8b
    +2NaFDXZKhi+EtRyZLEWsOJiOIV3cNDbqIC/eHQI8+7GaaTymGtZ4WLtktun/XLn
    2RYsrrv9a5QLzK7dCFDit57NxA==
    -----END CERTIFICATE-----
  # server private key(PKCS8) + server cert + all intermediate CA + root CA
  keystore.pem: |
    -----BEGIN PRIVATE KEY-----
    MIIEwAIBADANBgkqhkiG9w0BAQEFAASCBKowggSmAgEAAoIBAQDMly3jcfbpzRaF
    BB1WO5l43AnJHt+0zWwM6spK8T+dx/eLvFDW7WLRmpd7Cz7zKGRmW0gG38bNb5mj
    2Cbf7r26Ma6I5l1XLS62iQoRH9AnTf/ZFgG5y4p0evmPwuyHnRlEfLxOTRui9M/x
    VqjQ19OMo/6DMf6KRFHIU0/6KI/AxBI7lcNmDdhukKM9QyUAr04Cnrr0eNBswz7z
    ZpBMyJpADhM4Z/M6nzDFTrI/XIdS1fryt+NhtYz0A06udLn6JrykRvj1UL2Qo4PE
    c1thUmvon4ByMChjk1ONd4Y8As7hKoR67mD9yf9jI16EhA6a5riV/capK3am+2Q3
    IJqvnO8vAgMBAAECggEBALSeWxLLUBMEznbMNSImJ829ZJ48NX8nQ9b7iXA0H9ep
    7G9e53d3x6AIfOdR/FcrN8ftml8HPDmt5tPxRhC4cqNorQ+LYKhP6UeZIFwR2/B3
    OgJrHUbr3OYMkzpJ+guXs/D0tkHibR2mR9ZmjU8i2070kQM+JdvniNP0SU0udgrU
    +LQVD6nYZKs2dtAs4mWT2O8/34Q1o1Hf61tdw79iRFSkbTmFnf7+R47D0tb1+zTJ
    448mln9xg1dQS4BUXDVv9yp5Yap5XXhe6XEd0aMIy4XJIUyHh+0FYbYLmSPxaTCD
    BBgHjZNHX7azIg+5wQhjm0iYxNDSwtz16PvmP3pekCECgYEA/NBAr2YMFDEHvaI0
    fhBl+okpqFCormiYxi4NRA7zlZFDXfMpdbW/Ata66TQphI93OP8+ec8z+X8GUZ5t
    vb9l7ToqQFRMdbsOgtwg0x1X8xXNNHQJ1NCyoRSPf7hMyI4e15SylQXSD9LcFU2Y
    qiXzl2vU13F+/Ogkfl+WjqNQmMcCgYEAzytTpUq4i6s5nchHO0/ducsh4Kr1Rnfv
    aApFSygmwFF7YkzVQdQBJjg5NwlWWFmHO4kCHf/iAv1CbMyhyqyyAvfiNZcDM4SA
    5hyiMuJZ/0bHeCt0RwpvJjAJK2QwA/ZE5pwLAzg8hZ65vSUQAdubaIq2vw2EjJPx
    lmBjMVhLnlkCgYEAhv7g+4ffPUztGDqsEwDIo55oayMe33qk0XY3yTa+pQgbU+iL
    I/q3k1l8FxiubsV44SuSfdLBqyFEC4MwkFjKoYEknJMm/Nq0pfXLZU/O7dOfbrnj
    ogsKx27GLWoJPjJ9j5osPd0MzTxIHuv/cCOxzhJbZeuJ5pxDGGQMbgARAnMCgYEA
    oU7WGWzd1vqiqSVpuVgNh/4PydSzmIQIaFUe8U85JSQM01SJ9HCz2GOMwTrG/ZHw
    Rz395Qf5nmQ+VWLcU2g2TzNd8mY8Ot/gUcxXyxEODEO7Suli53HLa8uZSC0BagRW
    T82OkZo3Hb+iVhFGrnulPThM5OQMoj0LrhAtDB34+zkCgYEAtcz3Tlw5W8TBEqOh
    pCI8t8bLg5tM0NEczjHcOQhMhgF4hbotyX2t0CHxst8N75PZ77A9wpGQqOavVeSu
    aEyKz8chgUejGnKCQ5aLTLzU36zditjtRRzxq0TWeEKcx6k8ElKhRlcWYP+BHwSd
    yO3HqLtkxm4TTPddFYA9UcDXQsE=
    -----END PRIVATE KEY-----
    -----BEGIN CERTIFICATE-----
    MIID9TCCAt2gAwIBAgIUa5JWI7SZLXYy9GSs7iqnpn81fcAwDQYJKoZIhvcNAQEL
    BQAwRzEgMB4GA1UEAwwXa2Fma2Euc3ZjLmNsdXN0ZXIubG9jYWwxCzAJBgNVBAYT
    AlVTMRYwFAYDVQQHDA1TYW4gRnJhbnNpc2NvMB4XDTIzMDcyMzEyMzk0MFoXDTMz
    MDcyMDEyMzk0MFowgZExCzAJBgNVBAYTAlVTMRMwEQYDVQQIDApDYWxpZm9ybmlh
    MRYwFAYDVQQHDA1TYW4gRnJhbnNpc2NvMQ4wDAYDVQQKDAVLYWZrYTESMBAGA1UE
    CwwJS2Fma2EgRGV2MTEwLwYDVQQDDCgqLmthZmthLWhlYWRsZXNzLmthZmthLnN2
    Yy5jbHVzdGVyLmxvY2FsMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA
    zJct43H26c0WhQQdVjuZeNwJyR7ftM1sDOrKSvE/ncf3i7xQ1u1i0ZqXews+8yhk
    ZltIBt/GzW+Zo9gm3+69ujGuiOZdVy0utokKER/QJ03/2RYBucuKdHr5j8Lsh50Z
    RHy8Tk0bovTP8Vao0NfTjKP+gzH+ikRRyFNP+iiPwMQSO5XDZg3YbpCjPUMlAK9O
    Ap669HjQbMM+82aQTMiaQA4TOGfzOp8wxU6yP1yHUtX68rfjYbWM9ANOrnS5+ia8
    pEb49VC9kKODxHNbYVJr6J+AcjAoY5NTjXeGPALO4SqEeu5g/cn/YyNehIQOmua4
    lf3GqSt2pvtkNyCar5zvLwIDAQABo4GNMIGKMB8GA1UdIwQYMBaAFEIHkmo1Lm5C
    bzrQAhNy0Vz8yJmOMAkGA1UdEwQCMAAwCwYDVR0PBAQDAgTwME8GA1UdEQRIMEaC
    KCoua2Fma2EtaGVhZGxlc3Mua2Fma2Euc3ZjLmNsdXN0ZXIubG9jYWyCGioua2Fm
    a2EtaGVhZGxlc3Mua2Fma2Euc3ZjMA0GCSqGSIb3DQEBCwUAA4IBAQCbdtadAxrq
    +RA+e21VRe2lsxmUJdpMDW6dDVO00rsuB7Rjq7uax9Bgl8x5unUxZNUIylwkYgYe
    TMVAFzjldEpAPO3JDRzcTGy8cPokdu8HaHWaDkyib5kQOF5mQlHw1SUqW8xSSTi+
    CGnfiksVU6mdy3BILfuwrQvM+YQFyLWBiYHd3+vToo2KxFf+SueVekUjaxtUmTJe
    4sgRn5MjTOyPazmuyC3k2ARhAXbZfbubQl7/ndW6HZSPOoqgY/kxHlyieYK5sqrb
    7iXXjiFAepKx6cMSf6lP0T8Xlf9lvWwKDvqEXGyZ9uB6sa47bsC5rVO7bKfQ9i+K
    Slk/3B7Y2Lqj
    -----END CERTIFICATE-----
    -----BEGIN CERTIFICATE-----
    MIIDbzCCAlegAwIBAgIUE+eSW2VLLNJptIdBq5wRic9kAQwwDQYJKoZIhvcNAQEL
    BQAwRzEgMB4GA1UEAwwXa2Fma2Euc3ZjLmNsdXN0ZXIubG9jYWwxCzAJBgNVBAYT
    AlVTMRYwFAYDVQQHDA1TYW4gRnJhbnNpc2NvMB4XDTIzMDcyMzEyMzg0NFoXDTMz
    MDcyMDEyMzg0NFowRzEgMB4GA1UEAwwXa2Fma2Euc3ZjLmNsdXN0ZXIubG9jYWwx
    CzAJBgNVBAYTAlVTMRYwFAYDVQQHDA1TYW4gRnJhbnNpc2NvMIIBIjANBgkqhkiG
    9w0BAQEFAAOCAQ8AMIIBCgKCAQEAxFUxSvK+Es5JeHjUBr+wMPszfC34dka2cAXu
    xxvld35D2hv6BEj+163L8udbwFmZbAXRbhxii6MuIZmCtF513AKlL4vE1HnE3fl3
    aw6MArbJUwxKs4m+L+WJZwdN7HMBIIdhYGy5VuJSCMfDYZwODYfwpC49Kr2SImsf
    HWT0tTq/XNZVE4CG2HgAjtA2jKx8jz8DlXE/gS+Q5r8bXRlFjy9xat/k1iygvlWM
    JWBydMD7IvbSGmFroxgjTzj6INl8AdNCE0JELkGtPdn0+Afgwv/FQz55HerqYAUK
    v/JvJidi5sP8VhqcEhCiDKBLsO9ZLLdkVd19mX5C32hkcva4EwIDAQABo1MwUTAd
    BgNVHQ4EFgQUQgeSajUubkJvOtACE3LRXPzImY4wHwYDVR0jBBgwFoAUQgeSajUu
    bkJvOtACE3LRXPzImY4wDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOC
    AQEAVGYePgBeE7TSMl0KmioyfxFtA/1D8KMCwjPcEtd+28iDXBvFu1ZKF9SMVLv0
    9m1EvsLS/TbLdjzrgK3A+8msJqNYR783JGi5DNzvHcXawVuY9Rs/2jt+/+JRZuEZ
    p3ZpDsoJhiEIkjTu3oIc06hl7OOT3MYEO8K22cr3rhz6x9GlREQiinfJn8ZwifXX
    uRY2gr1DGPeGD8JA+wHHo9R7Vl+c2/34+AIzbNJn1CBc+Oc0slWMtBdnEx3EAf8b
    +2NaFDXZKhi+EtRyZLEWsOJiOIV3cNDbqIC/eHQI8+7GaaTymGtZ4WLtktun/XLn
    2RYsrrv9a5QLzK7dCFDit57NxA==
    -----END CERTIFICATE-----
```

@migruiz4 This is almost my full test example, you can try it.

migruiz4 commented 1 year ago

Hi @post-human-world,

But in your server.properties file you have added the controller user to the list of super.users, which is the fix I suggested to OP:

super.users=User:admin;User:interbroker;User:controller;User:superuser

I mentioned it could be a Kafka bug because the documentation does not mention this requirement:

Therefore, we cannot tell whether this is a workaround for a bug (later versions of Kafka may make the controller listener privileged, as fromPrivilegedListener=false may suggest), or whether adding the controller user to the list of super.users is a permanent KRaft+ACL requirement.

post-human-world commented 1 year ago

Configuring super.users is not enough to make them pass authentication; listener.name.inter_broker.plain.sasl.jaas.config and listener.name.controller.plain.sasl.jaas.config are necessary. They can use the same username for authentication, e.g. all of them can use admin.

https://kafka.apache.org/documentation/#security_jaas_broker
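A minimal sketch of those listener-scoped JAAS properties (usernames and passwords are illustrative; the listener segment of the property name is the listener name from listener.security.protocol.map, lowercased):

```properties
listener.name.controller.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="controller" password="controller-secret" user_controller="controller-secret";
listener.name.inter_broker.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="interbroker" password="interbroker-secret" user_interbroker="interbroker-secret";
```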

migruiz4 commented 1 year ago

Our images configure JAAS based on the input env variables KAFKA_CLIENT_USERS/KAFKA_CLIENT_PASSWORDS, KAFKA_INTER_BROKER_USER/KAFKA_INTER_BROKER_PASSWORD, and KAFKA_CONTROLLER_USER/KAFKA_CONTROLLER_PASSWORD.

In previous versions, our images created a kafka_jaas.conf file containing the JAAS configuration, but in the latest release we switched to a listener.name.<name>.<mechanism>.sasl.jaas.config approach.

github-actions[bot] commented 1 year ago

This Issue has been automatically marked as "stale" because it has not had recent activity (for 15 days). It will be closed if no further activity occurs. Thanks for the feedback.

github-actions[bot] commented 1 year ago

Due to the lack of activity in the last 5 days since it was marked as "stale", we proceed to close this Issue. Do not hesitate to reopen it later if necessary.

thejass10 commented 1 year ago

Using Bitnami Kafka Helm 23.0, Kafka 3.5.0

I have set KAFKA_CFG_AUTHORIZER_CLASS_NAME=org.apache.kafka.metadata.authorizer.StandardAuthorizer and KAFKA_CFG_SUPER_USERS=User:user;User:controller_user;User:ANONYMOUS.

I have added one user "test_producer" to a topic "test-acl" with "Read" permission, but I am still able to write to the "test-acl" topic as test_producer, and in fact it works for any user.

Any fix for this? I am not seeing any errors in the broker logs.
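One thing worth checking in this situation (an observation, not a confirmed diagnosis of this report): every principal listed in super.users bypasses ACL checks entirely, and clients connecting over an unauthenticated listener are assigned User:ANONYMOUS, so keeping User:ANONYMOUS in super.users can make every unauthenticated client a super user. The ACLs the broker actually holds can be inspected with the stock CLI (the bootstrap address and client config path are placeholders):

```bash
kafka-acls.sh --bootstrap-server localhost:9092 \
  --command-config /path/to/client.properties \
  --list --topic test-acl
```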

carrodher commented 1 year ago

Could you please create a separate issue for that describing your specific use case? Thanks

EDIT: Nevermind, I just saw https://github.com/bitnami/charts/issues/18997

sean-ias commented 4 months ago

Hi everyone, if you are trying to deploy Confluent-based Kafka (7.6.1, latest) with ACLs over SASL_PLAINTEXT, here is a working solution (it works for Bitnami too; just rename the relevant env vars to their KAFKA_CFG_ counterparts):

kafka1:
    image: confluentinc/cp-kafka:${CONFLUENT_VERSION}
    hostname: kafka1
    container_name: kafka1
    user: 0:0 # this is for volume binding permissions, because by default /var/docker_data is not writable
    ports:
      - "9094:9094"
      - "9092:9092"
      - "9997:9997"
    volumes:
      - "/var/docker_data/kafka1_data:/var/lib/kafka/data"
      - "./etc/secrets/:/etc/kafka/jaas/"
    environment:
    # KRaft settings
      TZ: "Asia/Tashkent"
      KAFKA_NODE_ID: 1
      CLUSTER_ID: 'ciWo7IWazngRchmPES6q5A=='
      KAFKA_KRAFT_MODE: "true"
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka1:9093
    # Listeners
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: 'SASL_PLAINTEXT'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:SASL_PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_HOST:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT # when I was having errors, my controller listener was on PLAINTEXT; converted it to SASL_PLAINTEXT
      KAFKA_LISTENERS: CONTROLLER://kafka1:9093,SASL_PLAINTEXT://kafka1:29092,SASL_HOST://:9092,EXTERNAL://:9094
      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://kafka1:29092,SASL_HOST://172.23.0.3:9092,EXTERNAL://192.168.100.161:9094 # here 172.23.0.3 is my broker container IP
      KAFKA_JMX_PORT: 9997
      KAFKA_JMX_HOSTNAME: localhost
    # SASL
      KAFKA_AUTHORIZER_CLASS_NAME: org.apache.kafka.metadata.authorizer.StandardAuthorizer
      KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN'
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: 'PLAIN'
      KAFKA_SASL_MECHANISM_CONTROLLER_PROTOCOL: 'PLAIN' # added this line
      KAFKA_SECURITY_PROTOCOL: 'SASL_PLAINTEXT'
    # ACL
      KAFKA_SUPER_USERS: User:admin;User:controller;User:ANONYMOUS # added the last two users; I am not sure whether User:ANONYMOUS is necessary, I just left it here because the error logs showed some ANONYMOUS user issues too : )
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' # added this line
      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas/kafka_server_jaas.conf"
      KAFKA_EARLY_START_LISTENERS: CONTROLLER # added this line; not sure if it is also necessary
    # SETTINGS
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
    #  KAFKA_HEAP_OPTS: ${KAFKA_BROKER_HEAP_OPTS}
    # deploy:
    #   resources:
    #     limits:
    #       memory: ${KAFKA_BROKER_MEM_LIMIT}
# .env file to pass $ variables:
# Sets environment variables used in docker-compose.yml

# Set to specific version needed
CONFLUENT_VERSION=7.6.1

# Limit JVM Heap Size
KAFKA_BROKER_HEAP_OPTS="-XX:MaxRAMPercentage=70.0"

# Limit container resources
KAFKA_BROKER_MEM_LIMIT=512m
# ./etc/secrets/kafka_server_jaas.conf file:
KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin159"
   user_controller="controller159"
   user_admin="admin159";
};

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    user_admin="admin159";
};
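
To sanity-check a setup like this from the client side, something along these lines can be used (the client properties file and topic name are illustrative; the admin credentials come from the JAAS file above):

```bash
# Write a client config matching the SASL_PLAINTEXT listener:
cat > client.properties <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="admin" \
  password="admin159";
EOF

# Produce a test message through the SASL listener:
kafka-console-producer --bootstrap-server localhost:9092 \
  --producer.config client.properties --topic test
```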