birdayz / kaf

Modern CLI for Apache Kafka, written in Go.
Apache License 2.0

Unable to kaf consume and decode Avro messages encoded with schema references (Avro type: unknown type name) #308

Open jeffkni opened 9 months ago

jeffkni commented 9 months ago

Kaf config (~/.kaf/config)

current-cluster: local
clusteroverride: ""
clusters:
- name: local
  version: ""
  brokers:
  - localhost:9092
  SASL:
    mechanism: PLAIN
    username: admin
    password: admin-secret
    clientID: ""
    clientSecret: ""
    tokenURL: ""
    token: ""
  TLS: null
  security-protocol: SASL_PLAINTEXT
  schema-registry-url: http://localhost:8082

Steps to Reproduce

  1. Schema Registry + Kafka running in docker
    • Expose schema reg in docker to local :8082
$ docker ps --format 'table {{ .Names }}\t{{.Status}}\t{{.Ports}}'
      NAMES                 STATUS                    PORTS
      kafka                 Up 45 minutes             8082/tcp, 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp
      schema-registry       Up 45 minutes (healthy)   8081/tcp, 0.0.0.0:8082->8082/tcp, :::8082->8082/tcp
  2. Publish schemas that have schema references to the schema registry
  3. Emit an Avro-encoded message to the topic
    • The encoded message was produced from code that used codegen with schema references
  4. Run kaf consume --verbose -f my.topic to read it
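For reference, step 2 uses the Schema Registry's support for references: the shared type is registered under its own subject first, and the referencing schema declares it in a `references` array in the `POST /subjects/{subject}/versions` payload. A minimal sketch of those payloads (the subject names and schema contents are hypothetical, chosen to match the error below):

```python
import json

# Hypothetical shared type, registered under its own subject first.
shared_schema = {
    "type": "record",
    "name": "MySharedType",
    "namespace": "com.mycompany.common",
    "fields": [{"name": "id", "type": "string"}],
}

# Referencing schema: the union item is just the fully-qualified name;
# the definition lives in another subject.
super_schema = {
    "type": "record",
    "name": "MySuperType",
    "namespace": "com.mycompany",
    "fields": [
        {"name": "shared", "type": ["null", "com.mycompany.common.MySharedType"]},
    ],
}

# Payload for POST /subjects/my-shared-type-value/versions
shared_payload = json.dumps({
    "schemaType": "AVRO",
    "schema": json.dumps(shared_schema),
})

# Payload for POST /subjects/my.topic-value/versions; the references
# array tells the registry which subject defines the named type.
super_payload = json.dumps({
    "schemaType": "AVRO",
    "schema": json.dumps(super_schema),
    "references": [{
        "name": "com.mycompany.common.MySharedType",
        "subject": "my-shared-type-value",
        "version": 1,
    }],
})

print(super_payload)
```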

Expected result: kaf decodes the message the way it does when there are no schema references.

Actual result: kaf can't decode the message and prints the raw Avro-encoded bytes.

  Headers:
               Key: type   Value: MySuperType
  Key:         MY-KEY
  Partition:   0
  Offset:      3
  Timestamp:   2024-02-28 13:15:26.982 +0000 GMT
  [ avro encoded data here ]

Adding --verbose, you can see kaf complain that it doesn't recognize the referenced type:

could not decode Avro data: Record "MySuperType" field 2 ought to be valid Avro named type: 
Union item 2 ought to be valid Avro type: unknown type name:
 "com.mycompany.common.MySharedType"

Workaround

Confluent Schema Registry ships with a command-line tool, kafka-avro-console-consumer, that can tail Kafka topics and decode messages containing schema references on the fly. If you're using docker, the following gets close to the kaf consume layout, with decoding:

docker exec -it schema-registry /bin/bash -c "kafka-avro-console-consumer --topic my.super.topic \
  --bootstrap-server kafka:29092 \
  --consumer-property security.protocol=SASL_PLAINTEXT \
  --consumer-property sasl.jaas.config='org.apache.kafka.common.security.plain.PlainLoginModule required username=my-kafka-username password=my-kafka-password;' \
  --consumer-property sasl.mechanism=PLAIN \
  --property line.separator=$'\n=======' \
  --property key.separator=$'\n' \
  --property print.offset=true \
  --property print.partition=true \
  --property print.headers=true \
  --property print.timestamp=true \
  --property schema.registry.url=http://localhost:8082"

Note that the kafka:29092 address comes from the KAFKA_LISTENERS config in docker-compose.yaml, for example:

  kafka:
    image: confluentinc/confluent-local
    hostname: kafka
    container_name: kafka
    ports:
      - '9092:9092'
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENERS: 'INTERNAL://kafka:29092,CONTROLLER://kafka:29093,EXTERNAL://0.0.0.0:9092'
   [...]
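A longer-term fix for kaf would presumably use the registry's own metadata: GET /subjects/{subject}/versions/latest returns a references array naming the subjects a schema depends on, which can be fetched recursively before decoding. A sketch of reading that array (the response body below is a hard-coded stand-in, not a live call to the registry):

```python
import json

# Stand-in for the JSON body returned by
#   GET http://localhost:8082/subjects/my.topic-value/versions/latest
# (a live call would use the schema-registry-url from ~/.kaf/config).
response_body = json.dumps({
    "subject": "my.topic-value",
    "version": 1,
    "id": 7,
    "schema": "{\"type\":\"record\",\"name\":\"MySuperType\"}",
    "references": [
        {"name": "com.mycompany.common.MySharedType",
         "subject": "my-shared-type-value",
         "version": 1},
    ],
})

def referenced_subjects(body):
    """List the (subject, version) pairs a schema version depends on."""
    meta = json.loads(body)
    return [(r["subject"], r["version"]) for r in meta.get("references", [])]

print(referenced_subjects(response_body))
```

Each referenced subject/version can then be fetched the same way, and any of its own references followed in turn, until every named type is in scope.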