confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

Question: Backward compatible schemas and old messages #892

Open simonwahlgren opened 4 years ago

simonwahlgren commented 4 years ago

Description

I'll try to explain using an example instead:

  1. I have an Avro producer and consumer. The consumer sets auto.offset.reset to latest, and the schema registry uses backward as its compatibility config.
  2. I produce a message with schema id 1.
  3. I consume and decode the message using schema id 1 (since that's what's embedded in the message and used when decoding it).
  4. I evolve the schema by adding a new field foo (with its default value set to null) and produce a new message with schema id 2.
  5. I consume and decode the message using schema id 2. In my application I print the new field foo. Everything is fine and dandy so far.
  6. I update my consumer config, changing auto.offset.reset to earliest and setting a new group.id. Now the problem begins. After restarting, the consumer crashes because the first produced message (schema id 1) doesn't have the foo field: since we always use the writer schema to decode a message, reading the old messages always fails. I can of course use get and return a default value in the application when a field doesn't exist, but that means we always have to make sure our applications are backward compatible as well, which feels wrong.
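
For readers unfamiliar with what "embedded in the message" means in step 3: in the Confluent wire format, the message value starts with a magic byte (0) followed by a 4-byte big-endian schema id, and the Avro-encoded body comes after that. The following is a minimal sketch of reading that header with the standard library only. The function name split_confluent_frame and the sample payloads are made up for illustration and are not part of confluent-kafka-python.

```python
import struct

MAGIC_BYTE = 0  # Confluent wire format: the first byte is always 0


def split_confluent_frame(payload: bytes):
    """Return (schema_id, avro_body) from a Confluent-framed message value.

    Hypothetical helper for illustration, not a library function.
    """
    if len(payload) < 5:
        raise ValueError("message too short to carry a schema id")
    # ">bI" = big-endian, 1 signed byte (magic) + 4-byte unsigned int (schema id)
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte: %d" % magic)
    return schema_id, payload[5:]


# Two made-up payloads: one framed with schema id 1, one with schema id 2.
# The bytes after the header stand in for the Avro-encoded body.
old_message = struct.pack(">bI", MAGIC_BYTE, 1) + b"\x06bar"
new_message = struct.pack(">bI", MAGIC_BYTE, 2) + b"\x06bar\x00"

print(split_confluent_frame(old_message)[0])  # schema id of the old message
print(split_confluent_frame(new_message)[0])  # schema id of the new message
```

The id read this way identifies the writer schema. Which schema is used as the reader schema is a separate decision, and that is what this question is about.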

So to my question: Is this the expected behavior or am I missing something?

After reading https://docs.confluent.io/current/schema-registry/avro.html my understanding is that, with backward as the compatibility config, we should always read with the latest version of the schema and not the version the message was produced with.

Here's a quote from the page from the "Backward Compatibility" section:

Avro implementation details: Take a look at ResolvingDecoder in the Apache Avro project to understand how, for data that was encoded with an older schema, Avro decodes that data with a newer, backward-compatible schema.
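
To make the quoted behavior concrete, here is a deliberately simplified sketch of the one resolution rule that matters for this issue: a field that is present in the reader schema but missing from the written data is filled with the reader's default. It works on already-decoded dicts and is not the Avro library's implementation. The function resolve_record and the schema READER_SCHEMA_V2 are assumptions made up for the example.

```python
def resolve_record(record, reader_schema):
    """Project a decoded record onto the reader schema.

    Simplified illustration only: handles flat records, fills missing
    fields from the reader's defaults, and drops fields the reader
    schema does not know about.
    """
    resolved = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in record:
            resolved[name] = record[name]
        elif "default" in field:
            resolved[name] = field["default"]
        else:
            raise ValueError("no value and no default for field %r" % name)
    return resolved


# Hypothetical version 2 of the schema: adds foo with a null default.
READER_SCHEMA_V2 = {
    "type": "record",
    "name": "Example",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "foo", "type": ["null", "string"], "default": None},
    ],
}

old_record = {"id": "a"}  # as written with version 1, no foo field
print(resolve_record(old_record, READER_SCHEMA_V2))  # {'id': 'a', 'foo': None}
```

With this kind of resolution in place, application code could access foo on old messages without a KeyError, which is the behavior the question expects from a backward-compatible schema.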

mhowlett commented 4 years ago

It does seem like something is off here. If you use the latest schema when reading, and the schema is backward compatible (i.e. you provide a default value), it shouldn't fail.

Side note: forward compatibility is often important with Kafka as well, because producers often get updated before consumers.

simonwahlgren commented 4 years ago

@mhowlett Exactly. The problem seems to be that the Avro serializer always fetches the schema for the id the message was produced with, and there's no way to control this. See here: https://github.com/confluentinc/confluent-kafka-python/blob/master/confluent_kafka/avro/serializer/message_serializer.py#L163

I would expect something like this:

  1. Read the schema id from the message
  2. Get the compatibility setting for the schema
  3. Depending on the compatibility setting, fetch the correct schema according to https://docs.confluent.io/current/schema-registry/avro.html#summary
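
A rough sketch of how the three steps above could fit together is shown below. It only decides which schema ids to use. The function pick_schema_ids is hypothetical, and the mapping from compatibility setting to reader schema is one possible interpretation of the summary table, not existing library behavior.

```python
# Settings under which a newer schema is expected to read older data.
# Note: plain BACKWARD and FULL are only checked against the immediately
# preceding version; the *_TRANSITIVE variants are checked against all versions.
BACKWARD_LIKE = {"BACKWARD", "BACKWARD_TRANSITIVE", "FULL", "FULL_TRANSITIVE"}


def pick_schema_ids(writer_id, latest_id, compatibility):
    """Return (writer_schema_id, reader_schema_id) for decoding one message.

    Hypothetical helper: writer_id comes from the message header (step 1),
    compatibility from the registry config (step 2).
    """
    if compatibility.upper() in BACKWARD_LIKE:
        # Step 3: decode with the writer schema, resolve to the latest schema.
        return writer_id, latest_id
    # Assumption: for FORWARD, FORWARD_TRANSITIVE and NONE, keep the current
    # behavior and read with the schema the message was written with.
    return writer_id, writer_id


print(pick_schema_ids(1, 2, "backward"))
print(pick_schema_ids(1, 2, "NONE"))
```

The writer schema is still needed in every case, because the Avro bytes can only be decoded with the schema they were written with. The compatibility setting would only influence which reader schema the decoded data is resolved to.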