awslabs / aws-glue-schema-registry

The AWS Glue Schema Registry client library provides serializers and deserializers for applications to integrate with the AWS Glue Schema Registry service. The library currently supports the Avro, JSON, and Protobuf data formats. See https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html to get started.
Apache License 2.0

Reading AVRO messages fails for AWS glue schema registry Deserializers #272

Open raipragyaa opened 1 year ago

raipragyaa commented 1 year ago

I have built a Kafka pipeline using MSK, MSK Connect, the Debezium Postgres source connector, and AWS Glue Schema Registry. On the producer side I am able to publish Avro records with their schemas in Glue Schema Registry; I am using the aws-glue-schema-registry library for the integration between Kafka Connect and Glue Schema Registry. Below is my connector configuration for Glue -

 # Glue Schema Registry specific converters
    "key.converter"                  = "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter"
    "key.converter.schemas.enable"   = false
    "value.converter"                = "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter"
    "value.converter.schemas.enable" = true

    "key.converter.region"                        = "us-east-1"
    "key.converter.registry.name"                 = "<REGISTRY NAME>"
    "key.converter.compatibility"                 = "FULL"
    "key.converter.schemaAutoRegistrationEnabled" = true
    "key.converter.dataFormat"                    = "AVRO"
    "key.converter.avroRecordType"                = "GENERIC_RECORD"
    "key.converter.schemaNameGenerationClass"     = "<SCHEMA NAME GENERATION CLASS>"

    "value.converter.region"                        = "us-east-1"
    "value.converter.registry.name"                 = "<REGISTRY NAME>"
    "value.converter.compatibility"                 = "FULL"
    "value.converter.schemaAutoRegistrationEnabled" = true
    "value.converter.dataFormat"                    = "AVRO"
    "value.converter.avroRecordType"                = "GENERIC_RECORD"
    "value.converter.schemaNameGenerationClass"     = "<SCHEMA NAME GENERATION CLASS>"

On the consumer side, I have an EC2 instance. I am using the kafka-avro-console-consumer command provided by Confluent to read messages from the Avro topic. I have also added the aws-glue-schema-registry library JAR to the confluent/share/java/kafka-serde-tools folder to make it available to the command. I am using the following command -

kafka-avro-console-consumer --bootstrap-server <bootstrap_server_url> \
--consumer.config client.properties \
--property schema.registry.url=https://glue.us-east-1.amazonaws.com \
--property print.key=true \
--property print.value=true \
--key-deserializer com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer \
--value-deserializer com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer \
--topic platform_avro_users --from-beginning

And the consumer config file contains -

security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

key.deserializer=com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer
key.deserializer.region=us-east-1
key.deserializer.registry.name=<REGISTRY NAME>
key.deserializer.avroRecordType=GENERIC_RECORD
key.deserializer.schemaNameGenerationClass=<SCHEMANAME GENERATION CLASS NAME>

value.deserializer=com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer
value.deserializer.region=us-east-1
value.deserializer.registry.name=<REGISTRY NAME>
value.deserializer.avroRecordType=GENERIC_RECORD
value.deserializer.schemaNameGenerationClass=<SCHEMANAME GENERATION CLASS NAME>

I am expecting the command to print the messages, but I get the following error -

Processed a total of 1 messages
ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:44)
java.lang.NullPointerException: Cannot invoke "com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializationFacade.deserialize(com.amazonaws.services.schemaregistry.common.AWSDeserializerInput)" because "this.glueSchemaRegistryDeserializationFacade" is null
    at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserializeByHeaderVersionByte(AWSKafkaAvroDeserializer.java:149)
    at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserialize(AWSKafkaAvroDeserializer.java:114)
    at io.confluent.kafka.formatter.AvroMessageFormatter$AvroMessageDeserializer.deserializeKey(AvroMessageFormatter.java:125)
    at io.confluent.kafka.formatter.SchemaMessageFormatter.writeTo(SchemaMessageFormatter.java:157)
    at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:116)
    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:76)
    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53)
    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

It looks like the GlueSchemaRegistryDeserializationFacade is never initialised, so the records cannot be deserialised. Is there a way to fix this issue and read Avro records when using Glue Schema Registry?
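For context on where the NPE occurs: judging by the method name, deserializeByHeaderVersionByte first inspects the Glue Schema Registry message framing (a header-version byte, a compression byte, and a 16-byte schema version UUID) before handing the payload to the facade, which is null here because configure was never invoked. A rough Python sketch of that framing, for illustration only — the constants follow the library's wire format as I understand it, and parse_glue_sr_header is a hypothetical helper name, not part of the library:

```python
import uuid

# Glue Schema Registry framing constants (per the library's wire format)
HEADER_VERSION_BYTE = 3   # first byte of every Glue SR-encoded message
COMPRESSION_NONE = 0      # second byte: 0 = uncompressed
COMPRESSION_ZLIB = 5      # second byte: 5 = zlib-compressed payload

def parse_glue_sr_header(message: bytes):
    """Split a Glue SR-framed message into (compression, schema_version_id, payload)."""
    if len(message) < 18 or message[0] != HEADER_VERSION_BYTE:
        raise ValueError("not a Glue Schema Registry encoded message")
    compression = message[1]
    schema_version_id = uuid.UUID(bytes=message[2:18])  # 16-byte UUID after the 2 header bytes
    payload = message[18:]                              # the actual Avro bytes
    return compression, schema_version_id, payload
```

Note that a plain-string key produced with schemas.enable=false carries none of this framing, which is why feeding it to an Avro deserializer cannot work.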

raipragyaa commented 1 year ago

Can someone look into it?

blacktooth commented 1 year ago

From the configs pasted above, I can see two deserializers configured: GlueSchemaRegistryKafkaDeserializer inside the consumer config file and AWSKafkaAvroDeserializer on the CLI. We recommend using only one of them. As long as the Kafka console consumer tool invokes the configure method, the GlueSchemaRegistryDeserializationFacade should get initialized.
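One way to apply that recommendation is to drop the two deserializer flags from the CLI and rely solely on the key.deserializer / value.deserializer settings already present in client.properties. An untested sketch, reusing the placeholders from the original post (whether the console tooling passes the properties-file deserializer settings through configure may depend on the Confluent tooling version):

```shell
kafka-console-consumer --bootstrap-server <bootstrap_server_url> \
  --consumer.config client.properties \
  --property print.key=true \
  --property print.value=true \
  --topic platform_avro_users --from-beginning
```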

Sovenique commented 9 months ago

@raipragyaa Do you have any updates on this?

@blacktooth It would be great if the documentation could include a deserialization example for Kafka Connect.

Could you possibly provide an example (consumer.properties)? Thanks!

kothapet commented 7 months ago

I think the issue is here: the key converter has schemas.enable set to false, i.e. the key is not serialized to Avro, but when deserializing in the consumer it is specified as Avro. That won't work. If you are serializing the key as an int/string etc., use the matching int/string deserializer. If you want Avro serialization for the key, make sure schemas.enable is set to true. It looks like you are already using a schema name generation class, which probably applies a convention like -key and -value; otherwise Kafka Connect won't work.

    "key.converter" = "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter"
    "key.converter.schemas.enable"   = false
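As a concrete version of this fix, if the key is a plain string, the key side can be switched to Kafka Connect's built-in StringConverter on the connector and matched with the standard StringDeserializer on the consumer, while the value keeps the Glue Avro converter. A sketch only, using the placeholders from the original config:

```properties
# Connector side: serialize the key as a plain string, value as Glue SR Avro
"key.converter"                  = "org.apache.kafka.connect.storage.StringConverter"
"value.converter"                = "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter"
"value.converter.schemas.enable" = true
"value.converter.region"         = "us-east-1"
"value.converter.registry.name"  = "<REGISTRY NAME>"

# Consumer side (client.properties): match the key with a string deserializer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer
value.deserializer.region=us-east-1
value.deserializer.registry.name=<REGISTRY NAME>
value.deserializer.avroRecordType=GENERIC_RECORD
```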