apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Slow commit times with Spark Structured Streaming from Kinesis to MOR Hudi table #12412

Open sumosha opened 1 day ago

sumosha commented 1 day ago

Describe the problem you faced

We could use some guidance on a performance issue we are seeing with a Spark Structured Streaming job reading from Kinesis and writing to a MOR Hudi table on EMR 7.2 (Hudi 0.14.1-amzn-1). We have noticed very long-running commits in Hudi, and the Kinesis iterator age falls behind by up to a day or more as data flows in.

Steps to reproduce the behavior:

Some details about the load, application, and the Hudi table:

What we have noticed is that deltacommits typically take around 15 minutes; however, every few commits there is an excessively long deltacommit that takes around 1.5-3 hours to complete. While this commit is completing, there are no GetRecords calls to the stream, so processing stalls. We compared the commit metrics between one of the faster commits and a slow commit, and there isn't a significant difference in the amount of data being written, maybe about 30k records. What I observed was that some log file writes as small as 16 records and 100KB in size took ~4.5 minutes (totalUpsertTime). That seems like an excessive amount of time for a log file (not parquet) with so few records.
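For reference, per-commit metrics like these can be pulled with the Hudi Spark SQL procedures; a sketch is below (it assumes the Hudi Spark bundle and session extension are configured, and 'my_table' is a placeholder for the actual table name):

```python
# Sketch: list recent (delta)commits with their write stats so a fast and a
# slow instant can be compared side by side. Assumes the Hudi Spark bundle is
# on the classpath; 'my_table' is a placeholder.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("hudi-commit-inspection")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.extensions",
            "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .getOrCreate()
)

# show_commits is a Hudi SQL procedure; it returns one row per commit with
# bytes/records written, which makes the fast-vs-slow comparison easy.
spark.sql("CALL show_commits(table => 'my_table', limit => 20)").show(truncate=False)
```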

Things we tried:

The jobs that took the most time in these slow batches:

What is interesting is that the Loading latest base files and Getting small files jobs typically run in less than a minute, yet during one of these "slow commits" they take many minutes. During the "fast commits," it appears the Building Workload Profile job is where most of the commit time is spent. I do observe shuffle spill to disk there, so we could use guidance on improving that as well.
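For illustration, this is the kind of override we are considering for the shuffle/spill in the upsert path (sketch only; the values are placeholders, not what we run in production):

```python
# Sketch of overrides aimed at the shuffle spill seen during the upsert /
# workload-profile stages; values are illustrative placeholders.
extra_hudi_options = {
    # Spread the upsert shuffle over more tasks so each task's state is more
    # likely to fit in memory instead of spilling to disk.
    "hoodie.upsert.shuffle.parallelism": "400",
}

extra_spark_conf = {
    # More shuffle partitions and extra off-heap headroom for the same reason.
    "spark.sql.shuffle.partitions": "400",
    "spark.executor.memoryOverhead": "2g",
}
```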

We have conducted some stress testing in a comparable test environment and were able to recreate the issue even when starting from a blank table. We did the same stress testing earlier in the year on EMR 6.15 (Hudi 0.14.0-amzn) and did not observe this behavior.

What I found in the logs during the slow commits are the warnings below; during the faster commits, these logs don't show up. I noticed that the Delete Archived instants jobs had run around the same time, which prompted me to try turning off the async archive service.

24/12/03 02:50:15 WARN PriorityBasedFileSystemView: Got error running preferred function. Likely due to another concurrent writer in progress. Trying secondary
24/12/03 02:50:17 WARN PriorityBasedFileSystemView: Routing request to secondary file-system view

Turning off the async archive service alleviated the periodic extremely slow commits; however, we still believe commit times are lackluster given the load and cluster size.
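Concretely, the override we applied was along these lines (sketch; it simply flips the cluster default shown further down):

```python
# Sketch of the archive override merged into our Hudi writer options.
hudi_archive_override = {
    # Stop archiving from running as an async service alongside the writer...
    "hoodie.archive.async": "false",
    # ...but keep automatic archiving itself, now inline with the commit.
    "hoodie.archive.automatic": "true",
}
```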

What other configurations should we look at to tune the write speeds? Do we need to reconsider our table design?

Below are the Hudi configurations in our production environment. Attached are some Spark UI screenshots as well.

hudi-defaults on the cluster:

  {
    "hoodie.archive.async": "true",
    "hoodie.archive.automatic": "true",
    "hoodie.compact.inline.max.delta.seconds": "21600",
    "hoodie.compact.inline.trigger.strategy": "TIME_ELAPSED",
    "hoodie.datasource.write.schema.allow.auto.evolution.column.drop": "true",
    "hoodie.embed.timeline.server.async": "true",
    "hoodie.schema.cache.enable": "true"
  }

Spark writer configuration:

  "hoodie.cleaner.policy.failed.writes": "LAZY",
  "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
  "hoodie.write.lock.dynamodb.region": "us-east-1",
  "hoodie.write.lock.dynamodb.table": "oitroot-op1-datasync-hudi-locks",
  "hoodie.write.lock.provider": "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
  "path": hudi_output_path,
  "hoodie.table.name": clean_table_name,
  "hoodie.datasource.write.storage.type": "MERGE_ON_READ",
  "hoodie.datasource.write.operation": "upsert",
  "hoodie.datasource.write.recordkey.field": record_key_fields,
  "hoodie.datasource.write.precombine.field": precombine_field,
  "hoodie.datasource.write.partitionpath.field": partition_fields,
  "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
  "hoodie.datasource.write.hive_style_partitioning": "true",
  "hoodie.datasource.hive_sync.enable": "true",
  "hoodie.datasource.hive_sync.database": clean_db_name,
  "hoodie.datasource.hive_sync.table": clean_table_name,
  "hoodie.datasource.hive_sync.partition_fields": partition_fields,
  "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
  "hoodie.datasource.hive_sync.use_jdbc": "false",
  "hoodie.datasource.hive_sync.mode": "hms",
  "hoodie.datasource.hive_sync.omit_metadata_fields": "true",
  "hoodie.datasource.meta_sync.condition.sync": "true"

Thank you for your assistance!

Expected behavior

Consistent and faster commit times, ideally under 10 minutes

Environment Description

Slow commit (Spark UI screenshot: slow_commit_sparkui)

Faster commit (Spark UI screenshot: faster_commit_sparkui)

Building Workload Profile job / slow stage with shuffle spill (screenshots: building_workload_profile_spill_sparkui, building_workload_profile_executor_metrics_sparkui)

ad1happy2go commented 11 hours ago

@sumosha Tasks with disk spill are normally a lot slower. Do you see spills in the fast run as well? I also noticed very high GC for them. Did you tune your GC configs? https://hudi.apache.org/docs/tuning-guide/#gc-tuning
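For example, something along these lines (illustrative G1 flags in the spirit of the linked guide, not values tuned for your cluster):

```python
# Illustrative executor GC settings (G1 on a recent JDK); flags and values
# are examples only, not prescriptions for this workload.
gc_tuning_conf = {
    "spark.executor.extraJavaOptions": (
        "-XX:+UseG1GC "
        "-XX:MaxGCPauseMillis=200 "
        "-XX:InitiatingHeapOccupancyPercent=35 "
        "-Xlog:gc*:file=/tmp/executor-gc.log"  # JDK 9+ unified GC logging
    ),
}
# Typically passed via spark-submit --conf or an EMR spark-defaults
# classification so executors pick it up at launch.
```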

sumosha commented 1 hour ago

@ad1happy2go It does appear there is spill even in the faster commit (which explains why that job seems to stay consistent at around 15 minutes).

(Screenshot: faster_commit_spill_executors)

I haven't been able to recreate the disk spill in my stress testing, so I assume it comes down to the size difference in the underlying table and files being written (production is around 100GB now; I started with a fresh table in testing and haven't built up a comparable size yet). I was planning to play around with the setting mentioned in your guide, hoodie.memory.merge.fraction; does that seem like the right track? I'm also wondering if a larger instance size is warranted as this grows (maybe fewer instances to keep a comparable core count).
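Concretely, something like this is what I had in mind (sketch with a placeholder value):

```python
# Sketch of the merge-memory setting to experiment with; 0.75 is a placeholder
# (the default is lower), and the right number depends on executor memory.
merge_memory_override = {
    "hoodie.memory.merge.fraction": "0.75",
}
```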

We are currently on the default collector in EMR (Parallel) in production. I have switched to G1 (this is JDK 17) in stress testing, though I didn't see much change in overall commit times. We'll move forward with G1 since it's recommended anyway.