delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and with APIs for multiple languages
https://delta.io
Apache License 2.0

[QUESTION] Sink in delta not working as expected #1590

Closed. Y3NK closed this issue 1 year ago.

Y3NK commented 1 year ago

Bug

Dear all, I am trying to sink a simple PySpark DataFrame to HDFS using the Delta format, so far without success...

Session creation works, and I added the two Delta dependencies (delta-core and delta-storage; see the script below).

I previously tried to use spark = configure_spark_with_delta_pip(builder).getOrCreate(), but since I am passing explicit .jar files it should not be necessary, right? (I had issues with Ivy dependencies...)
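For reference, a minimal sketch of that pip-based setup (assuming the delta-spark package is installed via pip, which is not the configuration used below):

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = (
    SparkSession.builder.appName("delta")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
# configure_spark_with_delta_pip resolves the delta-core jar matching the
# installed delta-spark package via spark.jars.packages, so no manual
# .jar management is needed.
spark = configure_spark_with_delta_pip(builder).getOrCreate()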

The sink to HDFS using Parquet works, so I think HDFS and the access permissions are fine...

Do you have any idea what could be wrong? Scala 2.12, Spark 3.3.0, and Delta 2.2.0 should be compatible according to the compatibility matrix.

I tried both with and without .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.HDFSLogStore")...

Thank you

...

import os  # needed for PYSPARK_SUBMIT_ARGS below

import pandas as pd
import numpy as np
import findspark

from delta import *

def get_spark_session(name,core=6,memory="1g"):
    findspark.init()
    # The jars must be on the classpath before the JVM starts, hence PYSPARK_SUBMIT_ARGS.
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars spark-sql-kafka-0-10_2.12-3.3.0.jar,spark-streaming-kafka-0-10_2.12-3.3.0.jar,kafka-clients-3.2.0.jar,spark-token-provider-kafka-0-10_2.12-3.3.0.jar,spark-tags_2.12-3.3.0.jar,commons-pool2-2.11.1.jar,mssql-jdbc-10.2.1.jre8.jar,ojdbc8.jar,xdb.jar,xmlparserv2.jar,delta-core_2.12-2.2.0.jar,delta-storage-2.2.0.jar pyspark-shell'

    my_session_spark = (
        SparkSession.builder.appName(name)
        .config("spark.executor.cores",core).config("spark.memory.offHeap.size",memory) 
        .config("spark.jars", "spark-sql-kafka-0-10_2.12-3.3.0.jar,spark-streaming-kafka-0-10_2.12-3.3.0.jar,kafka-clients-3.2.0.jar,spark-token-provider-kafka-0-10_2.12-3.3.0.jar,spark-tags_2.12-3.3.0.jar,commons-pool2-2.11.1.jar,mssql-jdbc-10.2.1.jre8.jar,,ojdbc8.jar,xdb.jar,xmlparserv2.jar,delta-core_2.12-2.2.0.jar,delta-storage-2.2.0.jar")
        .config("spark.sql.execution.arrow.pyspark.enabled", "true") 
        .config("spark.logConf", "true")
        .config("spark.scheduler.mode", "FAIR") #file config under /apps/spark-3.3.0-bin-hadoop3/conf/fairscheduler.xml  #full info over there: https://spark.apache.org/docs/latest/job-scheduling.html
        .config("spark.scheduler.pool", "production")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") 
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        # .config("spark.delta.logStore.class","org.apache.spark.sql.delta.storage.HDFSLogStore")
        .getOrCreate() 
    )   
    # sc = my_session_spark.sparkContext.setLogLevel("ERROR") #("INFO")
    return my_session_spark

spark_session = get_spark_session("delta",core=2,memory="6g")
print("Session creation okay")
dfp = pd.DataFrame([["Alpha","One"],["Bravo","Two"]],columns=["Col1","Col2"])
display(dfp)
print("Pandas dataFrame creation okay")

dfs = spark_session.createDataFrame(dfp)
dfs.show(3)
print("Spark DataFrame creation okay")

dfs.write.format("parquet").save("hdfs://xxx10.corp:9000/user/yen/test5.parquet")
print("Sink to HDFS okay")

print("Try to sink delta on HDFS ...")
dfs.write.format("delta").save("hdfs://xxx10.corp:9000/user/yen/test6.delta")
print("Sink to delta ... ")

JupyterLab output: [screenshot]

Full exception output:

java.lang.AssertionError: unsafe symbol Unstable (child of package annotation) in runtime reflection universe
    at scala.reflect.internal.Symbols$Symbol.<init>(Symbols.scala:231)
    at scala.reflect.internal.Symbols$TypeSymbol.<init>(Symbols.scala:3102)
    at scala.reflect.internal.Symbols$ClassSymbol.<init>(Symbols.scala:3288)
    at scala.reflect.internal.Symbols$StubClassSymbol.<init>(Symbols.scala:3559)
    at scala.reflect.internal.Symbols.newStubSymbol(Symbols.scala:214)
    at scala.reflect.internal.Symbols.newStubSymbol$(Symbols.scala:210)
    at scala.reflect.internal.SymbolTable.newStubSymbol(SymbolTable.scala:28)
    at scala.reflect.internal.Symbols$Symbol.newStubSymbol(Symbols.scala:534)
    at scala.reflect.internal.pickling.UnPickler$Scan.$anonfun$readSymbol$6(UnPickler.scala:273)
    at scala.reflect.internal.pickling.UnPickler$Scan.$anonfun$readSymbol$5(UnPickler.scala:260)
    at scala.reflect.internal.pickling.UnPickler$Scan.$anonfun$readSymbol$4(UnPickler.scala:260)
    at scala.reflect.internal.pickling.UnPickler$Scan.readExtSymbol$1(UnPickler.scala:258)
    at scala.reflect.internal.pickling.UnPickler$Scan.readSymbol(UnPickler.scala:281)
    at scala.reflect.internal.pickling.UnPickler$Scan.readSymbolRef(UnPickler.scala:645)
    at scala.reflect.internal.pickling.UnPickler$Scan.readType(UnPickler.scala:413)
    at scala.reflect.internal.pickling.UnPickler$Scan.$anonfun$readTypeRef$1(UnPickler.scala:654)
    at scala.reflect.internal.pickling.UnPickler$Scan.at(UnPickler.scala:188)
    at scala.reflect.internal.pickling.UnPickler$Scan.readTypeRef(UnPickler.scala:654)
    at scala.reflect.internal.pickling.UnPickler$Scan.readAnnotationInfo(UnPickler.scala:488)
    at scala.reflect.internal.pickling.UnPickler$Scan.readSymbolAnnotation(UnPickler.scala:511)
    at scala.reflect.internal.pickling.UnPickler$Scan.$anonfun$run$2(UnPickler.scala:106)
    at scala.reflect.internal.pickling.UnPickler$Scan.run(UnPickler.scala:88)
    at scala.reflect.internal.pickling.UnPickler.unpickle(UnPickler.scala:47)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.unpickleClass(JavaMirrors.scala:660)
    at scala.reflect.runtime.SymbolLoaders$TopClassCompleter.$anonfun$complete$2(SymbolLoaders.scala:37)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.reflect.internal.SymbolTable.slowButSafeEnteringPhaseNotLaterThan(SymbolTable.scala:333)
    at scala.reflect.runtime.SymbolLoaders$TopClassCompleter.complete(SymbolLoaders.scala:34)
    at scala.reflect.internal.Symbols$Symbol.completeInfo(Symbols.scala:1551)
    at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1514)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$13.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:221)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.$anonfun$info$1(SynchronizedSymbols.scala:158)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.info(SynchronizedSymbols.scala:149)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.info$(SynchronizedSymbols.scala:158)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$13.info(SynchronizedSymbols.scala:221)
    at scala.reflect.internal.SymbolTable.openPackageModule(SymbolTable.scala:356)
    at scala.reflect.internal.SymbolTable.openPackageModule(SymbolTable.scala:411)
    at scala.reflect.runtime.SymbolLoaders$LazyPackageType.$anonfun$complete$3(SymbolLoaders.scala:83)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.reflect.internal.SymbolTable.slowButSafeEnteringPhaseNotLaterThan(SymbolTable.scala:333)
    at scala.reflect.runtime.SymbolLoaders$LazyPackageType.complete(SymbolLoaders.scala:80)
    at scala.reflect.internal.Symbols$Symbol.completeInfo(Symbols.scala:1551)
    at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1514)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$9.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:209)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.$anonfun$info$1(SynchronizedSymbols.scala:158)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.info(SynchronizedSymbols.scala:149)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.info$(SynchronizedSymbols.scala:158)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$9.info(SynchronizedSymbols.scala:209)
    at scala.reflect.internal.Types$TypeRef.decls(Types.scala:2283)
    at scala.reflect.internal.Types$Type.findDecl(Types.scala:998)
    at scala.reflect.internal.Types$Type.decl(Types.scala:597)
    at scala.reflect.runtime.SymbolLoaders$PackageScope.$anonfun$lookupEntry$1(SymbolLoaders.scala:154)
    at scala.reflect.runtime.SymbolLoaders$PackageScope.syncLockSynchronized(SymbolLoaders.scala:133)
    at scala.reflect.runtime.SymbolLoaders$PackageScope.lookupEntry(SymbolLoaders.scala:135)
    at scala.reflect.internal.tpe.FindMembers$FindMemberBase.walkBaseClasses(FindMembers.scala:110)
    at scala.reflect.internal.tpe.FindMembers$FindMemberBase.searchConcreteThenDeferred(FindMembers.scala:75)
    at scala.reflect.internal.tpe.FindMembers$FindMemberBase.apply(FindMembers.scala:55)
    at scala.reflect.internal.Types$Type.$anonfun$findMember$1(Types.scala:1043)
    at scala.reflect.internal.Types$Type.findMemberInternal$1(Types.scala:1041)
    at scala.reflect.internal.Types$Type.findMember(Types.scala:1046)
    at scala.reflect.internal.Types$Type.memberBasedOnName(Types.scala:672)
    at scala.reflect.internal.Types$Type.member(Types.scala:636)
    at scala.reflect.internal.Types$Type.packageObject(Types.scala:648)
    at scala.reflect.internal.Symbols$Symbol.packageObject(Symbols.scala:859)
    at scala.reflect.internal.SymbolTable.openPackageModule(SymbolTable.scala:405)
    at scala.reflect.runtime.SymbolLoaders$LazyPackageType.$anonfun$complete$3(SymbolLoaders.scala:83)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.reflect.internal.SymbolTable.slowButSafeEnteringPhaseNotLaterThan(SymbolTable.scala:333)
    at scala.reflect.runtime.SymbolLoaders$LazyPackageType.complete(SymbolLoaders.scala:80)
    at scala.reflect.internal.Symbols$Symbol.completeInfo(Symbols.scala:1551)
    at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1514)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$9.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:209)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.$anonfun$info$1(SynchronizedSymbols.scala:158)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.info(SynchronizedSymbols.scala:149)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.info$(SynchronizedSymbols.scala:158)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$9.info(SynchronizedSymbols.scala:209)
    at scala.reflect.internal.Types$TypeRef.baseClasses(Types.scala:2280)
    at scala.reflect.internal.tpe.FindMembers$FindMemberBase.init(FindMembers.scala:36)
    at scala.reflect.internal.tpe.FindMembers$FindMember.init(FindMembers.scala:257)
    at scala.reflect.internal.Types$Type.$anonfun$findMember$1(Types.scala:1042)
    at scala.reflect.internal.Types$Type.findMemberInternal$1(Types.scala:1041)
    at scala.reflect.internal.Types$Type.findMember(Types.scala:1046)
    at scala.reflect.internal.Types$Type.memberBasedOnName(Types.scala:672)
    at scala.reflect.internal.Types$Type.member(Types.scala:636)
    at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:55)
    at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:51)
    at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:29)
    at org.apache.spark.sql.catalyst.ScalaReflection$$typecreator2$5.apply(ScalaReflection.scala:730)
    at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:237)
    at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:237)
    at org.apache.spark.sql.catalyst.ScalaReflection.localTypeOf(ScalaReflection.scala:963)
    at org.apache.spark.sql.catalyst.ScalaReflection.localTypeOf$(ScalaReflection.scala:961)
    at org.apache.spark.sql.catalyst.ScalaReflection$.localTypeOf(ScalaReflection.scala:51)
    at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:730)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:948)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:947)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:718)
    at org.apache.spark.sql.catalyst.ScalaReflection$.deserializerForType(ScalaReflection.scala:171)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:56)
    at org.apache.spark.sql.delta.stats.DataSkippingReader$.<init>(DataSkippingReader.scala:177)
    at org.apache.spark.sql.delta.stats.DataSkippingReader$.<clinit>(DataSkippingReader.scala)
    at org.apache.spark.sql.delta.DeltaConfigsBase.$init$(DeltaConfig.scala:407)
    at org.apache.spark.sql.delta.DeltaConfigs$.<init>(DeltaConfig.scala:515)
    at org.apache.spark.sql.delta.DeltaConfigs$.<clinit>(DeltaConfig.scala)
    at org.apache.spark.sql.delta.InitialSnapshot.<init>(Snapshot.scala:444)
    at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$createSnapshotAtInitInternal$2(SnapshotManagement.scala:282)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotAtInitInternal(SnapshotManagement.scala:280)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotAtInitInternal$(SnapshotManagement.scala:269)
    at org.apache.spark.sql.delta.DeltaLog.createSnapshotAtInitInternal(DeltaLog.scala:67)
    at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$getSnapshotAtInit$1(SnapshotManagement.scala:264)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:139)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:137)
    at org.apache.spark.sql.delta.DeltaLog.recordFrameProfile(DeltaLog.scala:67)
    at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit(SnapshotManagement.scala:258)
    at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit$(SnapshotManagement.scala:257)
    at org.apache.spark.sql.delta.DeltaLog.getSnapshotAtInit(DeltaLog.scala:67)
    at org.apache.spark.sql.delta.SnapshotManagement.$init$(SnapshotManagement.scala:55)
    at org.apache.spark.sql.delta.DeltaLog.<init>(DeltaLog.scala:72)
    at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$4(DeltaLog.scala:737)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
    at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$3(DeltaLog.scala:733)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:139)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:137)
    at org.apache.spark.sql.delta.DeltaLog$.recordFrameProfile(DeltaLog.scala:550)
    at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:132)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
    at org.apache.spark.sql.delta.DeltaLog$.recordOperation(DeltaLog.scala:550)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:131)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:121)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:109)
    at org.apache.spark.sql.delta.DeltaLog$.recordDeltaOperation(DeltaLog.scala:550)
    at org.apache.spark.sql.delta.DeltaLog$.createDeltaLog$1(DeltaLog.scala:732)
    at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$5(DeltaLog.scala:748)
    at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
    at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
    at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
    at org.apache.spark.sql.delta.DeltaLog$.getDeltaLogFromCache$1(DeltaLog.scala:748)
    at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:755)
    at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:598)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:151)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:357)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In [3], line 47
     44 print("Sink to HDFS okay")
     46 print("Try to sink delta on HDFS ...")
---> 47 dfs.write.format("delta").save("hdfs://xxx10.corp:9000/user/yen/test6.delta")
     48 print("Sink to delta ... ")

File /apps/python/venv_edw/lib64/python3.9/site-packages/pyspark/sql/readwriter.py:968, in DataFrameWriter.save(self, path, format, mode, partitionBy, **options)
    966     self._jwrite.save()
    967 else:
--> 968     self._jwrite.save(path)

File /apps/python/venv_edw/lib64/python3.9/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /apps/python/venv_edw/lib64/python3.9/site-packages/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    188 def deco(*a: Any, **kw: Any) -> Any:
    189     try:
--> 190         return f(*a, **kw)
    191     except Py4JJavaError as e:
    192         converted = convert_exception(e.java_exception)

File /apps/python/venv_edw/lib64/python3.9/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o80.save.
: com.google.common.util.concurrent.ExecutionError: java.lang.ExceptionInInitializerError
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2261)
    at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
    at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
    at org.apache.spark.sql.delta.DeltaLog$.getDeltaLogFromCache$1(DeltaLog.scala:748)
    at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:755)
    at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:598)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:151)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:357)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ExceptionInInitializerError
    at org.apache.spark.sql.delta.DeltaConfigsBase.$init$(DeltaConfig.scala:407)
    at org.apache.spark.sql.delta.DeltaConfigs$.<init>(DeltaConfig.scala:515)
    at org.apache.spark.sql.delta.DeltaConfigs$.<clinit>(DeltaConfig.scala)
    at org.apache.spark.sql.delta.InitialSnapshot.<init>(Snapshot.scala:444)
    at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$createSnapshotAtInitInternal$2(SnapshotManagement.scala:282)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotAtInitInternal(SnapshotManagement.scala:280)
    at org.apache.spark.sql.delta.SnapshotManagement.createSnapshotAtInitInternal$(SnapshotManagement.scala:269)
    at org.apache.spark.sql.delta.DeltaLog.createSnapshotAtInitInternal(DeltaLog.scala:67)
    at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$getSnapshotAtInit$1(SnapshotManagement.scala:264)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:139)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:137)
    at org.apache.spark.sql.delta.DeltaLog.recordFrameProfile(DeltaLog.scala:67)
    at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit(SnapshotManagement.scala:258)
    at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit$(SnapshotManagement.scala:257)
    at org.apache.spark.sql.delta.DeltaLog.getSnapshotAtInit(DeltaLog.scala:67)
    at org.apache.spark.sql.delta.SnapshotManagement.$init$(SnapshotManagement.scala:55)
    at org.apache.spark.sql.delta.DeltaLog.<init>(DeltaLog.scala:72)
    at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$4(DeltaLog.scala:737)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
    at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$3(DeltaLog.scala:733)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:139)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:137)
    at org.apache.spark.sql.delta.DeltaLog$.recordFrameProfile(DeltaLog.scala:550)
    at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:132)
    at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
    at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
    at org.apache.spark.sql.delta.DeltaLog$.recordOperation(DeltaLog.scala:550)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:131)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:121)
    at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:109)
    at org.apache.spark.sql.delta.DeltaLog$.recordDeltaOperation(DeltaLog.scala:550)
    at org.apache.spark.sql.delta.DeltaLog$.createDeltaLog$1(DeltaLog.scala:732)
    at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$5(DeltaLog.scala:748)
    at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
    ... 47 more
Caused by: java.lang.RuntimeException: error reading Scala signature of org.apache.spark.sql.package: unsafe symbol Unstable (child of package annotation) in runtime reflection universe
    at scala.reflect.internal.pickling.UnPickler.unpickle(UnPickler.scala:51)
    at scala.reflect.runtime.JavaMirrors$JavaMirror.unpickleClass(JavaMirrors.scala:660)
    at scala.reflect.runtime.SymbolLoaders$TopClassCompleter.$anonfun$complete$2(SymbolLoaders.scala:37)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.reflect.internal.SymbolTable.slowButSafeEnteringPhaseNotLaterThan(SymbolTable.scala:333)
    at scala.reflect.runtime.SymbolLoaders$TopClassCompleter.complete(SymbolLoaders.scala:34)
    at scala.reflect.internal.Symbols$Symbol.completeInfo(Symbols.scala:1551)
    at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1514)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$13.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:221)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.$anonfun$info$1(SynchronizedSymbols.scala:158)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.info(SynchronizedSymbols.scala:149)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.info$(SynchronizedSymbols.scala:158)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$13.info(SynchronizedSymbols.scala:221)
    at scala.reflect.internal.SymbolTable.openPackageModule(SymbolTable.scala:356)
    at scala.reflect.internal.SymbolTable.openPackageModule(SymbolTable.scala:411)
    at scala.reflect.runtime.SymbolLoaders$LazyPackageType.$anonfun$complete$3(SymbolLoaders.scala:83)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.reflect.internal.SymbolTable.slowButSafeEnteringPhaseNotLaterThan(SymbolTable.scala:333)
    at scala.reflect.runtime.SymbolLoaders$LazyPackageType.complete(SymbolLoaders.scala:80)
    at scala.reflect.internal.Symbols$Symbol.completeInfo(Symbols.scala:1551)
    at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1514)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$9.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:209)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.$anonfun$info$1(SynchronizedSymbols.scala:158)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.info(SynchronizedSymbols.scala:149)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.info$(SynchronizedSymbols.scala:158)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$9.info(SynchronizedSymbols.scala:209)
    at scala.reflect.internal.Types$TypeRef.decls(Types.scala:2283)
    at scala.reflect.internal.Types$Type.findDecl(Types.scala:998)
    at scala.reflect.internal.Types$Type.decl(Types.scala:597)
    at scala.reflect.runtime.SymbolLoaders$PackageScope.$anonfun$lookupEntry$1(SymbolLoaders.scala:154)
    at scala.reflect.runtime.SymbolLoaders$PackageScope.syncLockSynchronized(SymbolLoaders.scala:133)
    at scala.reflect.runtime.SymbolLoaders$PackageScope.lookupEntry(SymbolLoaders.scala:135)
    at scala.reflect.internal.tpe.FindMembers$FindMemberBase.walkBaseClasses(FindMembers.scala:110)
    at scala.reflect.internal.tpe.FindMembers$FindMemberBase.searchConcreteThenDeferred(FindMembers.scala:75)
    at scala.reflect.internal.tpe.FindMembers$FindMemberBase.apply(FindMembers.scala:55)
    at scala.reflect.internal.Types$Type.$anonfun$findMember$1(Types.scala:1043)
    at scala.reflect.internal.Types$Type.findMemberInternal$1(Types.scala:1041)
    at scala.reflect.internal.Types$Type.findMember(Types.scala:1046)
    at scala.reflect.internal.Types$Type.memberBasedOnName(Types.scala:672)
    at scala.reflect.internal.Types$Type.member(Types.scala:636)
    at scala.reflect.internal.Types$Type.packageObject(Types.scala:648)
    at scala.reflect.internal.Symbols$Symbol.packageObject(Symbols.scala:859)
    at scala.reflect.internal.SymbolTable.openPackageModule(SymbolTable.scala:405)
    at scala.reflect.runtime.SymbolLoaders$LazyPackageType.$anonfun$complete$3(SymbolLoaders.scala:83)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.reflect.internal.SymbolTable.slowButSafeEnteringPhaseNotLaterThan(SymbolTable.scala:333)
    at scala.reflect.runtime.SymbolLoaders$LazyPackageType.complete(SymbolLoaders.scala:80)
    at scala.reflect.internal.Symbols$Symbol.completeInfo(Symbols.scala:1551)
    at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1514)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$9.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:209)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.$anonfun$info$1(SynchronizedSymbols.scala:158)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.info(SynchronizedSymbols.scala:149)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.info$(SynchronizedSymbols.scala:158)
    at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$9.info(SynchronizedSymbols.scala:209)
    at scala.reflect.internal.Types$TypeRef.baseClasses(Types.scala:2280)
    at scala.reflect.internal.tpe.FindMembers$FindMemberBase.init(FindMembers.scala:36)
    at scala.reflect.internal.tpe.FindMembers$FindMember.init(FindMembers.scala:257)
    at scala.reflect.internal.Types$Type.$anonfun$findMember$1(Types.scala:1042)
    at scala.reflect.internal.Types$Type.findMemberInternal$1(Types.scala:1041)
    at scala.reflect.internal.Types$Type.findMember(Types.scala:1046)
    at scala.reflect.internal.Types$Type.memberBasedOnName(Types.scala:672)
    at scala.reflect.internal.Types$Type.member(Types.scala:636)
    at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:55)
    at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:51)
    at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:29)
    at org.apache.spark.sql.catalyst.ScalaReflection$$typecreator2$5.apply(ScalaReflection.scala:730)
    at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:237)
    at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:237)
    at org.apache.spark.sql.catalyst.ScalaReflection.localTypeOf(ScalaReflection.scala:963)
    at org.apache.spark.sql.catalyst.ScalaReflection.localTypeOf$(ScalaReflection.scala:961)
    at org.apache.spark.sql.catalyst.ScalaReflection$.localTypeOf(ScalaReflection.scala:51)
    at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:730)
    at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:73)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects(ScalaReflection.scala:948)
    at org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects$(ScalaReflection.scala:947)
    at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:51)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:718)
    at org.apache.spark.sql.catalyst.ScalaReflection$.deserializerForType(ScalaReflection.scala:171)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:56)
    at org.apache.spark.sql.delta.stats.DataSkippingReader$.<init>(DataSkippingReader.scala:177)
    at org.apache.spark.sql.delta.stats.DataSkippingReader$.<clinit>(DataSkippingReader.scala)
    ... 86 more

Describe the problem

Impossible to sink to HDFS with Delta. Errors:

java.lang.AssertionError: unsafe symbol Unstable (child of package annotation) in runtime reflection universe

Py4JJavaError: An error occurred while calling o80.save.: com.google.common.util.concurrent.ExecutionError: java.lang.ExceptionInInitializerError

Steps to reproduce

- HDFS is running
- The Spark session is created correctly
- The sink to "simple" HDFS (Parquet) works fine
- The write fails when trying to use Delta

Observed results

Works for a plain HDFS Parquet sink. Does not work for the Delta format.

Expected results

The same result with Delta as with Parquet.

Further details

Environment information

Java Home | /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.352.b08-2.el8_7.x86_64/jre
Java Version | 1.8.0_352 (Red Hat, Inc.)
Scala Version | 2.12.15

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

Y3NK commented 1 year ago

I tried switching to delta-core_2.12-2.1.1, but it is still not working. The JARs are present in Spark: [screenshot]

JAVA_HOME seems to be set, and delta-spark is at the correct version... [screenshot]

Do you have any other ideas I could try?

Thank you

zsxwing commented 1 year ago

What's your PySpark version? This looks like an issue where two Spark versions are mixed into the classpath.
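A quick way to check for such a mismatch from the notebook is a sketch like the following (spark_session is the session created above; all three values should refer to the same Spark release):

import os
import pyspark

print(pyspark.__version__)           # version of the pip-installed PySpark package
print(spark_session.version)         # version of the JVM-side Spark actually running
print(os.environ.get("SPARK_HOME"))  # which Spark distribution findspark picked up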

Y3NK commented 1 year ago

> What's your PySpark version? This looks like an issue where two Spark versions are mixed into the classpath.

The Spark version is 3.3.0 and Scala is 2.12.15.

Y3NK commented 1 year ago

Thank you @zsxwing. I removed the Spark folder extracted from the 3.3.0 tarball, installed PySpark 3.3.2 through PyPI, picked up the 3.3.2 Spark install, updated SPARK_HOME in .bashrc and sourced it, and it now seems to be working!

Thanks
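For anyone hitting the same issue, a minimal sketch to confirm that the pip-installed PySpark and SPARK_HOME now refer to the same installation:

import os
import pyspark

# After aligning versions, both locations should belong to the same
# Spark 3.3.2 distribution.
print(pyspark.__file__)              # location of the pip-installed PySpark
print(os.environ.get("SPARK_HOME"))  # the distribution findspark will use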