dibbhatt / kafka-spark-consumer

High Performance Kafka Connector for Spark Streaming. Supports multi-topic fetch and Kafka security. Reliable offset management in ZooKeeper. No data loss. No dependency on HDFS and WAL. Built-in PID rate controller. Supports a Message Handler. Offset lag checker.
Apache License 2.0

Unable to receive data #25

Closed sorabh89 closed 9 years ago

sorabh89 commented 9 years ago

Hi Dibbhatt,

I am facing an issue after changing one of the ZooKeeper nodes. I've changed all the settings related to the ZooKeeper IPs, but I am getting the following:

ERROR ReceiverTracker: Deregistered receiver for stream 1: Error starting receiver 1 - java.lang.RuntimeException: java.lang.RuntimeException: java.lang.IllegalStateException: instance must be started before calling this method
    at consumer.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:73)
    at consumer.kafka.ZkCoordinator.<init>(ZkCoordinator.java:64)
    at consumer.kafka.KafkaConsumer.open(KafkaConsumer.java:61)
    at consumer.kafka.client.KafkaRangeReceiver.start(KafkaRangeReceiver.java:73)
    at consumer.kafka.client.KafkaRangeReceiver.onStart(KafkaRangeReceiver.java:58)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:125)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:109)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:308)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:300)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.IllegalStateException: instance must be started before calling this method
    at consumer.kafka.DynamicBrokersReader.getLeaderFor(DynamicBrokersReader.java:117)
    at consumer.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:62)
    ... 16 more
Caused by: java.lang.IllegalStateException: instance must be started before calling this method
    at org.spark-project.guava.base.Preconditions.checkState(Preconditions.java:149)
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.getData(CuratorFrameworkImpl.java:360)
    at consumer.kafka.DynamicBrokersReader.getLeaderFor(DynamicBrokersReader.java:110)
    ... 17 more

15/09/15 12:56:45 ERROR ReceiverTracker: Deregistered receiver for stream 1: Error starting receiver 1 (same stack trace as above)

entered forEachRDD [Stage 2:> (0 + 4) / 4][Stage 3:> (0 + 0) / 14]

15/09/15 12:57:24 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.RuntimeException: java.lang.RuntimeException: java.lang.IllegalStateException: instance must be started before calling this method (same stack trace as above, for receiver 0)

Regards, Sorabh

dibbhatt commented 9 years ago

Did you change the ZK settings while the Streaming job was still running?

sorabh89 commented 9 years ago

No, I stopped the streaming job and then changed the settings.

dibbhatt commented 9 years ago

There are two ZK configurations:

  1. The ZK which Kafka uses. This is used to connect to the Kafka partitions. It is set by the zookeeper.hosts and zookeeper.ports properties.
  2. The ZK where offsets are committed. This is the zookeeper.consumer.connection property.

Both can point to the same ZK cluster, but in some cases people may prefer not to touch the ZK that controls Kafka.

Which ZK settings did you change? If you haven't touched the Kafka cluster's ZK settings, then you do not need to change 1. If you want to commit offsets to a different ZK cluster, you can change 2.
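As a minimal sketch, the two configurations above might look like this in a consumer properties file (the property names are the ones named in this thread; the host names and ports are placeholders, not values from this issue):

```properties
# Config 1: the ZK quorum the Kafka brokers are registered in
zookeeper.hosts=zkhost1,zkhost2,zkhost3
zookeeper.ports=2181,2181,2181

# Config 2: the ZK connection used for committing consumer offsets;
# may be the same cluster as above or a separate one
zookeeper.consumer.connection=zkhost1:2181,zkhost2:2181,zkhost3:2181
```

If only the offset-committing cluster moves, only config 2 needs to change.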

dibbhatt commented 9 years ago

It may be possible your ZK is not up. Are you able to connect to the Kafka cluster from the console using the changed ZK settings?

dibbhatt commented 9 years ago

bin/kafka-console-consumer.sh --zookeeper 10.252.1.136:2181 --topic mytopic --from-beginning

Try this from the console and see if you are getting Kafka messages.

sorabh89 commented 9 years ago

I've changed it in both places, and yes, it works from the console.

sorabh89 commented 9 years ago

I am getting an error related to brokerInfo. Since I don't provide broker info when connecting from the console, it works from the console.

dibbhatt commented 9 years ago

Ok, then go to ZK and remove the earlier consumer path, or use a new kafka.consumer.id.
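A sketch of that cleanup using the ZooKeeper CLI. The path shown (/consumers/my-old-consumer-id) is an assumption about where the offsets for the old kafka.consumer.id live, so inspect the tree with ls first; the host and port are placeholders:

```shell
# Connect to the offset-committing ZK (host/port are placeholders)
bin/zkCli.sh -server 10.252.1.136:2181

# Inside the zkCli shell: inspect the consumer paths before deleting anything
ls /consumers

# Remove the old consumer's subtree recursively.
# 'rmr' works on older ZK releases; newer releases use 'deleteall'.
rmr /consumers/my-old-consumer-id
```

Deleting the subtree discards the committed offsets, so the next run starts from whatever the configured start position is.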

sorabh89 commented 9 years ago

I've tried using a new consumer id as well, but I'm getting the same error.

dibbhatt commented 9 years ago

It may also be possible that the Spark cluster where the Executors are running is not able to connect to the new ZK IP. Check the /etc/hosts file.
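One quick way to check that connectivity from an executor node is ZooKeeper's four-letter-word health check (the IP is the one from the console command earlier; adjust for your cluster):

```shell
# From an executor host: a healthy, serving ZK answers 'ruok' with 'imok'
echo ruok | nc 10.252.1.136 2181
```

No response, or a connection refusal, points at network or name-resolution trouble rather than this connector.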

sorabh89 commented 9 years ago

Actually, when I start I get this error, but I also get some data. Then after 2-3 batches it shows 0 records.

dibbhatt commented 9 years ago

So the error you mentioned is not there now ?

dibbhatt commented 9 years ago

Which version of ZK have you installed? Is it different from the earlier version? There is also a possibility that the Curator framework is not able to connect to ZK properly.

sorabh89 commented 9 years ago

It is there, but after printing the error the program continues.

sorabh89 commented 9 years ago

No, the version of ZK is the same.

dibbhatt commented 9 years ago

In the Spark UI, do you see that the Receivers are running?

sorabh89 commented 9 years ago

You mean, executors?

dibbhatt commented 9 years ago

Yes. It says ERROR ReceiverTracker: Deregistered receiver. I am not sure whether your Receivers are still running.

sorabh89 commented 9 years ago

They are running. Earlier it was working fine, but after changing the ZooKeeper IP this error started showing up.

sorabh89 commented 9 years ago

It got resolved after restarting Kafka and ZooKeeper.

Thanks,

dibbhatt commented 9 years ago

Cool