apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Write performance problem - "Tagging" takes too long #9322

Open izdkast opened 1 year ago

izdkast commented 1 year ago

Describe the problem you faced

When writing files into S3 after batch streaming from Kafka, the "Tagging" step takes around 2 hours to finish while the EMR cluster looks almost idle. [screenshot]

It looks like only two executors are doing all the tasks (I don't know whether this itself is an issue). [screenshot]

This is running on AWS EMR with this setup:

MASTER: 1 x r5.8xlarge
CORE: 15 x r5.8xlarge

It does not look like a memory problem. [screenshot]
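From what I understand, the "Tagging" step is Hudi's index lookup: each incoming record gets tagged with the file group that may already contain its key. With the Bloom index (the usual default for Spark writers) this reads bloom filters and footers from S3 and runs a shuffle whose width is set by hoodie.bloom.index.parallelism; if that resolves to only a few partitions, only a few executors get tasks, which would match the screenshots. One thing I could try is widening that shuffle explicitly (960 is an illustrative value, not tuned):

            // Widen the Bloom-index lookup shuffle so the tagging stage
            // spreads over more executors. 960 is illustrative only.
            .option("hoodie.bloom.index.parallelism", "960")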

spark-config

"maximizeResourceAllocation": "true",
"spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": "2",
"spark.driver.maxResultSize": "0",
"spark.sql.streaming.minBatchesToRetain": "360",
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
"spark.yarn.maxAppAttempts": "1",
"spark.sql.optimizer.enableJsonExpressionOptimization": "false",
"spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
"spark.sql.adaptive.enabled": "true",
"spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"spark.sql.adaptive.coalescePartitions.enabled": "true",
"spark.cleaner.referenceTracking.cleanCheckpoints": "true",
"spark.dynamicAllocation.enabled": "true",
"spark.sql.adaptive.skewJoin.enabled": "true"

HUDI .write Options

These are the options I'm using for the HUDI write:

            .write
            .format("hudi")
            .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
            .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator")
            .option("hoodie.datasource.write.recordkey.field", "<unique-field>,<timestamp-field>,<hash-field>")
            .option("hoodie.datasource.write.partitionpath.field", "<text-field-partition-input>,<text-field-partition2-input>,<Text with Date CCYYMM>")
            .option("hoodie.datasource.write.precombine.field", "<timestamp-field>")
            .option("hoodie.table.name", <hudiTableName>)
            .option("hoodie.datasource.write.hive_style_partitioning", "true")
            .option("hoodie.metadata.enable", "true")
            .option("hoodie.metadata.insert.parallelism", "6")
            .option("hoodie.clean.async", "true")
            .option("hoodie.clean.automatic", "true")
            .option("hoodie.cleaner.policy", "KEEP_LATEST_BY_HOURS")
            .option("hoodie.cleaner.hours.retained", "168")
            .option("hoodie.datasource.write.operation", "upsert")
            .option("hoodie.metrics.on", "true")
            .option("hoodie.metrics.reporter.type", "CLOUDWATCH")
            .option("hoodie.metrics.cloudwatch.metric.prefix", "xxx_")
            .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
            .option("hoodie.cleaner.policy.failed.writes", "LAZY")
            .option("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider")
            .mode("append")
            .save(<outputDirectory>)

The values marked with "<.....>" are ones I've manually replaced for privacy reasons.
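One more thing I'm unsure about: since the record key leads with <unique-field>, incoming keys are probably unordered, so the Bloom index's key-range pruning likely can't narrow down candidate files, and each lookup may touch many parquet footers on S3. Given that hoodie.metadata.enable is already true, a sketch I might try (requires Hudi 0.11+, not a confirmed fix) is serving bloom filters from the metadata table:

            // Requires Hudi 0.11+: maintain a bloom-filter index in the metadata
            // table and let tagging read it instead of individual S3 footers.
            .option("hoodie.metadata.index.bloom.filter.enable", "true")
            .option("hoodie.bloom.index.use.metadata", "true")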

Versions

Thank you for any insights, and let me know if you require any extra information 🙂

brightwon commented 6 months ago

I have the same problem. Is there any progress on this issue?

ad1happy2go commented 6 months ago

@brightwon Can you provide details of your table writer configurations and a Spark UI screenshot?

liiang-huang commented 5 months ago

I'm facing the same problem; the upsert just fails.