Open soenkeliebau opened 8 months ago
Below is a draft custom resource for a Kafka cluster, using overrides. a few things to note:
docker.stackable.tech/apoc/stackable/kafka:3.7.1-stackable0.0.0-cyrus-sasl-gssapi
adds (microdnf install) cyrus-sasl-gssapi
to the kafka image/stackable/kcat -V
can be used if we just want the pods to come up, but then the full command can be successfully executed when shelling into the kcat container!args
needs to be overridden to be able to change listener.security.protocol.map
Server kafka/172.19.0.5@CLUSTER.LOCAL not found in Kerberos database
, i.e. the advertised listener needs to be kerberos-ized as well.~
kafka.stackable-products.svc.cluster.local:9093
as shown below scope: pod
for multiple brokers---
apiVersion: zookeeper.stackable.tech/v1alpha1
kind: ZookeeperZnode
metadata:
name: kafka-znode
namespace: stackable-products
spec:
clusterRef:
name: zookeeper
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
name: kafka
namespace: stackable-products
spec:
image:
productVersion: 3.7.1
repo: docker.stackable.tech/apoc/stackable
pullPolicy: IfNotPresent
clusterConfig:
tls:
serverSecretClass: tls
zookeeperConfigMapName: kafka-znode
brokers:
config:
logging:
enableVectorAgent: False
resources:
memory:
limit: '1.3Gi'
roleGroups:
default:
replicas: 1
envOverrides:
KRB5_CONFIG: "/etc/krb5.conf"
configOverrides:
server.properties:
sasl.enabled.mechanisms: "GSSAPI"
sasl.kerberos.service.name : "kafka"
sasl.mechanism.inter.broker.protocol: "GSSAPI"
podOverrides:
spec:
containers:
- name: kafka
volumeMounts:
- name: kerberos
mountPath: /stackable/kerberos
- name: kerberos
mountPath: /etc/krb5.conf
subPath: krb5.conf
args:
- |2
prepare_signal_handlers()
{
unset term_child_pid
unset term_kill_needed
trap 'handle_term_signal' TERM
}
handle_term_signal()
{
if [ "${term_child_pid}" ]; then
kill -TERM "${term_child_pid}" 2>/dev/null
else
term_kill_needed="yes"
fi
}
wait_for_termination()
{
set +e
term_child_pid=$1
if [[ -v term_kill_needed ]]; then
kill -TERM "${term_child_pid}" 2>/dev/null
fi
wait ${term_child_pid} 2>/dev/null
trap - TERM
wait ${term_child_pid} 2>/dev/null
set -e
}
rm -f /stackable/log/_vector/shutdown
prepare_signal_handlers
bin/kafka-server-start.sh \
/stackable/config/server.properties \
--override "zookeeper.connect=$ZOOKEEPER" \
--override "listeners=CLIENT://0.0.0.0:9093,INTERNAL://0.0.0.0:19093" \
--override "advertised.listeners=CLIENT://$POD_NAME.kafka-broker-default.stackable-products.svc.cluster.local:9093,INTERNAL://$POD_NAME.kafka-broker-default.stackable-products.svc.cluster.local:19093" \
--override "listener.security.protocol.map=CLIENT:SASL_SSL,INTERNAL:SSL" \
--override "listener.name.client.gssapi.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/stackable/kerberos/keytab\" principal=\"kafka/$POD_NAME.kafka-broker-default.stackable-products.svc.cluster.local@CLUSTER.LOCAL\";" &
wait_for_termination $!
mkdir -p /stackable/log/_vector && touch /stackable/log/_vector/shutdown
command:
- /bin/bash
- -x
- -euo
- pipefail
- -c
- name: kcat-prober
env:
- name: KRB5_CONFIG
value: /etc/krb5.conf
volumeMounts:
- name: kerberos
mountPath: /stackable/kerberos
- name: kerberos
mountPath: /etc/krb5.conf
subPath: krb5.conf
readinessProbe:
exec:
command:
- /stackable/kcat
- -V
volumes:
- name: kerberos
ephemeral:
volumeClaimTemplate:
metadata:
annotations:
secrets.stackable.tech/class: kerberos
secrets.stackable.tech/scope: pod
secrets.stackable.tech/kerberos.service.names: kafka
spec:
storageClassName: secrets.stackable.tech
accessModes:
- ReadWriteOnce
resources:
requests:
storage: "1"
Herre is a sample client job:
---
apiVersion: batch/v1
kind: Job
metadata:
name: access-kafka
namespace: stackable-products
spec:
template:
spec:
containers:
- name: access-kafka
image: docker.stackable.tech/stackable/kafka:3.7.1-stackable0.0.0-cyrus-sasl-gssapi
command:
- /bin/bash
- /tmp/script/script.sh
env:
- name: KRB5_CONFIG
value: /etc/krb5.conf
volumeMounts:
- name: script
mountPath: /tmp/script
- mountPath: /stackable/tls_keystore_internal
name: tls-keystore-internal
- mountPath: /stackable/tls_keystore_server
name: tls-keystore-server
- mountPath: /stackable/tls_cert_server_mount
name: tls-cert-server-mount
- name: config-emptydir
mountPath: /stackable/conf/hbase
- name: kerberos
mountPath: /stackable/kerberos
- name: kerberos
mountPath: /etc/krb5.conf
subPath: krb5.conf
volumes:
- name: script
configMap:
name: access-kafka-script
- name: tls-keystore-server
ephemeral:
volumeClaimTemplate:
metadata:
annotations:
secrets.stackable.tech/class: tls
secrets.stackable.tech/format: tls-pkcs12
secrets.stackable.tech/scope: pod,node
spec:
storageClassName: secrets.stackable.tech
accessModes:
- ReadWriteOnce
resources:
requests:
storage: "1"
- name: tls-keystore-internal
ephemeral:
volumeClaimTemplate:
metadata:
annotations:
secrets.stackable.tech/class: tls
secrets.stackable.tech/format: tls-pkcs12
secrets.stackable.tech/scope: pod,node
spec:
storageClassName: secrets.stackable.tech
accessModes:
- ReadWriteOnce
resources:
requests:
storage: "1"
- name: config-emptydir
emptyDir: {}
- name: kerberos
ephemeral:
volumeClaimTemplate:
metadata:
annotations:
secrets.stackable.tech/class: kerberos
secrets.stackable.tech/scope: service=access-kafka
secrets.stackable.tech/kerberos.service.names: admin
spec:
storageClassName: secrets.stackable.tech
accessModes:
- ReadWriteOnce
resources:
requests:
storage: "1"
- name: tls-cert-server-mount
ephemeral:
volumeClaimTemplate:
metadata:
annotations:
secrets.stackable.tech/class: tls
secrets.stackable.tech/scope: pod,node,service=kafka
creationTimestamp: null
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: "1"
storageClassName: secrets.stackable.tech
volumeMode: Filesystem
securityContext:
fsGroup: 1000
runAsGroup: 1000
runAsUser: 1000
restartPolicy: OnFailure
---
apiVersion: v1
kind: ConfigMap
metadata:
name: access-kafka-script
namespace: stackable-products
data:
script.sh: |
set -ex
sleep infinity
A sample kcat callout from this pod:
/stackable/kcat -b kafka-broker-default-0.kafka-broker-default.stackable-products.svc.cluster.local:9093 \
-X security.protocol=SASL_SSL \
-X ssl.ca.location=/stackable/tls_cert_server_mount/ca.crt \
-X sasl.kerberos.keytab=/stackable/kerberos/keytab \
-X sasl.kerberos.service.name=kafka \
-X sasl.kerberos.principal=admin/access-kafka.stackable-products.svc.cluster.local@CLUSTER.LOCAL \
-X sasl.mechanism=GSSAPI \
-L -# t test-topic -C
Currently we only support mutual tls as authentication mechanism, especially in existing installations, Kerberos may be a preferred way of doing this.