Morningstar / kafka-offset-monitor

A small web app to monitor the progress of kafka consumers and their lag wrt the log.
Apache License 2.0
282 stars 108 forks source link

scala.NotImplementedError: an implementation is missing? #20

Closed zifangsky closed 6 years ago

zifangsky commented 6 years ago

When I start kafka-offset-monitor,I have the following error:

2018-02-27 17:41:14 WARN ServletHandler:586 - Error for /topiclist scala.NotImplementedError: an implementation is missing at scala.Predef$.$qmark$qmark$qmark(Predef.scala:230) at com.quantifind.kafka.core.ZKOffsetGetter.getTopicsAndLogEndOffsets(ZKOffsetGetter.scala:126) at com.quantifind.kafka.offsetapp.OffsetGetterWeb$$anonfun$getTopicsAndLogEndOffsets$1.apply(OffsetGetterWeb.scala:136) at com.quantifind.kafka.offsetapp.OffsetGetterWeb$$anonfun$getTopicsAndLogEndOffsets$1.apply(OffsetGetterWeb.scala:136) at com.quantifind.kafka.offsetapp.OffsetGetterWeb$.withOG(OffsetGetterWeb.scala:109) at com.quantifind.kafka.offsetapp.OffsetGetterWeb$.getTopicsAndLogEndOffsets(OffsetGetterWeb.scala:135) at com.quantifind.kafka.offsetapp.OffsetGetterWeb$$anon$1$$anonfun$intent$1.applyOrElse(OffsetGetterWeb.scala:190) at com.quantifind.kafka.offsetapp.OffsetGetterWeb$$anon$1$$anonfun$intent$1.applyOrElse(OffsetGetterWeb.scala:178) at scala.PartialFunction$Lifted.apply(PartialFunction.scala:223) at scala.PartialFunction$Lifted.apply(PartialFunction.scala:219) ...

How can I fix that?

Thanks.

rcasey212 commented 6 years ago

Hi @zifangsky,

This appears to be an issue if you are using ZooKeeper to store your committed offsets. There is currently no ZooKeeper equivalent implementation for this function. I'll have to look into that, sorry.

Are you really storing your committed offsets into ZooKeeper? As of Kafka 0.9, the default is to store the offsets in Kafka. Please double-check. If you are storing your offsets in Kafka, then use this option on the command line, and you should be good:

-- offsetStorage "kafka"
zifangsky commented 6 years ago

@rcasey212 Thank you for your reply.My Kafka version is 1.0.0,and here is my start command:

!/bin/bash

java -cp KafkaOffsetMonitor-assembly-0.4.6-SNAPSHOT.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --offsetStorage "kafka" \ --zk 192.168.1.159:2100,192.168.1.159:2101,192.168.1.159:2102 \ --kafkaBrokers 192.168.1.159:9092,192.168.1.159:9093,192.168.1.159:9094 \ --kafkaSecurityProtocol "PLAINTEXT" \ --port 10000 \ --refresh 5.minutes \ --retain 1.day 1>/usr/local/KafkaMonitor/logs/stdout.log 2>/usr/local/KafkaMonitor/logs/stderr.log

And now,it appears another error:

2018-03-02 10:39:38 INFO AppInfoParser:82 - Kafka version : 0.9.0.1 2018-03-02 10:39:38 INFO AppInfoParser:83 - Kafka commitId : 23c69d62a0cabf06 2018-03-02 10:40:18 ERROR KafkaOffsetGetter$:103 - The Kafka Client reading topic/partition LogEndOffsets has thrown an unhandled exception. Will attempt to reconnect. org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata 2018-03-02 10:40:18 INFO KafkaOffsetGetter$:68 - Creating new Kafka AdminClient to get consumer and group info. 2018-03-02 10:40:18 INFO KafkaOffsetGetter$:68 - Created admin client: kafka.admin.AdminClient@1e7ccb96 2018-03-02 10:40:18 ERROR KafkaOffsetGetter$:103 - Kafka AdminClient polling aborted due to an unexpected exception. java.lang.RuntimeException: Request METADATA failed on brokers List(Node(-2, localhost.localdomain, 9093), Node(-1, localhost.localdomain, 9092), Node(-3, localhost.localdomain, 9094)) at kafka.admin.AdminClient.sendAnyNode(AdminClient.scala:73) at kafka.admin.AdminClient.findAllBrokers(AdminClient.scala:93) at kafka.admin.AdminClient.listAllGroups(AdminClient.scala:101) at kafka.admin.AdminClient.listAllGroupsFlattened(AdminClient.scala:122) at kafka.admin.AdminClient.listAllConsumerGroupsFlattened(AdminClient.scala:126) at com.quantifind.kafka.core.KafkaOffsetGetter$$anonfun$f$lzycompute$1.apply$mcV$sp(KafkaOffsetGetter.scala:421) at com.quantifind.kafka.core.KafkaOffsetGetter$$anonfun$f$lzycompute$1.apply(KafkaOffsetGetter.scala:415) at com.quantifind.kafka.core.KafkaOffsetGetter$$anonfun$f$lzycompute$1.apply(KafkaOffsetGetter.scala:415) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Can you help me to see it again,please? Is it because my version of Kafka is too high?

rcasey212 commented 6 years ago

Moved your second question into a new issue as it is unrelated to your first question, which has been answered. Trying to keep these clean so that if other people have the same issue in the future, it is easy for them to follow and get useful information.