Morningstar / kafka-offset-monitor

A small web app to monitor the progress of kafka consumers and their lag wrt the log.
Apache License 2.0

Error creating an AdminClient #34

Closed. jia2 closed this issue 5 years ago

jia2 commented 6 years ago

Hi, I'm trying to connect Kafka Offset Monitor to an SSL-secured Kafka cluster with three brokers (Kafka version 1.0.1, Scala 2.11). I started KafkaOffsetMonitor with the following command:

java -Djava.security.auth.login.config=/home/ec2-user/kafka-config/kafka_jaas.conf \
-Djavax.net.ssl.trustStore=/home/ec2-user/kafka-config/kafka.jks \
-Djavax.net.ssl.trustStorePassword=mypassword \
-Djavax.net.ssl.keyStore=/home/ec2-user/kafka-config/kafka.jks \
-Djavax.net.ssl.keyStorePassword=mypassword \
-cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--offsetStorage kafka \
--kafkaSecurityProtocol SASL_SSL \
--kafkaBrokers 10.105.20.103:9093,10.105.21.171:9093,10.105.22.213:9093 \
--zk 10.105.20.47:2181,10.105.21.228:2181,10.105.22.233:2181 \
--port 9999 \
--refresh 10.seconds \
--retain 2.days \
--dbName offsetapp_kafka_ssl

I got this error:

2018-07-30 08:11:23 ERROR KafkaOffsetGetter$:103 - An unhandled exception was thrown while reading messages from the committed offsets topic.
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:648)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:542)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:524)
        at com.quantifind.kafka.core.KafkaOffsetGetter$.createNewKafkaConsumer(KafkaOffsetGetter.scala:207)
        at com.quantifind.kafka.core.KafkaOffsetGetter$.startCommittedOffsetListener(KafkaOffsetGetter.scala:320)
        at com.quantifind.kafka.OffsetGetter$$anon$3.run(OffsetGetter.scala:289)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:74)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:60)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:79)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:577)
        ... 10 more
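
Editorial note: the root cause in the trace is that the JVM could not load `org.apache.kafka.common.security.plain.PlainLoginModule`. One possible explanation, not confirmed anywhere in this thread, is that the Kafka client bundled in the assembly jar does not contain that class. A minimal sketch for checking this is below; the class name `LoginModuleCheck` and its helper method are hypothetical and use only the JDK.

```java
public class LoginModuleCheck {

    /** Returns true if the named class can be loaded from this class's classpath. */
    public static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, LoginModuleCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Default to the class named in the LoginException above
        String name = args.length > 0
                ? args[0]
                : "org.apache.kafka.common.security.plain.PlainLoginModule";
        System.out.println(name + " on classpath: " + isOnClasspath(name));
    }
}
```

After compiling it, it could be run against the same jar used in the command above, for example `java -cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar:. LoginModuleCheck`. A `false` result would suggest the login module is missing from the classpath rather than misconfigured in the JAAS file.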

I also noticed that the consumer properties are not correctly initialized:

2018-07-30 08:11:23 INFO  ConsumerConfig:165 - ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = kafka-monitor-committedOffsetListener
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [10.105.20.103:9093, 10.105.21.171:9093, 10.105.22.213:9093]
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.keystore.type = JKS
        ssl.trustmanager.algorithm = PKIX
        enable.auto.commit = false
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        ssl.truststore.password = null
        session.timeout.ms = 30000
        metrics.num.samples = 2
        client.id =
        ssl.endpoint.identification.algorithm = null
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        ssl.protocol = TLS
        check.crcs = true
        request.timeout.ms = 40000
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 32768
        ssl.cipher.suites = null
        ssl.truststore.type = JKS
        security.protocol = SASL_SSL
        ssl.truststore.location = null

How can I fix this error and pass the correct values for the consumer properties?

Thanks a lot for any hint!

Dingjun

rcasey212 commented 5 years ago

SSL authentication is not currently supported. Please feel free to submit a PR to add this feature. I would be happy to review it.
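
Editorial note: the `ConsumerConfig` dump above shows `ssl.truststore.location = null` even though `-Djavax.net.ssl.trustStore` was passed, because Kafka's `ssl.*` client settings are separate from the JVM system properties. For anyone considering the PR mentioned above, the following is a rough sketch of the consumer properties a SASL_SSL client would typically need. The class `SaslSslConsumerProps` and its `build` method are hypothetical and not part of KafkaOffsetMonitor; the property keys are standard Kafka client settings, with `sasl.mechanism` assuming a client version of 0.10.0 or later, and `PLAIN` inferred from the login module named in the error.

```java
import java.util.Properties;

public class SaslSslConsumerProps {

    /** Builds the security-related consumer properties (hypothetical helper). */
    public static Properties build(String brokers,
                                   String trustStorePath,
                                   String trustStorePassword) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("security.protocol", "SASL_SSL");
        // Assumption: PLAIN, since the JAAS config references PlainLoginModule
        props.setProperty("sasl.mechanism", "PLAIN");
        // Kafka's own truststore settings, distinct from javax.net.ssl.* system properties
        props.setProperty("ssl.truststore.location", trustStorePath);
        props.setProperty("ssl.truststore.password", trustStorePassword);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(
                "10.105.20.103:9093,10.105.21.171:9093,10.105.22.213:9093",
                "/home/ec2-user/kafka-config/kafka.jks",
                "mypassword");
        // Print every key; mask the password value
        for (String key : props.stringPropertyNames()) {
            String value = key.endsWith("password") ? "****" : props.getProperty(key);
            System.out.println(key + " = " + value);
        }
    }
}
```

The resulting `Properties` object would then be merged with the deserializer and group settings and handed to the `KafkaConsumer` constructor. How and where KafkaOffsetMonitor would accept these values (new command-line flags, a properties file, or something else) is left open here.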