avro-kotlin / avro4k

Avro format support for Kotlin
Apache License 2.0

Schema registry compatibility check (why not?) #130

Closed: Chuckame closed this issue 1 year ago

Chuckame commented 1 year ago

Hey, it's me again.

When using Avro with a schema registry, we serialize the data using an Avro schema, but what about the compatibility check against the schema registry?

The official KafkaAvroSerializer checks whether the schema being used is compatible with the current remote schema, and fails on a compatibility issue, according to the compatibility mode registered in the schema registry.

avro4k, on the other hand, just serializes with the local schema, regardless of what the schema registry holds.

To keep the same workflow, we first have to convert the data class instance to a GenericRecord using Avro.default.toRecord(kotlinSerializer, schema, value), and then use AbstractKafkaAvroSerializer#serializeImpl to turn the generic record into bytes.

How I implemented it:

import com.github.avrokotlin.avro4k.Avro
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig
import kotlinx.serialization.serializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.serialization.Serializer

inline fun <reified T> KafkaAvroKotlinxSerializationSerde(isKey: Boolean, config: Map<String, *>): Serde<T> =
    Serdes.serdeFrom(KafkaAvroKotlinxSerializationSerializer(isKey, config), KafkaAvroKotlinxSerializationDeserializer(isKey, config))

inline fun <reified T> KafkaAvroKotlinxSerializationSerializer(isKey: Boolean, config: Map<String, *>): Serializer<T> =
    object : AbstractKafkaAvroSerializer(), Serializer<T> {
        private val kotlinSerializer = serializer<T>()
        private val schema = Avro.default.schema(kotlinSerializer)
        private val avroSchema = AvroSchema(schema)

        init {
            configure(KafkaAvroSerializerConfig(config))
        }

        override fun serialize(topic: String, value: T?): ByteArray? {
            if (value == null) {
                return null
            }
            val record = Avro.default.toRecord(kotlinSerializer, schema, value)
            // Then do the exact same as KafkaAvroSerializer: subject lookup,
            // compatibility check against the registry, and binary encoding.
            return serializeImpl(
                getSubjectName(topic, isKey, value, avroSchema),
                record,
                avroSchema
            )
        }
    }

inline fun <reified T> KafkaAvroKotlinxSerializationDeserializer(isKey: Boolean, config: Map<String, *>): Deserializer<T> =
    object : AbstractKafkaAvroDeserializer(), Deserializer<T> {
        private val kotlinSerializer = serializer<T>()
        private val schema = Avro.default.schema(kotlinSerializer)

        init {
            this.isKey = isKey
            configure(KafkaAvroDeserializerConfig(config))
        }

        override fun deserialize(topic: String, record: ByteArray?): T? {
            if (record == null) {
                return null
            }
            // Do the exact same as KafkaAvroDeserializer, reading into a
            // GenericRecord with our local schema as the reader schema.
            val value = deserialize(topic, isKey, record, schema) as GenericRecord?
            return value?.let { Avro.default.fromRecord(kotlinSerializer, it) }
        }
    }

What we can do

Maybe this implementation is not perfect and could be done in another way, such as adding a factory method to Avro. But it would be nicer if using this library removed boilerplate like this. By the way, it would not require any additional library, since it uses the org.apache.kafka.common.serialization package.

First "bad" idea

Normally, the KafkaAvroSerializer should serialize the instance as a SpecificRecord, since we use a specific schema. But implementing SpecificRecord means exposing field values by positional index, and that's the tricky part.
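To illustrate why index-based access is tricky: SpecificRecord requires get(i)/put(i, v) by field index, while a Kotlin data class has no built-in positional accessors. A minimal sketch, assuming nothing beyond plain Java reflection over the generated componentN() methods (the class and function names here are illustrative, not part of avro4k):

```kotlin
// Illustrative data class standing in for a user-defined @Serializable type.
data class User(val name: String, val age: Int)

// Hypothetical helper: fetch a field value by 0-based index via the
// data class's componentN() functions, reached through Java reflection.
fun fieldByIndex(value: Any, index: Int): Any? =
    // componentN is 1-based and follows primary-constructor order.
    value.javaClass.getMethod("component${index + 1}").invoke(value)

fun main() {
    val user = User("alice", 30)
    println(fieldByIndex(user, 0)) // alice
    println(fieldByIndex(user, 1)) // 30
    // Caveat (the actual tricky part): Avro field indices follow the schema,
    // which need not match constructor order, and put(i, v) would require
    // mutability or rebuilding the instance via copy()/constructor calls.
}
```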

I can make a PR, but I prefer to validate with you if it corresponds to the lib scope.

Chuckame commented 1 year ago

(For the moment it's OK, but performance-wise it would be much better to avoid the GenericRecord step, either by making the schema registry call directly (con: for me, too much code to copy/paste/maintain) or by providing an easy way to implement SpecificRecord (maybe a static register with a cache).)
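The "static register with a cache" idea could look roughly like this. This is a hand-rolled sketch, not an avro4k or Confluent API, and all names are made up; per-class metadata (here, componentN accessors standing in for schema/field info) is computed once and reused, so the hot serialize path avoids repeated reflection:

```kotlin
import java.lang.reflect.Method
import java.util.concurrent.ConcurrentHashMap

// Hypothetical static registry with a cache: the per-class accessor list is
// computed on first use and cached, so later calls are just map lookups.
object AccessorRegistry {
    private val cache = ConcurrentHashMap<Class<*>, List<Method>>()

    fun accessorsFor(clazz: Class<*>): List<Method> =
        cache.computeIfAbsent(clazz) { c ->
            // Collect component1(), component2(), ... until one is missing.
            generateSequence(1) { it + 1 }
                .map { i -> runCatching { c.getMethod("component$i") }.getOrNull() }
                .takeWhile { it != null }
                .filterNotNull()
                .toList()
        }

    // Positional field values, as a SpecificRecord-style reader would need.
    fun fieldValues(value: Any): List<Any?> =
        accessorsFor(value.javaClass).map { it.invoke(value) }
}

data class Order(val id: Long, val amount: Double)

fun main() {
    println(AccessorRegistry.fieldValues(Order(1L, 9.99))) // [1, 9.99]
}
```

The same caching shape would work for schemas or kotlinx serializers instead of reflection accessors; the point is only that the expensive per-type work happens once.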

thake commented 1 year ago

Did you have a look at https://github.com/thake/avro4k-kafka-serializer? It provides a bridge between the Confluent Schema Registry and avro4k. Have a look at the discussion in #1 for more information.

Chuckame commented 1 year ago

Thanks for the answer. I'm still surprised that this isn't part of this repository, since it's totally coherent with this library. Depending on small libraries with few stars is a bit worrying for the long-term vision.

Maybe it was declined as out of scope?

OneCricketeer commented 1 year ago

> not inside this repository since it's totally coherent with this library

IMO, not really. This library has no dependencies on Kafka or HTTP clients. Adding the Confluent library would add both... And exclude other implementations of the Registry API like Apicurio.

thake commented 1 year ago

@OneCricketeer thanks for your comment. I'm with you on this. I'll close the issue as not planned.