apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Need Assistance with Hudi Delta Streamer for Community Video #8309

Closed: soumilshah1995 closed this issue 1 year ago

soumilshah1995 commented 1 year ago

Hello all, and firstly thank you very much for all the help from the community. I want to mention that I am new to Delta Streamer; I have worked a lot with Glue jobs, and I want to experiment with Delta Streamer so I can make videos and teach the community.

I have set up a complete pipeline from AWS Aurora Postgres > DMS > S3, and I have an EMR 6.9 cluster with Spark 3.

Attaching links to the sample Parquet files and a sample JSON showing what the data looks like (screenshot attached).

Link to data files https://drive.google.com/drive/folders/1BwNEK649hErbsWcYLZhqCWnaXFX3mIsg?usp=share_link


Here is how I submit the job:

    spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/lib/hudi/hudi-utilities-bundle.jar \
    --table-type COPY_ON_WRITE \
    --source-ordering-field replicadmstimestamp \
    --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
    --target-base-path s3://sql-server-dms-demo/hudi/public/sales \
    --target-table invoice \
    --payload-class org.apache.hudi.common.model.AWSDmsAvroPayload \
    --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
    --hoodie-conf hoodie.datasource.write.recordkey.field=invoiceid \
    --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://sql-server-dms-demo/raw/public/sales/

The error I get:

23/03/28 13:08:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/03/28 13:08:49 INFO DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at ip-172-32-147-4.ec2.internal/172.32.147.4:8032
23/03/28 13:08:50 INFO Configuration: resource-types.xml not found
23/03/28 13:08:50 INFO ResourceUtils: Unable to find 'resource-types.xml'.
23/03/28 13:08:50 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (6144 MB per container)
23/03/28 13:08:50 INFO Client: Will allocate AM container, with 2432 MB memory including 384 MB overhead
23/03/28 13:08:50 INFO Client: Setting up container launch context for our AM
23/03/28 13:08:50 INFO Client: Setting up the launch environment for our AM container
23/03/28 13:08:50 INFO Client: Preparing resources for our AM container
23/03/28 13:08:50 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
23/03/28 13:08:52 INFO Client: Uploading resource file:/mnt/tmp/spark-7f0fabb5-07de-43c3-8a26-a2325d5be63a/__spark_libs__363124573059127100.zip -> hdfs://ip-172-32-147-4.ec2.internal:8020/user/hadoop/.sparkStaging/application_1680007316515_0003/__spark_libs__363124573059127100.zip
23/03/28 13:08:53 INFO Client: Uploading resource file:/usr/lib/hudi/hudi-utilities-bundle_2.12-0.12.1-amzn-0.jar -> hdfs://ip-172-32-147-4.ec2.internal:8020/user/hadoop/.sparkStaging/application_1680007316515_0003/hudi-utilities-bundle_2.12-0.12.1-amzn-0.jar
23/03/28 13:08:53 INFO Client: Uploading resource file:/etc/spark/conf.dist/hive-site.xml -> hdfs://ip-172-32-147-4.ec2.internal:8020/user/hadoop/.sparkStaging/application_1680007316515_0003/hive-site.xml
23/03/28 13:08:54 INFO Client: Uploading resource file:/etc/hudi/conf.dist/hudi-defaults.conf -> hdfs://ip-172-32-147-4.ec2.internal:8020/user/hadoop/.sparkStaging/application_1680007316515_0003/hudi-defaults.conf
23/03/28 13:08:54 INFO Client: Uploading resource file:/mnt/tmp/spark-7f0fabb5-07de-43c3-8a26-a2325d5be63a/__spark_conf__2001263387666561545.zip -> hdfs://ip-172-32-147-4.ec2.internal:8020/user/hadoop/.sparkStaging/application_1680007316515_0003/__spark_conf__.zip
23/03/28 13:08:54 INFO SecurityManager: Changing view acls to: hadoop
23/03/28 13:08:54 INFO SecurityManager: Changing modify acls to: hadoop
23/03/28 13:08:54 INFO SecurityManager: Changing view acls groups to: 
23/03/28 13:08:54 INFO SecurityManager: Changing modify acls groups to: 
23/03/28 13:08:54 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(hadoop); groups with view permissions: Set(); users  with modify permissions: Set(hadoop); groups with modify permissions: Set()
23/03/28 13:08:54 INFO Client: Submitting application application_1680007316515_0003 to ResourceManager
23/03/28 13:08:54 INFO YarnClientImpl: Submitted application application_1680007316515_0003
23/03/28 13:08:55 INFO Client: Application report for application_1680007316515_0003 (state: ACCEPTED)
23/03/28 13:08:55 INFO Client: 
     client token: N/A
     diagnostics: AM container is launched, waiting for AM container to Register with RM
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: default
     start time: 1680008934287
     final status: UNDEFINED
     tracking URL: http://ip-172-32-147-4.ec2.internal:20888/proxy/application_1680007316515_0003/
     user: hadoop
23/03/28 13:08:56 INFO Client: Application report for application_1680007316515_0003 (state: ACCEPTED)
23/03/28 13:08:57 INFO Client: Application report for application_1680007316515_0003 (state: ACCEPTED)
23/03/28 13:08:58 INFO Client: Application report for application_1680007316515_0003 (state: ACCEPTED)
23/03/28 13:08:59 INFO Client: Application report for application_1680007316515_0003 (state: ACCEPTED)
23/03/28 13:09:00 INFO Client: Application report for application_1680007316515_0003 (state: ACCEPTED)
23/03/28 13:09:01 INFO Client: Application report for application_1680007316515_0003 (state: RUNNING)
23/03/28 13:09:01 INFO Client: 
     client token: N/A
     diagnostics: N/A
     ApplicationMaster host: ip-172-32-12-21.ec2.internal
     ApplicationMaster RPC port: 34367
     queue: default
     start time: 1680008934287
     final status: UNDEFINED
     tracking URL: http://ip-172-32-147-4.ec2.internal:20888/proxy/application_1680007316515_0003/
     user: hadoop
23/03/28 13:09:02 INFO Client: Application report for application_1680007316515_0003 (state: RUNNING)
23/03/28 13:09:03 INFO Client: Application report for application_1680007316515_0003 (state: RUNNING)
23/03/28 13:09:04 INFO Client: Application report for application_1680007316515_0003 (state: ACCEPTED)
23/03/28 13:09:04 INFO Client: 
     client token: N/A
     diagnostics: AM container is launched, waiting for AM container to Register with RM
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: default
     start time: 1680008934287
     final status: UNDEFINED
     tracking URL: http://ip-172-32-147-4.ec2.internal:20888/proxy/application_1680007316515_0003/
     user: hadoop
23/03/28 13:09:05 INFO Client: Application report for application_1680007316515_0003 (state: ACCEPTED)
23/03/28 13:09:06 INFO Client: Application report for application_1680007316515_0003 (state: ACCEPTED)
23/03/28 13:09:07 INFO Client: Application report for application_1680007316515_0003 (state: ACCEPTED)
23/03/28 13:09:08 INFO Client: Application report for application_1680007316515_0003 (state: ACCEPTED)
23/03/28 13:09:09 INFO Client: Application report for application_1680007316515_0003 (state: RUNNING)
23/03/28 13:09:09 INFO Client: 
     client token: N/A
     diagnostics: N/A
     ApplicationMaster host: ip-172-32-12-21.ec2.internal
     ApplicationMaster RPC port: 42741
     queue: default
     start time: 1680008934287
     final status: UNDEFINED
     tracking URL: http://ip-172-32-147-4.ec2.internal:20888/proxy/application_1680007316515_0003/
     user: hadoop
23/03/28 13:09:10 INFO Client: Application report for application_1680007316515_0003 (state: RUNNING)
23/03/28 13:09:11 INFO Client: Application report for application_1680007316515_0003 (state: FINISHED)
23/03/28 13:09:11 INFO Client: 
     client token: N/A
     diagnostics: User class threw exception: java.io.IOException: Could not load key generator class org.apache.hudi.keygen.SimpleKeyGenerator
    at org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.createKeyGenerator(HoodieSparkKeyGeneratorFactory.java:74)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.<init>(DeltaSync.java:235)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:675)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:146)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:119)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:571)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:742)
Caused by: org.apache.hudi.exception.HoodieException: Unable to instantiate class org.apache.hudi.keygen.SimpleKeyGenerator
    at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:91)
    at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:118)
    at org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.createKeyGenerator(HoodieSparkKeyGeneratorFactory.java:72)
    ... 10 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:89)
    ... 12 more
Caused by: java.lang.IllegalArgumentException: Property hoodie.datasource.write.partitionpath.field not found
    at org.apache.hudi.common.config.TypedProperties.checkKey(TypedProperties.java:67)
    at org.apache.hudi.common.config.TypedProperties.getString(TypedProperties.java:72)
    at org.apache.hudi.keygen.SimpleKeyGenerator.<init>(SimpleKeyGenerator.java:41)
    ... 17 more

     ApplicationMaster host: ip-172-32-12-21.ec2.internal
     ApplicationMaster RPC port: 42741
     queue: default
     start time: 1680008934287
     final status: FAILED
     tracking URL: http://ip-172-32-147-4.ec2.internal:20888/proxy/application_1680007316515_0003/
     user: hadoop
23/03/28 13:09:11 ERROR Client: Application diagnostics message: User class threw exception: java.io.IOException: Could not load key generator class org.apache.hudi.keygen.SimpleKeyGenerator
    at org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.createKeyGenerator(HoodieSparkKeyGeneratorFactory.java:74)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.<init>(DeltaSync.java:235)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:675)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:146)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:119)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:571)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:742)
Caused by: org.apache.hudi.exception.HoodieException: Unable to instantiate class org.apache.hudi.keygen.SimpleKeyGenerator
    at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:91)
    at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:118)
    at org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.createKeyGenerator(HoodieSparkKeyGeneratorFactory.java:72)
    ... 10 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:89)
    ... 12 more
Caused by: java.lang.IllegalArgumentException: Property hoodie.datasource.write.partitionpath.field not found
    at org.apache.hudi.common.config.TypedProperties.checkKey(TypedProperties.java:67)
    at org.apache.hudi.common.config.TypedProperties.getString(TypedProperties.java:72)
    at org.apache.hudi.keygen.SimpleKeyGenerator.<init>(SimpleKeyGenerator.java:41)
    ... 17 more

Exception in thread "main" org.apache.spark.SparkException: Application application_1680007316515_0003 finished with failed status
    at org.apache.spark.deploy.yarn.Client.run(Client.scala:1354)
    at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1776)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1006)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1095)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1104)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
23/03/28 13:09:11 INFO ShutdownHookManager: Shutdown hook called
23/03/28 13:09:11 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-dbd4de25-f7b1-4875-b7fd-c599028ae4e0
23/03/28 13:09:11 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-7f0fabb5-07de-43c3-8a26-a2325d5be63a
Command exiting with ret '1'

References

https://cwiki.apache.org/confluence/display/HUDI/2020/01/20/Change+Capture+Using+AWS+Database+Migration+Service+and+Hudi

https://aws.amazon.com/blogs/big-data/apply-record-level-changes-from-relational-databases-to-amazon-s3-data-lake-using-apache-hudi-on-amazon-emr-and-aws-database-migration-service/

https://github.com/apache/hudi/issues/2406

soumilshah1995 commented 1 year ago

@n3nash @bvaradar

bvaradar commented 1 year ago

@soumilshah1995: This looks like the correct package is not on the classpath. Can you check whether org.apache.hudi.keygen.SimpleKeyGenerator is present in the jar being passed?
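
For example, one quick way to check this is to list the bundle's contents on the EMR master node (jar path taken from the spark-submit command above; this assumes the JDK jar tool or unzip is available on the box):

    # Look for the key generator class inside the utilities bundle
    jar tf /usr/lib/hudi/hudi-utilities-bundle.jar | grep SimpleKeyGenerator

    # Alternative if the jar tool is not on the PATH
    unzip -l /usr/lib/hudi/hudi-utilities-bundle.jar | grep 'org/apache/hudi/keygen/SimpleKeyGenerator'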

yihua commented 1 year ago

Hi @soumilshah1995, the bottom exception complains that the property hoodie.datasource.write.partitionpath.field was not found. You need to specify the partition path field with --hoodie-conf hoodie.datasource.write.partitionpath.field=<partition_column>.

soumilshah1995 commented 1 year ago

Does this look okay to you, @bvaradar / @yihua?
If I am missing anything, can you please point it out? I am new to Delta Streamer :D

    spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
    --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
    --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/lib/hudi/hudi-utilities-bundle.jar \
    --table-type COPY_ON_WRITE \
    --source-ordering-field replicadmstimestamp \
    --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
    --target-base-path s3://sql-server-dms-demo/hudi/public/sales \
    --target-table invoice \
    --payload-class org.apache.hudi.common.model.AWSDmsAvroPayload \
    --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
    --hoodie-conf hoodie.datasource.write.recordkey.field=invoiceid \
    --hoodie-conf hoodie.datasource.write.partitionpath.field=destinationstate \
    --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://sql-server-dms-demo/raw/public/sales/

soumilshah1995 commented 1 year ago

Please correct me if I'm doing something incorrectly. I've attached the base files for your reference. I see Hudi folders being created, but I don't see any base files (Parquet files) being created. Any idea why?

Also, I just have two files on S3, which I generated via DMS, and I am trying out Delta Streamer.

Links to Base files https://drive.google.com/drive/folders/1BwNEK649hErbsWcYLZhqCWnaXFX3mIsg?usp=share_link


soumilshah1995 commented 1 year ago

@yihua @bvaradar I just fired the job (screenshot attached).

As I was explaining, even after 10 minutes I don't see Hudi base files. I do see the metadata folder created by Hudi, but the base files are not generated, and there are only two files on S3, so it should not take this long, right? Am I doing something wrong here? :D (screenshots attached)

pratyakshsharma commented 1 year ago

Do you see any exceptions in the logs? Also, are you writing to this base path for the first time? Is your .hoodie folder empty? @soumilshah1995
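
For example, both can be checked quickly with the AWS CLI (paths taken from the spark-submit command above; this assumes the CLI has read access to the bucket):

    # Does the Hudi timeline contain any commit files yet?
    aws s3 ls s3://sql-server-dms-demo/hudi/public/sales/.hoodie/

    # Are any Parquet base files present under the table base path?
    aws s3 ls s3://sql-server-dms-demo/hudi/public/sales/ --recursive | grep '\.parquet'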

soumilshah1995 commented 1 year ago

@yihua @pratyakshsharma First of all, I want to thank you for taking time out of your busy schedules to assist me with this matter. Yes, I cancelled the job, since I am performing the lab from my AWS account and want to avoid charges; the job had been in the running state for the past 10 minutes and I could not see any base files. Yes, I am writing to the base path for the first time. Also, there are only two small files to process, so I assume it should not take this long.

If you can tell me what I am missing or how I can resolve the error, that would be great. Please let me know if you need any further details; I am happy to provide them. Attaching a screenshot of S3 which shows that the Hudi folder was created but the base files were not.

Also, I SSHed into the cluster to make sure the JAR files are present (screenshot attached).

Links to the Parquet files can be found at https://drive.google.com/drive/folders/1BwNEK649hErbsWcYLZhqCWnaXFX3mIsg?usp=share_link

soumilshah1995 commented 1 year ago

Step 1: (screenshot attached)

Step 2: (screenshot attached)

Even after running for 10 minutes, I don't see base files.

I have stopped the job and tried to run it again, and then I am seeing the following (screenshot attached).

soumilshah1995 commented 1 year ago

The issue has been resolved. Daniel Ford was very kind to help on Slack. I will post a detailed video about Delta Streamer on my YouTube channel.

soumilshah1995 commented 1 year ago
  spark-submit \
    --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension  \
    --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
    --conf spark.sql.hive.convertMetastoreParquet=false \
    --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory \
    --master yarn \
    --deploy-mode cluster \
    --executor-memory 1g \
    --driver-memory 2g \
     /usr/lib/hudi/hudi-utilities-bundle.jar \
    --table-type COPY_ON_WRITE \
    --op UPSERT \
    --source-ordering-field replicadmstimestamp  \
    --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
    --target-base-path s3://sql-server-dms-demo/hudi/public/sales \
    --target-table invoice \
    --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
    --hoodie-conf hoodie.datasource.write.recordkey.field=invoiceid \
    --hoodie-conf hoodie.datasource.write.partitionpath.field=destinationstate \
    --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://sql-server-dms-demo/raw/public/sales \
    --hoodie-conf  hoodie.datasource.write.precombine.field=replicadmstimestamp
soumilshah1995 commented 1 year ago

(screenshot attached)

pratyakshsharma commented 1 year ago

Do you mind sharing what was the issue? @soumilshah1995

soumilshah1995 commented 1 year ago

Do you mind sharing what was the issue? @soumilshah1995

Umm, sure. I think I was missing some config.

soumilshah1995 commented 1 year ago

This is the config that worked for me:

  spark-submit \
    --class                 org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  \
    --conf                  spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf                  spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension  \
    --conf                  spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
    --conf                  spark.sql.hive.convertMetastoreParquet=false \
    --conf                  spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory \
    --master                yarn \
    --deploy-mode           cluster \
    --executor-memory       1g \
     /usr/lib/hudi/hudi-utilities-bundle.jar \
    --table-type            COPY_ON_WRITE \
    --op                    UPSERT \
    --enable-sync \
    --source-ordering-field replicadmstimestamp  \
    --source-class          org.apache.hudi.utilities.sources.ParquetDFSSource \
    --target-base-path      s3://delta-streamer-demo-hudi/raw/public/sales \
    --target-table          invoice \
    --payload-class         org.apache.hudi.common.model.AWSDmsAvroPayload \
    --hoodie-conf           hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
    --hoodie-conf           hoodie.datasource.write.recordkey.field=invoiceid \
    --hoodie-conf           hoodie.datasource.write.partitionpath.field=destinationstate \
    --hoodie-conf           hoodie.deltastreamer.source.dfs.root=s3://delta-streamer-demo-hudi/raw/public/sales \
    --hoodie-conf           hoodie.datasource.write.precombine.field=replicadmstimestamp \
    --hoodie-conf           hoodie.database.name=hudidb_raw  \
    --hoodie-conf           hoodie.datasource.hive_sync.enable=true \
    --hoodie-conf           hoodie.datasource.hive_sync.database=hudidb_raw \
    --hoodie-conf           hoodie.datasource.hive_sync.table=tbl_invoices \
    --hoodie-conf           hoodie.datasource.hive_sync.partition_fields=destinationstate
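
As a quick sanity check once the job finishes, the synced table can be queried from spark-sql on the EMR master node (database, table, and column names are taken from the config above; this sketch assumes the cluster is set up to use the Glue Data Catalog as its Hive metastore):

    spark-sql \
      --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
      --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
      -e "SELECT invoiceid, destinationstate, replicadmstimestamp FROM hudidb_raw.tbl_invoices LIMIT 10"
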
yihua commented 1 year ago

@soumilshah1995 Just got to this again. So it looks like the preCombine field config (hoodie.datasource.write.precombine.field) was missing in your original configs, causing the job failure. If you have the driver logs, you should see an exception because of this.
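
For reference, if the cluster is still up, the aggregated driver log for a failed run can usually be pulled with the YARN CLI (the application ID below is a placeholder for the one printed by spark-submit):

    # Fetch the aggregated container logs for the failed run and look for the underlying exception
    yarn logs -applicationId <application_id> | grep -A 20 'Exception'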

soumilshah1995 commented 1 year ago

You are right, I was missing that.


soumilshah1995 commented 1 year ago

# Project: Using Apache Hudi DeltaStreamer and AWS DMS Hands-on Labs



## Video Tutorials


## Steps

### Step 1: Create the Aurora source database and update the settings to enable CDC on Postgres
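
The key change is enabling logical replication on the DB cluster parameter group so DMS can capture changes; a minimal sketch with the AWS CLI (the parameter group name is a placeholder, and the cluster must be rebooted for this static parameter to take effect):

    # Enable logical replication for DMS CDC on Aurora PostgreSQL
    aws rds modify-db-cluster-parameter-group \
      --db-cluster-parameter-group-name <custom-cluster-parameter-group> \
      --parameters "ParameterName=rds.logical_replication,ParameterValue=1,ApplyMethod=pending-reboot"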

### Step 2: Run the Python file to create a table called sales in the public schema in Aurora and populate some data into it

Run `python run.py`


try:
    import os
    import logging

    from functools import wraps
    from abc import ABC, abstractmethod
    from enum import Enum
    from logging import StreamHandler

    import uuid
    from datetime import datetime, timezone
    from random import randint
    import datetime

    import sqlalchemy as db
    from faker import Faker
    import random
    import psycopg2
    import psycopg2.extras as extras
    from dotenv import load_dotenv

    load_dotenv(".env")
except Exception as e:
    raise Exception("Error: {} ".format(e))

class Logging:
    """
    This class is used for logging data to datadog an to the console.
    """

    def __init__(self, service_name, ddsource, logger_name="demoapp"):

        self.service_name = service_name
        self.ddsource = ddsource
        self.logger_name = logger_name

        format = "[%(asctime)s] %(name)s %(levelname)s %(message)s"
        self.logger = logging.getLogger(self.logger_name)
        formatter = logging.Formatter(format, )

        if logging.getLogger().hasHandlers():
            logging.getLogger().setLevel(logging.INFO)
        else:
            logging.basicConfig(level=logging.INFO)

global logger
logger = Logging(service_name="database-common-module", ddsource="database-common-module",
                 logger_name="database-common-module")

def error_handling_with_logging(argument=None):
    def real_decorator(function):
        @wraps(function)
        def wrapper(self, *args, **kwargs):
            function_name = function.__name__
            response = None
            try:
                if kwargs == {}:
                    response = function(self)
                else:
                    response = function(self, **kwargs)
            except Exception as e:
                response = {
                    "status": -1,
                    "error": {"message": str(e), "function_name": function.__name__},
                }
                logger.logger.info(response)
            return response

        return wrapper

    return real_decorator

class DatabaseInterface(ABC):
    @abstractmethod
    def get_data(self, query):
        """
        For given query fetch the data
        :param query: Str
        :return: Dict
        """

    def execute_non_query(self, query):
        """
        Inserts data into SQL Server
        :param query:  Str
        :return: Dict
        """

    def insert_many(self, query, data):
        """
        Insert Many items into database
        :param query: str
        :param data: tuple
        :return: Dict
        """

    def get_data_batch(self, batch_size=10, query=""):
        """
        Gets data into batches
        :param batch_size: INT
        :param query: STR
        :return: DICT
        """

    def get_table(self, table_name=""):
        """
        Gets the table from database
        :param table_name: STR
        :return: OBJECT
        """

class Settings(object):
    """settings class"""

    def __init__(
            self,
            port="",
            server="",
            username="",
            password="",
            timeout=100,
            database_name="",
            connection_string="",
            collection_name="",
            **kwargs,
    ):
        self.port = port
        self.server = server
        self.username = username
        self.password = password
        self.timeout = timeout
        self.database_name = database_name
        self.connection_string = connection_string
        self.collection_name = collection_name

class DatabaseAurora(DatabaseInterface):
    """Aurora database class"""

    def __init__(self, data_base_settings):
        self.data_base_settings = data_base_settings
        self.client = db.create_engine(
            "postgresql://{username}:{password}@{server}:{port}/{database}".format(
                username=self.data_base_settings.username,
                password=self.data_base_settings.password,
                server=self.data_base_settings.server,
                port=self.data_base_settings.port,
                database=self.data_base_settings.database_name
            )
        )
        self.metadata = db.MetaData()
        logger.logger.info("Auroradb connection established successfully.")

    @error_handling_with_logging()
    def get_data(self, query):
        self.query = query
        cursor = self.client.connect()
        response = cursor.execute(self.query)
        result = response.fetchall()
        columns = response.keys()._keys
        data = [dict(zip(columns, item)) for item in result]
        cursor.close()
        return {"statusCode": 200, "data": data}

    @error_handling_with_logging()
    def execute_non_query(self, query):
        self.query = query
        cursor = self.client.connect()
        cursor.execute(self.query)
        cursor.close()
        return {"statusCode": 200, "data": True}

    @error_handling_with_logging()
    def insert_many(self, query, data):
        self.query = query
        print(data)
        cursor = self.client.connect()
        cursor.execute(self.query, data)
        cursor.close()
        return {"statusCode": 200, "data": True}

    @error_handling_with_logging()
    def get_data_batch(self, batch_size=10, query=""):
        self.query = query
        cursor = self.client.connect()
        response = cursor.execute(self.query)
        columns = response.keys()._keys
        while True:
            result = response.fetchmany(batch_size)
            if not result:
                break
            else:
                items = [dict(zip(columns, data)) for data in result]
                yield items

    @error_handling_with_logging()
    def get_table(self, table_name=""):
        table = db.Table(table_name, self.metadata,
                         autoload=True,
                         autoload_with=self.client)

        return {"statusCode": 200, "table": table}

class DatabaseAuroraPycopg(DatabaseInterface):
    """Aurora database class"""

    def __init__(self, data_base_settings):
        self.data_base_settings = data_base_settings
        self.client = psycopg2.connect(
            host=self.data_base_settings.server,
            port=self.data_base_settings.port,
            database=self.data_base_settings.database_name,
            user=self.data_base_settings.username,
            password=self.data_base_settings.password,
        )

    @error_handling_with_logging()
    def get_data(self, query):
        self.query = query
        cursor = self.client.cursor()
        cursor.execute(self.query)
        result = cursor.fetchall()
        columns = [column[0] for column in cursor.description]
        data = [dict(zip(columns, item)) for item in result]
        cursor.close()
        _ = {"statusCode": 200, "data": data}

        return _

    @error_handling_with_logging()
    def execute(self, query, data):
        self.query = query
        cursor = self.client.cursor()
        cursor.execute(self.query, data)
        self.client.commit()
        cursor.close()
        return {"statusCode": 200, "data": True}

    @error_handling_with_logging()
    def get_data_batch(self, batch_size=10, query=""):
        self.query = query
        cursor = self.client.cursor()
        cursor.execute(self.query)
        columns = [column[0] for column in cursor.description]
        while True:
            result = cursor.fetchmany(batch_size)
            if not result:
                break
            else:
                items = [dict(zip(columns, data)) for data in result]
                yield items

    @error_handling_with_logging()
    def insert_many(self, query, data):
        self.query = query
        cursor = self.client.cursor()
        extras.execute_batch(cursor, self.query, data)
        self.client.commit()
        cursor.close()
        return {"statusCode": 200, "data": True}

class Connector(Enum):

    ON_AURORA_PYCOPG = DatabaseAurora(
        data_base_settings=Settings(
            port="5432",
            server="XXXXXXXXX",
            username="postgres",
            password="postgres",
            database_name="postgres",
        )
    )

def main():
    helper = Connector.ON_AURORA_PYCOPG.value
    import time

    states = ("AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA", "HI", "ID", "IL", "IN",
              "IA", "KS", "KY", "LA", "ME", "MD", "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ",
              "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", "SD", "TN", "TX", "UT", "VT", "VA",
              "WA", "WV", "WI", "WY")
    shipping_types = ("Free", "3-Day", "2-Day")

    product_categories = ("Garden", "Kitchen", "Office", "Household")
    referrals = ("Other", "Friend/Colleague", "Repeat Customer", "Online Ad")

    try:
        query = """
CREATE TABLE public.sales (
  invoiceid INTEGER,
  itemid INTEGER,
  category TEXT,
  price NUMERIC(10,2),
  quantity INTEGER,
  orderdate DATE,
  destinationstate TEXT,
  shippingtype TEXT,
  referral TEXT
);
        """
        helper.execute_non_query(query=query,)
        time.sleep(2)
    except Exception as e:
        print("Error",e)

    try:
        query = """
            ALTER TABLE public.sales REPLICA IDENTITY FULL
        """
        helper.execute_non_query(query=query)
        time.sleep(2)
    except Exception as e:
        pass

    for i in range(0, 100):

        item_id = random.randint(1, 100)
        state = states[random.randint(0, len(states) - 1)]
        shipping_type = shipping_types[random.randint(0, len(shipping_types) - 1)]
        product_category = product_categories[random.randint(0, len(product_categories) - 1)]
        quantity = random.randint(1, 4)
        referral = referrals[random.randint(0, len(referrals) - 1)]
        price = random.randint(1, 100)
        order_date = datetime.date(2016, random.randint(1, 12), random.randint(1, 28)).isoformat()
        invoiceid = random.randint(1, 20000)

        data_order = (invoiceid, item_id, product_category, price, quantity, order_date, state, shipping_type, referral)

        query = """INSERT INTO public.sales
                                            (
                                            invoiceid, itemid, category, price, quantity, orderdate, destinationstate,shippingtype, referral
                                            )
                                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)"""

        helper.insert_many(query=query, data=data_order)

main()
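
To confirm the script populated the table, a quick count from the Aurora writer endpoint should show the generated rows (the host is a placeholder; this assumes the psql client can reach the instance):

    psql -h <aurora-writer-endpoint> -p 5432 -U postgres -d postgres \
      -c "SELECT COUNT(*) FROM public.sales;"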

### Step 3: Create a DMS replication instance, an S3 bucket, and IAM roles (refer to video part 3)

Create the source and target endpoints in DMS. Use the following S3 target endpoint settings:

    {
      "CsvRowDelimiter": "\n",
      "CsvDelimiter": ",",
      "BucketFolder": "raw",
      "BucketName": "XXXXXXXXXXXXX",
      "CompressionType": "NONE",
      "DataFormat": "parquet",
      "EnableStatistics": true,
      "IncludeOpForFullLoad": true,
      "TimestampColumnName": "replicadmstimestamp",
      "DatePartitionEnabled": false
    }

#### Note: also add the following in the Extra connection attributes
![image](https://user-images.githubusercontent.com/39345855/228972148-10726c19-678b-4d77-a607-77fd7eebb105.png)

parquetVersion=PARQUET_2_0;
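
The same target settings can also be applied from the AWS CLI instead of the console; a rough sketch (the endpoint identifier, role ARN, and bucket name are placeholders):

    aws dms create-endpoint \
      --endpoint-identifier hudi-s3-target \
      --endpoint-type target \
      --engine-name s3 \
      --service-access-role-arn <dms-s3-access-role-arn> \
      --extra-connection-attributes "parquetVersion=PARQUET_2_0;" \
      --s3-settings '{"CsvRowDelimiter":"\n","CsvDelimiter":",","BucketFolder":"raw","BucketName":"<bucket-name>","CompressionType":"NONE","DataFormat":"parquet","EnableStatistics":true,"IncludeOpForFullLoad":true,"TimestampColumnName":"replicadmstimestamp","DatePartitionEnabled":false}'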


### Step 4: Create the replication task and add the following table mappings

    {
      "rules": [
        {
          "rule-type": "selection",
          "rule-id": "861743510",
          "rule-name": "861743510",
          "object-locator": {
            "schema-name": "public",
            "table-name": "sales"
          },
          "rule-action": "include",
          "filters": []
        }
      ]
    }
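
Likewise, the task can be created from the CLI with the selection rules above saved to a file (all ARNs are placeholders; the video walks through the console instead):

    aws dms create-replication-task \
      --replication-task-identifier sales-full-load-and-cdc \
      --source-endpoint-arn <aurora-source-endpoint-arn> \
      --target-endpoint-arn <s3-target-endpoint-arn> \
      --replication-instance-arn <replication-instance-arn> \
      --migration-type full-load-and-cdc \
      --table-mappings file://table-mappings.json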


# Create the EMR cluster and fire the Delta Streamer job
* Note: you can download the sample Parquet files from https://drive.google.com/drive/folders/1BwNEK649hErbsWcYLZhqCWnaXFX3mIsg?usp=share_link
* These are sample data files generated by DMS; you can copy them directly into your S3 bucket for learning purposes.

    spark-submit \
    --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
    --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
    --conf spark.sql.hive.convertMetastoreParquet=false \
    --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory \
    --master yarn \
    --deploy-mode client \
    --executor-memory 1g \
    /usr/lib/hudi/hudi-utilities-bundle.jar \
    --table-type COPY_ON_WRITE \
    --op UPSERT \
    --enable-sync \
    --source-ordering-field replicadmstimestamp \
    --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
    --target-base-path s3://delta-streamer-demo-hudi/hudi/public/invoice \
    --target-table invoice \
    --payload-class org.apache.hudi.common.model.AWSDmsAvroPayload \
    --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
    --hoodie-conf hoodie.datasource.write.recordkey.field=invoiceid \
    --hoodie-conf hoodie.datasource.write.partitionpath.field=destinationstate \
    --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://delta-streamer-demo-hudi/raw/public/sales \
    --hoodie-conf hoodie.datasource.write.precombine.field=replicadmstimestamp \
    --hoodie-conf hoodie.database.name=hudidb_raw \
    --hoodie-conf hoodie.datasource.hive_sync.enable=true \
    --hoodie-conf hoodie.datasource.hive_sync.database=hudidb_raw \
    --hoodie-conf hoodie.datasource.hive_sync.table=tbl_invoices \
    --hoodie-conf hoodie.datasource.hive_sync.partition_fields=destinationstate