rwynn / gtm

gtm (go tail mongo) is a MongoDB event listener
MIT License

Implement Bulk Fetching #4

Closed · rwynn closed this issue 7 years ago

rwynn commented 7 years ago

Implement bulk fetching to approach or match the throughput of the optimization in PR #3 by @zph, while maintaining the ordering semantics of the oplog.

gtm.zip
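
For readers skimming the thread, here is a minimal sketch of the general bulk-fetch-while-preserving-order idea. It is illustrative only, not necessarily how the attached gtm.zip implements it; the Op fields and the fetchBulk helper are made up for the example.

```go
package gtmsketch

import (
	mgo "gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

// Op is a stand-in for gtm's oplog entry type.
type Op struct {
	Id   interface{}
	Data bson.M
}

// fetchBulk loads the documents for a buffered batch of ops with a single
// query and attaches them to the ops in their original oplog order.
func fetchBulk(session *mgo.Session, db, coll string, ops []*Op) error {
	ids := make([]interface{}, len(ops))
	for i, op := range ops {
		ids[i] = op.Id
	}
	var docs []bson.M
	// One round trip for the whole buffer instead of one Find per op.
	if err := session.DB(db).C(coll).Find(bson.M{"_id": bson.M{"$in": ids}}).All(&docs); err != nil {
		return err
	}
	// Index the results by _id; this assumes comparable _id values
	// (ObjectId, strings, numbers). See the next comment for object _ids.
	byId := make(map[interface{}]bson.M, len(docs))
	for _, doc := range docs {
		byId[doc["_id"]] = doc
	}
	// Re-attach in the buffered (oplog) order, not the query result order.
	for _, op := range ops {
		if doc, ok := byId[op.Id]; ok {
			op.Data = doc
		}
	}
	return nil
}
```

The single $in query amortizes the round trip across the whole buffer, while re-attaching the results in the buffered order keeps the oplog ordering that a per-document fetch would have preserved naturally.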

rwynn commented 7 years ago

Updated for the case where objects are used as the _id in MongoDB; this fixes an invalid map key in Go.

gtm.zip
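
For context: Go map keys must be comparable, and when a document is used as the _id it decodes to bson.M (a map), which is not comparable, so using it directly as a map key fails. One possible workaround, purely as an illustration and not necessarily what the attached gtm.zip does, is to derive a comparable key from the marshaled _id:

```go
package gtmsketch

import (
	"fmt"

	"gopkg.in/mgo.v2/bson"
)

// mapKey turns any _id value into something usable as a Go map key.
// Scalar ids (bson.ObjectId, string, int64, ...) are already comparable and
// pass through; document ids decode to bson.M or bson.D, which are not
// comparable, so they are marshaled and keyed by the resulting bytes.
func mapKey(id interface{}) interface{} {
	switch id.(type) {
	case bson.M, bson.D, map[string]interface{}:
		raw, err := bson.Marshal(bson.M{"_id": id})
		if err != nil {
			return fmt.Sprintf("%v", id)
		}
		return string(raw)
	default:
		return id
	}
}
```

With a helper like this, the byId index in the earlier sketch would be populated as byId[mapKey(doc["_id"])] = doc and looked up with mapKey(op.Id).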

zph commented 7 years ago

@rwynn Thank you for your patches. I tried out the first one and it successfully improves performance.

When operating on the same set of records that I benchmarked before:

- Pre-patch: ~13s max lag for the collection
- My patch: ~1.3s max lag
- This patch: ~1.3s max lag

I set the options to a 500ms timer and kept MaxDocs at 50.
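
For reference, those two knobs map onto the options visible in the patch below. A sketch of the configuration (the ChannelSize value is illustrative, and any other gtm.Options fields are simply left unset here):

```go
package gtmsketch

import (
	"time"

	"github.com/rwynn/gtm"
	mgo "gopkg.in/mgo.v2"
)

// tailWithBuffering mirrors the settings described above: flush the buffered
// ops at 50 documents or after 500ms, whichever comes first.
func tailWithBuffering(session *mgo.Session) (gtm.OpChan, chan error) {
	options := &gtm.Options{
		ChannelSize: 512,
		MaxDocs:     50,
		MaxDuration: 500 * time.Millisecond,
	}
	return gtm.Tail(session, options)
}
```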

I do have a suggestion for the code: reset the timer whenever a flush happens. Otherwise I expect we'd see a flush partway into the timer interval, and the timer would then fire sooner than the full duration after that last flush.

The patch I tried for addressing that is as follows:

```diff
diff --git a/Users/zph/Downloads/gtm.go b/vendor/github.com/zph/gtm/gtm.go
old mode 100755
new mode 100644
index 514ba13..cb9fab1
--- a/Users/zph/Downloads/gtm.go
+++ b/vendor/github.com/zph/gtm/gtm.go
@@ -55,6 +55,7 @@ type OpBuf struct {
        Entries     []*Op
        MaxDocs     int
+       MaxDuration time.Duration
        FlushTicker *time.Ticker

 }

 func Since(ts bson.MongoTimestamp) {
@@ -362,7 +363,9 @@ func FetchDocuments(session *mgo.Session, buf *OpBuf, inOp OpChan, inErr chan er
                        buf.AddEntry(op)
                        if buf.NeedsFlush() {
                                buf.Flush(session, outOp, outErr)
+                               // Reset timer so it's x duration from last flush
+                               buf.FlushTicker = time.NewTicker(buf.MaxDuration)
                        }
                }
        }
@@ -400,7 +403,7 @@ func Tail(session *mgo.Session, options *Options) (OpChan, chan error) {
        outErr := make(chan error, options.ChannelSize)
        inOp := make(OpChan, options.ChannelSize)
        outOp := make(OpChan, options.ChannelSize)
-       buf := &OpBuf{MaxDocs: options.MaxDocs, FlushTicker: time.NewTicker(options.MaxDuration)}
+       buf := &OpBuf{MaxDocs: options.MaxDocs, FlushTicker: time.NewTicker(options.MaxDuration), MaxDuration: options.MaxDuration}
        go FetchDocuments(session, buf, inOp, inErr, outOp, outErr)
        go TailOps(session, inOp, inErr, options)
        return outOp, outErr
```

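One Go detail worth noting about this pattern: replacing a time.Ticker without stopping the old one means the old ticker is never released. Here is a self-contained sketch of the same reset-on-flush idea with an explicit Stop; the names are illustrative, not gtm's.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	maxDuration := 500 * time.Millisecond
	ticker := time.NewTicker(maxDuration)
	defer func() { ticker.Stop() }()

	// resetAfterFlush restarts the interval so the next timer-driven flush
	// is a full maxDuration away from the flush that just happened.
	// Stopping the old ticker first releases its resources.
	resetAfterFlush := func() {
		ticker.Stop()
		ticker = time.NewTicker(maxDuration)
	}

	// Pretend the MaxDocs threshold is reached 200ms into the interval.
	sizeFlush := time.After(200 * time.Millisecond)

	for i := 0; i < 2; i++ {
		select {
		case <-sizeFlush:
			fmt.Println("size-driven flush")
			resetAfterFlush()
		case <-ticker.C:
			fmt.Println("timer-driven flush, a full interval after the last flush")
			resetAfterFlush()
		}
	}
}
```

The patch above simply replaces the ticker; adding the Stop call is a small hardening on top of the same idea.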
zph commented 7 years ago

I'm happy to see a solution that should maintain ordering semantics.

rwynn commented 7 years ago

@zph, Just pushed some updates to gtm.

zph commented 7 years ago

@rwynn Interesting!

I'll take a look at it in the next few days.

1) I really like the update and insert data changes!
2) pause/resume/etc. sound helpful in cases of overload.
3) Full sync using DirectReadNs could be quite helpful if it's a more convenient way than the per-collection full sync I implemented (rough sketch below).

I'll ping you with my thoughts once I can :)
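
Regarding point 3 above: assuming DirectReadNs is an Options field listing namespaces to read in full alongside tailing (the field name comes from the discussion; its exact type and semantics in the pushed update are an assumption here), usage might look roughly like this, with placeholder namespaces:

```go
package gtmsketch

import (
	"time"

	"github.com/rwynn/gtm"
)

// fullSyncOptions sketches a full-sync-plus-tailing configuration. The
// DirectReadNs field is assumed to take "db.collection" namespaces to read
// in full; the namespaces themselves are placeholders.
func fullSyncOptions() *gtm.Options {
	return &gtm.Options{
		MaxDocs:      50,
		MaxDuration:  500 * time.Millisecond,
		DirectReadNs: []string{"mydb.users", "mydb.orders"},
	}
}
```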