confluentinc / schema-registry

Confluent Schema Registry for Kafka
https://docs.confluent.io/current/schema-registry/docs/index.html

dynamic aliases for Avro Key and Value classes #2028

Open msillence opened 3 years ago

msillence commented 3 years ago

We are using Debezium to create messages. The topic names and Avro schemas it creates include the hostname and DB schema, so when we deploy from dev to UAT to production those names change. We also quite like having more than one schema in the same cluster.

The problem is that, with those names in the schema, there is no way to add a dynamic, per-environment mapping to the class we've generated from the schema.

This is partially related to issue #793 (https://github.com/confluentinc/schema-registry/issues/793); the core logic is in AbstractKafkaAvroDeserializer.getDatumReader.

Would you consider a PR that adds a map of aliases to classes, which could be used in addition to the existing name in the schema?
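To make the proposal concrete, here is a minimal sketch of the lookup it describes: try a configured alias first, then fall back to a class whose name equals the schema's full name. This is not the registry's actual code; `AliasClassResolver`, `CurrencyValue`, and the schema names are hypothetical, and only the JDK is used.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a class generated from the Avro schema.
final class CurrencyValue {}

// Hypothetical helper: maps a writer-schema full name to a local class.
final class AliasClassResolver {
    private final Map<String, Class<?>> aliases = new HashMap<>();

    // Register an environment-specific schema name for a generated class.
    AliasClassResolver alias(String schemaFullName, Class<?> target) {
        aliases.put(schemaFullName, target);
        return this;
    }

    // Prefer a configured alias; otherwise try to load a class whose
    // name equals the schema full name (namespace + "." + name).
    Class<?> resolve(String schemaFullName) {
        Class<?> aliased = aliases.get(schemaFullName);
        if (aliased != null) {
            return aliased;
        }
        try {
            return Class.forName(schemaFullName);
        } catch (ClassNotFoundException e) {
            return null; // a caller could fall back to a generic record here
        }
    }

    public static void main(String[] args) {
        AliasClassResolver resolver = new AliasClassResolver()
            .alias("dev_host.dev_schema.currency.Value", CurrencyValue.class)
            .alias("uat_host.uat_schema.currency.Value", CurrencyValue.class);

        // Both environment-specific names resolve to the same class.
        System.out.println(resolver.resolve("dev_host.dev_schema.currency.Value"));
        System.out.println(resolver.resolve("uat_host.uat_schema.currency.Value"));
    }
}
```

In a real change the alias map would presumably come from deserializer configuration; that part is left out here because no such setting exists today.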

It might also be worth considering that the Protobuf configuration is quite different: it requires specific.protobuf.key.type and specific.protobuf.value.type to be configured, and does not support an equivalent of specific.avro.reader as far as I can see.

OneCricketeer commented 3 years ago

the topic names and avro schemas created include the hostname and db schema,

The topic name can be fixed with the transforms property in your connector configs.
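As a rough illustration of that suggestion, the sketch below builds the connector properties for the RegexRouter transform that ships with Kafka Connect, and mimics its renaming with plain `java.util.regex`. The alias `route`, the regex, and the topic names are assumptions made for the example. The `route` method only emulates the transform's behaviour as I understand it (rename on a full match, otherwise leave the topic alone); it does not call Kafka Connect.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TopicRenameSketch {
    // Hypothetical pattern: drop the first two dot-separated segments
    // (for example host and DB schema) from the topic name.
    static final String REGEX = "[^.]+\\.[^.]+\\.(.*)";
    static final String REPLACEMENT = "$1";

    // Properties that would be added to the connector configuration.
    static Map<String, String> connectorTransformConfig() {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("transforms", "route");
        config.put("transforms.route.type",
                   "org.apache.kafka.connect.transforms.RegexRouter");
        config.put("transforms.route.regex", REGEX);
        config.put("transforms.route.replacement", REPLACEMENT);
        return config;
    }

    // Local emulation: rename only when the whole topic matches.
    static String route(String topic, String regex, String replacement) {
        Matcher matcher = Pattern.compile(regex).matcher(topic);
        return matcher.matches() ? matcher.replaceFirst(replacement) : topic;
    }

    public static void main(String[] args) {
        connectorTransformConfig().forEach((k, v) -> System.out.println(k + "=" + v));
        System.out.println(route("devhost.devschema.currency", REGEX, REPLACEMENT));
    }
}
```

Note that this merges topics from different hosts or DB schemas into one, which may or may not be what you want.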

The registry subject name can be fixed by setting a different strategy class than the default for the converter/serializer.

msillence commented 3 years ago

I've tried setting key.subject.name.strategy to my own class, and I see it in the config, but I don't see where it's used in the code, nor do I see it called.

msillence commented 3 years ago

From my understanding of the code, AbstractKafkaAvroDeserializer.getSpecificReaderSchema calls SpecificData.getClass.

I can see AbstractKafkaSchemaSerDeConfig.keySubjectNameStrategy being called with the new class I've configured. That sets AbstractKafkaSchemaSerDe.keySubjectNameStrategy, and that variable seems to be used only in subjectNameStrategy, which in my trace is never called.

I'm setting:

key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
key.subject.name.strategy=com.company.OurSchemaNameStrategy
specific.avro.reader=true

OneCricketeer commented 3 years ago

It's the serializer/Debezium you'll want to modify in order to change the topic/subject

msillence commented 3 years ago

OK, we can do that, but shouldn't there be a mechanism for Avro to use a specified class, in the same way Protobuf allows? Something equivalent to:

specific.protobuf.value.type

OneCricketeer commented 3 years ago

When you consume a Protobuf message, you are using a specific class already, not some generic Message Protobuf type, yes? So, I'm not sure I understand the problem there.

My original comment was simply referring to the fact that you can modify the topic/subject names such that they wouldn't include anything specific to an environment/database schema, and AFAIK, that is not a function of a certain serialization format

msillence commented 3 years ago

I believe (but I've not tested it yet) that with Protobuf we can set derive.type=true in the Kafka config and include the following in the Protobuf schema:

option java_package = "com.company";
option java_outer_classname = "CurrencyValue";

That should then do the same thing as the Avro schema with connect.name plus the Kafka config specific.avro.reader=true.

I don't see any Avro equivalent of the Protobuf Kafka config:

specific.protobuf.key.type

OneCricketeer commented 3 years ago

With Avro, it is not necessary to use a property to specify a specific type, since the type can be derived directly from the Avro schema, using the namespace and name of the Avro type.

https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-avro.html

msillence commented 3 years ago

So in Debezium I set:

key.converter.key.subject.name.strategy=xxx.FixedSchemaNamingStrategy
value.converter.value.subject.name.strategy=xxx.FixedSchemaNamingStrategy

These classes implement SubjectNameStrategy, and the documentation says it will "record type that is published to Kafka, registers the schema in the registry under the fully-qualified record name".

What happens is that, as far as I can see from debugging, the schema name stays the same; however, no schema is registered for the topic. Nothing ends up in the registry, but I see data in the topic.
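For context on why the schema name stays the same: a subject name strategy only chooses the registry subject a schema is filed under; it does not rewrite the name or namespace inside the schema. The sketch below models that with a simplified stand-in interface. `SubjectNamer` and `fixed` are hypothetical (the real SubjectNameStrategy, as far as I know, receives the parsed schema rather than a name string). The topic-name and record-name rules follow the documented behaviour of the built-in strategies.

```java
// Simplified, hypothetical stand-in for the registry's SubjectNameStrategy.
interface SubjectNamer {
    String subjectName(String topic, boolean isKey, String recordFullName);
}

final class SubjectNamingSketch {
    // Default behaviour: subject is derived from the topic name.
    static final SubjectNamer TOPIC_NAME =
        (topic, isKey, record) -> topic + (isKey ? "-key" : "-value");

    // Record-name behaviour: subject is the fully-qualified record name.
    static final SubjectNamer RECORD_NAME =
        (topic, isKey, record) -> record;

    // Hypothetical fixed strategy: one shared subject for every topic.
    static SubjectNamer fixed(String base) {
        return (topic, isKey, record) -> base + (isKey ? "-key" : "-value");
    }

    public static void main(String[] args) {
        String topic = "devhost.devschema.currency";        // hypothetical
        String record = "devhost.devschema.currency.Value"; // hypothetical

        System.out.println(TOPIC_NAME.subjectName(topic, false, record));
        System.out.println(RECORD_NAME.subjectName(topic, false, record));
        System.out.println(fixed("currency").subjectName(topic, false, record));
        // In all three cases the record's own full name is unchanged,
        // so a consumer's class lookup by name is unaffected.
    }
}
```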

I don't want to change the topic names, as the different DBs/schemas still need to go into separate topics, but it might be OK to share the schema across these topics. I can imagine that the schema might differ between the two, but always in a compatible way. I'm not quite sure what the impact of that would be; I presume one client can continue reading/writing with the old schema when there is a new one.

msillence commented 3 years ago

It looks like the Avro namespace/connect name are built up in AvroData.fromConnectData -> fromConnectSchema.

I don't see anything in there that would use the name strategy.

msillence commented 3 years ago

I still like the idea of configuring the deserialiser. I take it from the discussion that you'd prefer not to accept changes to allow this? Given that there's already issue #793, I don't think I'm alone in wanting some flexibility.

Modifying the schema name at produce time looks like it would require a change either to AvroData in the kafka-connect-avro-data module (which is still part of this project) or to something new in Debezium's TableSchemaBuilder. There is a SchemaNameAdjuster there, but it is really intended for fixing names so that they comply with Avro rules/restrictions.