apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

Slow Write into Hudi Dataset(MOR) #1694

Closed Raghvendradubey closed 1 year ago

Raghvendradubey commented 4 years ago

Hi Team,

I am reading data from Kafka and ingesting it into a Hudi dataset (MOR) using the Hudi DataSource API through Spark Structured Streaming. The pipeline structure is:

Kafka(Source) > Spark Structured Streaming(EMR) > MOR Hudi table(S3)

Spark - 2.4.5 Hudi - 0.5.2

I am seeing performance issues while writing data into the Hudi dataset. The following Hudi jobs are taking the most time:

1 - countByKey at HoodieBloomIndex.java
2 - countByKey at WorkloadProfile.java
3 - count at HoodieSparkSqlWriter.scala

The configuration used to write the Hudi dataset is as follows:

new_df.write.format("org.apache.hudi").option("hoodie.table.name", tableName) \
    .option("hoodie.datasource.write.operation", "upsert") \
    .option("hoodie.datasource.write.table.type", "MERGE_ON_READ") \
    .option("hoodie.datasource.write.recordkey.field", "wbn") \
    .option("hoodie.datasource.write.partitionpath.field", "ad") \
    .option("hoodie.datasource.write.precombine.field", "action_date") \
    .option("hoodie.compact.inline", "true") \
    .option("hoodie.compact.inline.max.delta.commits", "300") \
    .option("hoodie.datasource.hive_sync.enable", "true") \
    .option("hoodie.upsert.shuffle.parallelism", "5") \
    .option("hoodie.insert.shuffle.parallelism", "5") \
    .option("hoodie.bulkinsert.shuffle.parallelism", "5") \
    .option("hoodie.datasource.hive_sync.table", tableName) \
    .option("hoodie.datasource.hive_sync.partition_fields", "ad") \
    .option("hoodie.index.type","GLOBAL_BLOOM") \
    .option("hoodie.bloom.index.update.partition.path", "true") \
    .option("hoodie.datasource.hive_sync.assume_date_partitioning", "false") \
    .option("hoodie.datasource.hive_sync.partition_extractor_class",
            "org.apache.hudi.hive.MultiPartKeysValueExtractor") \
    .mode("append").save(tablePath)

Spark submit command:

spark-submit --deploy-mode client --master yarn \
    --executor-memory 6g --executor-cores 1 --driver-memory 4g \
    --conf spark.driver.maxResultSize=2g \
    --conf spark.executor.id=driver \
    --conf spark.executor.instances=300 \
    --conf spark.kryoserializer.buffer.max=512m \
    --conf spark.shuffle.service.enabled=true \
    --conf spark.sql.hive.convertMetastoreParquet=false \
    --conf spark.task.cpus=1 \
    --conf spark.yarn.driver.memoryOverhead=1024 \
    --conf spark.yarn.executor.memoryOverhead=3072 \
    --conf spark.yarn.max.executor.failures=100 \
    --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4 \
    --py-files s3://spark-test/hudi_job.py

Attaching screenshots of the job details. (screenshot: hudi-job)

countByKey at HoodieBloomIndex.java (screenshots: countbykey-hoodiebloomindx, countbykeyhoodiebloomindextask)

countByKey at WorkloadProfile.java (screenshots: workloadprofile, workloadprofiletask)

count at HoodieSparkSqlWriter.scala (screenshots: hoodiesparksqlwriter, sparksqlwritertask)

Please suggest how I can tune this.

Thanks Raghvendra

vinothchandar commented 4 years ago

Is there a reason why you are setting the shuffle parallelism to 5, when it seems like you have more executors?

We can go step by step. Happy to work with you through the tuning process. Can you please summarize your workload: records per partition, upsert vs insert ratio, ordered vs random keys?

Below are some useful resources

https://cwiki.apache.org/confluence/display/HUDI/Tuning+Guide
https://cwiki.apache.org/confluence/display/HUDI/FAQ
https://cwiki.apache.org/confluence/display/HUDI/FAQ#FAQ-HowdoImodelthedatastoredinHudi

Raghvendradubey commented 4 years ago

Hello Vinoth,

I was just experimenting with different shuffle parallelism settings. I am able to reduce the time for countByKey at WorkloadProfile.java by setting shuffle parallelism up to 20 or so, but there is no impact on countByKey at HoodieBloomIndex.java or count at HoodieSparkSqlWriter.scala.

Data stats are as follows:

1 - more than 500 keys/record
2 - 7k to 10k records/partition
3 - upsert vs insert ratio is around 70:30, but this can vary in most cases; it's not fixed
4 - keys are not ordered within a partition; I have ordered the keys while inserting into the Hudi dataset through Spark Structured Streaming.

vinothchandar commented 4 years ago

Beyond the initial shuffle, Hudi will auto-tune everything, so I am not surprised.

On countByKey at HoodieBloomIndex, what’s the line number?

count at HoodieSparkSqlWriter is the actual writing of data. We send 100K records to the same insert partition to write larger file sizes. Can you see if there’s a skew in that stage? It’s tunable.

Raghvendradubey commented 4 years ago

In the job countByKey at HoodieBloomIndex, the stage mapToPair at HoodieWriteClient.java:977 takes more than a minute, while the stage countByKey at HoodieBloomIndex finishes within seconds. Yes, there is skew in count at HoodieSparkSqlWriter: all partitions are getting 200 to 500 KB of data, and one partition is getting 100 MB+.

vinothchandar commented 4 years ago

Does that one partition seem to be inserts? Hudi reduces insert parallelism to ensure larger file sizes (see http://hudi.apache.org/docs/configurations.html#insertSplitSize). Maybe try lowering this?
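A minimal sketch of how the insert split size could be passed as writer options. The key names `hoodie.copyonwrite.insert.split.size` and `hoodie.copyonwrite.insert.auto.split` are assumed from the Hudi configuration page and should be checked against the Hudi version in use. The helper `insert_split_options` and the value 100000 are hypothetical. As far as I understand, a manual split size is only honored when auto split is off, so the sketch disables it.

```python
def insert_split_options(split_size, auto_split=False):
    """Build Hudi writer options controlling how many inserts go to one bucket.

    Both keys are assumptions to verify against your Hudi version.
    """
    return {
        "hoodie.copyonwrite.insert.split.size": str(split_size),
        "hoodie.copyonwrite.insert.auto.split": str(auto_split).lower(),
    }


# Hypothetical value; tune it against the skew seen in the write stage.
opts = insert_split_options(100000)
print(opts)
```

These options could then be added to the existing writer chain with `.options(**opts)` before `.save(tablePath)`.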

vinothchandar commented 4 years ago

https://github.com/apache/hudi/blob/release-0.5.2/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java

https://github.com/apache/hudi/blob/41202da7788193da77f1ae4b784127bb93eaae2c/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java#L977

All it is doing is reading the input and shuffling for de-duplication. So not sure.

Raghvendradubey commented 4 years ago

No luck with insertSplitSize. The default size is 500000; I reduced it to 5000, but there is still skew in the partition. Do I need to set hoodie.copyonwrite.insert.auto.split=false before tuning insertSplitSize?

Raghvendradubey commented 4 years ago

After setting hoodie.parquet.small.file.limit=0 I am able to reduce the time on count at HoodieSparkSqlWriter, and data distribution is also fine. But somehow countByKey at WorkloadProfile.java started getting more time. Attaching a screenshot of the stages. (screenshot: Screenshot from 2020-06-09 19-14-17)

vinothchandar commented 4 years ago

Sorry.. slipped off my radar.

After setting hoodie.parquet.small.file.limit=0

This suggests that a lot of records were getting packed into existing files before. I think you'd want to consider keeping the old behavior for real production use-cases. We absorb that cost on the writer side, so queries will have well-sized files for reading.

countByKey at WorkloadProfile.java started getting more time.

In the screenshot, all I see is indexing dominating the cost?

Raghvendradubey commented 4 years ago

Hey vinoth,

1 - Could you please shed some light on the statement "old behavior for real production use-cases"?
2 - Yes Indexing is dominating, not sure why exactly it is, but it is after setting parameter hoodie.parquet.small.file.limit = 0

vinothchandar commented 4 years ago

2 - Yes Indexing is dominating, not sure why exactly it is, but it is after setting parameter hoodie.parquet.small.file.limit = 0

If you turn off small file handling, you end up writing more files, which means - indexing has to compare ranges/bloom filter across many more files.. This is the same reason why you should consider not doing this for query side as well.. small files will hurt query performance a lot as well..

Let's do a reset here and try to design for your use-case. Happy to work through this if you can share more about your goals here.

Raghvendradubey commented 4 years ago

Sure. I am trying to achieve near-real-time data (like the Read Optimized View) by updating records over S3. For example, let's say I have the records (a1, b1, t1), (a1, b2, t2), (a1, b3, t3), where t1, t2, t3 are increasing timestamps; in the end I want only the record (a1, b3, t3).
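The requirement above is "latest record per key wins". As a plain-Python illustration only (this is not Hudi code), with the first field playing the role of the record key (wbn in the config above) and the timestamp playing the role of the precombine field (action_date), it could be sketched like this. The function name `latest_per_key` and the integer timestamps are hypothetical.

```python
def latest_per_key(records):
    """Keep, for each key, the record with the largest timestamp."""
    latest = {}
    for key, value, ts in records:
        # Replace the stored record only if this one is newer.
        if key not in latest or ts > latest[key][2]:
            latest[key] = (key, value, ts)
    return list(latest.values())


records = [("a1", "b1", 1), ("a1", "b2", 2), ("a1", "b3", 3)]
result = latest_per_key(records)
print(result)  # [('a1', 'b3', 3)]
```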

Data Pipeline - Reading data from Kafka through Spark Structured Streaming and performing upsert into Hudi table over s3

Data read from Kafka:
Size - 100-300 MB/minute
Kafka parallel partitions - 15
Upsert:Insert - 7:3
No. of columns - 550

Please let me know if you need more info.

vinothchandar commented 4 years ago

@Raghvendradubey thanks for the info. You may also want to understand how much of the existing data changes every minute. If it's 70% updates, I would suggest using MOR, as it can absorb updates more quickly.

Let's target a lenient 5 min Spark streaming batch interval, see what the commit durations look like, and go from there?
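A minimal sketch of a 5 minute micro-batch interval in PySpark Structured Streaming, assuming the Hudi write happens inside a `foreachBatch` callback. The names `start_hudi_stream`, `write_batch`, and `checkpoint_path` are hypothetical; the thread does not show how the streaming query is actually started.

```python
def start_hudi_stream(kafka_df, write_batch, checkpoint_path, interval="5 minutes"):
    """Start a streaming query that calls write_batch(batch_df, batch_id) once per trigger.

    kafka_df is expected to be a streaming DataFrame; write_batch would wrap
    the batch-style Hudi write (df.write.format("org.apache.hudi")...).
    """
    return (
        kafka_df.writeStream
        .foreachBatch(write_batch)
        .option("checkpointLocation", checkpoint_path)
        .trigger(processingTime=interval)
        .start()
    )
```

With a processing-time trigger, Spark starts the next micro-batch only after the previous one finishes, so a commit that takes longer than the interval shows up as lag instead of overlapping writes.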

Raghvendradubey commented 4 years ago

@vinothchandar I ran the job with a 5 min batch interval using MOR. Now I can see that the commit duration is 5 min and compaction is also 5 min, and updated records are only about 10% of total records written, but the job is now running with a huge lag. Sample commits are below:

╔════════════════╤═════════════════════╤═══════════════════╤═════════════════════╤══════════════════════════╤═══════════════════════╤══════════════════════════════╤══════════════╗
║ CommitTime     │ Total Bytes Written │ Total Files Added │ Total Files Updated │ Total Partitions Written │ Total Records Written │ Total Update Records Written │ Total Errors ║
╠════════════════╪═════════════════════╪═══════════════════╪═════════════════════╪══════════════════════════╪═══════════════════════╪══════════════════════════════╪══════════════╣
║ 20200625112117 │ 178.0 MB            │ 1                 │ 3                   │ 2                        │ 193777                │ 18939                        │ 0            ║
╟────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20200625111810 │ 104.0 MB            │ 0                 │ 1                   │ 1                        │ 149946                │ 12619                        │ 0            ║
╟────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20200625111610 │ 211.7 MB            │ 0                 │ 3                   │ 2                        │ 259500                │ 14721                        │ 0            ║
╚════════════════╧═════════════════════╧═══════════════════╧═════════════════════╧══════════════════════════╧═══════════════════════╧══════════════════════════════╧══════════════╝

vinothchandar commented 3 years ago

@Raghvendradubey we are getting async compaction enabled for the Spark Structured Streaming sink. That should bring this down by half, since compaction won't block writing.

Can you paste the latest spark UI which shows the most expensive stage? We can work on tuning this down.

Raghvendradubey commented 3 years ago

@vinothchandar Excellent, how can I try this async compaction? I am attaching the most expensive stages. I am not sure whether I need to scale the cluster or whether I can lower this with some config changes. Please suggest. (screenshots: HudiSparkSQLWriterNew, WorkloadProfileStageNew)

vinothchandar commented 3 years ago

1752 is the PR.

What I am seeing is that the range based pruning is not very effective.. and is resulting in lots of shuffled data..

Is there a way to not use global index? i.e. can you always determine ad for each record? .option("hoodie.datasource.write.recordkey.field", "wbn"), is there certain ordering to wbn that we can exploit? I am referring to some stuff put together here: https://cwiki.apache.org/confluence/display/HUDI/FAQ#FAQ-Whatperformance/ingestlatencycanIexpectforHudiwriting

In general, we need to make the upsert process depend not on the size of the table, but rather on the size of the input.

If you are open to trying, you can switch to the simple index on master, which will be a lot lighter in this particular scenario, where there do not seem to be any benefits from range/bloom information.

vinothchandar commented 3 years ago

Happy to work more hands-on and get this working for you. lmk

Raghvendradubey commented 3 years ago

Thanks @vinothchandar

Comments are inline.

is there a way to not use global index?

I need partition movement of records, meaning I need the latest updated record in the latest partition, and I can probably achieve this only with global bloom.

can you always determine ad for each record.?

Yes I can, but how would that help me if I stop using the global index? What about records which are in older partitions?

is there certain ordering to wbn

no certain ordering, I can order it only by timestamp.

Yes, I tried without global bloom and upsert is very fast, but the business requirement is that I need the latest updated record in the latest partition. Any suggestion If I can achieve this without global bloom?

vinothchandar commented 3 years ago

If you want partition movement, then global index is the only option..

no certain ordering, I can order it only by timestamp.

GLOBAL_BLOOM (or even BLOOM index) will work best if the files are sorted by a key .. so it can skip entire file ranges from being compared and then further reduce using bloom filters..
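To make the range-pruning point concrete, here is a toy model (not Hudi code) in which each file only records its minimum and maximum key. When keys are written in sorted order, a lookup matches one file; when they are written in random order, almost every file's range covers the key, so nearly all files must be checked. All names here are hypothetical.

```python
import random


def candidate_files(key, file_ranges):
    """Return the files whose [min_key, max_key] range could contain the key."""
    return [name for name, (lo, hi) in file_ranges.items() if lo <= key <= hi]


def build_ranges(keys, files):
    """Split keys into equally sized files and record each file's min/max key."""
    size = len(keys) // files
    chunks = [keys[i * size:(i + 1) * size] for i in range(files)]
    return {f"file-{i}": (min(c), max(c)) for i, c in enumerate(chunks)}


keys = list(range(1000))
sorted_ranges = build_ranges(keys, 10)      # keys written in sorted order

random.seed(0)
shuffled = keys[:]
random.shuffle(shuffled)
random_ranges = build_ranges(shuffled, 10)  # keys written in random order

print(len(candidate_files(500, sorted_ranges)))  # 1
print(len(candidate_files(500, random_ranges)))  # many more files to check
```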

Any suggestion If I can achieve this without global bloom?

We are working on record-level indexes that should make it much faster in the mid term, but that's not an immediate option. The master branch has a GLOBAL_SIMPLE index, which can be faster than GLOBAL_BLOOM in cases where no specific range-based pruning can occur. Give that a shot?

Another optimization in the master branch is dynamic bloom filters, which auto-tune themselves for a specific false positive rate.
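For anyone trying the GLOBAL_SIMPLE suggestion, a minimal sketch of the options that would replace the GLOBAL_BLOOM settings in the original writer config. The thread only confirms the `GLOBAL_SIMPLE` index type; the key `hoodie.simple.index.update.partition.path` is an assumption (the simple-index counterpart of `hoodie.bloom.index.update.partition.path`) and should be verified against the Hudi build in use. The helper name is hypothetical.

```python
def global_simple_index_options(update_partition_path=True):
    """Build Hudi writer options selecting the GLOBAL_SIMPLE index.

    The update.partition.path key is an assumption; verify it for your version.
    """
    return {
        "hoodie.index.type": "GLOBAL_SIMPLE",
        "hoodie.simple.index.update.partition.path": str(update_partition_path).lower(),
    }


index_opts = global_simple_index_options()
print(index_opts)
```

These could be applied with `.options(**index_opts)` in place of the two bloom-specific `.option(...)` lines.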

Raghvendradubey commented 3 years ago

Thanks @vinothchandar for clarifications, will try GLOBAL_SIMPLE.

rafaelhbarros commented 3 years ago

@Raghvendradubey did the GLOBAL_SIMPLE index solve your issue?

vinothchandar commented 3 years ago

So to clarify, GLOBAL_SIMPLE helps when the workload is random writes that affect, for example, every file in each write. But it is indeed slow in the sense that it will join against the entire dataset. If you want a better option, you can try the HBase index. Of course there is extra operational overhead, but it will fix perf and also satisfy the global indexing requirement.

nsivabalan commented 1 year ago

We have made a lot of improvements around perf with Hudi: https://hudi.apache.org/blog/2022/06/29/Apache-Hudi-vs-Delta-Lake-transparent-tpc-ds-lakehouse-performance-benchmarks

Can you try out 0.12? Also, we have written a blog about the different indexes in Hudi and when to use which: https://hudi.apache.org/blog/2020/11/11/hudi-indexing-mechanisms/ It might benefit your use-case.

Closing it out due to long inactivity. Feel free to re-open or open a new issue if you need assistance.