apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Async compaction & ingestion performance #8217

Open haripriyarhp opened 1 year ago

haripriyarhp commented 1 year ago

Describe the problem you faced

I have a Spark structured streaming job writing to a MoR table in S3. Based on the docs here https://hudi.apache.org/docs/next/compaction#spark-structured-streaming , I have set the properties for async compaction. My understanding is that compaction runs asynchronously so that ingestion can continue in parallel. What I observe instead is that Spark assigns all the resources to the compaction job, and ingestion only continues once compaction is finished, even though both jobs are running in parallel. Is there an equivalent of "--delta-sync-scheduling-weight", "--compact-scheduling-weight", "--delta-sync-scheduling-minshare", and "--compact-scheduling-minshare" for Spark structured streaming, so that ingestion and compaction can run in parallel with controlled resource allocation?
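For reference, this is a minimal sketch of the structured-streaming write that the linked compaction doc describes. The source, table name, key/precombine fields, checkpoint path, and trigger interval are placeholders; the two compaction options are the same ones used in the job configs further below.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("hudi-mor-streaming").getOrCreate()

// Placeholder streaming source; any streaming DataFrame works here.
val source = spark.readStream
  .format("rate")
  .load()

source.writeStream
  .format("hudi")
  .option("hoodie.table.name", "sensor_mor")                           // hypothetical table name
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.recordkey.field", "value")          // placeholder key field
  .option("hoodie.datasource.write.precombine.field", "timestamp")     // placeholder precombine field
  .option("hoodie.datasource.compaction.async.enable", "true")         // compact asynchronously, alongside ingestion
  .option("hoodie.compact.inline.max.delta.commits", "10")             // schedule a compaction every 10 delta commits
  .option("checkpointLocation", "s3a://bucket/checkpoints/sensor_mor") // hypothetical path
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start("s3a://bucket/Hudi/sensor_mor")                               // hypothetical path
```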

Expected behavior

I expect compaction and ingestion to happen in parallel.

Here is a screenshot of the micro-batches for the Spark structured streaming job. Normally each batch takes 4-6 minutes, but the batches immediately following compaction take noticeably longer. (screenshot: async_compaction)

Time taken for a normal batch (batch 206): (screenshot: sensor_morbloom_124_details)

Batch 207: here you can see the last job, 2961, is HoodieCompactionPlanGenerator, which adds 8.6 minutes of overhead to this batch. (screenshot: sensor_morbloom_125_details)

Batch 208: here you can see it starts with job 2963, but jobs 2964-2972 belong to compaction, and the batch only continues with job 2973 after the compaction jobs finish, even though compaction runs on a different thread (see the screenshot below). The time taken for stages such as "Load base files", "Building workload profile", and "Getting small files" has also increased drastically; refer to batch 206 for normal timings. (screenshot: sensor_morbloom_126_details)

Compaction jobs (2964-2972): (screenshot: sensor_morbloom_async_compaction)

Is there something that can be done to improve this performance?

Additional context

Using hudi-spark3.1-bundle_2.12-0.13.0.jar along with hadoop-aws_3.1.2.jar and aws-java-sdk-bundle_1.11.271.jar.

Job configs:

"hoodie.table.name" -> tableName, "path" -> "s3a://path/Hudi/".concat(tableName), "hoodie.datasource.write.table.name" -> tableName, "hoodie.datasource.write.table.type" -> MERGE_ON_READ, "hoodie.datasource.write.operation" -> "upsert", "hoodie.datasource.write.recordkey.field" -> "col5,col6,col7", "hoodie.datasource.write.partitionpath.field" -> "col1,col2,col3,col4", "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.ComplexKeyGenerator", "hoodie.datasource.write.hive_style_partitioning" -> "true", //Cleaning options "hoodie.clean.automatic" -> "true", "hoodie.clean.max.commits" -> "3", //"hoodie.clean.async" -> "true", //hive_sync_options "hoodie.datasource.hive_sync.partition_fields" -> "col1,col2,col3,col4", "hoodie.datasource.hive_sync.database" -> dbName, "hoodie.datasource.hive_sync.table" -> tableName, "hoodie.datasource.hive_sync.enable" -> "true", "hoodie.datasource.hive_sync.mode" -> "hms", "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.upsert.shuffle.parallelism" -> "200", "hoodie.insert.shuffle.parallelism" -> "200", "hoodie.datasource.compaction.async.enable" -> true, "hoodie.compact.inline.max.delta.commits" -> "10", "hoodie.index.type" -> "BLOOM"

haripriyarhp commented 1 year ago

Update: following the suggestion in the Slack thread https://apache-hudi.slack.com/archives/C4D716NPQ/p1679068718111579?thread_ts=1678987926.576359&cid=C4D716NPQ (https://medium.com/@simpsons/efficient-resource-allocation-for-async-table-services-in-hudi-124375d58dc), I tried setting spark.scheduler.allocation.file. I still don't see much improvement in performance, even though the jobs go to separate pools. The job below, which looks for files to compact, goes to the sparkdatasourcewriter pool and takes ~10 minutes, which increases the duration of the streaming query. (screenshot: Job1494)

The other jobs go to the hoodiecompact pool. (screenshots: Job1495, Job1497)

Still, the overall time taken for the queries remains the same as above. (screenshot: streamingqueries)

Is there any other way to keep the time taken for the streaming queries consistent?
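For anyone landing here, this is a minimal sketch of the fair-scheduler setup described above. The pool names are assumptions taken from the Spark UI screenshots in this thread (sparkdatasourcewriter for ingestion, hoodiecompact for async compaction), the weight/minShare values are arbitrary, and the allocation-file path is hypothetical; weight and minShare play the same role here as DeltaStreamer's --*-scheduling-weight / --*-scheduling-minshare flags.

```scala
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.SparkSession

// Allocation file giving ingestion a higher weight and a guaranteed minimum
// share of cores relative to async compaction.
val allocation =
  """<?xml version="1.0"?>
    |<allocations>
    |  <pool name="sparkdatasourcewriter">
    |    <schedulingMode>FAIR</schedulingMode>
    |    <weight>3</weight>
    |    <minShare>4</minShare>
    |  </pool>
    |  <pool name="hoodiecompact">
    |    <schedulingMode>FAIR</schedulingMode>
    |    <weight>1</weight>
    |    <minShare>1</minShare>
    |  </pool>
    |</allocations>
    |""".stripMargin
val allocationPath = Paths.get("/tmp/fairscheduler.xml") // hypothetical location
Files.write(allocationPath, allocation.getBytes("UTF-8"))

val spark = SparkSession.builder()
  .appName("hudi-streaming-ingest")
  .config("spark.scheduler.mode", "FAIR")
  .config("spark.scheduler.allocation.file", allocationPath.toString)
  .getOrCreate()

// Set before starting the streaming query so the ingestion jobs land in the
// higher-weight pool; the async compactor's jobs then show up under their own
// pool, as in the screenshots above.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "sparkdatasourcewriter")
```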

haripriyarhp commented 1 year ago

Hi, are there any insights on this?

abhisheksahani91 commented 11 months ago

@haripriyarhp Did you find any workaround for this? I have just onboarded a Hudi DeltaStreamer pipeline in production, and I am also observing that no writes happen during compaction.