apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

[bug] Spark SQL phase optimization failed on concurrent write attempt #7800

Closed. Disentinel closed this issue 2 weeks ago

Disentinel commented 1 year ago

Apache Iceberg version

1.2.1 (latest release)

Query engine

Spark

Please describe the bug 🐞

I got "The Spark SQL phase optimization failed with an internal error. Please, fill a bug report in, and provide the full stack trace." from a Structured Streaming job that runs MERGE INTO against an Iceberg table, seemingly triggered by a concurrent write attempt. High-level details below.

Spark log with trace:

23/06/08 06:33:01 ERROR MicroBatchExecution: Query [id = 3e585ace-8106-4bdd-a011-967c7db7799a, runId = 3ce50cdc-7dd7-44c8-8921-150568ccf4ae] terminated with error
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/opt/bitnami/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 276, in call
    raise e
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 273, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "/tmp/spark-fad2f6c4-d2e6-4343-840c-6295df177f61/partitioned_sync.py", line 131, in process
    df.sparkSession.sql("""
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 1034, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self)
  File "/opt/bitnami/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/opt/bitnami/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o92.sql.
: org.apache.spark.SparkException: The Spark SQL phase optimization failed with an internal error. Please, fill a bug report in, and provide the full stack trace.
    at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:500)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:512)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:122)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:118)
    at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
    at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:204)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:249)
    at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:218)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:244)
    at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
    at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
    at com.sun.proxy.$Proxy38.call(Unknown Source)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:51)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:51)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:32)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:665)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:663)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:663)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Caused by: java.lang.NullPointerException
    at org.apache.iceberg.DeleteFileIndex.forEntry(DeleteFileIndex.java:122)
    at org.apache.iceberg.ManifestGroup.lambda$createFileScanTasks$13(ManifestGroup.java:351)
    at org.apache.iceberg.io.CloseableIterable$7$1.next(CloseableIterable.java:202)
    at org.apache.iceberg.util.ParallelIterable$ParallelIterator.lambda$new$1(ParallelIterable.java:69)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

    at py4j.Protocol.getReturnValue(Protocol.java:476)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
    at com.sun.proxy.$Proxy38.call(Unknown Source)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:51)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:51)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:32)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:665)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:663)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:663)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Traceback (most recent call last):
  File "/tmp/spark-fad2f6c4-d2e6-4343-840c-6295df177f61/partitioned_sync.py", line 184, in <module>
    SQ.awaitTermination()
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 107, in awaitTermination
  File "/opt/bitnami/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in deco
pyspark.sql.utils.StreamingQueryException: Query [id = 3e585ace-8106-4bdd-a011-967c7db7799a, runId = 3ce50cdc-7dd7-44c8-8921-150568ccf4ae] terminated with exception: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/opt/bitnami/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 276, in call
    raise e
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 273, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "/tmp/spark-fad2f6c4-d2e6-4343-840c-6295df177f61/partitioned_sync.py", line 131, in process
    df.sparkSession.sql("""
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 1034, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self)
  File "/opt/bitnami/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/opt/bitnami/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o92.sql.
: org.apache.spark.SparkException: The Spark SQL phase optimization failed with an internal error. Please, fill a bug report in, and provide the full stack trace.
    at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:500)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:512)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:122)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:118)
    at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
    at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:204)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:249)
    at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:218)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:244)
    at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
    at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
    at com.sun.proxy.$Proxy38.call(Unknown Source)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:51)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:51)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:32)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:665)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:663)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:663)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Caused by: java.lang.NullPointerException
    at org.apache.iceberg.DeleteFileIndex.forEntry(DeleteFileIndex.java:122)
    at org.apache.iceberg.ManifestGroup.lambda$createFileScanTasks$13(ManifestGroup.java:351)
    at org.apache.iceberg.io.CloseableIterable$7$1.next(CloseableIterable.java:202)
    at org.apache.iceberg.util.ParallelIterable$ParallelIterator.lambda$new$1(ParallelIterable.java:69)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Job source code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.column import *
import sys
import os
import http.client
import json
from datetime import datetime as dt
import time

dl_service = http.client.HTTPSConnection(sys.argv[5])
headers = {'Content-Type': 'application/json'}

HOSTNAME = sys.argv[1]
DATABASE = sys.argv[2]
COLLECTION = sys.argv[3]
KAFKA_BOOTSTRAP_SERVER = sys.argv[4]
OFFSETS_LIMIT = os.getenv("OFFSETS_LIMIT")
LOG_LEVEL = os.getenv("LOG_LEVEL")
JOB_NAME = os.getenv("JOB_NAME")
PARTITIONS = os.getenv("PARTITIONS") or "[0,1,2,3,4,5,6,7,8,9,10,11]"

if __name__ == "__main__":
    spark = (
        SparkSession.builder.appName(HOSTNAME+"-"+DATABASE+"-"+COLLECTION)
            .config('spark.sql.sources.partitionOverwriteMode', 'dynamic')
            .getOrCreate()
    )
    spark.sparkContext.setLogLevel(LOG_LEVEL)

KAFKA_PASSWORD = os.getenv("KAFKA_PASSWORD")
KAFKA_USER = os.getenv("KAFKA_USER")
KAFKA_TOPIC_NAME = "dl."+COLLECTION
ASSIGN_PATTERN = '{"'+KAFKA_TOPIC_NAME+'":'+PARTITIONS+"}"

print('Starting kafka on topic '+KAFKA_TOPIC_NAME+PARTITIONS)

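# Schemas for parsing the change-event JSON carried in the Kafka message value
# (documentKey, fullDocument, operationType, clusterTime).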
payloadSchema = StructType([
    StructField('documentKey', StringType()),
    StructField('fullDocument', StringType()),
    StructField('operationType', StringType()),
    StructField('clusterTime', StringType()),
])

keySchema = StructType([
    StructField('_id', StringType())
])

clusterTimeSchema = StructType([
    StructField('$timestamp', StructType([
        StructField('t', StringType()),
        StructField('i', StringType()),
    ]))
])

last_exec = dt.now()

print('OFFSETS_LIMIT', OFFSETS_LIMIT)

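# Kafka source: SASL/PLAIN auth, explicit partition assignment, and a per-trigger offset cap.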
kafkaStream = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_PLAINTEXT") \
    .option("kafka.sasl.jaas.config", 'org.apache.kafka.common.security.plain.PlainLoginModule required username="'+KAFKA_USER+'" password="'+KAFKA_PASSWORD+'";') \
    .option("kafka.group.id", COLLECTION) \
    .option("failOnDataLoss", "false") \
    .option("assign", ASSIGN_PATTERN) \
    .option("maxOffsetsPerTrigger", OFFSETS_LIMIT) \
    .option("startingOffsets", "earliest")
streamingDF = kafkaStream.load() \
    .selectExpr("CAST(value AS STRING)", "offset", "timestamp") \
    .withColumn("val", from_json("value", payloadSchema))  \
    .withColumn("key", from_json("val.documentKey", keySchema)) \
    .withColumn("clusterTime", from_json("val.clusterTime", clusterTimeSchema)) \
    .select(
        col('key._id').alias('_id'),
        col('val.fullDocument').alias('doc'),
        col('val.operationType').alias('op_type'),
        'offset',
        'timestamp',
        col('clusterTime.$timestamp.t').cast('int').alias('oplog_ts_t'),
        col('clusterTime.$timestamp.i').cast('int').alias('oplog_ts_i')
    )

SQ = None

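# foreachBatch handler: creates the database/table if missing, reduces the batch to the
# latest state per _id (by oplog timestamp), and MERGEs that state into the Iceberg table.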
def process(df, epoch_id):
    global last_exec
    global SQ
    print("========= batch =========")
    df.show()
    df.persist()
    df.createOrReplaceTempView("update_batch")
    # create database
    temp_view_start = time.perf_counter()
    df.sparkSession.sql("""
    CREATE DATABASE IF NOT EXISTS `{host}`;
    """.format(host=HOSTNAME))
    df.sparkSession.sql("""
    CREATE TABLE IF NOT EXISTS `{host}`.`{db}__{table}` (
        _id string COMMENT 'unique id',
        doc string,
        offset bigint,
        oplog_ts_t bigint,
        oplog_ts_i bigint,
        timestamp timestamp
    )
    USING iceberg PARTITIONED BY (bucket(12, _id));
    """.format(host=HOSTNAME, db=DATABASE, table=COLLECTION))

# NOTE! I'm also running ALTER TABLE from another script:
# ALTER TABLE `{host}`.`{db}__{table}` SET TBLPROPERTIES (
#    'write.delete.mode' = 'merge-on-read',
#    'write.update.mode' = 'merge-on-read',
#    'write.merge.mode' = 'merge-on-read',
#    'option.format-version' = '2',
#    'format-version' = '2'
# )

# create temp view
    df.sparkSession.sql("""
    SELECT
        _id,
        FIRST(doc) as doc,
        FIRST(op_type) as op_type,
        FIRST(offset) as offset,
        FIRST(timestamp) as timestamp,
        FIRST(oplog_ts_t) as oplog_ts_t,
        FIRST(oplog_ts_i) as oplog_ts_i
    FROM (
        SELECT * FROM update_batch
        ORDER BY oplog_ts_t DESC, oplog_ts_i DESC
    )
    GROUP BY _id
    """).createOrReplaceTempView("last_state")
    # apply changes to table
    temp_view_end = time.perf_counter()
    print("temp view time", temp_view_end - temp_view_start)
    merge_start = time.perf_counter()
    df.sparkSession.sql("""
    MERGE INTO `{host}`.`{db}__{table}` t
    USING (
        SELECT _id, doc, op_type, offset, timestamp, oplog_ts_t, oplog_ts_i
        FROM last_state
    ) s
    ON t._id = s._id
    WHEN MATCHED AND
        s.op_type = 'delete' THEN DELETE
    WHEN MATCHED
        AND ISNULL(t.oplog_ts_t)
        OR s.oplog_ts_t > t.oplog_ts_t
        OR s.oplog_ts_t = t.oplog_ts_t AND s.oplog_ts_i > t.oplog_ts_i
    THEN UPDATE SET
        t.doc = s.doc,
        t.offset = s.offset,
        t.timestamp = s.timestamp,
        t.oplog_ts_t = s.oplog_ts_t,
        t.oplog_ts_i = s.oplog_ts_i
    WHEN NOT MATCHED AND
        s.op_type != 'delete'
    THEN INSERT (_id, doc, offset, oplog_ts_t, oplog_ts_i, timestamp)
    VALUES (s._id, s.doc, s.offset, s.oplog_ts_t, s.oplog_ts_i, s.timestamp)
    """.format(host=HOSTNAME, db=DATABASE, table=COLLECTION))
    merge_end = time.perf_counter()
    print("merge time", merge_end - merge_start)
    print('processed')
    df.unpersist()

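# Start the streaming query; foreachBatch routes every micro-batch through process().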
SQ = streamingDF.writeStream.format('console') \
    .outputMode('append') \
    .option("checkpointLocation", "s3a://MY_BUCKET_NAME/checkpoints/"+JOB_NAME) \
    .foreachBatch(process) \
    .start()

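# While the query is active, POST its latest progress to the external metrics service every 60 seconds.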
while SQ.isActive:
    if SQ.lastProgress is not None:
        endOffsets = SQ.lastProgress['sources'][0]['endOffset'][KAFKA_TOPIC_NAME]
        print('SENDING LAST OFFSET', endOffsets)
        data = {
            'spark': spark.sparkContext.getConf().getAll(),
            'topic': KAFKA_TOPIC_NAME,
            'last_progress': SQ.lastProgress,
            'exec': sys.argv[0]
            }
        json_data = json.dumps(data)
        dl_service.request('POST', '/datalake/metric', json_data, headers)
        response = dl_service.getresponse()
        print(response.read().decode())
    else:
        print('NO LAST PROGRESS DEFINED YET')
    time.sleep(60)

SQ.awaitTermination()

print('Waiting!')
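
As an aside on failure handling: the MERGE runs inside the foreachBatch handler with no retry, so any exception during planning or commit terminates the whole streaming query, as the trace above shows. A minimal sketch of retrying a failed batch, on the assumption that the error is a transient side effect of a concurrent writer, might look like the following; this is illustrative only, not part of the original job and not a confirmed fix:

import time
from py4j.protocol import Py4JJavaError

MAX_RETRIES = 3  # illustrative value

def process_with_retry(df, epoch_id):
    # Re-run the original handler a few times before giving up, assuming the
    # failure was caused by a concurrent commit and may not recur.
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            process(df, epoch_id)
            return
        except Py4JJavaError as err:
            print('batch', epoch_id, 'failed on attempt', attempt, err)
            if attempt == MAX_RETRIES:
                raise
            time.sleep(10 * attempt)

# ...and pass process_with_retry instead of process to foreachBatch.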

Some of the relevant Spark config options:


'spark.jars.packages':
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2,'
    + 'org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1,'
    + 'software.amazon.awssdk:bundle:2.20.56,'
    + 'software.amazon.awssdk:url-connection-client:2.20.56',
'spark.sql.extensions':
    'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions',
'spark.sql.broadcastTimeout': '3600',
'spark.sql.defaultCatalog': 'datalake',
'spark.sql.catalog.datalake': 'org.apache.iceberg.spark.SparkCatalog',
'spark.sql.catalog.datalake.warehouse': 's3://MY_BUCKET_NAME/dbs',
'spark.sql.catalog.datalake.catalog-impl': 'org.apache.iceberg.aws.glue.GlueCatalog',
'spark.sql.catalog.datalake.io-impl': 'org.apache.iceberg.aws.s3.S3FileIO',
'spark.kafka.session.timeout.ms': '1800000',
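
The options above are an excerpt of a Python dict; for reference, a minimal sketch of applying such a map while building the session (the variable names here are illustrative, and how the real job actually submits these options is not shown in this report):

from pyspark.sql import SparkSession

# Illustrative only: apply a map of Spark/Iceberg options while building the session.
# The keys mirror the options listed above.
iceberg_conf = {
    'spark.sql.extensions':
        'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions',
    'spark.sql.defaultCatalog': 'datalake',
    'spark.sql.catalog.datalake': 'org.apache.iceberg.spark.SparkCatalog',
    'spark.sql.catalog.datalake.catalog-impl': 'org.apache.iceberg.aws.glue.GlueCatalog',
    'spark.sql.catalog.datalake.io-impl': 'org.apache.iceberg.aws.s3.S3FileIO',
    'spark.sql.catalog.datalake.warehouse': 's3://MY_BUCKET_NAME/dbs',
}

builder = SparkSession.builder.appName('partitioned_sync')
for key, value in iceberg_conf.items():
    builder = builder.config(key, value)
spark = builder.getOrCreate()
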
kangyang-wang commented 11 months ago

Got the same issue here while trying to write to s3... Any solutions?

github-actions[bot] commented 1 month ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] commented 2 weeks ago

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'