apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Spark Write into MoR type hudi table small parquets issue + Athena Internal Error #10716

Open huliwuli opened 7 months ago

huliwuli commented 7 months ago

Describe the problem you faced

Background: I currently ingest around 100 MB of data each day (batch process). I use the delete operation with a broadcast join in Spark to delete unused data (that data will be updated the next day). To avoid global index scanning, I insert the delta data into a new partition.

Parquet size issue: Setting hoodie.shuffle.parallelism = 2 does not change the Parquet file size when using insert. It generates 40 Parquet files for one (daily) partition, with a total size of 180 MB.

Athena issue: Because the Parquet files are so small, I enabled inline clustering with max commits = 1 as a test.

Athena Raises Error: Generic_INTERNAL_ERROR: Can not read value at 0 in block -1 in S3/XXXXXX/XXX/XXXX/XXXXX/date=20XX-XX-XX.parquet.

To Reproduce

Steps to reproduce the behavior:

  1. Hudi options are as below. The table DDL (_rt, _ro) was created automatically when the Hive sync tool was enabled. 00a427c6f6f4d8e2a73b07ccf75bdf5 Record keys are strings; partition keys are just dates in yyyy-mm-dd.
  2. e3471b52ec345815027c1dd2a759cce

Expected behavior

I would like each partition to hold either a single Parquet file of around 120 MB or two Parquet files of around 64 MB each.

ad1happy2go commented 7 months ago

@huliwuli What operation type are you using? If you are using bulk_insert, it will not handle small file merging out of the box.

huliwuli commented 7 months ago

@ad1happy2go Thanks for the reply. I used the insert operation.

huliwuli commented 7 months ago

Regarding the Athena issue: Because the Parquet files are so small, I enabled inline clustering with max commits = 1 as a test.

Athena Raises Error: Generic_INTERNAL_ERROR: Can not read value at 0 in block -1 in S3/XXXXXX/XXX/XXXX/XXXXX/date=20XX-XX-XX.parquet

I checked the commits. hudi-cli shows two commits: one from before the clustering and one created after it. However, when I query the _rt table with PySpark, I only see the old commit time.
I think the Hudi table did not sync with the Glue Data Catalog in this case.

ad1happy2go commented 7 months ago

@huliwuli The "insert" operation type should handle merging small files. I see you set the small file size limit to 10 MB. Can you remove that config (default 104857600) or increase it and see if that helps?
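For readers following along, the suggestion above can be sketched as a set of writer options. This is a minimal sketch, not the reporter's actual configuration (which was posted only as a screenshot): the option keys are standard Hudi write configs, the small file limit is the default quoted in the comment, and the 120 MB maximum file size is an assumption chosen to match the target size stated in the issue. The names `hudi_small_file_options`, `df`, and `base_path` are hypothetical.

```python
# Sketch only: values are illustrative, not taken from the reporter's job.
MB = 1024 * 1024

hudi_small_file_options = {
    # The "insert" operation applies small-file handling; bulk_insert does not.
    "hoodie.datasource.write.operation": "insert",
    # Base files smaller than this are treated as small and can receive new inserts.
    "hoodie.parquet.small.file.limit": str(100 * MB),  # 104857600, the default quoted above
    # Target upper bound for a base Parquet file (assumed here to be 120 MB).
    "hoodie.parquet.max.file.size": str(120 * MB),  # 125829120
}

# Hypothetical usage with an existing DataFrame `df` and table path `base_path`:
# df.write.format("hudi").options(**hudi_small_file_options).mode("append").save(base_path)
```

Setting the small file limit to 10 MB, as in the original report, means only files below 10 MB are considered for topping up, which is why removing or raising it was suggested.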

huliwuli commented 7 months ago

> @huliwuli "insert" operation type should handle merging small files. I see you set up a small file size limit of 10 MB. Can you remove that config (default 104857600) or increase that and see if that helps?

I will try it. Continuing with the clustering issue: Athena still raises an internal error if I use inline clustering. Async clustering, on the other hand, worked great: I can see the large Parquet files after the clustering and the replace commit. However, when I query using PySpark and Athena, I cannot see the latest commit (timeline).

ad1happy2go commented 7 months ago

@huliwuli Do you see a successful .replacecommit, which would indicate that the clustering succeeded? Can you post a screenshot of the timeline?
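To make the question concrete: in the Hudi 0.x timeline layout, a clustering instant shows up under the table's .hoodie directory as `<instant>.replacecommit.requested`, then `<instant>.replacecommit.inflight`, and finally `<instant>.replacecommit` once it completes. The sketch below classifies a list of timeline file names under that assumption. The function name and the sample file names are hypothetical, and obtaining the listing itself (for example from S3) is left out.

```python
# Sketch only: assumes the 0.x naming convention described above.
STATE_ORDER = {"requested": 0, "inflight": 1, "completed": 2}

def replacecommit_states(timeline_files):
    """Map each replacecommit instant time to the furthest state it reached."""
    states = {}
    for name in timeline_files:
        parts = name.split(".")
        if len(parts) < 2 or parts[1] != "replacecommit":
            continue  # not a replacecommit file (e.g. deltacommit, clean)
        instant = parts[0]
        # No state suffix means the instant completed.
        state = parts[2] if len(parts) > 2 else "completed"
        if state not in STATE_ORDER:
            continue
        current = states.get(instant)
        if current is None or STATE_ORDER[state] > STATE_ORDER[current]:
            states[instant] = state
    return states

# Hypothetical listing of a .hoodie directory.
sample = [
    "20240101000000000.deltacommit",
    "20240101010000000.replacecommit.requested",
    "20240101010000000.replacecommit.inflight",
    "20240101010000000.replacecommit",
    "20240102010000000.replacecommit.requested",
]
print(replacecommit_states(sample))
```

An instant that never moves past `requested` or `inflight` has not produced a completed clustering, so readers would keep seeing the pre-clustering file layout.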

huliwuli commented 7 months ago

> @huliwuli Do you see a successful .replacecommit, which would indicate that the clustering succeeded? Can you post a screenshot of the timeline?

Thanks for the help. I did not see a file named "successful.replacecommit". Here is the timeline:

bfcad332be78dded470ae28b60d5db2

huliwuli commented 7 months ago

> @huliwuli The "insert" operation type should handle merging small files. I see you set the small file size limit to 10 MB. Can you remove that config (default 104857600) or increase it and see if that helps?

This does not work with EMR 6.15; it still generates 40-50 files for one day of data.

The "hoodie.copyonwrite.record.size.estimate" setting works, but I am using a MoR table. I am not sure whether using this setting on a MoR table carries any risk.

I know bulk_insert works and controls the file size, but is it okay to use bulk_insert with append for daily delta data?

My use case is only to insert data into a (date) partition and sometimes to delete previous data. That is why I use MoR.

ad1happy2go commented 7 months ago

@huliwuli So, it looks like your per-record size is really small. Hudi uses the previous commit's statistics to estimate future record sizes. For the very first commit, it relies on the config "hoodie.copyonwrite.record.size.estimate" (default 1024). So setting it to a lower value may have worked for you. Is that correct?
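A rough worked example shows why an oversized record size estimate leads to small files. This is a simplified model, not Hudi's exact sizing logic: it assumes the writer packs about `max file size / estimated record size` records into each new file. The 120 MB maximum file size and the 35-byte actual record size are assumptions (the latter picked from the 30-40 range reported later in this thread), and the function names are hypothetical.

```python
# Simplified model (assumption): records per file = max file size / estimated record size.
MAX_FILE_BYTES = 120 * 1024 * 1024  # 125829120

def records_per_file(max_file_bytes: int, record_size_estimate: int) -> int:
    """Number of records the writer plans to put into one file."""
    return max_file_bytes // record_size_estimate

def resulting_file_bytes(planned_records: int, actual_record_bytes: int) -> int:
    """Approximate size of a file holding that many records of the real size."""
    return planned_records * actual_record_bytes

ACTUAL_RECORD_BYTES = 35  # assumed real per-record size

# With the default estimate of 1024 bytes, each file ends up far below the target.
default_plan = records_per_file(MAX_FILE_BYTES, 1024)
default_size = resulting_file_bytes(default_plan, ACTUAL_RECORD_BYTES)

# With an estimate close to the real record size, files approach the target.
tuned_plan = records_per_file(MAX_FILE_BYTES, ACTUAL_RECORD_BYTES)
tuned_size = resulting_file_bytes(tuned_plan, ACTUAL_RECORD_BYTES)

print(default_plan, default_size)  # 122880 4300800  (about 4.1 MiB per file)
print(tuned_plan, tuned_size)      # 3595117 125829095 (about 120 MiB per file)
```

Under these assumptions the default estimate yields files of roughly 4 MB, which is in line with the reported 40 files totalling 180 MB for one partition.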

bulk_insert does not merge small files out of the box, so you need to run a clustering job to merge them. If you get only inserts most of the time, you may just use a COW table. I assume that by "delete previous data" you mean deleting old partitions only.
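As an illustration of the bulk_insert plus clustering route, the options below sketch inline clustering on the writer. The keys are standard Hudi clustering configs; the values are assumptions for illustration only (max commits = 1 mirrors the test described earlier in the thread, and the size thresholds are chosen to match the roughly 120 MB target). The name `hudi_clustering_options` is hypothetical.

```python
# Sketch only: values are illustrative, not a recommended production setup.
MB = 1024 * 1024

hudi_clustering_options = {
    "hoodie.datasource.write.operation": "bulk_insert",
    # Run clustering as part of the write, after the configured number of commits.
    "hoodie.clustering.inline": "true",
    "hoodie.clustering.inline.max.commits": "1",
    # Files below this size are picked up as clustering candidates (assumed value).
    "hoodie.clustering.plan.strategy.small.file.limit": str(100 * MB),
    # Upper bound for files written by clustering (assumed value).
    "hoodie.clustering.plan.strategy.target.file.max.bytes": str(120 * MB),
}

# Hypothetical usage with an existing DataFrame `df` and table path `base_path`:
# df.write.format("hudi").options(**hudi_clustering_options).mode("append").save(base_path)
```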

huliwuli commented 7 months ago

> @huliwuli So, it looks like your per-record size is really small. Hudi uses the previous commit's statistics to estimate future record sizes. For the very first commit, it relies on the config "hoodie.copyonwrite.record.size.estimate" (default 1024). So setting it to a lower value may have worked for you. Is that correct?
>
> bulk_insert does not merge small files out of the box, so you need to run a clustering job to merge them. If you get only inserts most of the time, you may just use a COW table. I assume that by "delete previous data" you mean deleting old partitions only.

Thanks for the reply. "hoodie.copyonwrite.record.size.estimate" works on my MoR table when I set it to 30-40.

In most cases, we delete some rows from one old partition, but the number of rows is not predictable. We currently use MoR. If you suggest a COW table, can I switch to COW directly through the Hudi options?