Open rshanmugam1 opened 1 year ago
Hi @rshanmugam1, how do you read your table? It looks like the base files from both commits are being read. Hudi should only show you records from the base file for the latest commit.
@kazdy I was using Trino + open-source Hudi to read this.
To simplify things, I tried using spark-shell inside EMR, which gives the same result.
spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
spark.read.format("hudi").load("....").show(false)
or
spark.sql("select * from db.table").show(false)
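As a quick sanity check, duplicates can be detected by grouping on the record key and counting. A minimal pure-Python sketch of that check (no Spark; the sample rows are hypothetical, mimicking each id showing up once per commit as reported above):

```python
from collections import Counter

# Hypothetical rows as (record_key, commit_time) pairs, mimicking what the
# reader returned: each id appears once per commit, i.e. duplicated.
rows = [
    ("1", "20221202121542"), ("2", "20221202121542"), ("3", "20221202121542"),
    ("1", "20221202121601"), ("2", "20221202121601"), ("3", "20221202121601"),
]

key_counts = Counter(key for key, _ in rows)
duplicates = {key: n for key, n in key_counts.items() if n > 1}
print(duplicates)  # every key appears twice, so all three ids are flagged
```

The equivalent check in the spark-shell session above would be a `group by id having count(*) > 1` query against the table.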
Also, looking at the commit history: the second commit shows only 1 upsert, but it should have been 3, right?
Another thing I see is that no precombine field is set on the Hudi table. Spark requires this field to do updates; otherwise it may fall back to insert and create duplicates. Maybe it's worth defining a precombine field and giving it a try?
Tried with a precombine key; same behavior.
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
file_format = 'hudi',
options={
'type': 'cow',
'primaryKey': 'id',
'preCombineKey': 'ts',
},
unique_key = 'id',
) }}
{% if not is_incremental() %}
select cast(1 as bigint) as id, 'yo' as msg, current_timestamp() as ts
union all
select cast(2 as bigint) as id, 'anyway' as msg, current_timestamp() as ts
union all
select cast(3 as bigint) as id, 'bye' as msg, current_timestamp() as ts
{% else %}
select cast(1 as bigint) as id, 'yo_updated' as msg, current_timestamp() as ts
union all
select cast(2 as bigint) as id, 'anyway_updated' as msg, current_timestamp() as ts
union all
select cast(3 as bigint) as id, 'bye_updated' as msg, current_timestamp() as ts
{% endif %}
dbt queries
create table analytics.test_merge_3
using hudi
options (type "cow" , primaryKey "id" , preCombineKey "ts")
as
select cast(1 as bigint) as id, 'yo' as msg, current_timestamp() as ts
union all
select cast(2 as bigint) as id, 'anyway' as msg, current_timestamp() as ts
union all
select cast(3 as bigint) as id, 'bye' as msg, current_timestamp() as ts
merge into analytics.test_merge_3 as DBT_INTERNAL_DEST
using test_merge_3__dbt_tmp as DBT_INTERNAL_SOURCE
on
DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id
when matched then update set
*
when not matched then insert *
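For reference, the intended semantics of the generated MERGE above (matched rows updated, unmatched rows inserted, keyed on `id`) can be sketched in plain Python; the values mirror the model's first and second runs:

```python
# Target table state after the first run, keyed by id.
target = {
    1: {"id": 1, "msg": "yo"},
    2: {"id": 2, "msg": "anyway"},
    3: {"id": 3, "msg": "bye"},
}

# Source rows produced by the incremental (second) run.
source = [
    {"id": 1, "msg": "yo_updated"},
    {"id": 2, "msg": "anyway_updated"},
    {"id": 3, "msg": "bye_updated"},
]

# MERGE: when matched then update set *, when not matched then insert *.
for row in source:
    target[row["id"]] = row  # update-or-insert; never two rows per key

print(len(target))  # a correct merge leaves exactly 3 rows, no duplicates
```

The bug reported here is that the table ends up with six rows instead, i.e. the second run behaves like an insert rather than this update-or-insert.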
spark stages
hoodie.properties
#Properties saved on Fri Dec 02 12:15:42 UTC 2022
#Fri Dec 02 12:15:42 UTC 2022
hoodie.table.partition.fields=
hoodie.table.type=COPY_ON_WRITE
hoodie.archivelog.folder=archived
hoodie.timeline.layout.version=1
hoodie.table.version=3
hoodie.table.recordkey.fields=id
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.table.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.table.name=test_merge_3
hoodie.datasource.write.hive_style_partitioning=true
hoodie.table.create.schema={"type"\:"record","name"\:"topLevelRecord","fields"\:[{"name"\:"_hoodie_commit_time","type"\:["string","null"]},{"name"\:"_hoodie_commit_seqno","type"\:["string","null"]},{"name"\:"_hoodie_record_key","type"\:["string","null"]},{"name"\:"_hoodie_partition_path","type"\:["string","null"]},{"name"\:"_hoodie_file_name","type"\:["string","null"]},{"name"\:"id","type"\:"long"},{"name"\:"msg","type"\:"string"},{"name"\:"ts","type"\:{"type"\:"long","logicalType"\:"timestamp-micros"}}]}
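The `\:` sequences in `hoodie.table.create.schema` are Java-properties escapes; unescaping them yields the plain JSON Avro schema. A small sketch using a shortened sample of the stored value (not the full schema above):

```python
import json

# Shortened sample of the escaped value stored in hoodie.properties.
raw = '{"type"\\:"record","name"\\:"topLevelRecord","fields"\\:[{"name"\\:"id","type"\\:"long"}]}'

# Undo the Java-properties escaping, then parse as JSON.
schema = json.loads(raw.replace("\\:", ":"))
print([f["name"] for f in schema["fields"]])  # ['id']
```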
@jonvex: Can you follow up here?
You also need to set a partitionpath
@jonvex Can you give more information about this answer? That doesn't seem to solve the issue
@jonvex @nsivabalan I'm running into the same issue as well. Is anyone currently investigating this?
Running into the same issue too. Hope this gets looked at
@faizhasan @rshanmugam1 Working on reproducing this with precombineField. Will update soon.
Without a precombine key, it falls back to insert anyway, and you will see duplicates.
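The fallback described above can be illustrated in plain Python: an upsert keyed on the record key (with `ts` standing in for the precombine field) overwrites existing rows, while a plain insert only appends, so replaying the same keys produces duplicates. The data here is hypothetical:

```python
# First-run rows, then a replay of the same keys with newer ts values.
batch1 = [{"id": 1, "msg": "yo", "ts": 1}, {"id": 2, "msg": "anyway", "ts": 1}]
batch2 = [{"id": 1, "msg": "yo_updated", "ts": 2}, {"id": 2, "msg": "anyway_updated", "ts": 2}]

# Upsert path (precombine key set): keep the row with the larger ts per id.
table = {}
for row in batch1 + batch2:
    cur = table.get(row["id"])
    if cur is None or row["ts"] >= cur["ts"]:
        table[row["id"]] = row
print(len(table))  # 2 rows, no duplicates

# Insert path (no precombine key): rows are only appended.
appended = batch1 + batch2
print(len(appended))  # 4 rows: every id appears twice
```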
@faizhasan @rshanmugam1 Apologies for the delay here. I tried to reproduce this and found that it works fine with Hudi 0.12.1. The model I used is exactly like the one in this ticket:
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
file_format = 'hudi',
options={
'type': 'cow',
'primaryKey': 'id',
'preCombineKey': 'ts',
},
unique_key = 'id',
location_root='file:///tmp/dbt/issue_7244_1/'
) }}
{% if not is_incremental() %}
select cast(1 as bigint) as id, 'yo' as msg, current_timestamp() as ts
union all
select cast(2 as bigint) as id, 'anyway' as msg, current_timestamp() as ts
union all
select cast(3 as bigint) as id, 'bye' as msg, current_timestamp() as ts
{% else %}
select cast(1 as bigint) as id, 'yo_updated' as msg, current_timestamp() as ts
union all
select cast(2 as bigint) as id, 'anyway_updated' as msg, current_timestamp() as ts
union all
select cast(3 as bigint) as id, 'bye_updated' as msg, current_timestamp() as ts
{% endif %}
Here are the results after the first and second run:
Verified using the latest master with the same model as @ad1happy2go used above, and successfully ran the model.
DBT run
18:45:29 Running with dbt=1.5.3
18:45:29 [WARNING]: Deprecated functionality
The `source-paths` config has been renamed to `model-paths`. Please update your
`dbt_project.yml` configuration to reflect this change.
18:45:29 [WARNING]: Deprecated functionality
The `data-paths` config has been renamed to `seed-paths`. Please update your
`dbt_project.yml` configuration to reflect this change.
18:45:29 Registered adapter: spark=1.5.0
18:45:29 Found 1 model, 2 tests, 0 snapshots, 0 analyses, 357 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics, 0 groups
18:45:29
18:45:31 Concurrency: 1 threads (target='dev')
18:45:31
18:45:31 1 of 1 START sql incremental model default.issue_7244_model .................... [RUN]
18:45:38 1 of 1 OK created sql incremental model default.issue_7244_model ............... [OK in 7.93s]
18:45:39
18:45:39 Finished running 1 incremental model in 0 hours 0 minutes and 9.22 seconds (9.22s).
18:45:39
18:45:39 Completed successfully
18:45:39
18:45:39 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
amrish@Amrishs-MBP github-issue-7244 %
spark-sql verification
spark-sql> show databases;
default
test_database1
Time taken: 2.562 seconds, Fetched 2 row(s)
spark-sql> use default
> ;
23/07/25 11:47:20 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Time taken: 0.125 seconds
spark-sql> show tables;
issue_7244_model
my_first_dbt_model
my_first_dbt_model1
my_second_dbt_model
Time taken: 0.263 seconds, Fetched 4 row(s)
spark-sql> select * from issue_7244_model
> ;
23/07/25 11:47:43 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
23/07/25 11:47:43 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
20230725114531327 20230725114531327_1_4 2 fbb84dbc-e72f-4ac6-990a-d0205e2aaab3-0_1-33-0_20230725114531327.parquet 2 anyway 2023-07-25 11:45:31.367
20230725114531327 20230725114531327_2_5 3 c1d85730-7a1a-4845-bb4a-1b7128f6de3d-0_2-34-0_20230725114531327.parquet 3 bye 2023-07-25 11:45:31.367
20230725114531327 20230725114531327_0_6 1 1da126fe-eb3a-4982-ab77-f294458eefea-0_0-32-0_20230725114531327.parquet 1 yo 2023-07-25 11:45:31.367
Time taken: 4.461 seconds, Fetched 3 row(s)
Also verified against Hudi versions 0.12.3 and 0.13.1. `dbt run` was successful in both cases, with Hudi tables getting created without duplicate rows when dbt is run twice in a row. @faizhasan @rshanmugam1 wondering if you are able to move to a more recent version of Hudi (0.12.3 or 0.13.1, for example)?
Note: Hudi 0.10.0 is not supported with Spark 3.2. Please see version support matrix here.
Hi @amrishlal apologies for the delay.
I was able to test this and saw the following behavior with dbt 1.6.2, using the dbt-spark adapter to execute models on a Thrift server:
@faizhasan Can you please also try with emr-6.12.0, as it includes Hudi 0.13.1, which has some fixes. Thanks.
I did try EMR 6.13.0 and found duplicates. Unfortunately I don't work for that org anymore, so I don't have the stack to test with any further.
@faizhasan You should only find duplicates if you don't have a precombine key configured, since without precombineField the operation type is insert. @amrishlal and I were never able to reproduce this issue, having run multiple models as discussed above.
This is my dbt model. If I run it twice, it creates duplicates. Am I missing any obvious configuration?
A clear and concise description of the problem: the dbt merge model produces duplicates.
To Reproduce
Steps to reproduce the behavior:
Expected behavior: duplicates should not be present.
Environment Description
Hudi version : 0.10.1
Spark version : 3.2.0
Hive version : 2.3.3
Hadoop version :
Storage (HDFS/S3/GCS..) : S3
Running on Docker? (yes/no) : no
Additional context: this is EMR emr-6.6.0, with a Thrift server running on it.
table properties
Create Query
Merge second run