edenhill / kcat

Generic command line non-JVM Apache Kafka producer and consumer
Other
5.45k stars 484 forks source link

kcat fails to deserialize Avro using schema registry, but confluent_kafka_python succeeds #356

Open spenczar opened 2 years ago

spenczar commented 2 years ago

I am trying to use kcat to read Confluent Wire Format-encoded messages. They're Avro encoded, and I have a Schema Registry. This little Python script works with confluent_kafka_python v1.7.0:

from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
import time
import os

def main():
    sr_client = SchemaRegistryClient({"url": os.environ["KAFKA_SCHEMA_REGISTRY"]})

    deserializer = AvroDeserializer(sr_client)

    config = {
        "bootstrap.servers": os.environ["KAFKA_BOOTSTRAP"],
        "group.id": os.environ["KAFKA_USERNAME"] + "-testing",
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "SCRAM-SHA-512",
        "sasl.username": os.environ["KAFKA_USERNAME"],
        "sasl.password": os.environ["KAFKA_PASSWORD"],
        "value.deserializer": deserializer,
    }
    consumer = DeserializingConsumer(config)

    consumer.subscribe(["alerts-simulated"])

    while True:
        msg = consumer.poll(1)
        if msg is None:
            print("waiting for messages...")
            time.sleep(1)
        else:
            deserialized = msg.value()
            print(deserialized)

if __name__ == "__main__":
    main()

But this kcat invocation fails, even though it should be identical, and even while running on the same messages:

-> % docker run -it --network=host edenhill/kcat:1.7.0 \
  -b $KAFKA_BOOTSTRAP \
  -X security.protocol=SASL_SSL \
  -X sasl.mechanisms=SCRAM-SHA-512 \
  -X sasl.username=$KAFKA_USERNAME \
  -X sasl.password=$KAFKA_PASSWORD \
  -C \
  -t alerts-simulated \
  -r $KAFKA_SCHEMA_REGISTRY \
  -s value=avro \
  -J
% Reached end of topic alerts-simulated [2] at offset 1650023
% Reached end of topic alerts-simulated [0] at offset 1612739
% ERROR: Failed to deserialize value in message in alerts-simulated [1] at offset 1652255: Failed to encode Avro as JSON: terminating

None of this data is sensitive, so I can upload the schema and message here too: message_and_schema.zip

That zip has a 'msg.avro' which is the Kafka message (including the 5-byte Confluent Wire Format header) and a "schema.json" which is the Avro schema used.

Be warned - this is a big and complex schema, about 40KB, and the Avro message is 580KiB of scientific simulation data.

crccheck commented 2 years ago

I have a very similar issue where I have a JavaScript client that can successfully tail a topic while kcat errors with:

Avro/Schema-registry key deserialization: Invalid CP1 magic byte 57, expected 0: message not produced with Schema-Registry Avro framing: terminating

I know there's a magic byte because I'm pulling it off the raw message and I know it is encoded in Avro.

edenhill commented 2 years ago

Each message must be prefixed by the schema-registry specific framing, which is one byte for magic (value 0) and then the schemaId in big-endian (4 bytes), then followed by the serialized payload (e.g., serialized avro). This message means there was no such framing. You can verify what the framing looks like by running kcat without a deserializer and passing the key to hexdump or similar.

crccheck commented 2 years ago

oooh, my issue was staring me in the face and I didn't notice... "key deserialization". Only my values are serialized. Changing my config to -s value=avro fixed it. This wasn't intuitive because other clients I've used and written search for the magic byte and only decode if it exists.

I also had to pass my schema registry password unencoded where I could have sworn I read that it should be url encoded.

spenczar commented 2 years ago

@edenhill You can confirm that msg.avro in the zip I provided uses the schema-registry specific framing ("Confluent wire format"):

-> % cat msg.avro | head -c 5 | hexdump
0000000 0000 0000 0001
0000005

This is saying that I have a message with schema ID of 1. I still see the above error from kcat.

edenhill commented 2 years ago

Failed to encode Avro as JSON: suggests that deserialization worked, but it failed to convert the avro objects to JSON. It could be that your schema uses constructs that are not supported by the avro-c version that is used by kcat.