apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/

[SUPPORT] How to configure Spark and Flink to write MOR tables using bucket indexes? #11946

Open xiaofan2022 opened 2 months ago

xiaofan2022 commented 2 months ago

I want to use Flink and Spark to write to a MOR table with the BUCKET index type and the CONSISTENT_HASHING engine. Spark writes the full load very fast, but Flink's incremental writes are very slow (about 100 records/s). Spark SQL:

CREATE TABLE test.tableA ()
USING hudi
TBLPROPERTIES (
'connector' = 'hudi',
'index.type'='BUCKET',
'hoodie.index.type'='BUCKET',
'hoodie.index.bucket.engine'='CONSISTENT_HASHING',
'hoodie.datasource.write.recordkey.field' = '',
'path' = '',
'preCombineField' = 'create_time',
'precombine.field' = 'create_time',
'primaryKey' = '',
'table.type' = 'MERGE_ON_READ',
'write.rate.limit' = '10000', -- Flink option
'write.tasks' = '2', -- Flink option
'write.utc-timezone' = 'false',
'type' = 'mor');

(attached screenshot: flink_slow) How can this be optimized?
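For comparison, a minimal sketch of the corresponding Flink SQL DDL (the id and name columns are placeholders, and the path is left blank as in the original post; only create_time and the bucket/index options come from the thread):

CREATE TABLE tableA_flink (
  id STRING,
  name STRING,
  create_time TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = '',  -- same base path as the Spark table (left blank in the original post)
  'table.type' = 'MERGE_ON_READ',
  'index.type' = 'BUCKET',
  'hoodie.index.bucket.engine' = 'CONSISTENT_HASHING',
  'precombine.field' = 'create_time',
  'write.tasks' = '2'
);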

danny0405 commented 2 months ago

Did you have a chance to check the thread dump of the operators?

xiaofan2022 commented 2 months ago

"consistent_bucket_write: test.fin_ipr_inmaininfo_test (1/2)#0" Id=89 TIMED_WAITING on java.util.LinkedList@37d9fd7 at java.lang.Object.wait(Native Method)

Is HDFS write performance the problem? If I use the simple index for Spark, Flink writes with the bucket index very quickly (1k-2k records/s).

danny0405 commented 2 months ago

The performance of consistent hashing and simple hashing should be very close, but from the stack trace it looks like appending to the files is what takes time.

xiaofan2022 commented 2 months ago

So how can this be optimized? This speed is too slow.

danny0405 commented 2 months ago

@beyond1920 can you help with the performance issue here?

danny0405 commented 2 months ago

@xiaofan2022 Did you already schedule clustering to expand the consistent hashing ring? Did you check tablePath/.hoodie/.bucket_index/consistent_hashing_metadata for the number of consistent hashing nodes?
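For reference, clustering can be scheduled and executed from Spark SQL via Hudi's call procedure; a minimal sketch only (procedure availability and options depend on the Hudi version, and resizing the consistent hashing ring additionally requires the consistent-bucket clustering strategies to be configured for the job):

-- Sketch: trigger clustering on the table from this thread.
-- With the consistent hashing bucket index, clustering is what splits/merges
-- buckets within the configured min/max bucket bounds.
CALL run_clustering(table => 'test.tableA');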

xiaofan2022 commented 2 months ago

hdfs dfs -cat hdfs://nameservice1/apps/spark/warehouse/test.db/file_test/.hoodie/.bucket_index/consistent_hashing_metadata/00000000000000.hashing_meta | grep "value" | wc -l

Result: 256

danny0405 commented 2 months ago

So you have 256 initial buckets?

xiaofan2022 commented 2 months ago

Yes. I set 'hoodie.bucket.index.max.num.buckets' = '32' and 'hoodie.bucket.index.min.num.buckets' = '4', but found that there are still 256 buckets.

danny0405 commented 2 months ago

Yeah, let's figure out the reason; too many buckets will not perform well for streaming writes.

ad1happy2go commented 1 month ago

@xiaofan2022 Any updates on this ticket? Were you able to find out the reason why we see 256 buckets?

xiaofan2022 commented 1 week ago

I first create the table through Spark and import the full data, then Flink updates the incremental data in real time. But the default bucket number in Spark is 4.
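A possible direction, as a sketch only: the initial ring size for the consistent hashing bucket index comes from hoodie.bucket.index.num.buckets (which defaults to 256), while the min/max settings only bound later resizing through clustering. Setting the initial count explicitly in the Spark DDL that creates the table (column list, record key, and primary key left blank as in the original post) might look like:

CREATE TABLE test.tableA ()
USING hudi
TBLPROPERTIES (
  'hoodie.index.type' = 'BUCKET',
  'hoodie.index.bucket.engine' = 'CONSISTENT_HASHING',
  'hoodie.bucket.index.num.buckets' = '4',       -- initial ring size; assumption: a small starting ring is desired
  'hoodie.bucket.index.min.num.buckets' = '4',
  'hoodie.bucket.index.max.num.buckets' = '32',
  'hoodie.datasource.write.recordkey.field' = '',
  'preCombineField' = 'create_time',
  'primaryKey' = '',
  'table.type' = 'MERGE_ON_READ',
  'type' = 'mor');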