apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT]the compaction of the MOR hudi table keeps the old values #7897

Open menna224 opened 1 year ago

menna224 commented 1 year ago

I have a Glue job in which I write to a Hudi table as MOR. Here's the config:

conf = {
    'className': 'org.apache.hudi',
    'hoodie.table.name': hudi_table_name,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    'hoodie.datasource.write.precombine.field': 'timestamp',
    'hoodie.datasource.write.recordkey.field': 'user_id',
    'hoodie.datasource.write.partitionpath.field': 'year:SIMPLE,month:SIMPLE,day:SIMPLE',
    # 'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.CustomKeyGenerator',
    # 'hoodie.deltastreamer.keygen.timebased.timestamp.type': 'DATE_STRING',
    # 'hoodie.deltastreamer.keygen.timebased.input.dateformat': 'yyyy-mm-dd',
    # 'hoodie.deltastreamer.keygen.timebased.output.dateformat': 'yyyy/MM/dd'
}

hudiGlueConfig = {
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
    'hoodie.datasource.hive_sync.database': database_name,
    'hoodie.datasource.hive_sync.table': hudi_table_name,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.hive_style_partitioning': 'false',
    # 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    # 'hoodie.datasource.hive_sync.partition_fields': 'year,month,day'
}

config_ = {**conf, **hudiGlueConfig}

I noticed that for each new record I append, a new parquet file is created. The first parquet file has the first record; when I insert a new row, a second parquet file is created with both records; and when I insert a third time, a third parquet file is created with all 3 rows. When I update any of them, I get a log file containing the update. After a number of appends, the parquet files are compacted into one parquet file: the newest parquet file is kept (the one with the three appended records) and the other two parquet files are removed. However, this file contains the old values of the initially added records, not the updated ones. Any clue what I might be doing wrong?

The rt view reflects the correct data; the ro view doesn't.

I am writing it as:

glueContext.forEachBatch(
    frame=data_frame_DataSource0,
    batch_function=processBatch,
    options={
        "windowSize": window_size,
        "checkpointLocation": s3_path_spark
    }
)

glueContext.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(df, glueContext, "df"),
    connection_type="custom.spark",
    connection_options=config_
)
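
For completeness, here is a minimal sketch of how the forEachBatch call and the write above are typically wired together in a Glue streaming script. The processBatch signature, the empty-batch guard, and the variable names are assumptions for illustration; only the write_dynamic_frame call itself mirrors the snippets in this issue.

from awsglue.dynamicframe import DynamicFrame

# Assumed shape of the batch handler referenced by forEachBatch above
# (hypothetical; only the write call mirrors the snippet in this issue).
def processBatch(data_frame, batchId):
    if data_frame.count() > 0:
        glueContext.write_dynamic_frame.from_options(
            frame=DynamicFrame.fromDF(data_frame, glueContext, "df"),
            connection_type="custom.spark",
            connection_options=config_,
        )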

Is it expected that each time I insert a new record, a parquet file is created with the accumulated records added so far? Shouldn't the inserts be reflected in the log files? In my case only the updates to existing records show up in the delta log files, but they are never written to the parquet files, even when I reach the configured number of commits. The compaction happens by deleting the oldest files and keeping only the last parquet file, the one with the 3 rows that were inserted separately but now sit in the same parquet.
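
As a side note on when the log files get folded into a new base parquet file: for MOR tables written from a streaming job, compaction is scheduled after a configurable number of delta commits. Below is a sketch of the relevant options; the keys come from the Hudi configuration docs, and the values are illustrative, not the job's actual settings.

compaction_options = {
    # Run compaction asynchronously alongside the streaming writer (MOR only).
    'hoodie.datasource.compaction.async.enable': 'true',
    # Schedule a compaction once this many delta commits have accumulated.
    'hoodie.compact.inline.max.delta.commits': '10',
}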

case:

When I insert a row (id=3, name=mg) into the DB:

the Spark streaming job creates a parquet file in the S3 path for Hudi that contains (id=3,name=mg) --> file1.parquet

and the record is reflected in both the rt and ro tables.

Then when I add a new row (id=4, name=sa):

the Spark streaming job creates a parquet file in the S3 path for Hudi that contains both records ---> file2.parquet (id=3,name=mg) (id=4,name=sa)

and both records are reflected in both the rt and ro tables.

Then when I add a new row (id=5, name=john):

the Spark streaming job creates a parquet file in the S3 path for Hudi that contains the three records ---> file3.parquet (id=3,name=mg) (id=4,name=sa) (id=5,name=john)

and the three records are reflected in both the rt and ro tables.

When I do multiple updates (say 9 updates) to the last record, I can see all of these updates in the log files, so my bucket contains 3 parquet files & 9 log files.

Then after the 10th update, where I changed the name to "joe", I can see 10 log files and only 1 parquet file. The parquet file that is kept is the last one (file3.parquet), with the old values, not the updated ones: (id=3,name=mg) (id=4,name=sa) (id=5,name=john)

file1.parquet & file2.parquet were deleted. The rt table contained the right values (the three records, with the last record having the value "joe" for the name column); the ro table contained the values that are in the parquet. I was expecting to find a completely new parquet file containing the values (id=3,name=mg) (id=4,name=sa) (id=5,name=joe), and the ro table to have the most up-to-date ones.

menna224 commented 1 year ago

Update: this issue was with Glue version 3 and the Hudi connector from the AWS Marketplace, but the same thing happened when I tried Glue 4 with a jar for Hudi [hudi-spark3.3-bundle_2.12-0.12.2.jar].

I edited the config because I had some issues while trying to run (I added 'hoodie.datasource.hive_sync.use_jdbc': 'false' and 'hoodie.datasource.hive_sync.mode': 'hms'):

hudiWriteConfig = {
    'className': 'org.apache.hudi',
    'hoodie.table.name': hudi_table_name_for_replica,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    'hoodie.datasource.write.precombine.field': 'timestamp',
    'hoodie.datasource.write.recordkey.field': 'user_id',
    'metadata.compaction.delta_commits': 4,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.mode': 'hms',
    # 'hoodie.datasource.write.partitionpath.field': 'year:SIMPLE,month:SIMPLE,day:SIMPLE',
    # 'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.CustomKeyGenerator',
    # 'hoodie.deltastreamer.keygen.timebased.timestamp.type': 'DATE_STRING',
    # 'hoodie.deltastreamer.keygen.timebased.input.dateformat': 'yyyy-mm-dd',
    # 'hoodie.deltastreamer.keygen.timebased.output.dateformat': 'yyyy/MM/dd'
}
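
One hedged observation on the config above: 'metadata.compaction.delta_commits' does not look like a standard data-table compaction key. If the intent was to compact the MOR table after 4 delta commits, the key documented for that in Hudi is the one below (a suggestion only, not a confirmed fix for this issue):

# Hypothetical adjustment: use the documented delta-commit threshold for data-table compaction.
hudiWriteConfig['hoodie.compact.inline.max.delta.commits'] = '4'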

menna224 commented 1 year ago

@nsivabalan can you please help with this? :)

danny0405 commented 1 year ago

I noticed that for each new record I append, a new parquet file is created. The first parquet file has the first record; when I insert a new row, a second parquet file is created with both records; and when I insert a third time, a third parquet file is created with all 3 rows. When I update any of them, I get a log file containing the update. After a number of appends, the parquet files are compacted into one parquet file: the newest parquet file is kept (the one with the three appended records) and the other two parquet files are removed.

This is actually how the BLOOM_FILTER index works: all the inserts are written into a new FileSlice, and only delta updates are written into logs (because, for UPDATEs, Hudi needs to know where the old records are located). There is also a small-file/FileSlice strategy at play, which makes things a bit more complex; that is why, as you observed, new records are written into the same file group.
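
For reference, the behaviour described above maps to the index and small-file settings; here is a sketch of the keys involved, with commonly cited default values from the Hudi configuration docs, shown only for orientation rather than as recommended changes:

index_and_small_file_options = {
    'hoodie.index.type': 'BLOOM',                    # index used to locate existing records for updates
    'hoodie.parquet.small.file.limit': '104857600',  # base files smaller than this keep receiving new inserts
    'hoodie.parquet.max.file.size': '125829120',     # target upper bound for a base file
}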

The rt view merges all the base parquet files and delta logs, so its result is correct.
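
A quick way to see that difference, assuming the Hive sync registered the two MOR views with the usual _ro/_rt suffixes (the database and table names below are placeholders):

# Read-optimized view: only the compacted base parquet files.
spark.sql("SELECT * FROM my_db.my_hudi_table_ro").show()

# Real-time (snapshot) view: base parquet files merged with the delta log files.
spark.sql("SELECT * FROM my_db.my_hudi_table_rt").show()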

menna224 commented 1 year ago

Thank you @danny0405 for your response. I am aware that the rt view would reflect it, and indeed it reflected the changes instantly. But as far as I understand, once compaction is done the ro view should pick up the changes; when will that happen, and how? I was expecting that after the 10 commits, when the parquet files were deleted and only one was left, the updates would be reflected there.

danny0405 commented 1 year ago

The timing of file removal depends on your cleaning strategy; by default it keeps about 30 commits on the timeline. Take https://hudi.apache.org/docs/hoodie_cleaner as a reference.
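
For reference, the cleaning behaviour is controlled by options along these lines (keys from the Hudi docs; the values are the commonly cited defaults, shown only as an example):

cleaner_options = {
    # Retain the file versions needed by the most recent N commits.
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': '10',
}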

nsivabalan commented 1 year ago

Hey @menna224: let me clarify something and then ask for some clarification.

Commit1: key1, val1: file1_v1.parquet.

Commit2: key2, val2: file1_v2.parquet

Both file1_v1 and file1_v2 belong to the same file group. When you do a read query, Hudi will only read file1_v2.parquet; this is due to small file handling. The cleaner, when it gets executed later, will clean up file1_v1.parquet, but once file1_v2.parquet is created, none of your snapshot queries will read from file1_v1.

Commit3: key3, val3: again, due to small file handling, file1_v3.parquet.

Commit4: key3, val4 (same key as before, but an update): Hudi will add a log file to file1 (the file group).

So, on disk there are file1_v3.parquet and a log file for file1.

With rt, Hudi will read both of them, merge, and serve. In the case of ro, Hudi will read just file1_v3.parquet.

Let's say we keep adding more updates for key3; more log files will be added. Once compaction kicks in, a new parquet file file1_v4.parquet will be created (a merged version of file1_v3 plus all associated log files).
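
As a quick aside, one way to confirm whether such a compaction was ever scheduled is to list the table's timeline under the .hoodie folder: scheduled compactions appear as <timestamp>.compaction.requested / .inflight instants, and a finished compaction leaves a completed .commit instant on a MOR table. A sketch assuming an S3 table path and boto3 access, with placeholder bucket and prefix:

import boto3

# List the Hudi timeline and print compaction instants and completed commits
# (bucket and prefix are placeholders for the actual table path).
s3 = boto3.client("s3")
resp = s3.list_objects_v2(Bucket="my-bucket", Prefix="path/to/hudi_table/.hoodie/")
for obj in resp.get("Contents", []):
    name = obj["Key"].rsplit("/", 1)[-1]
    if "compaction" in name or name.endswith(".commit"):
        print(name)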

Can you clarify what issue you are seeing? Your example wasn't very clear to me, especially these statements:

then after the 10th update, where I changed the name to "joe", I can see 10 log files and only 1 parquet file. The parquet file that is kept is the last one (file3.parquet), with the old values, not the updated ones:
(id=3,name=mg)
(id=4,name=sa)
(id=5,name=john)

and file1.parquet & file2.parquet were deleted.
The rt table contained the right values (the three records, with the last record having the value "joe" for the name column).
The ro table contained the values that are in the parquet.