apache / incubator-xtable

Apache XTable (incubating) is a cross-table converter for lakehouse table formats that facilitates interoperability across data processing systems and query engines.
https://xtable.apache.org/
Apache License 2.0

Error while writing hudi tables with the extensions jar #415

Open sagarlakshmipathy opened 7 months ago

sagarlakshmipathy commented 7 months ago

Steps to reproduce:

Note: This issue only happens when hudi-extensions-0.1.0-SNAPSHOT-bundled.jar is added to the classpath, and that jar is important for the BigQuery integration when the tables are exposed as Iceberg tables.

spark-shell \
        --jars /Users/sagarl/latest/incubator-xtable/hudi-support/extensions/target/hudi-extensions-0.1.0-SNAPSHOT-bundled.jar \
        --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.14.0 \
        --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
        --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
        --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
        --conf "spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar" \
        --conf "hoodie.avro.write.support.class=org.apache.xtable.hudi.extensions.HoodieAvroWriteSupportWithFieldIds" \
        --conf "hoodie.client.init.callback.classes=org.apache.xtable.hudi.extensions.AddFieldIdsClientInitCallback" \
        --conf "hoodie.datasource.write.row.writer.enable=false" \
val data = Seq(("John", "Doe", 30), ("Jane", "Doe", 25))
val df = data.toDF("FirstName", "LastName", "Age")

val tableName = "sample_hudi_table"
val hudiOptions = Map(
    "hoodie.table.name" -> tableName
)

df.write.format("hudi")
.options(hudiOptions)
.mode("overwrite")
.save(s"/tmp/personal-bq-benchmarking/$tableName")

Error:

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2450)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2399)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2398)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2398)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1156)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1156)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1156)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2638)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2580)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2569)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2224)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2245)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2264)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2289)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
    at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
    at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
    at org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:177)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.setCommitMetadata(BaseSparkCommitActionExecutor.java:289)
    at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.autoCommit(BaseCommitActionExecutor.java:197)
    at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.commitOnAutoCommit(BaseCommitActionExecutor.java:183)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.updateIndexAndCommitIfNeeded(BaseSparkCommitActionExecutor.java:279)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.execute(BaseSparkCommitActionExecutor.java:184)
    at org.apache.hudi.table.action.deltacommit.SparkUpsertPreppedDeltaCommitActionExecutor.execute(SparkUpsertPreppedDeltaCommitActionExecutor.java:44)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:126)
    at org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsertPrepped(HoodieSparkMergeOnReadTable.java:88)
    at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:156)
    at org.apache.hudi.client.SparkRDDWriteClient.upsertPreppedRecords(SparkRDDWriteClient.java:63)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.commitInternal(HoodieBackedTableMetadataWriter.java:1132)
    at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.commit(SparkHoodieBackedTableMetadataWriter.java:117)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.processAndCommit(HoodieBackedTableMetadataWriter.java:855)
    at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.update(HoodieBackedTableMetadataWriter.java:1032)
    at org.apache.hudi.table.action.BaseActionExecutor.writeTableMetadata(BaseActionExecutor.java:105)
    ... 124 more
  Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:342)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:257)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1513)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1423)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1487)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1310)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
  Caused by: java.lang.NoSuchMethodError: 'void org.apache.hudi.common.table.log.block.HoodieHFileDataBlock.<init>(java.util.List, java.util.Map, org.apache.hudi.org.apache.hadoop.hbase.io.compress.Compression$Algorithm, org.apache.hadoop.fs.Path)'
    at org.apache.hudi.io.HoodieAppendHandle.getBlock(HoodieAppendHandle.java:669)
    at org.apache.hudi.io.HoodieAppendHandle.appendDataAndDeleteBlocks(HoodieAppendHandle.java:469)
    at org.apache.hudi.io.HoodieAppendHandle.doAppend(HoodieAppendHandle.java:450)
    at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:83)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:335)
    ... 28 more
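
The NoSuchMethodError above means the HoodieHFileDataBlock class resolved at runtime does not have the constructor that the calling code (HoodieAppendHandle) was compiled against, which usually indicates two different Hudi builds supplying classes on the same classpath, for example classes shaded into the extensions bundle versus the hudi-spark3.2-bundle 0.14.0 jar. A quick check from the same spark-shell (a diagnostic sketch, not part of the original report):

// Print which jar the conflicting class is actually loaded from. If this points at the
// hudi-extensions bundle rather than the hudi-spark3.2-bundle jar, the two jars are
// providing different versions of the same Hudi classes.
// (getCodeSource can be null for JDK-provided classes, but for jar-loaded classes it returns the jar URL.)
println(classOf[org.apache.hudi.common.table.log.block.HoodieHFileDataBlock]
  .getProtectionDomain.getCodeSource.getLocation)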

scala> 
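
For reference, the same extension settings can also be supplied directly as writer options instead of spark-shell --conf flags. A minimal sketch, assuming the hoodie.* keys are honored when passed through .options() (the usual way Hudi picks up write configs); this has not been verified to change the outcome:

val extensionWriteOptions = Map(
  "hoodie.table.name" -> tableName,
  // same extension classes and settings as in the spark-shell command above
  "hoodie.avro.write.support.class" -> "org.apache.xtable.hudi.extensions.HoodieAvroWriteSupportWithFieldIds",
  "hoodie.client.init.callback.classes" -> "org.apache.xtable.hudi.extensions.AddFieldIdsClientInitCallback",
  "hoodie.datasource.write.row.writer.enable" -> "false"
)

df.write.format("hudi")
  .options(extensionWriteOptions)
  .mode("overwrite")
  .save(s"/tmp/personal-bq-benchmarking/$tableName")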
sagarlakshmipathy commented 7 months ago

Two things I noticed while trying with Hudi 0.14.1:

When I initially hit the .save() method, I get this error:

java.lang.NoSuchFieldError: SET_NULL_FOR_MISSING_COLUMNS
  at org.apache.hudi.DataSourceWriteOptions$.<init>(DataSourceOptions.scala:540)
  at org.apache.hudi.DataSourceWriteOptions$.<clinit>(DataSourceOptions.scala)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:146)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:97)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:93)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:93)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:115)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
  ... 47 elided

If I retry, I get a different error:

java.lang.NoClassDefFoundError: Could not initialize class org.apache.hudi.DataSourceWriteOptions$
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:146)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:97)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:93)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:93)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:115)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
  ... 47 elided

I'm quite confused.
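
One note on why the retry shows a different error: the first failure happens inside the static initializer of org.apache.hudi.DataSourceWriteOptions$ (the NoSuchFieldError for SET_NULL_FOR_MISSING_COLUMNS), and once a class's static initializer has failed the JVM marks the class as unusable, so every subsequent reference reports NoClassDefFoundError: Could not initialize class ... instead of the original error. The two errors in this comment therefore most likely share a single root cause rather than being independent problems. A tiny self-contained sketch of that JVM behavior (hypothetical object, not Hudi code):

// Hypothetical standalone object whose initializer fails the same way
// DataSourceWriteOptions$ does above.
object BrokenOptions {
  val SOME_OPTION: String = throw new NoSuchFieldError("SET_NULL_FOR_MISSING_COLUMNS")
}

// The first access surfaces the real error; any retry surfaces
// "java.lang.NoClassDefFoundError: Could not initialize class ..." for the same object.
try BrokenOptions.SOME_OPTION catch { case t: Throwable => println(s"first attempt : $t") }
try BrokenOptions.SOME_OPTION catch { case t: Throwable => println(s"second attempt: $t") }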