Apicurio / apicurio-registry

An API/Schema registry - stores APIs and Schemas.
https://www.apicur.io/registry/
Apache License 2.0

Trouble Using Confluent AvroConverter with apicurio registry #4831

Open reidmeyer opened 4 months ago

reidmeyer commented 4 months ago

Description

Registry Version: 2.5.9.Final
Persistence type: in-memory

I'm using Kafka Connect, and I'm trying to use the Confluent-compatible API from Apicurio. I expect it to work with the Confluent Avro converter.

But I'm getting an error in the Confluent library like: The given schema does not match any schema under the subject tst-kpn-des--reid-magic-byte-avro-key; error code: 40403. It's possible there is a bug in the Confluent library, but I'm posting here for some direction.

I have a value and key schema in Apicurio with content/global IDs 1 and 2.

I was thinking maybe something canonicalization-related is going on? My next step was to look into Apicurio's default setting for that.

My config:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: confluent-postgres-sink
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.confluent.connect.jdbc.JdbcSinkConnector
  # autoRestart:
  #   enabled: true
  tasksMax: 1
  config:
    topics: tst-kpn-des--reid-magic-byte-avro
    value.converter.schemas.enable: true
    key.converter.schemas.enable: true
    key.converter: io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url: myurl.org/apis/ccompat/v7
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: myurl.org/apis/ccompat/v7
    value.converter.schema.registry.ssl.truststore.location: /opt/kafka/external-configuration/my-truststore/truststore.jks
    value.converter.schema.registry.ssl.truststore.password: streamin
    key.converter.schema.registry.ssl.truststore.location: /opt/kafka/external-configuration/my-truststore/truststore.jks
    key.converter.schema.registry.ssl.truststore.password: streamin
    connection.url: myconnection
    connection.user: reid
    connection.password: mypw
    auto.create: true

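For context on the subject names that appear in the error below: the Confluent AvroConverter uses TopicNameStrategy by default, so it looks up `<topic>-key` and `<topic>-value` subjects in the registry. A minimal sketch of that mapping:

```python
def subject_for(topic: str, is_key: bool) -> str:
    # Default TopicNameStrategy: the subject is the topic name with
    # a "-key" or "-value" suffix, depending on which converter runs.
    return f"{topic}-{'key' if is_key else 'value'}"

print(subject_for("tst-kpn-des--reid-magic-byte-avro", is_key=True))
# tst-kpn-des--reid-magic-byte-avro-key
```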
I produce messages onto my topic like:

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import requests
import os

response = requests.get('myserver.org/apis/ccompat/v7/schemas/ids/1/', verify=False)
schema_str = response.json()["schema"]
value_schema = avro.loads(schema_str)

response = requests.get('myserver.org/apis/ccompat/v7/schemas/ids/2/', verify=False)
schema_str = response.json()["schema"]
key_schema = avro.loads(schema_str)

# Get the absolute paths for the SSL certificates
current_dir = os.path.dirname(os.path.realpath(__file__))
ca_cert_path = os.path.join(current_dir, 'certs', 'ca-cert.cert')
cert_path = os.path.join(current_dir, 'certs', 'cert.cert')
key_path = os.path.join(current_dir, 'certs', 'key.key')

# Define Avro producer
avro_producer = AvroProducer(
    {
        'bootstrap.servers': 'myservers',
        'security.protocol': 'ssl',
        'ssl.ca.location': ca_cert_path,
        'ssl.certificate.location': cert_path,
        'ssl.key.location': key_path,
        'schema.registry.url': 'myserver.org/apis/ccompat/v7'
    },
    default_value_schema=value_schema,
    default_key_schema=key_schema
)

# Produce Avro message
avro_producer.produce(topic='tst-kpn-des--reid-magic-byte-avro', value={'message': 'hello'}, key={'key': 'some-key'})
avro_producer.flush()

Environment

Kubernetes: v1.26.15
Kafka Connect from Strimzi: 3.7.0
Confluent Avro converter: 7.6.1
Confluent JDBC sink: 10.7.6

Steps to Reproduce

  1. Create schemas through the Apicurio UI.
  2. Produce messages in the 4-byte magic-byte format onto the topic.
  3. Deploy Kafka Connect with the connector configured as above.
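For anyone reproducing step 2, the "magic byte style" framing is the Confluent wire format: a zero magic byte, the schema ID as a 4-byte big-endian integer, then the Avro-encoded payload. A minimal sketch of the header (the Confluent and Apicurio serializers build this for you; this is only to illustrate the framing):

```python
import struct

MAGIC_BYTE = 0

def encode_header(schema_id: int) -> bytes:
    # Magic byte (0x00) followed by the schema ID as a 4-byte
    # big-endian unsigned integer; the Avro payload follows this.
    return struct.pack(">bI", MAGIC_BYTE, schema_id)

def decode_header(message: bytes) -> int:
    # Reads the 5-byte header and returns the schema ID the
    # deserializer will look up in the registry.
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id
```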

Expected vs Actual Behaviour

I expect it to successfully fetch the schema. It's totally possible I'm doing something very wrong; hoping for some guidance.

Logs

Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro key schema version for id 2
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:805)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:222)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:269)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:199)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:126)
    ... 17 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: The given schema does not match any schema under the subject tst-kpn-des--reid-magic-byte-avro-key; error code: 40403
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:336)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:409)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:500)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:485)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:353)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:609)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:589)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:204)
    ... 20 more

2024-06-27 12:06:46 DEBUG <_> [io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage] (executor-thread-7) Selecting a single artifact (latest version) by artifactId: null tst-kpn-des--reid-magic-byte-avro-key (behavior = DEFAULT)
2024-06-27 12:06:46 DEBUG <_> [io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage] (executor-thread-7) Selecting artifact (latest version) meta-data: null tst-kpn-des--reid-magic-byte-avro-key (behavior = DEFAULT)
2024-06-27 12:06:46 INFO <_> [io.apicurio.common.apps.logging.audit.AuditLogService] (executor-thread-7) apicurio.audit action="request" result="failure" src_ip="xx.xx" x_forwarded_for="xx.xx" path="/apis/ccompat/v7/subjects/tst-kpn-des--reid-magic-byte-avro-key" response_code="404" method="POST" user=""
carlesarnal commented 4 months ago

Hello @reidmeyer, before digging into the issue, we fixed a few issues that are probably related in 2.5.11.Final. Do you mind giving it a try? (Sorry for the delay, I've been on PTO)

reidmeyer commented 4 months ago

@carlesarnal, will do! I will get back to you next week after I try again.

reidmeyer commented 4 months ago

@carlesarnal, I'm still getting the same error. :(

I also tested with 2.6.1.Final

2024-07-23 11:48:02 INFO <_> [io.apicurio.common.apps.logging.audit.AuditLogService] (executor-thread-2) apicurio.audit action="register" result="success" src_ip="100.80.227.245" x_forwarded_for="172.26.140.31" artifact_id="tst-kpn-des--reid-magic-byte-avro-value" 
2024-07-23T11:49:00.959586192Z 2024-07-23 11:49:00 INFO <_> [io.apicurio.common.apps.logging.audit.AuditLogService] (executor-thread-2) apicurio.audit action="register" result="success" src_ip="100.80.227.245" x_forwarded_for="172.26.140.31" artifact_id="tst-kpn-des--reid-magic-byte-avro-value" 
2024-07-23T11:49:08.165792053Z 2024-07-23 11:49:08 INFO <_> [io.apicurio.common.apps.logging.audit.AuditLogService] (executor-thread-2) apicurio.audit action="request" result="failure" src_ip="100.82.94.88" x_forwarded_for="100.81.129.119" path="/apis/ccompat/v7/subjects/tst-kpn-des--reid-magic-byte-avro-value" response_code="404" method="POST" user="" 
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:528)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:505)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:341)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:242)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:211)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic tst-kpn-des--reid-magic-byte-avro to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:148)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:528)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    ... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema version for id 1
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:805)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:222)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:269)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:199)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:126)
    ... 17 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: The given schema does not match any schema under the subject tst-kpn-des--reid-magic-byte-avro-value; error code: 40403
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:336)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:409)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:500)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:485)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:353)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:609)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:589)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:204)
    ... 20 more

my kafka connect spec:

  class: io.confluent.connect.jdbc.JdbcSinkConnector
  tasksMax: 1
  config:
    topics: tst-kpn-des--reid-magic-byte-avro
    value.converter.schemas.enable: true
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: myapicurio/apis/ccompat/v7
    connection.url: myurl
    connection.user: ${env:POSTGRES_EXAMPLE_USERNAME}
    connection.password: ${env:POSTGRES_EXAMPLE_PASSWORD}
    auto.create: true

[Screenshot: 2024-07-23 at 2:14 PM]

maxemann96 commented 1 week ago

I'm experiencing the same issue and noticed something possibly useful in the Apicurio log while using Kafka UI: Kafka UI makes requests to /apis/ccompat/v7/subjects/TOPIC-value/versions/latest, while the AvroConverter makes requests to /apis/ccompat/v7/subjects/TOPIC-value. (Also, Kafka UI works.)

maxemann96 commented 1 week ago

Further research revealed that the Confluent AvroConverter calls the POST /subjects/(string: subject) endpoint (docs) while fetching the schema from the registry (see the lookUpSubjectVersion function in the stack trace). This seems to return a 404 from the Apicurio registry, resulting in this behaviour. So I would suggest that the error is in Apicurio's compatibility layer and not in the Confluent AvroConverter.
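To test that theory against a registry directly, the failing lookup can be reproduced outside Connect. This is a sketch (the base URL and schema below are placeholders, not values from this issue) of the POST /subjects/{subject} check-schema-exists request that lookUpSubjectVersion issues:

```python
import json

def build_lookup_request(base_url: str, subject: str, schema_str: str):
    # The converter checks whether the schema is already registered
    # under the subject by POSTing the schema to /subjects/{subject};
    # a match returns the subject/version, a miss returns 404 (40403).
    url = f"{base_url}/subjects/{subject}"
    body = json.dumps({"schema": schema_str})
    return url, body

# Example usage against a hypothetical local ccompat endpoint:
# import requests
# url, body = build_lookup_request(
#     "http://localhost:8080/apis/ccompat/v7",
#     "tst-kpn-des--reid-magic-byte-avro-value",
#     '{"type": "string"}',
# )
# resp = requests.post(
#     url, data=body,
#     headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
# )
# A 404 with error code 40403 here matches the stack trace above.
```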

I didn't dive deeper into this problem, since I now just use io.apicurio.registry.utils.converter.AvroConverter as my Avro converter.

For the convenience of others finding this bug who use Strimzi to deploy Kafka with a connector that uses the AvroConverter with an Apicurio Registry, my working config is below. This documentation page is helpful for understanding which options exist.

Working config with the Apicurio AvroConverter

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka
  namespace: kafka
spec:
  bootstrapServers: kafka-kafka-bootstrap.kafka.svc.cluster.local:9093
  build:
    output:
      image: IMAGE_TAG_REDACTED
      type: docker
    plugins:
      - artifacts:
          - sha512sum: >-
              8f7111c460020946b5a7fda4c3ea1d038d42b0de613360fcf754a00289e3453483f70665648321c9715ad28e01190218d625cc5b60fc619a5823eb66a7e11f82
            type: zip
            url: >-
              https://github.com/lensesio/stream-reactor/releases/download/8.1.15/kafka-connect-aws-s3-8.1.15.zip
          - sha512sum: >-
              7853cfcd7804e220bf3c869928be038f0179e5d05f90c081139a7ba981dee5e6f6bd5770f2b210f65c605d5f0467a49f6bf779af48be56f482055794fb8ff240
            type: tgz
            url: >-
              https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-distro-connect-converter/3.0.3/apicurio-registry-distro-connect-converter-3.0.3.tar.gz
        name: streamreactor-sink-s3
  ...
```

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: kafka-connector-s3-sink
  namespace: kafka
spec:
  class: io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
  config:
    connect.s3.compression.codec: ZSTD
    connect.s3.kcql: >-
      INSERT INTO AWS_BUCKET select * from KAFKA_TOPIC STOREAS `PARQUET`
      PROPERTIES ('flush.interval'=600)
    name: NAME
    tasks.max: 1
    topics: KAFKA_TOPIC
    value.converter: io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url: >-
      http://kafka-registry-service.kafka.svc.cluster.local:8080/apis/registry/v2
  tasksMax: 1
```