apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] INSERT_OVERWRITE_TABLE operation not working on Hudi 0.12.2 using EMR Deltastreamer #8672

Open ankitchandnani opened 1 year ago

ankitchandnani commented 1 year ago

Describe the problem you faced

Hi Everyone,

I am testing Hudi 0.12.2 with Deltastreamer on EMR (emr-6.9.0) to perform the INSERT_OVERWRITE_TABLE operation on a set of parquet files in S3 (source). I am trying to overwrite the entire table in S3 (target) every time a new parquet file arrives in the source folder from DMS CDC. After the initial commit from the INSERT operation, the INSERT_OVERWRITE_TABLE operation completes in Deltastreamer and creates the .replacecommit file in the .hoodie folder at the target. However, when querying through Athena engine version 2 and spark-sql, the record count includes records from both commits instead of only the records from the latest commit, so the overwrite is not working properly.

To Reproduce

Steps to reproduce the behavior:

  1. Download any full and cdc parquet sample files

ex: https://transer-files.s3.amazonaws.com/full.parquet https://transer-files.s3.amazonaws.com/cdc.parquet

  2. Run DeltaStreamer for the full load and then for the CDC load. Make sure to update the source, target, and jar locations. Below is what I am running:

Full Deltastreamer config:

spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.initialExecutors=1 \
--conf spark.dynamicAllocation.cachedExecutorIdleTimeout=60s \
--conf spark.dynamicAllocation.executorIdleTimeout=30s \
--conf spark.dynamicAllocation.schedulerBacklogTimeout=3s \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf yarn.nodemanager.vmem-check-enabled=false \
--conf yarn.nodemanager.pmem-check-enabled=false \
--conf spark.kryoserializer.buffer.max=512m \
--conf spark.driver.memory=4g \
--conf spark.driver.memoryOverhead=1024 \
--conf spark.driver.maxResultSize=2g \
--conf spark.executor.memory=8g \
--conf spark.executor.memoryOverhead=2048 \
--conf spark.executor.cores=2 \
--conf spark.app.name=insert_overwrite_test_full \
--jars /usr/lib/spark/external/lib/spark-avro.jar \
/usr/lib/hudi/hudi-utilities-bundle.jar \
--table-type COPY_ON_WRITE \
--op INSERT \
--source-ordering-field seq_no \
--hoodie-conf hoodie.datasource.write.recordkey.field=ID1,ID2 \
--hoodie-conf hoodie.datasource.write.partitionpath.field= \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
--hoodie-conf hoodie.datasource.hive_sync.table=INSERT_OVERWRITE_TEST \
--hoodie-conf hoodie.datasource.hive_sync.enable=true \
--hoodie-conf hoodie.datasource.hive_sync.assume_date_partitioning=false \
--hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor \
--hoodie-conf hoodie.cleaner.commits.retained=10 \
--hoodie-conf hoodie.deltastreamer.transformer.sql=select 1==2 AS _hoodie_is_deleted, 'I' as Op,* from \
--hoodie-conf hoodie.datasource.hive_sync.support_timestamp=false \
--target-base-path s3:///POC/LANDING/INSERT_OVERWRITE_TEST \
--target-table insert_overwrite_test \
--transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
--enable-sync \
--hoodie-conf hoodie.deltastreamer.source.dfs.root=s3:///POC/DMS/FULL/RECENT/TEST_FOLDER/TEST_SCHEMA/TEST_TABLE \
--source-class org.apache.hudi.utilities.sources.ParquetDFSSource

CDC Deltastreamer config:

spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--conf spark.shuffle.service.enabled=true \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.initialExecutors=1 \
--conf spark.dynamicAllocation.cachedExecutorIdleTimeout=60s \
--conf spark.dynamicAllocation.executorIdleTimeout=30s \
--conf spark.dynamicAllocation.schedulerBacklogTimeout=3s \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.hive.convertMetastoreParquet=false \
--conf yarn.nodemanager.vmem-check-enabled=false \
--conf yarn.nodemanager.pmem-check-enabled=false \
--conf spark.kryoserializer.buffer.max=512m \
--conf spark.driver.memory=2g \
--conf spark.driver.memoryOverhead=512 \
--conf spark.executor.memory=3g \
--conf spark.executor.memoryOverhead=512 \
--conf spark.executor.cores=1 \
--conf spark.task.maxFailures=8 \
--conf spark.yarn.am.attemptFailuresValidityInterval=1h \
--conf spark.app.name=insert_overwrite_test_cdc \
--jars /usr/lib/spark/external/lib/spark-avro.jar \
/usr/lib/hudi/hudi-utilities-bundle.jar \
--table-type COPY_ON_WRITE \
--op INSERT_OVERWRITE_TABLE \
--source-ordering-field seq_no \
--hoodie-conf hoodie.datasource.write.recordkey.field=ID1,ID2 \
--hoodie-conf hoodie.datasource.write.partitionpath.field= \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
--hoodie-conf hoodie.datasource.hive_sync.table=INSERT_OVERWRITE_TEST \
--hoodie-conf hoodie.datasource.hive_sync.enable=true \
--hoodie-conf hoodie.datasource.hive_sync.assume_date_partitioning=false \
--hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor \
--hoodie-conf hoodie.parquet.small.file.limit=134217728 \
--hoodie-conf hoodie.parquet.max.file.size=268435456 \
--hoodie-conf hoodie.cleaner.commits.retained=10 \
--hoodie-conf hoodie.deltastreamer.transformer.sql=select CASE WHEN Op='D' THEN TRUE ELSE FALSE END AS _hoodie_is_deleted,* from \
--hoodie-conf hoodie.datasource.hive_sync.support_timestamp=false \
--hoodie-conf hoodie.bloom.index.filter.type=DYNAMIC_V0 \
--hoodie-conf hoodie.upsert.shuffle.parallelism=25 \
--target-base-path s3:///POC/LANDING/INSERT_OVERWRITE_TEST \
--target-table insert_overwrite_test \
--transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
--enable-sync \
--hoodie-conf hoodie.deltastreamer.source.dfs.root=s3:///POC/DMS/CDC/RECENT/TEST_FOLDER/TEST_SCHEMA/TEST_TABLE \
--source-class org.apache.hudi.utilities.sources.ParquetDFSSource

  3. Create a table in Athena to read the parquet files from the respective S3 location.
  4. Validate that the output in Athena only includes records from the CDC file.

Expected behavior

The entire table should be overwritten with the new records in the CDC file. Querying through Athena/spark-sql should return only the records from the latest commit.
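One way to narrow this down is to look at what the .replacecommit file actually recorded. The sketch below assumes that the completed replacecommit is a JSON file containing a `partitionToReplaceFileIds` map, which is my understanding of Hudi 0.x timelines and is not confirmed in this thread. The function name `replaced_file_groups_status` is hypothetical. It only reports whether that map is empty, which would mean no earlier file groups were marked as replaced.

```shell
#!/usr/bin/env bash
# Hypothetical helper: report whether a replacecommit lists any replaced file groups.
# Assumption: the completed *.replacecommit file is JSON and contains a
# "partitionToReplaceFileIds" object (believed true for Hudi 0.x timelines).
replaced_file_groups_status() {
  local file="$1"
  if [ ! -f "$file" ]; then
    echo "MISSING"
    return 1
  fi
  # Strip all whitespace so the check also works on pretty-printed JSON.
  local compact
  compact="$(tr -d ' \t\r\n' < "$file")"
  case "$compact" in
    *'"partitionToReplaceFileIds":{}'*) echo "EMPTY" ;;
    *'"partitionToReplaceFileIds":{'*)  echo "NON_EMPTY" ;;
    *)                                  echo "FIELD_NOT_FOUND" ;;
  esac
}
```

For a table on S3, the replacecommit file would first need to be copied locally (for example with `aws s3 cp`) and its local path passed to the function. An `EMPTY` result would be consistent with readers still seeing the files from the first commit, but that is only a hint, not a diagnosis.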

Environment Description

ad1happy2go commented 1 year ago

@ankitchandnani I was able to reproduce this issue and will look into why it is happening.

#Put full.parquet into the input dir

~/spark/spark-3.2.3-bin-hadoop3.2/bin/spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.12.2.jar \
--table-type COPY_ON_WRITE \
--source-ordering-field seq_no \
--hoodie-conf hoodie.datasource.write.recordkey.field=driver_id \
--hoodie-conf hoodie.datasource.write.partitionpath.field= \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
--hoodie-conf hoodie.cleaner.commits.retained=10 \
--hoodie-conf "hoodie.deltastreamer.transformer.sql=select *, 1==2 AS _hoodie_is_deleted from <SRC> a" \
--hoodie-conf hoodie.datasource.hive_sync.support_timestamp=false \
--target-base-path file:///tmp/issue_8672_2 \
--target-table insert_overwrite_test \
--transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
--hoodie-conf hoodie.deltastreamer.source.dfs.root=file:///tmp/issue_8672_input \
--source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
--op INSERT

scala> spark.read.format("hudi").load("file:///tmp/issue_8672_2").count()
23/05/09 20:44:28 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
23/05/09 20:44:28 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
res0: Long = 2

scala> spark.read.format("hudi").load("file:///tmp/issue_8672_2").show()
+-------------------+--------------------+------------------+----------------------+--------------------+---+---------+-----------+-----+------+-----+------+------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| op|driver_id|driver_name|state|salary|  car|seq_no|_hoodie_is_deleted|
+-------------------+--------------------+------------------+----------------------+--------------------+---+---------+-----------+-----+------+-----+------+------------------+
|  20230509203417073|20230509203417073...|     driver_id:101|                      |ddef0460-f824-43b...|  I|      101|       John|   NY|8000.0|Honda|      |             false|
|  20230509203417073|20230509203417073...|     driver_id:102|                      |ddef0460-f824-43b...|  I|      102|       Mike|   CA|9000.0|  KIA|      |             false|
+-------------------+--------------------+------------------+----------------------+--------------------+---+---------+-----------+-----+------+-----+------+------------------+

#Put cdc.parquet into the input dir

~/spark/spark-3.2.3-bin-hadoop3.2/bin/spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
        packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.12.2.jar \
        --table-type COPY_ON_WRITE \
        --source-ordering-field seq_no \
        --hoodie-conf hoodie.datasource.write.recordkey.field=driver_id \
        --hoodie-conf hoodie.datasource.write.partitionpath.field= \
        --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
        --hoodie-conf hoodie.cleaner.commits.retained=10 \
        --hoodie-conf "hoodie.deltastreamer.transformer.sql=select *, 1==2 AS _hoodie_is_deleted from <SRC> a" \
        --hoodie-conf hoodie.datasource.hive_sync.support_timestamp=false \
        --target-base-path file:///tmp/issue_8672_2 \
        --target-table insert_overwrite_test \
        --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
        --hoodie-conf hoodie.deltastreamer.source.dfs.root=file:///tmp/issue_8672_input \
        --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
        --op INSERT_OVERWRITE_TABLE

scala> spark.read.format("hudi").load("file:///tmp/issue_8672_2").count()
23/05/09 20:49:05 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
23/05/09 20:49:05 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
res0: Long = 5

scala> spark.read.format("hudi").load("file:///tmp/issue_8672_2").show()
+-------------------+--------------------+------------------+----------------------+--------------------+---+---------+-----------+-----+-------+------+------+------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| op|driver_id|driver_name|state| salary|   car|seq_no|_hoodie_is_deleted|
+-------------------+--------------------+------------------+----------------------+--------------------+---+---------+-----------+-----+-------+------+------+------------------+
|  20230509204818837|20230509204818837...|     driver_id:101|                      |13fd4e9c-53a2-4e7...|  U|      101|       null|   NJ|15000.0|  null|  0001|             false|
|  20230509204818837|20230509204818837...|     driver_id:101|                      |13fd4e9c-53a2-4e7...|  U|      101|       null|   PA|   null|  null|  0002|             false|
|  20230509204818837|20230509204818837...|     driver_id:102|                      |13fd4e9c-53a2-4e7...|  U|      102|       null| null|   null|Toyota|  0003|             false|
|  20230509203417073|20230509203417073...|     driver_id:101|                      |ddef0460-f824-43b...|  I|      101|       John|   NY| 8000.0| Honda|      |             false|
|  20230509203417073|20230509203417073...|     driver_id:102|                      |ddef0460-f824-43b...|  I|      102|       Mike|   CA| 9000.0|   KIA|      |             false|
+-------------------+--------------------+------------------+----------------------+--------------------+---+---------+-----------+-----+-------+------+------+------------------+
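To confirm which actions were recorded on the timeline for a reproduction like the one above, the completed instants under `.hoodie` can be listed. This is a rough sketch that assumes the 0.12.x file naming, where completed instants are files named `<instant time>.commit` or `<instant time>.replacecommit`; the helper name `list_completed_instants` is made up for illustration.

```shell
# Hypothetical helper: print completed commit/replacecommit instants, oldest first.
# Assumption: completed instants are files named <instant_time>.<action> directly
# under <base_path>/.hoodie (pending ones end in .requested or .inflight).
list_completed_instants() {
  local hoodie_dir="$1/.hoodie"
  local f name
  for f in "$hoodie_dir"/*.commit "$hoodie_dir"/*.replacecommit; do
    [ -f "$f" ] || continue          # skip glob patterns that matched nothing
    name="$(basename "$f")"
    # Print "<instant_time> <action>" so the lines sort chronologically.
    echo "${name%%.*} ${name#*.}"
  done | sort
}
```

Calling `list_completed_instants /tmp/issue_8672_2` on the local table used here should, if the overwrite was recorded, show a `commit` instant followed by a `replacecommit` instant. If both are present and the count still includes rows from the first commit, the problem is more likely in which file groups the replacecommit marks as replaced than in the timeline itself.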

ankitchandnani commented 1 year ago

Hi @ad1happy2go, any update on the above? This is urgent to implement on my side. Thanks.

ankitchandnani commented 1 year ago

Hi @codope @ad1happy2go, any update? This is urgent on our side. Thanks.

ad1happy2go commented 1 year ago

@ankitchandnani I have been a bit swamped, so I didn't get time to debug. If you are interested, would you be able to try debugging the issue?

ankitchandnani commented 1 year ago

Tried to debug along with another engineer on my team but no luck. Would highly appreciate some help here @ad1happy2go

ankitchandnani commented 1 year ago

Any update here, @ad1happy2go @codope?

codope commented 1 year ago

@ankitchandnani We will be working on the fix and it will be released in 0.14.0. https://issues.apache.org/jira/browse/HUDI-6251

ad1happy2go commented 1 year ago

@ankitchandnani I also want to understand your use case. Delta Streamer is mainly used for streaming sources, and INSERT_OVERWRITE_TABLE mode doesn't make much sense for a delta streaming table.

xicm commented 1 year ago

You can try 0.12.3; this problem may be fixed there.

https://github.com/apache/hudi/blob/372fcd8784f7af65adad27047735ea24f4d50128/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwritePartitioner.java#L45-L57

ankitchandnani commented 1 year ago

Hi @ad1happy2go, sure. This is a table that will be mostly static, with minimal changes per day. We do not have a way to stream the CDC changes for this table, so we plan to pull the entire table from the database once a day (it's a small table) and want the entire table to be overwritten at the target (S3) with an INSERT_OVERWRITE_TABLE operation once a day on EMR.

ad1happy2go commented 1 year ago

@ankitchandnani Did you get a chance to try out 0.12.3 yet, as @xicm suggested?

ankitchandnani commented 1 year ago

Hi @ad1happy2go, apologies for the delayed response. I will test the above with Hudi 0.13.0 next week and provide an update. Thank you!

ankitchandnani commented 1 year ago

Hi @ad1happy2go, it looks like the patch is not applied in Hudi 0.13.0. Unfortunately, no AWS EMR version supports a Hudi version with the fix. I will wait for the next EMR version to test it with Hudi 0.13.1 in the coming months. Thanks for the assistance here!

ad1happy2go commented 1 year ago

@ankitchandnani Sorry for letting this slip off my radar. You can use any OSS version of Hudi with EMR via the --packages or --jars option; there is no need to depend on EMR to provide support.
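As a rough illustration of that suggestion, the sketch below prints (rather than runs) the commands for fetching an OSS utilities bundle and passing it to spark-submit in place of the EMR-provided /usr/lib/hudi/hudi-utilities-bundle.jar. The version 0.12.3 is the one suggested earlier in this thread; the Scala version, the Maven Central URL layout, and the function name `oss_bundle_commands` are assumptions on my part, and the bundle still has to match the Spark and Scala versions of the cluster.

```shell
# Sketch: use an OSS Hudi utilities bundle instead of the EMR-provided jar.
# Assumptions: Scala 2.12, Hudi 0.12.3 (suggested in this thread), and the
# standard Maven Central repository layout. Override via environment variables.
HUDI_VERSION="${HUDI_VERSION:-0.12.3}"
SCALA_VERSION="${SCALA_VERSION:-2.12}"
BUNDLE="hudi-utilities-bundle_${SCALA_VERSION}-${HUDI_VERSION}.jar"
BUNDLE_URL="https://repo1.maven.org/maven2/org/apache/hudi/hudi-utilities-bundle_${SCALA_VERSION}/${HUDI_VERSION}/${BUNDLE}"

# Hypothetical helper: print the commands so they can be reviewed before running.
oss_bundle_commands() {
  # Step 1: download the bundle next to where spark-submit will be invoked.
  echo "curl -fSL -o ${BUNDLE} ${BUNDLE_URL}"
  # Step 2: pass the downloaded jar as the application jar; the placeholders
  # stand for the same options used in the CDC command in this issue.
  echo "spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer <spark --conf options> ./${BUNDLE} --table-type COPY_ON_WRITE --op INSERT_OVERWRITE_TABLE <remaining DeltaStreamer options>"
}

oss_bundle_commands
```

Printing the commands keeps the sketch safe to try on any machine; on the EMR primary node the two printed lines would be run in order after filling in the placeholders.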

ad1happy2go commented 10 months ago

@ankitchandnani Were you able to resolve this? Do you have any more questions on this?