strimzi / strimzi-kafka-oauth

OAuth2 support for Apache Kafka® to work with many OAuth2 authorization servers
Apache License 2.0
147 stars 90 forks source link

Unable to setup SASL_PLAINTEXT for some listeners while using PLAINTEXT for interbroker communication #189

Closed pegtrifork closed 1 year ago

pegtrifork commented 1 year ago

Using the folloing setup I don't expect the server to do any logins, since interbroker communication is to use the INTER listener.

listeners=CLIENT://kafka:29092,INTER://kafka:19092,EXTERNAL://0.0.0.0:9092
advertised.listeners=CLIENT://kafka:29092,INTER://kafka:19092,EXTERNAL://127.0.0.1:9092
zookeeper.connect=zookeeper:2181
inter.broker.listener.name=INTER
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT,INTER:PLAINTEXT,EXTERNAL:PLAINTEXT
sasl.enabled.mechanisms=OAUTHBEARER

listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    oauth.check.issuer="false" \
    oauth.jwks.endpoint.uri="http://keycloak:8080/auth/realms/master/protocol/openid-connect/certs" \
    oauth.username.claim="preferred_username";

listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler

broker.id=1
min.insync.replicas=1
default.replication.factor=1
transaction.state.log.replication.factor=1
offsets.topic.replication.factor=1
inter.broker.protocol.version=3.2

However, I get this exception when trying to start kafka:

cheetah-infrastructure-kafka-1  | java.io.IOException: No principal name in JWT claim: sub
cheetah-infrastructure-kafka-1  |       at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:165)
cheetah-infrastructure-kafka-1  |       at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
cheetah-infrastructure-kafka-1  |       at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
cheetah-infrastructure-kafka-1  |       at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
cheetah-infrastructure-kafka-1  |       at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:679)
cheetah-infrastructure-kafka-1  |       at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:677)
cheetah-infrastructure-kafka-1  |       at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
cheetah-infrastructure-kafka-1  |       at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:677)
cheetah-infrastructure-kafka-1  |       at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:587)
cheetah-infrastructure-kafka-1  |       at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
cheetah-infrastructure-kafka-1  |       at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
cheetah-infrastructure-kafka-1  |       at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
cheetah-infrastructure-kafka-1  |       at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)
cheetah-infrastructure-kafka-1  |       at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
cheetah-infrastructure-kafka-1  |       at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:107)
cheetah-infrastructure-kafka-1  |       at kafka.network.Processor.<init>(SocketServer.scala:1008)
cheetah-infrastructure-kafka-1  |       at kafka.network.Acceptor.newProcessor(SocketServer.scala:921)
cheetah-infrastructure-kafka-1  |       at kafka.network.Acceptor.$anonfun$addProcessors$1(SocketServer.scala:894)
cheetah-infrastructure-kafka-1  |       at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
cheetah-infrastructure-kafka-1  |       at kafka.network.Acceptor.addProcessors(SocketServer.scala:893)
cheetah-infrastructure-kafka-1  |       at kafka.network.DataPlaneAcceptor.configure(SocketServer.scala:600)
cheetah-infrastructure-kafka-1  |       at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:269)
cheetah-infrastructure-kafka-1  |       at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:261)
cheetah-infrastructure-kafka-1  |       at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
cheetah-infrastructure-kafka-1  |       at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
cheetah-infrastructure-kafka-1  |       at scala.collection.AbstractIterable.foreach(Iterable.scala:926)
cheetah-infrastructure-kafka-1  |       at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:261)
cheetah-infrastructure-kafka-1  |       at kafka.network.SocketServer.startup(SocketServer.scala:135)
cheetah-infrastructure-kafka-1  |       at kafka.server.KafkaServer.startup(KafkaServer.scala:309)
cheetah-infrastructure-kafka-1  |       at kafka.Kafka$.main(Kafka.scala:109)
cheetah-infrastructure-kafka-1  |       at kafka.Kafka.main(Kafka.scala)
cheetah-infrastructure-kafka-1  | Caused by: org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerConfigException: No principal name in JWT claim: sub
cheetah-infrastructure-kafka-1  |       at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handleTokenCallback(OAuthBearerUnsecuredLoginCallbackHandler.java:219)
cheetah-infrastructure-kafka-1  |       at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:163)
cheetah-infrastructure-kafka-1  |       ... 30 more
cheetah-infrastructure-kafka-1  | Caused by: org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerIllegalTokenException: No principal name in JWT claim: sub
cheetah-infrastructure-kafka-1  |       at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredJws.<init>(OAuthBearerUnsecuredJws.java:109)
cheetah-infrastructure-kafka-1  |       at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handleTokenCallback(OAuthBearerUnsecuredLoginCallbackHandler.java:211)
cheetah-infrastructure-kafka-1  |       ... 31 more
cheetah-infrastructure-kafka-1  | [2023-03-20 13:06:20,739] DEBUG Nothing here to abort (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)
cheetah-infrastructure-kafka-1  | [2023-03-20 13:06:20,753] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
cheetah-infrastructure-kafka-1  | org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: An internal error occurred while retrieving token from callback handler
cheetah-infrastructure-kafka-1  |       at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
cheetah-infrastructure-kafka-1  |       at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
cheetah-infrastructure-kafka-1  |       at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:107)
cheetah-infrastructure-kafka-1  |       at kafka.network.Processor.<init>(SocketServer.scala:1008)
cheetah-infrastructure-kafka-1  |       at kafka.network.Acceptor.newProcessor(SocketServer.scala:921)
cheetah-infrastructure-kafka-1  |       at kafka.network.Acceptor.$anonfun$addProcessors$1(SocketServer.scala:894)
cheetah-infrastructure-kafka-1  |       at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
cheetah-infrastructure-kafka-1  |       at kafka.network.Acceptor.addProcessors(SocketServer.scala:893)
cheetah-infrastructure-kafka-1  |       at kafka.network.DataPlaneAcceptor.configure(SocketServer.scala:600)
cheetah-infrastructure-kafka-1  |       at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:269)
cheetah-infrastructure-kafka-1  |       at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:261)
cheetah-infrastructure-kafka-1  |       at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
cheetah-infrastructure-kafka-1  |       at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
cheetah-infrastructure-kafka-1  |       at scala.collection.AbstractIterable.foreach(Iterable.scala:926)
cheetah-infrastructure-kafka-1  |       at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:261)
cheetah-infrastructure-kafka-1  |       at kafka.network.SocketServer.startup(SocketServer.scala:135)
cheetah-infrastructure-kafka-1  |       at kafka.server.KafkaServer.startup(KafkaServer.scala:309)
cheetah-infrastructure-kafka-1  |       at kafka.Kafka$.main(Kafka.scala:109)
cheetah-infrastructure-kafka-1  |       at kafka.Kafka.main(Kafka.scala)
cheetah-infrastructure-kafka-1  | Caused by: javax.security.auth.login.LoginException: An internal error occurred while retrieving token from callback handler
cheetah-infrastructure-kafka-1  |       at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:319)
cheetah-infrastructure-kafka-1  |       at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
cheetah-infrastructure-kafka-1  |       at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
cheetah-infrastructure-kafka-1  |       at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:679)
cheetah-infrastructure-kafka-1  |       at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:677)
cheetah-infrastructure-kafka-1  |       at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
cheetah-infrastructure-kafka-1  |       at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:677)
cheetah-infrastructure-kafka-1  |       at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:587)
cheetah-infrastructure-kafka-1  |       at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
cheetah-infrastructure-kafka-1  |       at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
cheetah-infrastructure-kafka-1  |       at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
cheetah-infrastructure-kafka-1  |       at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)
cheetah-infrastructure-kafka-1  |       ... 18 more

The settings correspond to the documentation here: https://github.com/strimzi/strimzi-kafka-oauth/blob/6f997c1b240fc41cd74f5c29c575d9d723df903a/oauth-server/src/main/java/io/strimzi/kafka/oauth/server/JaasServerOauthValidatorCallbackHandler.java#L67

If I add

    oauth.client.id="<valid-client-id>" \
    oauth.client.secret="<valid_client-secret>" \
    oauth.token.endpoint.uri="http://keycloak:8080/auth/realms/test/protocol/openid-connect/token" \

to the sasl.jaas.config and listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler It seems to work. But I don't understand why I need to provide credential here, and not just on the client connecting to the listener.

scholzj commented 1 year ago

I think sharing a full logs and full configurations is important here rather than just some snippets.

pegtrifork commented 1 year ago

docker-compose:

services:
  zookeeper:
    image: quay.io/strimzi/kafka:0.33.2-kafka-3.2.3
    command:
      ["sh", "-c", "bin/zookeeper-server-start.sh config/zookeeper.properties"]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs

  kafka:
    image: my-kafka
    command:
      [
        "sh",
        "-c",
        "bin/kafka-server-start.sh /opt/kafka/custom-config/server.properties",
      ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ROOT_LOGGER_LEVEL: debug
    volumes:
      - ../config/kafka/server.properties:/opt/kafka/custom-config/server.properties
    healthcheck:
      test: nc -z kafka 19092 || exit -1
      start_period: 15s
      interval: 5s
      timeout: 10s
      retries: 10

server.properties looking like this I expected to work:

listeners=CLIENT://kafka:29092,INTER://kafka:19092,EXTERNAL://0.0.0.0:9092
advertised.listeners=CLIENT://kafka:29092,INTER://kafka:19092,EXTERNAL://127.0.0.1:9092
zookeeper.connect=zookeeper:2181
inter.broker.listener.name=INTER
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT,INTER:PLAINTEXT,EXTERNAL:PLAINTEXT
sasl.enabled.mechanisms=OAUTHBEARER

listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    oauth.check.issuer="false" \
    oauth.jwks.endpoint.uri="http://keycloak:8080/auth/realms/master/protocol/openid-connect/certs" \
    oauth.username.claim="preferred_username";

listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler

listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

broker.id=1
min.insync.replicas=1
default.replication.factor=1
transaction.state.log.replication.factor=1
offsets.topic.replication.factor=1
inter.broker.protocol.version=3.2

But gives the following logs:

 [2023-03-20 14:02:59,871] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
 [2023-03-20 14:03:00,145] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
 [2023-03-20 14:03:00,220] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
 [2023-03-20 14:03:00,223] INFO starting (kafka.server.KafkaServer)
 [2023-03-20 14:03:00,224] INFO Connecting to zookeeper on zookeeper:2181 (kafka.server.KafkaServer)
 [2023-03-20 14:03:00,238] INFO [ZooKeeperClient Kafka server] Initializing a new session to zookeeper:2181. (kafka.zookeeper.ZooKeeperClient)
 [2023-03-20 14:03:00,243] INFO Client environment:zookeeper.version=3.6.3--6401e4ad2087061bc6b9f80dec2d69f2e3c8660a, built on 04/08/2021 16:35 GMT (org.apache.zookeeper.ZooKeeper)
 [2023-03-20 14:03:00,243] INFO Client environment:host.name=eef860f8113b (org.apache.zookeeper.ZooKeeper)
 [2023-03-20 14:03:00,243] INFO Client environment:java.version=17.0.6 (org.apache.zookeeper.ZooKeeper)
 [2023-03-20 14:03:00,243] INFO Client environment:java.vendor=Red Hat, Inc. (org.apache.zookeeper.ZooKeeper)
 [2023-03-20 14:03:00,243] INFO Client environment:java.home=/usr/lib/jvm/java-17-openjdk-17.0.6.0.10-3.el8_7.x86_64 (org.apache.zookeeper.ZooKeeper)
 [2023-03-20 14:03:00,243] INFO Client environment:java.class.path=/opt/kafka/bin/../libs/accessors-smart-2.4.7.jar:/opt/kafka/bin/../libs/activation-1.1.1.jar:/opt/kafka/bin/../libs/annotations-13.0.jar:/opt/kafka/bin/../libs/aopalliance-repackaged-2.6.1.jar:/opt/kafka/bin/../libs/argparse4j-0.7.0.jar:/opt/kafka/bin/../libs/audience-annotations-0.5.0.jar:/opt/kafka/bin/../libs/checker-qual-3.5.0.jar:/opt/kafka/bin/../libs/cheetah-kafka-authorizer-1.0-SNAPSHOT.jar:/opt/kafka/bin/../libs/commons-cli-1.4.jar:/opt/kafka/bin/../libs/commons-lang-2.6.jar:/opt/kafka/bin/../libs/commons-lang3-3.8.1.jar:/opt/kafka/bin/../libs/connect-api-3.2.3.jar:/opt/kafka/bin/../libs/connect-basic-auth-extension-3.2.3.jar:/opt/kafka/bin/../libs/connect-json-3.2.3.jar:/opt/kafka/bin/../libs/connect-mirror-3.2.3.jar:/opt/kafka/bin/../libs/connect-mirror-client-3.2.3.jar:/opt/kafka/bin/../libs/connect-runtime-3.2.3.jar:/opt/kafka/bin/../libs/connect-transforms-3.2.3.jar:/opt/kafka/bin/../libs/cruise-control-metrics-reporter-2.5.111.jar:/opt/kafka/bin/../libs/error_prone_annotations-2.3.4.jar:/opt/kafka/bin/../libs/failureaccess-1.0.1.jar:/opt/kafka/bin/../libs/gson-2.9.0.jar:/opt/kafka/bin/../libs/guava-30.1-jre.jar:/opt/kafka/bin/../libs/hk2-api-2.6.1.jar:/opt/kafka/bin/../libs/hk2-locator-2.6.1.jar:/opt/kafka/bin/../libs/hk2-utils-2.6.1.jar:/opt/kafka/bin/../libs/j2objc-annotations-1.3.jar:/opt/kafka/bin/../libs/jackson-annotations-2.13.3.jar:/opt/kafka/bin/../libs/jackson-core-2.13.3.jar:/opt/kafka/bin/../libs/jackson-databind-2.13.3.jar:/opt/kafka/bin/../libs/jackson-dataformat-csv-2.13.3.jar:/opt/kafka/bin/../libs/jackson-dataformat-yaml-2.13.4.jar:/opt/kafka/bin/../libs/jackson-datatype-jdk8-2.13.3.jar:/opt/kafka/bin/../libs/jackson-datatype-jsr310-2.13.4.jar:/opt/kafka/bin/../libs/jackson-jaxrs-base-2.13.3.jar:/opt/kafka/bin/../libs/jackson-jaxrs-json-provider-2.13.3.jar:/opt/kafka/bin/../libs/jackson-module-jaxb-annotations-2.13.3.jar:/opt/kafka/bin/../libs/jackson-module-scala_2.13-2.13.3.jar:/opt/kafka/bin/../libs/jaeger-client-1.8.1.jar:/opt/kafka/bin/../libs/jaeger-core-1.8.1.jar:/opt/kafka/bin/../libs/jaeger-thrift-1.8.1.jar:/opt/kafka/bin/../libs/jaeger-tracerresolver-1.8.1.jar:/opt/kafka/bin/../libs/jakarta.activation-api-1.2.2.jar:/opt/kafka/bin/../libs/jakarta.annotation-api-1.3.5.jar:/opt/kafka/bin/../libs/jakarta.inject-2.6.1.jar:/opt/kafka/bin/../libs/jakarta.validation-api-2.0.2.jar:/opt/kafka/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/opt/kafka/bin/../libs/jakarta.xml.bind-api-2.3.3.jar:/opt/kafka/bin/../libs/javassist-3.27.0-GA.jar:/opt/kafka/bin/../libs/javax.servlet-api-3.1.0.jar:/opt/kafka/bin/../libs/javax.ws.rs-api-2.1.1.jar:/opt/kafka/bin/../libs/jaxb-api-2.3.0.jar:/opt/kafka/bin/../libs/jersey-client-2.34.jar:/opt/kafka/bin/../libs/jersey-common-2.34.jar:/opt/kafka/bin/../libs/jersey-container-servlet-2.34.jar:/opt/kafka/bin/../libs/jersey-container-servlet-core-2.34.jar:/opt/kafka/bin/../libs/jersey-hk2-2.34.jar:/opt/kafka/bin/../libs/jersey-server-2.34.jar:/opt/kafka/bin/../libs/jetty-client-9.4.48.v20220622.jar:/opt/kafka/bin/../libs/jetty-continuation-9.4.48.v20220622.jar:/opt/kafka/bin/../libs/jetty-http-9.4.48.v20220622.jar:/opt/kafka/bin/../libs/jetty-io-9.4.48.v20220622.jar:/opt/kafka/bin/../libs/jetty-security-9.4.48.v20220622.jar:/opt/kafka/bin/../libs/jetty-server-9.4.48.v20220622.jar:/opt/kafka/bin/../libs/jetty-servlet-9.4.48.v20220622.jar:/opt/kafka/bin/../libs/jetty-servlets-9.4.48.v20220622.jar:/opt/kafka/bin/../libs/jetty-util-9.4.48.v20220622.jar:/opt/kafka/bin/../libs/jetty-util-ajax-9.4.48.v20220622.jar:/opt/kafka/bin/../libs/jline-3.21.0.jar:/opt/kafka/bin/../libs/jmx_prometheus_javaagent-0.17.2.jar:/opt/kafka/bin/../libs/jopt-simple-5.0.4.jar:/opt/kafka/bin/../libs/jose4j-0.7.9.jar:/opt/kafka/bin/../libs/json-path-2.6.0.jar:/opt/kafka/bin/../libs/json-smart-2.4.7.jar:/opt/kafka/bin/../libs/jsonevent-layout-1.7.jar:/opt/kafka/bin/../libs/jsr305-3.0.2.jar:/opt/kafka/bin/../libs/kafka-agent-0.33.2.jar:/opt/kafka/bin/../libs/kafka-clients-3.2.3.jar:/opt/kafka/bin/../libs/kafka-env-var-config-provider-1.1.0.jar:/opt/kafka/bin/../libs/kafka-kubernetes-config-provider-1.1.0.jar:/opt/kafka/bin/../libs/kafka-log4j-appender-3.2.3.jar:/opt/kafka/bin/../libs/kafka-metadata-3.2.3.jar:/opt/kafka/bin/../libs/kafka-oauth-client-0.11.0.jar:/opt/kafka/bin/../libs/kafka-oauth-common-0.11.0.jar:/opt/kafka/bin/../libs/kafka-oauth-keycloak-authorizer-0.11.0.jar:/opt/kafka/bin/../libs/kafka-oauth-server-0.11.0.jar:/opt/kafka/bin/../libs/kafka-oauth-server-plain-0.11.0.jar:/opt/kafka/bin/../libs/kafka-quotas-plugin-0.2.0.jar:/opt/kafka/bin/../libs/kafka-raft-3.2.3.jar:/opt/kafka/bin/../libs/kafka-server-common-3.2.3.jar:/opt/kafka/bin/../libs/kafka-shell-3.2.3.jar:/opt/kafka/bin/../libs/kafka-storage-3.2.3.jar:/opt/kafka/bin/../libs/kafka-storage-api-3.2.3.jar:/opt/kafka/bin/../libs/kafka-streams-3.2.3.jar:/opt/kafka/bin/../libs/kafka-streams-examples-3.2.3.jar:/opt/kafka/bin/../libs/kafka-streams-scala_2.13-3.2.3.jar:/opt/kafka/bin/../libs/kafka-streams-test-utils-3.2.3.jar:/opt/kafka/bin/../libs/kafka-tools-3.2.3.jar:/opt/kafka/bin/../libs/kafka_2.13-3.2.3.jar:/opt/kafka/bin/../libs/kotlin-stdlib-1.4.10.jar:/opt/kafka/bin/../libs/kotlin-stdlib-common-1.4.0.jar:/opt/kafka/bin/../libs/kubernetes-client-6.2.0.jar:/opt/kafka/bin/../libs/kubernetes-client-api-6.2.0.jar:/opt/kafka/bin/../libs/kubernetes-httpclient-okhttp-6.2.0.jar:/opt/kafka/bin/../libs/kubernetes-model-apps-6.2.0.jar:/opt/kafka/bin/../libs/kubernetes-model-batch-6.2.0.jar:/opt/kafka/bin/../libs/kubernetes-model-certificates-6.2.0.jar:/opt/kafka/bin/../libs/kubernetes-model-common-6.2.0.jar:/opt/kafka/bin/../libs/kubernetes-model-core-6.2.0.jar:/opt/kafka/bin/../libs/kubernetes-model-extensions-6.2.0.jar:/opt/kafka/bin/../libs/kubernetes-model-gatewayapi-6.2.0.jar:/opt/kafka/bin/../libs/libthrift-0.15.0.jar:/opt/kafka/bin/../libs/listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar:/opt/kafka/bin/../libs/logging-interceptor-3.12.12.jar:/opt/kafka/bin/../libs/lz4-java-1.8.0.jar:/opt/kafka/bin/../libs/maven-artifact-3.8.4.jar:/opt/kafka/bin/../libs/metrics-core-2.2.0.jar:/opt/kafka/bin/../libs/metrics-core-4.1.12.1.jar:/opt/kafka/bin/../libs/mirror-maker-2-extensions-1.2.0.jar:/opt/kafka/bin/../libs/mirror-maker-agent-0.33.2.jar:/opt/kafka/bin/../libs/netty-buffer-4.1.78.Final.jar:/opt/kafka/bin/../libs/netty-codec-4.1.78.Final.jar:/opt/kafka/bin/../libs/netty-common-4.1.78.Final.jar:/opt/kafka/bin/../libs/netty-handler-4.1.78.Final.jar:/opt/kafka/bin/../libs/netty-resolver-4.1.78.Final.jar:/opt/kafka/bin/../libs/netty-transport-4.1.78.Final.jar:/opt/kafka/bin/../libs/netty-transport-classes-epoll-4.1.78.Final.jar:/opt/kafka/bin/../libs/netty-transport-native-epoll-4.1.78.Final.jar:/opt/kafka/bin/../libs/netty-transport-native-unix-common-4.1.78.Final.jar:/opt/kafka/bin/../libs/nimbus-jose-jwt-9.10.jar:/opt/kafka/bin/../libs/okhttp-4.9.3.jar:/opt/kafka/bin/../libs/okio-2.8.0.jar:/opt/kafka/bin/../libs/opa-authorizer-1.5.0.jar:/opt/kafka/bin/../libs/opentelemetry-api-1.18.0.jar:/opt/kafka/bin/../libs/opentelemetry-context-1.18.0.jar:/opt/kafka/bin/../libs/opentelemetry-exporter-common-1.18.0.jar:/opt/kafka/bin/../libs/opentelemetry-exporter-otlp-1.18.0.jar:/opt/kafka/bin/../libs/opentelemetry-exporter-otlp-common-1.18.0.jar:/opt/kafka/bin/../libs/opentelemetry-instrumentation-api-1.18.0.jar:/opt/kafka/bin/../libs/opentelemetry-instrumentation-api-semconv-1.18.0-alpha.jar:/opt/kafka/bin/../libs/opentelemetry-kafka-clients-2.6-1.18.0-alpha.jar:/opt/kafka/bin/../libs/opentelemetry-kafka-clients-common-1.18.0-alpha.jar:/opt/kafka/bin/../libs/opentelemetry-sdk-1.18.0.jar:/opt/kafka/bin/../libs/opentelemetry-sdk-common-1.18.0.jar:/opt/kafka/bin/../libs/opentelemetry-sdk-extension-autoconfigure-1.18.0-alpha.jar:/opt/kafka/bin/../libs/opentelemetry-sdk-extension-autoconfigure-spi-1.18.0.jar:/opt/kafka/bin/../libs/opentelemetry-sdk-logs-1.18.0-alpha.jar:/opt/kafka/bin/../libs/opentelemetry-sdk-metrics-1.18.0.jar:/opt/kafka/bin/../libs/opentelemetry-sdk-trace-1.18.0.jar:/opt/kafka/bin/../libs/opentelemetry-semconv-1.18.0-alpha.jar:/opt/kafka/bin/../libs/opentracing-api-0.33.0.jar:/opt/kafka/bin/../libs/opentracing-kafka-client-0.1.15.jar:/opt/kafka/bin/../libs/opentracing-noop-0.33.0.jar:/opt/kafka/bin/../libs/opentracing-tracerresolver-0.1.8.jar:/opt/kafka/bin/../libs/opentracing-util-0.33.0.jar:/opt/kafka/bin/../libs/osgi-resource-locator-1.0.3.jar:/opt/kafka/bin/../libs/paranamer-2.8.jar:/opt/kafka/bin/../libs/plexus-utils-3.3.0.jar:/opt/kafka/bin/../libs/reflections-0.9.12.jar:/opt/kafka/bin/../libs/reload4j-1.2.19.jar:/opt/kafka/bin/../libs/rocksdbjni-6.29.4.1.jar:/opt/kafka/bin/../libs/scala-collection-compat_2.13-2.6.0.jar:/opt/kafka/bin/../libs/scala-java8-compat_2.13-1.0.2.jar:/opt/kafka/bin/../libs/scala-library-2.13.8.jar:/opt/kafka/bin/../libs/scala-logging_2.13-3.9.4.jar:/opt/kafka/bin/../libs/scala-reflect-2.13.8.jar:/opt/kafka/bin/../libs/slf4j-api-1.7.30.jar:/opt/kafka/bin/../libs/slf4j-api-1.7.36.jar:/opt/kafka/bin/../libs/slf4j-reload4j-1.7.36.jar:/opt/kafka/bin/../libs/snakeyaml-1.33.jar:/opt/kafka/bin/../libs/snappy-java-1.1.8.4.jar:/opt/kafka/bin/../libs/tracing-agent-0.33.2.jar:/opt/kafka/bin/../libs/trogdor-3.2.3.jar:/opt/kafka/bin/../libs/zjsonpatch-0.3.0.jar:/opt/kafka/bin/../libs/zookeeper-3.6.3.jar:/opt/kafka/bin/../libs/zookeeper-jute-3.6.3.jar:/opt/kafka/bin/../libs/zstd-jni-1.5.2-1.jar (org.apache.zookeeper.ZooKeeper)
 [2023-03-20 14:03:00,244] INFO Client environment:java.library.path=/usr/java/packages/lib:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
 [2023-03-20 14:03:00,245] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
 [2023-03-20 14:03:00,245] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
 [2023-03-20 14:03:00,245] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
 [2023-03-20 14:03:00,245] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
 [2023-03-20 14:03:00,245] INFO Client environment:os.version=5.15.90.1-microsoft-standard-WSL2 (org.apache.zookeeper.ZooKeeper)
 [2023-03-20 14:03:00,246] INFO Client environment:user.name=kafka (org.apache.zookeeper.ZooKeeper)
 [2023-03-20 14:03:00,246] INFO Client environment:user.home=/home/kafka (org.apache.zookeeper.ZooKeeper)
 [2023-03-20 14:03:00,246] INFO Client environment:user.dir=/opt/kafka (org.apache.zookeeper.ZooKeeper)
 [2023-03-20 14:03:00,246] INFO Client environment:os.memory.free=984MB (org.apache.zookeeper.ZooKeeper)
 [2023-03-20 14:03:00,246] INFO Client environment:os.memory.max=1024MB (org.apache.zookeeper.ZooKeeper)
 [2023-03-20 14:03:00,246] INFO Client environment:os.memory.total=1024MB (org.apache.zookeeper.ZooKeeper)
 [2023-03-20 14:03:00,250] INFO Initiating client connection, connectString=zookeeper:2181 sessionTimeout=18000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@2474f125 (org.apache.zookeeper.ZooKeeper)
 [2023-03-20 14:03:00,256] INFO jute.maxbuffer value is 4194304 Bytes (org.apache.zookeeper.ClientCnxnSocket)
 [2023-03-20 14:03:00,262] INFO zookeeper.request.timeout value is 0. feature enabled=false (org.apache.zookeeper.ClientCnxn)
 [2023-03-20 14:03:00,265] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
 [2023-03-20 14:03:00,267] INFO Opening socket connection to server zookeeper/192.168.144.4:2181. (org.apache.zookeeper.ClientCnxn)
 [2023-03-20 14:03:00,271] INFO Socket connection established, initiating session, client: /192.168.144.5:50084, server: zookeeper/192.168.144.4:2181 (org.apache.zookeeper.ClientCnxn)
 [2023-03-20 14:03:00,299] INFO Session establishment complete on server zookeeper/192.168.144.4:2181, session id = 0x10009ec2fc30000, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
 [2023-03-20 14:03:00,303] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
 [2023-03-20 14:03:00,441] INFO [feature-zk-node-event-process-thread]: Starting (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
 [2023-03-20 14:03:00,453] INFO Feature ZK node at path: /feature does not exist (kafka.server.FinalizedFeatureChangeListener)
 [2023-03-20 14:03:00,454] INFO Cleared cache (kafka.server.FinalizedFeatureCache)
 [2023-03-20 14:03:00,593] INFO Cluster ID = 3ZMUl9KMToaULTRCrmJRaQ (kafka.server.KafkaServer)
 [2023-03-20 14:03:00,596] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
 [2023-03-20 14:03:00,645] INFO KafkaConfig values: 
       advertised.listeners = CLIENT://kafka:29092,INTER://kafka:19092,EXTERNAL://127.0.0.1:9092
       alter.config.policy.class.name = null
       alter.log.dirs.replication.quota.window.num = 11
       alter.log.dirs.replication.quota.window.size.seconds = 1
       authorizer.class.name =
       auto.create.topics.enable = true
       auto.leader.rebalance.enable = true
       background.threads = 10
       broker.heartbeat.interval.ms = 2000
       broker.id = 1
       broker.id.generation.enable = true
       broker.rack = null
       broker.session.timeout.ms = 9000
       client.quota.callback.class = null
       compression.type = producer
       connection.failed.authentication.delay.ms = 100
       connections.max.idle.ms = 600000
       connections.max.reauth.ms = 0
       control.plane.listener.name = null
       controlled.shutdown.enable = true
       controlled.shutdown.max.retries = 3
       controlled.shutdown.retry.backoff.ms = 5000
       controller.listener.names = null
       controller.quorum.append.linger.ms = 25
       controller.quorum.election.backoff.max.ms = 1000
       controller.quorum.election.timeout.ms = 1000
       controller.quorum.fetch.timeout.ms = 2000
       controller.quorum.request.timeout.ms = 2000
       controller.quorum.retry.backoff.ms = 20
       controller.quorum.voters = []
       controller.quota.window.num = 11
       controller.quota.window.size.seconds = 1
       controller.socket.timeout.ms = 30000
       create.topic.policy.class.name = null
       default.replication.factor = 1
       delegation.token.expiry.check.interval.ms = 3600000
       delegation.token.expiry.time.ms = 86400000
       delegation.token.master.key = null
       delegation.token.max.lifetime.ms = 604800000
       delegation.token.secret.key = null
       delete.records.purgatory.purge.interval.requests = 1
       delete.topic.enable = true
       fetch.max.bytes = 57671680
       fetch.purgatory.purge.interval.requests = 1000
       group.initial.rebalance.delay.ms = 3000
       group.max.session.timeout.ms = 1800000
       group.max.size = 2147483647
       group.min.session.timeout.ms = 6000
       initial.broker.registration.timeout.ms = 60000
       inter.broker.listener.name = INTER
       inter.broker.protocol.version = 3.2
       kafka.metrics.polling.interval.secs = 10
       kafka.metrics.reporters = []
       leader.imbalance.check.interval.seconds = 300
       leader.imbalance.per.broker.percentage = 10
       listener.security.protocol.map = CLIENT:SASL_PLAINTEXT,INTER:PLAINTEXT,EXTERNAL:PLAINTEXT
       listeners = CLIENT://kafka:29092,INTER://kafka:19092,EXTERNAL://0.0.0.0:9092
       log.cleaner.backoff.ms = 15000
       log.cleaner.dedupe.buffer.size = 134217728
       log.cleaner.delete.retention.ms = 86400000
       log.cleaner.enable = true
       log.cleaner.io.buffer.load.factor = 0.9
       log.cleaner.io.buffer.size = 524288
       log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
       log.cleaner.max.compaction.lag.ms = 9223372036854775807
       log.cleaner.min.cleanable.ratio = 0.5
       log.cleaner.min.compaction.lag.ms = 0
       log.cleaner.threads = 1
       log.cleanup.policy = [delete]
       log.dir = /tmp/kafka-logs
       log.dirs = null
       log.flush.interval.messages = 9223372036854775807
       log.flush.interval.ms = null
       log.flush.offset.checkpoint.interval.ms = 60000
       log.flush.scheduler.interval.ms = 9223372036854775807
       log.flush.start.offset.checkpoint.interval.ms = 60000
       log.index.interval.bytes = 4096
       log.index.size.max.bytes = 10485760
       log.message.downconversion.enable = true
       log.message.format.version = 3.0-IV1
       log.message.timestamp.difference.max.ms = 9223372036854775807
       log.message.timestamp.type = CreateTime
       log.preallocate = false
       log.retention.bytes = -1
       log.retention.check.interval.ms = 300000
       log.retention.hours = 168
       log.retention.minutes = null
       log.retention.ms = null
       log.roll.hours = 168
       log.roll.jitter.hours = 0
       log.roll.jitter.ms = null
       log.roll.ms = null
       log.segment.bytes = 1073741824
       log.segment.delete.delay.ms = 60000
       max.connection.creation.rate = 2147483647
       max.connections = 2147483647
       max.connections.per.ip = 2147483647
       max.connections.per.ip.overrides =
       max.incremental.fetch.session.cache.slots = 1000
       message.max.bytes = 1048588
       metadata.log.dir = null
       metadata.log.max.record.bytes.between.snapshots = 20971520
       metadata.log.segment.bytes = 1073741824
       metadata.log.segment.min.bytes = 8388608
       metadata.log.segment.ms = 604800000
       metadata.max.retention.bytes = -1
       metadata.max.retention.ms = 604800000
       metric.reporters = []
       metrics.num.samples = 2
       metrics.recording.level = INFO
       metrics.sample.window.ms = 30000
       min.insync.replicas = 1
       node.id = 1
       num.io.threads = 8
       num.network.threads = 3
       num.partitions = 1
       num.recovery.threads.per.data.dir = 1
       num.replica.alter.log.dirs.threads = null
       num.replica.fetchers = 1
       offset.metadata.max.bytes = 4096
       offsets.commit.required.acks = -1
       offsets.commit.timeout.ms = 5000
       offsets.load.buffer.size = 5242880
       offsets.retention.check.interval.ms = 600000
       offsets.retention.minutes = 10080
       offsets.topic.compression.codec = 0
       offsets.topic.num.partitions = 50
       offsets.topic.replication.factor = 1
       offsets.topic.segment.bytes = 104857600
       password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
       password.encoder.iterations = 4096
       password.encoder.key.length = 128
       password.encoder.keyfactory.algorithm = null
       password.encoder.old.secret = null
       password.encoder.secret = null
       principal.builder.class = class org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
       process.roles = []
       producer.purgatory.purge.interval.requests = 1000
       queued.max.request.bytes = -1
       queued.max.requests = 500
       quota.window.num = 11
       quota.window.size.seconds = 1
       remote.log.index.file.cache.total.size.bytes = 1073741824
       remote.log.manager.task.interval.ms = 30000
       remote.log.manager.task.retry.backoff.max.ms = 30000
       remote.log.manager.task.retry.backoff.ms = 500
       remote.log.manager.task.retry.jitter = 0.2
       remote.log.manager.thread.pool.size = 10
       remote.log.metadata.manager.class.name = null
       remote.log.metadata.manager.class.path = null
       remote.log.metadata.manager.impl.prefix = null
       remote.log.metadata.manager.listener.name = null
       remote.log.reader.max.pending.tasks = 100
       remote.log.reader.threads = 10
       remote.log.storage.manager.class.name = null
       remote.log.storage.manager.class.path = null
       remote.log.storage.manager.impl.prefix = null
       remote.log.storage.system.enable = false
       replica.fetch.backoff.ms = 1000
       replica.fetch.max.bytes = 1048576
       replica.fetch.min.bytes = 1
       replica.fetch.response.max.bytes = 10485760
       replica.fetch.wait.max.ms = 500
       replica.high.watermark.checkpoint.interval.ms = 5000
       replica.lag.time.max.ms = 30000
       replica.selector.class = null
       replica.socket.receive.buffer.bytes = 65536
       replica.socket.timeout.ms = 30000
       replication.quota.window.num = 11
       replication.quota.window.size.seconds = 1
       request.timeout.ms = 30000
       reserved.broker.max.id = 1000
       sasl.client.callback.handler.class = null
       sasl.enabled.mechanisms = [OAUTHBEARER]
       sasl.jaas.config = null
       sasl.kerberos.kinit.cmd = /usr/bin/kinit
       sasl.kerberos.min.time.before.relogin = 60000
       sasl.kerberos.principal.to.local.rules = [DEFAULT]
       sasl.kerberos.service.name = null
       sasl.kerberos.ticket.renew.jitter = 0.05
       sasl.kerberos.ticket.renew.window.factor = 0.8
       sasl.login.callback.handler.class = null
       sasl.login.class = null
       sasl.login.connect.timeout.ms = null
       sasl.login.read.timeout.ms = null
       sasl.login.refresh.buffer.seconds = 300
       sasl.login.refresh.min.period.seconds = 60
       sasl.login.refresh.window.factor = 0.8
       sasl.login.refresh.window.jitter = 0.05
       sasl.login.retry.backoff.max.ms = 10000
       sasl.login.retry.backoff.ms = 100
       sasl.mechanism.controller.protocol = GSSAPI
       sasl.mechanism.inter.broker.protocol = GSSAPI
       sasl.oauthbearer.clock.skew.seconds = 30
       sasl.oauthbearer.expected.audience = null
       sasl.oauthbearer.expected.issuer = null
       sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
       sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
       sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
       sasl.oauthbearer.jwks.endpoint.url = null
       sasl.oauthbearer.scope.claim.name = scope
       sasl.oauthbearer.sub.claim.name = sub
       sasl.oauthbearer.token.endpoint.url = null
       sasl.server.callback.handler.class = null
       sasl.server.max.receive.size = 524288
       security.inter.broker.protocol = PLAINTEXT
       security.providers = null
       socket.connection.setup.timeout.max.ms = 30000
       socket.connection.setup.timeout.ms = 10000
       socket.listen.backlog.size = 50
       socket.receive.buffer.bytes = 102400
       socket.request.max.bytes = 104857600
       socket.send.buffer.bytes = 102400
       ssl.cipher.suites = []
       ssl.client.auth = none
       ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
       ssl.endpoint.identification.algorithm = https
       ssl.engine.factory.class = null
       ssl.key.password = null
       ssl.keymanager.algorithm = SunX509
       ssl.keystore.certificate.chain = null
       ssl.keystore.key = null
       ssl.keystore.location = null
       ssl.keystore.password = null
       ssl.keystore.type = JKS
       ssl.principal.mapping.rules = DEFAULT
       ssl.protocol = TLSv1.3
       ssl.provider = null
       ssl.secure.random.implementation = null
       ssl.trustmanager.algorithm = PKIX
       ssl.truststore.certificates = null
       ssl.truststore.location = null
       ssl.truststore.password = null
       ssl.truststore.type = JKS
       transaction.abort.timed.out.transaction.cleanup.interval.ms = 10000
       transaction.max.timeout.ms = 900000
       transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
       transaction.state.log.load.buffer.size = 5242880
       transaction.state.log.min.isr = 2
       transaction.state.log.num.partitions = 50
       transaction.state.log.replication.factor = 1
       transaction.state.log.segment.bytes = 104857600
       transactional.id.expiration.ms = 604800000
       unclean.leader.election.enable = false
       zookeeper.clientCnxnSocket = null
       zookeeper.connect = zookeeper:2181
       zookeeper.connection.timeout.ms = null
       zookeeper.max.in.flight.requests = 10
       zookeeper.session.timeout.ms = 18000
       zookeeper.set.acl = false
       zookeeper.ssl.cipher.suites = null
       zookeeper.ssl.client.enable = false
       zookeeper.ssl.crl.enable = false
       zookeeper.ssl.enabled.protocols = null
       zookeeper.ssl.endpoint.identification.algorithm = HTTPS
       zookeeper.ssl.keystore.location = null
       zookeeper.ssl.keystore.password = null
       zookeeper.ssl.keystore.type = null
       zookeeper.ssl.ocsp.enable = false
       zookeeper.ssl.protocol = TLSv1.2
       zookeeper.ssl.truststore.location = null
       zookeeper.ssl.truststore.password = null
       zookeeper.ssl.truststore.type = null
  (kafka.server.KafkaConfig)
 [2023-03-20 14:03:00,705] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
 [2023-03-20 14:03:00,706] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
 [2023-03-20 14:03:00,707] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
 [2023-03-20 14:03:00,709] INFO [ThrottledChannelReaper-ControllerMutation]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
 [2023-03-20 14:03:00,730] INFO Log directory /tmp/kafka-logs not found, creating it. (kafka.log.LogManager)
 [2023-03-20 14:03:00,756] INFO Loading logs from log dirs ArraySeq(/tmp/kafka-logs) (kafka.log.LogManager)
 [2023-03-20 14:03:00,760] INFO Attempting recovery for all logs in /tmp/kafka-logs since no clean shutdown file was found (kafka.log.LogManager)
 [2023-03-20 14:03:00,769] INFO Loaded 0 logs in 13ms. (kafka.log.LogManager)
 [2023-03-20 14:03:00,770] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
 [2023-03-20 14:03:00,773] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
 [2023-03-20 14:03:01,054] INFO [BrokerToControllerChannelManager broker=1 name=forwarding]: Starting (kafka.server.BrokerToControllerRequestThread)
 [2023-03-20 14:03:01,201] INFO Updated connection-accept-rate max connection creation rate to 2147483647 (kafka.network.ConnectionQuotas)
 [2023-03-20 14:03:01,205] INFO Awaiting socket connections on kafka:29092. (kafka.network.DataPlaneAcceptor)
 [2023-03-20 14:03:01,289] INFO JWKS keys change detected. Keys updated. (io.strimzi.kafka.oauth.validator.JWTSignatureValidator)
 [2023-03-20 14:03:01,330] ERROR No principal name in JWT claim: sub (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)
 java.io.IOException: No principal name in JWT claim: sub
       at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:165)
       at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
       at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
       at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
       at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:679)
       at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:677)
       at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
       at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:677)
       at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:587)
       at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
       at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
       at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
       at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)
       at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
       at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:107)
       at kafka.network.Processor.<init>(SocketServer.scala:1008)
       at kafka.network.Acceptor.newProcessor(SocketServer.scala:921)
       at kafka.network.Acceptor.$anonfun$addProcessors$1(SocketServer.scala:894)
       at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
       at kafka.network.Acceptor.addProcessors(SocketServer.scala:893)
       at kafka.network.DataPlaneAcceptor.configure(SocketServer.scala:600)
       at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:269)
       at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:261)
       at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
       at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
       at scala.collection.AbstractIterable.foreach(Iterable.scala:926)
       at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:261)
       at kafka.network.SocketServer.startup(SocketServer.scala:135)
       at kafka.server.KafkaServer.startup(KafkaServer.scala:309)
       at kafka.Kafka$.main(Kafka.scala:109)
       at kafka.Kafka.main(Kafka.scala)
 Caused by: org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerConfigException: No principal name in JWT claim: sub
       at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handleTokenCallback(OAuthBearerUnsecuredLoginCallbackHandler.java:219)
       at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:163)
       ... 30 more
 Caused by: org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerIllegalTokenException: No principal name in JWT claim: sub
       at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredJws.<init>(OAuthBearerUnsecuredJws.java:109)
       at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handleTokenCallback(OAuthBearerUnsecuredLoginCallbackHandler.java:211)
       ... 31 more
 [2023-03-20 14:03:01,348] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
 org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: An internal error occurred while retrieving token from callback handler
       at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
       at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
       at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:107)
       at kafka.network.Processor.<init>(SocketServer.scala:1008)
       at kafka.network.Acceptor.newProcessor(SocketServer.scala:921)
       at kafka.network.Acceptor.$anonfun$addProcessors$1(SocketServer.scala:894)
       at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
       at kafka.network.Acceptor.addProcessors(SocketServer.scala:893)
       at kafka.network.DataPlaneAcceptor.configure(SocketServer.scala:600)
       at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:269)
       at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:261)
       at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
       at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
       at scala.collection.AbstractIterable.foreach(Iterable.scala:926)
       at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:261)
       at kafka.network.SocketServer.startup(SocketServer.scala:135)
       at kafka.server.KafkaServer.startup(KafkaServer.scala:309)
       at kafka.Kafka$.main(Kafka.scala:109)
       at kafka.Kafka.main(Kafka.scala)
 Caused by: javax.security.auth.login.LoginException: An internal error occurred while retrieving token from callback handler
       at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:319)
       at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
       at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
       at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:679)
       at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:677)
       at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
       at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:677)
       at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:587)
       at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
       at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62)
       at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
       at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)
       ... 18 more
 [2023-03-20 14:03:01,349] INFO [KafkaServer id=1] shutting down (kafka.server.KafkaServer)
 [2023-03-20 14:03:01,352] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Stopping socket server request processors (kafka.network.SocketServer)
 [2023-03-20 14:03:01,355] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Stopped socket server request processors (kafka.network.SocketServer)
 [2023-03-20 14:03:01,361] INFO [BrokerToControllerChannelManager broker=1 name=forwarding]: Shutting down (kafka.server.BrokerToControllerRequestThread)
 [2023-03-20 14:03:01,362] INFO [BrokerToControllerChannelManager broker=1 name=forwarding]: Stopped (kafka.server.BrokerToControllerRequestThread)
 [2023-03-20 14:03:01,362] INFO [BrokerToControllerChannelManager broker=1 name=forwarding]: Shutdown completed (kafka.server.BrokerToControllerRequestThread)
 [2023-03-20 14:03:01,368] INFO Broker to controller channel manager for forwarding shutdown (kafka.server.BrokerToControllerChannelManagerImpl)
 [2023-03-20 14:03:01,369] INFO Shutting down. (kafka.log.LogManager)
 [2023-03-20 14:03:01,405] INFO Shutdown complete. (kafka.log.LogManager)
 [2023-03-20 14:03:01,405] INFO [feature-zk-node-event-process-thread]: Shutting down (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
 [2023-03-20 14:03:01,406] INFO [feature-zk-node-event-process-thread]: Stopped (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
 [2023-03-20 14:03:01,406] INFO [feature-zk-node-event-process-thread]: Shutdown completed (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
 [2023-03-20 14:03:01,407] INFO [ZooKeeperClient Kafka server] Closing. (kafka.zookeeper.ZooKeeperClient)
 [2023-03-20 14:03:01,515] INFO Session: 0x10009ec2fc30000 closed (org.apache.zookeeper.ZooKeeper)
 [2023-03-20 14:03:01,515] INFO EventThread shut down for session: 0x10009ec2fc30000 (org.apache.zookeeper.ClientCnxn)
 [2023-03-20 14:03:01,517] INFO [ZooKeeperClient Kafka server] Closed. (kafka.zookeeper.ZooKeeperClient)
 [2023-03-20 14:03:01,517] INFO [ThrottledChannelReaper-Fetch]: Shutting down (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
 [2023-03-20 14:03:01,519] INFO [ThrottledChannelReaper-Fetch]: Stopped (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
 [2023-03-20 14:03:01,519] INFO [ThrottledChannelReaper-Fetch]: Shutdown completed (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
 [2023-03-20 14:03:01,519] INFO [ThrottledChannelReaper-Produce]: Shutting down (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
 [2023-03-20 14:03:01,519] INFO [ThrottledChannelReaper-Produce]: Stopped (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
 [2023-03-20 14:03:01,519] INFO [ThrottledChannelReaper-Produce]: Shutdown completed (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
 [2023-03-20 14:03:01,520] INFO [ThrottledChannelReaper-Request]: Shutting down (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
 [2023-03-20 14:03:01,520] INFO [ThrottledChannelReaper-Request]: Shutdown completed (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
 [2023-03-20 14:03:01,520] INFO [ThrottledChannelReaper-Request]: Stopped (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
 [2023-03-20 14:03:01,520] INFO [ThrottledChannelReaper-ControllerMutation]: Shutting down (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
 [2023-03-20 14:03:01,521] INFO [ThrottledChannelReaper-ControllerMutation]: Stopped (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
 [2023-03-20 14:03:01,521] INFO [ThrottledChannelReaper-ControllerMutation]: Shutdown completed (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
 [2023-03-20 14:03:01,522] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Shutting down socket server (kafka.network.SocketServer)
 [2023-03-20 14:03:01,554] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Shutdown completed (kafka.network.SocketServer)
 [2023-03-20 14:03:01,555] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
 [2023-03-20 14:03:01,555] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
 [2023-03-20 14:03:01,555] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
 [2023-03-20 14:03:01,558] INFO Broker and topic stats closed (kafka.server.BrokerTopicStats)
 [2023-03-20 14:03:01,563] INFO App info kafka.server for 1 unregistered (org.apache.kafka.common.utils.AppInfoParser)
 [2023-03-20 14:03:01,564] INFO [KafkaServer id=1] shut down completed (kafka.server.KafkaServer)
 [2023-03-20 14:03:01,564] ERROR Exiting Kafka. (kafka.Kafka$)
 [2023-03-20 14:03:01,565] INFO [KafkaServer id=1] shutting down (kafka.server.KafkaServer)
 exited with code 1

However it works with this server.properties:

listeners=CLIENT://kafka:29092,INTER://kafka:19092,EXTERNAL://0.0.0.0:9092
advertised.listeners=CLIENT://kafka:29092,INTER://kafka:19092,EXTERNAL://127.0.0.1:9092
zookeeper.connect=zookeeper:2181
inter.broker.listener.name=INTER
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT,INTER:PLAINTEXT,EXTERNAL:PLAINTEXT
sasl.enabled.mechanisms=OAUTHBEARER

listener.name.client.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    oauth.client.id="server" \
    oauth.client.secret="k9O3Gvc3G7mxt0giQ5170wxDIqXStOfT" \
    oauth.token.endpoint.uri="http://keycloak:8080/auth/realms/Test/protocol/openid-connect/token" \
    oauth.check.issuer="false" \
    oauth.jwks.endpoint.uri="http://keycloak:8080/auth/realms/master/protocol/openid-connect/certs" \
    oauth.username.claim="preferred_username";
listener.name.client.oauthbearer.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler

listener.name.client.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler

broker.id=1
min.insync.replicas=1
default.replication.factor=1
transaction.state.log.replication.factor=1
offsets.topic.replication.factor=1
inter.broker.protocol.version=3.2
scholzj commented 1 year ago

@mstruk Any ideas?

mstruk commented 1 year ago

This has nothing to do with interbroker communication (the INTER listener), and everything to do with how OAuth is bootstrapped through the default OAUTHBEARER callback handler mechanism (the CLIENT listener).

Try to add:

unsecuredLoginStringClaim_sub="unused"

to listener.name.client.oauthbearer.sasl.jaas.config, and remove the listener.name.client.oauthbearer.sasl.login.callback.handler.class line.

The thing is that if you add listener.name.client.oauthbearer.sasl.login.callback.handler.class the specified callback handler will be instantiated and configured. But in this case you don't need it, and when you add it, it performs some configuration validation - it requires some config. But if you don't add it, then the default org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler is automatically used, and that one requires some other config of its own, and performs config validation for that. The unsecuredLoginStringClaim_sub should be enough to make it happy, and get rid of the startup problem during CLIENT listener initialisation.

Kafka communicates these errors rather poorly.

pegtrifork commented 1 year ago

Thanks, it worked! Seems like an odd way of configuring this tho.