
[BUG] Uniform-Hudi | Unable to read Metadata #3760

Open ambujshukla1998 opened 1 month ago

ambujshukla1998 commented 1 month ago

Hello team, I'm encountering an issue when attempting to read a Hudi table generated through Delta Lake Uniform.

Environment:

- Delta Lake version: 3.2
- Hudi version: 0.14.1 (using hudi-spark_2.12)
- Spark version: 3.5

Describe the problem

We created the Hudi metadata through Uniform by setting the relevant table properties, and we confirmed that the Hudi metadata is generated at the folder/table_name location. However, when following the steps in the official documentation to read this Hudi metadata, we get a DataFrame containing only null values, as described below. We also see that the Hudi table generated through Uniform is not registered in the Hive Metastore, unlike the Iceberg table, which does get registered.
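For reference, this is roughly how we confirm the Hudi metadata is present at the table location (a minimal sketch using the Hadoop FileSystem API, assuming the Hudi timeline lives under ".hoodie" at the table root, next to "_delta_log"; bucket and TABLENAME are the same placeholders used in the DDL below):

  import org.apache.hadoop.fs.Path

  // Check for the Hudi timeline directory (assumed to be `.hoodie`) and the
  // Delta log at the table root.
  val tablePath = new Path(s"$bucket/$TABLENAME")
  val fs = tablePath.getFileSystem(spark.sparkContext.hadoopConfiguration)

  println(s"Hudi metadata present: ${fs.exists(new Path(tablePath, ".hoodie"))}")
  println(s"Delta log present:     ${fs.exists(new Path(tablePath, "_delta_log"))}")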

Steps to reproduce

  1. Created Hudi tables using Uniform.
  2. Generated metadata for both Hudi tables.
  3. Tried to read the table through the Hive Metastore, but it gets registered as an external Delta table even with spark_catalog set to HoodieCatalog (see the sketch after this list).
  4. Read the table from the Hudi metadata with Spark 3.3 and Hudi 0.14.1; as mentioned in the official Delta release notes, the table must be read with a Spark version less than
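For step 3, this is the kind of check we run to see how the table ended up registered in the metastore (a minimal sketch; SCHEMA and TABLENAME are the same placeholders as in the DDL below):

  // Show the provider the metastore recorded for the table; in our case it
  // comes back as an external Delta table rather than a Hudi table.
  spark.sql(s"DESC EXTENDED $SCHEMA.$TABLENAME")
    .filter("col_name IN ('Provider', 'Type')")
    .show(false)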

Code for creating the table

  spark.sql(
    s"""CREATE EXTERNAL TABLE IF NOT EXISTS ${SCHEMA}.${TABLENAME} (
       |  wm_item_nbr BIGINT,
       |  base_div_nbr INTEGER,
       |  catlg_item_id BIGINT,
       |  vendor_nbr INTEGER,
       |  buyr_rpt_postn_id INTEGER,
       |  plnr_rpt_postn_id INTEGER,
       |  acctg_dept_nbr INTEGER
       |)
       |USING delta
       |TBLPROPERTIES (
       |  'delta.enableIcebergCompatV2' = 'true',
       |  'delta.universalFormat.enabledFormats' = 'hudi',
       |  'minReaderVersion' = 2,
       |  'minWriterVersion' = 7,
       |  'delta.columnMapping.mode' = 'name'
       |)
       |LOCATION '$bucket/${TABLENAME}'""".stripMargin)

Code for inserting into the Uniform-enabled Delta table:

  val df = spark
    .table(config.sourceTable)
    .select(
      "wm_item_nbr",
      "base_div_nbr",
      "catlg_item_id",
      "vendor_nbr",
      "buyr_rpt_postn_id",
      "plnr_rpt_postn_id",
      "acctg_dept_nbr"
    )
  logger.info("Table is read")

  df.write
    .format(DELTA)
    .option(DeltaOptions.PARTITION_OVERWRITE_MODE_OPTION, "dynamic")
    .mode(SaveMode.Overwrite)
    .insertInto(targetTable)
  logger.info("Loading to table is complete")
  Thread.sleep(180000)
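The fixed Thread.sleep(180000) is only there to give the Uniform conversion time to finish. A sketch of what we could poll instead, assuming completed Hudi commits appear as "*.commit" files under ".hoodie" (an assumption on our side, not something confirmed by the Delta docs):

  import org.apache.hadoop.fs.Path

  // Poll the (assumed) Hudi timeline directory until a completed commit
  // shows up, instead of sleeping for a fixed three minutes.
  val hoodieDir = new Path(s"$bucket/$TABLENAME/.hoodie")
  val fs = hoodieDir.getFileSystem(spark.sparkContext.hadoopConfiguration)

  def hasCommit: Boolean =
    fs.exists(hoodieDir) &&
      fs.listStatus(hoodieDir).exists(_.getPath.getName.endsWith(".commit"))

  var waited = 0
  while (!hasCommit && waited < 180000) {
    Thread.sleep(5000)
    waited += 5000
  }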

POM dependencies added:

    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-common</artifactId>
        <version>0.14.1</version>
    </dependency>
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-hudi_2.12</artifactId>
        <version>3.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-hive-sync</artifactId>
        <version>0.14.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-spark_2.12</artifactId>
        <version>0.14.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-sync-common</artifactId>
        <version>0.14.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-gcp</artifactId>
        <version>0.14.1</version>
    </dependency>

Now, to read the table (Spark version 3.3, Hudi version 0.14.1):

  val spark_hudi = SparkSession
    .builder()
    .master("yarn")
    .enableHiveSupport()
    .appName("HudiSession")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config(
      "spark.sql.catalog.spark_catalog",
      "org.apache.spark.sql.hudi.catalog.HoodieCatalog"
    )
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrationRequired", "true")
    .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
    .getOrCreate()

  logger.info("Configured Spark for Hudi")

  var data = spark_hudi.read
    .format("hudi")
    .schema(sparkSchema)
    .option("hoodie.metadata.enable", "true")
    .option("hoodie.datasource.write.hive_style_partitioning", "true")
    .load(bucket + "/" + config.targetTable.split("\\.")(1))

  logger.info(s"Count is ${data.count()}")
  data.show(10, false)
  data.filter(col("base_div_nbr").isNotNull).show(10, false)

  spark_hudi.sql(s"""select * from ${config.targetTable}""").show(10, false)
  spark_hudi.sql(s"""show tblproperties ${config.targetTable}""").show(1000, false)
  spark_hudi.sql(s"""desc extended ${config.targetTable}""").show(1000, false)
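As an additional sanity check, we can bypass the Hudi reader and look at the files directly (a debugging sketch on our side, not part of the pipeline; path below is the same table location as above). Since the table was created with 'delta.columnMapping.mode' = 'name', the physical parquet column names may not match the logical ones, which this read would surface:

  // Read the data files directly as parquet, bypassing the Delta and Hudi
  // metadata layers, to inspect the physical column names and values.
  val path = bucket + "/" + config.targetTable.split("\\.")(1)
  val raw = spark_hudi.read.format("parquet").load(path)
  raw.printSchema()
  raw.show(5, false)

  // Also read through Hudi without forcing our logical schema, letting it
  // resolve whatever schema its own metadata reports.
  spark_hudi.read.format("hudi").load(path).printSchema()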

Observed results

Count is correct, but the DataFrame has only null values:

  24-10-09 06:49:56 INFO UniformDriver$: Count is 24400891
  +-----------+------------+-------------+----------+-----------------+-----------------+--------------+
  |wm_item_nbr|base_div_nbr|catlg_item_id|vendor_nbr|buyr_rpt_postn_id|plnr_rpt_postn_id|acctg_dept_nbr|
  +-----------+------------+-------------+----------+-----------------+-----------------+--------------+
  |null       |null        |null         |null      |null             |null             |null          |
  |null       |null        |null         |null      |null             |null             |null          |
  |null       |null        |null         |null      |null             |null             |null          |
  |null       |null        |null         |null      |null             |null             |null          |
  |null       |null        |null         |null      |null             |null             |null          |
  |null       |null        |null         |null      |null             |null             |null          |
  |null       |null        |null         |null      |null             |null             |null          |
  |null       |null        |null         |null      |null             |null             |null          |
  |null       |null        |null         |null      |null             |null             |null          |
  |null       |null        |null         |null      |null             |null             |null          |
  +-----------+------------+-------------+----------+-----------------+-----------------+--------------+

Throws an error when reading from the Hive Metastore:

  24-10-09 06:50:05 ERROR ApplicationMaster: User class threw exception: org.sparkproject.guava.util.concurrent.ExecutionError: java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/FileSourceOptions$

Environment information