apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

[SUPPORT] - Cannot Ingest Protobuf records using Hudi Streamer #12301

Open remeajayi2022 opened 1 day ago

remeajayi2022 commented 1 day ago

I’m trying to ingest from a ProtoKafka source using Hudi Streamer but am running into an issue.

Exception in thread "main" org.apache.hudi.utilities.ingestion.HoodieIngestionException: Ingestion service was shut down with exception.
        at ...
Error reading source schema from registry. Please check hoodie.streamer.schemaprovider.registry.url is configured correctly. Truncated URL: https://....ons/latest
     at org.apache.hudi.utilities.schema.SchemaRegistryProvider.parseSchemaFromRegistry(SchemaRegistryProvider.java:111)
        at org.apache.hudi.utilities.schema.SchemaRegistryProvider.getSourceSchema(SchemaRegistryProvider.java:204)
        ... 10 more
...
Caused by: org.apache.hudi.internal.schema.HoodieSchemaException: Failed to parse schema from registry: syntax = "proto3";
package datagen;
...
Caused by: java.lang.NoSuchMethodException: org.apache.hudi.utilities.schema.converter.ProtoSchemaToAvroSchemaConverter.<init>()
        at java.lang.Class.getConstructor0(Class.java:3082)
        at java.lang.Class.newInstance(Class.java:412)
        ... 13 more 

The stack trace points to a misconfigured schema registry URL. However, the same URL works for Hudi Streamer jobs ingesting from AvroKafka sources, and when I query the schema registry URL with curl it correctly returns the schema.
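For what it's worth, the innermost "Caused by" above does not look URL-related to me: it is reflective instantiation of the schema converter failing because no no-arg constructor is found. A minimal, standalone Java sketch of that failure mode (nothing Hudi-specific; OnlyConfigCtor is just a hypothetical stand-in for a converter whose only constructor takes configuration):

// Minimal sketch, not Hudi code: reflective no-arg instantiation fails with
// NoSuchMethodException when the target class only exposes a constructor that
// takes configuration, mirroring the last "Caused by" in the stack trace above.
public class NoArgCtorRepro {

  // Hypothetical stand-in for a converter that requires config at construction.
  static class OnlyConfigCtor {
    OnlyConfigCtor(java.util.Properties config) {
    }
  }

  public static void main(String[] args) throws Exception {
    // Throws java.lang.NoSuchMethodException: ...OnlyConfigCtor.<init>()
    Object instance = OnlyConfigCtor.class.getDeclaredConstructor().newInstance();
    System.out.println(instance);
  }
}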

Additional Context

  1. I've verified the Protobuf schema is valid; it is a sample proto schema from Confluent’s Datagen connector.
  2. I've confirmed the schema registry URL is configured correctly; the same URL works fine with a similar AvroKafka Spark job, and a direct request to the subject's latest version returns the schema (see the sketch after this list).
  3. I added hoodie.streamer.schemaprovider.proto.class.name and hoodie.streamer.source.kafka.proto.value.deserializer.class=org.apache.kafka.common.serialization.ByteArrayDeserializer. I don't think these are required, and their presence or absence did not change the error.
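For completeness, here is the manual registry check from point 2 written out as a small Java sketch (equivalent to the curl call; every angle-bracketed value is a placeholder for the real key, secret, host, and topic):

import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Scanner;

// Sketch of the manual schema registry check, in Java rather than curl.
// All angle-bracketed values are placeholders.
public class RegistryCheck {
  public static void main(String[] args) throws Exception {
    String userInfo = "<schema-registry-key>:<schema-registry-secret>";
    URL url = new URL(
        "https://<schema-registry-url>/subjects/<topic-name>-value/versions/latest");

    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestProperty("Authorization", "Basic "
        + Base64.getEncoder().encodeToString(userInfo.getBytes(StandardCharsets.UTF_8)));

    // A healthy Protobuf subject returns HTTP 200 with a JSON body containing
    // the schema type and the proto schema text.
    System.out.println("HTTP " + conn.getResponseCode());
    try (Scanner scanner = new Scanner(conn.getInputStream(), StandardCharsets.UTF_8.name())) {
      System.out.println(scanner.useDelimiter("\\A").next());
    }
  }
}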

Environment Details

Hudi version: v0.15.0
Spark version: 3.1.3
Scala version: 2.12
Google Dataproc version: 2.0.125-debian10

Spark Submit Command and Protobuf Configuration

gcloud dataproc jobs submit spark --cluster <GCP-CLUSTER> \
  --region us-central1 \
  --class org.apache.hudi.utilities.streamer.HoodieStreamer \
  --project <GCP-PROJECT> \
  --jars <jar-base-url>/jars/hudi-gcp-bundle-0.15.0.jar,<jar-base-url>/jars/spark-avro_2.12-3.1.1.jar,<jar-base-url>/jars/hudi-utilities-bundle-raw_2.12-0.15.0.jar,<jar-base-url>/jars/kafka-protobuf-provider-5.5.0.jar \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --source-class org.apache.hudi.utilities.sources.ProtoKafkaSource \
  --hoodie-conf sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>' password='<password>';" \
  --hoodie-conf hoodie.streamer.schemaprovider.proto.class.name=<topic-name> \
  --hoodie-conf basic.auth.credentials.source=USER_INFO \
  --hoodie-conf schema.registry.basic.auth.user.info=<schema-registry-key>:<schema-registry-secret> \
  --hoodie-conf hoodie.streamer.schemaprovider.registry.url=https://<schema-registry-key>:<schema-registry-secret>@<schema-registry-url>/subjects/<topic-name>-value/versions/latest \
  --hoodie-conf hoodie.streamer.source.kafka.topic=<topic-name> \
  --hoodie-conf hoodie.streamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer \
  --hoodie-conf hoodie.streamer.schemaprovider.registry.schemaconverter=org.apache.hudi.utilities.schema.converter.ProtoSchemaToAvroSchemaConverter \

Steps to Reproduce

  1. Build a Hudi 0.15.0 JAR with Spark 3.1 and Scala 2.12.
  2. Use a Protobuf schema on an accessible schema registry, preferably an authenticated one.
  3. Configure a Hudi Streamer job with the spark-submit command above.
  4. Run the Spark job.

I’d appreciate any insights into resolving this issue. Is there an alternative or a workaround for configuring the Protobuf schema? Am I missing any configuration settings? Thank you for your help!

remeajayi2022 commented 9 hours ago

It seems this was a known issue (#11598) in v0.15.0, with PR fixes (#11373, #11660) by @the-other-tim-brown that have since been merged to master. However, I have tested with a JAR compiled from the master branch and I am still running into issues:

24/11/21 21:30:36 ERROR HoodieAsyncService: Service shutdown with error
java.util.concurrent.ExecutionException: java.lang.AbstractMethodError: Receiver class io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider does not define or inherit an implementation of the resolved method 'abstract java.util.Optional parseSchema(java.lang.String, java.util.List)' of interface io.confluent.kafka.schemaregistry.SchemaProvider.
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
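In case it helps with diagnosis: my unverified guess is a classpath version mismatch between kafka-protobuf-provider-5.5.0.jar and the schema-registry client classes bundled in the utilities JAR built from master. A small diagnostic sketch (assuming it is run with the same jars as the Spark driver) that prints where each class is loaded from and which parseSchema overloads it actually declares:

import java.lang.reflect.Method;
import java.security.CodeSource;

// Diagnostic sketch: check for mismatched Confluent schema-registry versions on
// the classpath by listing the jar each class comes from and its parseSchema
// overloads. Assumes the same jars as the Spark driver are on the classpath.
public class SchemaProviderMethodCheck {
  public static void main(String[] args) throws Exception {
    String[] names = {
        "io.confluent.kafka.schemaregistry.SchemaProvider",
        "io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider"
    };
    for (String name : names) {
      Class<?> clazz = Class.forName(name);
      CodeSource src = clazz.getProtectionDomain().getCodeSource();
      System.out.println(name + " loaded from " + (src == null ? "?" : src.getLocation()));
      for (Method m : clazz.getMethods()) {
        if (m.getName().equals("parseSchema")) {
          System.out.println("  " + m);
        }
      }
    }
  }
}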

Do you have any suggestions on how to move past this? Thank you. cc: @the-other-tim-brown