globalsign / mgo

The MongoDB driver for Go

New MongoDB 4.0 deployment level changestreams watch() #251

Closed peterdeka closed 6 years ago

peterdeka commented 6 years ago

As per https://docs.mongodb.com/manual/reference/method/Mongo.watch/#Mongo.watch, MongoDB 4.0 introduces the ability to watch all collections across all databases of a deployment. I'd like to contribute an implementation, as we need it urgently.

As I understand the specification (https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#mongoclient-watch-helper), the only significant change is that we somehow have to tell the Iter() method to send aggregate: 1 instead of the collection name (see below).

// Iter executes the pipeline and returns an iterator capable of going
// over all the generated results.
func (p *Pipe) Iter() *Iter {
    // Clone session and set it to Monotonic mode so that the server
    // used for the query may be safely obtained afterwards, if
    // necessary for iteration when a cursor is received.
    cloned := p.session.nonEventual()
    defer cloned.Close()
    c := p.collection.With(cloned)

    var result struct {
        Result []bson.Raw // 2.4, no cursors.
        Cursor cursorData // 2.6+, with cursors.
    }

    cmd := pipeCmd{
        Aggregate: c.Name, // <---- should be 1 for database level and cluster level
        Pipeline:  p.pipeline,
        AllowDisk: p.allowDisk,
        Cursor:    &pipeCmdCursor{p.batchSize},
    }
...
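For reference, here is a minimal sketch of the three command shapes the change-stream specification describes, using a hypothetical aggregateCmd struct in place of mgo's internal pipeCmd. Typing the Aggregate field as interface{} lets one struct carry either the collection name or the integer 1. Deployment-level streams additionally set allChangesForCluster: true in the $changeStream stage and are sent to the admin database. JSON is used only to make the shape visible (mgo sends BSON), and the caller's own pipeline stages, which would follow $changeStream, are omitted.

```go
package main

import "encoding/json"

// ChangeStreamLevel is a hypothetical enum for the three watch() scopes.
type ChangeStreamLevel int

const (
	CollectionLevel ChangeStreamLevel = iota
	DatabaseLevel
	DeploymentLevel
)

// aggregateCmd is a hypothetical stand-in for mgo's unexported pipeCmd.
// Aggregate is interface{} so it can hold a collection name (string)
// or the integer 1 for database- and deployment-level streams.
type aggregateCmd struct {
	Aggregate interface{}              `json:"aggregate"`
	Pipeline  []map[string]interface{} `json:"pipeline"`
	Cursor    map[string]interface{}   `json:"cursor"`
}

// changeStreamCommand returns the database the command must be sent to
// and the aggregate command that opens the change stream.
func changeStreamCommand(level ChangeStreamLevel, db, coll string) (string, aggregateCmd) {
	stage := map[string]interface{}{}
	var target interface{} = 1 // database and deployment level
	switch level {
	case CollectionLevel:
		target = coll
	case DeploymentLevel:
		db = "admin"
		stage["allChangesForCluster"] = true
	}
	return db, aggregateCmd{
		Aggregate: target,
		Pipeline:  []map[string]interface{}{{"$changeStream": stage}},
		Cursor:    map[string]interface{}{},
	}
}

// encode renders the command as JSON purely for illustration.
func encode(cmd aggregateCmd) string {
	b, err := json.Marshal(cmd)
	if err != nil {
		panic(err)
	}
	return string(b)
}
```

Since pipeCmd is unexported, changing the type of its Aggregate field in this way would not alter mgo's public API.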

And what is the best practice here to avoid breaking the API?

Thanks, Pietro

domodwyer commented 6 years ago

Hi @peterdeka!

I'm not overly familiar with the newer features of MongoDB 4.0, but from a purely API perspective, maybe implementing something similar to Collection.Watch on the mgo Session would have the best semantics? You are watching changes across the entire cluster, and the Session type already provides an abstraction for interacting with it in other ways.

It would be nice if it could reuse the existing ChangeStream type, but I'm not sure about the technical details!
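One way to read this suggestion, sketched below with heavily simplified, hypothetical stand-ins for Session, Collection and ChangeStream (not mgo's real types or signatures): both Watch methods delegate to a single shared constructor, so they return the same *ChangeStream type. Collection.Watch keeps its signature, and Session.Watch is a purely additive method, so no existing caller breaks.

```go
package main

// Hypothetical, simplified stand-ins; the real mgo types also hold
// sockets, cursors and ChangeStreamOptions.
type Session struct{}

type Collection struct {
	Database string // name of the owning database
	Name     string // collection name
}

type ChangeStream struct {
	database             string      // database the aggregate command is sent to
	aggregate            interface{} // collection name, or 1 above collection level
	allChangesForCluster bool        // true only for deployment-level streams
	pipeline             []interface{}
}

// newChangeStream is the shared constructor used by every Watch variant.
func newChangeStream(database string, aggregate interface{}, cluster bool, pipeline []interface{}) *ChangeStream {
	return &ChangeStream{
		database:             database,
		aggregate:            aggregate,
		allChangesForCluster: cluster,
		pipeline:             pipeline,
	}
}

// Watch on a collection: same signature as before, only the internals
// now go through the shared constructor.
func (c *Collection) Watch(pipeline []interface{}) *ChangeStream {
	return newChangeStream(c.Database, c.Name, false, pipeline)
}

// Watch on the session is a new method, so adding it breaks no caller.
// Deployment-level streams send aggregate: 1 to the admin database.
func (s *Session) Watch(pipeline []interface{}) *ChangeStream {
	return newChangeStream("admin", 1, true, pipeline)
}
```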

What do you think?

Dom

domodwyer commented 6 years ago

Hi @peterdeka

I've just seen you have a PR open already and it's exactly the above! Ignore the noise :)

Thanks! Dom

rwynn commented 6 years ago

Hi @domodwyer and @peterdeka. I noticed this issue was related to MongoDB 4 change streams so I thought I'd ask the question here. Would you be able to support the new startAtOperationTime option, a bson.MongoTimestamp I assume, as an addition to the ChangeStreamOptions type? Please let me know if you would like me to open a separate issue for that.

I think the only open question is whether and how to handle the undefined case where both resumeAfter and startAtOperationTime are specified, since they appear to be mutually exclusive options.
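A minimal sketch of one option, rejecting the combination on the client before any command is sent. It assumes a hypothetical local MongoTimestamp type standing in for bson.MongoTimestamp, a hypothetical subset of ChangeStreamOptions, and the convention that a zero timestamp means "not set"; whether mgo should validate client-side or let the server reject the command is exactly the open question.

```go
package main

import "errors"

// MongoTimestamp is a hypothetical local stand-in for bson.MongoTimestamp.
type MongoTimestamp int64

// ChangeStreamOptions is a hypothetical subset of the real options type.
type ChangeStreamOptions struct {
	ResumeAfter          map[string]interface{} // resume token; nil if unset
	StartAtOperationTime MongoTimestamp         // zero if unset (assumption)
}

var errConflictingStart = errors.New("resumeAfter and startAtOperationTime are mutually exclusive")

// validate rejects option sets that specify both starting points.
func (o ChangeStreamOptions) validate() error {
	if o.ResumeAfter != nil && o.StartAtOperationTime != 0 {
		return errConflictingStart
	}
	return nil
}

// changeStreamStage builds the body of the $changeStream stage.
func (o ChangeStreamOptions) changeStreamStage() (map[string]interface{}, error) {
	if err := o.validate(); err != nil {
		return nil, err
	}
	stage := map[string]interface{}{}
	if o.ResumeAfter != nil {
		stage["resumeAfter"] = o.ResumeAfter
	}
	if o.StartAtOperationTime != 0 {
		stage["startAtOperationTime"] = o.StartAtOperationTime
	}
	return stage, nil
}
```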

peterdeka commented 6 years ago

Hi @rwynn, that seems feasible. I just have to make sure it doesn't interfere with the current use of the resume() function. We need to check the edge cases where the resume token is used, to confirm that the retry mechanism of Next() is not disrupted by ChangeStreams that use startAtOperationTime. I hope I can get this done before the PR is merged.
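The edge case mentioned here can be pictured with a small sketch. It assumes one plausible rule, not mgo's actual resume() logic: once Next() has returned at least one document, a retry resumes from that document's token and drops startAtOperationTime; before that, the retry reuses the options the caller originally passed. All names below are hypothetical.

```go
package main

// MongoTimestamp is a hypothetical local stand-in for bson.MongoTimestamp.
type MongoTimestamp int64

// resumeOptions holds only the two mutually exclusive starting points.
type resumeOptions struct {
	ResumeAfter          map[string]interface{}
	StartAtOperationTime MongoTimestamp
}

// changeStream keeps just the state relevant to resuming.
type changeStream struct {
	initial     resumeOptions          // options the caller originally passed
	resumeToken map[string]interface{} // token of the last document returned, nil if none yet
}

// resumeOptionsForRetry picks the starting point for a re-issued aggregate.
// A cached token always wins, so the two options are never sent together.
func (cs *changeStream) resumeOptionsForRetry() resumeOptions {
	if cs.resumeToken != nil {
		return resumeOptions{ResumeAfter: cs.resumeToken}
	}
	return cs.initial
}
```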

eminano commented 6 years ago

I'll close this issue now that the PR has been merged into development. We'll be cutting a release soon! Thanks @peterdeka!