awslabs / aws-glue-schema-registry

AWS Glue Schema Registry Client library provides serializers / de-serializers for applications to integrate with AWS Glue Schema Registry Service. The library currently supports Avro, JSON and Protobuf data formats. See https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html to get started.

AVROUtils "Unsupported data type" exception for converting non GenericContainer instances #212

Open vmironichev opened 2 years ago

vmironichev commented 2 years ago

Hi, I'm currently exploring a migration from the Confluent schema registry to GSR and noticed different serialisation behaviour: when I tried to use GSR Avro serialisation for string keys, I got an "Unsupported data type" exception in AVROUtils. I'm wondering whether, for primitive objects, I must use a custom encoder (in order to achieve behaviour similar to the Confluent Avro utils, which can encode primitives)? Please let me know what the recommended approach is for serialising primitive types with GSR. Sorry, I'm new to GSR.

UPD: Code snippet (taken from the AWS samples), enriched with a messageKey to trigger the exception:

package com.amazon.gsrkafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import software.amazon.awssdk.services.glue.model.Compatibility;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import java.io.IOException;
import java.util.Properties;
import java.util.UUID;

public class gsrkafka {
    private static final Properties properties = new Properties();
    private static final String topic = "test";

    public static void main(final String[] args) throws IOException {
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.0.106:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, AWSKafkaAvroSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AWSKafkaAvroSerializer.class.getName());
        properties.put(AWSSchemaRegistryConstants.AWS_REGION, "eu-central-1");
        properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "test-glue-schema-registry");
        properties.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "customer");
        properties.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL);
        properties.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);

        // Load the Avro schema from the classpath and build the value record.
        Schema schemaCustomer = new Schema.Parser()
                .parse(gsrkafka.class.getClassLoader().getResourceAsStream("Customer.avsc"));
        GenericRecord customer = new GenericData.Record(schemaCustomer);
        customer.put("first_name", "Ada");
        customer.put("last_name", "Lovelace");
        customer.put("full_name", "Ada Lovelace");

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(properties)) {
            // Passing a String message key makes AWSKafkaAvroSerializer throw the exception below.
            String messageKey = UUID.randomUUID().toString();
            final ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, messageKey, customer);
            // Without a message key it works fine, since the key passed to the serialiser is null:
            // final ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, customer);
            producer.send(record);
        } catch (final SerializationException e) {
            e.printStackTrace();
        }
    }
}

Avro schema

{
  "namespace": "Customer.avro",
  "type": "record",
  "name": "Customer",
  "fields": [
    {"name": "first_name", "type": "string"},
    {"name": "last_name", "type": "string"},
    {"name": "full_name", "type": ["string", "null"], "default": "null"}
  ]
}

Stack trace

Exception in thread "main" com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Unsupported Type of Record received
    at com.amazonaws.services.schemaregistry.utils.AVROUtils.getSchemaDefinition(AVROUtils.java:55)
    at com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer.prepareInput(AWSKafkaAvroSerializer.java:159)
    at com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer.serialize(AWSKafkaAvroSerializer.java:115)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:902)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:870)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:758)
    at org.msk.AwsProducer.main(AwsProducer.java:46)

Thanks, Valeriy

blacktooth commented 1 year ago

Can you please explain your use case for using strings? As of now we only support serializing / deserializing valid GenericRecord / SpecificRecord (POJOs).

vmironichev commented 1 year ago

With the Confluent schema registry we were using the same serialiser for both key and value regardless of whether it was a POJO, a String or a UUID; the Confluent Avro serialiser handled all of those types fine. In most cases we put a UUID as the message key, which was serialised to Avro format. Once I tried to use AWSKafkaAvroSerializer I got the exception mentioned above. So, as I understand it, for UUID Kafka message keys I can't use AWSKafkaAvroSerializer and need to use StringSerializer instead.
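
For reference, a minimal sketch of that split configuration (plain String keys through Kafka's StringSerializer, GenericRecord values through AWSKafkaAvroSerializer) could look roughly like this; the class name, broker address and registry/schema names here are placeholders, not anything prescribed by the library:

import com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.UUID;

public class MixedKeyValueProducer {
    public static void send(final String topic, final GenericRecord customer) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        // String keys bypass the schema registry entirely...
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // ...while record values are still registered and encoded via GSR.
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AWSKafkaAvroSerializer.class.getName());
        props.put(AWSSchemaRegistryConstants.AWS_REGION, "eu-central-1");
        props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "test-glue-schema-registry");
        props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "customer");
        props.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>(topic, UUID.randomUUID().toString(), customer));
        }
    }
}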

nextdude commented 1 year ago

> Can you please explain your use case for using strings? As of now we only support serializing / deserializing valid GenericRecord / SpecificRecord (POJOs).

This is a common use case where Kafka keys are strings (or some other primitive type) and values are records. Confluent handles this transparently with a NonRecordContainer that it interposes between primitives and the registry. It would be convenient if GSR had a similar facility so users didn't have to implement it themselves.
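
Until the library offers something like that, one possible workaround (a sketch only, assuming the standard Kafka Serializer interface; WrappedStringKeySerializer and the StringKey wrapper schema are made-up names, not part of GSR) is to wrap the primitive key in a single-field GenericRecord before delegating to AWSKafkaAvroSerializer, loosely mirroring what Confluent's NonRecordContainer achieves:

import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serializer;
import com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer;

// Illustrative workaround only: wraps a String key in a single-field Avro record
// so it becomes a GenericRecord that AWSKafkaAvroSerializer can handle.
public class WrappedStringKeySerializer implements Serializer<String> {

    // Single-field record schema acting as a stand-in for Confluent's NonRecordContainer.
    private static final Schema KEY_SCHEMA = SchemaBuilder.record("StringKey")
            .fields().requiredString("value").endRecord();

    private final AWSKafkaAvroSerializer delegate = new AWSKafkaAvroSerializer();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        delegate.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, String key) {
        if (key == null) {
            return null;
        }
        GenericRecord wrapped = new GenericData.Record(KEY_SCHEMA);
        wrapped.put("value", key);
        return delegate.serialize(topic, wrapped);
    }

    @Override
    public void close() {
        delegate.close();
    }
}

The downside is that consumers have to unwrap the value field again and the wrapper schema gets registered like any other schema, so simply using StringSerializer on the key side is usually the less invasive option.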