banzaicloud / koperator

Oh no! Yet another Apache Kafka operator for Kubernetes
Apache License 2.0

unable to delete a user #728

Closed Raboo closed 1 year ago

Raboo commented 2 years ago

Describe the bug: Cannot remove a user.

Steps to reproduce the issue: Created a user with write permission. Added read permission to that user later. Deleted the user.

Expected behavior: User gets deleted.

Screenshots: Operator logs

{"level":"info","ts":"2022-01-05T08:56:16.434Z","logger":"controllers.KafkaUser","msg":"Reconciling KafkaUser","kafkauser":"kafka-dev/myuser","Request.Name":"myuser"}
{"level":"info","ts":"2022-01-05T08:56:16.446Z","logger":"controllers.KafkaUser","msg":"Kafka user is marked for deletion, revoking certificates","kafkauser":"kafka-dev/myuser","Request.Name":"myuser"}
{"level":"info","ts":"2022-01-05T08:56:16.446Z","logger":"controllers.KafkaUser","msg":"Deleting user ACLs from kafka","kafkauser":"kafka-dev/myuser","Request.Name":"myuser"}
{"level":"info","ts":"2022-01-05T08:56:17.344Z","logger":"kafka_util","msg":"Kafka client closed cleanly"}
{"level":"info","ts":"2022-01-05T08:56:17.344Z","logger":"controllers.KafkaUser","msg":"failed to finalize kafkauser","kafkauser":"kafka-dev/myuser","Request.Name":"myuser"}
{"level":"error","ts":"2022-01-05T08:56:17.345Z","logger":"controller.KafkaUser","msg":"Reconciler error","reconciler group":"kafka.banzaicloud.io","reconciler kind":"KafkaUser","name":"myuser","namespace":"kafka-dev","error":"EOF","stacktrace":"sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start.func2.2\n\t/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.10.3/pkg/internal/controller/controller.go:227"}
$ kubectl -n kafka-dev get kafkauser myuser -o yaml
apiVersion: kafka.banzaicloud.io/v1alpha1
kind: KafkaUser
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"kafka.banzaicloud.io/v1alpha1","kind":"KafkaUser","metadata":{"annotations":{},"name":"myuser","namespace":"kafka-dev"},"spec":{"clusterRef":{"name":"kafka"},"secretName":"myuser-cert","topicGrants":[{"accessType":"write","topicName":"my-topic"},{"accessType":"read","topicName":"my-topic"}]}}
  creationTimestamp: "2022-01-05T00:07:11Z"
  deletionGracePeriodSeconds: 0
  deletionTimestamp: "2022-01-05T00:13:47Z"
  finalizers:
  - finalizer.kafkausers.kafka.banzaicloud.io
  generation: 3
  labels:
    kafkaCluster: kafka.kafka-dev
  name: myuser
  namespace: kafka-dev
  resourceVersion: "347089176"
  uid: 8d705a82-c814-4406-92a3-d74f37bb841c
spec:
  clusterRef:
    name: kafka
  secretName: myuser-cert
  topicGrants:
  - accessType: write
    topicName: my-topic
  - accessType: read
    topicName: my-topic
status:
  acls:
  - User:CN=myuser,Topic,LITERAL,my-topic,Describe,Allow,*
  - User:CN=myuser,Topic,LITERAL,my-topic,Create,Allow,*
  - User:CN=myuser,Topic,LITERAL,my-topic,Write,Allow,*
  - User:CN=myuser,Topic,LITERAL,my-topic,Read,Allow,*
  - User:CN=myuser,Group,LITERAL,*,Read,Allow,*
  state: created

Additional context: v0.20.1
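The resource dump above shows the usual signature of a stuck deletion: deletionTimestamp is set while a finalizer is still listed, so Kubernetes keeps the object until the finalizer is removed. A minimal sketch for checking that state from a shell, assuming kubectl access to the namespace; the helper name kafkauser_deletion_state is hypothetical and not part of koperator:

```shell
# Sketch: classify the deletion state of a KafkaUser.
# Arguments: $1 = namespace, $2 = KafkaUser name.
# Prints "pending-finalizers" when deletion was requested but finalizers remain,
# "deleting" when deletion was requested and no finalizers are left,
# and "active" when no deletion was requested.
kafkauser_deletion_state() {
  ts="$(kubectl -n "$1" get kafkauser "$2" -o jsonpath='{.metadata.deletionTimestamp}')"
  fin="$(kubectl -n "$1" get kafkauser "$2" -o jsonpath='{.metadata.finalizers}')"
  if [ -n "$ts" ] && [ -n "$fin" ]; then
    echo "pending-finalizers"
  elif [ -n "$ts" ]; then
    echo "deleting"
  else
    echo "active"
  fi
}

# Usage (values from this issue):
#   kafkauser_deletion_state kafka-dev myuser
```

For the resource shown in this issue the helper would be expected to report that finalizers are still pending, which matches the "failed to finalize kafkauser" log line.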

baluchicken commented 2 years ago

Thanks for opening this issue. Can you confirm that your Kafka cluster was healthy and running during the deletion? Also, can you please share your KafkaCluster CR and the cert-manager version you are using?

Raboo commented 2 years ago

Cluster is fine. Ok, it's weird, I cannot replicate this with other user accounts, but I still can't delete the KafkaUser myuser. It's stuck in removal.

baluchicken commented 2 years ago

Can you please check with the bundled kafka-acls shell script if these rules are still present inside Kafka?

  - User:CN=myuser,Topic,LITERAL,my-topic,Describe,Allow,*
  - User:CN=myuser,Topic,LITERAL,my-topic,Create,Allow,*
  - User:CN=myuser,Topic,LITERAL,my-topic,Write,Allow,*
  - User:CN=myuser,Topic,LITERAL,my-topic,Read,Allow,*
  - User:CN=myuser,Group,LITERAL,*,Read,Allow,*

If not, just edit the myuser KafkaUser CR and remove the finalizers section from it:

    finalizers:
     - finalizer.kafkausers.kafka.banzaicloud.io

Then the operator will be able to remove the resource.
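The manual edit suggested above can also be done non-interactively. A minimal sketch, assuming kubectl access and that the ACLs are already gone from Kafka (clearing the finalizer skips the operator's own cleanup); the helper name remove_kafkauser_finalizers is hypothetical:

```shell
# Sketch: clear metadata.finalizers on a KafkaUser with a JSON merge patch.
# Arguments: $1 = namespace, $2 = KafkaUser name.
# Only do this after confirming the user's ACLs no longer exist in Kafka,
# because the operator's finalizer logic will not run afterwards.
remove_kafkauser_finalizers() {
  kubectl -n "$1" patch kafkauser "$2" --type=merge \
    -p '{"metadata":{"finalizers":null}}'
}

# Usage (values from this issue):
#   remove_kafkauser_finalizers kafka-dev myuser
```

A merge patch is used here because custom resources do not support strategic merge patches.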

Raboo commented 2 years ago

I cannot run kafka-acls.sh

$ kubectl exec -it kafka-0-j7qhs -c kafka -- kafka-acls.sh --bootstrap-server 0.0.0.0:29092 --list
Exception in thread "main" java.lang.reflect.InvocationTargetException
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at java.instrument/sun.instrument.InstrumentationImpl.loadClassAndStartAgent(Unknown Source)
    at java.instrument/sun.instrument.InstrumentationImpl.loadClassAndCallPremain(Unknown Source)
Caused by: java.net.BindException: Address already in use
    at java.base/sun.nio.ch.Net.bind0(Native Method)
    at java.base/sun.nio.ch.Net.bind(Unknown Source)
    at java.base/sun.nio.ch.Net.bind(Unknown Source)
    at java.base/sun.nio.ch.ServerSocketChannelImpl.bind(Unknown Source)
    at java.base/sun.nio.ch.ServerSocketAdaptor.bind(Unknown Source)
    at jdk.httpserver/sun.net.httpserver.ServerImpl.<init>(Unknown Source)
    at jdk.httpserver/sun.net.httpserver.HttpServerImpl.<init>(Unknown Source)
    at jdk.httpserver/sun.net.httpserver.DefaultHttpServerProvider.createHttpServer(Unknown Source)
    at jdk.httpserver/com.sun.net.httpserver.HttpServer.create(Unknown Source)
    at io.prometheus.jmx.shaded.io.prometheus.client.exporter.HTTPServer.<init>(HTTPServer.java:190)
    at io.prometheus.jmx.shaded.io.prometheus.jmx.JavaAgent.premain(JavaAgent.java:31)
    ... 6 more
*** java.lang.instrument ASSERTION FAILED ***: "result" with message agent load/premain call failed at ./src/java.instrument/share/native/libinstrument/JPLISAgent.c line: 422
FATAL ERROR in native method: processing of -javaagent failed, processJavaStart failed
command terminated with exit code 134

It seems to start with -javaagent:/opt/jmx-exporter/jmx_prometheus.jar=9020:/etc/jmx-exporter/config.yaml, and port 9020 is already in use by the broker.
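One way to sidestep the port clash, sketched under the assumption that the -javaagent flag reaches the tool through the KAFKA_OPTS environment variable of the broker container (not confirmed in this thread), is to drop that variable for the one command. The helper name is hypothetical, and env -u assumes GNU coreutils in the image:

```shell
# Sketch: run kafka-acls.sh inside a broker pod without the inherited KAFKA_OPTS.
# Arguments: $1 = namespace, $2 = broker pod name, $3 = bootstrap server (host:port).
# Assumption: the JMX exporter agent is injected via KAFKA_OPTS; if it comes from
# another variable, unset that one instead.
list_acls_in_broker_pod() {
  kubectl -n "$1" exec "$2" -c kafka -- \
    env -u KAFKA_OPTS kafka-acls.sh --bootstrap-server "$3" --list
}

# Usage (values from this issue):
#   list_acls_in_broker_pod kafka-dev kafka-0-j7qhs 0.0.0.0:29092
```

This only avoids the BindException; it does not address connection or TLS problems between the tool and the listener.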

Raboo commented 2 years ago

I also ran the underlying java command directly, without the agent:

/usr/local/openjdk-11/bin/java -Xmx2G -Xms2G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Dsun.net.inetaddr.ttl=60 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/opt/kafka/bin/../logs -Dlog4j.configuration=file:/opt/kafka/bin/../config/tools-log4j.properties -cp '/opt/kafka/libs/extensions/*:/opt/kafka/bin/../libs/activation-1.1.1.jar:/opt/kafka/bin/../libs/aopalliance-repackaged-2.6.1.jar:/opt/kafka/bin/../libs/argparse4j-0.7.0.jar:/opt/kafka/bin/../libs/audience-annotations-0.5.0.jar:/opt/kafka/bin/../libs/commons-cli-1.4.jar:/opt/kafka/bin/../libs/commons-lang3-3.8.1.jar:/opt/kafka/bin/../libs/connect-api-2.8.1.jar:/opt/kafka/bin/../libs/connect-basic-auth-extension-2.8.1.jar:/opt/kafka/bin/../libs/connect-file-2.8.1.jar:/opt/kafka/bin/../libs/connect-json-2.8.1.jar:/opt/kafka/bin/../libs/connect-mirror-2.8.1.jar:/opt/kafka/bin/../libs/connect-mirror-client-2.8.1.jar:/opt/kafka/bin/../libs/connect-runtime-2.8.1.jar:/opt/kafka/bin/../libs/connect-transforms-2.8.1.jar:/opt/kafka/bin/../libs/extensions:/opt/kafka/bin/../libs/hk2-api-2.6.1.jar:/opt/kafka/bin/../libs/hk2-locator-2.6.1.jar:/opt/kafka/bin/../libs/hk2-utils-2.6.1.jar:/opt/kafka/bin/../libs/jackson-annotations-2.10.5.jar:/opt/kafka/bin/../libs/jackson-core-2.10.5.jar:/opt/kafka/bin/../libs/jackson-databind-2.10.5.1.jar:/opt/kafka/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/opt/kafka/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/opt/kafka/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/opt/kafka/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/opt/kafka/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/opt/kafka/bin/../libs/jackson-module-paranamer-2.10.5.jar:/opt/kafka/bin/../libs/jackson-module-scala_2.13-2.10.5.jar:/opt/kafka/bin/../libs/jakarta.activation-api-1.2.1.jar:/opt/kafka/bin/../libs/jakarta.annotatio
n-api-1.3.5.jar:/opt/kafka/bin/../libs/jakarta.inject-2.6.1.jar:/opt/kafka/bin/../libs/jakarta.validation-api-2.0.2.jar:/opt/kafka/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/opt/kafka/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/opt/kafka/bin/../libs/javassist-3.27.0-GA.jar:/opt/kafka/bin/../libs/javax.servlet-api-3.1.0.jar:/opt/kafka/bin/../libs/javax.ws.rs-api-2.1.1.jar:/opt/kafka/bin/../libs/jaxb-api-2.3.0.jar:/opt/kafka/bin/../libs/jersey-client-2.34.jar:/opt/kafka/bin/../libs/jersey-common-2.34.jar:/opt/kafka/bin/../libs/jersey-container-servlet-2.34.jar:/opt/kafka/bin/../libs/jersey-container-servlet-core-2.34.jar:/opt/kafka/bin/../libs/jersey-hk2-2.34.jar:/opt/kafka/bin/../libs/jersey-server-2.34.jar:/opt/kafka/bin/../libs/jetty-client-9.4.43.v20210629.jar:/opt/kafka/bin/../libs/jetty-continuation-9.4.43.v20210629.jar:/opt/kafka/bin/../libs/jetty-http-9.4.43.v20210629.jar:/opt/kafka/bin/../libs/jetty-io-9.4.43.v20210629.jar:/opt/kafka/bin/../libs/jetty-security-9.4.43.v20210629.jar:/opt/kafka/bin/../libs/jetty-server-9.4.43.v20210629.jar:/opt/kafka/bin/../libs/jetty-servlet-9.4.43.v20210629.jar:/opt/kafka/bin/../libs/jetty-servlets-9.4.43.v20210629.jar:/opt/kafka/bin/../libs/jetty-util-9.4.43.v20210629.jar:/opt/kafka/bin/../libs/jetty-util-ajax-9.4.43.v20210629.jar:/opt/kafka/bin/../libs/jline-3.12.1.jar:/opt/kafka/bin/../libs/jopt-simple-5.0.4.jar:/opt/kafka/bin/../libs/kafka-clients-2.8.1.jar:/opt/kafka/bin/../libs/kafka-log4j-appender-2.8.1.jar:/opt/kafka/bin/../libs/kafka-metadata-2.8.1.jar:/opt/kafka/bin/../libs/kafka-raft-2.8.1.jar:/opt/kafka/bin/../libs/kafka-shell-2.8.1.jar:/opt/kafka/bin/../libs/kafka-streams-2.8.1.jar:/opt/kafka/bin/../libs/kafka-streams-examples-2.8.1.jar:/opt/kafka/bin/../libs/kafka-streams-scala_2.13-2.8.1.jar:/opt/kafka/bin/../libs/kafka-streams-test-utils-2.8.1.jar:/opt/kafka/bin/../libs/kafka-tools-2.8.1.jar:/opt/kafka/bin/../libs/kafka_2.13-2.8.1-sources.jar:/opt/kafka/bin/../libs/kafka_2.13-2.8.1.jar:/opt/kafka/bin/../libs/log4
j-1.2.17.jar:/opt/kafka/bin/../libs/lz4-java-1.7.1.jar:/opt/kafka/bin/../libs/maven-artifact-3.8.1.jar:/opt/kafka/bin/../libs/metrics-core-2.2.0.jar:/opt/kafka/bin/../libs/netty-buffer-4.1.62.Final.jar:/opt/kafka/bin/../libs/netty-codec-4.1.62.Final.jar:/opt/kafka/bin/../libs/netty-common-4.1.62.Final.jar:/opt/kafka/bin/../libs/netty-handler-4.1.62.Final.jar:/opt/kafka/bin/../libs/netty-resolver-4.1.62.Final.jar:/opt/kafka/bin/../libs/netty-transport-4.1.62.Final.jar:/opt/kafka/bin/../libs/netty-transport-native-epoll-4.1.62.Final.jar:/opt/kafka/bin/../libs/netty-transport-native-unix-common-4.1.62.Final.jar:/opt/kafka/bin/../libs/osgi-resource-locator-1.0.3.jar:/opt/kafka/bin/../libs/paranamer-2.8.jar:/opt/kafka/bin/../libs/plexus-utils-3.2.1.jar:/opt/kafka/bin/../libs/reflections-0.9.12.jar:/opt/kafka/bin/../libs/rocksdbjni-5.18.4.jar:/opt/kafka/bin/../libs/scala-collection-compat_2.13-2.3.0.jar:/opt/kafka/bin/../libs/scala-java8-compat_2.13-0.9.1.jar:/opt/kafka/bin/../libs/scala-library-2.13.5.jar:/opt/kafka/bin/../libs/scala-logging_2.13-3.9.2.jar:/opt/kafka/bin/../libs/scala-reflect-2.13.5.jar:/opt/kafka/bin/../libs/slf4j-api-1.7.30.jar:/opt/kafka/bin/../libs/slf4j-log4j12-1.7.30.jar:/opt/kafka/bin/../libs/snappy-java-1.1.8.1.jar:/opt/kafka/bin/../libs/zookeeper-3.5.9.jar:/opt/kafka/bin/../libs/zookeeper-jute-3.5.9.jar:/opt/kafka/bin/../libs/zstd-jni-1.4.9-1.jar' kafka.admin.AclCommand --list --bootstrap-server 0.0.0.0:29092

But the pod got deleted; I think it might have been OOM-killed or something.

So I tried

root@kafka-0:/# /usr/local/openjdk-11/bin/java -Xmx450M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true -Dsun.net.inetaddr.ttl=60 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/opt/kafka/bin/../logs -Dlog4j.configuration=file:/opt/kafka/bin/../config/tools-log4j.properties -cp '/opt/kafka/libs/extensions/*:/opt/kafka/bin/../libs/activation-1.1.1.jar:/opt/kafka/bin/../libs/aopalliance-repackaged-2.6.1.jar:/opt/kafka/bin/../libs/argparse4j-0.7.0.jar:/opt/kafka/bin/../libs/audience-annotations-0.5.0.jar:/opt/kafka/bin/../libs/commons-cli-1.4.jar:/opt/kafka/bin/../libs/commons-lang3-3.8.1.jar:/opt/kafka/bin/../libs/connect-api-2.8.1.jar:/opt/kafka/bin/../libs/connect-basic-auth-extension-2.8.1.jar:/opt/kafka/bin/../libs/connect-file-2.8.1.jar:/opt/kafka/bin/../libs/connect-json-2.8.1.jar:/opt/kafka/bin/../libs/connect-mirror-2.8.1.jar:/opt/kafka/bin/../libs/connect-mirror-client-2.8.1.jar:/opt/kafka/bin/../libs/connect-runtime-2.8.1.jar:/opt/kafka/bin/../libs/connect-transforms-2.8.1.jar:/opt/kafka/bin/../libs/extensions:/opt/kafka/bin/../libs/hk2-api-2.6.1.jar:/opt/kafka/bin/../libs/hk2-locator-2.6.1.jar:/opt/kafka/bin/../libs/hk2-utils-2.6.1.jar:/opt/kafka/bin/../libs/jackson-annotations-2.10.5.jar:/opt/kafka/bin/../libs/jackson-core-2.10.5.jar:/opt/kafka/bin/../libs/jackson-databind-2.10.5.1.jar:/opt/kafka/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/opt/kafka/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/opt/kafka/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/opt/kafka/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/opt/kafka/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/opt/kafka/bin/../libs/jackson-module-paranamer-2.10.5.jar:/opt/kafka/bin/../libs/jackson-module-scala_2.13-2.10.5.jar:/opt/kafka/bin/../libs/jakarta.activation-api-1.2.1.jar:/opt/kafka/bin/../libs/jakart
a.annotation-api-1.3.5.jar:/opt/kafka/bin/../libs/jakarta.inject-2.6.1.jar:/opt/kafka/bin/../libs/jakarta.validation-api-2.0.2.jar:/opt/kafka/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/opt/kafka/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/opt/kafka/bin/../libs/javassist-3.27.0-GA.jar:/opt/kafka/bin/../libs/javax.servlet-api-3.1.0.jar:/opt/kafka/bin/../libs/javax.ws.rs-api-2.1.1.jar:/opt/kafka/bin/../libs/jaxb-api-2.3.0.jar:/opt/kafka/bin/../libs/jersey-client-2.34.jar:/opt/kafka/bin/../libs/jersey-common-2.34.jar:/opt/kafka/bin/../libs/jersey-container-servlet-2.34.jar:/opt/kafka/bin/../libs/jersey-container-servlet-core-2.34.jar:/opt/kafka/bin/../libs/jersey-hk2-2.34.jar:/opt/kafka/bin/../libs/jersey-server-2.34.jar:/opt/kafka/bin/../libs/jetty-client-9.4.43.v20210629.jar:/opt/kafka/bin/../libs/jetty-continuation-9.4.43.v20210629.jar:/opt/kafka/bin/../libs/jetty-http-9.4.43.v20210629.jar:/opt/kafka/bin/../libs/jetty-io-9.4.43.v20210629.jar:/opt/kafka/bin/../libs/jetty-security-9.4.43.v20210629.jar:/opt/kafka/bin/../libs/jetty-server-9.4.43.v20210629.jar:/opt/kafka/bin/../libs/jetty-servlet-9.4.43.v20210629.jar:/opt/kafka/bin/../libs/jetty-servlets-9.4.43.v20210629.jar:/opt/kafka/bin/../libs/jetty-util-9.4.43.v20210629.jar:/opt/kafka/bin/../libs/jetty-util-ajax-9.4.43.v20210629.jar:/opt/kafka/bin/../libs/jline-3.12.1.jar:/opt/kafka/bin/../libs/jopt-simple-5.0.4.jar:/opt/kafka/bin/../libs/kafka-clients-2.8.1.jar:/opt/kafka/bin/../libs/kafka-log4j-appender-2.8.1.jar:/opt/kafka/bin/../libs/kafka-metadata-2.8.1.jar:/opt/kafka/bin/../libs/kafka-raft-2.8.1.jar:/opt/kafka/bin/../libs/kafka-shell-2.8.1.jar:/opt/kafka/bin/../libs/kafka-streams-2.8.1.jar:/opt/kafka/bin/../libs/kafka-streams-examples-2.8.1.jar:/opt/kafka/bin/../libs/kafka-streams-scala_2.13-2.8.1.jar:/opt/kafka/bin/../libs/kafka-streams-test-utils-2.8.1.jar:/opt/kafka/bin/../libs/kafka-tools-2.8.1.jar:/opt/kafka/bin/../libs/kafka_2.13-2.8.1-sources.jar:/opt/kafka/bin/../libs/kafka_2.13-2.8.1.jar:/opt/kafka/bin/.
./libs/log4j-1.2.17.jar:/opt/kafka/bin/../libs/lz4-java-1.7.1.jar:/opt/kafka/bin/../libs/maven-artifact-3.8.1.jar:/opt/kafka/bin/../libs/metrics-core-2.2.0.jar:/opt/kafka/bin/../libs/netty-buffer-4.1.62.Final.jar:/opt/kafka/bin/../libs/netty-codec-4.1.62.Final.jar:/opt/kafka/bin/../libs/netty-common-4.1.62.Final.jar:/opt/kafka/bin/../libs/netty-handler-4.1.62.Final.jar:/opt/kafka/bin/../libs/netty-resolver-4.1.62.Final.jar:/opt/kafka/bin/../libs/netty-transport-4.1.62.Final.jar:/opt/kafka/bin/../libs/netty-transport-native-epoll-4.1.62.Final.jar:/opt/kafka/bin/../libs/netty-transport-native-unix-common-4.1.62.Final.jar:/opt/kafka/bin/../libs/osgi-resource-locator-1.0.3.jar:/opt/kafka/bin/../libs/paranamer-2.8.jar:/opt/kafka/bin/../libs/plexus-utils-3.2.1.jar:/opt/kafka/bin/../libs/reflections-0.9.12.jar:/opt/kafka/bin/../libs/rocksdbjni-5.18.4.jar:/opt/kafka/bin/../libs/scala-collection-compat_2.13-2.3.0.jar:/opt/kafka/bin/../libs/scala-java8-compat_2.13-0.9.1.jar:/opt/kafka/bin/../libs/scala-library-2.13.5.jar:/opt/kafka/bin/../libs/scala-logging_2.13-3.9.2.jar:/opt/kafka/bin/../libs/scala-reflect-2.13.5.jar:/opt/kafka/bin/../libs/slf4j-api-1.7.30.jar:/opt/kafka/bin/../libs/slf4j-log4j12-1.7.30.jar:/opt/kafka/bin/../libs/snappy-java-1.1.8.1.jar:/opt/kafka/bin/../libs/zookeeper-3.5.9.jar:/opt/kafka/bin/../libs/zookeeper-r-jute-3.5.9.jar:/opt/kafka/bin/../libs/zstd-jni-1.4.9-1.jar' kafka.admin.AclCommand --bootstrap-server 0.0.0.0:29092 --list

It runs, and after a while I get a timeout:

Error while executing ACL command: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeAcls, deadlineMs=1641812582631, tries=1, nextAllowedTryMs=1641812582732) timed out at 1641812582632 after 1 attempt(s)
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeAcls, deadlineMs=1641812582631, tries=1, nextAllowedTryMs=1641812582732) timed out at 1641812582632 after 1 attempt(s)
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at kafka.admin.AclCommand$AdminClientService.getAcls(AclCommand.scala:177)
    at kafka.admin.AclCommand$AdminClientService.listAcls(AclCommand.scala:146)
    at kafka.admin.AclCommand$AdminClientService.$anonfun$listAcls$1(AclCommand.scala:139)
    at kafka.admin.AclCommand$AdminClientService.listAcls(AclCommand.scala:138)
    at kafka.admin.AclCommand$.main(AclCommand.scala:74)
    at kafka.admin.AclCommand.main(AclCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeAcls, deadlineMs=1641812582631, tries=1, nextAllowedTryMs=1641812582732) timed out at 1641812582632 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeAcls

stoader commented 2 years ago

Caused by: java.net.BindException: Address already in use

You get the above error because it's not supported to run Kafka tools from within a broker pod: the tools try to bind to ports which are already in use by the broker. You need to create a separate pod and run the kafka-acls shell script there.

e.g.

apiVersion: v1
kind: Pod
metadata:
  name: kafka-client
spec:
  containers:
  - name: kafka
    image: ghcr.io/banzaicloud/kafka:2.13-2.8.1
    # Just spin & wait forever
    command: [ "/bin/bash", "-c", "--" ]
    args: [ "while true; do sleep 3000; done;" ]

Once the kafka-client pod is up and running, exec into it and run the kafka-acls shell script from /opt/kafka/bin.
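The steps above can be sketched as two small shell helpers. This assumes the manifest was saved to a local file and that kubectl can reach the cluster; the helper names and the file name kafka-client.yaml are hypothetical:

```shell
# Sketch: create the client pod from a saved manifest and wait until it is Ready.
# Arguments: $1 = namespace, $2 = path to the pod manifest.
start_kafka_client_pod() {
  kubectl -n "$1" apply -f "$2" &&
  kubectl -n "$1" wait --for=condition=Ready pod/kafka-client --timeout=120s
}

# Sketch: list ACLs from inside the client pod.
# Arguments: $1 = namespace, $2 = bootstrap server (host:port).
list_acls_from_client_pod() {
  kubectl -n "$1" exec kafka-client -- \
    /opt/kafka/bin/kafka-acls.sh --bootstrap-server "$2" --list
}

# Usage (namespace and service name taken from this issue):
#   start_kafka_client_pod kafka-dev kafka-client.yaml
#   list_acls_from_client_pod kafka-dev kafka-headless:29092
```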

Raboo commented 2 years ago

I've tried [kafka-pod-ip] and kafka-headless, each on ports 9094, 29092 and 29093.

They all result in this error:

Error while executing ACL command: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeAcls, deadlineMs=1641821474583, tries=1, nextAllowedTryMs=1641821474684) timed out at 1641821474584 after 1 attempt(s)
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeAcls, deadlineMs=1641821474583, tries=1, nextAllowedTryMs=1641821474684) timed out at 1641821474584 after 1 attempt(s)
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at kafka.admin.AclCommand$AdminClientService.getAcls(AclCommand.scala:177)
    at kafka.admin.AclCommand$AdminClientService.listAcls(AclCommand.scala:146)
    at kafka.admin.AclCommand$AdminClientService.$anonfun$listAcls$1(AclCommand.scala:139)
    at kafka.admin.AclCommand$AdminClientService.listAcls(AclCommand.scala:138)
    at kafka.admin.AclCommand$.main(AclCommand.scala:74)
    at kafka.admin.AclCommand.main(AclCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeAcls, deadlineMs=1641821474583, tries=1, nextAllowedTryMs=1641821474684) timed out at 1641821474584 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeAcls

Which endpoint should I pass to --bootstrap-server?
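One plausible cause of "Timed out waiting for a node assignment" is a client speaking plaintext to a TLS listener; the KafkaCluster CR shared later in this thread declares every listener as type: ssl. A minimal sketch of a client configuration for that case, assuming JKS stores are available in the client pod (all paths and passwords below are hypothetical, and the keystore lines only matter if the brokers require client certificates):

```shell
# Sketch: write an SSL client properties file for the Kafka command-line tools.
# Arguments: $1 = output file, $2 = truststore path, $3 = truststore password,
#            $4 = keystore path, $5 = keystore password.
write_ssl_client_properties() {
  cat > "$1" <<EOF
security.protocol=SSL
ssl.truststore.location=$2
ssl.truststore.password=$3
ssl.keystore.location=$4
ssl.keystore.password=$5
EOF
}

# Usage inside the client pod (paths are placeholders):
#   write_ssl_client_properties /tmp/client.properties \
#     /tmp/truststore.jks changeit /tmp/keystore.jks changeit
#   /opt/kafka/bin/kafka-acls.sh --bootstrap-server kafka-headless:29092 \
#     --command-config /tmp/client.properties --list --principal User:CN=myuser
```

The --command-config option hands the properties file to the admin client used by kafka-acls.sh, and --principal narrows the listing to the ACLs of the user in question.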

baluchicken commented 2 years ago

Can you please share your KafkaCluster CR so we can debug this issue further? Thanks.

Raboo commented 2 years ago

apiVersion: kafka.banzaicloud.io/v1beta1
kind: KafkaCluster
metadata:
  name: kafka
  namespace: kafka-dev
spec:
  headlessServiceEnabled: true
  zkAddresses:
    - "zookeeper-client:2181"
  zkPath: "/kafka"
  propagateLabels: false
  oneBrokerPerNode: false
  clusterImage: "ghcr.io/banzaicloud/kafka:2.13-2.8.1"
  readOnlyConfig: |
    auto.create.topics.enable=false
    default.replication.factor=3
    cruise.control.metrics.topic.auto.create=true
    cruise.control.metrics.topic.num.partitions=1
    cruise.control.metrics.topic.replication.factor=2
    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
    allow.everyone.if.no.acl.found=false
  brokerConfigGroups:
    default:
      storageConfigs:
        - mountPath: /kafka-logs
          pvcSpec:
            accessModes:
              - ReadWriteOnce
            resources:
              requests:
                storage: 10Gi
      brokerAnnotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9020"
  brokers:
    - id: 0
      brokerConfigGroup: default
    - id: 1
      brokerConfigGroup: default
    - id: 2
      brokerConfigGroup: default
  rollingUpgradeConfig:
    failureThreshold: 1
  listenersConfig:
    externalListeners:
      - type: ssl
        name: external
        externalStartingPort: 19090
        containerPort: 9094
        serviceAnnotations:
          consul.hashicorp.com/service-name: kafka-dev
        hostnameOverride: kafka-dev.service.consul
    internalListeners:
      - type: ssl
        name: internal
        containerPort: 29092
        usedForInnerBrokerCommunication: true
      - type: ssl
        name: controller
        containerPort: 29093
        usedForInnerBrokerCommunication: false
        usedForControllerCommunication: true
    sslSecrets: # not sure when and where this is used.
      tlsSecretName: test-kafka-operator
      jksPasswordName: test-kafka-operator-pass
      create: true
  ingressController: envoy
  envoyConfig:
    replicas: 2
  cruiseControlConfig:
    cruiseControlAnnotations:
      prometheus.io/port: "9020"
      prometheus.io/scrape: "true"
    cruiseControlTaskSpec:
      RetryDurationMinutes: 5
    topicConfig:
      partitions: 12
      replicationFactor: 3
    config: |
      # Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
      #
      # This is an example property file for Kafka Cruise Control. See KafkaCruiseControlConfig for more details.
      # Configuration for the metadata client.
      # =======================================
      # The maximum interval in milliseconds between two metadata refreshes.
      #metadata.max.age.ms=300000
      # Client id for the Cruise Control. It is used for the metadata client.
      #client.id=kafka-cruise-control
      # The size of TCP send buffer bytes for the metadata client.
      #send.buffer.bytes=131072
      # The size of TCP receive buffer size for the metadata client.
      #receive.buffer.bytes=131072
      # The time to wait before disconnect an idle TCP connection.
      #connections.max.idle.ms=540000
      # The time to wait before reconnect to a given host.
      #reconnect.backoff.ms=50
      # The time to wait for a response from a host after sending a request.
      #request.timeout.ms=30000
      # Configurations for the load monitor
      # =======================================
      # The number of metric fetcher thread to fetch metrics for the Kafka cluster
      num.metric.fetchers=1
      # The metric sampler class
      metric.sampler.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.CruiseControlMetricsReporterSampler
      # Configurations for CruiseControlMetricsReporterSampler
      metric.reporter.topic.pattern=__CruiseControlMetrics
      # The sample store class name
      sample.store.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.KafkaSampleStore
      # The config for the Kafka sample store to save the partition metric samples
      partition.metric.sample.store.topic=__KafkaCruiseControlPartitionMetricSamples
      # The config for the Kafka sample store to save the model training samples
      broker.metric.sample.store.topic=__KafkaCruiseControlModelTrainingSamples
      # The replication factor of Kafka metric sample store topic
      sample.store.topic.replication.factor=2
      # The config for the number of Kafka sample store consumer threads
      num.sample.loading.threads=8
      # The partition assignor class for the metric samplers
      metric.sampler.partition.assignor.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.DefaultMetricSamplerPartitionAssignor
      # The metric sampling interval in milliseconds
      metric.sampling.interval.ms=120000
      metric.anomaly.detection.interval.ms=180000
      # The partition metrics window size in milliseconds
      partition.metrics.window.ms=300000
      # The number of partition metric windows to keep in memory
      num.partition.metrics.windows=1
      # The minimum partition metric samples required for a partition in each window
      min.samples.per.partition.metrics.window=1
      # The broker metrics window size in milliseconds
      broker.metrics.window.ms=300000
      # The number of broker metric windows to keep in memory
      num.broker.metrics.windows=20
      # The minimum broker metric samples required for a partition in each window
      min.samples.per.broker.metrics.window=1
      # The configuration for the BrokerCapacityConfigFileResolver (supports JBOD and non-JBOD broker capacities)
      capacity.config.file=config/capacity.json
      #capacity.config.file=config/capacityJBOD.json
      # Configurations for the analyzer
      # =======================================
      # The list of goals to optimize the Kafka cluster for with pre-computed proposals
      default.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal
      # The list of supported goals
      goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerDiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PreferredLeaderElectionGoal
      # The list of supported hard goals
      hard.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal
      # The minimum percentage of well monitored partitions out of all the partitions
      min.monitored.partition.percentage=0.95
      # The balance threshold for CPU
      cpu.balance.threshold=1.1
      # The balance threshold for disk
      disk.balance.threshold=1.1
      # The balance threshold for network inbound utilization
      network.inbound.balance.threshold=1.1
      # The balance threshold for network outbound utilization
      network.outbound.balance.threshold=1.1
      # The balance threshold for the replica count
      replica.count.balance.threshold=1.1
      # The capacity threshold for CPU in percentage
      cpu.capacity.threshold=0.8
      # The capacity threshold for disk in percentage
      disk.capacity.threshold=0.8
      # The capacity threshold for network inbound utilization in percentage
      network.inbound.capacity.threshold=0.8
      # The capacity threshold for network outbound utilization in percentage
      network.outbound.capacity.threshold=0.8
      # The threshold to define the cluster to be in a low CPU utilization state
      cpu.low.utilization.threshold=0.0
      # The threshold to define the cluster to be in a low disk utilization state
      disk.low.utilization.threshold=0.0
      # The threshold to define the cluster to be in a low network inbound utilization state
      network.inbound.low.utilization.threshold=0.0
      # The threshold to define the cluster to be in a low network outbound utilization state
      network.outbound.low.utilization.threshold=0.0
      # The metric anomaly percentile upper threshold
      metric.anomaly.percentile.upper.threshold=90.0
      # The metric anomaly percentile lower threshold
      metric.anomaly.percentile.lower.threshold=10.0
      # How often should the cached proposal be expired and recalculated if necessary
      proposal.expiration.ms=60000
      # The maximum number of replicas that can reside on a broker at any given time.
      max.replicas.per.broker=10000
      # The number of threads to use for proposal candidate precomputing.
      num.proposal.precompute.threads=1
      # the topics that should be excluded from the partition movement.
      #topics.excluded.from.partition.movement
      # Configurations for the executor
      # =======================================
      # The max number of partitions to move in/out on a given broker at a given time.
      num.concurrent.partition.movements.per.broker=10
      # The interval between two execution progress checks.
      execution.progress.check.interval.ms=10000
      # Configurations for anomaly detector
      # =======================================
      # The goal violation notifier class
      anomaly.notifier.class=com.linkedin.kafka.cruisecontrol.detector.notifier.SelfHealingNotifier
      # The metric anomaly finder class
      metric.anomaly.finder.class=com.linkedin.kafka.cruisecontrol.detector.KafkaMetricAnomalyFinder
      # The anomaly detection interval
      anomaly.detection.interval.ms=10000
      # The goal violation to detect.
      anomaly.detection.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal
      # The interested metrics for metric anomaly analyzer.
      metric.anomaly.analyzer.metrics=BROKER_PRODUCE_LOCAL_TIME_MS_MAX,BROKER_PRODUCE_LOCAL_TIME_MS_MEAN,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_MAX,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_MEAN,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_MAX,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_MEAN,BROKER_LOG_FLUSH_TIME_MS_MAX,BROKER_LOG_FLUSH_TIME_MS_MEAN
      ## Adjust accordingly if your metrics reporter is an older version and does not produce these metrics.
      #metric.anomaly.analyzer.metrics=BROKER_PRODUCE_LOCAL_TIME_MS_50TH,BROKER_PRODUCE_LOCAL_TIME_MS_999TH,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_50TH,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_999TH,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_50TH,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_999TH,BROKER_LOG_FLUSH_TIME_MS_50TH,BROKER_LOG_FLUSH_TIME_MS_999TH
      # The zk path to store failed broker information.
      failed.brokers.zk.path=/CruiseControlBrokerList
      # Topic config provider class
      topic.config.provider.class=com.linkedin.kafka.cruisecontrol.config.KafkaTopicConfigProvider
      # The cluster configurations for the KafkaTopicConfigProvider
      cluster.configs.file=config/clusterConfigs.json
      # The maximum time in milliseconds to store the response and access details of a completed user task.
      completed.user.task.retention.time.ms=21600000
      # The maximum time in milliseconds to retain the demotion history of brokers.
      demotion.history.retention.time.ms=86400000
      # The maximum number of completed user tasks for which the response and access details will be cached.
      max.cached.completed.user.tasks=100
      # The maximum number of user tasks for concurrently running in async endpoints across all users.
      max.active.user.tasks=5
      # Enable self healing for all anomaly detectors, unless the particular anomaly detector is explicitly disabled
      self.healing.enabled=true
      # Enable self healing for broker failure detector
      #self.healing.broker.failure.enabled=true
      # Enable self healing for goal violation detector
      #self.healing.goal.violation.enabled=true
      # Enable self healing for metric anomaly detector
      #self.healing.metric.anomaly.enabled=true
      # configurations for the webserver
      # ================================
      # HTTP listen port
      webserver.http.port=9090
      # HTTP listen address
      webserver.http.address=0.0.0.0
      # Whether CORS support is enabled for API or not
      webserver.http.cors.enabled=false
      # Value for Access-Control-Allow-Origin
      webserver.http.cors.origin=http://localhost:8080/
      # Value for Access-Control-Request-Method
      webserver.http.cors.allowmethods=OPTIONS,GET,POST
      # Headers that should be exposed to the Browser (Webapp)
      # This is a special header that is used by the
      # User Tasks subsystem and should be explicitly
      # Enabled when CORS mode is used as part of the
      # Admin Interface
      webserver.http.cors.exposeheaders=User-Task-ID
      # REST API default prefix
      # (dont forget the ending *)
      webserver.api.urlprefix=/kafkacruisecontrol/*
      # Location where the Cruise Control frontend is deployed
      webserver.ui.diskpath=./cruise-control-ui/dist/
      # URL path prefix for UI
      # (dont forget the ending *)
      webserver.ui.urlprefix=/*
      # Time After which request is converted to Async
      webserver.request.maxBlockTimeMs=10000
      # Default Session Expiry Period
      webserver.session.maxExpiryTimeMs=60000
      # Session cookie path
      webserver.session.path=/
      # Server Access Logs
      webserver.accesslog.enabled=true
      # Location of HTTP Request Logs
      webserver.accesslog.path=access.log
      # HTTP Request Log retention days
      webserver.accesslog.retention.days=14
    clusterConfig: |
      {
        "min.insync.replicas": 3
      }

baluchicken commented 2 years ago

Thanks. Since allow.everyone.if.no.acl.found is set to false, Kafka will not let an arbitrary user list the ACLs. That is why you are seeing the timeout exception when trying to connect with a Kafka client. To make it work, please pass a superuser cert to the kafka-acls call. If you installed the operator correctly and used cert-manager, the operator should have created two superusers for you by default; their secrets can be used for this test. The proper secret should be called kafka-controller in your case.
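A minimal sketch of what such a call could look like. It assumes the superuser secret stores JKS material under the keys keystore.jks, truststore.jks and password, which this thread does not confirm; the helper names, the bootstrap address and the working directory are hypothetical placeholders.

```shell
#!/bin/sh
# Sketch: prepare a kafka-acls.sh client config from a superuser secret.
# Assumption (not confirmed in this thread): the secret holds JKS material
# under the keys keystore.jks, truststore.jks and password.

# Build the kubectl jsonpath for one secret key, escaping dots in the key name.
secret_jsonpath() {
  printf '{.data.%s}\n' "$(printf '%s' "$1" | sed 's/\./\\./g')"
}

# Decode one key of a secret into a file (requires kubectl and cluster access).
fetch_secret_key() {
  kubectl -n "$1" get secret "$2" -o "jsonpath=$(secret_jsonpath "$3")" \
    | base64 -d > "$4"
}

# Write the SSL client properties that kafka-acls.sh reads via --command-config.
write_client_properties() {
  dir="$1"; password="$2"
  cat > "$dir/client.properties" <<EOF
security.protocol=SSL
ssl.keystore.location=$dir/keystore.jks
ssl.keystore.password=$password
ssl.truststore.location=$dir/truststore.jks
ssl.truststore.password=$password
EOF
}

# Print (not run) the command that lists the ACLs of one topic.
list_acls_command() {
  echo "kafka-acls.sh --bootstrap-server $1 --command-config $2/client.properties --list --topic $3"
}
```

For example, after fetching keystore.jks, truststore.jks and password from the kafka-controller secret into one directory, `write_client_properties` produces the config file and `list_acls_command` prints the command to run from a pod that can reach the brokers.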

Raboo commented 2 years ago

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=my-topic, patternType=LITERAL)`:
    (principal=User:CN=myuser, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:CN=myuser, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:CN=myuser, host=*, operation=DESCRIBE_CONFIGS, permissionType=ALLOW)
    (principal=User:CN=myuser, host=*, operation=CREATE, permissionType=ALLOW)
    (principal=User:CN=myuser, host=*, operation=READ, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=*, patternType=LITERAL)`:
    (principal=User:CN=myuser, host=*, operation=READ, permissionType=ALLOW)

baluchicken commented 2 years ago

It seems that the operator cannot delete this ACL from Kafka. Can you please enable verbose logging for the operator and paste the logs here? You can enable verbose logging by modifying the operator deployment and adding --verbose to the container command field.
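One way to make that change without editing the deployment by hand is a JSON patch. The sketch below only prints the patch and the kubectl command; the namespace, deployment name, container index and target field (command or args) are placeholders to check against your own deployment, and the helper names are hypothetical.

```shell
#!/bin/sh
# Sketch: append --verbose to one container of the operator Deployment.
# Assumptions: namespace, Deployment name, container index and the field
# (command or args) are placeholders; inspect the Deployment YAML first.

# Emit a JSON Patch that appends --verbose to the chosen array field.
# The "add" op fails if that field does not already exist on the container.
verbose_patch() {
  printf '[{"op":"add","path":"/spec/template/spec/containers/%s/%s/-","value":"--verbose"}]\n' "$1" "$2"
}

# Print the kubectl command that applies the patch.
enable_verbose_command() {
  echo "kubectl -n $1 patch deployment $2 --type=json -p '$(verbose_patch "$3" "$4")'"
}
```

Calling `enable_verbose_command <namespace> <deployment> 0 command` prints the command for review before you run it.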

Raboo commented 2 years ago

Sure, here are the logs with verbose enabled

{"level":"info","ts":"2022-01-18T09:51:44.052Z","logger":"controllers.KafkaUser","msg":"Reconciling KafkaUser","kafkauser":"kafka-dev/myuser","Request.Name":"myuser"}
{"level":"info","ts":"2022-01-18T09:51:44.064Z","logger":"controllers.KafkaUser","msg":"Kafka user is marked for deletion, revoking certificates","kafkauser":"kafka-dev/myuser","Request.Name":"myuser"}
{"level":"info","ts":"2022-01-18T09:51:44.064Z","logger":"controllers.KafkaUser","msg":"Deleting user ACLs from kafka","kafkauser":"kafka-dev/myuser","Request.Name":"myuser"}
{"level":"info","ts":"2022-01-18T09:51:44.773Z","logger":"kafka_util","msg":"Kafka client closed cleanly"}
{"level":"info","ts":"2022-01-18T09:51:44.773Z","logger":"controllers.KafkaUser","msg":"failed to finalize kafkauser","kafkauser":"kafka-dev/myuser","Request.Name":"myuser"}
{"level":"error","ts":"2022-01-18T09:51:44.774Z","logger":"controller.KafkaUser","msg":"Reconciler error","reconciler group":"kafka.banzaicloud.io","reconciler kind":"KafkaUser","name":"myuser","namespace":"kafka-dev","error":"EOF","stacktrace":"sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).Start.func2.2\n\t/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.10.3/pkg/internal/controller/controller.go:227"}

Verbose doesn't seem to make any difference to the logging.

baluchicken commented 2 years ago

Hmm, this is very odd. Sadly, I do not know what is causing it. The only thing you can do is remove the finalizer from the affected KafkaUser and remove the ACLs from Kafka with the shell script that ships with Kafka.
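A rough sketch of that manual cleanup, using the names from this thread (kafka-dev, myuser, my-topic) and the ACLs listed earlier. The bootstrap address and the client.properties path are placeholders, the `run` helper is hypothetical, and commands are only printed unless DRY_RUN=0 is set.

```shell
#!/bin/sh
# Sketch of the manual cleanup. Commands are printed unless DRY_RUN=0.
# Assumptions: the bootstrap address and client.properties path passed in
# are placeholders; kafka-acls.sh must run with superuser credentials.

run() {
  if [ "${DRY_RUN:-1}" = "1" ]; then echo "$*"; else "$@"; fi
}

# Drop every finalizer so Kubernetes can finish deleting the KafkaUser.
remove_finalizers() {
  run kubectl -n "$1" patch kafkauser "$2" --type=merge \
    -p '{"metadata":{"finalizers":null}}'
}

# Remove the five topic operations granted to one principal.
remove_topic_acls() {
  run kafka-acls.sh --bootstrap-server "$1" --command-config "$2" \
    --remove --force --allow-principal "$3" --topic "$4" \
    --operation Read --operation Write --operation Create \
    --operation Describe --operation DescribeConfigs
}

# Remove the READ ACL on the wildcard consumer group.
remove_group_acl() {
  run kafka-acls.sh --bootstrap-server "$1" --command-config "$2" \
    --remove --force --allow-principal "$3" --group '*' --operation Read
}
```

For example, `remove_topic_acls <bootstrap> client.properties User:CN=myuser my-topic` prints the removal command for review. The operations are listed explicitly because the principal's ACLs are per-operation entries rather than a single ALL entry.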

stoader commented 2 years ago

@Raboo can you attach your KafkaCluster, KafkaUser, KafkaTopic, etc. custom resources so they can be used to reproduce this issue?

Raboo commented 2 years ago

@stoader They are listed above. However, I am not able to reproduce this myself. I simply let that user exist, as this was a dev cluster. I mostly reported this issue to help improve the operator and to learn how to fix this manually (which I didn't).

stoader commented 2 years ago

@Raboo I can't see the KafkaTopic custom resource, only KafkaCluster and KafkaUser.

Raboo commented 1 year ago

Sorry, this cluster is no longer available. And I was unable to reproduce this in a new cluster. Perhaps it was a glitch in the matrix. Thanks for the effort!