Closed rodolfocugler closed 3 years ago
Thanks for reaching out. Your problem is most likely caused by a key that isn't valid JSON. The connector expects the key to be a valid JSON string that can be parsed into a BsonDocument. If you cannot change that upstream (i.e. in the producer writing to the topic) or need to process existing data, you can work around it with a properly configured SMT (Single Message Transform).
See #36 or #55 for details. Let me know if that helps - it should ;-)
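For illustration, here is a minimal sketch of what such an SMT configuration could look like. It uses `HoistField$Key`, a transform that ships with Apache Kafka Connect and wraps the record key in a single named field, so a plain string key becomes a structured key instead of a raw string that has to be parsed as JSON. The alias `hoistKey`, the field name `_id`, and the use of `StringConverter` for the key are assumptions made for this example (the original sink config is not shown in the thread), and this may not be the exact setup discussed in the referenced issues.

```properties
# Assumption: keys on the topic are plain strings, not JSON documents
key.converter=org.apache.kafka.connect.storage.StringConverter

# Hypothetical alias "hoistKey"; wraps the raw key in a single field
transforms=hoistKey
transforms.hoistKey.type=org.apache.kafka.connect.transforms.HoistField$Key
# Assumption: "_id" is the desired field name for the wrapped key
transforms.hoistKey.field=_id
```

If the connector is created through the Connect REST API instead of a properties file, the same keys and values would go into the JSON `config` object.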
Also, please note that my community sink connector is slowly being phased out, which means you should switch to the official connector from MongoDB, which can be found here: https://www.mongodb.com/kafka-connector
Closing this due to inactivity / no answer. Feel free to re-open if applicable :)
Problem
I am trying to export my topic values to MongoDB, but I am getting an exception. I guess my configuration is wrong, but I have spent a lot of time on it and still cannot find the mistake.
Can you please help me find what I missed here?
Thanks
Scenario
There is one Kafka topic which has raw JSON values:
The current mongodb-sink config is this one:
Stacktrace
[2020-10-19 20:05:22,666] ERROR WorkerSinkTask{id=asdddd-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.bson.json.JsonParseException: Invalid JSON number
    at org.bson.json.JsonScanner.scanNumber(JsonScanner.java:444)
    at org.bson.json.JsonScanner.nextToken(JsonScanner.java:97)
    at org.bson.json.JsonReader.popToken(JsonReader.java:511)
    at org.bson.json.JsonReader.readBsonType(JsonReader.java:155)
    at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:680)
    at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:722)
    at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:450)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
    at org.bson.BsonDocument.parse(BsonDocument.java:62)
    at at.grahsl.kafka.connect.mongodb.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:32)
    at at.grahsl.kafka.connect.mongodb.converter.SinkConverter.convert(SinkConverter.java:44)
    at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$buildWriteModel$3(MongoDbSinkTask.java:213)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.buildWriteModel(MongoDbSinkTask.java:212)
    at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:143)
    at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$null$0(MongoDbSinkTask.java:118)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$put$1(MongoDbSinkTask.java:117)
    at java.base/java.util.HashMap.forEach(HashMap.java:1336)
    at at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:112)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
    ... 10 more