hpgrahsl / kafka-connect-mongodb

**Unofficial / Community** Kafka Connect MongoDB Sink Connector -> integrated 2019 into the official MongoDB Kafka Connector here: https://www.mongodb.com/kafka-connector
Apache License 2.0

Avro schema "TopicNameStrategy" error #121

Closed San13 closed 4 years ago

San13 commented 4 years ago

I am trying to write a Kafka topic produced by a KStream application to MongoDB using the sink connector. Below is my configuration:

connector.class=at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector
mongodb.max.num.retries=3
topics=brainy-bin
tasks.max=1
mongodb.connection.uri=mongodb+srv://san:san@cluster0-sfaz2.mongodb.net/test?retryWrites=true&w=majority
mongodb.collection=BrainyBin
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
mongodb.retries.defer.timeout=5000

This is the Avro value (the data is taken from the Avro console consumer):

{"endPoint":"test1","createdDate":1584617333519,"companyName":"test","productName":"Brainy Bin","path":"/3303/0/5700","timestamp":1584617233036,"AvroResponsePayload":{"code":"CONTENT","kind":"observe","body":{"io.teamone.leshan.avro.response.AvroReadResponseBody":{"content":{"io.teamone.leshan.avro.resource.AvroResource":{"id":5700,"path":"/3303/0/5700","kind":"SINGLE_RESOURCE","type":"FLOAT","value":{"double":-315.2}}}}}}}

Below is my error log:

[2020-03-23 10:51:20,261] ERROR Failed to start task MongoDbSinkConnector-0 (org.apache.kafka.connect.runtime.Worker:456)
java.lang.RuntimeException: io.confluent.kafka.serializers.subject.TopicNameStrategy is not an instance of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy
    at io.confluent.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:207)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.subjectNameStrategyInstance(AbstractKafkaAvroSerDeConfig.java:199)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.keySubjectNameStrategy(AbstractKafkaAvroSerDeConfig.java:177)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerDe.configureClientProperties(AbstractKafkaAvroSerDe.java:67)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.configure(AbstractKafkaAvroSerializer.java:43)
    at io.confluent.connect.avro.AvroConverter$Serializer.<init>(AvroConverter.java:127)
    at io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:72)
    at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:263)
    at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:418)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:865)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:110)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:880)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:876)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:835)
[2020-03-23 10:51:20,263] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:860)

hpgrahsl commented 4 years ago

Hi @San13,

Thanks for your issue report. It doesn't seem to be related to the sink connector itself: the stack trace you posted shows that execution never reaches any code from the sink connector packages.
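To make that reading of the stack trace concrete, here is a minimal Java sketch that checks whether any frame of an exception belongs to a given package. The class `FrameCheck` and the method `touchesPackage` are hypothetical helpers written for this comment, not part of the connector or of Kafka Connect. The two frames in `main` are copied from the posted trace, and the package prefix `at.grahsl.kafka.connect.mongodb` comes from the `connector.class` setting.

```java
import java.util.Arrays;

class FrameCheck {

    // Hypothetical helper: returns true if at least one stack frame of t
    // belongs to a class whose fully qualified name starts with packagePrefix.
    static boolean touchesPackage(Throwable t, String packagePrefix) {
        return Arrays.stream(t.getStackTrace())
                .anyMatch(frame -> frame.getClassName().startsWith(packagePrefix));
    }

    public static void main(String[] args) {
        // Rebuild two frames from the posted stack trace for illustration.
        Throwable t = new RuntimeException("demo");
        t.setStackTrace(new StackTraceElement[] {
            new StackTraceElement("io.confluent.connect.avro.AvroConverter",
                    "configure", "AvroConverter.java", 72),
            new StackTraceElement("org.apache.kafka.connect.runtime.Worker",
                    "startTask", "Worker.java", 418)
        });

        // Frames from the Confluent converter are present ...
        System.out.println(touchesPackage(t, "io.confluent."));
        // ... but nothing from the sink connector package is.
        System.out.println(touchesPackage(t, "at.grahsl.kafka.connect.mongodb"));
    }
}
```

Applied to the full trace, the same check finds only `io.confluent`, `org.apache.kafka.connect.runtime` and JDK frames, which is why the connector itself is unlikely to be the culprit.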

The failure occurs while the Avro converter is being set up (Avro de/serialization and its Schema Registry configuration), which happens before the connector even gets to see the sink records. What you are facing may be similar to the issue described here: https://github.com/confluentinc/schema-registry/issues/825
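In Java, an "X is not an instance of Y" error often means that the class and the interface were loaded from jars of different, incompatible versions. Whether that is the cause here is an assumption on my side. A minimal sketch for checking it is to print where each of the two classes named in the error message is loaded from. `JarLocator` and `locate` are hypothetical names for this example; the sketch only uses standard JDK calls and has to be run with the same jars on the classpath that the Connect worker uses.

```java
import java.security.CodeSource;

class JarLocator {

    // Hypothetical helper: reports the jar (or directory) a class was loaded
    // from, or a marker string if the class or its location is unavailable.
    static String locate(String className) {
        try {
            // Load without initializing the class; we only want its origin.
            Class<?> c = Class.forName(className, false,
                    JarLocator.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            // Classes loaded by the JDK's boot loader have no code source.
            if (src == null || src.getLocation() == null) {
                return "<jdk or unknown>";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "<not found>";
        }
    }

    public static void main(String[] args) {
        // The two class names are taken verbatim from the error message.
        String[] names = {
            "io.confluent.kafka.serializers.subject.TopicNameStrategy",
            "io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

If the two locations point to Confluent jars with different version numbers, or if one of the classes shows up in more than one place on the worker's classpath or plugin path, aligning those jars to a single version would be the first thing to try.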

Good luck resolving the issue! Feel free to comment further with advice on how you eventually fixed it. It might help others who run into the same problem.