apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Performance Tuning: Slow stages (Building Workload Profile & Getting Small files from partitions) during Hudi Writes #2620

Open codejoyan opened 3 years ago

codejoyan commented 3 years ago

Hi, I am seeing performance issues while upserting data, especially in the 2 jobs below:

Job 15 (SparkUpsertCommitActionExecutor)
Job 17 (UpsertPartitioner)

Attached are some of the stats regarding the slow jobs/stages.

Spark configuration: --driver-memory 5G --executor-memory 10G --executor-cores 5 --num-executors 10

Upsert config parameters:

option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator").
option("hoodie.upsert.shuffle.parallelism", "2").
option("hoodie.insert.shuffle.parallelism", "2").
option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT_BYTES, 128 * 1024 * 1024).
option(HoodieStorageConfig.PARQUET_FILE_MAX_BYTES, 128 * 1024 * 1024).
option("hoodie.copyonwrite.record.size.estimate", "40")
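
For reference, a minimal sketch of how these options could fit into a complete upsert call. incrementalDF, targetPath, the record key / partition path columns, and the table name are placeholders, and the parallelism of 200 is only illustrative (the discussion below suggests raising it well above 2):

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieStorageConfig}
import org.apache.spark.sql.SaveMode

// incrementalDF: DataFrame holding the incremental batch (placeholder name)
incrementalDF.write.format("org.apache.hudi").
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).
  option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator").
  option("hoodie.datasource.write.recordkey.field", "record_id").   // placeholder record key column
  option("hoodie.datasource.write.partitionpath.field", "v_date").  // placeholder partition column
  option("hoodie.upsert.shuffle.parallelism", "200").               // illustrative; 2 is very low
  option("hoodie.insert.shuffle.parallelism", "200").
  option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT_BYTES, String.valueOf(128 * 1024 * 1024)).
  option(HoodieStorageConfig.PARQUET_FILE_MAX_BYTES, String.valueOf(128 * 1024 * 1024)).
  option("hoodie.copyonwrite.record.size.estimate", "40").
  option("hoodie.table.name", "my_table").                          // placeholder table name
  mode(SaveMode.Append).
  save(targetPath)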

Can you please advise on how to approach tuning this performance problem? Let me know if you need any further details.

Below are some of the stats:

(Screenshot attached: job/stage stats, 2021-03-03)

Environment Description

bvaradar commented 3 years ago

@nsivabalan is looking into this.

codejoyan commented 3 years ago

Thanks @bvaradar and @nsivabalan. Please let me know how to improve the performance, or if you need any further details to investigate. I used the configurations below (SIMPLE index, and small-file handling turned off) to speed up the inserts and saw much improvement:

hoodie.parquet.small.file.limit = 0
hoodie.index.type = SIMPLE

But what are the downsides of not using the default BLOOM index? In my use case I will have late-arriving data, so will performance suffer because of this choice?

I would also like to understand why these specific steps are taking so long. From the Spark web UI it seems the execution of the methods below is what takes the time. Any insights into what is happening in the background?

org.apache.hudi.index.bloom.SparkHoodieBloomIndex.findMatchingFilesForRecordKeys(SparkHoodieBloomIndex.java:266)
org.apache.hudi.index.bloom.SparkHoodieBloomIndex.tagLocationBacktoRecords(SparkHoodieBloomIndex.java:287)
org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:433)

nsivabalan commented 3 years ago

Hi @codejoyan: a few clarifying questions on your use case and record keys.

If your record keys are completely random, then using SIMPLE makes sense, since we may not be able to filter much anyway. With the default BLOOM index, we filter candidate files based on min/max record key ranges, which may not help for random keys (and this step has to read the parquet footers to parse the min/max ranges).
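
To make the min/max range pruning idea concrete, here is a simplified, stand-alone Scala sketch; it is an illustration only, not Hudi's actual implementation, and the FileRange shape and key format are made up:

// Each data file records the min and max record key found in its parquet footer.
case class FileRange(fileId: String, minKey: String, maxKey: String)

// Range pruning keeps only files whose [minKey, maxKey] interval could contain an incoming key.
def candidateFiles(files: Seq[FileRange], incomingKeys: Seq[String]): Seq[FileRange] =
  files.filter { f =>
    incomingKeys.exists(k => k.compareTo(f.minKey) >= 0 && k.compareTo(f.maxKey) <= 0)
  }

val files = Seq(
  FileRange("f1", "2021-07-01-0000", "2021-07-01-9999"),
  FileRange("f2", "2021-07-02-0000", "2021-07-02-9999"))

// Time-ordered keys touch only a few files; this batch prunes f1 entirely:
candidateFiles(files, Seq("2021-07-02-0123"))
// Fully random keys tend to fall inside every file's range, so nothing is pruned
// and the footer reads become pure overhead.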

Once you clarify these details, I can look into it further.

codejoyan commented 3 years ago

Apologies for the delay, @nsivabalan. Below are the answers to the questions you asked:

A few additional questions:
Use case:

Based on the above scenario, do you suggest:

  1. What other partition strategy or record key strategy could be used to take advantage of the bloom filter?
  2. There are 2 jobs that take time. Are both related to index lookup, or is something else also contributing to the increased load time?
kimberlyamandalu commented 3 years ago

I have a similar issue where bloom index performance is very slow for upserts into a Hudi MOR table. Does anyone know whether, when Hudi performs an upsert, it looks up the index only for the affected partitions, or against the entire dataset? I have partitions by year and month from 1998 to 2020, and my upserts are mostly (95%) to recent partitions. I also notice a lot of calls to build the file system view for older partitions that I know should not have any upserts:

AbstractTableFileSystemView: Building file system view for partition (message_year=2002/message_month=9)

(Screenshot attached)

Obtain key ranges for file slices (range pruning=on): collect at HoodieSparkEngineContext.java:73
org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
org.apache.hudi.client.common.HoodieSparkEngineContext.map(HoodieSparkEngineContext.java:73)
org.apache.hudi.index.bloom.SparkHoodieBloomIndex.loadInvolvedFiles(SparkHoodieBloomIndex.java:176)
org.apache.hudi.index.bloom.SparkHoodieBloomIndex.lookupIndex(SparkHoodieBloomIndex.java:119)
org.apache.hudi.index.bloom.SparkHoodieBloomIndex.tagLocation(SparkHoodieBloomIndex.java:84)
org.apache.hudi.index.bloom.SparkHoodieBloomIndex.tagLocation(SparkHoodieBloomIndex.java:60)
org.apache.hudi.table.action.commit.AbstractWriteHelper.tag(AbstractWriteHelper.java:69)
org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:51)
org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitActionExecutor.execute(SparkUpsertDeltaCommitActionExecutor.java:46)
org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:82)
org.apache.hudi.table.HoodieSparkMergeOnReadTable.upsert(HoodieSparkMergeOnReadTable.java:74)
org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:146)
org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:214)
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:181)
org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:134)
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)

codejoyan commented 3 years ago

@nsivabalan, any inputs would be very helpful.

nsivabalan commented 3 years ago

@codejoyan: sorry, this somehow slipped off my radar. May I know what scale of data you are dealing with? I see your parallelism is very low (2). Can you try with 100 or more and see how it goes?

Of the 3 methods you have quoted, 2 are index related and the 3rd is the actual write operation.

The best way to decide on a partitioning strategy is to look at what your queries usually filter on. If it's date based, then you definitely need the date in your partitioning strategy, which you already have. And if adding region would cut down most of the data to be looked up, sure, add it too. Keep in mind this would also multiply your total number of partitions, since it becomes number of dates * number of regions.

Regarding record keys and bloom: you can try the regular "BLOOM" index. With BLOOM there are a few config knobs to play with; with SIMPLE there aren't many. Within a single batch of writes, do the records have some ordering to them, or are they just random? From your response I guess they are random, so you can turn off range pruning, since it may not help much: set https://hudi.apache.org/docs/configurations.html#bloomIndexPruneByRanges to false (the default value is true).
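
A minimal sketch of the two suggestions above (date plus region partitioning via ComplexKeyGenerator, and BLOOM with range pruning turned off) as write options; inputDF, targetPath, the record_id / v_date / region column names, and the table name are placeholders:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.sql.SaveMode

inputDF.write.format("org.apache.hudi").
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).
  option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator").
  option("hoodie.datasource.write.recordkey.field", "record_id").          // placeholder record key column
  option("hoodie.datasource.write.partitionpath.field", "v_date,region").  // date plus region partitioning
  option("hoodie.index.type", "BLOOM").
  option("hoodie.bloom.index.prune.by.ranges", "false").                   // range pruning off for random keys
  option("hoodie.table.name", "my_table").                                 // placeholder table name
  mode(SaveMode.Append).
  save(targetPath)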

@n3nash: do you have any pointers here?

nsivabalan commented 3 years ago

@kimberlyamandalu: do you have a support ticket for your question? Let's not pollute this issue; we can create a new one for your use case and discuss there.

kimberlyamandalu commented 3 years ago

@kimberlyamandalu: do you have a support ticket for your question? Let's not pollute this issue; we can create a new one for your use case and discuss there.

Hi @nsivabalan, no, I do not have a separate ticket for my question. I thought it might be related to this one, so I chimed in. I can open a new ticket for my use case so we can keep things separate. Sorry for the confusion. Thanks.

njalan commented 3 years ago

I am facing the same issue. The "getting small files from partitions" step usually takes 1-2 minutes per micro-batch (60-second interval). My storage is S3; it looks like it works fine on HDFS.

n3nash commented 3 years ago

@kimberlyamandalu @njalan @codejoyan There are a few problems when using BLOOM_INDEX:

  1. Depending on the number of entries in the parquet file, if the BLOOM_INDEX num_entries is not configured correctly, it will lead to lots of false positives, which result in the bloom index spending more time looking up data. You can check the default number of bloom index entries here -> https://github.com/apache/hudi/blob/5be3997f70415e1752a0b5214f9398880fc8fd1f/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java#L47. You can either increase this or use a dynamic bloom filter (see the config sketch after this list). We are working on adding metrics to report how many such false positives happen.
  2. The BLOOM_INDEX step needs to perform a "listing" of the partitions to find the candidate files. On S3, without hoodie.metadata.table being enabled, this listing can take time. Enable that config to eliminate these file listings.
  3. Depending on your workload, BLOOM_INDEX may in some cases not be the ideal choice. For example, if you have updates across all your partitions, then SIMPLE_INDEX is better, since bloom would just do some extra work and then end up doing the work that SIMPLE_INDEX would have done anyway.
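
A hedged sketch of points 1 and 2 above as write options; inputDF, targetPath, and the table name are placeholders, the num_entries value is illustrative, and the config keys (hoodie.index.bloom.num_entries, hoodie.bloom.index.filter.type, hoodie.metadata.enable) should be verified against your Hudi version:

import org.apache.spark.sql.SaveMode

val bloomTuning = Map(
  "hoodie.index.type"              -> "BLOOM",
  "hoodie.index.bloom.num_entries" -> "100000",      // size for the real per-file record count
  "hoodie.bloom.index.filter.type" -> "DYNAMIC_V0",  // or raise num_entries on the simple filter
  "hoodie.metadata.enable"         -> "true"         // avoid S3 partition listings during index lookup
)
// Per point 3, if updates span most partitions, "hoodie.index.type" -> "SIMPLE" may be the better choice.

inputDF.write.format("org.apache.hudi").
  options(bloomTuning).
  option("hoodie.table.name", "my_table").   // placeholder table name
  mode(SaveMode.Append).
  save(targetPath)
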
nsivabalan commented 3 years ago

FAQ link on how to configure bloom configs.

codejoyan commented 3 years ago

Problem statement: I am using a COW table and receiving roughly 1 GB of incremental data. The batch performs a data quality check and an upsert. Attached is a screenshot of the Spark UI stages:

Snapshot count before the upsert
Below is the snapshot view before running the upsert:

scala> val svsSnapshotDF = spark.read.format("org.apache.hudi").
     | load(targetPath + "/*/*/*")

scala> svsSnapshotDF.groupBy("v_date").count().sort(col("v_date")).show(false)
21/07/22 11:53:55 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
+----------+---------+                                                          
|v_date|count    |
+----------+---------+
|2021-07-02|266836321|
|2021-07-03|270866302|
|2021-07-04|198333856|
|2021-07-05|212205824|
|2021-07-06|198391165|
|2021-07-07|188043723|
|2021-07-08|445      |
+----------+---------+

Incremental Count after the Upsert:

scala> svsSnapshotDF.select(col("_hoodie_commit_time")).distinct.sort($"_hoodie_commit_time".desc).show(false)
+-------------------+                                                           
|_hoodie_commit_time|
+-------------------+
|20210721051130     |
|20210721045241     |
|20210721043446     |
|20210720185844     |
|20210720113928     |
|20210720110235     |
|20210720093310     |
|20210720073405     |
|20210720055244     |
|20210720051405     |
|20210720041607     |
|20210719181512     |
|20210719150715     |
|20210719140407     |
|20210719134750     |
|20210719133012     |
|20210719131145     |
|20210719063351     |
|20210719061724     |
+-------------------+

scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._

scala> val beginTime = "20210721051130"
beginTime: String = 20210721051130

scala> val svsIncrementalDF = spark.read.format("hudi").
     | option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
     | option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
     | load(targetPath)
svsIncrementalDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 125 more fields]

scala> svsIncrementalDF.groupBy("v_date").count().sort(col("v_date")).show(false)
+----------+-------+                                                            
|v_date|count  |
+----------+-------+
|2021-07-07|2680595|
|2021-07-08|25260  |
+----------+-------+

Completed Jobs Screenshot:

(Screenshot attached: completed jobs, 2021-07-22)

Slow Jobs Details Screenshots:

(Screenshots attached: slow job details, 2021-07-22)
codejoyan commented 3 years ago

Some additional details for the above runs.

  1. The configs I am using: regular BLOOM index.
  2. Max and min file sizes in older partitions: 116 MB and 6 MB respectively.
  3. Avg record size: 50 bytes.
  4. Avg number of data files in older partitions: between 157 and 225.

I then changed the configs as below to have roughly 100k entries per file, but the performance is worse now; it basically gets stuck. Attached is a Spark web UI screenshot.

  1. Configs:
     hoodie.insert.shuffle.parallelism = 1500
     hoodie.upsert.shuffle.parallelism = 1500
     hoodie.parquet.small.file.limit = 4200000
     hoodie.parquet.max.file.size = 5000000
     hoodie.index.type = BLOOM
     hoodie.copyonwrite.record.size.estimate = 50
     hoodie.copyonwrite.insert.split.size = 100000
     hoodie.bloom.index.prune.by.ranges = false
     hoodie.bloom.index.filter.type = DYNAMIC_V0
     hoodie.index.bloom.num_entries = 30000

The performance is now okay with the BLOOM index when the incremental batch size is around 100 MB (around 4-5 minutes for the upsert), but it gets worse as the batch size increases (> 5 GB), and the countByKey at BaseSparkCommitActionExecutor.java:154 step gets stuck.


(Screenshot attached: Spark web UI, 2021-07-24)
awplxz commented 2 years ago

Is there any progress on this problem? I have the same problem. According to my observations, the amount of shuffle data tripled on the second upsert.

codope commented 2 years ago

Is there any progress on this problem? I have the same problem. According to my observations, the amount of shuffle data tripled on the second upsert.

@awplxz There were a bunch of fixes regarding performance that we landed recently. Do you see the same behavior with the latest master?

koldic commented 1 year ago

Hi, I have the same problem with slow stages. At first it runs well, but as more and more small files are inserted it slows down, and the "Getting Small files" stage together with the "Doing partition and writing data" stage takes up to an hour to finish. I tried changing hoodie.parquet.small.file.limit to the smallest possible value (1 MB) to limit the small files it collects, but that did not help. When I changed it to 0 it did help, since that value disables the stage and the small-file collection altogether. Is there any way to turn this setting back on without slowing down all jobs, or should I just try offline compaction? I also use the SIMPLE index, since the key is random, and I am on version 0.12.1.

zyclove commented 1 year ago

I also encountered the same problem with 0.14.0. How can it be solved? Disable metadata, i.e. set hoodie.metadata.table=false?

Change hoodie.parquet.small.file.limit?

Set hoodie.bloom.index.prune.by.ranges = false?

Change hoodie.memory.merge.max.size?

Can this be optimized in Hudi 1.0? This stage is simply too time-consuming.

(Screenshot attached)

FFCMSouza commented 8 months ago

I'm having the same problem on Hudi version 0.14.1 and Spark 3.4.1.