Closed mroiter-larus closed 1 week ago
@scholzj any suggestions please?
Sorry, I don't know much about OAuth. Plus Confluent platform is a fork of Apache Kafka and I have zero experience with it. So who knows if it even works there.
@mstruk Do you have any idea about this? You might at least understand the OAuth part.
The provided info does not point to a definite cause. It could be a listener misconfiguration, or it could be a Keycloak realm misconfiguration. In principle, OAuth should work fine with Confluent Platform, as it does not care about the distribution itself, only that you use an upstream-compatible Kafka.
@mroiter-larus You should really turn on DEBUG logging for the io.strimzi logger and provide the full output of that log. Also, it would help to see the content of server.properties. The exception that you get points to an inter-broker channel failure: it seems that you configured your Kafka so that inter-broker communication also uses the OAUTHBEARER-protected listener, and the client side of it tries to authenticate. The way you configured token validation, it seems to expect to extract the user ID from the JWT token's sub claim, but it seems there is no such claim in the token that you get from Keycloak.
It's as if your OAUTH_USERNAME_CLAIM is ignored, or Keycloak's access token uses a different claim name, for example username rather than preferred_username. You should inspect the JWT token you get from Keycloak.
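One way to inspect the token is to decode its payload locally. The following is a minimal Python sketch, not part of strimzi-kafka-oauth: the helper names decode_jwt_payload and missing_claims are made up for illustration, and the signature is deliberately not verified, so it is only suitable for debugging.

```python
import base64
import json


def decode_jwt_payload(token: str) -> dict:
    """Return the claims of a JWT WITHOUT verifying its signature (debugging only)."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected a JWT with three dot-separated parts")
    payload_b64 = parts[1]
    # JWTs use base64url with the '=' padding stripped; restore it before decoding.
    payload_b64 += "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(payload_b64))


def missing_claims(claims: dict, wanted=("sub", "preferred_username")) -> list:
    """List the claim names from `wanted` that are absent from the token."""
    return [name for name in wanted if name not in claims]
```

Calling missing_claims(decode_jwt_payload(token)) on the access token returned by the Keycloak token endpoint shows whether the claim named in the username-claim setting is actually present.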
@scholzj @mstruk Thanks for pointing me to the Kafka listeners. They were misconfigured. I'm finally able to complete the authentication process using the following configuration:
KAFKA_LOG4J_ROOT_LOGLEVEL: 'DEBUG'
KAFKA_LOG4J_LOGGERS: 'kafka=WARN,kafka.controller=WARN,kafka.log.LogCleaner=WARN,state.change.logger=WARN,kafka.producer.async.DefaultEventHandler=WARN,log4j.logger.io.strimzi=DEBUG,log4j.logger.io.strimzi.kafka.oauth=DEBUG'
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OAUTH://kafka:10092,SSL://kafka:11092,CLEAR://kafka:12092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:SASL_PLAINTEXT,OAUTH:SASL_SSL,SSL:SSL,CLEAR:PLAINTEXT
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: OAUTHBEARER
KAFKA_SASL_ENABLED_MECHANISMS: OAUTHBEARER
KAFKA_INTER_BROKER_LISTENER_NAME: OAUTH
KAFKA_LISTENER_NAME_OAUTH_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
KAFKA_LISTENER_NAME_OAUTH_OAUTHBEARER_SASL_LOGIN_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/broker_jaas.conf
and then pointing to a static JAAS config file:
KafkaServer {
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
oauth.client.id="kafkaBroker"
oauth.client.secret="****"
oauth.token.endpoint.uri="https://<keycloak_host>:9553/realms/KafkaCluster/protocol/openid-connect/token"
oauth.valid.issuer.uri="https://<keycloak_host>:9553/realms/KafkaCluster"
oauth.jwks.endpoint.uri="https://<keycloak_host>:9553/realms/KafkaCluster/protocol/openid-connect/certs"
oauth.username.claim="preferred_username"
unsecuredLoginStringClaim_sub="unused";
};
KafkaClient {
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
oauth.client.id="kafkaBroker"
oauth.client.secret="****"
oauth.token.endpoint.uri="https://<keycloak_host>:9553/realms/KafkaCluster/protocol/openid-connect/token"
oauth.jwks.endpoint.uri="https://<keycloak_host>:9553/realms/KafkaCluster/protocol/openid-connect/certs"
oauth.username.claim="preferred_username";
};
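Since the root cause here was the listener setup, the listener-related environment variables from the broker configuration above can be cross-checked with a short script. This is only a sketch: the function names are hypothetical, and the checks are my own heuristics (the inter-broker listener is mapped to a SASL protocol, is advertised, and has listener-prefixed callback handlers), not validation that Kafka or Strimzi performs.

```python
def parse_protocol_map(value: str) -> dict:
    """Parse 'NAME:PROTOCOL,NAME:PROTOCOL' into {NAME: PROTOCOL}."""
    result = {}
    for entry in value.split(","):
        name, protocol = entry.strip().split(":", 1)
        result[name] = protocol
    return result


def parse_advertised_listeners(value: str) -> dict:
    """Parse 'NAME://host:port,...' into {NAME: 'host:port'}."""
    result = {}
    for entry in value.split(","):
        name, address = entry.strip().split("://", 1)
        result[name] = address
    return result


def check_inter_broker_listener(env: dict) -> list:
    """Return a list of human-readable problems (empty if none were found)."""
    problems = []
    protocols = parse_protocol_map(env["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"])
    advertised = parse_advertised_listeners(env["KAFKA_ADVERTISED_LISTENERS"])
    name = env["KAFKA_INTER_BROKER_LISTENER_NAME"]
    mechanism = env.get("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "")

    if name not in protocols:
        problems.append(f"listener {name} has no entry in the security protocol map")
    elif not protocols[name].startswith("SASL_"):
        problems.append(f"listener {name} uses {protocols[name]}, not a SASL protocol")
    if name not in advertised:
        problems.append(f"listener {name} is not advertised")

    enabled = [m.strip() for m in env.get("KAFKA_SASL_ENABLED_MECHANISMS", "").split(",")]
    if mechanism not in enabled:
        problems.append(f"inter-broker mechanism {mechanism!r} is not enabled")

    # Heuristic: the callback handlers should be set with the listener-name prefix.
    for role in ("SERVER", "LOGIN"):
        key = f"KAFKA_LISTENER_NAME_{name.upper()}_{mechanism}_SASL_{role}_CALLBACK_HANDLER_CLASS"
        if key not in env:
            problems.append(f"{key} is not set")
    return problems
```

Passing a dict of the environment variables from the compose file returns an empty list when these checks pass.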
As you can see from the Kafka broker configuration, I had already enabled DEBUG logs for the io.strimzi package, and now I can finally see that authentication has been established successfully:
kafka | [2024-11-20 11:01:32,053] DEBUG Access token expires at (UTC): 2024-11-20T10:06:29 (io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler)
kafka | [2024-11-20 11:01:32,057] DEBUG User validated (Principal:service-account-kafkabroker) (io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler)
kafka | [2024-11-20 11:01:32,059] INFO ### CHECK LOG LEVEL 'true' ### (io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler)
kafka | [2024-11-20 11:01:32,070] DEBUG Set validated token on callback: BearerTokenWithPayloadImpl (principalName: service-account-kafkabroker, groups: null, lifetimeMs: 1732097189000 [2024-11-20T10:06:29 UTC], startTimeMs: 1732096889000 [2024-11-20T10:01:29 UTC], scope: [openid, profile, email], payload: {"exp":1732097189,"iat":1732096889,"jti":"aab24feb-4e9a-41c0-a87c-0a9201bc80fa","iss":"https://<keycloak_host>:9553/realms/KafkaCluster","aud":["kafkaBroker","account"],"sub":"51ebb679-4b74-4790-abd2-5f3577aeda70","typ":"Bearer","azp":"kafkaBroker","acr":"1","allowed-origins":["/*"],"realm_access":{"roles":["offline_access","uma_authorization","default-roles-kafkacluster"]},"resource_access":{"kafkaBroker":{"roles":["uma_protection"]},"account":{"roles":["manage-account","manage-account-links","view-profile"]}},"scope":"profile email openid","clientHost":"172.18.0.1","email_verified":false,"preferred_username":"service-account-kafkabroker","clientAddress":"172.18.0.1","client_id":"kafkaBroker"}, sessionId: 1669634415) (io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler)
kafka | [2024-11-20 11:01:32,071] DEBUG Successfully authenticate User=service-account-kafkabroker (org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslServer)
kafka | [2024-11-20 11:01:32,072] DEBUG Authentication complete; session max lifetime from broker config=3600000 ms, credential expiration=Wed Nov 20 11:06:29 CET 2024 (296929 ms); session expiration = Wed Nov 20 11:06:29 CET 2024 (296929 ms), sending 296929 ms to client (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
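The timing values in these log lines can be reproduced from the token's exp and iat claims. The sketch below is Python with hypothetical function names. It assumes the broker uses the smaller of the credential's remaining lifetime and the configured session maximum, which is consistent with the logged values. The "now" value in the usage note is back-computed from the logged 296929 ms, not taken from the log directly.

```python
from datetime import datetime, timezone


def token_lifetime_s(claims: dict) -> int:
    """Total token lifetime in seconds, from the 'exp' and 'iat' claims."""
    return claims["exp"] - claims["iat"]


def session_lifetime_ms(exp_s: int, now_ms: int, broker_max_ms: int) -> int:
    """Assumed rule: the session lasts until the token expires or the broker maximum, whichever is sooner."""
    remaining_ms = exp_s * 1000 - now_ms
    return min(remaining_ms, broker_max_ms)


def as_utc(epoch_s: int) -> str:
    """Format epoch seconds as an ISO 8601 UTC timestamp."""
    return datetime.fromtimestamp(epoch_s, tz=timezone.utc).isoformat()
```

With exp=1732097189 and iat=1732096889 from the payload above, token_lifetime_s gives 300 seconds, as_utc(1732097189) gives 2024-11-20T10:06:29+00:00, and session_lifetime_ms(1732097189, 1732096892071, 3600000) gives 296929, matching the value sent to the client.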
Thanks for your support!
Hi @mstruk,
I'm trying to use the latest version (0.15.0) of the library to set up Kafka authentication via the OAUTHBEARER mechanism (over SASL_SSL with self-signed certificates). Starting from the Confluent Kafka community edition, I customized the original Docker image by importing the strimzi-kafka-oauth JARs as mentioned in the documentation.
On the Keycloak side I've set up a kafkaBroker client with service account roles only, and with the following client scopes:
Here is a snippet of the docker-compose file I used to set up Kafka security:
I've added some custom logs to the library in order to debug the authentication phase, and here is what I can see in the Kafka logs:
So it seems the login callback handler is detected correctly. In fact, in the following logs I can see that the login completed successfully:
But after the above successful login, the following happens:
As you can see here, the login callback handler is OAuthBearerUnsecuredLoginCallbackHandler and not the one I had set up (JaasClientOauthLoginCallbackHandler). I can't understand why this is happening.
After that, the Kafka startup process ends with the following error:
Am I missing something in the configuration? Any suggestions would be really appreciated. If you need further details about the configuration, please let me know.
Thanks in advance!
Mauro