Hi Cleaton,
I am sorry the explanation was not given in the README. I will try to add a write-up to the README soon.
As you found out, the offset is committed only after the Receiver durably persists the block, which helps to prevent any data loss during a receiver failure. With the high-level consumer and auto-commit on, there is a possibility that the receiver crashes after the auto commit but before the data is written to the BlockManager. In that case you will lose the data.
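For illustration, here is a minimal sketch of that store-then-commit ordering inside a custom receiver; the class and helper names (SketchKafkaReceiver, handleBlock, commitOffsetToZk) are made up and are not this project's actual code:

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver illustrating the ordering: first store() the block
// (for a multi-record store this returns only after Spark has persisted it),
// then advance the offset in ZooKeeper. A crash in between can at worst
// replay the block, never lose it.
class SketchKafkaReceiver(storageLevel: StorageLevel)
    extends Receiver[Array[Byte]](storageLevel) {

  def onStart(): Unit = { /* start the fetch thread here */ }
  def onStop(): Unit  = { /* stop the fetch thread here */ }

  // Called from the fetch loop for every block of messages read from Kafka.
  private def handleBlock(messages: ArrayBuffer[Array[Byte]], lastOffset: Long): Unit = {
    store(messages)              // persist to BlockManager/WAL first
    commitOffsetToZk(lastOffset) // only then record the consumed offset
  }

  private def commitOffsetToZk(offset: Long): Unit = {
    // made-up placeholder: write `offset` to the consumer's ZooKeeper path
  }
}
```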
Regarding "handle failures" , if you look at the KafkaConsumer and KafkaReceiver/KafkaRangeReceiver , there are two different way it can handle faults. If the faults are related to kafka cluster ( Like leader of partition changes ) it can refresh the PartitionCordinator (ZkCoordinator) which is basically the code which basically select the leader of a partition and open a SimpleConsumer channel with it. If Leader of a Partition changes due to Kafka Broker failure, the refresh of the Coordinator will help to solve the problem .
If there is an OffsetOutOfRange error from Kafka, the consumer can restart from the earliest offset but won't write any duplicates (the ZK offsets are managed that way).
If there are errors on the Spark side, e.g. the BlockManager or the WAL throws an error, the Receiver is restarted automatically.
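To make those cases concrete, a small, self-contained decision table (the Recovery trait and recoveryFor function are made-up names for illustration) mapping Kafka fetch error codes to the recovery actions described above:

```scala
import kafka.common.ErrorMapping

// Which recovery path the receiver takes for a given Kafka fetch error code.
sealed trait Recovery
case object RefreshCoordinator extends Recovery // leader moved: reopen a SimpleConsumer against the new leader
case object ResetToEarliest    extends Recovery // OffsetOutOfRange: restart from the earliest available offset
case object RestartReceiver    extends Recovery // any other error: surface it and let Spark restart the receiver

def recoveryFor(fetchErrorCode: Short): Recovery = fetchErrorCode match {
  case ErrorMapping.OffsetOutOfRangeCode      => ResetToEarliest
  case ErrorMapping.NotLeaderForPartitionCode |
       ErrorMapping.LeaderNotAvailableCode    => RefreshCoordinator
  case _                                      => RestartReceiver
}
```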
This kind of failure handling is not there in any receiver that Spark provides out of the box. In Spark 1.1 the receiver is not a reliable receiver, which leads to data loss. In Spark 1.2 the receiver does not provide any fault-tolerance features like receiver restart or error recovery, so if an error strikes, the receiver will stop. In Spark 1.3 the direct consumer is a different design altogether: it does not commit offsets to ZK and uses the checkpointing mechanism to track consumed offsets. I am not sure if this has been production tested yet, but the issue is that if your driver code is modified, you lose your checkpointed data, so your client code needs to manage its own offsets if such a situation arises. I cannot comment on the fault-recovery aspect as I have not tried it yet.
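For reference, the checkpoint-based recovery that the direct approach relies on looks roughly like this (the checkpoint directory and batch interval are made-up values, and the stream/processing setup is elided):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/streaming-checkpoint"   // placeholder path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("direct-kafka-example")
  val ssc = new StreamingContext(conf, Seconds(10))
  // ... create the direct Kafka stream and define the processing here ...
  ssc.checkpoint(checkpointDir)
  ssc
}

// On restart, the driver is rebuilt from the checkpoint (including the
// consumed offsets). If the application code has changed, the checkpoint can
// no longer be deserialized and the offsets stored in it are lost.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
```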
Thank you for such a quick reply @dibbhatt.
Regarding "handle failures" , if you look at the KafkaConsumer and KafkaReceiver/KafkaRangeReceiver , there are two different way it can handle faults. If the faults are related to kafka cluster ( Like leader of partition changes ) it can refresh the PartitionCordinator (ZkCoordinator) which is basically the code which basically select the leader of a partition and open a SimpleConsumer channel with it. If Leader of a Partition changes due to Kafka Broker failure, the refresh of the Coordinator will help to solve the problem .
The high-level consumer used by Spark should handle Kafka leader changes and OffsetOutOfRange errors (by restarting from earliest/latest), or is there something I am missing?
So this consumer would still lose data on driver failures, since the offset might be committed before the data has been processed in Spark?
About the new direct consumer: it maps Kafka partitions to RDD partitions, and there is a new HasOffsetRanges interface that gives access to the offset ranges used to create the RDD, which makes manual offset management possible (if you don't want to rely on checkpointing). But it has other possible downsides (like starting the data download when processing starts, instead of in the background, etc.).
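A minimal sketch of that, assuming `directKafkaStream` is the stream returned by `KafkaUtils.createDirectStream`; how and where the offsets are persisted afterwards is up to the application:

```scala
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

directKafkaStream.foreachRDD { rdd =>
  // Each batch RDD created by the direct stream carries its Kafka offset ranges.
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process rdd ...

  offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}
```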
Thanks for your clarifications!
Sorry, I missed mentioning that the high-level Kafka consumer has some serious issues which seem to be blockers for using it in production scenarios. The issue is consumer re-balancing when a broker fails. You can find some details here: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design
For this specific reason Kafka 0.8.2 has a completely new consumer design.
Even in the Spark user group there were threads where people faced problems due to the consumer re-balancing issue.
In case of a driver failure, data loss can be avoided in Spark 1.2.x if you enable the WAL feature (by setting spark.streaming.receiver.writeAheadLog.enable to "true"), which will give you an end-to-end guarantee of no data loss.
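For reference, enabling it looks like this (the app name and checkpoint path are made-up examples); note that the WAL files are written under the streaming checkpoint directory, so checkpointing has to be enabled as well:

```scala
import org.apache.spark.SparkConf

// Turn on the receiver write-ahead log (available from Spark 1.2).
val conf = new SparkConf()
  .setAppName("receiver-with-wal")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

// Later, on the StreamingContext, a checkpoint directory is required so the
// WAL has somewhere durable to live:
// ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")
```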
As of now there is no other option but to rely on the WAL against data loss with the receiver-based consumer approach.
Manual offset management in Spark 1.3 is difficult if you create a derived stream from the original stream, since the derived stream does not have the HasOffsetRanges implementation.
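One way around that is to capture the offset ranges in a `transform` before any derivation; a sketch, assuming `directKafkaStream` is the (key, value) stream returned by `createDirectStream`:

```scala
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// The HasOffsetRanges cast only works on the RDDs of the stream created by
// createDirectStream, so grab the ranges first and derive afterwards.
var offsetRanges = Array.empty[OffsetRange]

directKafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map(_._2)                // derived stream: no HasOffsetRanges any more
 .foreachRDD { rdd =>
   // process the derived data, then persist offsetRanges yourself
 }
```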
Hi @cleaton ,
If you are fine with my explanation of why this consumer has better offset/error handling, shall I go ahead and close this issue?
Regards, Dibyendu
Hi @dibbhatt, yes, you can close this issue. Thank you for your clarification.
This might be off topic, but the Java Kafka consumer SDK didn't make it into the 0.8.2 release to my knowledge (only the producer part). And for HasOffsetRanges, you can have a non-derived action at the end (after the derived actions) to write the offsets. I remember that Spark guarantees that actions will be run in the order they are defined (although I seem to be unable to find a source for this at the moment).
Thanks!
True, the new Consumer API is for Kafka 0.9 I guess. But I used the low-level consumer API (SimpleConsumer), which is backward compatible across Kafka versions, so this consumer code will run on other Kafka versions as well.
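For reference, a bare-bones fetch with the 0.8.x SimpleConsumer API; the broker host/port, topic, partition, offset and client id are made-up values, and leader lookup plus error handling are omitted:

```scala
import kafka.api.FetchRequestBuilder
import kafka.javaapi.consumer.SimpleConsumer
import scala.collection.JavaConverters._

// Open a channel directly to the (assumed) partition leader.
val consumer = new SimpleConsumer("broker-1", 9092, 100000, 64 * 1024, "my-client")

val request = new FetchRequestBuilder()
  .clientId("my-client")
  .addFetch("mytopic", 0, 42L, 100000)   // topic, partition, offset, fetchSize
  .build()

val response = consumer.fetch(request)
for (messageAndOffset <- response.messageSet("mytopic", 0).asScala) {
  val payload    = messageAndOffset.message.payload   // ByteBuffer with the message bytes
  val nextOffset = messageAndOffset.nextOffset         // offset to fetch from next time
}
consumer.close()
```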
Regarding committing offsets from non-derived actions with DirectStream, I am not sure that is feasible, as Spark does not provide any guarantee of processing RDDs in order... Does DirectStream process all RDDs in order? There are other issues as I see it: in your driver code you need to handle various error scenarios (like restarting in OffsetOutOfRange cases), etc. I think DirectStream still needs some work to be production ready.
Oh, somewhere I remember I heard/read that actions are guaranteed to be executed in the order they are defined. RDD2 = RDD.map()
DirectStream also uses the SimpleConsumer, and they provide some APIs (only internal for now) to query leader metadata etc. It is quite easy to implement a DirectStream constructor that takes an absolute offset and handles OffsetOutOfRange cases (see https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala#L401 for details). I have used this approach to create a constructor that allows a relative offset start (e.g. start downloading from the last 40k messages in a Kafka partition).
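Starting a direct stream from explicit offsets with that overload looks roughly like this (`ssc`, the broker, topic, partition and starting offset are placeholders; computing "latest minus 40k" would additionally require an offset/leader metadata request):

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker-1:9092")
// Explicit starting position per topic-partition (made-up offset).
val fromOffsets = Map(TopicAndPartition("mytopic", 0) -> 123456L)

// `ssc` is an existing StreamingContext; the message handler turns each
// MessageAndMetadata into a (key, value) pair.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc,
  kafkaParams,
  fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
)
```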
I know Kixer is using DirectStream in production (since they also developed it); more details here: http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/
(We are still using the original spark-kafka consumer at my company, but we are looking for alternatives.)
Thank you for all your help! :+1:
No problem. Thanks to you for initiating this discussion.
Regarding RDD ordering, you can find some details in this thread:
https://mail.google.com/mail/u/1/?tab=wm#search/rdd+order+guarantees/14b9f1eaf0b8bd15
What it says is that even though the Direct Stream creates batches in order (in the same order as the Kafka partitions), when batches are submitted for job execution Spark does not provide any guarantee that they will execute in submission order. If that is not guaranteed, then your offset commit from the driver may go wrong.
Dib
Hello,
The README of this project mentions "This utility will help to pull messages from Kafka Cluster using Spark Streaming. The Kafka Consumer is Low Level Kafka Consumer ( SimpleConsumer) and have better handling of the Kafka Offsets and handle failures.", but I cannot find any explanation of what it does differently to provide these features.
I looked through the code and I can see that Kafka offsets are committed to ZooKeeper after Receiver.store() (instead of auto commit). Is this the main feature, or are there other reasons why it has better offset handling and failure handling?
Thanks