apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Comments for partition columns not synced to Hive/AWS Glue #11922

Status: Open. machadoluiz opened this issue 1 month ago.

machadoluiz commented 1 month ago

Describe the problem you faced

When using Apache Hudi to sync tables to Hive/AWS Glue with the hoodie.datasource.hive_sync.sync_comment = true option, comments on non-partition columns are synced correctly, but comments on partition columns are not. As a result, the partition columns in the table schema are missing their comments when viewed in the AWS Glue Data Catalog.

To Reproduce

Steps to reproduce the behavior:

  1. Create a Hudi table in PySpark with both non-partition and partition columns.
  2. Define metadata comments for all columns, including the partition column.
  3. Write the table to Hudi with Hive sync enabled, using hoodie.datasource.hive_sync.sync_comment = true.

For example:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder \
    .appName("HudiTableWithComments") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar") \
    .getOrCreate()

schema = StructType([
    StructField("id", StringType(), nullable=False, metadata={"comment": "Unique identifier"}),
    StructField("name", StringType(), nullable=True, metadata={"comment": "Name of the person"}),
    StructField("age", IntegerType(), nullable=True, metadata={"comment": "Age of the person"}),
    StructField("date", StringType(), nullable=True, metadata={"comment": "Partition field, date of entry"})  # Partition field
])

data = [
    ("1", "John", 30, "2023-01-01"),
    ("2", "Jane", 25, "2023-01-01"),
    ("3", "Bob", 35, "2023-01-02")
]

df = spark.createDataFrame(data, schema=schema)

hudi_options = {
    'hoodie.table.name': 'hudi_table_with_comments',
    "hoodie.datasource.write.table.name": "hudi_table_with_comments",
    'hoodie.datasource.write.operation': 'insert',
    'hoodie.datasource.write.partitionpath.field': 'date',
    'hoodie.datasource.write.precombine.field': 'id',
    'hoodie.datasource.write.recordkey.field': 'id',
    "hoodie.datasource.write.schema.allow.auto.evolution.column.drop": "true",
    'hoodie.datasource.hive_sync.database': 'default',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.partition_fields': 'date',
    'hoodie.datasource.hive_sync.sync_comment': 'true',
    'hoodie.datasource.hive_sync.table': 'hudi_table_with_comments',
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.datasource.hive_sync.support_timestamp": "true",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.merge.allow.duplicate.on.inserts": "false",
    "hoodie.schema.on.read.enable": "true",
}

df.write.format("hudi").options(**hudi_options).mode("append").save("...")

After running it, you'll find that comments for non-partition columns (id, name, age) are correctly synced, but the partition field (date) does not have its comment.
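To check this in the AWS Glue Data Catalog without eyeballing the console, one option is to fetch the table with boto3's Glue get_table call and compare the Comment fields of StorageDescriptor.Columns (regular columns) and PartitionKeys (partition columns) against the comments defined in the Spark schema above. The helper below, missing_comments, is hypothetical; it assumes the standard get_table response shape, and the sample response is hand-written to mimic the reported behavior, not real output.

```python
from typing import Dict, List


def missing_comments(get_table_response: dict, expected: Dict[str, str]) -> List[str]:
    """Return the names of columns whose catalog comment differs from the expected one.

    get_table_response is assumed to be the dict returned by
    boto3.client("glue").get_table(DatabaseName=..., Name=...).
    expected maps column name -> comment (e.g. taken from the Spark schema metadata).
    """
    table = get_table_response["Table"]
    # Regular columns live in the storage descriptor; partition columns are listed separately.
    columns = table.get("StorageDescriptor", {}).get("Columns", [])
    partition_keys = table.get("PartitionKeys", [])
    actual = {c["Name"]: c.get("Comment", "") for c in columns + partition_keys}
    return [name for name, comment in expected.items() if actual.get(name, "") != comment]


# Hand-written response mimicking the issue: the partition key has no Comment.
response = {
    "Table": {
        "StorageDescriptor": {"Columns": [
            {"Name": "id", "Type": "string", "Comment": "Unique identifier"},
            {"Name": "name", "Type": "string", "Comment": "Name of the person"},
        ]},
        "PartitionKeys": [{"Name": "date", "Type": "string"}],
    }
}
expected = {
    "id": "Unique identifier",
    "name": "Name of the person",
    "date": "Partition field, date of entry",
}
print(missing_comments(response, expected))  # ['date']
```

Against a live catalog, the first argument would be the result of boto3.client("glue").get_table(DatabaseName="default", Name="hudi_table_with_comments"). Glue typically stores column names in lowercase, so the keys of expected should be lowercase as well.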

Expected behavior

Comments for both non-partition and partition columns should be synced, so the partition field also shows its comment in the table schema.
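Until the sync carries partition comments, one possible interim workaround on Glue (an assumption, not something Hudi provides) is to patch the comments back with boto3's update_table. That call takes a TableInput, which accepts only a subset of the keys returned by get_table, so the hypothetical helper with_partition_comments below copies the allowed keys and fills in Comment on each matching partition key. A later Hive sync may rewrite the table definition, so this may need to be re-applied after each sync.

```python
import copy
from typing import Dict

# Keys of a get_table()["Table"] dict that are also accepted in an update_table TableInput.
# Read-only fields such as DatabaseName, CreateTime, UpdateTime or CatalogId are dropped.
TABLE_INPUT_KEYS = {
    "Name", "Description", "Owner", "LastAccessTime", "LastAnalyzedTime",
    "Retention", "StorageDescriptor", "PartitionKeys", "ViewOriginalText",
    "ViewExpandedText", "TableType", "Parameters", "TargetTable",
}


def with_partition_comments(table: dict, comments: Dict[str, str]) -> dict:
    """Build a TableInput from get_table()["Table"], adding partition-column comments.

    The input dict is deep-copied, so the original table description is not modified.
    """
    table_input = {k: copy.deepcopy(v) for k, v in table.items() if k in TABLE_INPUT_KEYS}
    for key in table_input.get("PartitionKeys", []):
        if key["Name"] in comments:
            key["Comment"] = comments[key["Name"]]
    return table_input


# Hand-written, abbreviated table description (not real get_table output).
table = {
    "Name": "hudi_table_with_comments",
    "DatabaseName": "default",
    "CreateTime": "2024-01-01T00:00:00Z",
    "PartitionKeys": [{"Name": "date", "Type": "string"}],
    "Parameters": {"spark.sql.sources.provider": "hudi"},
}
table_input = with_partition_comments(table, {"date": "Partition field, date of entry"})
print(table_input["PartitionKeys"])
# [{'Name': 'date', 'Type': 'string', 'Comment': 'Partition field, date of entry'}]
```

The result would then be passed as boto3.client("glue").update_table(DatabaseName="default", TableInput=table_input).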

Environment Description

Additional context

I am running this on an AWS EMR cluster (version 7.2.0).

rangareddy commented 1 week ago

Hi @machadoluiz

I tried the following code and could not see any comments in the Hive table. Could you please run it and check whether comments work for you?

Cluster details: Spark 3.5.1, Hive 4.0.0, Hudi 0.15.0

pyspark \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("id", StringType(), nullable=False, metadata={"comment": "Unique identifier"}),
    StructField("name", StringType(), nullable=True, metadata={"comment": "Name of the person"}),
    StructField("age", IntegerType(), nullable=True, metadata={"comment": "Age of the person"}),
    StructField("date", StringType(), nullable=True, metadata={"comment": "Partition field, date of entry"})  # Partition field
])

data = [
    ("1", "John", 30, "2023-01-01"),
    ("2", "Jane", 25, "2023-01-01"),
    ("3", "Bob", 35, "2023-01-02")
]

df = spark.createDataFrame(data, schema=schema)

databaseName = "test"
tableName = "Hudi_Table_With_Commentss"
tablePath = f"/tmp/{databaseName}/{tableName}"

hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.database.name': databaseName,
    "hoodie.datasource.write.table.name": tableName,
    'hoodie.datasource.write.operation': 'insert',
    'hoodie.datasource.write.partitionpath.field': 'date',
    'hoodie.datasource.write.precombine.field': 'id',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.schema.allow.auto.evolution.column.drop': 'true',
    'hoodie.datasource.hive_sync.database': databaseName,
    'hoodie.datasource.hive_sync.table': tableName,
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.partition_fields': 'date',
    'hoodie.datasource.hive_sync.sync_comment': 'true',
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.datasource.hive_sync.support_timestamp": "true",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.merge.allow.duplicate.on.inserts": "false",
    "hoodie.schema.on.read.enable": "true",
}

df.write.format("hudi").options(**hudi_options).mode("overwrite").save(tablePath)

spark.read.format("hudi").load(tablePath).show(20, False)

Output from Hive:

show create table hudi_table_with_commentss;

+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE EXTERNAL TABLE `hudi_table_with_commentss`( |
|   `_hoodie_commit_time` string COMMENT '',         |
|   `_hoodie_commit_seqno` string COMMENT '',        |
|   `_hoodie_record_key` string COMMENT '',          |
|   `_hoodie_partition_path` string COMMENT '',      |
|   `_hoodie_file_name` string COMMENT '',           |
|   `id` string COMMENT '',                          |
|   `name` string COMMENT '',                        |
|   `age` int COMMENT '')                            |
| PARTITIONED BY (                                   |
|   `date` string COMMENT '')                        |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  |
| WITH SERDEPROPERTIES (                             |
|   'hoodie.query.as.ro.table'='false',              |
|   'path'='/tmp/test/Hudi_Table_With_Commentss')    |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hudi.hadoop.HoodieParquetInputFormat'  |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION                                           |
|   'hdfs://spark-hive-datalake:8020/tmp/test/Hudi_Table_With_Commentss' |
| TBLPROPERTIES (                                    |
|   'last_commit_completion_time_sync'='20241008045756144',  |
|   'last_commit_time_sync'='20241008045743162',     |
|   'spark.sql.create.version'='3.5.1',              |
|   'spark.sql.sources.provider'='hudi',             |
|   'spark.sql.sources.schema.numPartCols'='1',      |
|   'spark.sql.sources.schema.numParts'='1',         |
|   'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"string","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"integer","nullable":true,"metadata":{}},{"name":"date","type":"string","nullable":true,"metadata":{}}]}',  |
|   'spark.sql.sources.schema.partCol.0'='date',     |
|   'transient_lastDdlTime'='1728363476')            |
+----------------------------------------------------+
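One detail in the output above may help narrow this down: the Spark schema stored in the 'spark.sql.sources.schema.part.0' table property has "metadata":{} for every field, so in this environment the comments appear to be missing for all columns, not only the partition column. A small, hypothetical helper to run the same check on any table is sketched below; it assumes the schema JSON fits in a single part (numParts = 1, as here) and uses the field layout produced by PySpark's StructType.json().

```python
import json
from typing import Dict


def comments_from_schema_json(schema_json: str) -> Dict[str, str]:
    """Map each field name to its comment in a Spark StructType JSON string.

    Fields without a "comment" entry in their metadata map to "".
    """
    schema = json.loads(schema_json)
    return {
        field["name"]: field.get("metadata", {}).get("comment", "")
        for field in schema["fields"]
    }


# Abbreviated copy of 'spark.sql.sources.schema.part.0' from the output above
# (the _hoodie_* meta columns are omitted).
part0 = (
    '{"type":"struct","fields":['
    '{"name":"id","type":"string","nullable":false,"metadata":{}},'
    '{"name":"name","type":"string","nullable":true,"metadata":{}},'
    '{"name":"age","type":"integer","nullable":true,"metadata":{}},'
    '{"name":"date","type":"string","nullable":true,"metadata":{}}]}'
)
comments = comments_from_schema_json(part0)
# Print only the fields that have no comment.
print({name: c for name, c in comments.items() if not c})
# {'id': '', 'name': '', 'age': '', 'date': ''}
```

Running the same helper on the DataFrame's own schema (df.schema.json()) should return the comments defined in the reproduction script, which gives a baseline to compare the stored table property against.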