confluentinc / kafka-connect-storage-common

Shared software among connectors that target distributed filesystems and cloud storage.

Kafka S3 sink connector complains: Error retrieving Avro value schema version for id 12345 #284

Open jma562 opened 1 year ago

jma562 commented 1 year ago

I'm trying to set up a Kafka S3 sink connector that consumes messages in Avro format and writes them to S3-compatible storage (MinIO) in Parquet format.

This pipeline works for certain topics but fails for others. After some investigation, I've found that if a topic has a corresponding schema entry with the same name in Schema Registry, the connector can successfully deserialize the messages and convert them to Parquet.

If a topic has no schema entry of the same name in Schema Registry (the schema itself is there, since Offset Explorer and a NiFi flow can still deserialize messages from such a topic), the connector fails with:

Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic <topic name> to Avro:
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema version for id 12345

I tried to dig deeper in the hope of finding a solution but got stuck.

Finding the schema based on the schema ID might be related to io.confluent.kafka.serializers.subject.TopicNameStrategy. In our case, schema entries have been published to Schema Registry over a long period of time and not consistently: some subjects follow topic names, others do not. How do I deal with this situation?
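
For context, here is roughly how I've been checking which pattern each subject follows, using the standard Schema Registry REST endpoints (a sketch; schema.registry.url and my_topic are placeholders):

# List every registered subject and look for ones matching "<topic>-value"
curl -s https://schema.registry.url/subjects

# Subject that TopicNameStrategy would look up for the value schema of my_topic
curl -s https://schema.registry.url/subjects/my_topic-value/versions
# A 404 here means no subject with that exact name exists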

I did take a look at https://schema.registry.url/schemas/12345, where 12345 is the schema ID shown in the exception message. I did see a default schema version declared there:

{\"type\":\"record\",\"name\":\"schema_name\",\"namespace\":\"name_space\",\"fields\":
... ... [{"name":"schemaVersion","type":"string","default":"1.47"},

Finding the schema version based on the schema ID might be related to use.latest.version=true and auto.register.schemas=false, but setting them does not help; the connector complains that these options are not recognized. I tried value.converter.use.latest.version=true as well, with no difference.

How do we force it to use the latest version of the schema regardless of the default version?

Your help is greatly appreciated.

jma562 commented 1 year ago

Is it a hard requirement that only these three patterns are supported by the connector?

TopicNameStrategy | RecordNameStrategy | TopicRecordNameStrategy

Must the schema subject name follow one of these patterns? Why can Offset Explorer/NiFi consume and deserialize messages from these topics, but the Kafka connector cannot?
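
For reference, my understanding of the subject name each strategy looks up, sketched with a hypothetical topic "orders" carrying the Avro record "com.example.Order":

# Subject consulted by each strategy for the value schema of topic "orders"
# whose records use the Avro full name "com.example.Order" (both hypothetical):
#   TopicNameStrategy        -> orders-value
#   RecordNameStrategy       -> com.example.Order
#   TopicRecordNameStrategy  -> orders-com.example.Order
# e.g. the lookup that TopicNameStrategy implies:
curl -s https://schema.registry.url/subjects/orders-value/versions/latest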

jma562 commented 1 year ago

Looking further into Schema Registry, I've found that the schema ID is unique per schema name per schema version, so why does the Kafka connector still throw that exception? I verified that the schema can be found in Schema Registry. The only thing that could mess this up is the lookup strategy (TopicNameStrategy): even though the only valid schema exists, the converter skips it because the subject name does not match the topic name. This does not make sense to me if that is the logic implemented in the converter.

jma562 commented 1 year ago

Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema version for id 11025
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found. io.confluent.rest.exceptions.RestNotFoundException: Subject not found.
io.confluent.rest.exceptions.RestNotFoundException: Subject not found.
        at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.subjectNotFoundException(Errors.java:51)
        at io.confluent.kafka.schemaregistry.rest.resources.SubjectsResource.lookUpSchemaUnderSubject(SubjectsResource.java:93)

Adding the exception stack trace here. The schema ID is unique, so why does the subject matter here? I could imagine it's a performance enhancement, but it should at least fall back to looking up by the global schema ID only; the local cache built on schema IDs can help with performance (which is already done by CachedSchemaRegistryClient).
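
To illustrate what I mean, a sketch against the standard Schema Registry REST API (11025 is the ID from the exception; <topic> is a placeholder):

# Fetching the schema by its global ID works without any subject:
curl -s https://schema.registry.url/schemas/ids/11025

# But the converter also resolves the schema *version*, and versions only
# exist per subject; with TopicNameStrategy it queries "<topic>-value":
curl -s https://schema.registry.url/subjects/<topic>-value/versions
# -> {"error_code":40401,"message":"Subject not found."}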

jma562 commented 1 year ago

Hi,

I configured value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy in worker.properties and started the Connect worker with connect-distributed.sh /plugin/worker.properties.

However, I kept seeing:

[2022-11-21 16:40:23,690] WARN The configuration 'value.subject.name.strategy' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:355)

I also tried different variations as follows:

grep -i "name.strategy" /plugins/worker.properties 
value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
value.converter.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
consumer.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
consumer.value.converter.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy

but nothing seems to work.

I'd greatly appreciate it if anyone could point me to how to change the subject naming strategy (in the Confluent S3 sink connector).
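
From what I can tell, the strategy has to reach the converter itself, so the full key would need the value.converter. prefix on the converter's own value.subject.name.strategy option. A sketch of a per-connector override via the Connect REST API (s3-sink, localhost:8083 and my_topic are placeholders, and the other required S3/format settings are omitted; I haven't confirmed this fixes it):

# Sketch: per-connector converter override via the Connect REST API
curl -s -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/connectors/s3-sink/config \
  -d '{
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "my_topic",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "https://schema.registry.url",
    "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy"
  }'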

Regards