globalsign / mgo

The MongoDB driver for Go

Query blocks with ChangeStream #376

Open jinq0123 opened 4 years ago

jinq0123 commented 4 years ago


What version of MongoDB are you using (mongod --version)?

db version v4.2.0
git version: a4b751dcf51dd249c5865812b390cfd1c0129c30
allocator: tcmalloc
modules: none
build environment:
    distmod: 2012plus
    distarch: x86_64
    target_arch: x86_64

What version of Go are you using (go version)?

go version go1.12.7 windows/amd64

What operating system and processor architecture are you using (go env)?

set GOARCH=amd64
set GOBIN=
set GOCACHE=C:\Users\jinqing\AppData\Local\go-build
set GOEXE=.exe
set GOFLAGS=
set GOHOSTARCH=amd64
set GOHOSTOS=windows
set GOOS=windows
set GOPATH=D:\gopath
set GOPROXY=https://goproxy.cn
set GORACE=
set GOROOT=D:\Go
set GOTMPDIR=
set GOTOOLDIR=D:\Go\pkg\tool\windows_amd64
set GCCGO=gccgo
set CC=gcc
set CXX=g++
set CGO_ENABLED=1
set GOMOD=
set CGO_CFLAGS=-g -O2
set CGO_CPPFLAGS=
set CGO_CXXFLAGS=-g -O2
set CGO_FFLAGS=-g -O2
set CGO_LDFLAGS=-g -O2
set PKG_CONFIG=pkg-config
set GOGCCFLAGS=-m64 -mthreads -fmessage-length=0 -fdebug-prefix-map=C:\Users\jinqing\AppData\Local\Temp\go-build075823922=/tmp/go-build -gno-record-gcc-switches

What did you do?

The program watches a change stream in a background goroutine, and every query then blocks.

Set up a local MongoDB sharded cluster:

mkdir d:\data\db
D:\Tool\mongodb-win32-x86_64-2012plus-4.2.0\bin
λ mongo --nodb
MongoDB shell version v4.2.0
> cluster = new ShardingTest({shards: 3})

Then run the test program:

package main

import (
    "fmt"
    "os"
    "time"

    "github.com/globalsign/mgo"
)

func main() {
    session, err := mgo.Dial("mongodb://localhost:20006/test")
    if err != nil {
        fmt.Println("Dial error: ", err)
        return
    }

    go loopWatch(session)
    for {
        time.Sleep(time.Second)
        query(session)
    }
}

func query(session *mgo.Session) {
    c := session.DB("").C("test1")
    result := struct{}{}

    startTime := time.Now()
    err := c.Find(nil).One(&result)
    cost := time.Since(startTime)
    fmt.Printf("Query takes: %v, result: %v, err: %v\n", cost, result, err)
}

func loopWatch(session *mgo.Session) {
    for {
        watch(session)
    }
}

func watch(session *mgo.Session) {
    coll := session.DB("").C("test2")
    changeStream, err := coll.Watch(nil, mgo.ChangeStreamOptions{
        MaxAwaitTimeMS: 5 * time.Second,
    })
    if err != nil {
        fmt.Println("failed to watch: ", err)
        os.Exit(1)
    }
    defer changeStream.Close()

    //Handling change stream in a cycle
    for {
        //making a struct for unmarshalling
        changeDoc := struct {
            OperationType string `json:"operationType" bson:"operationType"`
        }{}

        //getting next item from the stream
        fmt.Println("Change stream next...")
        ok := changeStream.Next(&changeDoc)
        if ok {
            fmt.Println("got change")
            continue
        }

        //if data from the stream wasn't unmarshaled, we get ok == false as a result
        //so we need to call Err() method to get info why
        //it'll be nil if we just have no data
        err := changeStream.Err()
        if err == nil {
            continue
        }

        //if err is not nil, it means something bad happened, let's finish our func
        return
    } // for
}

Output:

D:\jinqing\Test\go\mgochg
λ mgochg.exe
Change stream next...
Change stream next...
Query takes: 4.0112788s, result: {}, err: not found
Change stream next...
Query takes: 4.0022317s, result: {}, err: not found
Change stream next...
Query takes: 4.0022971s, result: {}, err: not found
Change stream next...
Query takes: 4.0003046s, result: {}, err: not found
Change stream next...
Query takes: 4.0032182s, result: {}, err: not found
Change stream next...
Query takes: 4.0043058s, result: {}, err: not found
...

Can you reproduce the issue on the latest development branch?

fananchong commented 4 years ago

The correct usage should look like this:

// Copy the session - if needed this will dial a new connection which
// can later be reused.
//
// Calling close returns the connection to the pool.
conn := session.Copy()
defer conn.Close()

// Do something(s) with the connection
_, _ = conn.DB("").C("my_data").Count()

jinq0123 commented 4 years ago

Same result when using a copied session in Eventual mode:

    session.SetMode(mgo.Eventual, true)