dibbhatt / kafka-spark-consumer

High Performance Kafka Connector for Spark Streaming. Supports Multi Topic Fetch, Kafka Security. Reliable offset management in Zookeeper. No data loss. No dependency on HDFS and WAL. In-built PID rate controller. Supports Message Handler. Offset lag checker.
Apache License 2.0

Doesn't seem to be working #3

Closed: akhld closed this issue 9 years ago

akhld commented 9 years ago

I gave this project a try: I cloned the repo, built the project, used the jar inside my Spark Streaming project, and used your KafkaReceiver to create a DStream. Whenever I run it, it just crashes with my SparkContext being shut down.

You can look at the code here: https://gist.github.com/akhld/a59a2369f0f1f5509af4

This is what happens when I run it in standalone mode: https://gist.github.com/akhld/b5627bf866721df7321b

This is what happens when I run it in local mode: https://gist.github.com/akhld/36baccb2a866106315f6

dibbhatt commented 9 years ago

It has been working for many folks who use this consumer, and it has been running at Pearson for many months now. It is difficult for me to debug an issue that exists in your setup. The log says "All masters are unresponsive!", so kindly check your setup before raising an issue with the consumer. Looking at the logs, I do not get much idea of what is going wrong.

akhld commented 9 years ago

If I replace the following lines

val partitions = 3
val kafkaStreams = (1 to partitions).map { i =>
  ssc.receiverStream(new KafkaReceiver(props, i))
}

with these, then everything works:

val numStreams = 3
val kafkaStreams = (1 to numStreams).map { i =>
  KafkaUtils.createStream(ssc, zkQuorum, kafkaGroup, topics_map).map(x => x._2)
}

Also, in local mode it was throwing a StackOverflowError inside your KafkaReceiver.

dibbhatt commented 9 years ago

Instead of using "zookeeper.consumer.connection" -> "localhost:2182", can you specify the IP address of the ZK node and the port? I will change the dummy property file to make sure people use the correct ZK IP and port.
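Something like this, for example (a rough sketch: the IP and port are placeholders, and I am assuming props is a plain java.util.Properties as passed to KafkaReceiver in your gist):

val props = new java.util.Properties()
// point the consumer at the actual ZK node rather than localhost (placeholder address)
props.put("zookeeper.consumer.connection", "10.20.30.40:2181")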

Dib

akhld commented 9 years ago

Just had a quick sneak peek at your pom file: you are specifying Spark version 1.0.1 and mine is Spark 1.1.0. I'm assuming that is the reason it wasn't able to connect to my cluster.

dibbhatt commented 9 years ago

I do not think that is the issue. The issue is with the ZK IP and port, as I mentioned. Please try that and let me know if it works.

akhld commented 9 years ago

It didn't work.

dibbhatt commented 9 years ago

OK, can you modify the pom file and check then?

akhld commented 9 years ago

Yep, did that. Now it's pointing to an issue with my broker path:

[error] (Thread-28) org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 95, anish-dsl-w-0.c.neat-axis-616.internal): java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /broker/topics/final10/partitions

dibbhatt commented 9 years ago

It is not /broker; it should be /brokers.

dibbhatt commented 9 years ago

If this works now, I can let you know a few tuning options, like how rate limiting can be done within the Receiver. If you go to KafkaConfig.java, _fetchSizeBytes defaults to 64 KB for every topic partition for every fill. If you want a higher consume rate, you can increase it to, say, 1024 x 1024 (1 MB).

dibbhatt commented 9 years ago

If this is no longer an issue with the consumer, shall I close this?

akhld commented 9 years ago

Yep, now it seems to be working after changing the pom file and also /broker to /brokers. These are a few things I don't understand:

  1. How do we take out the real messages? I see the DStream is of type MessageAndMetadata; also it got GC'd after a few iterations:

    Number of records in this batch : 95000

    Time: 1418381454000 ms

    consumer.kafka.MessageAndMetadata@38d86167 consumer.kafka.MessageAndMetadata@5e2ce59 consumer.kafka.MessageAndMetadata@6d593813 consumer.kafka.MessageAndMetadata@7c709f7a consumer.kafka.MessageAndMetadata@45f64269 consumer.kafka.MessageAndMetadata@2a821f61 consumer.kafka.MessageAndMetadata@44da2f14 consumer.kafka.MessageAndMetadata@5048ed5c consumer.kafka.MessageAndMetadata@63d75c31 consumer.kafka.MessageAndMetadata@4fe5c82c ...

    error java.lang.OutOfMemoryError: Java heap space
    java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
        at org.apache.spark.storage.BlockMessage.set(BlockMessage.scala:85)
        at org.apache.spark.storage.BlockMessage$.fromByteBuffer(BlockMessage.scala:176)
        at org.apache.spark.storage.BlockMessageArray.set(BlockMessageArray.scala:63)
        at org.apache.spark.storage.BlockMessageArray$.fromBufferMessage(BlockMessageArray.scala:109)
        at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:46)
        at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
        at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
        at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:682)
        at org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:520)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

dibbhatt commented 9 years ago

OK, I am closing the issue.

For the out-of-memory issue, what value of _fetchSizeBytes did you specify in KafkaConfig.java? You may need to use this setting to control the rate limit. I will need to make some of these tuning options easier to configure.

Please set a lower value for _fetchSizeBytes and see if you still get the memory issue.

dibbhatt commented 9 years ago

You can get the real message from MessageAndMetadata using getPayload.
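For example, building on the streams from your gist (a rough sketch: I am assuming getPayload returns the raw message bytes, so the UTF-8 decoding is an assumption):

// union the per-partition receiver streams, then pull the payload out of each MessageAndMetadata
val unified = ssc.union(kafkaStreams)
val messages = unified.map(mmd => new String(mmd.getPayload, "UTF-8"))
messages.print()   // prints decoded messages per batch instead of MessageAndMetadata@... hashes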

akhld commented 9 years ago

Thanks man.

I set _fetchSizeBytes to 2 MB and it was reading up to 2 million records/second on a 1+2 node cluster (4 cores, 15 GB memory each); beyond that it OOMs.

Shall I create a Scala quick-start document and send a pull request?

dibbhatt commented 9 years ago

That would be great. Please go ahead and raise the pull request.

akhld commented 9 years ago

Created https://github.com/dibbhatt/kafka-spark-consumer/pull/4; let me know if you want to add/edit/modify anything in it.

dibbhatt commented 9 years ago

Thanks a lot, Akhil. Yes, I have seen it and will merge it. I am out of station now; I will merge it by this evening.

akhld commented 9 years ago

Awesome!

akhld commented 9 years ago

Hi Dibyendu,

What is the fault-tolerance mechanism here? Let's say I spawn 4 receivers to read from 4 partitions and I kill one of the receiver processes; what happens then? Will it automatically spawn a new receiver and continue reading without any data loss?

dibbhatt commented 9 years ago

Yes, that is what would happen. Fault tolerance of the receiver is mostly for Spark to handle: as Spark restarts any killed receiver, the same applies here.

akhld commented 9 years ago

Oh, cool. So it doesn't have any specific fault-tolerance mechanism of its own?

dibbhatt commented 9 years ago

Well, this consumer won't lose data in case of receiver failure, as happens with the present KafkaUtils (high-level consumer). SPARK-4062 tried to fix this in Spark 1.2, again with a Kafka high-level consumer, which seems to tackle the data loss during receiver failure, but Kafka 0.8.x has serious issues in its high-level consumer design. You can find some details here: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design

If you are aware, Kafka is therefore changing the consumer design completely in Kafka 0.9.

For this same reason, I do not see any high-level consumer as good enough for production usage, and that prompted me to develop this low-level consumer. Writing a low-level consumer has its own challenges: you need to handle the Kafka-related details within the consumer itself, including Kafka partition-leader changes, Kafka ZK connection issues, etc. This low-level consumer has solved those problems.

By the way, SPARK-3129 also solves the other part of the data-loss story, which is data loss during driver failure, so the two solutions, for driver failure and receiver failure, together give you complete coverage against data loss in Spark Streaming.

Do let me know if you need any additional fault-tolerance mechanism.

akhld commented 9 years ago

Awesome. Let me explore a bit more and I will let you know if I get stuck somewhere.

dibbhatt commented 9 years ago

Cool. Just to add: ideally, as you know, Kafka can be used to replay messages in case of failures. My goal is to have a Kafka consumer with its own built-in replay mechanism in case of driver failure, so that we do not need to depend on the WAL feature which SPARK-3129 provides, as the WAL has an overhead on throughput. That WAL feature is a generic solution for any streaming consumer, but since Kafka can replay messages, if the same is built into the consumer, that would be the ideal Kafka consumer for Spark. It needs intelligent offset management in case of driver failures, though. Will keep you posted on this.

akhld commented 9 years ago

One strange issue I ran into is that when I have more receivers than the number of partitions, the job seems to hang.

E.g., I created a topic with 6 partitions and used 10 receivers to consume it; eventually it shows "Reading from partition 7" (which doesn't exist) and the job hangs forever.

dibbhatt commented 9 years ago

I see. I have never tried this use case. One way to solve it is not to pass the partition number separately, but rather have the receiver query ZK to find the number of partitions for a given topic and spawn that many receivers. For example, DynamicBrokersReader.java already has an API to get the number of partitions for a topic which can be used; see the sketch below. I will make this change soon.
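A rough sketch of that idea, mirroring the earlier snippet (fetchPartitionCount is a hypothetical stand-in for whatever lookup DynamicBrokersReader exposes, not its real API; the topic name "final10" is taken from the ZK path earlier in this thread):

// Hypothetical helper: returns the partition count for a topic. A real version
// would query ZK (e.g. via DynamicBrokersReader); this stub only makes the sketch compile.
def fetchPartitionCount(zkQuorum: String, topic: String): Int = 6

// spawn exactly one receiver per partition instead of a hard-coded count
val numPartitions = fetchPartitionCount(zkQuorum, "final10")
val kafkaStreams = (1 to numPartitions).map { i =>
  ssc.receiverStream(new KafkaReceiver(props, i))
}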

akhld commented 9 years ago

DynamicBrokersReader did help. Another thing I noticed: while the application was running, I repartitioned my topic (increased the number of partitions), and from that point onwards the job doesn't read anything from Kafka, although Kafka's console consumer is able to consume data from the same topic. The program doesn't read from the modified topic even if I restart it.

Steps to reproduce:

  1. Create a Kafka topic, let's say with 6 partitions
  2. Start the kafka-spark-consumer program
  3. Repartition the topic (bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --partitions 10)
  4. That's it.

dibbhatt commented 9 years ago

Re-partitioning a topic is a very corner case which I do not think the consumer needs to detect automatically as of now. Re-partitioning is a manual process; also, once you re-partition, I believe that would impact your Kafka producer code as well. So re-partitioning is a special case that needs to be taken care of offline.

Was the Kafka console consumer running when you changed the partition count?

Regarding not being able to consume after the restart, can you share the logs with me? Also, can you see in your ZK how many partitions it shows under /{zookeeper.consumer.path}/{kafka.topic}/{kafka.consumer.id}/?

Does the number of partitions still show the old count, or the new count?

akhld commented 9 years ago

Yes, the number of partitions is the updated one, of course. And my console consumer disconnects and reconnects after the repartition. Below is the message from the console consumer.

[2014-12-23 13:51:05,368] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)

Now it is reading from the partitions after a while (around 20 minutes), but it wasn't reading earlier, and in that gap I missed a couple hundred thousand messages.

dibbhatt commented 9 years ago

No, you won't miss any messages: if the consumer does not read, it does not commit the offset in ZK either, so once it starts again it resumes from the previously committed offset. I believe that once you re-partition the topic, the existing leader of a partition changes; this low-level consumer can detect that change and re-connect to the new leader after 100 seconds, so that is the delay you might have seen.

akhld commented 9 years ago

When you say commit the offset in ZK, does it mean my consumer machines should have write access to the ZK running on another cluster?

dibbhatt commented 9 years ago

No, you may not need to. That is the reason I kept two properties for ZK settings: one for reading Kafka broker details and the other for committing the read offsets. You can have two separate clusters for that. We at Pearson have separate ZK clusters for Kafka and for storing committed offsets.
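In the property file that would look roughly like this ("zookeeper.consumer.connection" appears earlier in this thread; "zookeeper.hosts" and the host names are assumptions for illustration, not confirmed names):

// ZK cluster holding the Kafka broker/partition metadata (assumed property name)
props.put("zookeeper.hosts", "kafka-zk-host:2181")
// separate ZK cluster where this consumer commits the read offsets
props.put("zookeeper.consumer.connection", "offsets-zk-host:2181")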

Dibyendu


akhld commented 9 years ago

Awesome!

akhld commented 9 years ago

Hi Dibyendu, one more thing I want to clarify. My batch duration is 2 seconds and I'm consuming around 600 MB of data at once, and it takes ~8 seconds to process that data. Meanwhile my Kafka producer keeps on pushing data to the topic, so after those 8 seconds the consumer pulls about 2-3 GB of data, which lags drastically or throws exceptions. Any workaround for this? I tried tuning the parameters in KafkaConfig.java but couldn't arrive at proper numbers.

dibbhatt commented 9 years ago

Hi Akhil,

Here are two tuning options you need to try.

You can see the following two variables in KafkaConfig.java. As the SimpleConsumer generates the blocks itself and pushes them to the BlockManager, the block interval is controlled by the consumer itself (the Spark property spark.streaming.blockInterval does not have any effect). These are the default values:

public int _fetchSizeBytes = 512 * 1024;
public int _fillFreqMs = 200;

This means that the receiver for any given partition of a topic will pull a 512 KB block of data every 200 ms. With these default settings, assuming your Kafka topic has 5 partitions and your Spark batch duration is, say, 10 seconds, this consumer will pull

512 KB x (10 seconds / 200 ms) x 5 = 128 MB of data for every batch.
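The same arithmetic as a quick sketch (the variable names just mirror the fields above; this is not library code):

// back-of-the-envelope data volume per Spark batch with the default settings
val fetchSizeBytes  = 512 * 1024   // _fetchSizeBytes: bytes pulled per partition per fill
val fillFreqMs      = 200          // _fillFreqMs: one fill every 200 ms
val batchDurationMs = 10 * 1000    // 10-second Spark batch
val partitions      = 5            // partitions in the topic

val fillsPerBatch = batchDurationMs / fillFreqMs                 // 50 fills per batch
val bytesPerBatch = fillsPerBatch.toLong * fetchSizeBytes * partitions
println(s"${bytesPerBatch / 1024} KB per batch")                 // 128000 KB, roughly 128 MB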

If you need a higher rate, you can increase _fetchSizeBytes; if you need fewer blocks generated, you can increase _fillFreqMs.

These two parameters need to be tuned carefully, keeping in mind your downstream processing rate and your memory settings.

I guess you are tuning the _fetchSizeBytes value. You can try increasing _fillFreqMs as well and see if that helps (it will lower the number of blocks generated in a given batch interval).

Dibyendu

akhld commented 9 years ago

Thanks for the detailed response, Dibyendu.