apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0
5.31k stars 2.41k forks source link

[SUPPORT] Hudi dont put the same day in the same file #1902

Closed rubenssoto closed 4 years ago

rubenssoto commented 4 years ago

Hi Guys,

I have a small dataset 7 gb, because of that, I didn't partition the data, I prefer create some big files, so Hudi created 13 files of 700mb each. My dataset has a auto increment id, its a table primary key. To load old data I made a batch operation, and Hudi put the data of one day in 10 different files. Hudi do some sort based on primary key column or I have to explicit do a sort operation?

For example, I made this query: select _hoodie_file_name,count(1) from "order" where created_date_brt = '2020-07-01' group by _hoodie_file_name order by _hoodie_file_name

Result:

Captura de Tela 2020-08-02 às 19 27 53

My hudi config: hudi_options = { 'hoodie.table.name': table_name, 'hoodie.datasource.write.recordkey.field': hudi_config.primary_key_column, 'hoodie.datasource.write.table.name': table_name, 'hoodie.datasource.write.operation': hudi_config.write_operation, 'hoodie.combine.before.insert': hudi_config.write_operation in ['insert','bulkinsert'] if 'true' else 'false', 'hoodie.combine.before.upsert': hudi_config.write_operation == 'upsert' if 'true' else 'false', 'hoodie.datasource.write.precombine.field': hudi_config.precombined_column, 'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator', 'hoodie.parquet.small.file.limit': 800000000, 'hoodie.parquet.max.file.size': 900000000, 'hoodie.parquet.block.size': 800000000, 'hoodie.copyonwrite.record.size.estimate': 30, 'hoodie.cleaner.commits.retained': 1, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': table_name, 'hoodie.datasource.hive_sync.database': 'datalake_raw', 'hoodie.datasource.hive_sync.jdbcurl': 'jdbc:hive2://ip-10-0-62-197.us-west-2.compute.internal:10000' }

I made this query too: select _hoodie_file_name,count(1),min(id), max(id) from "order" where created_date_brt = '2020-07-01' group by _hoodie_file_name order by min(id)

Captura de Tela 2020-08-02 às 19 32 31

I tried with insert and bulk operation and had the same result.

bvaradar commented 4 years ago

If you want ordering based on day during initial bootstrap, you can use bulkInsert but you need to define your record key such that the day column is the prefix for it to be clustered. You can try using complex Key Generator with recordkey being : "day_col,auto_incr_col"

rubenssoto commented 4 years ago

Hi bvaradar, how are you?

I have to configure this option? hoodie.datasource.write.keygenerator.class: org.apache.hudi.ComplexKeyGenerator

So I need to change this option too? hoodie.datasource.write.recordkey.field

because I have a simple PK, so I cant use day and id like composite pk. My pk, Id it is auto increment, I don't understand why hudi doesn't order appropriate , like you could saw in the second image.

bvaradar commented 4 years ago

Yes, You need to configure hoodie.datasource.write.recordkey.field and hoodie.datasource.write.keygenerator.class: org.apache.hudi.ComplexKeyGenerator

The record-key field that Hudi uses is of type string but the original primary key is numeric type. Hence, you are seeing a different order. If you use the record key using complex-key, you can achieve the clustering you are looking for.

rubenssoto commented 4 years ago

Hi bvaradar, how are you?

I had the same behavior

Captura de Tela 2020-08-05 às 12 03 03

My hudi configs

hudi_options = { 'hoodie.table.name': table_name, 'hoodie.datasource.write.recordkey.field':'created_date_brt,id' 'hoodie.datasource.write.table.name': table_name, 'hoodie.datasource.write.operation': hudi_config.write_operation, 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.combine.before.insert': hudi_config.write_operation in ['insert','bulkinsert'] if 'true' else 'false', 'hoodie.combine.before.upsert': hudi_config.write_operation == 'upsert' if 'true' else 'false', 'hoodie.datasource.write.precombine.field': hudi_config.precombined_column, 'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator', 'hoodie.parquet.small.file.limit': 943718400, 'hoodie.parquet.max.file.size': 1073741824, 'hoodie.parquet.block.size': 1073741824, 'hoodie.copyonwrite.record.size.estimate': hudi_config.record_size_estimate, 'hoodie.cleaner.commits.retained': 1, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': table_name, 'hoodie.datasource.hive_sync.database': 'datalake_raw', 'hoodie.datasource.hive_sync.partition_fields': 'partitionpath', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor', 'hoodie.datasource.hive_sync.jdbcurl': 'jdbc:hive2://ec2-52-12-56-39.us-west-2.compute.amazonaws.com:10000' }

Did I something wrong?

Thank you

rubenssoto commented 4 years ago

I reduce my estimate file row to 20, it was 30 and it was the result

Captura de Tela 2020-08-05 às 15 42 10
bvaradar commented 4 years ago

'hoodie.datasource.write.operation': hudi_config.write_operation, 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.combine.before.insert': hudi_config.write_operation in ['insert','bulkinsert'] if 'true' else 'false',

This does suggest you used "bulkinsert" instead of "bulk_insert". Is this the case ? If so, this would have resulted in upsert operation being performed. Let me open a jira to error out instead of silently reverting to upsert.

rubenssoto commented 4 years ago

Thank you so much for your help, it worked.

Last question, Hudi organized data very well by files, but created some small files, is there any way to solve?

Captura de Tela 2020-08-11 às 19 01 56

{ "hoodie.datasource.write.recordkey.field":"created_date_brt,id", "hoodie.datasource.write.table.name":"order", "hoodie.datasource.write.operation":"bulk_insert", "hoodie.datasource.write.partitionpath.field":"partitionpath", "hoodie.datasource.write.hive_style_partitioning":"true", "hoodie.combine.before.insert":"true", "hoodie.combine.before.upsert":"false", "hoodie.datasource.write.precombine.field":"LineCreatedTimestamp", "hoodie.datasource.write.keygenerator.class":"org.apache.hudi.keygen.ComplexKeyGenerator", "hoodie.parquet.small.file.limit":943718400, "hoodie.parquet.max.file.size":1073741824, "hoodie.parquet.block.size":1073741824, "hoodie.copyonwrite.record.size.estimate":512, "hoodie.cleaner.commits.retained":5, "hoodie.datasource.hive_sync.enable":"true", "hoodie.datasource.hive_sync.database":"datalake_raw", "hoodie.datasource.hive_sync.table":"order", "hoodie.datasource.hive_sync.partition_fields":"partitionpath", "hoodie.datasource.hive_sync.partition_extractor_class":"org.apache.hudi.hive.MultiPartKeysValueExtractor", "hoodie.datasource.hive_sync.jdbcurl":"jdbc:hive2://ip-10-0-82-196.us-west-2.compute.internal:1000", "hoodie.insert.shuffle.parallelism":1500, "hoodie.bulkinsert.shuffle.parallelism":8, "hoodie.upsert.shuffle.parallelism":1500 }

rubenssoto commented 4 years ago

Hi, With bulk_insert my data was organized very well, so I started a streaming job with upsert on the same data.

Captura de Tela 2020-08-11 às 21 03 07

Why upsert didn't keep files organized? Its the same Hudi Options I only changed hoodie.datasource.write.operation

bvaradar commented 4 years ago

With bulk insert, the parallelism configuration determines the lower bound on the number of files. Since, you started with bulk insert, you are seeing that many number of files. Hudi upsert/insert will route "new records" (with new record keys) to these small files. So, If there are new records on the same partition, you will see those smalll files growing.

rubenssoto commented 4 years ago

Thank you so much for your help!