databricks / spark-redshift

Redshift data source for Apache Spark
Apache License 2.0
605 stars · 349 forks

What are the intended semantics when the Redshift COPY fails #263

Closed tzhang101 closed 8 years ago

tzhang101 commented 8 years ago

Hi, we noticed the following behavior when writing a DataFrame to a Redshift database, and we think it is not right:

1) The write to S3 succeeds.
2) The COPY into Redshift fails with an exception, such as a connection failure.
3) The Spark job is marked failed (red dot shown in the Spark console).
4) The driver continues and starts the next batch with new offsets.

So we lose data because of the failure in step 2) (the red dot). In step 4), shouldn't the driver continue from the same offsets as the failed batch instead of moving ahead?

Thanks.

Tian Zhang tzhang101@yahoo.com

tzhang101 commented 8 years ago

Basically, we would like to make sure that if the COPY fails, the offsets do not move ahead.

tdas commented 8 years ago

You should probably handle your failures explicitly inside foreachRDD:

dstream.foreachRDD { rdd =>
  try {
    // write the batch to Redshift
  } catch {
    case e: Exception =>
      // stop the StreamingContext from a new thread
      // (stopping it directly from this callback can block)
  }
}

Also, if you are using the direct Kafka approach and explicitly writing the Kafka offsets that have been processed, you must write them only after the Redshift writes succeed. The Kafka integration guide has more info on how to access offsets.

// Hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array[OffsetRange]()

// this transform runs on the driver at the start of every batch,
// before any downstream operations on the stream
val transformedDstream = directKafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}

transformedDstream.foreachRDD { rdd =>
  // write to Redshift with failure handling;
  // write the offsets only if the Redshift write succeeds,
  // otherwise shut down the context from a different thread
}

tzhang101 commented 8 years ago

I found the following in the Spark docs: "In Spark 1.3, we have introduced a new Kafka Direct API, which can ensure that all the Kafka data is received by Spark Streaming exactly once. Along with this, if you implement exactly-once output operation, you can achieve end-to-end exactly-once guarantees." We are using this direct API, so what does "end-to-end exactly-once" mean here? Can you elaborate on the implementation needed to achieve end-to-end exactly-once with this API? Thanks.

tdas commented 8 years ago

https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html

Let me know if you need more clarifications.

tzhang101 commented 8 years ago

From the above link I saw "Thus this direct API eliminates the need for both WALs and Receivers for Kafka, while ensuring that each Kafka record is effectively received by Spark Streaming exactly once. This allows one to build a Spark Streaming + Kafka pipelines with end-to-end exactly-once semantics (if your updates to downstream systems are idempotent or transactional). Overall, it makes such streaming processing pipelines more fault-tolerant, efficient, and easier to use."

Now my downstream system is Redshift (both idempotent and transactional), but the above link does not explain how to achieve end-to-end exactly-once when the Redshift COPY fails. I assume that this link plus what you just explained, i.e., terminating the streaming context on failure and writing the Kafka offsets only after a successful write, together provide end-to-end exactly-once. Am I right?

tdas commented 8 years ago

Yeah, I believe you can set something up by combining these pieces in the right way.
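Since Redshift is transactional, one way to combine the pieces (a hedged sketch, not something from this thread: the `stream_offsets` table, the `copyBatchIntoRedshift` helper, and `redshiftJdbcUrl` are all assumptions) is to commit the loaded data and the consumed offsets in the same JDBC transaction, so they become visible atomically:

```scala
import java.sql.DriverManager

transformedDstream.foreachRDD { rdd =>
  val conn = DriverManager.getConnection(redshiftJdbcUrl) // assumed JDBC URL
  try {
    conn.setAutoCommit(false)
    copyBatchIntoRedshift(conn, rdd) // hypothetical helper: load this batch's data
    val stmt = conn.prepareStatement(
      "UPDATE stream_offsets SET until_offset = ? WHERE topic = ? AND partition = ?")
    for (o <- offsetRanges) {
      stmt.setLong(1, o.untilOffset)
      stmt.setString(2, o.topic)
      stmt.setInt(3, o.partition)
      stmt.executeUpdate()
    }
    conn.commit() // data and offsets commit together, or not at all
  } catch {
    case e: Exception =>
      conn.rollback()
      // stop the StreamingContext from another thread,
      // then restart from the offsets stored in stream_offsets
  } finally {
    conn.close()
  }
}
```

On a failure nothing is committed, so restarting from the offsets in `stream_offsets` reprocesses the batch without duplicating data.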