Closed — barnyrelph closed this issue 1 year ago
Hi @barnyrelph
I encountered this very issue just this past week. After searching online without success, I eventually traced it to a missing option in the Debezium source connector configuration. Add this option to your configuration to fix it:
"value.converter.apicurio.registry.as-confluent": true
While the above option ensures the lookup is done with the Global ID assigned to the schema, the lookup still returns a 404. The reason appears to be that Apicurio expects the value of the id to be the registered Content ID instead of the Global ID.
Yet to figure this out.
So there are two compatibility issues you might run into when using both Confluent and Apicurio tools. The first is a client setting in Apicurio tooling that instructs our tools to use integers instead of longs for all IDs that we encode in the messages. Confluent uses 4-byte integers while we use 8-byte longs. That is the option you discovered, and it is one piece of the puzzle.
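To make the framing difference concrete, here is a minimal sketch (not the actual serializer code from either project) of the two message prefixes described above: a magic byte followed by a 4-byte big-endian schema ID in the Confluent format, versus an 8-byte big-endian long ID in Apicurio's default framing. The function names are illustrative, not real library APIs.

```python
import struct

def encode_confluent(schema_id: int, payload: bytes) -> bytes:
    # Confluent wire format: magic byte 0x00 + 4-byte big-endian schema ID + payload
    return b"\x00" + struct.pack(">i", schema_id) + payload

def encode_apicurio_long(global_id: int, payload: bytes) -> bytes:
    # Apicurio's default framing uses an 8-byte big-endian long for the ID
    return b"\x00" + struct.pack(">q", global_id) + payload

def decode_confluent(record: bytes):
    # A Confluent-format consumer reads exactly 4 ID bytes after the magic byte;
    # feeding it a long-framed record yields the wrong ID and a shifted payload.
    if record[0] != 0:
        raise ValueError("missing magic byte")
    (schema_id,) = struct.unpack_from(">i", record, 1)
    return schema_id, record[5:]
```

A consumer expecting the 4-byte framing that receives the 8-byte framing will parse a bogus schema ID out of the high bytes of the long, which is exactly the kind of mismatch the as-confluent option exists to avoid.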
The other piece is that early versions of Apicurio used global identifiers rather than content identifiers as the unique lookup in those tools. Global IDs are unique for every version of every artifact, even if the same content is uploaded multiple times. Content IDs, on the other hand, are shared: if you upload the same content multiple times you will get back the same ID.
Confluent server has always used content IDs (although I think they call them global IDs) in their API. As of version 2.0 of apicurio registry, our API uses content IDs as well. So it should be easier to interoperate with our tools and their tools.
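The globalId/contentId distinction above can be sketched with a toy model (illustration only, not Apicurio's actual implementation): every registration gets a fresh global ID, while the content ID is keyed on the content itself.

```python
class ToyRegistry:
    """Toy model of a schema registry's two ID spaces."""

    def __init__(self):
        self.global_seq = 0        # incremented on every registration
        self.content_ids = {}     # schema content -> stable contentId

    def register(self, content: str):
        self.global_seq += 1  # new globalId for every version, even duplicates
        # same content always maps back to the same contentId
        cid = self.content_ids.setdefault(content, len(self.content_ids) + 1)
        return self.global_seq, cid

registry = ToyRegistry()
g1, c1 = registry.register('{"type":"record","name":"A"}')
g2, c2 = registry.register('{"type":"record","name":"A"}')  # same content again
# g1 != g2 (new global ID each time), but c1 == c2 (shared content ID)
```

This is why a client that encodes one kind of ID but looks up the other gets 404s: the two numbering sequences drift apart as soon as any content is registered more than once.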
However if you are still using any legacy apicurio tools, or the V1 edition of our rest API, there may still be an incompatibility.
Does that context help at all? It's possible we still have a mismatch somewhere between the confluent compatibility API and our core API.
This is an issue we have as well. Setting up a data infrastructure at a Parisian hospital. I think we'll have to switch to the Confluent schema registry because of this :/ We tried all the solutions mentioned in this thread. Context: we use Debezium and ksqldb
Sorry for being so late to the party. In addition to what Eric described, you can still force the Confluent Compatibility API to use the global ID across the entire API by setting the environment variable ENABLE_CCOMPAT_LEGACY_ID_MODE to true in your Apicurio Registry instance. This is the important point: this is a configuration for the server, not the converter. Alternatively (and probably easier), you can instruct the converter to use the contentId instead of the global ID by setting apicurio.registry.use-id to contentId, since it uses the globalId by default, which is why it's trying to use the global ID to fetch your schemas.
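For the server-side option, a sketch of where the variable would go, assuming a docker-compose setup like the ones later in this thread (service name and other values are placeholders):

```yaml
# Hypothetical docker-compose snippet: ENABLE_CCOMPAT_LEGACY_ID_MODE is a
# server-side setting on the registry container, not a converter option.
apicurio-registry:
  image: apicurio/apicurio-registry-sql:latest-snapshot
  environment:
    - ENABLE_CCOMPAT_LEGACY_ID_MODE=true
```

Use either this server-side flag or the converter's use-id=contentId setting; mixing the two approaches inconsistently reintroduces the ID mismatch.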
--UPDATE--
@tgy
Resolved this issue some months back. I have data pipelines running smoothly now with Kafka, Debezium Connectors, Apicurio Registry and ksqlDB. The issue was resolved by adding this to my Debezium connector configurations.
"value.converter.apicurio.registry.as-confluent": true,
"value.converter.apicurio.registry.use-id": "contentId",
Repeat the same if you are serializing/deserializing your message key data with a schema registry (using Apicurio Registry in my case):
"key.converter.apicurio.registry.as-confluent": true,
"key.converter.apicurio.registry.use-id": "contentId",
Hi,
Yes, that is the expected configuration; you can't use as-confluent without also setting use-id.
@tgy can you please confirm if setting this configuration fixes the problem for you?
Thanks!
Hi guys, thanks a lot for your answers. We're looking into the solutions you shared and will let you know if it worked!
Hi @carlesarnal and @ofelix03, thank you very much for your help. I'm from the same team as @tgy . We tried the solution you provided but unfortunately it does not work for us. ksqldb is able to read the schema in order to create a "stream":
ksql> create stream dwcnumeric_ with
(
kafka_topic='dwcavro.Philips.PatientData._Export.Numeric_', format='AVRO'
);
Message
----------------
Stream created
----------------
It does correctly identify the fields in the schema:
ksql> describe dwcnumeric_ extended;
Name : DWCNUMERIC_
Type : STREAM
Timestamp field : Not set - using <ROWTIME>
Key format : AVRO
Value format : AVRO
Kafka topic : dwcavro.Philips.PatientData._Export.Numeric_ (partitions: 1, replication: 1)
Statement : CREATE STREAM DWCNUMERIC_ (ROWKEY STRUCT<ID BIGINT> KEY, ID BIGINT, TIMESTAMP STRING, BASEPHYSIOID BIGINT, PHYSIOID BIGINT, LABEL STRING, UNITLABEL STRING, SUBPHYSIOID BIGINT, SUBLABEL STRING, __OP STRING) WITH (FORMAT='AVRO', KAFKA_TOPIC='dwcavro.Philips.PatientData._Export.Numeric_');
Field | Type
----------------------------------------
ROWKEY | STRUCT<ID BIGINT> (key)
ID | BIGINT
TIMESTAMP | VARCHAR(STRING)
BASEPHYSIOID | BIGINT
PHYSIOID | BIGINT
LABEL | VARCHAR(STRING)
UNITLABEL | VARCHAR(STRING)
SUBPHYSIOID | BIGINT
SUBLABEL | VARCHAR(STRING)
__OP | VARCHAR(STRING)
----------------------------------------
However, when I want to query the data, I obtain no result:
ksql> select * from dwcnumeric_ limit 10;
+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|ROWKEY |ID |TIMESTAMP |BASEPHYSIOID |PHYSIOID |LABEL |UNITLABEL |SUBPHYSIOID |SUBLABEL |__OP |
+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
Query Completed
Query terminated
When I run this query, I see the following error repeating in the apicurio log:
2023-01-18 10:46:16 INFO <_> [io.apicurio.common.apps.logging.audit.AuditLogService] (executor-thread-12) apicurio.audit action="request" result="failure" src_ip="172.28.0.8" path="/apis/ccompat/v6/schemas/ids/0" response_code="404" method="GET" user=""
This error message repeats many times (maybe as many times as there are messages in the topic).
At the same time, the ksqldb-server logs show the following error:
[2023-01-18 11:17:44,442] ERROR {"type":0,"deserializationError":{"target":"key","errorMessage":"Error deserializing message from topic: dwcavro.Philips.PatientData._Export.Numeric_","recordB64":null,"cause":["Failed to deserialize data for topic dwcavro.Philips.PatientData._Export.Numeric_ to Avro: ","Error retrieving Avro key schema for id 0","No content with id/hash 'contentId-0' was found.; error code: 40403"],"topic":"dwcavro.Philips.PatientData._Export.Numeric_"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.transient_DWCNUMERIC__9209023474268862486.KsqlTopic.Source.deserializer:44)
It looks like ksqldb queries the Id 0, which is strange since my Ids start at 1. I have schema Ids 1 to 17. The relevant schema Ids for the topic in question are 8 for _dwcavro.Philips.PatientData.Export.Numeric-key_ and 9 for _dwcavro.Philips.PatientData.Export.Numeric-value_.
If I request the correct Ids from apicurio, it actually returns a result. I don't understand why ksqldb requests Id 0 over and over again.
➜ ~ curl http://localhost:8082/apis/ccompat/v6/schemas/ids/0
{"message":"No content with id/hash 'contentId-0' was found.","error_code":40403}
➜ ~ curl http://localhost:8082/apis/ccompat/v6/schemas/ids/8
{"schema":"{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"dwcavro.Philips.PatientData._Export.Numeric_\",\"fields\":[{\"name\":\"Id\",\"type\":\"long\"}],\"connect.name\":\"dwcavro.Philips.PatientData._Export.Numeric_.Key\"}","references":[]}%
➜ ~ curl http://localhost:8082/apis/ccompat/v6/schemas/ids/9
{"schema":"{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"dwcavro.Philips.PatientData._Export.Numeric_\",\"fields\":[{\"name\":\"Id\",\"type\":\"long\"},{\"name\":\"TimeStamp\",\"type\":{\"type\":\"string\",\"connect.version\":1,\"connect.name\":\"io.debezium.time.ZonedTimestamp\"}},{\"name\":\"BasePhysioId\",\"type\":\"long\"},{\"name\":\"PhysioId\",\"type\":\"long\"},{\"name\":\"Label\",\"type\":\"string\"},{\"name\":\"UnitLabel\",\"type\":\"string\"},{\"name\":\"SubPhysioId\",\"type\":\"long\"},{\"name\":\"SubLabel\",\"type\":\"string\"},{\"name\":\"__op\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"dwcavro.Philips.PatientData._Export.Numeric_.Value\"}","references":[]}%
Best regards, Jona Joachim
@jaj42
To better understand what's going on, please share some details:
Which setup of the Apicurio Registry are you running (in-memory, SQL database, Kafka storage)?
Do you have apicurio.registry.auto-register in your configuration? If yes, what's the current value?
@ofelix03 Thanks for taking the time to look at our setup
Our Apicurio Registry is stored in a PostgreSQL database (in docker-compose.yml we use REGISTRY_DATASOURCE_URL=jdbc:postgresql://host.docker.internal/apicurio with the apicurio/apicurio-registry-sql:latest-snapshot image).
We're not setting any AUTO_REGISTER_ARTIFACT environment variable, so I guess we're not touching the apicurio.registry.auto-register config, which defaults to false.
@ofelix03 @tgy We run on docker using the debezium/connect:nightly and apicurio/apicurio-registry-sql:latest-snapshot images. We do as a matter of fact use the apicurio.registry.auto-register configuration knob. The debezium kafka-connect container is configured using these environment variables according to debezium documentation (https://debezium.io/documentation/reference/stable/configuration/avro.html)
- CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://apicurio-registry:8080/apis/registry/v2
- CONNECT_KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter
- CONNECT_KEY_CONVERTER_APICURIO.REGISTRY_URL=http://apicurio-registry:8080/apis/registry/v2
- CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true
- CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true
- CONNECT_VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter
- CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio-registry:8080/apis/registry/v2
- CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true
- CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true
- CONNECT_SCHEMA_NAME_ADJUSTMENT_MODE=avro
@ofelix03 @tgy just as a follow-up, I ran the same setup with confluent schema registry and it works as expected with no error message. In the confluent registry, schema Ids start with 1 as well and requesting schema 0 results in much the same error as with apicurio:
➜ ~ curl http://localhost:8181/schemas/ids/0
{"error_code":40403,"message":"Schema 0 not found"}%
➜ ~ curl http://localhost:8181/schemas/ids/5
{"schema":"{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"LRB_DWC01._Export.Numeric_\",\"fields\":[{\"name\":\"Id\",\"type\":\"long\"}],\"connect.name\":\"LRB_DWC01._Export.Numeric_.Key\"}"}%
➜ ~ curl http://localhost:8181/schemas/ids/6
{"schema":"{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"LRB_DWC01._Export.Numeric_\",\"fields\":[{\"name\":\"Id\",\"type\":\"long\"},{\"name\":\"TimeStamp\",\"type\":{\"type\":\"string\",\"connect.version\":1,\"connect.name\":\"io.debezium.time.ZonedTimestamp\"}},{\"name\":\"BasePhysioId\",\"type\":\"long\"},{\"name\":\"PhysioId\",\"type\":\"long\"},{\"name\":\"Label\",\"type\":\"string\"},{\"name\":\"UnitLabel\",\"type\":\"string\"},{\"name\":\"SubPhysioId\",\"type\":\"long\"},{\"name\":\"SubLabel\",\"type\":\"string\"},{\"name\":\"__op\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"LRB_DWC01._Export.Numeric_.Value\"}"}%
However, ksqldb is able to parse the topic with no error. It doesn't even try to query ids/0. Something must be wrong in the apicurio emulation of the confluent api, prompting ksqldb to request schema id 0 instead of the correct schema id. The same holds true for kafka-ui which could not deserialize avro with apicurio-registry and showed binary data. With confluent-registry it now shows correctly deserialized data.
Best regards, Jona
Hi @jaj42 @tgy,
If you can share your full setup, since you mentioned you're running docker-compose (obviously omitting any confidential information), rather than only some snippets, that would be very helpful. I think you might be missing some essential configuration on the Apicurio Registry side of things, so I would like to take a look at your configuration. Keep in mind that although we provide a compatibility API, both the tooling and the server must be configured properly to make things work.
Thanks in advance.
Hi Carl. Thanks for taking the time. Here's our docker-compose.yaml
version: "2"
services:
zookeeper-lrb:
restart: "unless-stopped"
image: debezium/zookeeper:1.9
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka-lrb:
restart: "unless-stopped"
image: debezium/kafka:1.9
ports:
- 9092:9092
- 9011:9011
links:
- zookeeper-lrb
environment:
- ZOOKEEPER_CONNECT=zookeeper-lrb:2181
- KAFKA_LOG_RETENTION_MS=3600000
- KAFKA_LOG_ROLL_MS=3600000
- KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS=5000
- KAFKA_LISTENERS=INTERN://kafka-lrb:29092,EXTERN://kafka-lrb:9092
- KAFKA_ADVERTISED_LISTENERS=INTERN://kafka-lrb:29092,EXTERN://10.130.192.132:9092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERN:PLAINTEXT,EXTERN:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERN
- JMXPORT=9011
- JMXHOST=localhost
apicurio-registry:
profiles:
- donotstart
restart: "unless-stopped"
image: apicurio/apicurio-registry-sql:latest-snapshot
ports:
- 8082:8080
environment:
- REGISTRY_DATASOURCE_URL=jdbc:postgresql://host.docker.internal/apicurio
- REGISTRY_DATASOURCE_USERNAME=X
- REGISTRY_DATASOURCE_PASSWORD=X
extra_hosts:
- host.docker.internal:host-gateway
confluent-registry:
restart: "unless-stopped"
image: confluentinc/cp-schema-registry:latest
ports:
- 8181:8081
environment:
- SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://kafka-lrb:9092
- SCHEMA_REGISTRY_HOST_NAME=confluent-registry
- SCHEMA_REGISTRY_LISTENERS=http://confluent-registry:8081
connect-lrb:
restart: "unless-stopped"
build:
context: connect-jmx
args:
DEBEZIUM_VERSION: 1.9
JMX_AGENT_VERSION: 0.15.0
ports:
- 8083:8083
- 9012:9012
links:
- kafka-lrb
environment:
- BOOTSTRAP_SERVERS=kafka-lrb:29092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=physiodata_configs
- OFFSET_STORAGE_TOPIC=physiodata_offsets
- STATUS_STORAGE_TOPIC=physiodata_statuses
- KAFKA_CONNECT_PLUGINS_DIR=/kafka/connect/
- KAFKA_HEAP_OPTS=-Xms1G -Xmx20G
- KAFKA_OPTS=-javaagent:/kafka/etc/jmx_prometheus_javaagent.jar=8080:/kafka/etc/config.yml
- JMXPORT=9012
- JMXHOST=10.130.192.132
depends_on:
- zookeeper-lrb
- kafka-lrb
kafka-bridge:
restart: "unless-stopped"
image: quay.io/strimzi/kafka-bridge
ports:
- 9080:8080
volumes:
- ./kafka-bridge/config/:/opt/strimzi/config/
entrypoint: /opt/strimzi/bin/kafka_bridge_run.sh
command: --config-file=config/application.properties
depends_on:
- kafka-lrb
prometheus:
restart: "unless-stopped"
build:
context: prometheus
args:
PROMETHEUS_VERSION: v2.26.0
ports:
- 9090:9090
links:
- connect-lrb
user: root
volumes:
- /data/infra/prometheus:/prometheus
prometheus-alertmanager:
image: prom/alertmanager:v0.25.0
restart: unless-stopped
ports:
- 9093:9093
volumes:
- /data/infra/alertmanager/conf:/config
- /data/infra/alertmanager/data:/data
command: --config.file=/config/alertmanager.yml --log.level=debug
kafka-ui:
restart: "unless-stopped"
image: provectuslabs/kafka-ui:latest
ports:
- 8100:8080
environment:
- KAFKA_CLUSTERS_0_NAME=lariboisiere
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-lrb:9092
- KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper-lrb:2181
#- KAFKA_CLUSTERS_0_SCHEMAREGISTRY=http://apicurio-registry:8080/apis/ccompat/v6
- KAFKA_CLUSTERS_0_SCHEMAREGISTRY=http://confluent-registry:8081
- KAFKA_CLUSTERS_0_KSQLDBSERVER=http://ksqldb-server:8088
- KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME=debezium
- KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS=http://debezium:8083
- KAFKA_CLUSTERS_0_KAFKACONNECT_1_NAME=prometheus
- KAFKA_CLUSTERS_0_KAFKACONNECT_1_ADDRESS=http://connect-lrb:8083
ksqldb-server:
restart: "unless-stopped"
image: confluentinc/ksqldb-server:latest
container_name: ksqldb-server
depends_on:
- kafka-lrb
ports:
- "8088:8088"
volumes:
- "./connect-plugins:/usr/share/kafka/plugins"
environment:
- KSQL_LISTENERS=http://0.0.0.0:8088
- KSQL_BOOTSTRAP_SERVERS=kafka-lrb:9092
- KSQL_KSQL_SCHEMA_REGISTRY_URL=http://apicurio-registry:8080/apis/ccompat/v6
# Configuration to embed - Kafka Connect support.
- KSQL_CONNECT_GROUP_ID=ksql-connect-cluster
- KSQL_CONNECT_BOOTSTRAP_SERVERS=kafka-lrb:9092
- KSQL_CONNECT_KEY_CONVERTER=org.apache.kafka.connect.storage.StringConverter
- KSQL_CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
- KSQL_CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://apicurio-registry:8080/apis/ccompat/v6
- KSQL_CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://apicurio-registry:8080/apis/ccompat/v6
- KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
#- KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=true
- KSQL_CONNECT_CONFIG_STORAGE_TOPIC=ksql-connect-configs
- KSQL_CONNECT_OFFSET_STORAGE_TOPIC=ksql-connect-offsets
- KSQL_CONNECT_STATUS_STORAGE_TOPIC=ksql-connect-statuses
- KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1
- KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1
- KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1
- KSQL_CONNECT_PLUGIN_PATH=/usr/share/kafka/plugins
extra_hosts:
- host.docker.internal:host-gateway
ksqldb-cli:
restart: "unless-stopped"
image: confluentinc/ksqldb-cli:latest
container_name: ksqldb-cli
depends_on:
- ksqldb-server
entrypoint: /bin/sh
tty: true
debezium:
restart: "unless-stopped"
image: debezium/connect:nightly
depends_on:
- kafka-lrb
ports:
- 8084:8083
volumes:
- "./debezium-plugins:/kafka/plugins"
environment:
- ENABLE_APICURIO_CONVERTERS=true
- GROUP_ID=2
- BOOTSTRAP_SERVERS=kafka-lrb:29092
- CONFIG_STORAGE_TOPIC=dbz_connect_configs
- OFFSET_STORAGE_TOPIC=dbz_connect_offsets
- KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter
- VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter
- CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://apicurio-registry:8080/apis/registry/v2
- CONNECT_KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter
- CONNECT_KEY_CONVERTER_APICURIO.REGISTRY_URL=http://apicurio-registry:8080/apis/registry/v2
- CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true
- CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true
- CONNECT_VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter
- CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio-registry:8080/apis/registry/v2
- CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true
- CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true
- CONNECT_SCHEMA_NAME_ADJUSTMENT_MODE=avro
and this:
{
"name": "dwc-connector-avro",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": "1",
"database.hostname": "...",
"database.port": "...",
"database.encrypt": "false",
"database.user": "...",
"database.password": "...",
"database.dbname": "...",
"database.names": "...",
"database.server.name": "...",
"topic.prefix": "dwcavro",
"snapshot.mode": "schema_only",
"schema.history.internal.kafka.topic": "...",
"schema.history.internal.kafka.bootstrap.servers": "kafka-lrb:29092",
"key.converter.apicurio.registry.as-confluent": true,
"key.converter.apicurio.registry.use-id": "contentId",
"value.converter.apicurio.registry.as-confluent": true,
"value.converter.apicurio.registry.use-id": "contentId",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields": "op",
"decimal.handling.mode": "double",
"table.include.list": "_Export.Patient_,_Export.PatientStringAttribute_,_Export.PatientDateAttribute_,_Export.Numeric_,_Export.NumericValue_,_Export.Wave_,_Export.WaveSample_",
"message.key.columns": "_Export.Wave_:Id;_Export.WaveSample_:WaveId;_Export.Numeric_:Id;_Export.NumericValue_:NumericId;_Export.PatientStringAttribute_:PatientId,Timestamp"
}
}
Hi,
After some testing, I think you're missing some key configuration points in your connector. Here you have an example I just used to successfully use all the stack together (kafka-ui, apicurio-registry, debezium, ksqldb).
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.user": "postgres",
"database.dbname": "apicuriodb",
"tasks.max": "1",
"database.history.kafka.bootstrap.servers": "kafka:29092",
"database.history.kafka.topic": "schema-changes.inventory",
"database.server.name": "apicuriodb",
"schema.include.list": "public",
"value.converter.apicurio.registry.auto-register": "true",
"key.converter.apicurio.registry.as-confluent": "true",
"database.port": "5432",
"plugin.name": "pgoutput",
"topic.prefix": "postgre-changes",
"database.hostname": "apicurio-studio-db",
"database.password": "******",
"key.converter.apicurio.registry.use-id": "contentId",
"name": "Test",
"value.converter.apicurio.registry.url": "http://schema-registry:8080/apis/registry/v2",
"key.converter.apicurio.registry.url": "http://schema-registry:8080/apis/registry/v2",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.auto-register": "true",
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.as-confluent": "true",
"value.converter.apicurio.registry.use-id": "contentId"
}
Notice especially the difference in the value.converter and key.converter configurations (ignore the ones related to PG). I have tried it, and with only your config ksql tries to query the schema with id 0; with my configuration it just works. If this does not work either, let me know the result and we can try to see what happens.
Closing as stale, if someone finds this in the future, please, re-open the issue.
Hi All, I'm using Debezium to capture Postgres events, stream them into Kafka and hopefully process them in ksqlDB, then stream them out to a separate Postgres DB for analysis.
I have a Connector set up watching a couple of tables. On inserting to the table, I can see Schemas created in the registry. I have ksqlDB set up using:
I then create a stream in ksqlDB:
Upon insert, the schema registry is populated and I can see ksqlDB go looking for the schema, but in the logs for the apicurio registry, I can see this:
That last request seems to be where things are going awry. Is this a problem with my configuration (most likely) or some aspect of the Confluent API missing?
Connector setup snippet for reference:
Thanks in advance for any guidance