Apicurio / apicurio-registry

An API/Schema registry - stores APIs and Schemas.
https://www.apicur.io/registry/
Apache License 2.0

Trouble using artifact id in the headers #4836

Status: Open. Opened by reidmeyer 5 days ago.

reidmeyer commented 5 days ago

Description

Registry Version: 2.5.9.Final
Persistence type: in-memory

I'm trying to run a Kafka connector that uses the Apicurio Avro converter with the schema registry, but with the artifact ID in the headers instead of the magic byte. I end up getting an "Unknown magic byte!" error.

I produce a message onto my Kafka topic using kafbat/kafka-ui. I put a random string in both the key and the value (hello and goodbye, respectively). In the headers section, I paste:

{
    "apicurio.key.artifactId": "something1",
    "apicurio.value.artifactId": "something2",
    "apicurio.key.globalId": "something3",
    "apicurio.value.globalId": "something4",
    "apicurio.key.contentId": "something5",
    "apicurio.value.contentId": "something6"   
}

I've also tried with:

{
    "apicurio.key.artifactId": 1,
    "apicurio.value.artifactId": 2,
    "apicurio.key.globalId": 3,
    "apicurio.value.globalId": 4,
    "apicurio.key.contentId": 5,
    "apicurio.value.contentId": 6   
}

in case the type of the headers is an issue.
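To compare the two header variants above at the byte level, here is a minimal Python sketch. It assumes kafka-ui sends each header value as the UTF-8 text of the JSON value, and contrasts that with a hypothetical 8-byte big-endian long encoding of a numeric ID. Which encoding the Apicurio deserializer actually expects is not established here; the functions are illustrative only.

```python
import struct


def header_as_text(value) -> bytes:
    # Assumption: the UI form sends each JSON value as its text form, UTF-8 encoded,
    # so both "something3" and 3 arrive as plain text bytes.
    return str(value).encode("utf-8")


def header_as_long(value: int) -> bytes:
    # Hypothetical alternative: an 8-byte big-endian signed long, the layout
    # java.nio.ByteBuffer.putLong produces with its default (big-endian) order.
    return struct.pack(">q", value)


if __name__ == "__main__":
    for raw in (3, "something3"):
        print(f"{raw!r} as text -> {header_as_text(raw)!r}")
    print(f"3 as long -> {header_as_long(3)!r}")
```

Under these assumptions, typing 3 versus "something3" in the UI changes only the text, not the encoding, so both variants would look alike to anything expecting a packed binary ID.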

I then have a Kafka Connect config that looks like this:

  class: io.aiven.connect.jdbc.JdbcSinkConnector
  # autoRestart:
  #   enabled: true
  tasksMax: 1
  config:
    topics: my-topic
    # These are defaults, but they're here for clarity:
    connection.url: myurl

    # This is default, but it's here for clarity:
    insert.mode: insert
    auto.create: True

    value.converter: io.apicurio.registry.utils.converter.AvroConverter
    key.converter: io.apicurio.registry.utils.converter.AvroConverter

    value.converter.schemas.enable: true
    key.converter.schemas.enable: true

    value.converter.apicurio.registry.url: myurl.org/apis/registry/v1
    key.converter.apicurio.registry.url: myurl.org/apis/registry/v1

    batch.size: 1

Then I get the error described above.

Do I have the headers defined wrong? Does this converter not work without a magic byte? Am I missing a config?

Environment

Kubernetes: v1.26.15
Kafka Connect from Strimzi: 3.7.0
Confluent Avro converter: 7.6.1
Confluent JDBC sink: 10.7.6

Steps to Reproduce

  1. Run Kafka Connect with the Apicurio Avro converter and any connector.
  2. Produce a message with a raw key/value (i.e. no magic byte) and add headers with the artifact IDs.
  3. Run the connector and see the failure.

Expected vs Actual Behaviour

Based on what I've seen in the docs, I expect the converter to read the headers when there is no magic byte, but instead it fails with the "Unknown magic byte!" error.
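The expected behaviour can be stated concretely as a resolution order. The Python sketch below is hypothetical: it encodes the reporter's expectation, not Apicurio's code. The zero magic byte and the helper name resolve_schema_ref are assumptions made for illustration.

```python
MAGIC_BYTE = 0x00  # assumption: framed payloads start with a zero byte


def resolve_schema_ref(payload: bytes, headers: dict, prefix: str = "apicurio.value.") -> tuple:
    """Hypothetical resolution order the reporter expected (not the library's code).

    Returns ("payload", rest) when the magic byte is present, otherwise
    ("headers", artifact_id) taken from the record headers.
    """
    if payload and payload[0] == MAGIC_BYTE:
        # Framed record: the bytes after the magic byte are assumed to begin with the schema ID.
        return ("payload", payload[1:])
    # No magic byte: fall back to the artifact ID header for this side (key or value).
    artifact_id = headers.get(prefix + "artifactId")
    if artifact_id is None:
        raise LookupError("no magic byte and no artifactId header")
    return ("headers", artifact_id)
```

With the raw value b"goodbye" and headers {"apicurio.value.artifactId": b"something2"}, this sketch returns ("headers", b"something2") instead of failing, which is the behaviour the report expected.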

Logs

Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
    at io.apicurio.registry.utils.serde.AbstractKafkaSerDe.getByteBuffer(AbstractKafkaSerDe.java:90)
    at io.apicurio.registry.utils.serde.AbstractKafkaDeserializer.deserialize(AbstractKafkaDeserializer.java:76)
    at io.apicurio.registry.utils.converter.SchemalessConverter.toConnectData(SchemalessConverter.java:118)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:515)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    ... 14 more
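The top frame of the trace is a check on the payload's leading byte. The minimal Python sketch below mimics such a check to show why the raw value goodbye is rejected; it is not Apicurio's source, and the zero magic byte is an assumption borrowed from the Confluent-style wire format. SerializationError is a local stand-in for the Kafka exception.

```python
MAGIC_BYTE = 0x00  # assumption: the expected leading byte is zero


class SerializationError(Exception):
    """Local stand-in for org.apache.kafka.common.errors.SerializationException."""


def get_byte_buffer(payload: bytes) -> memoryview:
    # Sketch of a leading magic-byte check: reject anything whose first byte
    # is not the magic byte, otherwise return a view of the remaining bytes.
    if not payload or payload[0] != MAGIC_BYTE:
        raise SerializationError("Unknown magic byte!")
    return memoryview(payload)[1:]


if __name__ == "__main__":
    raw_value = "goodbye".encode("utf-8")  # the unframed value from the report
    print(f"first byte: {raw_value[0]:#04x}")  # 'g' is 0x67, not 0x00
    try:
        get_byte_buffer(raw_value)
    except SerializationError as exc:
        print(f"rejected: {exc}")
```

A check of this shape only inspects the payload, so any headers on the record play no part in whether it passes.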