apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Hudi COW performance issue, bottleneck in "Doing partition and writing data" stage #9791

Open jeguiguren-cohere opened 1 year ago

jeguiguren-cohere commented 1 year ago

Describe the problem you faced

We are using Hudi on AWS Glue to continuously merge small batches of data into bronze tables, and we are seeing slow write performance when upserting to a COW table (20+ minutes per write).

The target table is small, approximately 6 million rows x 1000 columns, and incoming batches contain fewer than 50,000 records (the preCombine step reduces these to fewer than 10,000 unique records). The table is not partitioned because it is small, and it is currently configured with a global simple index.

Expected behavior

I would expect writes of this size to take a few minutes, similar to a vanilla Spark job writing parquet files to S3.
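For reference, the baseline write being compared against would look roughly like the plain parquet append below (the S3 path is a placeholder, not from the original post):

df.write.mode("append").parquet("s3://<bucket>/baseline/mytable/")  # plain Spark parquet write, no Hudi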

Environment Description

Additional context

Table config in /.hoodie/hoodie.properties:

#Updated at 2023-08-14T16:51:53.434Z
#Mon Aug 14 16:51:53 UTC 2023
hoodie.table.timeline.timezone=LOCAL
hoodie.table.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
hoodie.table.precombine.field=clusterTime
hoodie.table.version=5
hoodie.database.name=
hoodie.datasource.write.hive_style_partitioning=false
hoodie.table.checksum=3456772992
hoodie.partition.metafile.use.base.format=false
hoodie.archivelog.folder=archived
hoodie.table.name=hudi_raw_mytable
hoodie.populate.meta.fields=true
hoodie.table.type=COPY_ON_WRITE
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.base.file.format=PARQUET
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.metadata.partitions=
hoodie.timeline.layout.version=1
hoodie.table.recordkey.fields=documentKey
hoodie.table.partition.fields=

Hudi config:

 "hoodie.table.name": TABLE,
  "hoodie.datasource.write.recordkey.field": "documentKey",
  "hoodie.datasource.write.precombine.field": "clusterTime",
  "hoodie.datasource.write.reconcile.schema": "false",
  "hoodie.schema.on.read.enable": "true",
  "hoodie.bulkinsert.sort.mode": "GLOBAL_SORT",
  "hoodie.metadata.enable": "false",
  "hoodie.datasource.hive_sync.database": DB_NAME,
  "hoodie.datasource.hive_sync.table": TABLE,
  "hoodie.datasource.hive_sync.use_jdbc": "false",
  "hoodie.datasource.hive_sync.enable": "true",
  "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.NonPartitionedExtractor",
  "hoodie.datasource.hive_sync.partition_value_extractor": "org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor",
  "hoodie.index.type": "GLOBAL_SIMPLE",
  "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.NonpartitionedKeyGenerator"

Spark stages show that the majority of the time (20+ minutes) is spent in "Doing partition and writing data":

[Screenshot: Spark UI stages view, captured 2023-09-26, showing the "Doing partition and writing data" stage]
jeguiguren-cohere commented 1 year ago

Any guidance here on how to investigate further or what optimizations to try?

ad1happy2go commented 1 year ago

@jeguiguren-cohere What Spark configuration are you using?

jeguiguren-cohere commented 1 year ago

@ad1happy2go The Spark configuration is:

from pyspark import SparkConf
from pyspark.context import SparkContext
from awsglue.context import GlueContext

conf = SparkConf()
conf.set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
conf.set('spark.sql.legacy.pathOptionBehavior.enabled', 'true')
conf.set('spark.sql.hive.convertMetastoreParquet', 'false')
conf.set('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension')
spark_context = SparkContext.getOrCreate(conf)
glue_context = GlueContext(spark_context)

and the DataFrame write is df.write.format("hudi").options(**hudi_config).mode("append").save(), where hudi_config is:

hudi_config = {
  "hoodie.table.name": TABLE,
  "hoodie.datasource.write.recordkey.field": "documentKey",
  "hoodie.datasource.write.precombine.field": "clusterTime",
  "hoodie.datasource.write.reconcile.schema": "false",
  "hoodie.schema.on.read.enable": "true",
  "hoodie.bulkinsert.sort.mode": "GLOBAL_SORT",
  "hoodie.metadata.enable": "false",
  "hoodie.datasource.hive_sync.database": DB_NAME,
  "hoodie.datasource.hive_sync.table": TABLE,
  "hoodie.datasource.hive_sync.use_jdbc": "false",
  "hoodie.datasource.hive_sync.enable": "true",
  "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.NonPartitionedExtractor",
  "hoodie.datasource.hive_sync.partition_value_extractor": "org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor",
  "hoodie.index.type": "GLOBAL_SIMPLE",
  "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.NonpartitionedKeyGenerator"
}
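Putting the two together, the full upsert call looks roughly like the sketch below (the S3 target path is a hypothetical placeholder; the .save() call quoted above omits it):

# Sketch of the complete write, assuming the table lives at a placeholder S3 path.
TABLE_PATH = "s3://<bucket>/bronze/hudi_raw_mytable"  # hypothetical, not from the original post

(
    df.write.format("hudi")
    .options(**hudi_config)   # the Hudi options dict shown above
    .mode("append")           # append mode performs an upsert with these settings
    .save(TABLE_PATH)
)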
ad1happy2go commented 1 year ago

@jeguiguren-cohere Can you try tuning the Spark configuration? See https://hudi.apache.org/docs/tuning-guide/

Please let us know your findings. Thanks.
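For example, a few Spark and Hudi settings commonly tuned for upsert-heavy workloads, sketched below with illustrative placeholder values (not recommendations for this specific table):

# Illustrative Spark-side settings; values are placeholders, not tuned for this workload.
conf.set('spark.executor.memory', '8g')
conf.set('spark.executor.cores', '4')
conf.set('spark.sql.shuffle.partitions', '200')

# Illustrative Hudi-side parallelism settings, merged into the existing hudi_config dict.
hudi_config.update({
    "hoodie.upsert.shuffle.parallelism": "200",  # parallelism of the upsert write stage
    "hoodie.insert.shuffle.parallelism": "200",
})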