apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Hudi DeltaStreamer with Flattening Transformer #10499

Closed: soumilshah1995 closed this issue 5 months ago

soumilshah1995 commented 5 months ago

Hello, I'm currently experimenting with the Hudi DeltaStreamer and working on part 12 of the DeltaStreamer playlist. For the next video, my goal is to cover the Hudi SQL-based transformer and the flattening transformer.

I've encountered a challenge with the flattening transformer. The data schema in the schema registry is structured as follows:

{
  "type": "record",
  "name": "Order",
  "fields": [
    {
      "name": "order_id",
      "type": "string"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "order_value",
      "type": "string"
    },
    {
      "name": "priority",
      "type": "string"
    },
    {
      "name": "order_date",
      "type": "string"
    },
    {
      "name": "ts",
      "type": "string"
    },
    {
      "name": "customer",
      "type": {
        "type": "record",
        "name": "Customer",
        "fields": [
          {
            "name": "customer_id",
            "type": "string"
          }
        ]
      }
    }
  ]
}

Without the flattening transformer, everything works fine using the following configuration:

--transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
hoodie.streamer.transformer.sql=SELECT * FROM <SRC> a

Now, I'm attempting to implement the flattening transformer by following the guide and the official documentation.

Here is the configuration I've tried:

--transformer-class org.apache.hudi.utilities.transform.FlatteningTransformer,org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \

And the SQL query variations:

hoodie.streamer.transformer.sql=SELECT order_id, name, order_value, priority, order_date, ts, customer.customer_id FROM <SRC> a

OR

hoodie.streamer.transformer.sql=SELECT a.order_id, a.name, a.order_value, a.priority, a.order_date, a.ts, a.customer_id FROM <SRC> a

OR

hoodie.streamer.transformer.sql=SELECT order_id, name, order_value, priority, order_date, ts, customer_id FROM <SRC> a

However, the flattening transformer does not seem to be working, or I might be using it incorrectly. Any assistance or guidance on this matter would be greatly appreciated. Thanks!
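
A quick note on the chained setup above: when multiple classes are passed to --transformer-class, they are applied in order, left to right (the stack trace in a later comment goes through ChainedTransformer), so the output of the FlatteningTransformer becomes the input of the SqlQueryBasedTransformer. If the flattener lifts nested fields to the top level under underscore-joined names, the chained SQL has to reference the flattened column name rather than the original struct path. For the schema above, that mapping would look roughly like this (a sketch, borne out by the analyzer output further down):

customer.customer_id  ->  customer_customer_id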

soumilshah1995 commented 5 months ago

I also tried the following spark-submit command:


spark-submit \
    --class org.apache.hudi.utilities.streamer.HoodieStreamer \
    --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0 \
    --properties-file spark-config.properties \
    --master 'local[*]' \
    --executor-memory 1g \
    /Users/soumilshah/IdeaProjects/SparkProject/apache-hudi-delta-streamer-labs/E11/jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
    --table-type COPY_ON_WRITE \
    --op UPSERT \
    --source-ordering-field ts \
    --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
    --target-base-path file:///Users/soumilshah/IdeaProjects/SparkProject/apache-hudi-delta-streamer-labs/E11/hudidb/orders \
    --target-table orders \
    --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
    --transformer-class 'org.apache.hudi.utilities.transform.FlatteningTransformer,org.apache.hudi.utilities.transform.SqlQueryBasedTransformer'  \
    --props hudi_tbl.props

hudi_tbl.props

hoodie.datasource.write.recordkey.field=order_id
hoodie.datasource.write.partitionpath.field=order_date
hoodie.datasource.write.precombine.field=ts

bootstrap.servers=localhost:7092
auto.offset.reset=earliest

hoodie.streamer.source.kafka.topic=orders_complex
hoodie.streamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer

schema.registry.url=http://localhost:8081/
hoodie.streamer.schemaprovider.registry.schemaconverter=
hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/orders_complex-value/versions/latest

hoodie.streamer.transformer.sql=SELECT order_id, name, order_value, priority, order_date, ts, customer.customer_id AS customer_customer_id FROM <SRC> a
#hoodie.streamer.transformer.sql=SELECT * FROM <SRC> a

Error:

24/01/15 16:58:07 INFO Metadata: [Consumer clientId=consumer-spark-executor-null-2, groupId=spark-executor-null] Cluster ID: OfY2BqB-RFiPOfaF84AoGw
24/01/15 16:58:07 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1235 bytes result sent to driver
24/01/15 16:58:07 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 137 ms on soumils-mbp (executor driver) (1/1)
24/01/15 16:58:07 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
24/01/15 16:58:07 INFO DAGScheduler: ResultStage 0 (isEmpty at AvroConversionUtils.scala:99) finished in 0.446 s
24/01/15 16:58:07 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
24/01/15 16:58:07 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
24/01/15 16:58:07 INFO DAGScheduler: Job 0 finished: isEmpty at AvroConversionUtils.scala:99, took 0.464518 s
24/01/15 16:58:08 INFO FlatteningTransformer: Registering tmp table : HUDI_SRC_TMP_TABLE_40151fe5_8414_4137_a1ab_02d5d1946d83
24/01/15 16:58:08 INFO SqlQueryBasedTransformer: Registering tmp table : HOODIE_SRC_TMP_TABLE_c72fc875_0045_49ba_bfc0_31c2e4ccfcdb
24/01/15 16:58:08 INFO StreamSync: Shutting down embedded timeline server
24/01/15 16:58:08 INFO SparkContext: SparkContext is stopping with exitCode 0.
24/01/15 16:58:08 INFO SparkUI: Stopped Spark web UI at http://soumils-mbp:8090
24/01/15 16:58:08 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/01/15 16:58:08 INFO MemoryStore: MemoryStore cleared
24/01/15 16:58:08 INFO BlockManager: BlockManager stopped
24/01/15 16:58:08 INFO BlockManagerMaster: BlockManagerMaster stopped
24/01/15 16:58:08 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/01/15 16:58:08 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" org.apache.hudi.utilities.exception.HoodieTransformExecutionException: Failed to apply sql query based transformer
    at org.apache.hudi.utilities.transform.SqlQueryBasedTransformer.apply(SqlQueryBasedTransformer.java:68)
    at org.apache.hudi.utilities.transform.ChainedTransformer.apply(ChainedTransformer.java:105)
    at org.apache.hudi.utilities.streamer.StreamSync.lambda$fetchFromSource$0(StreamSync.java:530)
    at org.apache.hudi.common.util.Option.map(Option.java:108)
    at org.apache.hudi.utilities.streamer.StreamSync.fetchFromSource(StreamSync.java:530)
    at org.apache.hudi.utilities.streamer.StreamSync.readFromSource(StreamSync.java:495)
    at org.apache.hudi.utilities.streamer.StreamSync.syncOnce(StreamSync.java:405)
    at org.apache.hudi.utilities.streamer.HoodieStreamer$StreamSyncService.ingestOnce(HoodieStreamer.java:840)
    at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:72)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.utilities.streamer.HoodieStreamer.sync(HoodieStreamer.java:205)
    at org.apache.hudi.utilities.streamer.HoodieStreamer.main(HoodieStreamer.java:584)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `customer`.`customer_id` cannot be resolved. Did you mean one of the following? [`a`.`customer_customer_id`, `a`.`order_id`, `a`.`order_date`, `a`.`name`, `a`.`order_value`].; line 1 pos 62;
'Project [order_id#21, name#22, order_value#23, priority#24, order_date#25, ts#26, 'customer.customer_id AS customer_customer_id#36]
+- SubqueryAlias a
   +- SubqueryAlias hoodie_src_tmp_table_c72fc875_0045_49ba_bfc0_31c2e4ccfcdb
      +- View (`HOODIE_SRC_TMP_TABLE_c72fc875_0045_49ba_bfc0_31c2e4ccfcdb`, [order_id#21,name#22,order_value#23,priority#24,order_date#25,ts#26,customer_customer_id#27])
         +- Project [order_id#7 AS order_id#21, name#8 AS name#22, order_value#9 AS order_value#23, priority#10 AS priority#24, order_date#11 AS order_date#25, ts#12 AS ts#26, customer#13.customer_id AS customer_customer_id#27]
            +- SubqueryAlias hudi_src_tmp_table_40151fe5_8414_4137_a1ab_02d5d1946d83
               +- View (`HUDI_SRC_TMP_TABLE_40151fe5_8414_4137_a1ab_02d5d1946d83`, [order_id#7,name#8,order_value#9,priority#10,order_date#11,ts#12,customer#13])
                  +- LogicalRDD [order_id#7, name#8, order_value#9, priority#10, order_date#11, ts#12, customer#13], false

    at org.apache.spark.sql.errors.QueryCompilationErrors$.unresolvedAttributeError(QueryCompilationErrors.scala:221)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$$failUnresolvedAttribute(CheckAnalysis.scala:143)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5(CheckAnalysis.scala:258)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5$adapted(CheckAnalysis.scala:256)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:295)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:294)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:294)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:294)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$4(CheckAnalysis.scala:256)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$4$adapted(CheckAnalysis.scala:256)
    at scala.collection.immutable.Stream.foreach(Stream.scala:533)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$1(CheckAnalysis.scala:256)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$1$adapted(CheckAnalysis.scala:163)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:295)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0(CheckAnalysis.scala:163)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0$(CheckAnalysis.scala:160)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis0(Analyzer.scala:188)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:156)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:146)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:188)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:211)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:671)
    at org.apache.hudi.utilities.transform.SqlQueryBasedTransformer.apply(SqlQueryBasedTransformer.java:64)
    ... 23 more
24/01/15 16:58:08 INFO ShutdownHookManager: Shutdown hook called
24/01/15 16:58:08 INFO ShutdownHookManager: Deleting directory /private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-992e8443-ead9-4f3d-8f5e-da9329fb95c0
24/01/15 16:58:08 INFO ShutdownHookManager: Deleting directory /private/var/folders/qq/s_1bjv516pn_mck29cwdwxnm0000gp/T/spark-225adb28-c729-49d9-84c6-c58d5b9f27b0
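
The AnalysisException above actually contains the answer: in the logical plan, the FlatteningTransformer has already projected customer#13.customer_id AS customer_customer_id#27, and Spark's suggestion list even includes a.customer_customer_id. The flattening step itself worked; only the chained SQL still refers to the pre-flattening struct path. A sketch of the corrected property, based on that plan output (not verified here):

hoodie.streamer.transformer.sql=SELECT order_id, name, order_value, priority, order_date, ts, customer_customer_id FROM <SRC> a
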
soumilshah1995 commented 5 months ago

Following works

spark-submit \
    --class org.apache.hudi.utilities.streamer.HoodieStreamer \
    --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0 \
    --properties-file spark-config.properties \
    --master 'local[*]' \
    --executor-memory 1g \
    /Users/soumilshah/IdeaProjects/SparkProject/apache-hudi-delta-streamer-labs/E11/jar/hudi-utilities-slim-bundle_2.12-0.14.0.jar \
    --table-type COPY_ON_WRITE \
    --op UPSERT \
    --source-ordering-field ts \
    --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
    --target-base-path file:///Users/soumilshah/IdeaProjects/SparkProject/apache-hudi-delta-streamer-labs/E11/hudidb/orders \
    --target-table orders \
    --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
    --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
    --props hudi_tbl.props

hudi_tbl.props

hoodie.datasource.write.recordkey.field=order_id
hoodie.datasource.write.partitionpath.field=order_date
hoodie.datasource.write.precombine.field=ts

bootstrap.servers=localhost:7092
auto.offset.reset=earliest

hoodie.streamer.source.kafka.topic=orders_complex
hoodie.streamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer

schema.registry.url=http://localhost:8081/
hoodie.streamer.schemaprovider.registry.schemaconverter=
hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/orders_complex-value/versions/latest

hoodie.streamer.transformer.sql=SELECT * FROM <SRC> a

Hudi table created


root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- order_id: string (nullable = false)
 |-- name: string (nullable = false)
 |-- order_value: string (nullable = false)
 |-- priority: string (nullable = false)
 |-- ts: string (nullable = false)
 |-- customer: struct (nullable = false)
 |    |-- customer_id: string (nullable = false)
 |-- order_date: string (nullable = false)
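
For comparison, if the FlatteningTransformer were chained in, the customer struct would presumably be replaced by a flattened top-level column (a sketch, assuming the naming seen in the plan output above):

 |-- customer_customer_id: string (nullable = false)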

I would love to learn how to use the flattening transformer :D

soumilshah1995 commented 5 months ago

Let me get back to this issue after some more tries; I want to try out a few things.

ad1happy2go commented 5 months ago

@soumilshah1995 Let us know your findings and in case you need any help. Thanks.

soumilshah1995 commented 5 months ago

I would need some time to play with the flattening transformer and set up a test project to see if it works. Let me close this and reopen it later; I will most likely be doing these tests next week.