kafkajs / confluent-schema-registry

is a library that makes it easier to interact with the Confluent schema registry
https://www.npmjs.com/package/@kafkajs/confluent-schema-registry
MIT License

Decoding avro messages from a topic with different avro schemas #211

Open avitalriski opened 2 years ago

avitalriski commented 2 years ago

Hello,

I've encountered a problem when decoding Avro messages whose schemas share the same namespace and name but differ in content.

I have the following scenario: a Kafka topic contains Avro messages produced with two different schemas. The schemas are not completely different: they have the same namespace and name, but one is an updated version of the other.

I am using the following code to create the SchemaRegistry:

const registry = new SchemaRegistry(
  {
    host: process.env.SCHEMA_REGISTRY_URL,
  },
  options,
);

For the sake of simplicity I'll use these two schemas as an example. So let's say schema 1:

{
  type: 'record',
  name: 'Pet',
  fields: [
    { name: 'kind', type: { type: 'enum', name: 'PetKind', symbols: ['CAT', 'DOG'] } },
    { name: 'name', type: 'string' },
  ],
}

Schema 2 (updated schema):

{
  type: 'record',
  name: 'Pet',
  fields: [
    { name: 'kind', type: { type: 'enum', name: 'PetKind', symbols: ['CAT', 'DOG', 'MOUSE'] } },
    { name: 'name', type: 'string' },
  ],
}

Now let's assume that an Avro message encoded with schema 1 has been consumed. The registry fetches the schema using the registryId held within the binary message and caches it. As long as we keep receiving messages encoded with schema 1, everything is fine. Once we receive a message encoded with schema 2, schema 2 is fetched by its registryId as well. However, since a schema named 'Pet' is already known, the new one is ignored and the schema obtained earlier (schema 1) is used instead. We therefore try to decode a message encoded with schema 2 using schema 1, which results in the following error messages: 'trailing data' and 'truncated data'.

After realizing the problem I created two SchemaRegistry instances: one for Avro messages encoded with schema 1, and one for Avro messages encoded with schema 2. Right before decoding I extract the registryId from the buffer (it is stored in the 4 bytes that follow the leading magic byte) and, based on the ID, decide which registry to use. That way registry 1 only ever holds schema 1, and registry 2 only ever holds schema 2.
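For anyone wanting to try the same workaround, here is a minimal sketch of the routing step. It assumes the standard Confluent wire format (one magic byte of 0, then a 4-byte big-endian schema ID, then the Avro payload). The names `readSchemaId`, `decodeWithMatchingRegistry` and `registriesById` are hypothetical helpers made up for this illustration, not part of the library; the only library method relied on is `decode(buffer)`.

```javascript
// Confluent wire format: byte 0 is a magic byte (0), bytes 1-4 hold the
// schema ID as a big-endian 32-bit integer, and the Avro payload follows.
const MAGIC_BYTE = 0;

// Hypothetical helper: returns the schema ID embedded in an encoded message.
function readSchemaId(buffer) {
  if (!Buffer.isBuffer(buffer) || buffer.length < 5) {
    throw new Error('Message is too short to carry a schema ID');
  }
  if (buffer.readUInt8(0) !== MAGIC_BYTE) {
    throw new Error('Message does not start with the expected magic byte');
  }
  return buffer.readInt32BE(1);
}

// Hypothetical helper: `registriesById` is a Map from schema ID to an object
// exposing decode(buffer), for example a dedicated SchemaRegistry instance.
function decodeWithMatchingRegistry(registriesById, buffer) {
  const id = readSchemaId(buffer);
  const registry = registriesById.get(id);
  if (!registry) {
    throw new Error(`No registry configured for schema ID ${id}`);
  }
  return registry.decode(buffer);
}
```

Usage would look roughly like `await decodeWithMatchingRegistry(new Map([[1, registryA], [2, registryB]]), message.value)`, where the IDs 1 and 2 are placeholders for whatever IDs the schema registry actually assigned.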

My final thoughts and questions:

  1. Is this behavior expected or is this considered a bug?
  2. Is there support for such scenarios?
  3. Is this already a known problem and perhaps there's a better solution for it?

kenneth-gray commented 2 years ago

Is your issue similar to https://github.com/kafkajs/confluent-schema-registry/issues/75? It sounds like you may have implemented the typeHook solution suggested elsewhere, which, as you say, leaves the registry trying to decode the new message with the first schema.

avitalriski commented 2 years ago

After reading the issue I am not really sure it is that similar, since I did not get any errors when fetching a different schema with the same name. That said, the solution in the second comment by ggobbe is very similar to mine. As for the typeHook: yes, I have the following default implementation, which is used in the forSchemaOptions that are later supplied to the SchemaRegistry class:

function typeHook(schema, opts) {
    let name = schema.name;
    if (!name) {
        return; // Not a named type, use default logic.
    }
    if (!name.includes('.')) {
        // We need to qualify the type's name.
        const namespace = schema.namespace || opts.namespace;
        if (namespace) {
            name = `${namespace}.${name}`;
        }
    }
    // Return the type registered with the same name, if any.
    return opts.registry[name];
}

Could this be the source of my issue? I am not really familiar with how this typeHook is used; I just followed some default implementations.
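To make the question concrete, here is a small simulation of what the hook does when it is asked about schema 2 after schema 1 has already been registered. It uses plain objects rather than real avsc types. It also assumes that the same options object, and therefore the same name registry, is reused for every schema that gets parsed; that may or may not match a given setup. `typeFromSchema1` and `sharedOpts` are made-up names for this sketch.

```javascript
// The typeHook from the comment above, with the same behaviour.
function typeHook(schema, opts) {
  let name = schema.name;
  if (!name) {
    return; // Not a named type, use default logic.
  }
  if (!name.includes('.')) {
    const namespace = schema.namespace || opts.namespace;
    if (namespace) {
      name = `${namespace}.${name}`;
    }
  }
  // Return the type registered with the same name, if any.
  return opts.registry[name];
}

// Stand-in for the type parsed from schema 1; a real setup would hold
// avsc Type objects here instead of a plain object.
const typeFromSchema1 = { label: 'Pet parsed from schema 1' };

// Assumption: one options object (and one name registry) is shared
// across all schemas, so 'Pet' is already present when schema 2 arrives.
const sharedOpts = { registry: { Pet: typeFromSchema1 } };

const schema2 = {
  type: 'record',
  name: 'Pet',
  fields: [
    { name: 'kind', type: { type: 'enum', name: 'PetKind', symbols: ['CAT', 'DOG', 'MOUSE'] } },
    { name: 'name', type: 'string' },
  ],
};

// The hook looks up the name only, so it hands back the schema 1 type.
const resolved = typeHook(schema2, sharedOpts);
console.log(resolved === typeFromSchema1); // true
```

If that assumption holds, the hook never looks at the fields of schema 2, which would be consistent with the 'trailing data' and 'truncated data' errors described in the first post.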

XavRsl commented 9 months ago

Hi! I still have the exact same behaviour that you describe in your comment. Did you end up finding a suitable workaround/fix for this issue?

Thanks!

avitalriski commented 3 months ago

@XavRsl Don't know if it's still relevant but I haven't found a better solution than what I already described in my first post, which is creating two separate schema registry instances