awslabs / aws-glue-schema-registry

The AWS Glue Schema Registry Client library provides serializers and deserializers that applications can use to integrate with the AWS Glue Schema Registry service. The library currently supports the Avro, JSON, and Protobuf data formats. See https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html to get started.
Apache License 2.0

AWSKafkaAvroConverter throwing because "Didn't find secondary deserializer" when not trying to use a secondary deserializer #136

Open elcorbs opened 2 years ago

elcorbs commented 2 years ago

Context

We are using MSK and MSK Connect, with the Confluent S3 sink connector and the avro-kafkaconnect-converter from this repository. The producer sending messages to the topic is a Lambda function written in .NET that uses the Confluent Schema Registry serdes class to serialize the data into an Avro GenericRecord.

Issue

The converter throws an error when it tries to deserialize messages. It looks like it is trying to find a secondary deserializer, but I don't want it to use one and haven't provided any configuration for one.

The stack trace

[Worker-] org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
--
[Worker-]   at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:119)
[Worker-]   at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
[Worker-]   at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
[Worker-]   at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
[Worker-]   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
[Worker-]   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
[Worker-]   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
[Worker-]   at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
[Worker-]   at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
[Worker-]   at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
[Worker-]   at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
[Worker-]   at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
[Worker-]   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
[Worker-]   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
[Worker-]   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[Worker-]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[Worker-]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[Worker-]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[Worker-]   at java.base/java.lang.Thread.run(Thread.java:829)
[Worker-] Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Didn't find secondary deserializer.
[Worker-]   at com.amazonaws.services.schemaregistry.deserializers.SecondaryDeserializer.deserialize(SecondaryDeserializer.java:65)
[Worker-]   at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserializeByHeaderVersionByte(AWSKafkaAvroDeserializer.java:150)
[Worker-]   at com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer.deserialize(AWSKafkaAvroDeserializer.java:114)
[Worker-]   at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.toConnectData(AWSKafkaAvroConverter.java:117)
[Worker-]   ... 18 more
[Worker-] [2022-02-03 09:26:58,641] ERROR [emma-tenure-changes-v1|task-1] WorkerSinkTask{id=emma-tenure-changes-v1-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)

This is the config supplied to the connector

connector.class=io.confluent.connect.s3.S3SinkConnector
s3.region=eu-west-2
flush.size=1
schema.compatibility=BACKWARD
tasks.max=2
topics=tenure-api
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.bucket.name=<bucket-name>
s3.sse.kms.key.id=<key-id>
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

value.converter.schemaAutoRegistrationEnabled=true
value.converter.registry.name=schema-registry
value.convertor.schemaName=tenure-api
value.converter.avroRecordType=GENERIC_RECORD
value.converter.region=eu-west-2
value.converter.schemas.enable=true
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter

key.converter=org.apache.kafka.connect.storage.StringConverter

I uploaded the plugin by adding the Glue Schema Registry jar and all the jars from the avro-kafkaconnect-converter target/dependencies folder into lib, zipping it up, and uploading it to S3.

aws-glue-schema-registry v1.8.1, Kafka v2.8.1, MSK v2.7.1

msal-priv commented 2 years ago

@elcorbs, I have encountered a similar exception. I think the main issue here is not the secondary deserializer exception but the outer one: "Converting byte[] to Kafka Connect data failed due to serialization error". The serialization performed by the Confluent serdes class is not compatible with the AWS Glue Schema Registry message format. The Confluent link you posted describes its serialization format as:

byte 0: Magic byte used to identify the protocol format.
bytes 1-4: Unique global id of the Avro schema that was used for encoding (as registered in Confluent Schema Registry), big endian.
following bytes: The serialized data.

This is different from the format used here: https://github.com/awslabs/aws-glue-schema-registry/blob/d1d3cddaa3bfcb8a8f74330c523365e8ba27cc19/common/src/main/java/com/amazonaws/services/schemaregistry/utils/AWSSchemaRegistryConstants.java#L54-L57

I think you should look for a way to produce the same encoding that the encoder applies here: https://github.com/awslabs/aws-glue-schema-registry/blob/017fdc5840ab64f2d297d6a4babac007a7f52b4e/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/SerializationDataEncoder.java#L45-L49
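
For illustration only, here is a minimal Java sketch of the Confluent framing quoted above; the GSR layout is only described in a comment, per the constants linked above, and the schema id and payload are placeholders rather than anything produced by either library.

import java.nio.ByteBuffer;

public class WireFormatSketch {
    // Confluent wire format: magic byte 0, then a 4-byte big-endian schema id,
    // then the Avro-encoded payload.
    static byte[] confluentFrame(int schemaId, byte[] avroPayload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + avroPayload.length);
        buf.put((byte) 0);      // Confluent magic byte
        buf.putInt(schemaId);   // big-endian schema id
        buf.put(avroPayload);   // serialized Avro record bytes
        return buf.array();
    }

    // GSR framing (per the AWSSchemaRegistryConstants lines linked above) instead
    // starts with a header version byte and a compression byte, followed by a
    // 16-byte schema version UUID. Byte 0 of a Confluent-framed message therefore
    // never matches what AWSKafkaAvroDeserializer expects, and it falls through
    // to the secondary-deserializer path seen in the stack trace.
}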

hhkkxxx133 commented 2 years ago

Yes, I agree with @msalistra. Because of the incompatible header formats between Confluent and GSR, the GSR library cannot deserialize the message directly and will look for a secondary deserializer to deserialize the message here. One can be supplied in the configuration to avoid the error.

The secondary deserializer is often used if you have already registered schemas in another schema registry and you still want to perform schema lookups successfully. This allows for migrations from other schema registries without having to start anew.
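
For illustration, a minimal plain-consumer sketch in the spirit of the README's migration section is below; the bootstrap servers, group id, registry URL, and the Confluent deserializer class name are placeholders or assumptions to verify against the linked documentation, not settings taken from this issue.

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;

public class SecondaryDeserializerSketch {
    public static KafkaConsumer<String, Object> buildConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "gsr-migration-sketch");    // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // The GSR deserializer stays primary; records carrying a non-GSR header
        // are handed to the configured secondary deserializer.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AWSKafkaAvroDeserializer.class.getName());
        props.put(AWSSchemaRegistryConstants.AWS_REGION, "eu-west-2");
        props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, "GENERIC_RECORD");
        // Confluent's Kafka deserializer handles Confluent-framed records;
        // it needs its own registry endpoint (placeholder URL).
        props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER,
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");
        return new KafkaConsumer<>(props);
    }
}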

@elcorbs, apologies for the delay. Does this answer your question? Also, could you let us know your use case?

elcorbs commented 2 years ago

Thanks @msalistra and @hhkkxxx133 for getting back to me.

We understand that the issue is being caused by different header formats between Confluent and Glue Schema Registry and that this is resulting in the secondary deserialiser error that we are seeing.

I guess our problem is that we don't understand why there is a difference between the header formats and additionally why Glue Schema Registry is incapable of decoding a standard AVRO formatted header.

Our use case is that we have a C# application which generates a schema-compliant, Avro-formatted payload and encodes it using a standard Avro library, therefore producing a Confluent-compliant payload that is pushed to Kafka. The C# application loads the schema via the AWS SDK and uses it for the encoding.

We are then trying to read the payload from Kafka using MSK Connect and write the data out to S3 in Parquet format. We didn't want to write or maintain our own plugin, so we tried using this converter in conjunction with a plugin that writes the data to S3. We want to use AWS Glue Schema Registry to decode the message, so I'm unsure how using a secondary deserializer would help with this.

developerDemetri commented 2 years ago

👋🏻 we appear to be running into the same issue as @elcorbs and @jcaberio (in #205).

We use a custom Python 3.x library with fastavro to serialize / deserialize Avro messages using schemas registered in Glue, but the com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter is not compatible with plain Avro bytes.

Are there any updates on 1) official workarounds for producers that don't serialize messages using this Java-only SerDe, or 2) configs to turn off the non-standard Avro encoding headers for Kafka connectors?

shalinibnair commented 1 year ago

Hi, has there been any solution for this?

mohitpali commented 1 year ago

Confluent Schema Registry is a third-party registry, and in order to translate and read messages from a third party we must use the SecondaryDeserializer. Glue Schema Registry's default deserializer does not understand Confluent's format unless a secondary deserializer is provided. Glue Schema Registry encodes the bytes with its own magic bytes and schema version id, while Confluent does this differently; hence the secondary deserializer is required.

Please read -

https://github.com/awslabs/aws-glue-schema-registry#migrating-from-a-third-party-schema-registry

We have a test class dedicated to this here - https://github.com/awslabs/aws-glue-schema-registry/blob/a7017fb7a418f30d6302d3822e3464388228a72c/serializer-deserializer/src/test/java/com/amazonaws/services/schemaregistry/deserializers/external/ThirdPartyDeserializer.java

While using Kafka Connect, you would use:

 value.converter.<secondaryDeserializerProperty>=<SecondaryDeserializer>
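
As a purely illustrative sketch (the property name matches the one used in the next comment, the Confluent class is the Kafka Deserializer rather than the Connect Converter, the registry URL is a placeholder, and whether extra properties pass through to the secondary deserializer this way should be verified against the README):

value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter.region=eu-west-2
value.converter.avroRecordType=GENERIC_RECORD
value.converter.secondaryDeserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
value.converter.schema.registry.url=http://localhost:8081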
offline commented 1 year ago

When setting value.converter.secondaryDeserializer to io.confluent.connect.avro.AvroConverter, I receive the following error:

com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: The secondary deserializer is not from Kafka

What value should I use for secondaryDeserializer if my messages are encoded in Confluent's format?

yiannis-gkoufas commented 11 months ago

Yep, I'm getting the same error when using io.confluent.connect.avro.AvroConverter. I can't understand why it's so difficult to integrate; it's just Avro and should be compatible.

talnidam commented 10 months ago

Any solution?

vl-kp commented 8 months ago

How do we solve this issue?

airbots commented 5 months ago

First of all, based on the example in the repo, if we use the JSON format and want to dump Parquet, the producer needs to embed the schema in its producer record. I encountered a similar error to the one above when dealing with JSON-to-Parquet output. Any follow-up answer for this issue?