apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] hudi deltastreamer jsonkafka source schema registry fail #9132

Open nttq1sub opened 1 year ago

nttq1sub commented 1 year ago

I am trying to read data from Kafka that is stored as JSON, with the schema kept in the Schema Registry, using the configuration below. Please tell me where I'm wrong with the configuration.

This is my property file:

    hoodie.datasource.write.recordkey.field=plate
    hoodie.deltastreamer.schemaprovider.registry.url=http://confluent-schema-registry.kafka.svc:8081/subjects/OCR.VEHICLE.REGISTRATION-value/versions/latest
    hoodie.deltastreamer.source.kafka.topic=OCR.VEHICLE.REGISTRATION
    hoodie.datasource.hive_sync.table=ocr_vehicle_registration_cow_latest
    hoodie.datasource.hive_sync.database=raw
    hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
    hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
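For context, a job with these properties is typically launched roughly as follows. This is only a sketch: the bundle jar path, the ordering field, and the props path are placeholders I chose, not values taken from this report; the class and flag names are the standard HoodieDeltaStreamer ones.

    spark-submit \
      --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
      /path/to/hudi-utilities-bundle.jar \
      --table-type COPY_ON_WRITE \
      --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
      --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
      --source-ordering-field expired_date \
      --target-base-path hdfs:///dwh/raw/ocr/ocr_vehicle_registration_cow_latest \
      --target-table ocr_vehicle_registration_cow_latest \
      --props /path/to/this-property-file.properties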

This is my schema in the schema registry:

    "type": "object",
    "additionalProperties": false,
    "title": "ocr vehicle registration",
    "description": "ocr vehicle registration",
    "properties": {
        "filename": {
            "type": "string"
        },
        "type": {
            "type": "number"
        },
        "name": {
            "type": "string"
        },
        "address": {
            "type": "string"
        },
        "brand": {
            "type": "string"
        },
        "model": {
            "type": "string"
        },
        "engine": {
            "type": "string"
        },
        "chassis": {
            "type": "string"
        },
        "color": {
            "type": "string"
        },
        "sit": {
            "type": "number"
        },
        "type_": {
            "type": "string"
        },
        "capacity": {
            "type": "string"
        },
        "day": {
            "type": "number"
        },
        "month": {
            "type": "number"
        },
        "year": {
            "type": "number"
        },
        "plate": {
            "type": "string"
        },
        "expired_date": {
            "type": "string"
        }
    }
}

Environment Description

Additional context


Stacktrace


23/07/06 06:49:21 INFO HoodieTableMetaClient: Initializing hdfs:///dwh/raw/ocr/ocr_vehicle_registration_cow_latest as hoodie table hdfs:///dwh/raw/ocr/ocr_vehicle_registration_cow_latest
23/07/06 06:49:22 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from hdfs:///dwh/raw/ocr/ocr_vehicle_registration_cow_latest
23/07/06 06:49:22 INFO HoodieTableConfig: Loading table properties from hdfs:/dwh/raw/ocr/ocr_vehicle_registration_cow_latest/.hoodie/hoodie.properties
23/07/06 06:49:23 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from hdfs:///dwh/raw/ocr/ocr_vehicle_registration_cow_latest
23/07/06 06:49:23 INFO HoodieTableMetaClient: Finished initializing Table of type COPY_ON_WRITE from hdfs:///dwh/raw/ocr/ocr_vehicle_registration_cow_latest
23/07/06 06:49:23 INFO SparkUI: Stopped Spark web UI at http://ocr-vehicle-registration-src-9b7f828929f5e5ad-driver-svc.spark.svc:8090
23/07/06 06:49:23 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
23/07/06 06:49:23 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
23/07/06 06:49:23 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
23/07/06 06:49:23 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/07/06 06:49:23 INFO MemoryStore: MemoryStore cleared
23/07/06 06:49:23 INFO BlockManager: BlockManager stopped
23/07/06 06:49:23 INFO BlockManagerMaster: BlockManagerMaster stopped
23/07/06 06:49:23 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/07/06 06:49:23 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" org.apache.avro.SchemaParseException: Type not supported: object
    at org.apache.avro.Schema.parse(Schema.java:1734)
    at org.apache.avro.Schema$Parser.parse(Schema.java:1430)
    at org.apache.avro.Schema$Parser.parse(Schema.java:1418)
    at org.apache.hudi.utilities.schema.SchemaRegistryProvider.getSchema(SchemaRegistryProvider.java:100)
    at org.apache.hudi.utilities.schema.SchemaRegistryProvider.getSourceSchema(SchemaRegistryProvider.java:107)
    at org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor.getSourceSchema(SchemaProviderWithPostProcessor.java:42)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.registerAvroSchemas(DeltaSync.java:911)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.<init>(DeltaSync.java:243)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:680)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:148)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:121)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:573)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
23/07/06 06:49:23 INFO ShutdownHookManager: Shutdown hook called
23/07/06 06:49:23 INFO ShutdownHookManager: Deleting directory /var/data/spark-a32e287f-3ad6-4bb9-8b35-af6bb8445b85/spark-fb2e8bd9-b281-4391-87d3-75f552ae9e7f
23/07/06 06:49:23 INFO ShutdownHookManager: Deleting directory /tmp/spark-65b0ea58-5e2b-47a3-97fc-d573497d4115
danny0405 commented 1 year ago

Can the schema be parsed correctly in your local env using Schema.parse?
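A minimal local check along these lines could look like the sketch below. The schema string is a placeholder; the idea is to paste in whatever the registry's `schema` field returns for the subject configured above.

    import org.apache.avro.Schema;

    public class SchemaParseCheck {
        public static void main(String[] args) {
            // Placeholder: a valid Avro record schema, so this example parses cleanly.
            // Paste the "schema" field returned by
            // GET <registry>/subjects/OCR.VEHICLE.REGISTRATION-value/versions/latest
            // here instead; a JSON Schema ("type": "object") will then fail with
            // "org.apache.avro.SchemaParseException: Type not supported: object".
            String schemaStr =
                "{\"type\":\"record\",\"name\":\"test\",\"fields\":"
                    + "[{\"name\":\"plate\",\"type\":\"string\"}]}";

            Schema parsed = new Schema.Parser().parse(schemaStr);
            System.out.println(parsed.toString(true));
        }
    }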

nttq1sub commented 1 year ago

@danny0405 I tried to learn from this issue (https://github.com/apache/hudi/issues/3835), but it doesn't work. Can you show me the right config for JsonKafkaSource? You would help me and many others with that, thanks a lot. I followed the guide on your official page, but none of it covers how to use Hudi DeltaStreamer with JsonKafkaSource and the right configuration. I tried this configuration (following https://github.com/apache/hudi/pull/7727), but it still doesn't work:

    hoodie.deltastreamer.schemaprovider.registry.schemaconverter=org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter

Then I tested another setup and it worked: I changed the source class to AvroKafkaSource, defined the schema in Avro, and produced messages to the topic with the AvroSerializer against that Avro schema. But one problem remains: if I keep JsonKafkaSource as the source class and define the schema as an Avro 'record' type, the schema can be parsed, but deserialization fails while decoding because the data contains some UTF-8 characters. Even after adding this additional config it still errors:

    hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer (or KafkaSchemaAvroDeserializer)

These are the logs:

23/07/06 12:53:47 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0
23/07/06 12:53:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (10.233.125.104, executor 1, partition 0, PROCESS_LOCAL, 4391 bytes) taskResourceAssignments Map()
23/07/06 12:53:48 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.233.125.104:45235 (size: 4.3 KiB, free: 110.0 MiB)
23/07/06 12:53:53 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (10.233.125.104 executor 1): org.apache.hudi.exception.HoodieIOException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
 at [Source: (String)"\00\00\00{"type": 1, "name": "\u00d4 T\u00d4 TR\u01af", "address": "L\u00f4", "brand": "PEUGEOT", "filename": "_E0102376505_"[truncated 11 chars]; line: 1, column: 2]
    at org.apache.hudi.avro.MercifulJsonConverter.convert(MercifulJsonConverter.java:96)
    at org.apache.hudi.utilities.sources.helpers.AvroConvertor.fromJson(AvroConvertor.java:87)
    at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$SliceIterator.next(Iterator.scala:273)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
    at scala.collection.AbstractIterator.to(Iterator.scala:1431)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
    at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
 at [Source: (String)"\00\00\00{"type": 1, line: 1, column: 2]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:688)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2408)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:677)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4684)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3548)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3516)
    at org.apache.hudi.avro.MercifulJsonConverter.convert(MercifulJsonConverter.java:93)
    ... 30 more

The log I provided above has been altered to remove some sensitive information, but the context is the same.

Can you be more specific about using Schema.parse? Here I am using the Hudi DeltaStreamer packages that you provide. If you need to reproduce this issue, I'm willing to help with that. Thanks a lot.
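One observation on the `Illegal character (CTRL-CHAR, code 0)` failure in those logs (my reading, not something confirmed in this thread): values produced with the Confluent schema-registry serializers carry a 5-byte wire-format prefix (one magic byte plus a 4-byte schema id), which a plain JSON parse rejects, and that matches the `\00\00\00{"type": 1, ...` payload shown above. A small hand-built sketch of that framing:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class WireFormatPeek {
        public static void main(String[] args) {
            // Hypothetical payload built by hand: Confluent wire format is
            // magic byte 0x0 + 4-byte schema id + the actual record bytes.
            byte[] body = "{\"type\": 1}".getBytes(StandardCharsets.UTF_8);
            byte[] value = ByteBuffer.allocate(5 + body.length)
                    .put((byte) 0)   // magic byte
                    .putInt(42)      // schema id (placeholder)
                    .put(body)
                    .array();

            // Read as a string, the value starts with NUL characters,
            // which is exactly what the JSON parser complains about above.
            String raw = new String(value, StandardCharsets.UTF_8);
            System.out.println("starts with NUL: " + (raw.charAt(0) == '\u0000'));

            // Skipping the 5-byte header exposes the JSON body.
            System.out.println(new String(value, 5, value.length - 5, StandardCharsets.UTF_8));
        }
    }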

danny0405 commented 1 year ago

Taking a glimpse at the code: https://github.com/apache/hudi/blob/0258a89112a6071a8074757236e19a7b27539dbd/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java#L58, the JsonKafkaSource still takes a schema provider class, but it does not seem to use that schema provider at all. The Kafka messages are fetched and then deserialized into JSON strings directly.

From the error stacktrace you gave, it seems something is wrong with the schema registry provider. Did you try to parse the Avro schema string manually with Schema.parse?

nttq1sub commented 1 year ago

@danny0405 This is the schema registry schema I used; when I parse the schema myself it does not have any issues. In this case I used the SchemaRegistryProvider class. Is there any problem with using this class here, or some misconfiguration on my side?

    "type": "object",
    "additionalProperties": false,
    "title": "ocr vehicle registration",
    "description": "ocr vehicle registration",
    "properties": {
        "filename": {
            "type": "string"
        },
        "type": {
            "type": "number"
        },
        "name": {
            "type": "string"
        },
        "address": {
            "type": "string"
        },
        "brand": {
            "type": "string"
        },
        "model": {
            "type": "string"
        },
        "engine": {
            "type": "string"
        },
        "chassis": {
            "type": "string"
        },
        "color": {
            "type": "string"
        },
        "sit": {
            "type": "number"
        },
        "type_": {
            "type": "string"
        },
        "capacity": {
            "type": "string"
        },
        "day": {
            "type": "number"
        },
        "month": {
            "type": "number"
        },
        "year": {
            "type": "number"
        },
        "plate": {
            "type": "string"
        },
        "expired_date": {
            "type": "string"
        }
    }
}
danny0405 commented 1 year ago

Exception in thread "main" org.apache.avro.SchemaParseException: Type not supported: object
    at org.apache.avro.Schema.parse(Schema.java:1734)
    at org.apache.avro.Schema$Parser.parse(Schema.java:1430)

It is fairly apparent that the schema fetched from the provider is wrong, since Avro cannot parse it correctly. I'm not sure whether it is related to the Avro version; what version of Avro did you use when generating the schema string?

nttq1sub commented 1 year ago

@danny0405 No. I'm using a JSON schema in the registry with the SchemaRegistryProvider class and JsonKafkaSource, and the data I produced to the topic also uses the JSON schema I defined above. But when the job runs it shows this error, even though I'm not using anything related to Avro. I still haven't figured it out yet.

Exception in thread "main" org.apache.avro.SchemaParseException: Type not supported: object
at org.apache.avro.Schema.parse(Schema.java:1734)
at org.apache.avro.Schema$Parser.parse(Schema.java:1430)

I think it should show me some error related to the JSON schema, not Avro. It's kind of weird.

danny0405 commented 1 year ago

while I think that it should show me some error related to json schema

I meant the same thing: the Avro schema string seems incompatible with the Avro version Hudi uses. That's why Schema.Parser.parse throws an exception.
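For comparison, a schema that Schema.Parser can handle would be an Avro record rather than a JSON Schema object. The following is only a sketch I assembled from the JSON Schema posted above: the record name is mine, and mapping the JSON Schema "number" fields to Avro "double" is an assumption.

    {
      "type": "record",
      "name": "ocr_vehicle_registration",
      "fields": [
        {"name": "filename", "type": "string"},
        {"name": "type", "type": "double"},
        {"name": "name", "type": "string"},
        {"name": "address", "type": "string"},
        {"name": "brand", "type": "string"},
        {"name": "model", "type": "string"},
        {"name": "engine", "type": "string"},
        {"name": "chassis", "type": "string"},
        {"name": "color", "type": "string"},
        {"name": "sit", "type": "double"},
        {"name": "type_", "type": "string"},
        {"name": "capacity", "type": "string"},
        {"name": "day", "type": "double"},
        {"name": "month", "type": "double"},
        {"name": "year", "type": "double"},
        {"name": "plate", "type": "string"},
        {"name": "expired_date", "type": "string"}
      ]
    }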

ad1happy2go commented 10 months ago

@nttq1sub Were you able to resolve this issue? Feel free to close or let us know if you still have issues.