opensearch-project / opensearch-spark

Spark Accelerator framework; it enables secondary indices on remote data stores.
Apache License 2.0

[FEATURE] Handle Iceberg overwrite and delete snapshots to prevent index refresh failure #708

Open dai-chen opened 1 month ago

dai-chen commented 1 month ago

Is your feature request related to a problem?

When working with Iceberg tables in Spark streaming jobs, the stream will terminate with an error if there are updated or deleted rows in the Iceberg table. Specifically, Iceberg throws an exception when an overwrite snapshot is encountered, which causes the Spark streaming job to fail.

What solution would you like?

The Flint index refresh job should be able to handle updated or deleted rows gracefully by using the options streaming-skip-overwrite-snapshots=true and streaming-skip-delete-snapshots=true to avoid termination. These options should be set by default for use cases involving streaming and incremental updates, allowing the job to continue processing without manual intervention.

If we pursue this approach, we need to determine how to configure the source operator elegantly when creating a streaming job. Currently, we have the FlintSparkSourceRelationProvider, which is primarily used for query rewriting. Additionally, we should consider configuring other defaults, such as maxFilesPerTrigger, which can help Flint materialized view refreshes make progress and produce results more quickly.
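For illustration, a minimal sketch of what injecting such defaults could look like when building the streaming read. The object and method names below are hypothetical (not part of the current Flint codebase), and the maxFilesPerTrigger value is only a placeholder:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper for applying Iceberg streaming defaults; not the current Flint API.
object IcebergStreamingDefaults {

  // Defaults that keep the refresh job alive across overwrite/delete snapshots
  // and bound the amount of data read per micro-batch.
  private val defaults = Map(
    "streaming-skip-overwrite-snapshots" -> "true",
    "streaming-skip-delete-snapshots" -> "true",
    "maxFilesPerTrigger" -> "100" // placeholder value, not a recommendation
  )

  // Build a streaming DataFrame on the source table with defaults applied.
  // Options passed explicitly by the user take precedence over the defaults.
  def readStreamWithDefaults(
      spark: SparkSession,
      qualifiedTableName: String,
      userOptions: Map[String, String] = Map.empty): DataFrame =
    spark.readStream
      .options(defaults ++ userOptions)
      .table(qualifiedTableName)
}

// Example usage from a refresh job:
//   val df = IcebergStreamingDefaults.readStreamWithDefaults(spark, "myglue.default.iceberg_test")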

What alternatives have you considered?

Alternatively, ensure that these options are well-documented and easily discoverable, so that users can set them manually via extraOptions in the index options of the CREATE INDEX statement and avoid missing this critical step. Clear guidance would help users avoid job failures caused by unhandled overwrite or delete snapshots.
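As a rough sketch of this manual workaround, assuming the option is spelled extra_options in the index options (as in the Flint documentation), accepts a JSON string keyed by the qualified source table name, and that the Flint SQL extensions are enabled in the spark-shell session; the skipping index definition itself is only an example:

scala> sql("""
  CREATE SKIPPING INDEX ON myglue.default.iceberg_test (id VALUE_SET)
  WITH (
    auto_refresh = true,
    extra_options = '{"myglue.default.iceberg_test": {
      "streaming-skip-overwrite-snapshots": "true",
      "streaming-skip-delete-snapshots": "true"
    }}'
  )
""")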

Do you have any additional context?

  1. Related to https://github.com/opensearch-project/opensearch-spark/issues/700
  2. Example error:
scala> spark.readStream.table("myglue.default.iceberg_test")
  .writeStream
  .outputMode("update")
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .option("checkpointLocation", "s3://checkpoint_1")
  .start()

# Append snapshot can succeed
scala> sql("INSERT INTO myglue.default.iceberg_test values (8, 'h', 8.0)")
res26: org.apache.spark.sql.DataFrame = []

Batch: 2
-------------------------------------------
+---+----+---+
| id|data|col|
+---+----+---+
|  8|   h|8.0|
+---+----+---+

# Overwrite snapshot caused streaming job to terminate
scala> sql("UPDATE myglue.default.iceberg_test SET data = 'hhh' WHERE id = 8")
res27: org.apache.spark.sql.DataFrame = []

24/09/26 21:12:14 ERROR MicroBatchExecution: Query [id = 55630ffa-df96-4b4b-abe0-abe95c658565,
  runId = 5f628a2a-6c8a-4944-bab3-4f2101f4cded] terminated with error
java.lang.IllegalStateException: Cannot process overwrite snapshot: 2626116465374966496, to ignore overwrites,
  set streaming-skip-overwrite-snapshots=true
  at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkState(Preconditions.java:772)
  at org.apache.iceberg.spark.source.SparkMicroBatchStream.shouldProcess(SparkMicroBatchStream.java:271)
  at org.apache.iceberg.spark.source.SparkMicroBatchStream.nextValidSnapshot(SparkMicroBatchStream.java:426)
  at org.apache.iceberg.spark.source.SparkMicroBatchStream.latestOffset(SparkMicroBatchStream.java:395)
  ...

# Start streaming job with ignore options
scala> spark.readStream
  .option("streaming-skip-overwrite-snapshots", true)
  .option("streaming-skip-delete-snapshots", true)
  .table("myglue.default.iceberg_test")
  .writeStream
  .outputMode("update")
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .option("checkpointLocation", "s3://checkpoint_2")
  .start()

# Ignored
scala> sql("UPDATE myglue.default.iceberg_test SET data = 'hhh123' WHERE id = 8")
res47: org.apache.spark.sql.DataFrame = []

scala> sql("INSERT INTO myglue.default.iceberg_test values (10, 'i', 10.0)")
res48: org.apache.spark.sql.DataFrame = []

scala> -------------------------------------------
Batch: 1
-------------------------------------------
+---+----+----+
| id|data| col|
+---+----+----+
| 10|   i|10.0|
+---+----+----+
dai-chen commented 1 month ago

TODO: Verify whether the current Flint append mode also has this issue.