apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] Unable to interpret child JSON field values as separate columns; they are loaded as one single field value. Any way to interpret them? #2101

Closed getniz closed 4 years ago

getniz commented 4 years ago

Issue details: Within a nested JSON schema with the format below, is there a way to consume the child fields of a parent as table columns? The child fields of the parent field data are taken as a single column value while loading into the Hudi table. If I'm doing something wrong, please guide me; otherwise, what is the practice for integrating with Apache Hudi in the example scenario below?

Example scenario: current JSON format from the Kafka topic:

```json
{"data": {"NUMBER": "2223", "NAME": "SAMUEL", "CHANGED_BY": "YYYYYYY"}}
{"data": {"NUMBER": "2224", "NAME": "JACKSON PAUL", "CHANGED_BY": "YYYYYYY"}}
```

test_schema.avsc:

```json
{
  "name": "test",
  "type": "record",
  "namespace": "com.acme.avro",
  "fields": [
    {
      "name": "data",
      "type": {
        "name": "data",
        "type": "record",
        "fields": [
          { "name": "NUMBER", "type": "string" },
          { "name": "NAME", "type": "string" },
          { "name": "CHANGED_BY", "type": "string" }
        ]
      }
    }
  ]
}
```

How it is loaded in the Hive test_data table (Hoodie commit fields omitted):

| RecordKey | data |
| --- | --- |
| 2223 | "NUMBER": "2223", "NAME": "XXXXXXXXXXXXXX", "CHANGED_BY": "2019-08-01" |
| 2224 | "NUMBER": "2224", "NAME": "XXXXXXXXXXXXXX", "CHANGED_BY": "2020-08-01" |

How it is expected in the Hive test_data table:

| RecordKey (NUMBER) | NUMBER | Name | Changed_by |
| --- | --- | --- | --- |
| 2223 | 2223 | XXXXXXXXXXXXXX | 2019-08-01 |
| 2223 | 2223 | XXXXXXXXXXXXXX | 2020-08-01 |

Here is the Hudi batch command:

```sh
spark-submit --jars \
  "/mnt/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.6.1-SNAPSHOT.jar" \
  --deploy-mode "client" \
  --class "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer" \
  /mnt/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.6.1-SNAPSHOT.jar \
  --props file:///mnt/hudi/docker/demo/config/kafka-source.properties \
  --table-type MERGE_ON_READ \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --source-ordering-field data.Changed_by \
  --target-base-path s3://landing/test-data \
  --target-table test-data \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --enable-hive-sync \
  --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://localhost:10000 \
  --hoodie-conf hoodie.datasource.hive_sync.username=hive \
  --hoodie-conf hoodie.datasource.hive_sync.database=default \
  --hoodie-conf hoodie.datasource.hive_sync.table=test_data \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator \
  --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor \
  --hoodie-conf hoodie.compact.inline=true
```

kafka-source.properties:

```properties
include=base.properties

# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=data.Changed_by
hoodie.datasource.write.partitionpath.field=date

# Schema provider props (change to absolute path based on your installation)
hoodie.deltastreamer.schemaprovider.source.schema.file=file:///mnt/hudi/docker/demo/config/test_schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=file:///mnt/hudi/docker/demo/config/test_schema.avsc

# Kafka source topic
hoodie.deltastreamer.source.kafka.topic=test

# Kafka props
bootstrap.servers=kafka.com:9092
auto.offset.reset=earliest
```

Environment Description: everything is installed manually on AWS EMR.

n3nash commented 4 years ago

@getniz Your JSON is a nested data type, which will end up being inferred as a nested Avro record type. My understanding is that this is expected behavior for nested data. There are 3 ways for you to flatten your data:

1. Transform your incoming data and flatten it.
2. Flatten it using a transformer for the DeltaStreamer -> https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
3. Flatten it during your query using a Lateral View (a query-time sketch follows below).
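
A rough sketch of option 3, assuming the synced Hive table is named test_data as above. If the data column is synced as a struct, plain dot notation is enough; if it instead lands as a JSON string, json_tuple with a LATERAL VIEW pulls the fields out (the aliases num, name, changed_by are illustrative):

```sql
-- Case 1: `data` is a struct column; dot notation flattens it at query time.
SELECT data.NUMBER, data.NAME, data.CHANGED_BY FROM test_data;

-- Case 2: `data` is a JSON string; json_tuple via LATERAL VIEW extracts the fields.
SELECT t.num, t.name, t.changed_by
FROM test_data
LATERAL VIEW json_tuple(data, 'NUMBER', 'NAME', 'CHANGED_BY') t AS num, name, changed_by;
```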

getniz commented 4 years ago

@n3nash thanks for the detailed response. Options 1 & 3 I may not be able to consider, as I need to build this layer as immediate target tables for further consumption in the reporting layer. If I use option 2, can I consume the topic and flatten the schema in the DeltaStreamer without staging, and then load directly into the immediate target layer using the above spark-submit batch command? Also, I came to know that Hudi supports the Confluent Schema Registry; in that case, if I get the JSON schema from the source and register it with the Confluent registry, can I achieve flattening of the file? Sorry, my questions may be silly sometimes, please bear with me, I'm a learner here :) The objective of what I'm trying to do is to consume data from several topics in near real-time (all the topics' data are formatted/structured) and push it to the data lake using Hudi. If I stage and transform it, then I may end up eating up time.

n3nash commented 4 years ago

@getniz Those are good questions, we're all learners here :) You can definitely create a schema in the Confluent Schema Registry, but the flattening will depend on your schema structure. If you create an Avro schema similar to your JSON structure, then you will have a nested column in your Avro schema and you will have the same problem. If you want a flat schema in Avro, then your JSON has to be flattened as well. I would recommend trying option 2, which is straightforward, and seeing what performance penalty there is, if any.
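
A minimal sketch of option 2, assuming the spark-submit command posted above: the only change is wiring in the bundled FlatteningTransformer via --transformer-class. With a flattened output, the target schema file and the record key / ordering fields in kafka-source.properties would also need to refer to the flattened column names produced by the transformer, so verify those rather than taking the values below as-is:

```sh
# Same invocation as the command above, with the FlatteningTransformer added.
spark-submit --jars \
  "/mnt/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.6.1-SNAPSHOT.jar" \
  --deploy-mode "client" \
  --class "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer" \
  /mnt/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.6.1-SNAPSHOT.jar \
  --props file:///mnt/hudi/docker/demo/config/kafka-source.properties \
  --table-type MERGE_ON_READ \
  --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
  --transformer-class org.apache.hudi.utilities.transform.FlatteningTransformer \
  --source-ordering-field data.Changed_by \
  --target-base-path s3://landing/test-data \
  --target-table test-data \
  --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
  --enable-hive-sync \
  --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://localhost:10000 \
  --hoodie-conf hoodie.datasource.hive_sync.username=hive \
  --hoodie-conf hoodie.datasource.hive_sync.database=default \
  --hoodie-conf hoodie.datasource.hive_sync.table=test_data \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator \
  --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor \
  --hoodie-conf hoodie.compact.inline=true
```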

getniz commented 4 years ago

@n3nash Thanks for your inputs. While experimenting with this, I saw on the other side that there is another easy way to achieve this using KSQL on the Kafka receiving side, which flattens the Kafka topics and posts them to other topics by creating KSQL streams. I can just plug the Hudi source into the KSQL stream topic that is flattened to my requirements. Reference: https://www.confluent.io/stream-processing-cookbook/ksql-recipes/nested-json-data/ . I think this turns out to be the solution from the Kafka side, given the very nature of Kafka topics: they are always encapsulated with additional Kafka metadata like schema, offset, etc., rather than being typical flattened files.
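
A rough sketch of that KSQL approach, following the linked recipe. The stream names and the flattened output topic test_flat are illustrative; the raw topic name test comes from kafka-source.properties above, and hoodie.deltastreamer.source.kafka.topic would then point at the flattened topic:

```sql
-- Declare the raw topic with the nested payload as a STRUCT (names illustrative;
-- quote or rename fields if any of them collide with KSQL reserved words).
CREATE STREAM test_raw (
  data STRUCT<NUMBER VARCHAR, NAME VARCHAR, CHANGED_BY VARCHAR>
) WITH (KAFKA_TOPIC='test', VALUE_FORMAT='JSON');

-- Write a flattened copy to a new topic for the Hudi DeltaStreamer to consume.
CREATE STREAM test_flat WITH (KAFKA_TOPIC='test_flat', VALUE_FORMAT='JSON') AS
SELECT data->NUMBER     AS NUMBER,
       data->NAME       AS NAME,
       data->CHANGED_BY AS CHANGED_BY
FROM test_raw;
```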

getniz commented 4 years ago

Closing this issue, as I could use KSQL to flatten the incoming nested JSON and then consume it in Hudi instantly without staging.