confluentinc / schema-registry

Confluent Schema Registry for Kafka
https://docs.confluent.io/current/schema-registry/docs/index.html

Avro producer, with auto-registered schema and topic set to "confluent.value.schema.validation=true" produces InvalidRecordException #2636

Closed josephh closed 1 year ago

josephh commented 1 year ago

Environment

Running Confluent locally, version 7.2.0, Java 11.

Steps to reproduce

  1. Create a topic called “flow”
  2. Publish a message to that topic using kafka-avro-console-producer (a string serializer for the key and a simple, 2-field Avro schema: { "type": "record", "name": "demoRecord", "fields": [ { "name": "first", "type": "int" }, { "name": "second", "type": "string" } ] }). The key-value message abc:{"first": 3, "second": "foo"} publishes successfully.
  3. Confirm the schema has been auto-registered in Schema Registry, i.e.
    curl --silent -X GET http://localhost:8081/subjects | jq
    [
    "flow-value"
    ]
  4. "Turn on" schema/message validation, i.e.
    kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name flow --add-config confluent.value.schema.validation=true
    Completed updating config for topic flow.
  5. Try to publish the same message again, again using the Avro console producer. Kafka now errors, with output:

    [2023-05-10 22:07:42,769] ERROR Error when sending message to topic flow with key: 3 bytes, value: 10 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback:52) org.apache.kafka.common.InvalidRecordException: Log record DefaultRecord(offset=0, timestamp=1683752861715, key=3 bytes, value=10 bytes) is rejected by the record interceptor io.confluent.kafka.schemaregistry.validator.RecordSchemaValidator
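For context on what the broker-side validator is inspecting: kafka-avro-console-producer serializes values in the Confluent wire format, a 0x00 magic byte plus a 4-byte big-endian schema ID, followed by the Avro binary encoding of the record. The RecordSchemaValidator interceptor reads that schema ID and looks it up in Schema Registry, so a record is rejected whenever the lookup fails, not only when the payload is malformed. A minimal sketch of the encoding (the schema ID of 1 is an assumption; the real ID is assigned by Schema Registry):

```python
import struct

def encode_zigzag_varint(n: int) -> bytes:
    """Avro encodes int/long values as zigzag varints."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        b = z & 0x7F
        z >>= 7
        if z:
            out.append(b | 0x80)
        else:
            out.append(b)
            return bytes(out)

def encode_record(schema_id: int, first: int, second: str) -> bytes:
    # Confluent wire format: magic byte 0x00 + 4-byte big-endian schema ID,
    # then the Avro binary encoding of the record fields in schema order.
    header = struct.pack(">bI", 0, schema_id)
    payload = encode_zigzag_varint(first)                  # "first": int
    payload += encode_zigzag_varint(len(second.encode()))  # "second": string length prefix
    payload += second.encode()                             # "second": UTF-8 bytes
    return header + payload

value = encode_record(1, 3, "foo")
print(len(value))  # 10 -- matching "value: 10 bytes" in the error above
```

This is why the 10-byte value in the error log lines up with {"first": 3, "second": "foo"}: a 5-byte header plus a 5-byte Avro payload.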

How can I get some more detail on why this record is considered "Invalid"?

josephh commented 1 year ago

> How can I get some more detail on why this record is considered "Invalid"?

Note to self - just look in the server.log... the config for the record validator is written there. The hostname in confluent.schema.registry.url should have been localhost; I had mistakenly copied and pasted the example from https://docs.confluent.io/platform/current/schema-registry/schema-validation.html#prerequisites-and-setting-sr-urls-on-the-brokers. Could the server have warned me about that (maybe it does already, somewhere)? 🤔

[2023-06-22 16:01:46,570] INFO RecordSchemaValidatorConfig values: 
        confluent.basic.auth.credentials.source = 
        confluent.basic.auth.user.info = [hidden]
        confluent.bearer.auth.credentials.source = 
        confluent.bearer.auth.token = [hidden]
        confluent.key.schema.validation = false
        confluent.key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
        confluent.missing.id.cache.ttl.sec = 60
        confluent.missing.id.query.range = 200
        confluent.missing.schema.cache.ttl.sec = 60
        confluent.schema.registry.max.cache.size = 10000
        confluent.schema.registry.max.retries = 1
        confluent.schema.registry.retries.wait.ms = 0
        confluent.schema.registry.url = [http://schema-registry:8081]
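The last line above shows the problem: the broker was configured to reach Schema Registry at http://schema-registry:8081, a hostname that does not resolve on a plain local setup, so every schema ID lookup failed and every record was rejected. The fix, assuming a local single-node install, is to point the broker at the address where Schema Registry actually listens, e.g. in the broker's server.properties:

```properties
# Schema Registry URL used by the broker-side RecordSchemaValidator.
# http://localhost:8081 assumes a local single-node setup; adjust to your environment.
confluent.schema.registry.url=http://localhost:8081
```

After changing this, restart the broker so the RecordSchemaValidatorConfig shown in server.log picks up the new URL.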
josephh commented 1 year ago

Check the Schema Registry logs and confirm the RecordSchemaValidatorConfig in the future!