hpgrahsl / kafka-connect-mongodb

**Unofficial / Community** Kafka Connect MongoDB Sink Connector -> integrated 2019 into the official MongoDB Kafka Connector here: https://www.mongodb.com/kafka-connector
Apache License 2.0

key _id can not be null exception #95

Closed mottish closed 5 years ago

mottish commented 5 years ago

Hi @hpgrahsl,

I have a new issue: the MongoDB sink connector fails with the exception "key _id can not be null".

I have a MongoDB instance whose collections contain millions of documents, and it seems that some of them may have a null ObjectId.

Is it possible to skip those documents instead of failing the entire sink connector?

My config:

{
  "name": "CONNECTOR_NAME",
  "config": {
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": true,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": true,
    "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
    "topics.regex": "DB_NAME.*",
    "mongodb.connection.uri": "mongodb://DB:27017/DB_NAME?w=1&journal=true",
    "mongodb.ssl.enabled": true,
    "mongodb.change.data.capture.handler": "at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.MongoDbHandler",
    "mongodb.collections": "${topic}",
    "mongodb.change.data.capture.handler.portal_users": "at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.MongoDbHandler",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}

The full exception the connector displays:

{  
   "name":"mongodb-sink-test",
   "connector":{  
      "state":"RUNNING",
      "worker_id":"kafka-1:8083"
   },
   "tasks":[  
      {  
         "id":0,
         "state":"FAILED",
         "worker_id":"kafka-1:8083",
         "trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: java.lang.IllegalArgumentException: The value for key _id can not be null\n\tat at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.MongoDbInsert.perform(MongoDbInsert.java:53)\n\tat at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.MongoDbHandler.handle(MongoDbHandler.java:75)\n\tat java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)\n\tat java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)\n\tat java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)\n\tat java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)\n\tat java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)\n\tat java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)\n\tat java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)\n\tat java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)\n\tat at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.buildWriteModelCDC(MongoDbSinkTask.java:253)\n\tat at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.processSinkRecords(MongoDbSinkTask.java:147)\n\tat at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$null$0(MongoDbSinkTask.java:118)\n\tat java.util.ArrayList.forEach(ArrayList.java:1257)\n\tat at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.lambda$put$1(MongoDbSinkTask.java:117)\n\tat java.util.HashMap.forEach(HashMap.java:1289)\n\tat at.grahsl.kafka.connect.mongodb.MongoDbSinkTask.put(MongoDbSinkTask.java:112)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)\n\t... 10 more\nCaused by: java.lang.IllegalArgumentException: The value for key _id can not be null\n\tat org.bson.BsonDocument.put(BsonDocument.java:721)\n\tat org.bson.BsonDocument.<init>(BsonDocument.java:83)\n\tat at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.MongoDbInsert.perform(MongoDbInsert.java:48)\n\t... 27 more\n"
      }
   ],
   "type":"sink"
}
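
For anyone reading a status payload like the one above, the innermost "Caused by:" entry of the trace is usually the quickest pointer to the real failure. The following is only a sketch: `root_causes` is a hypothetical helper, not part of Kafka Connect or of this connector, and the `status` dict is an abbreviated copy of the payload above with the stack frames left out.

```python
def root_causes(status):
    """Return {task_id: innermost 'Caused by' message} for every FAILED task."""
    causes = {}
    for task in status.get("tasks", []):
        if task.get("state") != "FAILED":
            continue
        lines = task.get("trace", "").splitlines()
        # Each nested exception in a Java stack trace starts with "Caused by: ";
        # the last such line is the innermost cause.
        caused_by = [
            line[len("Caused by: "):]
            for line in lines
            if line.startswith("Caused by: ")
        ]
        if caused_by:
            causes[task["id"]] = caused_by[-1]
        else:
            # No nested cause: fall back to the first line of the trace.
            causes[task["id"]] = lines[0] if lines else ""
    return causes


# Abbreviated copy of the status shown above (stack frames omitted).
status = {
    "name": "mongodb-sink-test",
    "connector": {"state": "RUNNING", "worker_id": "kafka-1:8083"},
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "worker_id": "kafka-1:8083",
            "trace": (
                "org.apache.kafka.connect.errors.ConnectException: "
                "Exiting WorkerSinkTask due to unrecoverable exception.\n"
                "Caused by: org.apache.kafka.connect.errors.DataException: "
                "java.lang.IllegalArgumentException: "
                "The value for key _id can not be null\n"
                "Caused by: java.lang.IllegalArgumentException: "
                "The value for key _id can not be null\n"
            ),
        }
    ],
    "type": "sink",
}

print(root_causes(status))
```

Here the innermost cause is the `IllegalArgumentException` raised from `org.bson.BsonDocument.put`, which is consistent with the insert handler receiving a document without a usable `_id`.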

hpgrahsl commented 5 years ago

@mottish I'm not sure I completely understand what you described above.

1) Are you using MongoDB as both source and sink?
2) Do you mean that the collection you are reading from contains documents whose _id field might be null?
3) By design, MongoDB requires the _id field to be unique. So even if a document really does use null for its _id, there can only ever be one such document per collection. A second one would immediately violate the uniqueness property and lead to a duplicate key error on write.
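
The uniqueness argument in point 3 can be illustrated with a toy model. This is a minimal sketch in plain Python, not MongoDB or driver code: `ToyCollection` and `DuplicateKeyError` are hypothetical names, and Python's `None` stands in for a null _id.

```python
class DuplicateKeyError(Exception):
    """Raised when an insert would break the unique _id constraint."""


class ToyCollection:
    """Toy in-memory stand-in for a collection with a unique index on _id."""

    def __init__(self):
        self._docs = {}

    def insert_one(self, doc):
        key = doc["_id"]  # the caller supplies _id explicitly; None models null
        if key in self._docs:
            raise DuplicateKeyError(f"duplicate key: _id={key!r}")
        self._docs[key] = doc

    def __len__(self):
        return len(self._docs)


users = ToyCollection()
users.insert_one({"_id": None, "name": "first"})   # a single null _id is accepted
try:
    users.insert_one({"_id": None, "name": "second"})
except DuplicateKeyError as err:
    print("rejected:", err)                         # the second null _id is refused
print(len(users))
```

Under this model a collection can hold at most one document with a null _id, so "millions of documents, some with a null _id" would not be possible in a single collection.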

hpgrahsl commented 5 years ago

@mottish since I haven't heard back so far, I assume you found a way to mitigate the problem described above. I'll close this for now. Feel free to re-open it if necessary.