apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] current state of promotion type support #11599

Closed parisni closed 1 month ago

parisni commented 1 month ago

Hudi 0.14.1, Spark 3.3.x

I did not find any mention of Hudi type promotion in the documentation.

Apparently type promotion is not supported (#5519):

> there's no way this could be reliably supported w/in Hudi currently

Now, from my tests below:

So:

1. What is the state of type promotion support?
2. Is it safe?
3. Should we update the hive sync limitations?
4. Should we document type promotion support?
    
```python
tableName = "hudi_promotion"
basePath = "/tmp/hudi_nested"
from pyspark.sql.functions import expr

NB_RECORDS = 10_000  # generate enough records to get multiple parquet files
hudi_options = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.recordkey.field": "event_id",
    "hoodie.datasource.write.partitionpath.field": "event_date",
    "hoodie.datasource.write.table.name": tableName,
    "hoodie.datasource.write.operation": "bulk_insert",
    "hoodie.datasource.write.precombine.field": "version",
    "hoodie.upsert.shuffle.parallelism": 1,
    "hoodie.insert.shuffle.parallelism": 1,
    "hoodie.delete.shuffle.parallelism": 1,
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.database": "default",
    "hoodie.datasource.hive_sync.table": tableName,
    "hoodie.datasource.hive_sync.mode": "hms",
    "hoodie.datasource.hive_sync.enable": "false",
    "hoodie.datasource.hive_sync.partition_fields": "event_date",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.metadata.enable": "true",
    "hoodie.parquet.max.file.size": 1 * 1024 * 1024,
}
spark.sql("drop table if exists hudi_promotion").show()
```

INIT TABLE w/ INT, FLOAT and INT

```python
df = (
    spark.range(1, NB_RECORDS)
    .withColumn("event_id", expr("id"))
    .withColumn("event_date", expr("current_date()"))
    .withColumn("version", expr("current_date()"))
    .withColumn("int_to_bigint", expr("cast(1 as int)"))
    .withColumn("float_to_double", expr("cast(1.1 as float)"))
    .withColumn("int_to_float", expr("cast(1 as int)"))
)
(df.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath))
spark.read.format("hudi").load(basePath).printSchema()

spark.sql("desc table hudi_promotion").show()
```

NOW TURN TO BIGINT, DOUBLE and FLOAT in another partition

```python
df = (
    spark.range(1, 2)
    .withColumn("event_id", expr("id"))
    .withColumn("event_date", expr("current_date() + 1"))
    .withColumn("version", expr("current_date()"))
    .withColumn("int_to_bigint", expr("cast(1 as bigint)"))
    .withColumn("float_to_double", expr("cast(1.1 as double)"))
    .withColumn("int_to_float", expr("cast(1.1 as float)"))
)
(df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
spark.read.format("hudi").load(basePath).printSchema()

spark.sql("desc table hudi_promotion").show()
```

NOW upsert the previous partition

```python
df = (
    spark.range(1, 2)
    .withColumn("event_id", expr("id"))
    .withColumn("event_date", expr("current_date()"))
    .withColumn("version", expr("current_date()"))
    .withColumn("int_to_bigint", expr("cast(1 as bigint)"))
    .withColumn("float_to_double", expr("cast(1.1 as double)"))
    .withColumn("int_to_float", expr("cast(1.1 as float)"))
)
(df.write.format("hudi").options(**hudi_options).mode("append").save(basePath))
spark.read.format("hudi").load(basePath).printSchema()

spark.sql("desc table hudi_promotion").show()
```

danny0405 commented 1 month ago

@jonvex Any insights here?

jonvex commented 1 month ago

Hive sync has more limited type promotion, as you described. We have some documentation here: https://hudi.apache.org/docs/schema_evolution, but as far as I know this has only been tested with Spark.

parisni commented 1 month ago

Hive's support looks much broader than the hive sync implementation; see the bottom of this page: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types

Consequently, the Glue metastore should behave the same.

I will propose a fix for hive sync then.
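
For context, the metastore side of such a fix boils down to widening the column type in place, which Hive already allows via `ALTER TABLE ... CHANGE COLUMN`. A minimal sketch of the kind of statement an extended hive sync could issue (hypothetical; shown as a HiveQL string to run through HiveServer2/beeline, since Spark SQL may refuse type changes on Hive-format tables):

```python
# HiveQL an extended hive sync could issue to widen an already-synced column;
# Hive accepts such promotions (e.g. int -> bigint, float -> double) per the
# LanguageManual page linked above. Run it against HiveServer2, e.g. via beeline.
widen_stmt = (
    "ALTER TABLE default.hudi_promotion "
    "CHANGE COLUMN int_to_bigint int_to_bigint BIGINT"
)
```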

danny0405 commented 1 month ago

> I will propose a fix for hive sync then.

@parisni That would be very helpful, thanks for the contribution~

parisni commented 1 month ago

A quick comparison of Hudi/Hive type promotion support leads to the conclusion that Hive supports everything Hudi supports, except:

[two images: Hudi vs. Hive type promotion comparison tables]

parisni commented 1 month ago

@jonvex Interestingly, the same script fails on EMR with Spark 3.2.x and Hudi 0.14.1. I don't get this issue locally and will have to dig into it.

```
Caused by: org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file s3://<bucket>/test/hudi_promotion/event_date=2024-07-19/2b3377ab-dea4-40e8-b94c-bc7a8daf5c38-0_1-9-0_20240719141850764.parquet. Column: [int_to_bigint], Expected: bigint, Found: INT32
        at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedSchemaColumnConvertError(QueryExecutionErrors.scala:570)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:265)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:178)
        at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:661)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hasNext(Unknown Source)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:954)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:369)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:133)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
        at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.constructConvertNotSupportedException(ParquetVectorUpdaterFactory.java:1077)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.getUpdater(ParquetVectorUpdaterFactory.java:172)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:154)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:295)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:193)
        at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:178)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:261)
        ... 21 more
```
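
Not a fix, but a quick thing to rule out: this `SchemaColumnConvertNotSupportedException` is raised by Spark's vectorized Parquet reader, and disabling that reader is a commonly suggested first check for this class of error. A minimal sketch (untested on EMR here; `basePath` is the path from the script above):

```python
# Disable the vectorized Parquet reader and retry the read; if the error goes
# away, the problem is in the vectorized conversion path rather than in the
# files themselves. Note this slows down scans.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.read.format("hudi").load(basePath).orderBy("int_to_bigint").show(5)
```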
parisni commented 1 month ago

To confirm, the same script works fine with vanilla Spark 3.3.4:

Something is wrong with EMR, apparently.

parisni commented 1 month ago

Also, it turns out Glue sync likely does not update the column types, only the `spark.sql.sources.schema` table property:

`"float_to_double","type":"double"` (in spark.sql.sources.schema)
VS
`float_to_double         float` (in the table columns)

IN:

```
col_name,data_type,comment
event_id                bigint
int_to_long             int
float_to_double         float
_kafka_timestamp        bigint
version                 string
event_date              string
event_hour              string

# Partition Information
# col_name              data_type               comment

version                 string
event_date              string
event_hour              string
```

"Detailed Table Information     Table(tableName:hudi_promotion, dbName:db, owner:null, createTime:1721405676, lastAccessTime:1721405711, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:event_id, type:bigint, comment:), FieldSchema(name:int_to_long, type:int, comment:), FieldSchema(name:float_to_double, type:float, comment:), FieldSchema(name:_kafka_timestamp, type:bigint, comment:), FieldSchema(name:version, type:string, comment:), FieldSchema(name:event_date, type:string, comment:), FieldSchema(name:event_hour, type:string, comment:)], location:s3://bucket/test_promotion/hudi_promotion, inputFormat:org.apache.hudi.hadoop.HoodieParquetInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat, compressed:false, numBuckets:0, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, parameters:{hoodie.query.as.ro.table=false, serialization.format=1, path=s3a://bucket/test_promotion/hudi_promotion}), bucketCols:[], sortCols:[], parameters:{}, storedAsSubDirectories:false), partitionKeys:[FieldSchema(name:version, type:string, comment:), FieldSchema(name:event_date, type:string, comment:), FieldSchema(name:event_hour, type:string, comment:)], parameters:{EXTERNAL=TRUE, last_commit_time_sync=20240719161441010, spark.sql.sources.schema.numPartCols=3, hudi.metadata-listing-enabled=FALSE, spark.sql.sources.schema.part.0={""type"":""struct"",""fields"":[{""name"":""event_id"",""type"":""long"",""nullable"":true,""metadata"":{}},{""name"":""int_to_long"",""type"":""long"",""nullable"":true,""metadata"":{}},{""name"":""float_to_double"",""type"":""double"",""nullable"":true,""metadata"":{}},{""name"":""_kafka_timestamp"",""type"":""long"",""nullable"":true,""metadata"":{}},{""name"":""version"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""event_date"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""event_hour"",""type"":""string"",""nullable"":true,""metadata"":{}}]}, last_commit_completion_time_sync=20240719161507000, spark.sql.sources.schema.partCol.0=version, spark.sql.sources.schema.partCol.2=event_hour, spark.sql.sources.schema.partCol.1=event_date, spark.sql.sources.schema.numParts=1, spark.sql.sources.provider=hudi}, viewOriginalText:null, viewExpandedText:null, tableType:EXTERNAL_TABLE)  "

EDIT: Found the issue. Will propose a patch.

parisni commented 1 month ago

I tested Athena support for type promotion. It works fine until a partition gets mixed with multiple types (int/float), e.g. after an upsert. The reason is that in Athena each partition has its own schema, and it has to be updated with the promoted type. Otherwise errors such as these occur:

HIVE_PARTITION_SCHEMA_MISMATCH: There is a mismatch between the table and partition schemas. The types are incompatible and cannot be coerced. The column 'int_to_float' in table 'db.hudi_promotion' is declared as type 'float', but partition 'version=0/event_date=2024-01-01/event_hour=08' declared column 'int_to_float' as type 'int'.

or

HIVE_BAD_DATA: Field float_to_double's type DOUBLE in parquet file s3://bucket/test_promotion/hudi_promotion/version=0/event_date=2024-01-01/event_hour=08/45a8c801-26e9-4b57-a47d-640e1287afa1-0_0-55-183_20240719192221903.parquet is incompatible with type real defined in table schema

I guess we should reconsider the approach proposed in #9071.
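
For illustration, the kind of per-partition fix this implies, done through the Glue API, could look roughly like the sketch below (untested; the database/table names and partition values are placeholders taken from the error above):

```python
import boto3

glue = boto3.client("glue")
database, table = "db", "hudi_promotion"
partition_values = ["0", "2024-01-01", "08"]  # version, event_date, event_hour

# Fetch the current partition definition ...
partition = glue.get_partition(
    DatabaseName=database, TableName=table, PartitionValues=partition_values
)["Partition"]

# ... widen the stale column so it matches the promoted table schema ...
for col in partition["StorageDescriptor"]["Columns"]:
    if col["Name"] == "int_to_float":
        col["Type"] = "float"

# ... and write it back, keeping only fields accepted by PartitionInput.
glue.update_partition(
    DatabaseName=database,
    TableName=table,
    PartitionValueList=partition_values,
    PartitionInput={
        "Values": partition["Values"],
        "StorageDescriptor": partition["StorageDescriptor"],
        "Parameters": partition.get("Parameters", {}),
    },
)
```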

parisni commented 1 month ago

FYI, Amazon Redshift Spectrum likely does not support all type promotions. At least int_to_float fails with the error below (even if the partition schema is right):

```
ERROR:  Spectrum Scan Error
DETAIL:
  -----------------------------------------------
  error:  Spectrum Scan Error
  code:      15007
  context:   File 'https://s3.eu-west-1.amazonaws.com/bucket/test_promotion/hudi_promotion4/version%3D0/event_date%3D2024-01-01/event_hour%3D08/2ff34814-8284-41b1-a22e-49fd6ba3785b-0_15-23-40_20240719214644303.parquet' has
  location:  dory_util.cpp:1615
  process:   worker_thread [pid=3007]
```
parisni commented 1 month ago

Here is a small recap from my tests, @jonvex @danny0405. "No" means a SELECT query ordering on the column will fail at runtime:

|          | int->bigint | int->float | float->double | int->string |
|----------|-------------|------------|---------------|-------------|
| spark    | yes         | yes        | yes           | yes         |
| athena   | yes         | yes        | yes           | no          |
| spectrum | yes         | no         | yes           | no          |
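
For reference, each cell boils down to ordering a query on the promoted column; in Spark that amounts to something like the sketch below (run against the `basePath` table from the script above, which only defines the first three columns):

```python
# Order by each promoted column; an engine that does not support the promotion
# fails at read time (e.g. a Parquet conversion error) instead of returning rows.
for col in ["int_to_bigint", "int_to_float", "float_to_double"]:
    spark.read.format("hudi").load(basePath).orderBy(col).show(5)
```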