kafkajs / confluent-schema-registry

is a library that makes it easier to interact with the Confluent schema registry
https://www.npmjs.com/package/@kafkajs/confluent-schema-registry
MIT License
154 stars 101 forks source link

AVSC files without a top level name, type and fields are considered invalid, but it shouldn't. #169

Open samhwang opened 2 years ago

samhwang commented 2 years ago

Hey KafkaJS Team, just wanted to started that this project is superb. However, I just came into this weird hiccup, which I think shouldn't be. I'm still new to how Kafka and Avro (and KafkaJS) works, so if I'm wrong, please lead me in the right direction :D.

Issue Description

I'm trying to read an avro file that does not have a top level record, but instead it has an array of records.

[
  {
    "name": "ARecord",
    "namespace": "com.samhwang.testavro",
    "type": "record",
    "fields": [
      {"name": "first_name", "type": "string"},
      {"name": "last_name", "type": "string"}
    ]
  },
  {
    "name": "BRecord",
    "namespace": "com.samhwang.testavro",
    "type": "record",
    "fields": [
      {"name": "test_attr", "type": "string"}
    ]
  }
]

This is validated to be correct, both by a locally hosted Kafka Schema Registry (using the docker image confluentinc/cp-schema-registry:6.2.1) and on Confluent Cloud (see screenshot attached). Screen Shot 2021-11-12 at 2 18 05 pm

Reproduction steps/repo

I have created a sample repo in Typescript to reproduce this issue at: samhwang/test-node-kafka.

Reproduction steps

This script is essentially:

    try {
        const schema = await fs.readFile(absolutePathToSchema, {encoding: 'utf-8'});
        const url = schemaRegistryUrl + "/subjects/" + topic + "-value/versions";
        const headers = {"Content-Type": "application/vnd.schemaregistry.v1+json"};

        const response = await axios.post(url, {schema}, {
            headers,
        });
        const {data} = response;

        console.log("Schema Registered Successfully!", data);
    } catch (error: any) {
        console.error(error);
    }
    try {
        const schema = await readAVSCAsync(absolutePathToSchema);
        const data = registry.register(schema);
        console.log("Schema Registered Successfully!", data);
    } catch (error: any) {
        console.error(error);
    }

Possible solution?

pmsfc commented 4 months ago

@Nevon We're having the same issue when consuming a topic with a schema without top level record.

For example this code:

// this fails!
registry.decode(buffer, {
   [SchemaType.AVRO]: {readerSchema}, // We can't use a RawAvroSchema type without a Record
})

But if we use the schema id thats included in the Kafka message it works.

// this works!
registry.decode(buffer)

On the other hand the avsc lib allows this, we can check it's typing:

export type AvroSchema = DefinedType | DefinedType[]; // Note the array here
type DefinedType = PrimitiveType | ComplexType | LogicalType | string;
type PrimitiveType = 'null' | 'boolean' | 'int' | 'long' | 'float' | 'double' | 'bytes' | 'string';
type ComplexType = NamedType | RecordType | EnumType | MapType | ArrayType | FixedType;
type LogicalType = ComplexType & LogicalTypeExtension;