apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Huge Performance Issue With BLOOM Index On A 1.6 Billion COW Table #11875

Open silly-carbon opened 3 weeks ago

silly-carbon commented 3 weeks ago

Describe the problem you faced

Spark config:

```text
spark.driver.cores=1
spark.driver.memory=18g
spark.executor.cores=10
spark.executor.memory=32g
spark.driver.maxResultSize=8g
spark.default.parallelism=400
spark.sql.shuffle.partitions=400
spark.dynamicAllocation.maxExecutors=20
spark.executor.memoryOverhead=3g
spark.kryoserializer.buffer.max=1024m
```

But Hudi spends a lot of time in HoodieBloomIndex.tagLocation:

(screenshot attached)

The previous stages are rather quick (screenshot attached).

And there are GC issues (screenshot attached).

To Reproduce

Steps to reproduce the behavior:

  1. Create a table:

```sql
CREATE TABLE temp_db.xxxxxxxxxxx (
  _hoodie_is_deleted BOOLEAN,
  t_pre_combine_field BIGINT,
  order_type INT,
  order_no INT,
  profile_no INT,
  profile_type STRING,
  profile_cat STRING,
  u_version STRING,
  order_line_no INT,
  profile_c STRING,
  profile_i INT,
  profile_f DECIMAL(20,8),
  profile_d TIMESTAMP,
  active STRING,
  entry_datetime TIMESTAMP,
  entry_id INT,
  h_version INT
) USING hudi
CLUSTERED BY (order_no, profile_type, profile_no, order_type, profile_cat) INTO 2 BUCKETS
TBLPROPERTIES (
  'primaryKey' = 'order_no,profile_type,profile_no,order_type,profile_cat',
  'hoodie.cleaner.policy.failed.writes' = 'LAZY',
  'type' = 'cow',
  'hoodie.write.lock.filesystem.expire' = '15',
  'preCombineField' = 't_pre_combine_field',
  'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider',
  'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control',
  'hoodie.index.type' = 'BLOOM'
);
```

  2. BULK_INSERT 1.6 billion rows; this is quick, taking 12 minutes:

```sql
SET spark.sql.parquet.datetimeRebaseModeInWrite = CORRECTED;
SET hoodie.datasource.write.operation = bulk_insert;
SET hoodie.combine.before.insert = false;

INSERT OVERWRITE temp_db.xxxxxxxxxxxxx SELECT FALSE, 1, * FROM ods_us.xxxxxx_source;
```

  3. Insert 1 million rows; this is the step with the performance issue described above:

```sql
INSERT INTO temp_db.xxxxxxxxx (
  SELECT TRUE AS _hoodie_is_deleted, -- 0 rows
  FROM ods_us.xxxxxxxx_dddd
  UNION ALL
  SELECT FALSE AS _hoodie_is_deleted, -- 1 million rows
  FROM ods_us.xxxxxxxxx_stage
)
```

Expected behavior

UPSERT completes quickly.

Environment Description

Additional context

I've also tried the BUCKET index, but BULK_INSERT takes 1h 30 min, and the INSERT failed with this exception:

```text
Job aborted due to stage failure: Task 8 in stage 2.0 failed 4 times, most recent failure: Lost task 8.3 in stage 2.0 (TID 41) (gsc-bissssss.org executor 1): java.lang.RuntimeException: java.lang.NumberFormatException: For input string: "f2c9f2eb"
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NumberFormatException: For input string: "f2c9f2eb"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at org.apache.hudi.index.bucket.BucketIdentifier.bucketIdFromFileId(BucketIdentifier.java:79)
    at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex.lambda$loadBucketIdToFileIdMappingForPartition$0(HoodieSimpleBucketIndex.java:60)
    at java.util.ArrayList.forEach(ArrayList.java:1259)
    at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex.loadBucketIdToFileIdMappingForPartition(HoodieSimpleBucketIndex.java:56)
    at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex$1.computeNext(HoodieSimpleBucketIndex.java:94)
    at org.apache.hudi.index.bucket.HoodieSimpleBucketIndex$1.computeNext(HoodieSimpleBucketIndex.java:87)
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
    ... 25 more
```

The record-level index failed during BULK_INSERT, also with an OOM issue.


silly-carbon commented 3 weeks ago

Update: this stage completed after 1.6 hours; all the other stages completed very quickly. The performance is so bad and I cannot figure out why. Could anyone help with this? Thanks in advance.

(screenshot attached)

danny0405 commented 3 weeks ago

It might be false positives; can you try the bucket index or the simple index instead?
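
For illustration only (not from this thread): one way to try those index types in Spark SQL is through session-level `SET` commands, the same mechanism the reproduction steps already use for other `hoodie.*` options; the parallelism and bucket-count values below are purely illustrative assumptions.

```sql
-- Option A: simple index, which joins incoming record keys against existing keys
-- instead of probing bloom filters.
SET hoodie.index.type = SIMPLE;
SET hoodie.simple.index.parallelism = 400;       -- illustrative; tune to cluster size

-- Option B: bucket index, which derives the target file group from a hash of the
-- record key and avoids the key-lookup stage entirely. For Spark SQL tables this is
-- normally fixed at table creation time via TBLPROPERTIES:
--   'hoodie.index.type' = 'BUCKET',
--   'hoodie.bucket.index.num.buckets' = '1024'   -- illustrative bucket count

-- Afterwards, rerun the same INSERT INTO statement from step 3 of the reproduction.
```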

silly-carbon commented 3 weeks ago

> It might be false positives; can you try the bucket index or the simple index instead?

@danny0405 I've also tried the BUCKET index, but BULK_INSERT takes 1h 30 min, and upserting 1 million rows afterwards failed with the same exception (`NumberFormatException: For input string: "f2c9f2eb"`) shown in the issue description above.

For the BUCKET index, if I don't use BULK_INSERT to load the initial data, it still fails with an OOM issue.

The record-level index failed during BULK_INSERT, also with an OOM issue.

With SimpleIndex, it never finishes running, and it also has an OOM issue.

danny0405 commented 3 weeks ago

This error indicates that the bucket index did not take effect:

```text
Caused by: java.lang.NumberFormatException: For input string: "f2c9f2eb"
```

silly-carbon commented 3 weeks ago

> This error indicates that the bucket index did not take effect:
>
> Caused by: java.lang.NumberFormatException: For input string: "f2c9f2eb"

Hi @danny0405, I think this may be a new issue, and it is easy to reproduce:

  1. BULK_INSERT & Overwrite into a BUCKET index table
  2. UPSERT into this table with some data again

Then this error shows up. Maybe BULK_INSERT is not compatible with the BUCKET index?

danny0405 commented 3 weeks ago

> BULK_INSERT & Overwrite into a BUCKET index table

Bucket index support for Spark bulk insert was introduced in release 0.14.0; did you use that release?
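
For context, a minimal sketch of the initial load under discussion, assuming Hudi 0.14.x and a table created with `'hoodie.index.type' = 'BUCKET'`; the statements simply mirror step 2 of the reproduction, followed by switching back to upsert for the incremental write.

```sql
-- Initial load (same statements as step 2 of the reproduction, but against a
-- table whose TBLPROPERTIES declare the bucket index).
SET spark.sql.parquet.datetimeRebaseModeInWrite = CORRECTED;
SET hoodie.datasource.write.operation = bulk_insert;
SET hoodie.combine.before.insert = false;

INSERT OVERWRITE temp_db.xxxxxxxxxxxxx SELECT FALSE, 1, * FROM ods_us.xxxxxx_source;

-- Subsequent incremental writes switch back to upsert, so each record is routed to
-- the file group derived from its record key hash.
SET hoodie.datasource.write.operation = upsert;
```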

silly-carbon commented 3 weeks ago

> BULK_INSERT & Overwrite into a BUCKET index table
>
> Bucket index support for Spark bulk insert was introduced in release 0.14.0; did you use that release?

Yes I use 0.14.1 @danny0405

silly-carbon commented 3 weeks ago

Hi @nsivabalan Could you please help with this?

And I reduced the size of the base table to 100 million rows; UPSERT still took very long to finish (33 minutes).
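
A minimal sketch of the bloom-index settings that govern the tagLocation stage, assuming the same session-level `SET` mechanism used elsewhere in this issue; the values are illustrative only, not recommendations made in this thread.

```sql
-- Sketch (assumption: these hoodie.* session configs are picked up by the writer,
-- as with the other SET commands in this issue; values are illustrative).
SET hoodie.bloom.index.parallelism = 400;          -- explicit parallelism for the index lookup shuffle
SET hoodie.bloom.index.prune.by.ranges = true;     -- skip files whose key min/max range cannot contain the key
SET hoodie.bloom.index.bucketized.checking = true; -- spread bloom filter checks more evenly across tasks
```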

ad1happy2go commented 12 hours ago

@silly-carbon Do we know how many file groups the job is touching? Is it possible to attach a zip of the .hoodie directory (without the metadata dir) or share one commit file so we can look further into it?
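
One possible way to pull those commit-level numbers, as a sketch only and assuming a Hudi build where the `show_commits` Spark SQL procedure is available:

```sql
-- Lists recent commits with their write statistics (files added/updated, records
-- written, bytes written), which indicates how many file groups each upsert touched.
CALL show_commits(table => 'temp_db.xxxxxxxxx', limit => 10);
```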