gravity9-tech / mongo-cse

Mongo Change Stream Enhancer divides Change Stream events into partitions and enables you to handle them in separate Threads, increasing throughput. It achieves that by creating a Change Stream per each partition (number is configurable) and handling each Change Stream in a dedicated Thread.
3 stars 1 forks source link

Missing handling of delete events #16

Closed piotrlg9 closed 10 months ago

piotrlg9 commented 10 months ago

Library is unprepared for handling delete events, few issues found:

1) It skips them while using default watch pipeline because the expression uses fullDocument which is missing for delete event:

    static Bson toDate() {
        return new Document("$toDate", "$fullDocument._id");
    }

2) After removal of filtering expression, library fails - it is unprepared to handle events without fullDocument, see error:

09:46:19.516 [Thread-1] ERROR com.gravity9.mongocdc.MongoChangeStreamWorker - Exception while processing change
java.lang.NullPointerException: Cannot invoke "org.bson.Document.toJson()" because the return value of "com.mongodb.client.model.changestream.ChangeStreamDocument.getFullDocument()" is null
    at com.gravity9.mongocdc.MongoChangeStreamWorker.run(MongoChangeStreamWorker.java:101) [mongo-change-stream-enhancer-1.0.0.jar:?]
    at java.base/java.lang.Thread.run(Thread.java:833) [?:?]

Line which fails: log.info("{} document: {}", document.getOperationType().name(), document.getFullDocument().toJson());

3) There is no failure handling implemented, so NPE is generated indefinitely

piotrlg9 commented 10 months ago

Delete event contains "_id" field of the original document in the following subdocument:

   "documentKey": {
      "_id": ObjectId("599af247bb69cd89961c986d")
   }