apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Failing to upsert/delete records in Apache Iceberg Tables using `MERGE INTO` via Spark Structured Streaming (PySpark) #7627

Closed: brandonstanleyappfolio closed this issue 1 year ago

brandonstanleyappfolio commented 1 year ago

Query engine

Spark 3.3.0, Iceberg 1.2.0, Debezium 2.0.1

Question

Hi Iceberg Community,

I am reaching out to ask whether anyone has succeeded in using Spark Structured Streaming (PySpark) jobs to create upserted Iceberg tables via MERGE INTO SQL commands inside a foreachBatch. While trying to achieve this I ran into a couple of issues, and I would like to know whether anyone else has succeeded or had the same experience.

I was able to perform an initial write to the Iceberg Table. However, any subsequent insert, update or delete to a record resulted in the following exception being thrown:

Caused by: org.apache.iceberg.exceptions.ValidationException: Found conflicting files that can contain records matching true: [s3a://<PATH_TO_PARQUET_FILE>]

I tried to simulate the same functionality using Spark in batch mode and did not get the same error. Hence I'm led to believe this is caused by the foreachBatch operation. Any help or guidance would be greatly appreciated!
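
For reference, the batch-mode comparison looked roughly like the sketch below (reusing the write_to_s3 helper and the placeholder constants from the streaming code in step 3; Kafka deserialization is omitted here as well):

    # Batch-mode comparison sketch: read the same topic once and run the same MERGE.
    # Assumes the write_to_s3 helper and placeholder constants defined in step 3 below.
    batch_df = (
        spark.read.format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
        .option("startingOffsets", "earliest")
        .option("subscribe", TOPIC_NAME)
        .load()
    )
    write_to_s3(spark=spark, df=batch_df, target_table=TABLE_NAME)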

For additional context, I ran the following steps:

  1. Started a PySpark shell

    pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.0,za.co.absa:abris_2.12:6.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-hadoop-cloud_2.12:3.3.0 --repositories https://packages.confluent.io/maven/ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf spark.sql.catalog.spark_catalog.type=hadoop --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.local.type=hadoop --conf spark.sql.catalog.local.warehouse=$PWD/warehouse --conf spark.sql.catalog.spark_catalog.warehouse=s3a://<PATH_TO_WAREHOUSE> --conf spark.sql.catalog.spark_catalog.cache-enabled=false --conf spark.sql.catalog.spark_catalog.local.cache-enabled=false
  2. Created an empty Iceberg table on S3:

    spark.sql("CREATE OR REPLACE TABLE my_iceberg_table (id int, first_name string, last_name string, age int) USING iceberg LOCATION 's3a://<PATH_TO_TABLE>'")
  3. Started a Spark Structured Streaming application to consume CDC (Debezium) records from Kafka and upsert/delete them in my Apache Iceberg table on S3. The following code is a sample of what I executed; I've excluded components irrelevant to the issue (e.g. deserialization of Kafka messages using Schema Registry).

    
    from pyspark.sql import SparkSession

    APP_NAME = "<APP_NAME>"
    TABLE_NAME = "<TABLE_NAME>"
    CHECKPOINT_LOCATION = "<CHECKPOINT_LOCATION>"
    TOPIC_NAME = "<TOPIC_NAME>"
    KAFKA_BOOTSTRAP_SERVERS = "<KAFKA_BOOTSTRAP_SERVERS>"

    def write_to_s3(spark, df, target_table):
        # Register the micro-batch as a temp view, keep only the latest CDC event
        # per id (by Kafka offset), and apply it to the Iceberg table with MERGE INTO.
        df.createOrReplaceTempView(f"tmp{target_table}")
        merge_sql = f"""
            MERGE INTO spark_catalog.default.{target_table} t
            USING (
                SELECT id, first_name, last_name, age, op FROM (
                    SELECT id, first_name, last_name, age, op,
                           row_number() OVER (PARTITION BY id ORDER BY offset DESC) AS row_num
                    FROM tmp{target_table}
                ) WHERE row_num = 1
            ) s
            ON t.id = s.id
            WHEN MATCHED AND s.op = 'd' THEN DELETE
            WHEN MATCHED AND s.op = 'u' THEN UPDATE SET t.id = s.id, t.first_name = s.first_name, t.last_name = s.last_name, t.age = s.age
            WHEN NOT MATCHED AND s.op IN ('c', 'r') THEN INSERT (id, first_name, last_name, age) VALUES (s.id, s.first_name, s.last_name, s.age)
        """
        # Run the MERGE on the session that owns the temp view created from the batch DataFrame.
        df._jdf.sparkSession().sql(merge_sql)

    spark = SparkSession.builder.appName(APP_NAME).getOrCreate()

    df = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
        .option("startingOffsets", "earliest")
        .option("subscribe", TOPIC_NAME)
        .load()
    )

    query = df.writeStream.option("checkpointLocation", CHECKPOINT_LOCATION)

    write_to_s3_iceberg = query.foreachBatch(
        lambda batch_df, batch_id: write_to_s3(spark=spark, df=batch_df, target_table=TABLE_NAME)
    ).start()


Here is the full stack trace:

py4j.protocol.Py4JJavaError: An error occurred while calling o283.sql. : org.apache.spark.SparkException: Writing job aborted at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobAbortedError(QueryExecutionErrors.scala:749) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:409) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:353) at org.apache.spark.sql.execution.datasources.v2.ReplaceDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:290) at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:332) at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:331) at org.apache.spark.sql.execution.datasources.v2.ReplaceDataExec.run(WriteToDataSourceV2Exec.scala:290) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43) at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79) at org.apache.spark.sql.Dataset.(Dataset.scala:220) at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97) at 
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617) at jdk.internal.reflect.GeneratedMethodAccessor144.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:244) at py4j.CallbackClient.sendCommand(CallbackClient.java:384) at py4j.CallbackClient.sendCommand(CallbackClient.java:356) at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106) at com.sun.proxy.$Proxy37.call(Unknown Source) at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:51) at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:51) at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:32) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:660) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:658) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:658) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:255) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:218) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:212) at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307) at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208) Caused by: org.apache.iceberg.exceptions.ValidationException: Found conflicting files that can contain records matching true: [s3a://] at org.apache.iceberg.MergingSnapshotProducer.validateAddedDataFiles(MergingSnapshotProducer.java:350) at org.apache.iceberg.BaseOverwriteFiles.validate(BaseOverwriteFiles.java:142) at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:215) at org.apache.iceberg.BaseOverwriteFiles.apply(BaseOverwriteFiles.java:31) at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:365) at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413) at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219) at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203) at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196) at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:363) at org.apache.iceberg.BaseOverwriteFiles.commit(BaseOverwriteFiles.java:31) at org.apache.iceberg.spark.source.SparkWrite.commitOperation(SparkWrite.java:213) at org.apache.iceberg.spark.source.SparkWrite.access$1300(SparkWrite.java:83) at org.apache.iceberg.spark.source.SparkWrite$CopyOnWriteOperation.commitWithSerializableIsolation(SparkWrite.java:437) at org.apache.iceberg.spark.source.SparkWrite$CopyOnWriteOperation.commit(SparkWrite.java:407) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:392) ... 75 more

at py4j.Protocol.getReturnValue(Protocol.java:476)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
at com.sun.proxy.$Proxy37.call(Unknown Source)
at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:51)
at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:51)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:32)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:660)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:658)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:658)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:255)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:218)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:212)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
brandonstanleyappfolio commented 1 year ago

This issue was resolved. The root cause was the PySpark shell not being terminated correctly (i.e. closing the terminal without explicitly calling exit()). Here are the steps to re-create the problem:

  1. Start PySpark shell
  2. Execute the structured streaming code above
  3. Close the terminal while the PySpark shell is still running (without invoking the exit() function)
  4. Re-open PySpark shell
  5. Execute the structured streaming code above

I was able to successfully execute the original code after re-creating my Spark environment entirely (i.e. re-creating the Kubernetes pod that I was using to test my code).
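
For completeness, here is a minimal sketch of how the session could be shut down cleanly before leaving the shell (using the write_to_s3_iceberg query handle and spark session from the code above), so the shell does not end up in the half-terminated state described in the steps above:

    # Stop the streaming query and the session explicitly before exiting the shell.
    write_to_s3_iceberg.stop()              # stop the foreachBatch streaming query
    write_to_s3_iceberg.awaitTermination()  # wait until the query has fully terminated
    spark.stop()                            # release the SparkSession
    exit()                                  # leave the PySpark shell explicitly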