awslabs / aws-glue-schema-registry

AWS Glue Schema Registry Client library provides serializers / de-serializers for applications to integrate with AWS Glue Schema Registry Service. The library currently supports Avro, JSON and Protobuf data formats. See https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html to get started.
Apache License 2.0

Is it possible to use secondaryDeserializer in Kafka Connect? #266

Open izijoss opened 1 year ago

izijoss commented 1 year ago

Hi! I am trying to migrate our Kafka Connect setup from Confluent Schema Registry to AWS Glue Schema Registry. I followed the migration documentation (https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations-migration.html) and tried to adapt it for an S3 sink connector:

"name": "sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "3",
    "key.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
    "value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
    "transforms": "addTS,InsertTopic,InsertOffset,InsertPartition",
    "errors.retry.timeout": "6",
    "topics.regex": "topic.*.avro",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "flush.size": "1000",
    "rotate.schedule.interval.ms": "60000",
    "schema.compatibility": "BACKWARD",
    "s3.bucket.name": "my-bucket",
    "s3.region": "eu-west-1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "store.url": "http://motoserver:3000",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "600000",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "en-US",
    "timezone": "UTC",
    "key.converter.endpoint": "http://motoserver:3000",
    "key.converter.region": "us-west-2",
    "value.converter.endpoint": "http://motoserver:3000",
    "value.converter.avroRecordType": "GENERIC_RECORD",
    "key.converter.avroRecordType": "GENERIC_RECORD",

    "value.converter.registry.name": "test",
    "value.converter.region": "us-west-2",
    "key.converter.registry.name": "test",

    "value.converter.secondaryDeserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer",
    "value.converter.schema.registry.url": "http://schema-registry2:8181",
    "key.converter.secondaryDeserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer",
    "key.converter.schema.registry.url": "http://schema-registry2:8181",
    "transforms.addTS.timestamp.field": "op_ts",
    "transforms.addTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertTopic.topic.field": "source_topic",
    "transforms.InsertOffset.offset.field": "source_offset",
    "transforms.InsertOffset.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertPartition.partition.field": "source_partition",
    "transforms.InsertPartition.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "key.converter.specific.avro.reader": "true",
    "value.converter.specific.avro.reader": "true",
  }
}
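
For comparison, the migration guide configures the secondary deserializer on a plain Kafka consumer rather than on a Connect converter, which is what the converter properties above try to reproduce. A minimal sketch of that consumer-side setup, assuming placeholder bootstrap servers, group id and endpoint (class and constant names are from the aws-glue-schema-registry and Kafka client libraries):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;

public class MigrationConsumerExample {
    public static KafkaConsumer<String, Object> buildConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "migration-test");                // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Glue SR deserializer handles records written by the Glue SR serializer...
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AWSKafkaAvroDeserializer.class.getName());
        props.put(AWSSchemaRegistryConstants.AWS_REGION, "us-west-2");
        props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
        // ...and everything else is handed off to the secondary (Confluent) deserializer.
        props.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER,
                  "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://schema-registry2:8181");
        return new KafkaConsumer<>(props);
    }
}

With the plain consumer, records that are not Glue-SR-encoded are handed to the Confluent deserializer; the converter path in the config above never gets that handoff.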

I get this error:

io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:53)
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: org.apache.kafka.common.errors.SerializationException:
Could not find class keySchema specified in writer's schema whilst finding reader's schema for a SpecificRecord.

I think this is a logical error caused by specific.avro.reader=true being passed to KafkaAvroDeserializer.

If I remove "[value|key].converter.specific.avro.reader": "true", I get this error:

Caused by: com.amazonaws.services.schemaregistry.exception.GlueSchemaRegistryIncompatibleDataException: Invalid schema registry header version byte in data

AWSKafkaAvroConverter ignores the secondaryDeserializer.
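
The "Invalid schema registry header version byte" exception comes from the wire-format check the Glue deserializer performs on the first byte of each record; when a secondary deserializer is honored, a record that fails that check is delegated instead of raising. A rough illustration of that dispatch, not the library's actual code, and the header byte value is an assumption:

import org.apache.kafka.common.serialization.Deserializer;

public class SecondaryFallbackSketch {
    // Assumed first byte of Glue-SR-encoded records; illustrative only.
    private static final byte GLUE_HEADER_VERSION_BYTE = 3;

    static Object deserializeWithFallback(String topic, byte[] data,
                                          Deserializer<?> glueDeserializer,
                                          Deserializer<?> secondaryDeserializer) {
        if (data == null || data.length == 0 || data[0] != GLUE_HEADER_VERSION_BYTE) {
            // Not written by the Glue SR serializer: hand it to the secondary
            // (e.g. io.confluent.kafka.serializers.KafkaAvroDeserializer).
            return secondaryDeserializer.deserialize(topic, data);
        }
        return glueDeserializer.deserialize(topic, data);
    }
}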

This setup works with KafkaAvroDeserializer alone, or with AWSKafkaAvroConverter without a secondary deserializer (as long as the data in the topic was serialized the matching way). What is wrong in my config? Thanks.

YangSan0622 commented 1 year ago

I think there is nothing wrong with your setup. Currently our converter for sink connectors does not support a secondary deserializer; a PR is welcome for this feature.

I believe the issue here is that the message is not serialized with the Schema Registry serializer, so we cannot deserialize it in our converter.
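
Until such a feature exists, one possible direction for a PR (or a local workaround) is a wrapper converter that tries the Glue SR converter first and falls back to the Confluent one. A rough, untested sketch; the class name and the broad exception handling are assumptions, not part of the library:

import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
import com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter;
import io.confluent.connect.avro.AvroConverter;

public class FallbackAvroConverter implements Converter {
    private final Converter primary = new AWSKafkaAvroConverter(); // Glue SR converter
    private final Converter secondary = new AvroConverter();       // Confluent converter

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Both converters read their own keys from the same config map
        // (region/registry.name for Glue SR, schema.registry.url for Confluent).
        primary.configure(configs, isKey);
        secondary.configure(configs, isKey);
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        // Writes always go through Glue SR in this sketch.
        return primary.fromConnectData(topic, schema, value);
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        try {
            return primary.toConnectData(topic, value);
        } catch (RuntimeException e) {
            // e.g. data that was not written by the Glue SR serializer
            // (GlueSchemaRegistryIncompatibleDataException, possibly wrapped);
            // fall back to the Confluent converter.
            return secondary.toConnectData(topic, value);
        }
    }
}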