rbrush / kite-apps

Prescriptive Applications over Kite and Hadoop
Apache License 2.0

Use checkpoints in Spark Streaming #12

Open rbrush opened 9 years ago

rbrush commented 9 years ago

The current Spark Streaming implementation does not checkpoint state, and is therefore not resilient to failures. Checkpoints should be written to a well-defined location, probably in the application's installation folder.

sjdurfey commented 9 years ago

I started looking into this. Before we spoke I wanted to add the checkpoint directory to the StreamDescription, but after we discussed placing the checkpoint directory in the root install directory, I didn't want to modify the StreamDescription anymore to be different from what the user specified in their App. So, I went down the path of adding the install root to the AppContext, as it seemed to be the best fit.

The one thing I don't really like about this approach is having to spell out the checkpoint directory under the install root every time it is referenced. I think there should be static constants for the paths below the root directory, so that instead of writing appContext.getInstallPath() + "/var/checkpoints", I could just write appContext.getInstallPath() + AppContext.CHECKPOINT_DIR.
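As a minimal sketch of what such a constant could look like: the class name AppPaths and the helper checkpointDir are hypothetical and not part of kite-apps; only the "var/checkpoints" suffix and the idea of a CHECKPOINT_DIR constant come from the comment above. The helper also normalizes a trailing slash so callers never concatenate paths by hand.

```java
// Hypothetical helper; the names are illustrative and not part of kite-apps.
final class AppPaths {
    /** Checkpoint location relative to the install root (suffix taken from the thread). */
    static final String CHECKPOINT_DIR = "var/checkpoints";

    private AppPaths() {}

    /** Joins the install root and the checkpoint suffix with exactly one slash. */
    static String checkpointDir(String installPath) {
        String root = installPath.endsWith("/")
                ? installPath.substring(0, installPath.length() - 1)
                : installPath;
        return root + "/" + CHECKPOINT_DIR;
    }

    public static void main(String[] args) {
        // "/apps/example" is a made-up install root used only for illustration.
        System.out.println(checkpointDir("/apps/example"));
    }
}
```

Keeping the suffix relative (no leading slash) means the same constant works whether the install root is a local path or a full HDFS URI.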

I'm still looking into how to retrieve the Kafka offsets from the previously running streaming app, but for now here is the change set that adds the checkpoint support (without tests). https://github.com/sjdurfey/kite-apps/commit/a61ee1a3456a201646ff32651f5f85959e338e9a

rbrush commented 9 years ago

I like the idea of adding the install path to the app context. Makes sense for the reasons you described.

It did occur to me that we'd need a checkpoint folder per job rather than one for the entire app, since a job could be running its own Spark Streaming instance. So the pattern probably needs to be /var/checkpoints/.

+1 also to avoiding hard-coding "var/checkpoints" in a bunch of places. There are a number of things like this that I've done that I'd like to refactor into constants...we just need to work out a good place to put them.

One change that I noticed in this stab at it is the addition of a timestamp to the installation root. I'm not sure we want to do this, at least right now. The caller may have better defined versioning semantics than a timestamp (such as a version or a build number), so if they want a new version installed the caller should just place it in a separate directory.

sjdurfey commented 9 years ago

I actually didn't mean to include the timestamp on the directory. I added it for local testing, to avoid "directory already exists" exceptions when re-running the command multiple times. I did add support for specifying the duration for sampling the Kafka topic. This value is pulled from settings, which are specified in the properties file passed at the command line. Is there a better place to put this config?

It did occur to me that we'd need a checkpoint folder per job rather than one for the entire app, since a job could be running its own Spark Streaming instance. So the pattern probably needs to be /var/checkpoints/.

Could you expand upon this? Kite apps will create a streaming context when it starts the app, and there can only be one streaming context per JVM.

sjdurfey commented 9 years ago

I'm having some trouble tracking down a simple way of getting the processed offsets. Spark doesn't provide a public way of reading the checkpoint directory back in to retrieve the offsets, and I wouldn't be comfortable copying code to accomplish it. The documentation and the mailing list both pushed me towards getting the offsets from the RDD, like so:

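 // Spark 1.x Kafka integration package
 import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

 // Offset ranges of the most recent batch; reassigned in transform().
 var offsetRanges = Array[OffsetRange]()
 directKafkaStream.transform { rdd =>
   // The cast only succeeds on the RDD that comes straight from the direct stream.
   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd
 }.map {
   ...
 }.foreachRDD { rdd =>
   for (o <- offsetRanges) {
     println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
   }
   ...
 }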

The problem with this approach is that kite-apps would need to regain control of the pipeline at the end in order to do something with the offsets. So, that isn't ideal.

So, I think we're left with adding a function to the SparkKafkaStreamLoader class, inserted between the creation of the direct stream and the map of ToAvroFunction, that gets the offsets from the RDD and writes them out somewhere: HBase, HDFS, or another Kafka topic. HDFS would be unmanageable. The writes would occur from the worker nodes and produce many small files (no good), and retrieving the last set of processed offsets would be hard because wall clock timestamps wouldn't be of any help (there is no order guarantee or clock sync).
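The ordering problem above can be sidestepped by letting the offsets themselves provide the order instead of timestamps. The following is a minimal sketch under that assumption: the class LatestOffsets and its methods are hypothetical, it keeps state in memory rather than in HBase, HDFS, or Kafka, and it treats the until-offset as exclusive, as Spark's OffsetRange does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not part of kite-apps or Spark.
final class LatestOffsets {
    // Key: "topic/partition"; value: highest untilOffset recorded so far.
    private final Map<String, Long> latest = new HashMap<>();

    /** Records one processed range; the order in which records arrive does not matter. */
    void record(String topic, int partition, long untilOffset) {
        latest.merge(topic + "/" + partition, untilOffset, Long::max);
    }

    /** Returns the offset to resume from, or 0 if nothing was recorded for the partition. */
    long resumeFrom(String topic, int partition) {
        return latest.getOrDefault(topic + "/" + partition, 0L);
    }
}
```

Taking the maximum assumes the recorded ranges for a partition are contiguous. If a batch failed and left a gap below the highest offset, this sketch would hide it, so a real implementation would probably need to keep the individual ranges as well.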

It seems that HBase would be the ideal choice. We could maybe use a broadcast variable to define a version that the function could use in the row key when writing to HBase so the offsets could be scoped to a particular run, and then we could just replay all the offsets from the last successfully finished job, just to make sure no data was lost during the last run if the shutdown wasn't graceful.
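To illustrate how a run version in the row key scopes offsets to one run, here is a rough sketch. Everything in it is an assumption: the key layout (run version, topic, partition, from-offset separated by '|'), the class name RunScopedOffsets, and the use of a TreeMap as a stand-in for an HBase table, whose rows are likewise sorted by key. No HBase client API is used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.TreeMap;

// Hypothetical sketch: a TreeMap stands in for an HBase table sorted by row key.
final class RunScopedOffsets {
    // Row key -> untilOffset of the range identified by the key.
    private final TreeMap<String, Long> rows = new TreeMap<>();

    /** Assumed layout: the run version comes first, so one run is one contiguous key range. */
    static String rowKey(String runVersion, String topic, int partition, long fromOffset) {
        // Zero-padding keeps numeric order and lexicographic order the same.
        return String.format(Locale.ROOT, "%s|%s|%05d|%020d",
                runVersion, topic, partition, fromOffset);
    }

    void put(String runVersion, String topic, int partition, long fromOffset, long untilOffset) {
        rows.put(rowKey(runVersion, topic, partition, fromOffset), untilOffset);
    }

    /** Prefix scan: every row key written by the given run, in key order. */
    List<String> keysForRun(String runVersion) {
        String start = runVersion + "|";
        String stop = runVersion + "}"; // '}' is the character right after '|'
        return new ArrayList<>(rows.subMap(start, stop).keySet());
    }
}
```

With this layout, replaying the last finished run amounts to one prefix scan over its run version. The sketch assumes the run version itself never contains the '|' separator.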

Or, the simplest choice would be to just replay everything in the Kafka topic.

rbrush commented 9 years ago

I think we'll need to write this somewhere, since we can't let the Kafka topic grow unbounded. Perhaps we could write it to ZooKeeper itself? It sounds like that's what the other Kafka consumers do to track offsets, we already have a dependency on ZooKeeper, and the size and frequency of the data (an integer offset for every RDD loaded from the stream) is small enough that ZooKeeper should be able to handle it.
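For reference, a sketch of what a ZooKeeper layout for this could look like. The path is modeled on the one used by Kafka's older ZooKeeper-based consumers (/consumers/, then group, "offsets", topic, and partition). The class name ZkOffsetLayout, the group and topic values, and the decimal-string encoding are assumptions for illustration. The code only builds the znode path and payload; it does not call any ZooKeeper client.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch; it builds znode paths and payloads only.
final class ZkOffsetLayout {
    private ZkOffsetLayout() {}

    /** One znode per group, topic, and partition. */
    static String offsetPath(String group, String topic, int partition) {
        return "/consumers/" + group + "/offsets/" + topic + "/" + partition;
    }

    /** Encodes the offset as a decimal string (an assumed encoding). */
    static byte[] encode(long offset) {
        return Long.toString(offset).getBytes(StandardCharsets.UTF_8);
    }

    /** Reverses encode(). */
    static long decode(byte[] data) {
        return Long.parseLong(new String(data, StandardCharsets.UTF_8));
    }
}
```

Kafka offsets are 64-bit values, so the sketch uses long rather than int. The payload per partition stays a few bytes, which fits the point above about the data being small.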