delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, as well as APIs for several languages
https://delta.io
Apache License 2.0

Spark Streaming job failing on DeltaTable.forPath #593

Closed NimeshSatam closed 2 years ago

NimeshSatam commented 3 years ago

Hi,

We have been running Spark streaming jobs with Delta Lake for the past couple of months, but for the last few days we have been getting the error below while writing to a Delta Lake location on S3. It happens mainly when we attempt a "merge" using the DeltaTable.forPath command; a direct insert works fine. Can you please let me know what we may be doing wrong here?

py4j.protocol.Py4JJavaError: An error occurred while calling z:io.delta.tables.DeltaTable.forPath. : java.util.concurrent.ExecutionException: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:

Caused by: java.util.NoSuchElementException: None.get

This happens when we call:

df = spark.read.format("delta").load(source_bucket)

or when we call

DeltaTable.forPath
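
For reference, a minimal sketch of the failing merge path; the table path, source DataFrame, and join condition below are placeholders, not taken from our actual job:

from delta.tables import DeltaTable

# Placeholders: spark is the active SparkSession, source_bucket the table path,
# and updates_df the DataFrame of new rows to merge in.
delta_table = DeltaTable.forPath(spark, source_bucket)  # fails here with None.get

(delta_table.alias("t")
    .merge(source=updates_df.alias("s"), condition="s.id = t.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())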

Details of the streaming job:

- Apache Spark: 3.0.1
- Delta Lake: 0.7.0
- Python: 3.6.9
- Object Store: AWS S3
- OpenJDK version: 1.8.0_275

Note: if we rewrite the Delta location from a PySpark shell and then run the same Spark job, it works fine, but it starts failing again on the next run.
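
The rewrite workaround is essentially the following, run from a PySpark shell (a rough sketch; Delta's snapshot isolation allows overwriting a table from its own contents):

# Re-materialize the table in place; source_bucket is the same Delta path as above.
df = spark.read.format("delta").load(source_bucket)
df.write.format("delta").mode("overwrite").save(source_bucket)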

Can you let me know the probable cause of the error?

PySpark command used:

PYSPARK_DRIVER_PYTHON=ipython $SPARK_HOME/bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,org.apache.spark:spark-avro_2.12:3.0.1,za.co.absa:abris_2.12:3.2.1,io.delta:delta-core_2.12:0.7.0,org.apache.hadoop:hadoop-aws:3.2.0,org.apache.hadoop:hadoop-common:3.2.0 --repositories https://packages.confluent.io/maven/ --conf spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore
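
The same LogStore setting can also be applied when building the session in code; a minimal sketch, assuming the packages above are already on the classpath:

from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .config("spark.delta.logStore.class",
                 "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
         .getOrCreate())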

Regards, Nimesh.

Iamcerba commented 3 years ago

Hi @NimeshSatam, I faced exactly the same issue with versions 0.7.0 and 0.8.0 of the package. Did you solve it? The only difference is that I tried streaming into Azure Data Lake Gen2 and GCS.

zsxwing commented 3 years ago

Could you provide the full stack trace?

Iamcerba commented 3 years ago

@zsxwing this is mine:

pyspark.sql.utils.StreamingQueryException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2442, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 210, in call
    raise e
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 207, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File "", line 218, in __upsert_to_delta
    merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
  File "/app/dist/spark_core-1.0.0-py3.7.egg/core/delta/tables.py", line 542, in execute
    self._jbuilder.execute()
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 131, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o8296.execute.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(coalesce(add#768850.path, remove#768851.path), 50), false, [id=#3286]
+- *(1) Project [txn#768849, add#768850, remove#768851, metaData#768852, protocol#768853, cdc#768854, commitInfo#768855, UDF(input_file_name()) AS file#768857]
   +- *(1) SerializeFromObject [if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).txn)) null else named_struct(appId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).txn).appId, true, false), version, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).txn).version, lastUpdated, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).txn).lastUpdated)) AS txn#768849, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add)) null else named_struct(path, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).path, true, false), partitionValues, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -1), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -2), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).partitionValues), size, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).size, modificationTime, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).modificationTime, dataChange, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).dataChange, stats, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).stats, true, false), tags, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -3), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -3), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -4), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -4), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).tags)) AS add#768850, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove)) null else named_struct(path, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).path, true, false), deletionTimestamp, unwrapoption(LongType, 
knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).deletionTimestamp), dataChange, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).dataChange, extendedFileMetadata, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).extendedFileMetadata, partitionValues, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -5), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -5), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -6), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -6), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).partitionValues), size, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).size, tags, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -7), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -7), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -8), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -8), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).tags)) AS remove#768851, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData)) null else named_struct(id, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).id, true, false), name, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).name, true, false), description, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).description, true, false), format, if (isnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).format)) null else named_struct(provider, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).format).provider, true, false), options, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -9), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, 
-9), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -10), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -10), true, false), knownnotnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).format).options)), schemaString, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).schemaString, true, false), partitionColumns, mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.String), true, -11), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(MapObject, ObjectType(class java.lang.String), true, -11), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).partitionColumns, None), configuration, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -12), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -12), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -13), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -13), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).configuration), createdTime, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).createdTime)) AS metaData#768852, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).protocol)) null else named_struct(minReaderVersion, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).protocol).minReaderVersion, minWriterVersion, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).protocol).minWriterVersion) AS protocol#768853, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).cdc)) null else named_struct(path, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).cdc).path, true, false), partitionValues, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -14), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -14), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -15), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -15), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, 
true])).cdc).partitionValues), size, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).cdc).size, tags, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -16), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -16), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -17), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -17), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).cdc).tags)) AS cdc#768854, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo)) null else named_struct(version, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).version), timestamp, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).timestamp, true, false), userId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).userId), true, false), userName, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).userName), true, false), operation, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).operation, true, false), operationParameters, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -18), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -18), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -19), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -19), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).operationParameters), job, if (isnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job))) null else named_struct(jobId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).jobId, true, false), 
jobName, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).jobName, true, false), runId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).runId, true, false), jobOwnerId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).jobOwnerId, true, false), triggerType, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).triggerType, true, false)), notebook, if (isnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.NotebookInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).notebook))) null else named_struct(notebookId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.NotebookInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).notebook)).notebookId, true, false)), clusterId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).clusterId), true, false), readVersion, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).readVersion), isolationLevel, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).isolationLevel), true, false), isBlindAppend, unwrapoption(BooleanType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).isBlindAppend), ... 4 more fields) AS commitInfo#768855]
      +- MapPartitions org.apache.spark.sql.delta.Snapshot$$Lambda$2648/2119995861@35ea0900, obj#768848: org.apache.spark.sql.delta.actions.SingleAction
         +- DeserializeToObject newInstance(class org.apache.spark.sql.delta.actions.SingleAction), obj#768847: org.apache.spark.sql.delta.actions.SingleAction
            +- Union
               :- FileScan parquet [txn#768788,add#768789,remove#768790,metaData#768791,protocol#768792,cdc#768793,commitInfo#768794] Batched: false, DataFilters: [], Format: Parquet, Location: DeltaLogFileIndex[gs://.../_delta_log/0000000000000000001..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<txn:struct<appId:string,version:bigint,lastUpdated:bigint>,add:struct<path:string,partitio...
               +- FileScan json [txn#768810,add#768811,remove#768812,metaData#768813,protocol#768814,cdc#768815,commitInfo#768816] Batched: false, DataFilters: [], Format: JSON, Location: DeltaLogFileIndex[gs://.../_delta_log/0000000000000000001..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<txn:struct<appId:string,version:bigint,lastUpdated:bigint>,add:struct<path:string,partitio...

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:95)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
    at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:132)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:47)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:96)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.MapPartitionsExec.doExecute(objects.scala:192)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
    at org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:117)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
    at org.apache.spark.sql.delta.util.StateCache$CachedDS.<init>(StateCache.scala:52)
    at org.apache.spark.sql.delta.util.StateCache.cacheDS(StateCache.scala:100)
    at org.apache.spark.sql.delta.util.StateCache.cacheDS$(StateCache.scala:99)
    at org.apache.spark.sql.delta.Snapshot.cacheDS(Snapshot.scala:55)
    at org.apache.spark.sql.delta.Snapshot.cachedState$lzycompute(Snapshot.scala:123)
    at org.apache.spark.sql.delta.Snapshot.cachedState(Snapshot.scala:122)
    at org.apache.spark.sql.delta.Snapshot.state(Snapshot.scala:126)
    at org.apache.spark.sql.delta.Snapshot.$anonfun$computedState$1(Snapshot.scala:146)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withJobDescription(DeltaProgressReporter.scala:53)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:32)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:27)
    at org.apache.spark.sql.delta.Snapshot.withStatusCode(Snapshot.scala:55)
    at org.apache.spark.sql.delta.Snapshot.computedState$lzycompute(Snapshot.scala:143)
    at org.apache.spark.sql.delta.Snapshot.computedState(Snapshot.scala:142)
    at org.apache.spark.sql.delta.Snapshot.metadata(Snapshot.scala:185)
    at org.apache.spark.sql.delta.Snapshot.toString(Snapshot.scala:296)
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at org.apache.spark.sql.delta.Snapshot.$anonfun$new$1(Snapshot.scala:299)
    at org.apache.spark.sql.delta.Snapshot.$anonfun$logInfo$1(Snapshot.scala:276)
    at org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
    at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56)
    at org.apache.spark.sql.delta.Snapshot.logInfo(Snapshot.scala:276)
    at org.apache.spark.sql.delta.Snapshot.<init>(Snapshot.scala:299)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot(SnapshotManagement.scala:221)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot$(SnapshotManagement.scala:209)
    at org.apache.spark.sql.delta.DeltaLog.createSnapshot(DeltaLog.scala:60)
    at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$updateInternal$1(SnapshotManagement.scala:301)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
    at org.apache.spark.sql.delta.DeltaLog.recordOperation(DeltaLog.scala:60)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
    at org.apache.spark.sql.delta.DeltaLog.recordDeltaOperation(DeltaLog.scala:60)
    at org.apache.spark.sql.delta.SnapshotManagement.updateInternal(SnapshotManagement.scala:285)
    at org.apache.spark.sql.delta.SnapshotManagement.updateInternal$(SnapshotManagement.scala:284)
    at org.apache.spark.sql.delta.DeltaLog.updateInternal(DeltaLog.scala:60)
    at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$update$1(SnapshotManagement.scala:246)
    at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:153)
    at org.apache.spark.sql.delta.SnapshotManagement.update(SnapshotManagement.scala:246)
    at org.apache.spark.sql.delta.SnapshotManagement.update$(SnapshotManagement.scala:242)
    at org.apache.spark.sql.delta.DeltaLog.update(DeltaLog.scala:60)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommit(OptimisticTransaction.scala:637)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommit$(OptimisticTransaction.scala:606)
    at org.apache.spark.sql.delta.OptimisticTransaction.doCommit(OptimisticTransaction.scala:81)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$2(OptimisticTransaction.scala:578)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
    at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:81)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
    at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:81)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$1(OptimisticTransaction.scala:574)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:153)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommitRetryIteratively(OptimisticTransaction.scala:569)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommitRetryIteratively$(OptimisticTransaction.scala:566)
    at org.apache.spark.sql.delta.OptimisticTransaction.doCommitRetryIteratively(OptimisticTransaction.scala:81)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$commit$1(OptimisticTransaction.scala:439)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
    at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:81)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
    at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:81)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit(OptimisticTransaction.scala:390)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit$(OptimisticTransaction.scala:388)
    at org.apache.spark.sql.delta.OptimisticTransaction.commit(OptimisticTransaction.scala:81)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$run$2(MergeIntoCommand.scala:280)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$run$2$adapted(MergeIntoCommand.scala:250)
    at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$run$1(MergeIntoCommand.scala:250)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordMergeOperation(MergeIntoCommand.scala:622)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.run(MergeIntoCommand.scala:249)
    at io.delta.tables.DeltaMergeBuilder.$anonfun$execute$1(DeltaMergeBuilder.scala:239)
    at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:60)
    at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError$(AnalysisHelper.scala:48)
    at io.delta.tables.DeltaMergeBuilder.improveUnsupportedOpError(DeltaMergeBuilder.scala:123)
    at io.delta.tables.DeltaMergeBuilder.execute(DeltaMergeBuilder.scala:228)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:529)
    at scala.None$.get(Option.scala:527)
    at org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion$lzycompute(DataSourceScanExec.scala:178)
    at org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion(DataSourceScanExec.scala:176)
    at org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:462)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.UnionExec.$anonfun$doExecute$5(basicPhysicalOperators.scala:644)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:644)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:96)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.MapPartitionsExec.doExecute(objects.scala:192)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
    at org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:117)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:47)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:64)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:64)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:83)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:81)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.$anonfun$doExecute$1(ShuffleExchangeExec.scala:98)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 136 more
Iamcerba commented 3 years ago

Hi @zsxwing, do you need any additional details?

NimeshSatam commented 3 years ago

Hi @Iamcerba, apparently for us the issue was resolved by upgrading to 0.8.0, but I'm not sure whether the fix is permanent or whether the error might recur later.
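
For completeness, the upgrade only changes the Delta coordinate handed to Spark; a hypothetical sketch pinning it at session creation, equivalent to passing io.delta:delta-core_2.12:0.8.0 via --packages:

from pyspark.sql import SparkSession

# Illustrative only; in our jobs the package is supplied on the pyspark command line.
spark = (SparkSession
         .builder
         .config("spark.jars.packages", "io.delta:delta-core_2.12:0.8.0")
         .getOrCreate())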

Iamcerba commented 3 years ago

@zsxwing find more details below for my configuration:

- Apache Spark: 3.0.0 / 3.0.1 (tried both)
- Delta Lake: 0.7.0 / 0.8.0 (tried both)
- Python: 3.7.3
- Object Store: Google Cloud Storage / Azure Data Lake Gen2 (tried both)
- OpenJDK version: 1.8.0_252

py_files:

packages:

A simplified version of the app:

import json
import logging
import uuid
from functools import reduce
from typing import Union

from pyspark.sql import DataFrame, SparkSession, Window
from pyspark.sql.functions import array_remove, col, concat, expr, lpad, row_number, rpad, split, to_date, \
    unix_timestamp, when
from pyspark.sql.types import StringType
from pyspark.sql.utils import AnalysisException

from core.delta.tables import DeltaMergeBuilder, DeltaTable  # taken from the latest version here: https://raw.githubusercontent.com/delta-io/delta/master/python/delta/tables.py

pipeline_name: str = 'test_pipeline'
app_name = 'test_app'

# Assumed logger setup; spark_logger is referenced in get_delta_table below.
spark_logger = logging.getLogger(app_name)

app_config = { ... }

service_account_info = json.loads(app_config['service_account_info'])

spark_s = (SparkSession
           .builder
           .appName(f'{pipeline_name}_{app_name}')
           .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
           .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')
           .config('spark.sql.streaming.stopActiveRunOnRestart', 'true')
           .config('spark.sql.streaming.stopTimeout', '0')
           .config('spark.databricks.delta.schema.autoMerge.enabled', 'true')
           .config('spark.databricks.delta.optimizeWrite.enabled', 'true')
           .config('spark.databricks.delta.autoCompact.enabled', 'true')
           .config('spark.databricks.delta.vacuum.parallelDelete.enabled', 'true')
           .config('spark.databricks.delta.retentionDurationCheck.enabled', 'true')
           .config('spark.databricks.delta.merge.repartitionBeforeWrite.enabled', 'true')
           .config('spark.sql.autoBroadcastJoinThreshold', '-1')
           .config('fs.gs.auth.service.account.email', service_account_info['client_email'])
           .config('fs.gs.auth.service.account.private.key.id', service_account_info['private_key_id'])
           .config('fs.gs.auth.service.account.private.key', service_account_info['private_key'])
           .config('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
           .config('fs.AbstractFileSystem.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS')
           .config(
                'spark.jars',
                'resources/packages/jars/spark-avro_2.12-3.0.1.jar,'            
                'resources/packages/jars/gcs-connector-hadoop3-2.2.0-shaded.jar,'            
                'resources/packages/jars/spark-sql-kafka-0-10_2.12-3.0.1.jar,'
                'resources/packages/jars/kafka-clients-2.7.0.jar,'
                'resources/packages/jars/spark-token-provider-kafka-0-10_2.12-3.0.1.jar,'
                'resources/packages/jars/commons-pool2-2.9.0.jar,'
                'resources/packages/jars/delta-core_2.12-0.8.0.jar'
            )
           .getOrCreate())

kafka_conf = {
    'kafka.bootstrap.servers': app_config['connection']['bootstrap_servers'],
    'kafka.security.protocol': app_config['connection']['protocol'],
    'kafka.sasl.mechanism': app_config['connection']['mechanism'],
    'kafka.sasl.jaas.config': app_config['connection']['jaas_config'],
    'groupIdPrefix': app_config['client_id'],
    'subscribe': app_config['topic'],
    'startingOffsets': app_config['starting_offsets'],
    'includeHeaders': app_config.get('include_headers', 'true')
}

data_frame = spark_s.readStream.format('kafka').options(**kafka_conf).load()

table_location = app_config['table_location']

ID_COL_NAME: str = '_id'

def transform_events(df: DataFrame) -> DataFrame:

    # STEP 0: Prepare value
    df = (df
          .withColumn('__kafka_audit_trail',
                        expr('transform(headers, x-> CASE WHEN x.key="additional_metadata" '
                             '                        THEN decode(x.value,"UTF8") ELSE "" END)'))
          .withColumn('__kafka_audit_trail', array_remove('__kafka_audit_trail', '')[0])
          .select('value.*', '__kafka_audit_trail'))

    # STEP 1: Perform deduplication of events
    ad_window = Window.partitionBy('ObjectGUID').orderBy(col('USNChanged').desc())
    df = df.withColumn('rn', row_number().over(ad_window)).filter('rn=1').drop('rn')

    # Transformations omitted for brevity

    return df

def get_delta_table(path_to_delta_table: str) -> Union[DeltaTable, None]:

    try:
        dt: DeltaTable = DeltaTable.forPath(sparkSession=spark_s, path=path_to_delta_table)
    except AnalysisException as e:
        if 'doesn\'t exist;' in str(e).lower() or 'is not a delta table.' in str(e).lower():
            spark_logger.info(e)

            return None
        else:
            raise e

    return dt

def upload(micro_batch_df: DataFrame, batch_id: int):
    micro_batch_df = transform_events(micro_batch_df)

    id_col = app_config['delta_table']['id_col']

    if (id_col, 'string') in micro_batch_df.dtypes:  # dtypes is a list of (name, dtype) pairs
        micro_batch_df = micro_batch_df.withColumn(ID_COL_NAME, col(id_col))
    else:
        micro_batch_df = micro_batch_df.withColumn(ID_COL_NAME, expr(id_col).cast(StringType()))

    micro_batch_df = micro_batch_df.withColumn('_timestamp', unix_timestamp())

    delta_table = get_delta_table(path_to_delta_table=table_location)

    if not delta_table:
        micro_batch_df.write.format('delta').save(table_location)
    else:
        source_alias, target_alias = 'src', 'trg'

        merge_builder: DeltaMergeBuilder = (
            delta_table.alias(target_alias)
                .merge(source=micro_batch_df.alias(source_alias),
                       condition=f'{source_alias}.{ID_COL_NAME}={target_alias}.{ID_COL_NAME}')
        )

        merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

query = (data_frame
         .writeStream
         .trigger(processingTime='90 seconds')
         .foreachBatch(upload)
         .outputMode('append')
         .start())

query.awaitTermination()

If I start with a clean Delta table, it fails after approximately 9-11 batches (the timing is unpredictable; I assume it happens when data with an id that already exists in the Delta table arrives on the topic) with the following error:

21/02/15 10:42:14 ERROR MicroBatchExecution: Query ... [id = c19b5886-7cd2-4b09-9ca7-aca2f31e9e6d, runId = 390b7175-36c0-40be-81d7-7ece1c826efc] terminated with error
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2442, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 210, in call
    raise e
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 207, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File ".../app.py", line 217, in upload
    merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
  File "....egg/core/delta/tables.py", line 542, in execute
    self._jbuilder.execute()
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 131, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o9935.execute.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(coalesce(add#916537.path, remove#916538.path), 50), false, [id=#3651]
+- *(1) Project [txn#916536, add#916537, remove#916538, metaData#916539, protocol#916540, cdc#916541, commitInfo#916542, UDF(input_file_name()) AS file#916544]
   +- *(1) SerializeFromObject [if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).txn)) null else named_struct(appId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).txn).appId, true, false), version, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).txn).version, lastUpdated, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).txn).lastUpdated)) AS txn#916536, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add)) null else named_struct(path, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).path, true, false), partitionValues, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -1), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -2), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).partitionValues), size, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).size, modificationTime, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).modificationTime, dataChange, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).dataChange, stats, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).stats, true, false), tags, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -3), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -3), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -4), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -4), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).tags)) AS add#916537, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove)) null else named_struct(path, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).path, true, false), deletionTimestamp, unwrapoption(LongType, 
knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).deletionTimestamp), dataChange, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).dataChange, extendedFileMetadata, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).extendedFileMetadata, partitionValues, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -5), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -5), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -6), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -6), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).partitionValues), size, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).size, tags, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -7), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -7), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -8), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -8), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).tags)) AS remove#916538, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData)) null else named_struct(id, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).id, true, false), name, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).name, true, false), description, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).description, true, false), format, if (isnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).format)) null else named_struct(provider, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).format).provider, true, false), options, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -9), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, 
-9), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -10), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -10), true, false), knownnotnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).format).options)), schemaString, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).schemaString, true, false), partitionColumns, mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.String), true, -11), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(MapObject, ObjectType(class java.lang.String), true, -11), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).partitionColumns, None), configuration, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -12), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -12), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -13), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -13), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).configuration), createdTime, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).createdTime)) AS metaData#916539, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).protocol)) null else named_struct(minReaderVersion, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).protocol).minReaderVersion, minWriterVersion, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).protocol).minWriterVersion) AS protocol#916540, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).cdc)) null else named_struct(path, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).cdc).path, true, false), partitionValues, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -14), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -14), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -15), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -15), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, 
true])).cdc).partitionValues), size, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).cdc).size, tags, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -16), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -16), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -17), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -17), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).cdc).tags)) AS cdc#916541, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo)) null else named_struct(version, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).version), timestamp, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).timestamp, true, false), userId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).userId), true, false), userName, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).userName), true, false), operation, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).operation, true, false), operationParameters, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -18), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -18), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -19), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -19), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).operationParameters), job, if (isnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job))) null else named_struct(jobId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).jobId, true, false), 
jobName, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).jobName, true, false), runId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).runId, true, false), jobOwnerId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).jobOwnerId, true, false), triggerType, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).triggerType, true, false)), notebook, if (isnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.NotebookInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).notebook))) null else named_struct(notebookId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.NotebookInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).notebook)).notebookId, true, false)), clusterId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).clusterId), true, false), readVersion, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).readVersion), isolationLevel, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).isolationLevel), true, false), isBlindAppend, unwrapoption(BooleanType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).isBlindAppend), ... 4 more fields) AS commitInfo#916542]
      +- MapPartitions org.apache.spark.sql.delta.Snapshot$$Lambda$3335/1361813805@7397c836, obj#916535: org.apache.spark.sql.delta.actions.SingleAction
         +- DeserializeToObject newInstance(class org.apache.spark.sql.delta.actions.SingleAction), obj#916534: org.apache.spark.sql.delta.actions.SingleAction
            +- Union
               :- FileScan parquet [txn#916475,add#916476,remove#916477,metaData#916478,protocol#916479,cdc#916480,commitInfo#916481] Batched: false, DataFilters: [], Format: Parquet, Location: DeltaLogFileIndex[gs://.../_delta_log/0000000000000000001..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<txn:struct<appId:string,version:bigint,lastUpdated:bigint>,add:struct<path:string,partitio...
               +- FileScan json [txn#916497,add#916498,remove#916499,metaData#916500,protocol#916501,cdc#916502,commitInfo#916503] Batched: false, DataFilters: [], Format: JSON, Location: DeltaLogFileIndex[gs://.../_delta_log/0000000000000000001..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<txn:struct<appId:string,version:bigint,lastUpdated:bigint>,add:struct<path:string,partitio...

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:95)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
    at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:132)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:47)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:96)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.MapPartitionsExec.doExecute(objects.scala:192)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
    at org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:117)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
    at org.apache.spark.sql.delta.util.StateCache$CachedDS.<init>(StateCache.scala:52)
    at org.apache.spark.sql.delta.util.StateCache.cacheDS(StateCache.scala:100)
    at org.apache.spark.sql.delta.util.StateCache.cacheDS$(StateCache.scala:99)
    at org.apache.spark.sql.delta.Snapshot.cacheDS(Snapshot.scala:55)
    at org.apache.spark.sql.delta.Snapshot.cachedState$lzycompute(Snapshot.scala:123)
    at org.apache.spark.sql.delta.Snapshot.cachedState(Snapshot.scala:122)
    at org.apache.spark.sql.delta.Snapshot.state(Snapshot.scala:126)
    at org.apache.spark.sql.delta.Snapshot.$anonfun$computedState$1(Snapshot.scala:146)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withJobDescription(DeltaProgressReporter.scala:53)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:32)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:27)
    at org.apache.spark.sql.delta.Snapshot.withStatusCode(Snapshot.scala:55)
    at org.apache.spark.sql.delta.Snapshot.computedState$lzycompute(Snapshot.scala:143)
    at org.apache.spark.sql.delta.Snapshot.computedState(Snapshot.scala:142)
    at org.apache.spark.sql.delta.Snapshot.metadata(Snapshot.scala:185)
    at org.apache.spark.sql.delta.Snapshot.toString(Snapshot.scala:296)
    at java.lang.String.valueOf(String.java:2994)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at org.apache.spark.sql.delta.Snapshot.$anonfun$new$1(Snapshot.scala:299)
    at org.apache.spark.sql.delta.Snapshot.$anonfun$logInfo$1(Snapshot.scala:276)
    at org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
    at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56)
    at org.apache.spark.sql.delta.Snapshot.logInfo(Snapshot.scala:276)
    at org.apache.spark.sql.delta.Snapshot.<init>(Snapshot.scala:299)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot(SnapshotManagement.scala:221)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot$(SnapshotManagement.scala:209)
    at org.apache.spark.sql.delta.DeltaLog.createSnapshot(DeltaLog.scala:60)
    at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$updateInternal$1(SnapshotManagement.scala:301)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
    at org.apache.spark.sql.delta.DeltaLog.recordOperation(DeltaLog.scala:60)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
    at org.apache.spark.sql.delta.DeltaLog.recordDeltaOperation(DeltaLog.scala:60)
    at org.apache.spark.sql.delta.SnapshotManagement.updateInternal(SnapshotManagement.scala:285)
    at org.apache.spark.sql.delta.SnapshotManagement.updateInternal$(SnapshotManagement.scala:284)
    at org.apache.spark.sql.delta.DeltaLog.updateInternal(DeltaLog.scala:60)
    at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$update$1(SnapshotManagement.scala:246)
    at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:153)
    at org.apache.spark.sql.delta.SnapshotManagement.update(SnapshotManagement.scala:246)
    at org.apache.spark.sql.delta.SnapshotManagement.update$(SnapshotManagement.scala:242)
    at org.apache.spark.sql.delta.DeltaLog.update(DeltaLog.scala:60)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommit(OptimisticTransaction.scala:637)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommit$(OptimisticTransaction.scala:606)
    at org.apache.spark.sql.delta.OptimisticTransaction.doCommit(OptimisticTransaction.scala:81)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$2(OptimisticTransaction.scala:578)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
    at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:81)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
    at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:81)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$1(OptimisticTransaction.scala:574)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:153)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommitRetryIteratively(OptimisticTransaction.scala:569)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommitRetryIteratively$(OptimisticTransaction.scala:566)
    at org.apache.spark.sql.delta.OptimisticTransaction.doCommitRetryIteratively(OptimisticTransaction.scala:81)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$commit$1(OptimisticTransaction.scala:439)
    at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
    at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:81)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
    at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:81)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit(OptimisticTransaction.scala:390)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit$(OptimisticTransaction.scala:388)
    at org.apache.spark.sql.delta.OptimisticTransaction.commit(OptimisticTransaction.scala:81)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$run$2(MergeIntoCommand.scala:280)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$run$2$adapted(MergeIntoCommand.scala:250)
    at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$run$1(MergeIntoCommand.scala:250)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordMergeOperation(MergeIntoCommand.scala:622)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.run(MergeIntoCommand.scala:249)
    at io.delta.tables.DeltaMergeBuilder.$anonfun$execute$1(DeltaMergeBuilder.scala:239)
    at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:60)
    at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError$(AnalysisHelper.scala:48)
    at io.delta.tables.DeltaMergeBuilder.improveUnsupportedOpError(DeltaMergeBuilder.scala:123)
    at io.delta.tables.DeltaMergeBuilder.execute(DeltaMergeBuilder.scala:228)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:529)
    at scala.None$.get(Option.scala:527)
    at org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion$lzycompute(DataSourceScanExec.scala:178)
    at org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion(DataSourceScanExec.scala:176)
    at org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:462)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.UnionExec.$anonfun$doExecute$5(basicPhysicalOperators.scala:644)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:644)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:96)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.MapPartitionsExec.doExecute(objects.scala:192)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
    at org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:117)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:47)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:64)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:64)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:83)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:81)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.$anonfun$doExecute$1(ShuffleExchangeExec.scala:98)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 136 more

    at py4j.Protocol.getReturnValue(Protocol.java:476)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
    at com.sun.proxy.$Proxy25.call(Unknown Source)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:56)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:56)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:36)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:573)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:571)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:571)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
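
For anyone trying to reproduce this: the code path in the trace above is a Structured Streaming `foreachBatch` upsert into a Delta table (you can see `ForeachBatchSink` -> `MergeIntoCommand` -> `OptimisticTransaction.commit`). A minimal sketch of that pattern — the bucket paths, the join key `id`, and the `upsert_to_delta` helper are placeholders, not our actual job:

```python
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = (
    SparkSession.builder
    .appName("delta-upsert-repro")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Placeholder locations, not the real ones.
source_path = "gs://my-bucket/source"
target_path = "gs://my-bucket/target"
checkpoint_path = "gs://my-bucket/checkpoints/target"


def upsert_to_delta(batch_df, batch_id):
    # DeltaTable.forPath rebuilds the table snapshot from _delta_log;
    # this is the call that fails on the retry with None.get.
    target = DeltaTable.forPath(spark, target_path)
    (
        target.alias("t")
        .merge(batch_df.alias("s"), "t.id = s.id")  # "id" is a placeholder key
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )


(
    spark.readStream.format("delta").load(source_path)
    .writeStream
    .foreachBatch(upsert_to_delta)
    .option("checkpointLocation", checkpoint_path)
    .start()
)
```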

On the next attempt it then always fails while executing "DeltaTable.forPath", as described in the first post. The failure is rethrown as an ExecutionException out of the DeltaLog cache (DeltaLog$.apply in the trace below), so every subsequent forPath call hits the same error:

StreamingQueryException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2442, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 207, in call
    raise e
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 204, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File "<ipython-input-3-095231659335>", line 393, in upload
    delta_table = get_delta_table(path_to_delta_table=table_location)
  File "<ipython-input-3-095231659335>", line 365, in get_delta_table
    dt: DeltaTable = DeltaTable.forPath(sparkSession=spark_s, path=path_to_delta_table)
  File ".../core/delta/tables.py", line 311, in forPath
    jdt = sparkSession._sc._jvm.io.delta.tables.DeltaTable.forPath(
  File "/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 128, in deco
    return f(*a, **kw)
  File "/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling z:io.delta.tables.DeltaTable.forPath.
: java.util.concurrent.ExecutionException: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(coalesce(add#71148.path, remove#71149.path), 50), false, [id=#100]
+- *(1) Project [txn#71147, add#71148, remove#71149, metaData#71150, protocol#71151, cdc#71152, commitInfo#71153, UDF(input_file_name()) AS file#71155]
   +- *(1) SerializeFromObject [if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).txn)) null else named_struct(appId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).txn).appId, true, false), version, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).txn).version, lastUpdated, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).txn).lastUpdated)) AS txn#71147, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add)) null else named_struct(path, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).path, true, false), partitionValues, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -1), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -2), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).partitionValues), size, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).size, modificationTime, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).modificationTime, dataChange, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).dataChange, stats, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).stats, true, false), tags, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -3), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -3), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -4), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -4), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).tags)) AS add#71148, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove)) null else named_struct(path, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).path, true, false), deletionTimestamp, unwrapoption(LongType, 
knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).deletionTimestamp), dataChange, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).dataChange, extendedFileMetadata, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).extendedFileMetadata, partitionValues, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -5), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -5), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -6), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -6), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).partitionValues), size, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).size, tags, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -7), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -7), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -8), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -8), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).tags)) AS remove#71149, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData)) null else named_struct(id, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).id, true, false), name, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).name, true, false), description, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).description, true, false), format, if (isnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).format)) null else named_struct(provider, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).format).provider, true, false), options, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -9), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -9), 
true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -10), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -10), true, false), knownnotnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).format).options)), schemaString, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).schemaString, true, false), partitionColumns, mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.String), true, -11), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(MapObject, ObjectType(class java.lang.String), true, -11), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).partitionColumns, None), configuration, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -12), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -12), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -13), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -13), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).configuration), createdTime, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).createdTime)) AS metaData#71150, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).protocol)) null else named_struct(minReaderVersion, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).protocol).minReaderVersion, minWriterVersion, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).protocol).minWriterVersion) AS protocol#71151, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).cdc)) null else named_struct(path, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).cdc).path, true, false), partitionValues, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -14), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -14), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -15), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -15), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, 
true])).cdc).partitionValues), size, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).cdc).size, tags, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -16), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -16), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -17), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -17), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).cdc).tags)) AS cdc#71152, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo)) null else named_struct(version, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).version), timestamp, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).timestamp, true, false), userId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).userId), true, false), userName, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).userName), true, false), operation, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).operation, true, false), operationParameters, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -18), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -18), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -19), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -19), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).operationParameters), job, if (isnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job))) null else named_struct(jobId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).jobId, true, false), 
jobName, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).jobName, true, false), runId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).runId, true, false), jobOwnerId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).jobOwnerId, true, false), triggerType, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).triggerType, true, false)), notebook, if (isnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.NotebookInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).notebook))) null else named_struct(notebookId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.NotebookInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).notebook)).notebookId, true, false)), clusterId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).clusterId), true, false), readVersion, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).readVersion), isolationLevel, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).isolationLevel), true, false), isBlindAppend, unwrapoption(BooleanType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).isBlindAppend), ... 4 more fields) AS commitInfo#71153]
      +- MapPartitions org.apache.spark.sql.delta.Snapshot$$Lambda$3347/0x0000000841486840@56df0e41, obj#71146: org.apache.spark.sql.delta.actions.SingleAction
         +- DeserializeToObject newInstance(class org.apache.spark.sql.delta.actions.SingleAction), obj#71145: org.apache.spark.sql.delta.actions.SingleAction
            +- Union
               :- FileScan parquet [txn#71079,add#71080,remove#71081,metaData#71082,protocol#71083,cdc#71084,commitInfo#71085] Batched: false, DataFilters: [], Format: Parquet, Location: DeltaLogFileIndex[gs://.../_delta_log/0000000000000000001..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<txn:struct<appId:string,version:bigint,lastUpdated:bigint>,add:struct<path:string,partitio...
               +- FileScan json [txn#71108,add#71109,remove#71110,metaData#71111,protocol#71112,cdc#71113,commitInfo#71114] Batched: false, DataFilters: [], Format: JSON, Location: DeltaLogFileIndex[gs://.../_delta_log/0000000000000000001..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<txn:struct<appId:string,version:bigint,lastUpdated:bigint>,add:struct<path:string,partitio...

    at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
    at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
    at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
    at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
    at com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
    at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
    at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
    at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:467)
    at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:404)
    at org.apache.spark.sql.delta.catalog.DeltaTableV2.deltaLog$lzycompute(DeltaTableV2.scala:71)
    at org.apache.spark.sql.delta.catalog.DeltaTableV2.deltaLog(DeltaTableV2.scala:71)
    at org.apache.spark.sql.delta.catalog.DeltaTableV2.toBaseRelation(DeltaTableV2.scala:134)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:171)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:284)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:248)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:232)
    at io.delta.tables.DeltaTable$.forPath(DeltaTable.scala:676)
    at io.delta.tables.DeltaTable.forPath(DeltaTable.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(coalesce(add#71148.path, remove#71149.path), 50), false, [id=#100]
+- *(1) Project [txn#71147, add#71148, remove#71149, metaData#71150, protocol#71151, cdc#71152, commitInfo#71153, UDF(input_file_name()) AS file#71155]
   +- *(1) SerializeFromObject [if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).txn)) null else named_struct(appId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).txn).appId, true, false), version, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).txn).version, lastUpdated, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).txn).lastUpdated)) AS txn#71147, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add)) null else named_struct(path, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).path, true, false), partitionValues, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -1), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -1), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -2), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -2), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).partitionValues), size, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).size, modificationTime, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).modificationTime, dataChange, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).dataChange, stats, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).stats, true, false), tags, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -3), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -3), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -4), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -4), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).add).tags)) AS add#71148, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove)) null else named_struct(path, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).path, true, false), deletionTimestamp, unwrapoption(LongType, 
knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).deletionTimestamp), dataChange, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).dataChange, extendedFileMetadata, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).extendedFileMetadata, partitionValues, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -5), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -5), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -6), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -6), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).partitionValues), size, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).size, tags, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -7), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -7), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -8), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -8), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).remove).tags)) AS remove#71149, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData)) null else named_struct(id, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).id, true, false), name, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).name, true, false), description, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).description, true, false), format, if (isnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).format)) null else named_struct(provider, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).format).provider, true, false), options, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -9), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -9), 
true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -10), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -10), true, false), knownnotnull(knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).format).options)), schemaString, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).schemaString, true, false), partitionColumns, mapobjects(lambdavariable(MapObject, ObjectType(class java.lang.String), true, -11), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(MapObject, ObjectType(class java.lang.String), true, -11), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).partitionColumns, None), configuration, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -12), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -12), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -13), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -13), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).configuration), createdTime, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).metaData).createdTime)) AS metaData#71150, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).protocol)) null else named_struct(minReaderVersion, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).protocol).minReaderVersion, minWriterVersion, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).protocol).minWriterVersion) AS protocol#71151, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).cdc)) null else named_struct(path, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).cdc).path, true, false), partitionValues, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -14), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -14), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -15), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -15), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, 
true])).cdc).partitionValues), size, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).cdc).size, tags, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -16), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -16), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -17), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -17), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).cdc).tags)) AS cdc#71152, if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo)) null else named_struct(version, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).version), timestamp, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).timestamp, true, false), userId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).userId), true, false), userName, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).userName), true, false), operation, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).operation, true, false), operationParameters, externalmaptocatalyst(lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -18), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_key, ObjectType(class java.lang.String), true, -18), true, false), lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -19), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(ExternalMapToCatalyst_value, ObjectType(class java.lang.String), true, -19), true, false), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).operationParameters), job, if (isnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job))) null else named_struct(jobId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).jobId, true, false), 
jobName, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).jobName, true, false), runId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).runId, true, false), jobOwnerId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).jobOwnerId, true, false), triggerType, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.JobInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).job)).triggerType, true, false)), notebook, if (isnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.NotebookInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).notebook))) null else named_struct(notebookId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(unwrapoption(ObjectType(class org.apache.spark.sql.delta.actions.NotebookInfo), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).notebook)).notebookId, true, false)), clusterId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).clusterId), true, false), readVersion, unwrapoption(LongType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).readVersion), isolationLevel, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, unwrapoption(ObjectType(class java.lang.String), knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).isolationLevel), true, false), isBlindAppend, unwrapoption(BooleanType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).isBlindAppend), ... 4 more fields) AS commitInfo#71153]
      +- MapPartitions org.apache.spark.sql.delta.Snapshot$$Lambda$3347/0x0000000841486840@56df0e41, obj#71146: org.apache.spark.sql.delta.actions.SingleAction
         +- DeserializeToObject newInstance(class org.apache.spark.sql.delta.actions.SingleAction), obj#71145: org.apache.spark.sql.delta.actions.SingleAction
            +- Union
               :- FileScan parquet [txn#71079,add#71080,remove#71081,metaData#71082,protocol#71083,cdc#71084,commitInfo#71085] Batched: false, DataFilters: [], Format: Parquet, Location: DeltaLogFileIndex[gs://cip-dev-datalake/delta-lake/ad/user_profile/_delta_log/0000000000000000001..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<txn:struct<appId:string,version:bigint,lastUpdated:bigint>,add:struct<path:string,partitio...
               +- FileScan json [txn#71108,add#71109,remove#71110,metaData#71111,protocol#71112,cdc#71113,commitInfo#71114] Batched: false, DataFilters: [], Format: JSON, Location: DeltaLogFileIndex[gs://cip-dev-datalake/delta-lake/ad/user_profile/_delta_log/0000000000000000001..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<txn:struct<appId:string,version:bigint,lastUpdated:bigint>,add:struct<path:string,partitio...

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
    at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:132)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:47)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:96)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.MapPartitionsExec.doExecute(objects.scala:192)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
    at org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:117)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
    at org.apache.spark.sql.delta.util.StateCache$CachedDS.<init>(StateCache.scala:52)
    at org.apache.spark.sql.delta.util.StateCache.cacheDS(StateCache.scala:100)
    at org.apache.spark.sql.delta.util.StateCache.cacheDS$(StateCache.scala:99)
    at org.apache.spark.sql.delta.Snapshot.cacheDS(Snapshot.scala:55)
    at org.apache.spark.sql.delta.Snapshot.cachedState$lzycompute(Snapshot.scala:123)
    at org.apache.spark.sql.delta.Snapshot.cachedState(Snapshot.scala:122)
    at org.apache.spark.sql.delta.Snapshot.state(Snapshot.scala:126)
    at org.apache.spark.sql.delta.Snapshot.$anonfun$computedState$1(Snapshot.scala:146)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withJobDescription(DeltaProgressReporter.scala:53)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:32)
    at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:27)
    at org.apache.spark.sql.delta.Snapshot.withStatusCode(Snapshot.scala:55)
    at org.apache.spark.sql.delta.Snapshot.computedState$lzycompute(Snapshot.scala:143)
    at org.apache.spark.sql.delta.Snapshot.computedState(Snapshot.scala:142)
    at org.apache.spark.sql.delta.Snapshot.metadata(Snapshot.scala:185)
    at org.apache.spark.sql.delta.Snapshot.toString(Snapshot.scala:296)
    at java.base/java.lang.String.valueOf(String.java:2951)
    at java.base/java.lang.StringBuilder.append(StringBuilder.java:168)
    at org.apache.spark.sql.delta.Snapshot.$anonfun$new$1(Snapshot.scala:299)
    at org.apache.spark.sql.delta.Snapshot.$anonfun$logInfo$1(Snapshot.scala:276)
    at org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
    at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56)
    at org.apache.spark.sql.delta.Snapshot.logInfo(Snapshot.scala:276)
    at org.apache.spark.sql.delta.Snapshot.<init>(Snapshot.scala:299)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot(SnapshotManagement.scala:221)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshot$(SnapshotManagement.scala:209)
    at org.apache.spark.sql.delta.DeltaLog.createSnapshot(DeltaLog.scala:60)
    at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit(SnapshotManagement.scala:193)
    at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit$(SnapshotManagement.scala:184)
    at org.apache.spark.sql.delta.DeltaLog.getSnapshotAtInit(DeltaLog.scala:60)
    at org.apache.spark.sql.delta.SnapshotManagement.$init$(SnapshotManagement.scala:47)
    at org.apache.spark.sql.delta.DeltaLog.<init>(DeltaLog.scala:64)
    at org.apache.spark.sql.delta.DeltaLog$$anon$3.$anonfun$call$2(DeltaLog.scala:471)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
    at org.apache.spark.sql.delta.DeltaLog$$anon$3.$anonfun$call$1(DeltaLog.scala:471)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
    at org.apache.spark.sql.delta.DeltaLog$.recordOperation(DeltaLog.scala:368)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:103)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:89)
    at org.apache.spark.sql.delta.DeltaLog$.recordDeltaOperation(DeltaLog.scala:368)
    at org.apache.spark.sql.delta.DeltaLog$$anon$3.call(DeltaLog.scala:470)
    at org.apache.spark.sql.delta.DeltaLog$$anon$3.call(DeltaLog.scala:467)
    at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
    ... 29 more
Caused by: java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:529)
    at scala.None$.get(Option.scala:527)
    at org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion$lzycompute(DataSourceScanExec.scala:178)
    at org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion(DataSourceScanExec.scala:176)
    at org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:463)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.UnionExec.$anonfun$doExecute$5(basicPhysicalOperators.scala:644)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:644)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:96)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.MapPartitionsExec.doExecute(objects.scala:192)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
    at org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:117)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:47)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:106)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:106)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:139)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:137)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.$anonfun$doExecute$1(ShuffleExchangeExec.scala:154)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 118 more
tmszk commented 3 years ago

Is there any update on this issue?

I also have this issue when running very similar code (reading from Kafka and using DeltaMergeBuilder within a foreachBatch to upsert into a Delta table). I can also confirm it runs fine for roughly the first 10 micro-batches on an empty table, so it is most likely an issue that only surfaces once a batch actually performs an update.
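For context, a minimal sketch of the pattern I am running, with hypothetical names throughout (the `events` topic, the `/tmp/delta/events` path, the `id` key column, the checkpoint directory, and the broker address are all illustrative, not taken from my job):

```python
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = (SparkSession.builder
         .appName("kafka-delta-upsert")  # illustrative app name
         .getOrCreate())

def upsert_to_delta(batch_df, batch_id):
    # DeltaTable.forPath resolves the table's _delta_log; in this job it is
    # the call that eventually fails after a handful of micro-batches.
    target = DeltaTable.forPath(spark, "/tmp/delta/events")
    (target.alias("t")
           .merge(batch_df.alias("s"), "t.id = s.id")
           .whenMatchedUpdateAll()
           .whenNotMatchedInsertAll()
           .execute())

stream = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092")
          .option("subscribe", "events")
          .load()
          .selectExpr("CAST(key AS STRING) AS id",
                      "CAST(value AS STRING) AS value"))

query = (stream.writeStream
         .foreachBatch(upsert_to_delta)
         .option("checkpointLocation", "/tmp/checkpoints/events")
         .start())
query.awaitTermination()
```

Nothing here is exotic; it is essentially the upsert-from-streaming-queries pattern shown in the Delta Lake documentation.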

Libs: Spark 3.0.1, delta-core_2.12-0.8.0, spark-sql-kafka-0-10_2.12-3.0.1

Error (same as above):


py4j.protocol.Py4JJavaError: An error occurred while calling o133.execute.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(coalesce(add#719.path, remove#720.path), 50), false, [id=#411]
+- *(1) Project [txn#718, add#719, remove#720, metaData#721, protocol#722, cdc#723, commitInfo#724, UDF(input_file_name()) AS file#726]
   +- *(1) SerializeFromObject [if (isnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).txn)) null else named_struct(appId, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).txn).appId, true, false), version, knownnotnull(... knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).isolationLevel), true, false), isBlindAppend, unwrapoption(BooleanType, knownnotnull(knownnotnull(assertnotnull(input[0, org.apache.spark.sql.delta.actions.SingleAction, true])).commitInfo).isBlindAppend), ... 4 more fields) AS commitInfo#724]
      +- MapPartitions org.apache.spark.sql.delta.Snapshot$$Lambda$1771/2101829517@27baa03e, obj#717: org.apache.spark.sql.delta.actions.SingleAction
         +- DeserializeToObject newInstance(class org.apache.spark.sql.delta.actions.SingleAction), obj#716: org.apache.spark.sql.delta.actions.SingleAction
            +- Union
               :- FileScan parquet [txn#657,add#658,remove#659,metaData#660,protocol#661,cdc#662,commitInfo#663] Batched: false, DataFilters: [], Format: Parquet, Location: DeltaLogFileIndex[s3a://dpg-data-test/source/events/profile-aggregation/daily-profiles/_delta_log..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<txn:struct<appId:string,version:bigint,lastUpdated:bigint>,add:struct<path:string,partitio...
               +- FileScan json [txn#679,add#680,remove#681,metaData#682,protocol#683,cdc#684,commitInfo#685] Batched: false, DataFilters: [], Format: JSON, Location: DeltaLogFileIndex[s3a://dpg-data-test/source/events/profile-aggregation/daily-profiles/_delta_log..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<txn:struct<appId:string,version:bigint,lastUpdated:bigint>,add:struct<path:string,partitio...

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
    at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:132)
    at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:47)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:96)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.MapPartitionsExec.doExecute(objects.scala:192)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
    at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
    at org.apache.spark.sql.execution.SerializeFromObjectExec.inputRDDs(objects.scala:117)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
...
    at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$run$1(MergeIntoCommand.scala:250)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordMergeOperation(MergeIntoCommand.scala:622)
    at org.apache.spark.sql.delta.commands.MergeIntoCommand.run(MergeIntoCommand.scala:249)
    at io.delta.tables.DeltaMergeBuilder.$anonfun$execute$1(DeltaMergeBuilder.scala:239)
    at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError(AnalysisHelper.scala:60)
    at org.apache.spark.sql.delta.util.AnalysisHelper.improveUnsupportedOpError$(AnalysisHelper.scala:48)
    at io.delta.tables.DeltaMergeBuilder.improveUnsupportedOpError(DeltaMergeBuilder.scala:123)
    at io.delta.tables.DeltaMergeBuilder.execute(DeltaMergeBuilder.scala:228)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:529)
    at scala.None$.get(Option.scala:527)
    at org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion$lzycompute(DataSourceScanExec.scala:178)
    at org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion(DataSourceScanExec.scala:176)
    at org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:463)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.UnionExec.$anonfun$doExecute$5(basicPhysicalOperators.scala:644)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:644)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:96)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.MapPartitionsExec.doExecute(objects.scala:192)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
...
tmszk commented 3 years ago

I have found a solution that works for me: set the environment variable PYSPARK_PIN_THREAD=true before you start the Spark context.

The issue seems to be caused by running concurrent jobs in PySpark. By default, Python threads are not kept in sync with JVM threads during concurrent jobs, so thread-local JVM state can end up being read from the wrong thread, which would explain the None.get. Setting this env var keeps each Python thread in sync with its JVM counterpart.

According to the docs, thread pinning is still an experimental feature, but at least it works now.
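For anyone who wants to try this, a minimal sketch of applying the workaround (the app name is illustrative). The variable must be visible before the JVM gateway starts, so either export it in the shell that launches spark-submit (`export PYSPARK_PIN_THREAD=true`) or set it at the top of the driver script, before creating the session:

```python
import os

# PySpark reads PYSPARK_PIN_THREAD once, when it launches the py4j
# gateway, so this must run before the SparkSession is created.
os.environ["PYSPARK_PIN_THREAD"] = "true"

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("pinned-threads")  # illustrative name
         .getOrCreate())

# In pinned-thread mode each Python thread is paired with a dedicated JVM
# thread, so thread-local JVM state (such as the active SparkSession) is
# seen consistently by concurrent jobs like foreachBatch callbacks.
```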

scottsand-db commented 2 years ago

Closing this due to inactivity. Please reopen if this issue is still relevant and/or reproducible on the latest version of Delta.