Apicurio / apicurio-registry

An API/Schema registry - stores APIs and Schemas.
https://www.apicur.io/registry/
Apache License 2.0

Some messages don't have MAGIC_BYTE when used with Debezium #4199

Closed j2gg0s closed 5 months ago

j2gg0s commented 9 months ago

Only the second message's value starts with \x00.

k exec -ti -n infra-kafka x01-0 -c kafka -- /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --max-messages 3 --value-deserializer "org.apache.kafka.common.serialization.BytesDeserializer" --topic kcx.cactus.lifestyle.wish_spu --property print.partition=true --property print.offset=true --property print.headers=true --partition=0
Partition:0     Offset:5144346  apicurio.key.globalId:0B,apicurio.key.encoding:BINARY,apicurio.value.globalId:0D,apicurio.value.encoding:BINARY \x02\xB8\xB0\x8B\x80\xE0\xB7\xAA\xA1\x0F\x8C\xFA\x99\x80\xC0\xBB\xBF\xAC\x0B\x02\x06&\x9A\xD0\x90\xC6\xD6\xCE\xBAa\xD0\xC3\xE3\xD7\xDDa\x00\x5C\x00\xF8\x97\xB0\x01\x5C\x02\xB8\xB0\x8B\x80\xE0\xB7\xAA\xA1\x0F\x8C\xFA\x99\x80\xC0\xBB\xBF\xAC\x0B\x02\x06!\x91\xC0\x90\xC6\xD6\xCE\xBAa\xC0\xCF\xF1\xA0\x9Fc\x00\x5C\x00\xF8\x97\xB0\x01\x5C\x162.3.0.Final\x0Amysql\x14kcx.cactus\x90\x8F\xB6\x85\x9Fc\x00\x0Afalse\x12lifestyle\x00\x02\x10wish_spu\xDE\x9C\xB2\xA6\x0D\x02^953b272e-e0d3-11eb-83d4-98039b072ec8:2291839102 mysql-bin.005635\xF0\xCC\xC4\xA4\x02\x00\x02\xA8\xEF(\x00\x02u\x02\xAC\x90\xB6\x85\x9Fc\x00
Partition:0     Offset:5144347  apicurio.key.globalId:0B,apicurio.key.encoding:BINARY,apicurio.value.globalId:0D,apicurio.value.encoding:BINARY \x00\x02\xC4\xBD\x88\x80\x80\xB9\xAE\xF0\x12\xC0\xA1\x9B\x80\x80\x92\xC6\xE4\x0F\x02\x02\x00\x90\xDF\xF1\xA0\x9Fc\x90\xDF\xF1\xA0\x9Fc\x00\x9A\xB6z\xC8\x92\x86\x80\xC0\x85\xA8\xCC\x0A\xAE\xE4\x8C\x01\xC8#\x162.3.0.Final\x0Amysql\x14kcx.cactus\x90\x8F\xB6\x85\x9Fc\x00\x0Afalse\x12lifestyle\x00\x02\x10wish_spu\xDE\x9C\xB2\xA6\x0D\x02^953b272e-e0d3-11eb-83d4-98039b072ec8:2291839140 mysql-bin.005635\xA0\xE9\xC8\xA4\x02\x00\x02\xA8\xEF(\x00\x02c\x02\x96\x98\xB6\x85\x9Fc\x00
Partition:0     Offset:5144348  apicurio.key.globalId:0B,apicurio.key.encoding:BINARY,apicurio.value.globalId:0D,apicurio.value.encoding:BINARY \x02\xE2\xF9\x92\x80\xC0\x90\x8A\xA2\x0F\x8C\xFA\x99\x80\xC0\xBB\xBF\xAC\x0B\x02\x06#f\x80\x90\xD9\xC5\xB4\xDBa\x80\xAE\xE5\xD7\xDDa\x00\x5C\x00\xF8\x97\xB0\x01\x5C\x02\xE2\xF9\x92\x80\xC0\x90\x8A\xA2\x0F\x8C\xFA\x99\x80\xC0\xBB\xBF\xAC\x0B\x02\x06!\x91\xC0\x90\xD9\xC5\xB4\xDBa\x90\xDF\xF1\xA0\x9Fc\x00\x5C\x00\xF8\x97\xB0\x01\x5C\x162.3.0.Final\x0Amysql\x14kcx.cactus\x90\x8F\xB6\x85\x9Fc\x00\x0Afalse\x12lifestyle\x00\x02\x10wish_spu\xDE\x9C\xB2\xA6\x0D\x02^953b272e-e0d3-11eb-83d4-98039b072ec8:2291839146 mysql-bin.005635\xEC\xA5\xC9\xA4\x02\x00\x02\xA8\xEF(\x00\x02u\x02\xBC\x98\xB6\x85\x9Fc\x00
Processed a total of 3 messages

My sink (kafka-connect-jdbc) consumes the messages correctly with Apicurio's converter.

Is this a bug?

EricWittmann commented 9 months ago

Can you include any configuration you have for your source/sink applications? It looks to me like the messages have headers, and the globalId of the schema is included in the headers. When that is the case, there should not be a magic byte in the payload. The magic byte is only present when the globalId is encoded in the payload, not when it's included in the headers.
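The distinction above can be sketched as a small check. This is only an illustration: the header name pattern `apicurio.<key|value>.globalId` is taken from the consumer output in this thread, the magic byte value `0x00` is assumed from the report, and `schema_id_location` is a hypothetical helper, not part of any Apicurio library.

```python
from typing import Mapping

MAGIC_BYTE = 0x00  # assumed magic byte value, based on the report above


def schema_id_location(headers: Mapping[str, bytes], payload: bytes,
                       part: str = "value") -> str:
    """Guess where the schema ID for the key or value is carried.

    Headers are checked first: when the ID is in the headers, a leading
    0x00 in the payload is just ordinary data, not a magic byte.
    """
    if f"apicurio.{part}.globalId" in headers:
        return "headers"
    if payload[:1] == bytes([MAGIC_BYTE]):
        return "payload"
    return "unknown"
```

With headers like the ones printed above, the function reports `"headers"` regardless of the first payload byte, which matches the observation that the sink still decodes every message correctly.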

j2gg0s commented 9 months ago

@EricWittmann Is this documented anywhere?

j2gg0s commented 9 months ago

{
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.connectionTimeZone": "Asia/Shanghai",
    "database.hostname": "***",
    "database.password": "***",
    "database.port": "3306",
    "database.server.id": "5501",
    "database.user": "***",
    "errors.log.enable": "true",
    "errors.retry.timeout": "600000",
    "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "key.converter.apicurio.registry.auto-register": "true",
    "key.converter.apicurio.registry.find-latest": "true",
    "key.converter.apicurio.registry.url": "http://apicurio.kafka:8080/apis/registry/v2",
    "name": "source-cactus",
    "producer.override.max.request.size": "10485760",
    "read.only": "true",
    "schema.history.internal.kafka.bootstrap.servers": "x01.infra-kafka:9092",
    "schema.history.internal.kafka.topic": "kch.cactus",
    "schema.name.adjustment.mode": "avro",
    "snapshot.mode": "initial",
    "table.exclude.list": ".+\\.tp_[0-9]+_.+,.+\\.tpa_[a-zA-Z0-9]+_.+,likes\\..+",
    "time.precision.mode": "connect",
    "tombstones.on.delete": "false",
    "topic.creation.default.cleanup.policy": "compact",
    "topic.creation.default.compression.type": "lz4",
    "topic.creation.default.partitions": "9",
    "topic.creation.default.replication.factor": "1",
    "topic.prefix": "kcx.cactus",
    "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "value.converter.apicurio.registry.auto-register": "true",
    "value.converter.apicurio.registry.find-latest": "true",
    "value.converter.apicurio.registry.url": "http://apicurio.kafka:8080/apis/registry/v2"
  },
  "name": "source-cactus",
  "tasks": [
    {
      "connector": "source-cactus",
      "task": 0
    }
  ],
  "type": "source"
}

carlesarnal commented 7 months ago

Eric is right: with your configuration, the information you're looking for is not encoded in the message payload. The latest available artifact for that topic name is used; the docs have more information.
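If nothing is encoded in the payload, the leading `\x00` on the second message is presumably plain Avro data. One plausible reading, not confirmed in this thread: Avro writes a union as its branch index, encoded as a zig-zag varint, so index 0 becomes `\x00` and index 1 becomes `\x02`. Assuming the first field of the Debezium envelope is a nullable `before` with `null` as branch 0, a create event would start with `\x00` and an update with `\x02`. That would fit the dump, where the second message appears to carry op `c` and the other two op `u`. The encoding itself can be sketched as follows (`zigzag_varint` is a hypothetical helper name):

```python
def zigzag_varint(n: int) -> bytes:
    """Encode a signed 64-bit integer the way Avro encodes int/long values:
    zig-zag mapping first, then a little-endian base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # zig-zag: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # low 7 bits, continuation bit set
        z >>= 7
    out.append(z)  # final byte, continuation bit clear
    return bytes(out)


# Union branch indexes as they would appear at the start of a record.
null_branch = zigzag_varint(0)   # b"\x00"
value_branch = zigzag_varint(1)  # b"\x02"
```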

carlesarnal commented 5 months ago

Closing as stale. If you need any further information, please re-open!