apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Make iceberg an idempotent sink for Spark like delta lake #8809

Open paulpaul1076 opened 11 months ago

paulpaul1076 commented 11 months ago

Feature Request / Improvement

Delta Lake has an interesting feature, idempotent table writes in foreachBatch, which you can read about here: https://docs.delta.io/latest/delta-streaming.html#idempotent-table-writes-in-foreachbatch

From what I understand, iceberg does not support this, but I think that it is a really important feature. Can we add this to iceberg?

I don't think that multi-table transactions will solve this problem. From my understanding, foreachBatch commits its offsets only after the entire lambda function passed to it has finished executing. Now imagine you have this code with multi-table transactions:

  dfStr.writeStream.foreachBatch((df: DataFrame, id: Long) => {
    // create transaction1
    // create transaction2
    // multi_table_commit(transaction1, transaction2)
    // send something to kafka
  }).start().awaitTermination()

From what I understand, if the "send something to kafka" step fails, the entire microbatch is re-executed and the multi-table transaction will write the same data a second time, which will cause data duplication. At my job, for example, we use this kind of logic and we frequently kill our streaming jobs to redeploy new code after which we restart them.
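To make the failure mode concrete, here is a minimal sketch in plain Scala that models the replay described above. It uses no Spark or Iceberg APIs: `FakeTable`, `processBatch`, and `runWithReplay` are hypothetical names introduced only for illustration, and the "Kafka send" is simulated by throwing an exception on the first attempt.

```scala
import scala.collection.mutable.ArrayBuffer

object ReplaySketch {
  // Hypothetical in-memory stand-in for a table; not an Iceberg or Spark API.
  final class FakeTable {
    val rows: ArrayBuffer[String] = ArrayBuffer.empty[String]
    def append(batch: Seq[String]): Unit = rows ++= batch
  }

  // One invocation of the batch function: the table write commits first,
  // then a later side effect (standing in for the Kafka send) may fail.
  def processBatch(table: FakeTable, batch: Seq[String], sideEffectFails: Boolean): Unit = {
    table.append(batch)
    if (sideEffectFails) throw new RuntimeException("send to kafka failed")
  }

  // Replays the same micro-batch until the function succeeds.
  // Only the first attempt fails in this model.
  def runWithReplay(table: FakeTable, batch: Seq[String]): Int = {
    var attempts = 0
    var done = false
    while (!done) {
      attempts += 1
      try {
        processBatch(table, batch, sideEffectFails = attempts == 1)
        done = true
      } catch {
        // Offsets were never committed for this batch, so it is replayed.
        case _: RuntimeException => ()
      }
    }
    attempts
  }
}
```

With a batch of `Seq("a", "b")`, `runWithReplay` makes two attempts and the table ends up holding each row twice, because the first write had already been committed when the simulated send failed.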

So, from my understanding, iceberg is not an idempotent sink and you can't expect to have end-to-end exactly once guarantees with iceberg?

I found an issue about this here: https://github.com/apache/iceberg/issues/178 Also, this page says that iceberg is an idempotent sink: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-iceberg-how-it-works.html

If this is supported, can somebody provide an example of how this works?

Query engine

Spark

paulpaul1076 commented 11 months ago

@RussellSpitzer provided this code which achieves the same:

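foreachBatch (batch_df, batch_id) => {
  // Batch id recorded on the table's current snapshot; -1 if there is no snapshot or no recorded id
  val snapshot = Option(Spark3Util.loadIcebergTable(spark, "db.timezoned").currentSnapshot())
  val lastBatch = snapshot.flatMap(s => Option(s.summary().get(STREAMID))).map(_.toLong).getOrElse(-1L)

  // Append only if this batch has not been committed yet
  if (batch_id > lastBatch) {
    batch_df.writeTo(...).option("snapshot-property." + STREAMID, batch_id).....append
  }
}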
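The guard in this snippet can be modelled without Spark or Iceberg to see how it behaves on a replayed batch. The following is only a sketch under stated assumptions: `Snapshot`, `FakeTable`, `StreamId`, `lastCommittedBatch`, and `writeIfNew` are hypothetical stand-ins, not Iceberg classes, and the "commit" is an in-memory append that stores the batch id in the snapshot summary.

```scala
object IdempotentSinkSketch {
  // Hypothetical stand-ins for a table and its snapshots; not Iceberg classes.
  final case class Snapshot(summary: Map[String, String], rows: Seq[String])

  final class FakeTable {
    private var snapshots: Vector[Snapshot] = Vector.empty
    def currentSnapshot: Option[Snapshot] = snapshots.lastOption
    // Each commit creates a new snapshot carrying the given summary properties.
    def commit(rows: Seq[String], props: Map[String, String]): Unit =
      snapshots = snapshots :+ Snapshot(props, rows)
    def allRows: Seq[String] = snapshots.flatMap(_.rows)
  }

  // Hypothetical property key under which the batch id is stored.
  val StreamId = "example-stream-id"

  // Batch id stored on the current snapshot, or -1 if none is recorded.
  def lastCommittedBatch(table: FakeTable): Long =
    table.currentSnapshot.flatMap(_.summary.get(StreamId)).map(_.toLong).getOrElse(-1L)

  // Writes the batch only if its id is newer than the last recorded one.
  // Returns true if a write happened, false if the batch was skipped.
  def writeIfNew(table: FakeTable, batchId: Long, rows: Seq[String]): Boolean =
    if (batchId > lastCommittedBatch(table)) {
      table.commit(rows, Map(StreamId -> batchId.toString))
      true
    } else false
}
```

In this model, calling `writeIfNew` twice with the same batch id writes the rows once and skips the replay. Note that the check only looks at the current snapshot, so in this sketch any commit that does not carry the property would make the last batch id invisible to the guard.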

But I wonder if delta lake is faster here, because I assume that the metadata lookup via Spark3Util.loadIcebergTable(spark, "db.timezoned") goes to S3 on every micro-batch?

Can somebody add this to the structured streaming doc? Isn't this important information?

github-actions[bot] commented 1 hour ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.