apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Data discrepancies on COW upon joining table #8135

Open samirakarki opened 1 year ago

samirakarki commented 1 year ago

We are using AWS Glue with Hudi connector 0.10.1.2 and Spark 3.1. Our ELT pipeline's full load captures all of the expected output records produced by joining several tables. On the incremental load, however, there is a large discrepancy in the output table.

Here's one scenario for incremental load where the discrepancy is happening:

We join multiple tables to create a view in an S3 bucket.

Spark SQL join: with an inner join, when the first table in the join has no updated records, its last_commit_time remains unchanged and the incremental read returns an empty DataFrame. The second table does have updated records with the same join key.

We have a built-in helper function, hudi_get_dynamic_frame, which we call like this:

```python
hudi_get_dynamic_frame(
    glue_context=glue_context,
    glue_table_name=glue_catalog,
    input_bucket=raw_bucket,
    input_path=bucket_to_read,
    partitioned=True,
    transformation_ctx=last_hudi_commit_time,
)
```

first_table_in_join = old timestamp (no records changed, so the incremental read returns an empty DataFrame)
second_table_in_join = latest timestamp (records changed in the incremental load)

Here, the join logic doesn't work properly: the second table has nothing to join against on the join key, because the incremental read of the first table returns an empty DynamicFrame. Hence the output table doesn't capture the update to the second table for that join key.
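To make the failure mode concrete, here is a minimal PySpark sketch. The schemas and the execution_id join key are hypothetical stand-ins for our real tables; any inner join behaves this way when one side is empty:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Incremental read of table 1: no commits after the begin instant, so it is empty.
t1_incr = spark.createDataFrame([], "execution_id INT, step_name STRING")

# Incremental read of table 2: contains the updated record for the shared key.
t2_incr = spark.createDataFrame([(26, False)], "execution_id INT, completed BOOLEAN")

# The inner join against the empty side drops every updated row from table 2,
# so the update (completed = False for execution_id 26) never reaches the target.
joined = t1_incr.join(t2_incr, "execution_id", "inner")
print(joined.count())  # 0
```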

In the above scenario I used an inner join as an example; we actually have both inner and left joins among different tables. Either way, we are missing records, or there are duplicate records per primary_key, in the final joined table.

We want the join logic to pick up all updates and inserts from the first table in the join, together with the corresponding updates and inserts from the second table, based on the join keys, in the incremental load.
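One pattern that gives this behavior (an illustrative sketch, not an official Hudi recipe) is to collect the keys touched in either table's increment, then join snapshot reads of both tables restricted to those keys, so an empty increment on one side no longer suppresses the other side's updates. It reuses t1_incr/t2_incr from the sketch above; the table paths are hypothetical:

```python
# Keys changed in either table's increment (either frame may be empty).
changed_keys = (
    t1_incr.select("execution_id")
    .union(t2_incr.select("execution_id"))
    .distinct()
)

# Snapshot reads return the current state of every row, not just the increment.
table1_path = "s3://bucket/landing/table1"  # hypothetical
table2_path = "s3://bucket/landing/table2"  # hypothetical
t1_snap = spark.read.format("hudi").load(table1_path)
t2_snap = spark.read.format("hudi").load(table2_path)

# Restrict both snapshots to the touched keys, then join their current state.
# (In practice, drop the _hoodie_* meta columns or alias the frames first to
# avoid ambiguous column names.)
result = (
    t1_snap.join(changed_keys, "execution_id", "inner")
           .join(t2_snap, "execution_id", "inner")
)
```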

Hudi configurations:

```python
HUDI_COMMON_CONFIG = {
    'connectionName': 'hudi-connection',
    'className': 'org.apache.hudi',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.database': 'default',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.combine.before.insert': 'true',
    'hoodie.datasource.write.reconcile.schema': 'true',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
    'hoodie.datasource.hive_sync.mode': 'hms',
}
```
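For reference, a minimal sketch of how a config like this is typically passed to the Glue marketplace connector on the write path; the frame, target path, table name, record key, and precombine field below are hypothetical placeholders, not values from our job:

```python
write_options = dict(HUDI_COMMON_CONFIG)
write_options.update({
    'path': 's3://bucket/target/table',                        # hypothetical
    'hoodie.table.name': 'target_table',                       # hypothetical
    'hoodie.datasource.write.recordkey.field': 'step_execution_id',
    'hoodie.datasource.write.precombine.field': 'updated_at',  # hypothetical
})

glue_context.write_dynamic_frame.from_options(
    frame=output_dyf,  # the joined output as a DynamicFrame
    connection_type="marketplace.spark",
    connection_options=write_options,
)
```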

We have a custom function to get last_read_commit. Incremental-read Hudi config:

```python
connection_options['hoodie.datasource.query.type'] = 'incremental'
connection_options['hoodie.datasource.read.begin.instanttime'] = last_read_commit
```
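A sketch of how those two options drive the incremental read through Glue (the path and transformation_ctx values are placeholders; the connector plumbing is assumed from the common config above):

```python
connection_options = dict(HUDI_COMMON_CONFIG)
connection_options['hoodie.datasource.query.type'] = 'incremental'
connection_options['hoodie.datasource.read.begin.instanttime'] = last_read_commit
connection_options['path'] = 's3://bucket/landing/table1'  # hypothetical

dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="marketplace.spark",
    connection_options=connection_options,
    transformation_ctx="read_table_1",
)
# If no commits exist after last_read_commit, dyf comes back empty -- the
# situation described above for the first table in the join.
```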

Partitioned Hudi config:

```python
connection_options['hoodie.datasource.hive_sync.partition_extractor_class'] = 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
connection_options['hoodie.datasource.hive_sync.partition_fields'] = partition_field
```

```python
# Add Hudi connection options specific to partitions
connection_options['hoodie.datasource.hive_sync.partition_extractor_class'] = \
    'org.apache.hudi.hive.MultiPartKeysValueExtractor'
connection_options['hoodie.datasource.hive_sync.partition_fields'] = '_hoodie_partition_path'

# Use consistent timestamps between row-writer and non-row-writer writes
connection_options['hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled'] = True
```

nsivabalan commented 1 year ago

I am not sure I understand the use case here. Can you help with some illustration or examples? For example, what is the old timestamp and what is the latest timestamp? Also, are the first and second tables like bronze and silver tables, or are both in the same tier (e.g., both bronze)?

samirakarki commented 1 year ago

@nsivabalan Both tables are in the same zone, i.e. the landing zone (the S3 bucket where data lands from the RDS database).

As I stated above, for our use case, table 1's data did not change in the incremental load.

  1. The screenshot below shows table 2 after the incremental load: for id = 26, the record's "completed" column changed to False.

[Screenshot: table 2 after the incremental load]

  2. The screenshot below shows the output table in the S3 target bucket (the result of joining tables 1 and 2). execution_id is the primary key from table 2 and step_execution_id is the primary key from table 1; in this target table, step_execution_id is the primary key. After the incremental load, the six rows with execution_id = 26 have not had their completed column updated to False, even though the completed flag changed to False for that id in table 2, as shown in the screenshot above.

[Screenshot: joined output table in the S3 target bucket]

This is one use case we are trying to solve.