gravity9-tech / mongo-cse

Mongo Change Stream Enhancer divides Change Stream events into partitions and enables you to handle them in separate Threads, increasing throughput. It achieves that by creating a Change Stream per each partition (number is configurable) and handling each Change Stream in a dedicated Thread.

Add the possibility of using a custom query when opening a change stream #52

Closed BartekGravity closed 3 months ago

BartekGravity commented 5 months ago

Currently, the library only allows opening a generic watch on the whole collection and dividing its events across the partition threads:

        ChangeStreamIterable<Document> watch = collection.watch(List.of(
                Aggregates.match(
                        or(List.of(
                                partitionMatchExpression(fullDocumentKey(mongoConfig.getKeyName()), mongoConfig.getNumberOfPartitions(), partition),
                                partitionMatchExpression(documentKey(mongoConfig.getKeyName()), mongoConfig.getNumberOfPartitions(), partition)
                        ))
                )
            ))
            .fullDocument(mongoConfig.getFullDocument())
            .fullDocumentBeforeChange(mongoConfig.getFullDocumentBeforeChange())
            .maxAwaitTime(mongoConfig.getMaxAwaitTimeInMs(), MILLISECONDS);

We should allow users to define a custom query to limit the events they receive, for example when they are only interested in updates to a specific set of documents.
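
One possible shape for this, as a rough sketch rather than the library's actual implementation: keep the existing partition filter and AND it with an optional user-supplied filter inside the same `$match` stage. The example below models the filters as plain `Map`s so that it runs without the MongoDB driver; with the driver, the equivalent would be `Aggregates.match(Filters.and(partitionFilter, customFilter))`. The class name `CustomMatchSketch`, the helper `matchStage`, and the `partitionPlaceholder` key are hypothetical and only stand in for the real partition expression shown above.

```java
import java.util.List;
import java.util.Map;

public class CustomMatchSketch {

    // Hypothetical helper: builds a single $match stage from the library's
    // partition filter and an optional user-defined filter.
    static Map<String, Object> matchStage(Map<String, Object> partitionFilter,
                                          Map<String, Object> customFilter) {
        // No custom query configured: behave exactly as today.
        if (customFilter == null || customFilter.isEmpty()) {
            return Map.of("$match", partitionFilter);
        }
        // Custom query configured: an event must satisfy both filters.
        return Map.of("$match", Map.of("$and", List.of(partitionFilter, customFilter)));
    }

    public static void main(String[] args) {
        // Stand-in for the or(partitionMatchExpression(...), ...) filter
        // (assumption: the real expression is produced by the library).
        Map<String, Object> partitionFilter = Map.of("partitionPlaceholder", 0);

        // Example custom query: only receive update events.
        Map<String, Object> customFilter = Map.of("operationType", "update");

        System.out.println(matchStage(partitionFilter, null));
        System.out.println(matchStage(partitionFilter, customFilter));
    }
}
```

Combining with `$and` means the custom query can only narrow the events a partition receives, so each event is still handled by at most one partition thread.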