mosesBD opened this issue 4 years ago
I also have another issue with direct reads. Here is how I start it:
fmt.Println(startDirectRead)
ctx := gtm.Start(client, &gtm.Options{
	DirectReadNs: []string{config.SourceDB + "." + config.SourceCollection},
})
go printOp(ctx)
ctx.DirectReadWg.Wait()
In my printOp goroutine I simply use the example code you provide to print operations:
for {
	select {
	case op = <-ctx.OpC:
		count = count + 1
		fmt.Println(count)
		break
	case err = <-ctx.ErrC:
		fmt.Printf("got error %+v\n", err)
		break
	default:
		return
	}
}
I expect this code to print until the collection is fully received; then, since the channel is nil, the default case will run and the routine will exit. However, the routine hits the default case immediately. I tried adding 1 to the WaitGroup with the Add() function and then calling Done() after the routine is done. If I remove the default case, the for loop goes on forever. How can I access the collection data in direct read mode and then continue with the rest of the code?
Hopefully this will help...
main.go
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"time"

	"github.com/rwynn/gtm"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func readContext(ctx *gtm.OpCtx, done chan bool) {
	var errsDone, opsDone bool
	var opCnt, errCnt int
	for !errsDone || !opsDone {
		select {
		case op, open := <-ctx.OpC:
			if op == nil && !open {
				opsDone = true
				break
			}
			opCnt++
			break
		case err, open := <-ctx.ErrC:
			if err == nil && !open {
				errsDone = true
				break
			}
			errCnt++
			break
		}
	}
	fmt.Printf("Processed %d ops and %d errs\n", opCnt, errCnt)
	close(done)
}

func main() {
	var mongoURL string
	flag.StringVar(&mongoURL, "url", "mongodb://localhost:27017", "MongoDB connection URL")
	flag.Parse()
	log.Printf("Connecting to MongoDB at %s", mongoURL)
	client, err := mongo.NewClient(options.Client().ApplyURI(mongoURL))
	if err != nil {
		log.Fatalln(err)
	}
	if err = client.Connect(context.Background()); err != nil {
		log.Fatalln(err)
	}
	defer client.Disconnect(context.Background())
	done := make(chan bool)
	gtmCtx := gtm.Start(client, &gtm.Options{
		DirectReadNs:   []string{"test.test"},
		ChangeStreamNs: []string{"test.test"},
		MaxAwaitTime:   time.Duration(10) * time.Second,
		OpLogDisabled:  true,
	})
	go readContext(gtmCtx, done)
	gtmCtx.DirectReadWg.Wait()
	gtmCtx.Stop()
	<-done
	fmt.Println("All done")
}
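The important difference from your printOp loop is that there is no default case. A select with a default branch does not wait: it takes the default path whenever the channel happens to be empty at that instant, even though the direct read is still producing ops. A minimal standalone sketch of that behavior (the channel here is just for illustration, not gtm itself):

package main

import "fmt"

func main() {
	// An open, buffered channel with nothing sent on it yet.
	ch := make(chan int, 8)

	select {
	case v := <-ch:
		fmt.Println("received", v)
	default:
		// Runs immediately: the channel is merely empty, not closed or nil.
		fmt.Println("default: channel empty, select did not wait")
	}

	// Without a default case, a receive blocks until a value arrives or the
	// channel is closed, which is what readContext above relies on.
	close(ch)
	v, open := <-ch
	fmt.Println(v, open) // 0 false: zero value and open == false after close
}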
go.mod
module test
go 1.12
require (
github.com/DataDog/zstd v1.4.4 // indirect
github.com/go-stack/stack v1.8.0 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/google/go-cmp v0.4.0 // indirect
github.com/minio/highwayhash v1.0.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rwynn/gtm v1.0.1-0.20191119151623-081995b34c9c
github.com/serialx/hashring v0.0.0-20190515033939-7706f26af194 // indirect
github.com/stretchr/testify v1.4.0 // indirect
github.com/tidwall/pretty v1.0.0 // indirect
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
github.com/xdg/stringprep v1.0.0 // indirect
go.mongodb.org/mongo-driver v1.2.1
golang.org/x/crypto v0.0.0-20200128174031-69ecbb4d6d5d // indirect
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e // indirect
golang.org/x/text v0.3.2 // indirect
)
Awesome, thanks very much. One question though: I noticed you have both ChangeStreamNs and DirectReadNs enabled, and OpLogDisabled is true. Would you please clarify whether this code just reads the collection, or does it also read the changes applied to it while the direct read is happening? It would be great if the Options struct had some comments.
I ran the code and saw something interesting. If I print the op and err, the number of processed operations and errors changes; if I don't print them and just count (like your code), the number of ops is correct. Otherwise it seems like a couple of thousand ops are not counted. I'm doing a direct read on a collection with about 3 million records; the database is not under any load, so no fields change during the read.
with operation print:
Processed 2916557 ops and 0 errs
All done
without operation print:
Processed 2918601 ops and 0 errs
All done
I did a bit more investigation. Consider the code below:
ctx.DirectReadWg.Wait()
fmt.Println("direct read done")
ctx.Stop()
<-done
After the direct read is done, you close the ctx, which I think kills the channel and causes the readContext function to exit. But when I print the operations, I can see the function printing operations even after fmt.Println("direct read done") is executed, which means the ctx is being closed prematurely. Any thoughts?
I think it may be related to the use of buffered channels in gtm.
Try updating the readContext function I posted: give the name op to the two _ vars, and then only set the two done flags if...
op == nil && !open
Due to buffering I think that the channel is closed but still has data on it. When the channel is empty the zero value of nil for a pointer will start getting returned.
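Roughly what I mean, as a standalone sketch of the channel semantics (not gtm itself):

package main

import "fmt"

func main() {
	ch := make(chan *int, 4)
	x := 42
	ch <- &x
	ch <- &x
	close(ch)

	// The two buffered values are still delivered after the close...
	v1, ok1 := <-ch
	v2, ok2 := <-ch
	fmt.Println(v1 != nil, ok1, v2 != nil, ok2) // true true true true

	// ...and only once the buffer is drained do receives return the
	// zero value (nil for a pointer) with ok == false.
	v3, ok3 := <-ch
	fmt.Println(v3, ok3) // <nil> false
}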
Also, to your previous question: with ChangeStreamNs set, yes, you will also receive the changes to those collections as they happen. There are methods to check whether the event source is a direct read or a change event. Make that an empty slice if you don't want changes.
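From memory it is something along these lines; a helper you could drop into the example above. Please verify the method and field names against gtm's Op type, since I am writing this without the code in front of me:

// describeSource would be called from the OpC case in readContext.
// (Method and field names here are my recollection; check gtm's Op type.)
func describeSource(op *gtm.Op) {
	switch {
	case op.IsSourceDirect():
		fmt.Println("direct read:", op.Namespace, op.Operation)
	case op.IsSourceOplog():
		fmt.Println("change stream/oplog:", op.Namespace, op.Operation)
	}
}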
You will probably want OpLogDisabled always set to true. This project is old and thus supports reading directly from the oplog, but you don't want to do that anymore now that change streams exist.
I updated my originally posted code to account for the buffering.
I tried the new code and just added fmt.Println(op) and fmt.Println(err), but I still lose some operations:
Processed 2916562 ops and 0 errs
All done
It should have 2918601 ops. Also, ChangeStreamNs was set to an empty slice, since the database has no operations at the moment and simply has 2918601 inserts in it (backup restore).
Can you post the full code that does not work? Also, does it work with or without the Println?
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"time"

	"github.com/rwynn/gtm"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func readContext(ctx *gtm.OpCtx, done chan bool) {
	var errsDone, opsDone bool
	var opCnt, errCnt int
	for !errsDone && !opsDone {
		select {
		case op, open := <-ctx.OpC:
			if op == nil && !open {
				opsDone = true
				break
			}
			opCnt++
			fmt.Println(op.Data)
			break
		case err, open := <-ctx.ErrC:
			if err == nil && !open {
				errsDone = true
				break
			}
			fmt.Println(err)
			errCnt++
			break
		}
	}
	fmt.Printf("Processed %d ops and %d errs\n", opCnt, errCnt)
	close(done)
}

func main() {
	var mongoURL string
	flag.StringVar(&mongoURL, "url", "mongodb://172.20.11.57:27017", "MongoDB connection URL")
	flag.Parse()
	log.Printf("Connecting to MongoDB at %s", mongoURL)
	client, err := mongo.NewClient(options.Client().ApplyURI(mongoURL))
	if err != nil {
		log.Fatalln(err)
	}
	if err = client.Connect(context.Background()); err != nil {
		log.Fatalln(err)
	}
	defer client.Disconnect(context.Background())
	done := make(chan bool)
	gtmCtx := gtm.Start(client, &gtm.Options{
		DirectReadNs:   []string{"record_stats.answerlogs"},
		ChangeStreamNs: []string{},
		MaxAwaitTime:   time.Duration(10) * time.Second,
		OpLogDisabled:  true,
	})
	go readContext(gtmCtx, done)
	gtmCtx.DirectReadWg.Wait()
	fmt.Println("Direct read done")
	gtmCtx.Stop()
	<-done
	fmt.Println("All done")
}
Direct read done
Processed 2918601 ops and 0 errs
All done
It works correctly without the print; so did the previous code.
Even when I add the Println like you have, I still come up with the same number, in my case a collection of size 4000012. Using go version 1.13.4 linux/amd64 here. That's really a mystery to me why that Println would be affecting the count.
I'm using 1.13.1 on CentOS 7. I will try with 1.13.4 and see what happens. Thank you.
I tried with Go 1.13.4 but no luck. I also used another server with more resources to make sure there are no bottlenecks there. When I run the code I posted, the text "Direct read done" is printed, but there are operations left (still printing) before the main routine exits:
Direct read done
map[_id:ObjectID("5e2794e980ea13a924dd989c") answer:-1 created_at: game_id: hint_used: question_id: question_number:7 state:NOT_ANSWERED user_id: zone_number:1]
map[_id:ObjectID("5e2794e980ea13a924dd989d") answer:-1 created_at:game_id: hint_used: question_id: question_number:8 state:NOT_ANSWERED user_id: zone_number:1]
map[_id:ObjectID("5e2495c99dae954a6cb4ea4a") answer:-1 created_at:game_id: hint_used: question_id: question_number:13 state:NOT_ANSWERED user_id: zone_number:1]
Processed 2916590 ops and 0 errs
All done
Is it the same for you?
On the new server, running without the print returns different results (some correct and some not). It looks like the problem is not with the code but rather with my distribution or hardware. Would you please tell me the Linux distro and MongoDB version you are using for the mongo server? Information about the hardware would also be useful. I am using MongoDB 4.2.3 on CentOS 7; the server has 8 GB RAM, 8 CPU cores, and an SSD disk.
Here are my mongo logs. Looks like latency issues:
2020-02-06T14:10:26.264-0500 I NETWORK [listener] connection accepted from 127.0.0.1:50832 #74 (1 connection now open)
2020-02-06T14:10:26.265-0500 I NETWORK [conn74] received client metadata from 127.0.0.1:50832 conn74: { driver: { name: "mongo-go-driver", version: "v1.2.1" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.13.4" }
2020-02-06T14:10:26.266-0500 I NETWORK [listener] connection accepted from 127.0.0.1:50834 #75 (2 connections now open)
2020-02-06T14:10:26.266-0500 I NETWORK [conn75] received client metadata from 127.0.0.1:50834 conn75: { driver: { name: "mongo-go-driver", version: "v1.2.1" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.13.4" }
2020-02-06T14:10:26.511-0500 I COMMAND [conn75] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: {} }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("23fae6e9-0c35-4852-b126-a68a1d5b5e26") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:051BBA2A planCacheKey:051BBA2A reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 238ms
2020-02-06T14:10:26.512-0500 I NETWORK [listener] connection accepted from 127.0.0.1:50836 #76 (3 connections now open)
2020-02-06T14:10:26.513-0500 I NETWORK [conn76] received client metadata from 127.0.0.1:50836 conn76: { driver: { name: "mongo-go-driver", version: "v1.2.1" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.13.4" }
2020-02-06T14:10:26.808-0500 I COMMAND [conn75] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5dd451fdf2b4ae5988416dfd') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("23fae6e9-0c35-4852-b126-a68a1d5b5e26") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 295ms
2020-02-06T14:10:26.809-0500 I NETWORK [listener] connection accepted from 127.0.0.1:50838 #77 (4 connections now open)
2020-02-06T14:10:26.809-0500 I NETWORK [conn77] received client metadata from 127.0.0.1:50838 conn77: { driver: { name: "mongo-go-driver", version: "v1.2.1" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.13.4" }
2020-02-06T14:10:27.106-0500 I COMMAND [conn75] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5dd999b8cef705b56f075a57') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("23fae6e9-0c35-4852-b126-a68a1d5b5e26") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 297ms
2020-02-06T14:10:27.107-0500 I NETWORK [listener] connection accepted from 127.0.0.1:50840 #78 (5 connections now open)
2020-02-06T14:10:27.107-0500 I NETWORK [conn78] received client metadata from 127.0.0.1:50840 conn78: { driver: { name: "mongo-go-driver", version: "v1.2.1" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.13.4" }
2020-02-06T14:10:27.414-0500 I COMMAND [conn75] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5de108bf984e0f8957c747e0') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("23fae6e9-0c35-4852-b126-a68a1d5b5e26") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 307ms
2020-02-06T14:10:27.418-0500 I NETWORK [listener] connection accepted from 127.0.0.1:50842 #79 (6 connections now open)
2020-02-06T14:10:27.418-0500 I NETWORK [conn79] received client metadata from 127.0.0.1:50842 conn79: { driver: { name: "mongo-go-driver", version: "v1.2.1" }, os: { type: "linux", architecture: "amd64" }, platform: "go1.13.4" }
2020-02-06T14:10:27.737-0500 I COMMAND [conn78] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5dfc9f870ff0bbb61395759a') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("e3c44531-fbec-4c24-a26d-2f3d8e431df5") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 321ms
2020-02-06T14:10:28.076-0500 I COMMAND [conn78] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5e0dec7411714ed41f504082') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("e3c44531-fbec-4c24-a26d-2f3d8e431df5") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 338ms
2020-02-06T14:10:28.472-0500 I COMMAND [conn78] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5e16361af67e1200a5f8eab2') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("e3c44531-fbec-4c24-a26d-2f3d8e431df5") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 395ms
2020-02-06T14:10:28.913-0500 I COMMAND [conn78] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5e1cbfe7d01063c2f09edccd') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("e3c44531-fbec-4c24-a26d-2f3d8e431df5") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 440ms
2020-02-06T14:10:29.485-0500 I COMMAND [conn75] command record_stats.answerlogs command: aggregate { aggregate: "answerlogs", allowDiskUse: false, pipeline: [ { $match: { _id: { $gte: ObjectId('5e217712c7b942a926c26db7') } } }, { $sort: { _id: 1 } }, { $skip: 291860 }, { $limit: 1 }, { $project: { _id: 1 } } ], cursor: {}, lsid: { id: UUID("e3c44531-fbec-4c24-a26d-2f3d8e431df5") }, $db: "record_stats" } planSummary: IXSCAN { _id: 1 } keysExamined:291861 docsExamined:0 cursorExhausted:1 numYields:2280 nreturned:1 queryHash:58536163 planCacheKey:5F700006 reslen:137 locks:{ ReplicationStateTransition: { acquireCount: { w: 2300 } }, Global: { acquireCount: { r: 2300 } }, Database: { acquireCount: { r: 2300 } }, Collection: { acquireCount: { r: 2300 } }, Mutex: { acquireCount: { r: 20 } } } storage:{} protocol:op_msg 561ms
2020-02-06T14:10:36.137-0500 I NETWORK [conn79] end connection 127.0.0.1:50842 (5 connections now open)
2020-02-06T14:10:36.137-0500 I NETWORK [conn75] end connection 127.0.0.1:50834 (3 connections now open)
2020-02-06T14:10:36.137-0500 I NETWORK [conn78] end connection 127.0.0.1:50840 (1 connection now open)
2020-02-06T14:10:36.137-0500 I NETWORK [conn74] end connection 127.0.0.1:50832 (0 connections now open)
2020-02-06T14:10:36.137-0500 I NETWORK [conn76] end connection 127.0.0.1:50836 (2 connections now open)
2020-02-06T14:10:36.137-0500 I NETWORK [conn77] end connection 127.0.0.1:50838 (4 connections now open)
I ran the code a couple more times on both servers. Looks like I was wrong: even on the slower server, just printing the count can produce different results, though it happens less often. The slower server is running MongoDB 4.2.1. This is probably a matter of hardware and MongoDB version.
@mosesBD,
I think I found the mistake on my part. It was silly. The condition for the for loop should be

for !errsDone || !opsDone {

Notice the || instead of &&. I updated the code sample in this thread.
I think gtm is OK and this was just an error in usage.
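I think what happened with && is roughly this: once Stop() closes both channels, ErrC (which is empty) reports closed almost immediately, errsDone flips to true, and with && the loop exits even though OpC still has buffered ops. The slower each iteration runs (for example with the extra Println), the more ops are still sitting in the buffer at that point, which would explain why the shortfall varied:

// With &&, the loop exits as soon as EITHER channel reports closed;
// with ||, it keeps draining until BOTH are closed and empty.
for !errsDone || !opsDone {
	// ...
}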
Looks like that was it. Testing with both count and print works fine.
I checked a couple more times and it is working :) How do I tell whether the operation I am receiving is coming from DirectReadNs or ChangeStreamNs?
I'm trying to use gtm to sync two MongoDB instances. The first instance has been running for a while and has roughly 50GB of data. My question is: if I do a direct read and start writing to the secondary database, what can I do about the changes made to the primary database while I'm reading the data, since it can change after I read it? My own idea is to store the oplog while doing the direct read and then apply it to the second database. I understand that there are already tools for this, but this is part of a bigger project and I need to achieve it via code. Any better options?
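To make the question concrete: based on your first example, I imagine starting the direct read and a change stream on the same namespace at once, something like the sketch below (the namespace is a placeholder), and then deciding what to do with the change-stream ops that arrive during the copy:

// Sketch of the idea only; "db.coll" is a placeholder namespace.
gtmCtx := gtm.Start(client, &gtm.Options{
	DirectReadNs:   []string{"db.coll"}, // bulk copy of the existing data
	ChangeStreamNs: []string{"db.coll"}, // changes made while the copy runs
	MaxAwaitTime:   10 * time.Second,
	OpLogDisabled:  true,
})
// In the reader goroutine, route by source: write direct-read ops to the
// secondary as they arrive, and queue the change-stream ops to replay
// against the secondary once DirectReadWg.Wait() returns.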