apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] - Hudi 0.12.1 - production job slowing down #10822

Open joshhamann opened 9 months ago

joshhamann commented 9 months ago

Describe the problem you faced

We have a production transform job using AWS Glue 4.0 and Hudi 0.12.1 that loads data into a Hudi table on S3. At some point this job started taking longer to run. I created a test job pointing to the same raw data source, loading into a new Hudi table on S3, and it completed much faster (5 min vs 15 min), in line with expectations. We are partitioning by date, and the volume of data has not changed. The job runs every 15 minutes, so the job duration is now becoming an issue. I noticed there are many files in these S3 locations for the production transform:

.hoodie/metadata/.hoodie
.hoodie/archive

I also noticed that both the production and test jobs seem to transform the data in the same amount of time (~5 minutes), but the production job then has many additional steps after DirectWriteMarkers, which take up the rest of the time difference. These steps are:

FSUtils
CleanPlanActionExecutor
SparkUpsertDeltaCommitPartitioner
SparkUpsertPreppedDeltaCommitActionExecutor

Test job: (Spark UI screenshot, 2024-03-05 9:46:53 AM)

Production job with many more steps at the end: (Spark UI screenshot, 2024-03-05 9:47:22 AM)

To Reproduce

  1. Run a job every 15 minutes for a long time, and let metadata/timeline/archive files build up
  2. Create a new test job on the same source data (which processes MORE data, since it does not use job bookmarks)
  3. Notice the test job finishes quicker than the actual production job

Expected behavior

I expect that, given the structure of the Hudi table, continually building up more days of data should not slow Hudi down. I also expect there are configs to assist with cleanup. What configs can I set to alleviate these extra steps at the end that I am experiencing in production?
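
For reference, a minimal sketch of the cleaning and timeline-archival write options that are commonly tuned when CleanPlanActionExecutor and archival work start dominating a frequently running upsert job. The option names are standard Hudi write configs; the values below are illustrative assumptions, not recommendations for this specific table:

    # Illustrative cleaner/archival settings (values are assumptions; tune to your commit frequency)
    cleanup_options = {
        # Retain fewer commits so the cleaner has less of the timeline to scan each run
        'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
        'hoodie.cleaner.commits.retained': '10',
        # Archive the active timeline sooner so .hoodie does not accumulate many small files
        # (keep.min.commits must stay greater than cleaner.commits.retained)
        'hoodie.keep.min.commits': '20',
        'hoodie.keep.max.commits': '30',
        # Run cleaning off the critical path of each 15-minute write
        'hoodie.clean.async': 'true',
    }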

Environment Description: AWS Glue 4.0, Hudi 0.12.1, Spark 3.3.0

ad1happy2go commented 9 months ago

@joshhamann Can you please provide the writer configuration so we can look into this more?

If you are using the upsert operation type, the load to a new Hudi table is expected to run faster, because there is no existing dataset to join against to identify which records need to be updated. So when you benchmarked 5 min vs 15 min, was the new Hudi table empty, or did it have the same amount of existing data as the old table?

joshhamann commented 9 months ago

Here is my configuration:

    {
        'hoodie.table.name': 'analytics_events',
        'hoodie.datasource.write.recordkey.field': 'event_uuid',
        'hoodie.datasource.write.partitionpath.field': 'event_create_time_date_partition',
        'hoodie.datasource.write.table.name': 'analytics_events',
        'hoodie.datasource.write.operation': 'upsert',
        'hoodie.datasource.write.precombine.field': 'event_create_time',
        'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.SimpleKeyGenerator',
        'hoodie.parquet.compression.codec': 'snappy'
    }
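
For context, a minimal sketch of how options like these are typically applied in a Glue/PySpark job writing to Hudi; df and target_path are placeholders assumed for illustration, not names from the actual job:

    # df is assumed to be the transformed Spark DataFrame; target_path the S3 location of the Hudi table
    hudi_options = {
        'hoodie.table.name': 'analytics_events',
        'hoodie.datasource.write.recordkey.field': 'event_uuid',
        'hoodie.datasource.write.partitionpath.field': 'event_create_time_date_partition',
        'hoodie.datasource.write.operation': 'upsert',
        'hoodie.datasource.write.precombine.field': 'event_create_time',
    }
    df.write.format("hudi").options(**hudi_options).mode("append").save(target_path)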

I was under the impression that by not using a global bloom index, the upsert process would happen within the particular partition in question, and therefore, if the volume stays fairly consistent across partitions (in this case, days), we shouldn't see the job duration increasing. Is that not the case?

Let me know if I can provide any more information. Thanks for your help on this!

ad1happy2go commented 9 months ago

@joshhamann That's the correct understanding. If you are not using a global bloom index and your incremental dataset only has data from a few partitions, then the index lookup stage will only scan those partitions. But it still can't be as fast as a run against a completely fresh Hudi table.
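
To make the index-scope distinction concrete, a small sketch using the standard hoodie.index.type option; the partition-scoped bloom index only looks up record keys within the partitions present in the incoming batch, while GLOBAL_BLOOM matches keys across every partition of the table:

    index_options = {
        # Partition-scoped bloom index: lookups touch only partitions seen in the incoming batch
        'hoodie.index.type': 'BLOOM',
        # 'hoodie.index.type': 'GLOBAL_BLOOM',  # would check record keys across all partitions instead
    }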

Can you share Spark UI screenshots for the jobs taking 5 min and 15 min?

joshhamann commented 9 months ago

You can see the timestamps in the above screenshots from the Spark UI, if that works. For instance, the test job, which is processing more data, runs from around 23:18 to 23:23, and the majority of that time is spent actually processing the data, from my understanding. The production job runs from 21:42 to 21:55, and the processing portion is similar to the test job. The only difference is the disparate steps at the end taking more time.

Some additional context which may or may not be helpful: our job reads off events and, in theory, should only be processing events within that particular UTC date. However, we do get events with past and future timestamps (outside of the particular UTC partition date), which means there are more partitions to upsert into. The test job is pointing to the same raw data, though, so I would expect that to be happening in both places.