confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

KSQLDB Push Queries Fail to Deserialize AVRO Data - Schema Lookup Performed with Wrong Schema ID #9519

Open ofelix03 opened 2 years ago

ofelix03 commented 2 years ago

I'm not certain what I could be missing.

  1. I have set up a Kafka broker with ZooKeeper and Kafka Connect in distributed mode.
  2. For schema management, I have set up an Apicurio Schema Registry instance.
  3. I also have KSQLDB set up.

I can confirm the following is working as expected:

  1. My source JDBC connector successfully pushed table data into the topic stss.market.info.public.ice_symbols

Problem:

Inside the KSQLDB server, I have successfully created a table from the topic stss.market.info.public.ice_symbols:

[screenshot]

Here are the details of the created table:

[screenshot]

The problem is that a push query against this table returns no data. Deserialization fails because the lookup of the AVRO schema in the Apicurio Registry is unsuccessful.

The Apicurio Registry logs reveal that KSQLDB calls the registry to fetch the deserialization schema using a schema ID of 0 instead of 5, which is the ID of the schema I have registered:

[screenshot]

The KSQLDB server logs confirm the 404 HTTP response seen in the Apicurio logs:

[screenshot]

Expectation:

I expect KSQLDB queries against the table to perform the schema lookup with an ID of 5, not 0. I'm probably missing some configuration.

Here is the schema registered in the Apicurio Registry:

[screenshots]

Here is also my source connector configuration. It has the appropriate schema lookup strategy configured, although I don't believe KSQLDB requires this when deserializing its table data. This configuration should only be relevant to capturing the table data and to its validation and storage in the topic stss.market.info.public.ice_symbols.

{
  "name": "new.connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "172.17.203.10",
    "database.port": "6000",
    "database.user": "postgres",
    "database.password": "123",
    "database.dbname": "stss_market_info",
    "database.server.name": "stss.market.info",
    "table.include.list": "public.ice_symbols",
    "message.key.columns": "public.ice_symbols:name",
    "snapshot.mode": "always",
    "transforms": "unwrap,extractKey",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "name",
    "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "value.converter.apicurio.registry.url": "http://local-server:8080/apis/registry/v2",
    "value.converter.apicurio.registry.auto-register": true,
    "value.converter.apicurio.registry.find-latest": true,
    "value.apicurio.registry.as-confluent": true, 
    "name": "new.connector",
    "value.converter.schema.registry.url": "http://local-server:8080/apis/registry/v2"
  }
}
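
A quick sanity check on a configuration like the one above: Kafka Connect hands a converter only the properties that carry its prefix (here `value.converter.`), with the prefix stripped. The sketch below is a hypothetical helper, not part of Kafka Connect or Debezium, that lists which properties the value converter would receive and flags keys that start with `value.` but lack the `converter` segment. The sample dictionary copies a few keys from the configuration above.

```python
def converter_properties(config: dict, role: str = "value") -> dict:
    """Return the properties a converter would see, with the prefix stripped."""
    prefix = f"{role}.converter."
    return {
        key[len(prefix):]: value
        for key, value in config.items()
        if key.startswith(prefix)
    }


def suspicious_keys(config: dict, role: str = "value") -> list:
    """Return keys that look converter-related but lack the converter prefix."""
    return [
        key
        for key in config
        if key.startswith(f"{role}.") and not key.startswith(f"{role}.converter")
    ]


# A few keys copied from the connector configuration in this thread.
sample_config = {
    "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
    "value.converter.apicurio.registry.url": "http://local-server:8080/apis/registry/v2",
    "value.converter.apicurio.registry.find-latest": True,
    "value.apicurio.registry.as-confluent": True,
}

print(sorted(converter_properties(sample_config)))
print(suspicious_keys(sample_config))
```

With the sample above, `as-confluent` does not appear among the converter's properties, and `value.apicurio.registry.as-confluent` is the only flagged key.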

Thanks in advance for any assistance.

colinhicks commented 2 years ago

@ofelix03, I am not familiar with the Apicurio registry. It looks like another Apicurio user encountered a similar problem: https://github.com/Apicurio/apicurio-registry/issues/2151, where the Apicurio logs show errant requests to /apis/ccompat/v6/schemas/ids/0.

Are you able to reproduce this issue against Confluent Schema Registry?

ofelix03 commented 2 years ago

@colinhicks I appreciate your response. I figured it out. Adding "value.converter.apicurio.registry.as-confluent": true to the Debezium connector configuration addresses this issue. The schema lookup is now performed with the Global-Id assigned to the registered schema in Apicurio.
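
One plausible explanation for the earlier lookup with ID 0, sketched under stated assumptions: the Confluent wire format prefixes each message with a magic byte and a 4-byte big-endian schema ID, whereas the Apicurio serializer is assumed here to write an 8-byte ID unless `as-confluent` is enabled. The thread does not confirm that framing, and the function names below are made up for illustration. Under that assumption, a reader expecting 4 bytes sees only the leading zero bytes of the 8-byte value.

```python
import struct

MAGIC_BYTE = 0


def frame_4_byte(schema_id: int, payload: bytes) -> bytes:
    """Magic byte followed by a 4-byte big-endian ID (Confluent wire format)."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + payload


def frame_8_byte(global_id: int, payload: bytes) -> bytes:
    """Magic byte followed by an 8-byte big-endian ID (assumed non-Confluent framing)."""
    return struct.pack(">bq", MAGIC_BYTE, global_id) + payload


def read_4_byte_id(message: bytes) -> int:
    """Read the ID the way a reader expecting the Confluent framing would."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte")
    return schema_id


payload = b"avro-bytes"  # stand-in for the encoded Avro record
print(read_4_byte_id(frame_8_byte(5, payload)))  # 0
print(read_4_byte_id(frame_4_byte(5, payload)))  # 5
```

The first call reproduces the symptom (ID 5 read back as 0), and the second shows the ID surviving once both sides agree on the 4-byte framing.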

While this is a step forward, the lookup request to the endpoint /apis/ccompat/v6/schemas/ids/<id> still returns a 404. The reason appears to be that the Apicurio registry expects the id to be the registered Content-Id rather than the Global-Id.
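
To check which ID the endpoint actually resolves, the lookup can be probed directly. This is a minimal diagnostic sketch using only the Python standard library. The endpoint path and the host `http://local-server:8080` come from this thread; the helper names are hypothetical, and the IDs to try are whatever Global-Id and Content-Id the registry shows for the schema.

```python
import urllib.error
import urllib.request


def ccompat_schema_url(base_url: str, schema_id: int) -> str:
    """Build the compatibility-API lookup URL for a given ID."""
    return f"{base_url.rstrip('/')}/apis/ccompat/v6/schemas/ids/{schema_id}"


def lookup_status(base_url: str, schema_id: int, timeout: float = 5.0) -> int:
    """Return the HTTP status code of the lookup (for example 200 or 404)."""
    url = ccompat_schema_url(base_url, schema_id)
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status
    except urllib.error.HTTPError as err:
        # Non-2xx responses are raised as HTTPError; report the code instead.
        return err.code


# Example usage (requires a reachable registry, so it is left commented out):
# for candidate_id in (5,):  # add the schema's Content-Id here if it differs
#     print(candidate_id, lookup_status("http://local-server:8080", candidate_id))
```

If the Content-Id returns 200 while the Global-Id returns 404, that supports the explanation above.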