confluentinc / confluent-kafka-javascript

Confluent's Apache Kafka JavaScript client
https://www.npmjs.com/package/@confluentinc/kafka-javascript
MIT License
83 stars 7 forks source link

AVRO serialization issues with union while rule is present #184

Closed apeloquin-agilysys closed 2 days ago

apeloquin-agilysys commented 5 days ago

Environment Information

Steps to Reproduce

Given a schema with a field that is a union of string and an enum, serialization and deserialization work without issue. If, however, the same schema has a rule applied, serialization fails, even if the rule is unrelated to that field.

The test code below illustrates the issue (modeled on the existing Avro tests for the schemaregistry). The test without a rule succeeds. The test with a rule fails:

Error: invalid ["string",{"name":"test.Color","type":"enum","symbols":["RED","BLUE"]}]: null

    at throwInvalidError (/Users/peloquina/src/github/confluent-kafka-javascript/node_modules/avsc/lib/types.js:3042:9)
    at WrappedUnionType.Object.<anonymous>.WrappedUnionType._write (/Users/peloquina/src/github/confluent-kafka-javascript/node_modules/avsc/lib/types.js:1498:7)
    at RecordType.writeUnionTest [as _write] (eval at Object.<anonymous>.RecordType._createWriter (/Users/peloquina/src/github/confluent-kafka-javascript/node_modules/avsc/lib/types.js:2343:10), <anonymous>:8:8)
    at RecordType.Object.<anonymous>.Type.toBuffer (/Users/peloquina/src/github/confluent-kafka-javascript/node_modules/avsc/lib/types.js:658:8)
    at AvroSerializer.serialize (/Users/peloquina/src/github/confluent-kafka-javascript/schemaregistry/dist/serde/avro.js:85:37)
import {afterEach, describe, expect, it} from '@jest/globals';
import {
  AvroDeserializer,
  AvroDeserializerConfig,
  AvroSerializer,
  AvroSerializerConfig,
  AwsKmsDriver,
  AzureKmsDriver,
  ClientConfig,
  Clock,
  FieldEncryptionExecutor,
  GcpKmsDriver,
  HcVaultDriver,
  LocalKmsDriver,
  Rule,
  RuleMode,
  RuleSet,
  SchemaRegistryClient,
  SerdeType
} from "@confluentinc/schemaregistry";
import {JsonataExecutor} from "@confluentinc/schemaregistry/rules/jsonata/jsonata-executor";

// Avro schema, as JSON text, registered by the tests in this file.
// It declares the record `test.UnionTest` with one field, `color`, whose type
// is a union of `string` and the enum `Color` (symbols RED and BLUE), and whose
// declared default is the string "BLUE".
// NOTE(review): per the Avro specification a union field's default is expected
// to match the first branch of the union (here `string`) — confirm if the
// branch order is ever changed.
// The text is passed through verbatim, including its uneven indentation and the
// extra top-level "version" attribute, so do not reformat it.
const unionSchema = `
{
  "type": "record",
    "name": "UnionTest",
    "namespace": "test",
    "fields": [
    {
      "name": "color",
      "type": [
        "string",
        {
          "type": "enum",
          "name": "Color",
          "symbols": [
            "RED",
            "BLUE"
          ]
        }
      ],
      "default": "BLUE"

    }
  ],
  "version": "1"
}`;

/**
 * Clock substitute for tests: its reading only changes when a test assigns
 * a new value to `fixedNow`.
 */
class FakeClock extends Clock {
  /** Value reported by `now()`; starts at 0. */
  fixedNow = 0;

  /** @returns the current content of `fixedNow`, unchanged. */
  override now(): number {
    return this.fixedNow;
  }
}

// Register the field-encryption executor with a FakeClock and keep the returned
// executor so the tests can read and reassign its `client`.
const fieldEncryptionExecutor = FieldEncryptionExecutor.registerWithClock(new FakeClock());
JsonataExecutor.register();

// Register every KMS driver, in the same order as the individual calls would run.
for (const kmsDriver of [AwsKmsDriver, AzureKmsDriver, GcpKmsDriver, HcVaultDriver, LocalKmsDriver]) {
  kmsDriver.register();
}

// Base URL given to every schema registry client created in this file.
const baseURL = 'mock://';

// Topic used for (de)serialization and the registry subject derived from it.
const topic = 'topic1';
const subject = `${topic}-value`;

describe('AvroSerializer', () => {
  /** Creates a schema registry client from the settings shared by the hook and both tests. */
  const makeClient = () => {
    const clientConfig: ClientConfig = {
      baseURLs: [baseURL],
      cacheCapacity: 1000
    };
    return SchemaRegistryClient.newClient(clientConfig);
  };

  /** Builds a fresh rule configuration object each time it is called. */
  const makeRuleConfig = () => ({secret: 'mysecret'});

  // Remove the subject after every test: first with the flag off, then with it on.
  // NOTE(review): presumably the flag selects a permanent delete — confirm against the client API.
  afterEach(async () => {
    const registry = makeClient();
    for (const flag of [false, true]) {
      await registry.deleteSubject(subject, flag);
    }
  });

  // Round trip of the enum branch of the union with no rule set on the schema.
  it('string+enum union', async () => {
    const registry = makeClient();
    const serializer = new AvroSerializer(registry, SerdeType.VALUE, {useLatestVersion: true});

    const schemaInfo = {
      schemaType: 'AVRO',
      schema: unionSchema
    };
    await registry.register(subject, schemaInfo, false);

    // The union value is keyed by the enum's full name, "test.Color".
    const original = {
      color: {"test.Color": "BLUE"}
    };
    const payload = await serializer.serialize(topic, original);

    const deserializer = new AvroDeserializer(registry, SerdeType.VALUE, {});
    const roundTripped = await deserializer.deserialize(topic, payload);
    expect(roundTripped.color).toEqual(original.color);
  });

  // Same round trip, but the schema carries an ENCRYPT rule tagged 'PII'; the
  // schema text itself contains no tags.
  it('string+enum union with non-applicable rule', async () => {
    const registry = makeClient();

    const serializerConfig: AvroSerializerConfig = {
      useLatestVersion: true,
      ruleConfig: makeRuleConfig()
    };
    const serializer = new AvroSerializer(registry, SerdeType.VALUE, serializerConfig);

    // Read the executor's client only after the serializer exists; it is put
    // back on the executor once the deserializer has been constructed.
    // NOTE(review): presumably constructing a serde replaces the executor's client — confirm.
    const savedDekClient = fieldEncryptionExecutor.client!;

    const encryptRule: Rule = {
      name: 'test-encrypt',
      kind: 'TRANSFORM',
      mode: RuleMode.WRITEREAD,
      type: 'ENCRYPT',
      tags: ['PII'],
      params: {
        'encrypt.kek.name': 'kek1',
        'encrypt.kms.type': 'local-kms',
        'encrypt.kms.key.id': 'mykey',
      },
      onFailure: 'ERROR,ERROR'
    };
    const ruleSet: RuleSet = {domainRules: [encryptRule]};

    const schemaInfo = {
      schemaType: 'AVRO',
      schema: unionSchema,
      ruleSet
    };
    await registry.register(subject, schemaInfo, false);

    const original = {
      color: {"test.Color": "BLUE"}
    };
    const payload = await serializer.serialize(topic, original);

    const deserializerConfig: AvroDeserializerConfig = {
      ruleConfig: makeRuleConfig()
    };
    const deserializer = new AvroDeserializer(registry, SerdeType.VALUE, deserializerConfig);
    fieldEncryptionExecutor.client = savedDekClient;

    const roundTripped = await deserializer.deserialize(topic, payload);
    expect(roundTripped.color).toEqual(original.color);
  });
});

This issue was found while trying to track down a similar issue with deserialization. In that case, serialization happened on a partner service using the Java client, with the same schema, and the field was deserialized as null even though the source had a value. We were unable to replicate that deserialization issue in a standalone test because the serialization failure described above occurs first. It seems likely the two are related.

rayokota commented 5 days ago

Thanks @apeloquin-agilysys, I'll take a look.