confluentinc / schema-registry

Confluent Schema Registry for Kafka
https://docs.confluent.io/current/schema-registry/docs/index.html

schema references, use.latest.version and schema evolution/compatibility issues #2479

Open ashleybill opened 1 year ago

ashleybill commented 1 year ago

Expectation: using auto.register.schemas=false and use.latest.version=true (required to make use of schema references) should still allow old clients to produce messages with older versions of a schema after it has evolved to add or remove fields.

Actual outcome: serialization exceptions due to index mismatches between the client's version of the schema and the Schema Registry version.

Detail: Trying to make use of schema references and following the documentation, we have updated our apps to use auto.register.schemas=false and use.latest.version=true.
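For reference, the relevant part of our producer/serializer configuration looks roughly like this (a sketch; the bootstrap servers and registry URL are placeholders):

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");            // placeholder
    props.put("schema.registry.url", "http://localhost:8081");   // placeholder
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    // never register schemas from the client
    props.put("auto.register.schemas", false);
    // serialize against the latest version registered for the subject
    props.put("use.latest.version", true);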

The issue this gives us is that schema evolution/compatibility appears to break in a number of scenarios.

1) A client uses an old version of the schema (i.e. it has not upgraded to the latest version) to produce an event, where a field has been removed in the latest version.

The client has a Java class based on the following schema: https://goonlinetools.com/snapshot/code/#jxb3x2uohr9mvjxig284sp

while the latest schema in the Schema Registry is this: https://goonlinetools.com/snapshot/code/#d5ywjof8wcebfacguvhifw

(i.e. the nullable field someAddedField has been removed)
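To illustrate (a rough reconstruction, not the exact schemas behind the links; someOtherField stands in for the real schema's other fields, and TestEventSub is the separately registered, referenced schema), the client's generated classes are built from something like:

    {
      "type": "record",
      "name": "TestEvent",
      "fields": [
        {"name": "id", "type": "string"},
        {"name": "someOtherField", "type": ["null", "string"], "default": null},
        {"name": "someAddedField", "type": ["null", "string"], "default": null},
        {"name": "someSubSchema", "type": ["null", "TestEventSub"], "default": null}
      ]
    }

while the latest registered version is the same record without the someAddedField entry, so someSubSchema shifts down one position.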

When trying to serialize the event (with use.latest.version=true) we end up with a problem, because the indices in the two schemas differ (client version index 2 is someAddedField, Schema Registry version index 2 is someSubSchema).

For example:

    TestEvent testEvent = TestEvent.newBuilder()
        .setId("ABC")
        .setSomeAddedField("BCD")
        .setSomeSubSchema(TestEventSub.newBuilder()
            .setSubField("EFG")
            .build())
        .build();
    byte[] serialized = serializer.serialize("Bob", testEvent);

giving the following error:
Caused by: org.apache.avro.UnresolvedUnionException: Not in union ["null",{"type":"record","name":"TestEventSub","namespace":"com.rbauction.test.events.models.sub","doc":"A test event schema created by terraform","fields":[{"name":"subField","type":["null","string"],"doc":"subField","default":null}]}]: BCD

which makes sense, as it is trying to write what the client has at index 2 ('BCD') into the field at index 2 of the Schema Registry schema (the someSubSchema field), which obviously does not match in this case.

2) Similar behaviour, but when adding a new field at the end: the client has an older version of the schema, while the Schema Registry's latest version has an additional field at the end of the schema.

Trying to serialize with the older version under use.latest.version causes an IndexOutOfBoundsException, as the serializer iterates over the fields of the Schema Registry schema and fetches the related value from the client's version, which leads to a lookup for an index that does not exist.

For some added context, we are using io.confluent:kafka-avro-serializer:5.5.1 (but have also tried 7.3.0). Our Java classes are generated by the Gradle plugin "com.github.davidmc24.gradle.plugin.avro" version "1.5.0", using the standard Velocity templates from Avro (v1.11.0).

Are there any recommendations for handling this (or some additional settings we have missed), so that we don't have to upgrade all clients at the same time a new schema they use is pushed?

rayokota commented 1 year ago

For now you can use use.schema.id to pin the schema ID used by a client. In the future, we are working on more sophisticated ways to "version" clients.
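A minimal sketch of what that looks like in the serializer configuration (the ID 123 is a placeholder for whatever registered schema the client's generated classes were built against):

    // pin this client to one registered schema instead of the subject's latest version
    props.put("auto.register.schemas", false);
    props.put("use.schema.id", 123);   // placeholder schema ID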

drewCliftonAtKroger commented 1 year ago

Would someone be able to direct me to documentation for the use.schema.id property?

I see the auto.register.schemas and use.latest.version properties documented on this page: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#schema-normalization and also mentioned on this one: https://docs.confluent.io/platform/current/schema-registry/security/index.html#sr-security-overview

I've seen a similarly named property mentioned in the docs for the Go Avro serde support, so I assume this property should be set the same way (to the specific schema ID to be serialized), but when depending on io.confluent:kafka-avro-serializer:6.2.0 I don't see any effect when setting this property. Instead it seems to use schema reflection and take the schema from the jar generated by com.github.davidmc24.gradle.plugin.avro, which generally works fine, but if there is a way to specify the schema ID to query the registry with, I'd like to try that.

rayokota commented 1 year ago

You need 7.0.1 or higher to use use.schema.id

sombraglez commented 10 months ago

We are having a similar issue with deserialization of messages after the producer (and owner of the topic) updates the Avro schema of the corresponding subject by adding a new optional field.

Currently we are using version 7.5.1 and, as described above, auto.register.schemas=false and use.latest.version=true.

From the documentation of use.latest.version (https://docs.confluent.io/platform/current/schema-registry/connect.html#configuration-options), I understood that this property only applies to the serialization of messages, i.e. it only affects the schema used by a producer when producing new messages. However, according to your implementation, this property is also relevant during deserialization of messages:

https://github.com/confluentinc/schema-registry/blob/a23eb03c682f8a91901f713e5f49387cb848611d/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L473

I think the documentation is not as complete as it should be, since deserialization and the consumer-side implications are not mentioned.

I'm not sure whether this is an issue or just a gap in my knowledge, but: is there any way to configure a producer to use the latest available schema version for new messages while at the same time allowing consumer(s) to keep reading messages using an old version of the subject's schema?
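For context, this is roughly what I am trying to express in configuration (property names from the serde docs; whether leaving the consumer side like this is enough is exactly my question):

    // producer side: always serialize with the latest registered schema
    producerProps.put("auto.register.schemas", false);
    producerProps.put("use.latest.version", true);

    // consumer side: keep decoding into the (older) generated classes that
    // ship with the application, without forcing the registry's latest schema
    consumerProps.put("specific.avro.reader", true);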

piyush-panda commented 8 months ago

We are having a similar issue during deserialization of messages.

Currently we are using version 7.5.2 with auto.register.schemas=false and use.latest.version=true; we migrated from version 6.2.0.

Starting with version 7.4.0, use.latest.version is also used during deserialization, which forces the deserializer to use the latest schema from Schema Registry. The schema in our Schema Registry does not have the Java-specific attribute "avro.java.string":"String" on String and Map types. Because of this, String values and Map keys are deserialized to Utf8 objects instead of String.

In version 6.2.0, AbstractKafkaAvroDeserializer used the schema embedded in the Java classes we generate with the Gradle Avro Plugin; that schema had the "avro.java.string":"String" attribute on all String and Map schema elements, so they were deserialized to String instead of Utf8.
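For illustration, the difference amounts to a Java-specific property on string-typed schema elements (the field name here is hypothetical). The schema embedded in our generated classes has the first form, the registered schema has the second:

    {"name": "someField", "type": {"type": "string", "avro.java.string": "String"}}
    {"name": "someField", "type": "string"}

With the first form the Avro runtime decodes the value as java.lang.String; with the plain form it hands back org.apache.avro.util.Utf8.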

As a result of this change in behaviour, functionality breaks when we read a map value using a map key passed in as a String.

Is it possible to allow the old behaviour of using the schema inside the generated classes during deserialization?

realmohan001 commented 2 months ago

Is there any resolution to this problem? My consumer is failing whenever the producer updates the Avro schema.

I tried to use "use.schema.id", and it is not helping with the deserialization. @rayokota @sombraglez @piyush-panda @ashleybill

Metriximor commented 2 months ago

In my case I figured out that by setting the property avro.remove.java.properties=true everything works flawlessly; I used to have the same issues that were described in this thread.

The documentation about it is pretty hidden away in the Confluent docs:

The avro-maven-plugin generated code adds Java-specific properties such as "avro.java.string":"String", which may prevent schema evolution. You can override this by setting avro.remove.java.properties=true in the Avro serializer configurations.
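Concretely it is one extra entry in the serializer configuration, alongside the settings already discussed in this thread (a sketch):

    props.put("auto.register.schemas", false);
    props.put("use.latest.version", true);
    // strip "avro.java.string" and other Java-specific properties from the schema
    // the client presents, so it lines up with the registered version
    props.put("avro.remove.java.properties", true);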

realmohan001 commented 2 months ago

Thank you for looking into this issue!

@Metriximor : I have been using the Gradle plugin below to generate POJOs from Avro, but my issue is not caused by the POJOs: https://github.com/davidmc24/gradle-avro-plugin

My issue is during deserialization. Can we use "use.schema.id" so that the consumer can still deserialize messages after schema evolution using the old POJOs?

Or, alternatively, I should be able to ignore the newly added optional fields during deserialization.

@rayokota @sombraglez @piyush-panda @ashleybill

Metriximor commented 2 months ago

@realmohan001 Yes, I use the same plugin. I did not set the "use.schema.id" setting, but the schema is still able to evolve just fine and work with old POJOs.

realmohan001 commented 2 months ago

@Metriximor : Thank you! My application has 2 Kafka consumers. I changed them to use GenericRecord instead of SpecificRecord, and the behaviour is slightly different now:

1) It is able to ignore newly added optional fields (FULL compatibility), and we are able to parse with the old schema.
2) When a new required field is added by the producer (FORWARD_TRANSITIVE compatibility), it is not ignored, and the consumer fails.

I'm not sure why it fails in case 2; ideally, with FORWARD_TRANSITIVE compatibility, the producer updates first and the consumer can pick up the change later.
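For reference, the consumer-side change I made is roughly this (a sketch; bootstrap/group settings omitted, everything else as in the Confluent serde docs):

    // leaving specific.avro.reader unset (false) makes KafkaAvroDeserializer
    // return GenericRecord instead of the generated SpecificRecord classes
    consumerProps.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    // consumerProps.put("specific.avro.reader", true);   // previously set, for SpecificRecord

    GenericRecord value = (GenericRecord) record.value();  // fields are now looked up by name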

Thanks in advance!