Apicurio / apicurio-registry

An API/Schema registry - stores APIs and Schemas.
https://www.apicur.io/registry/
Apache License 2.0

Use SASL/PLAIN to log in to KafkaSQL storage backend #4893

Open JoschaLaubach opened 4 months ago

JoschaLaubach commented 4 months ago

Description

Registry Version: 2.5.8.Final
Persistence type: kafkasql

We're trying to use our Kafka cluster as a persistence backend. Logging in to the cluster requires SASL/PLAIN over SSL. We set everything via env vars in the Pod that apicurio-registry runs in.

Environment

Running on Kubernetes 1.29.4

Steps to Reproduce

We set the following env vars:

    - name: ENABLE_KAFKA_SASL
      value: "true"
    - name: REGISTRY_KAFKASQL_TOPIC
      value: "apicurio-registry-kafkasql-journal"
    - name: REGISTRY_KAFKASQL_TOPIC_AUTO_CREATE
      value: "false"
    - name: REGISTRY_KAFKA_COMMON_SECURITY_PROTOCOL
      value: "SASL_SSL"
    - name: REGISTRY_KAFKA_COMMON_SASL_JAAS_CONFIG
      value: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"apicurio-registry\" password=\"secret\";"
    - name: REGISTRY_KAFKA_COMMON_SASL_MECHANISM
      value: "PLAIN"
    - name: KAFKA_SSL_TRUSTSTORE_TYPE
      value: "PKCS12"
    - name: KAFKA_SSL_TRUSTSTORE_LOCATION
      value: "/etc/kafka/secrets/truststore.p12"
    - name: KAFKA_SSL_TRUSTSTORE_PASSWORD
      value: "secret"

We also tried to unset the login callback handler (JaasClientOauthLoginCallbackHandler) by setting it to null, but that had no effect:

    - name: REGISTRY_KAFKA_COMMON_SASL_LOGIN_CALLBACK_HANDLER_CLASS
      value: null

Expected vs Actual Behaviour

Expected: login to the Kafka cluster via SASL/PLAIN works. Actual: the Kafka consumer cannot be constructed and we get the error "Unexpected SASL mechanism: PLAIN". It seems the defaults are set to use OAUTHBEARER, which our cluster doesn't use. At the very least, the OAuth login callback handler is still being used even though sasl.mechanism is PLAIN.

Logs

2024-07-16 12:51:33 INFO <> [org.apache.kafka.common.config.AbstractConfig] (KSQL Kafka Consumer Thread) ConsumerConfig values: 
        allow.auto.create.topics = true
        auto.commit.interval.ms = 1000
        auto.include.jmx.reporter = true
        auto.offset.reset = earliest
        bootstrap.servers = [kafka:9093]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = consumer-apicurio-registry-044dba07-5454-46cf-b9d9-b4fbc5d4f3cc-2
        client.rack = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = apicurio-registry-044dba07-5454-46cf-b9d9-b4fbc5d4f3cc
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class io.apicurio.registry.storage.impl.kafkasql.serde.KafkaSqlKeyDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = [hidden]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = class io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
        sasl.login.class = null
        sasl.login.connect.timeout.ms = null
        sasl.login.read.timeout.ms = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.login.retry.backoff.max.ms = 10000
        sasl.login.retry.backoff.ms = 100
        sasl.mechanism = PLAIN
        sasl.oauthbearer.clock.skew.seconds = 30
        sasl.oauthbearer.expected.audience = null
        sasl.oauthbearer.expected.issuer = null
        sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
        sasl.oauthbearer.jwks.endpoint.url = null
        sasl.oauthbearer.scope.claim.name = scope
        sasl.oauthbearer.sub.claim.name = sub
        sasl.oauthbearer.token.endpoint.url = null
        security.protocol = SASL_SSL
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 45000
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = /etc/kafka/secrets/truststore.p12
        ssl.truststore.password = [hidden]
        ssl.truststore.type = PKCS12
        value.deserializer = class io.apicurio.registry.storage.impl.kafkasql.serde.KafkaSqlValueDeserializer

2024-07-16 12:51:33 INFO <> [org.apache.kafka.common.metrics.Metrics] (KSQL Kafka Consumer Thread) Metrics scheduler closed
2024-07-16 12:51:33 INFO <> [org.apache.kafka.common.metrics.Metrics] (KSQL Kafka Consumer Thread) Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-07-16 12:51:33 INFO <> [org.apache.kafka.common.metrics.Metrics] (KSQL Kafka Consumer Thread) Metrics reporters closed
2024-07-16 12:51:33 INFO <> [org.apache.kafka.common.utils.AppInfoParser] (KSQL Kafka Consumer Thread) App info kafka.consumer for consumer-apicurio-registry-044dba07-5454-46cf-b9d9-b4fbc5d4f3cc-2 unregistered
Exception in thread "KSQL Kafka Consumer Thread" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:837)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:671)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:652)
        at io.apicurio.registry.storage.impl.kafkasql.KafkaSqlFactory.createKafkaConsumer(KafkaSqlFactory.java:239)
        at io.apicurio.registry.storage.impl.kafkasql.KafkaSqlFactory_ProducerMethod_createKafkaConsumer_9609b3686433ee9660bed3ef223ff6de09a5346e_Bean.doCreate(Unknown Source)
        at io.apicurio.registry.storage.impl.kafkasql.KafkaSqlFactory_ProducerMethod_createKafkaConsumer_9609b3686433ee9660bed3ef223ff6de09a5346e_Bean.create(Unknown Source)
        at io.apicurio.registry.storage.impl.kafkasql.KafkaSqlFactory_ProducerMethod_createKafkaConsumer_9609b3686433ee9660bed3ef223ff6de09a5346e_Bean.create(Unknown Source)
        at io.quarkus.arc.impl.AbstractSharedContext.createInstanceHandle(AbstractSharedContext.java:113)
        at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:37)
        at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:34)
        at io.quarkus.arc.impl.LazyValue.get(LazyValue.java:26)
        at io.quarkus.arc.impl.ComputingCache.computeIfAbsent(ComputingCache.java:69)
        at io.quarkus.arc.impl.AbstractSharedContext.get(AbstractSharedContext.java:34)
        at io.quarkus.arc.impl.ClientProxies.getApplicationScopedDelegate(ClientProxies.java:21)
        at org.apache.kafka.clients.consumer.KafkaSqlFactory_ProducerMethod_createKafkaConsumer_9609b3686433ee9660bed3ef223ff6de09a5346e_ClientProxy.arc$delegate(Unknown Source)
        at org.apache.kafka.clients.consumer.KafkaSqlFactory_ProducerMethod_createKafkaConsumer_9609b3686433ee9660bed3ef223ff6de09a5346e_ClientProxy.close(Unknown Source)
        at io.apicurio.registry.storage.impl.kafkasql.KafkaSqlRegistryStorage.lambda$startConsumerThread$2(KafkaSqlRegistryStorage.java:316)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Unexpected SASL mechanism: PLAIN
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
        at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:744)
        ... 17 more
Caused by: java.lang.IllegalArgumentException: Unexpected SASL mechanism: PLAIN
        at io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler.configure(JaasClientOauthLoginCallbackHandler.java:90)
        at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:60)
        at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)
        ... 21 more

JoschaLaubach commented 3 months ago

Is there some config we're missing? Any help is welcome!

carlesarnal commented 1 month ago

No, no config is missing; this is a bug in the implementation. When PLAIN is used, the JaasClientOauthLoginCallbackHandler should not be used, but it's still being forced.

CZuegner commented 1 month ago

It worked for me in 2.6.5.Final when I set the callback handler to the following value:

    REGISTRY_KAFKA_COMMON_SASL_LOGIN_CALLBACK_HANDLER_CLASS=org.apache.kafka.common.security.authenticator.SaslClientCallbackHandler
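
Applied to the Pod spec from the original report, that workaround might look roughly like the sketch below. It reuses the SASL-related env vars from the report and only swaps the login callback handler. This is an assumption-laden example rather than a confirmed fix: the handler override is only reported to work on 2.6.5.Final in this thread and has not been verified on 2.5.8.Final, and the username and password are the placeholders from the report.

    # Sketch only: workaround reported for 2.6.5.Final, not verified on 2.5.8.Final.
    - name: REGISTRY_KAFKA_COMMON_SECURITY_PROTOCOL
      value: "SASL_SSL"
    - name: REGISTRY_KAFKA_COMMON_SASL_MECHANISM
      value: "PLAIN"
    - name: REGISTRY_KAFKA_COMMON_SASL_JAAS_CONFIG
      value: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"apicurio-registry\" password=\"secret\";"
    # Override the Strimzi OAuth login callback handler, which rejects PLAIN.
    - name: REGISTRY_KAFKA_COMMON_SASL_LOGIN_CALLBACK_HANDLER_CLASS
      value: "org.apache.kafka.common.security.authenticator.SaslClientCallbackHandler"

If the override takes effect, the ConsumerConfig dump at startup should show this class for sasl.login.callback.handler.class instead of io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler.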