confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Cannot query topics using schemas named with RecordNameStrategy #8825

Closed nicollette closed 2 years ago

nicollette commented 2 years ago

Describe the bug
I cannot read a topic that uses a schema registered with the RecordNameStrategy.

To Reproduce
Steps to reproduce the behavior:

  1. Using confluentinc/ksqldb-server:0.22.0 Docker image
  2. Create an Avro schema, example .avsc file:
    {
      "type" : "record",
      "name" : "IdTest",
      "namespace" : "streams",
      "doc" : "Test Avro Schema",
      "fields" : [
        {
          "name": "id",
          "type": ["null", "string"]
        }
      ]
    }
  3. Produce data to Kafka using this schema and register it using the RecordNameStrategy. This is how I configured the SerDe in my Kafka Streams app:
    final SpecificAvroSerde<IdTest> avroSerde = new SpecificAvroSerde<>();
    final Map<String, String> serdeConfig = new HashMap<>();
    serdeConfig.put(
        AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://$url:80");
    serdeConfig.put(AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, RecordNameStrategy.class.getName());
    avroSerde.configure(serdeConfig, false);

    The schema will be automatically registered when messages are produced to Kafka.

  4. Try creating a STREAM over this topic:
    CREATE SOURCE STREAM IF NOT EXISTS test_stream
    WITH (
    KAFKA_TOPIC = 'test_topic',
    VALUE_FORMAT = 'AVRO'
    );

    This fails with:

    Schema for message values on topic 'nlui-test-avro' does not exist in the Schema Registry.
    Subject: nlui-test-avro-value
    Possible causes include:
    - The topic itself does not exist
        -> Use SHOW TOPICS; to check
    - Messages on the topic are not serialized using a format Schema Registry supports
        -> Use PRINT 'nlui-test-avro' FROM BEGINNING; to verify
    - Messages on the topic have not been serialized using a Confluent Schema Registry supported serializer
        -> See https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
    - The schema is registered on a different instance of the Schema Registry
        -> Use the REST API to list available subjects  https://docs.confluent.io/current/schema-registry/docs/api.html#get--subjects
    - You do not have permissions to access the Schema Registry.
        -> See https://docs.confluent.io/current/schema-registry/docs/security.html
  5. Create the STREAM by specifying the Schema ID:
    CREATE SOURCE STREAM IF NOT EXISTS test_stream
    WITH (
    KAFKA_TOPIC = 'test_topic',
    VALUE_FORMAT = 'AVRO',
    VALUE_SCHEMA_ID = '$id_of_schema'
    );

    The STREAM is created.

  6. Try querying the stream with:
    SELECT * FROM test_stream EMIT CHANGES;
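The failure in step 4 boils down to subject naming: ksqlDB looks up the schema under the TopicNameStrategy subject `test_topic-value`, while RecordNameStrategy registers it under the record's fully-qualified name. A minimal plain-Java sketch of the two derivations (hypothetical helper class, not Confluent's actual strategy classes):

```java
// Hypothetical sketch of how the two subject naming strategies derive
// the subject a schema is registered under. The real implementations are
// Confluent's TopicNameStrategy and RecordNameStrategy.
public class SubjectNameSketch {
    // TopicNameStrategy (what ksqlDB assumes): subject = <topic>-key|-value
    static String topicNameSubject(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // RecordNameStrategy: subject = the record's fully-qualified name
    static String recordNameSubject(String namespace, String recordName) {
        return namespace + "." + recordName;
    }

    public static void main(String[] args) {
        // ksqlDB looks here...
        System.out.println(topicNameSubject("test_topic", false)); // test_topic-value
        // ...but the schema was registered here:
        System.out.println(recordNameSubject("streams", "IdTest")); // streams.IdTest
    }
}
```

With RecordNameStrategy the registered subject is `streams.IdTest`, so the `test_topic-value` lookup in step 4 comes back empty.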

Expected behavior
When specifying VALUE_SCHEMA_ID, I would expect ksqlDB to find the schema using this ID so that I could query the topic in ksqlDB. According to this doc: "The schemas referred to by KEY_SCHEMA_ID and VALUE_SCHEMA_ID must be registered in {{ site.sr }}. They can be under any subject but must match the formats defined by KEY_FORMAT and VALUE_FORMAT, respectively."

Actual behavior
When querying the STREAM in step 6 above, the server logs show it cannot deserialize the messages:

task [0_0] Skipping record due to deserialization error. topic=[test_topic] partition=[0] offset=[41315] (org.apache.kafka.streams.processor.internals.RecordDeserializer:89)
org.apache.kafka.common.errors.SerializationException: Error deserializing message from topic: test_topic
        at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:55)
        at io.confluent.ksql.serde.tls.ThreadLocalDeserializer.deserialize(ThreadLocalDeserializer.java:37)
        at io.confluent.ksql.serde.connect.ConnectFormat$StructToListDeserializer.deserialize(ConnectFormat.java:225)
        at io.confluent.ksql.serde.connect.ConnectFormat$StructToListDeserializer.deserialize(ConnectFormat.java:204)
        at io.confluent.ksql.serde.GenericDeserializer.deserialize(GenericDeserializer.java:59)
        at io.confluent.ksql.logging.processing.LoggingDeserializer.tryDeserialize(LoggingDeserializer.java:61)
        at io.confluent.ksql.logging.processing.LoggingDeserializer.deserialize(LoggingDeserializer.java:48)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
        at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:304)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:960)
        at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1056)
        at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:950)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:739)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:602)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:574)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic test_topic to Avro:
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)
        at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:49)
        ... 18 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema version for id 3406
        at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:241)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:188)
        at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
        ... 19 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:452)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:439)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:246)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:415)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:128)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:170)
        ... 21 more
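Reading the trace bottom-up: the deserializer resolves the writer schema by its id (3406), then calls lookUpSubjectVersion scoped to the topic-derived subject, where the schema was never registered, hence the 40403. A toy plain-Java sketch of that two-step lookup (in-memory stand-in, not Confluent's CachedSchemaRegistryClient):

```java
import java.util.Map;

// Toy in-memory registry illustrating why the version lookup fails:
// schema ids are global, but version lookups are scoped to a subject.
public class LookupSketch {
    static final Map<Integer, String> SCHEMAS_BY_ID = Map.of(3406, "streams.IdTest");
    // RecordNameStrategy registered the schema under its fully-qualified name:
    static final Map<String, String> SUBJECT_TO_SCHEMA = Map.of("streams.IdTest", "streams.IdTest");

    static String lookUpSubjectVersion(String subject, int id) {
        String schema = SCHEMAS_BY_ID.get(id); // the id itself resolves fine
        if (!schema.equals(SUBJECT_TO_SCHEMA.get(subject))) {
            // the subject ksqlDB derives from the topic holds no such schema
            throw new RuntimeException("Schema not found; error code: 40403");
        }
        return schema;
    }

    public static void main(String[] args) {
        System.out.println(lookUpSubjectVersion("streams.IdTest", 3406));
        try {
            lookUpSubjectVersion("test_topic-value", 3406); // what ksqlDB tries
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```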

Additional context
I am looking for a solution for querying topics that use any Subject Naming Strategy.

lihaosky commented 2 years ago

Hi @nicollette, VALUE_SCHEMA_ID and KEY_SCHEMA_ID are not properly supported in the 0.22 release, and they are not in the public documentation. They have been reimplemented and will be in the 0.24 release. Can you try again using master?

lihaosky commented 2 years ago

Hi @nicollette, I'm closing this, as it concerns an old release in which schema_id is not properly supported.

nicollette commented 2 years ago

Hi @lihaosky , thanks for the update! Using the schema in the issue description (a single String field), ksqldb version 0.24.0 and VALUE_SCHEMA_ID, everything works as expected 🎉

I tried creating a STREAM using a schema that contains a single Float field and it fails with:

Value format does not support schema.
format: AVRO
schema: Persistence{columns=[`id` DOUBLE], features=[]}
reason: Unexpected schema type: Schema{FLOAT32}

This is on v0.24.0 and I specified VALUE_SCHEMA_ID. My Avro schema is:

{
  "type" : "record",
  "name" : "IdTestFloat",
  "namespace" : "streams",
  "doc" : "Test Avro Schema",
  "fields" : [
    {
      "name": "id",
      "type": ["null", "float"]
    }
  ]
}

Is this the expected behavior?
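For reference, the rejected type here is Connect FLOAT32, the mapping of Avro `float`; ksqlDB's floating-point column type is DOUBLE, which maps from Avro `double`. A possible workaround, assuming double precision is acceptable for the field, is to declare it as `double` (record renamed to IdTestDouble here, a hypothetical name, to avoid clashing with the already-registered float version):

```json
{
  "type" : "record",
  "name" : "IdTestDouble",
  "namespace" : "streams",
  "doc" : "Test Avro Schema",
  "fields" : [
    {
      "name": "id",
      "type": ["null", "double"]
    }
  ]
}
```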

waytoalpit commented 8 months ago

@nicollette Can you please confirm if this is working?