apache / incubator-xtable

Apache XTable (incubating) is a cross-table converter for lakehouse table formats that facilitates interoperability across data processing systems and query engines.
https://xtable.apache.org/
Apache License 2.0

Enhance UUID Compatibility in Iceberg to Hudi Conversion #543

Open danielhumanmod opened 2 months ago

danielhumanmod commented 2 months ago

Feature Request / Improvement

Context

In #112, we are working on adding UUID support to XTable. Currently, Iceberg is the only table format with a native UUID type, which is stored as a fixed-length byte array in the underlying Parquet files. To facilitate conversion between Iceberg and formats like Delta and Hudi, we introduced an internal UUID type.
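
For reference, here is a minimal sketch (field names are illustrative, not from this issue) of an Iceberg schema that declares a UUID column. Iceberg persists such a column in Parquet as FIXED_LEN_BYTE_ARRAY(16) annotated with the UUID logical type, which is the shape the conversion has to handle:

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Illustrative Iceberg schema with a native UUID column ("id").
// In the data files this column becomes FIXED_LEN_BYTE_ARRAY(16)
// carrying the Parquet UUID logical-type annotation.
Schema schema = new Schema(
    Types.NestedField.required(1, "id", Types.UUIDType.get()),
    Types.NestedField.optional(2, "name", Types.StringType.get()));
```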

Problem Statement

While converting UUIDs from Iceberg to Hudi, we encountered an issue with Spark's ParquetSchemaConverter: Spark cannot read FIXED_LEN_BYTE_ARRAY fields annotated with the UUID logical type in Parquet. You can refer to the relevant source code here: ParquetSchemaConverter.

As of now, we have successfully implemented UUID conversion from Iceberg to Delta. However, the Iceberg to Hudi path remains blocked by this issue, which we hope to address in the future.
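
To make the failure concrete, below is a hedged sketch using the parquet-mr schema builder (the column name is an assumption for illustration) of the Parquet field shape Iceberg writes for a UUID column. Spark's ParquetToSparkSchemaConverter has no mapping for this combination and raises the AnalysisException shown in the log below:

```java
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

// FIXED_LEN_BYTE_ARRAY(16) annotated as UUID: the shape Iceberg writes for
// UUID columns and the combination Spark's converter currently rejects.
Type uuidField = Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
    .length(16)
    .as(LogicalTypeAnnotation.uuidType())
    .named("id");
```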

Error Log


Caused by: org.apache.spark.sql.AnalysisException: Illegal Parquet type: FIXED_LEN_BYTE_ARRAY (UUID).
    at org.apache.spark.sql.errors.QueryCompilationErrors$.illegalParquetTypeError(QueryCompilationErrors.scala:1762)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.illegalType$1(ParquetSchemaConverter.scala:206)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertPrimitiveField$2(ParquetSchemaConverter.scala:310)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertPrimitiveField(ParquetSchemaConverter.scala:224)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertField(ParquetSchemaConverter.scala:187)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertInternal$3(ParquetSchemaConverter.scala:147)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertInternal$3$adapted(ParquetSchemaConverter.scala:117)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.immutable.Range.foreach(Range.scala:158)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertInternal(ParquetSchemaConverter.scala:117)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convert(ParquetSchemaConverter.scala:87)
    at org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormatHelper$.buildImplicitSchemaChangeInfo(HoodieParquetFileFormatHelper.scala:31)
    at org.apache.spark.sql.execution.datasources.parquet.Spark34LegacyHoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark34LegacyHoodieParquetFileFormat.scala:254)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:231)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:293)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
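
For reproduction, a rough sketch (paths and session setup are assumptions, not taken from the issue): reading the Iceberg-written data files directly as Parquet with plain Spark hits the same exception, since the failure occurs during Parquet-to-Spark schema conversion before any Hudi-specific logic runs.

```java
import org.apache.spark.sql.SparkSession;

// Assumption: /tmp/iceberg_uuid_table/data holds Parquet files written by
// Iceberg for a table with a UUID column. Schema conversion fails with
// "Illegal Parquet type: FIXED_LEN_BYTE_ARRAY (UUID)".
SparkSession spark = SparkSession.builder()
    .master("local[*]")
    .appName("uuid-read-repro")
    .getOrCreate();

spark.read().parquet("/tmp/iceberg_uuid_table/data").show();
```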


vinishjail97 commented 1 month ago

@danielhumanmod Can we close this issue?

danielhumanmod commented 1 month ago

Maybe not for now. This is a problem we can't resolve in #112. After discussing it with @the-other-tim-brown, we decided to leave this issue open as a reminder, in case we find a way to make it work on the Hudi or Spark side in the future.