Closed qfrank closed 6 years ago
Hi @qfrank
I ran the example above but got the expected output:
> db.Bulk1.count()
2
> db.Bulk2.count()
1
Also in your example above you're calling blk.Run()
twice, you should check the error returned to ensure the bulk operation actually applied.
Dom
Hi @domodwyer ! thanks for your attention! Here is my test code:
func assertEqual(t *testing.T, a interface{}, b interface{}) {
if a != b {
t.Fatalf("%s != %s", a, b)
}
}
func TestCrossBulkProblem(t *testing.T) {
s, err := mgo.Dial("mongodb://localhost:29017")
if err != nil {
t.Fail()
}
d := s.DB("parts")
var (
blk1Col, blk2Col = d.C("Bulk1"), d.C("Bulk2")
blk1, blk2 = blk1Col.Bulk(), blk2Col.Bulk()
)
if _, err := blk1Col.RemoveAll(nil); err != nil {
t.Fail()
}
if _, err := blk2Col.RemoveAll(nil); err != nil {
t.Fail()
}
for i := 0; i < 2; i++ {
blk1.Insert(bson.M{"name": "bulk1"})
if i == 0 {
_, err = blk1.Run()
if err != nil {
t.Fail()
}
}
}
blk2.Insert(bson.M{"name": "bulk2"})
if _, err = blk1.Run(); err != nil {
t.Fail()
}
if _, err = blk2.Run(); err != nil {
t.Fail()
}
count1, err := blk1Col.Count()
if err != nil {
t.Fail()
}
assertEqual(t, count1, 2)
count2, err := blk2Col.Count()
if err != nil {
t.Fail()
}
assertEqual(t, count2, 2)
//NOTE!!! if all of above assert passed, it means we just reproduced the problem!
}
this is my debug view when step into "blk2.Insert(bson.M{"name": "bulk2"})"
this is my debug view after step over "blk2.Insert(bson.M{"name": "bulk2"})"
Environment mongodb server: 3.2.6
mgo version: [[constraint]] name = "github.com/globalsign/mgo" branch = "master"
OS info: MacBook Pro (Retina, 15-inch, Mid 2014) macOS Sierra 10.12.2
Hi @qfrank
I just ran the above code and got:
--- FAIL: TestCrossBulkProblem (0.21s)
test_test.go:12: %!s(int=1) != %!s(int=2)
FAIL
exit status 1
I ran against 3.2, 3.4 and 3.6 and got the same result on all - I don't really know what to suggest (also try swapping the %s
for %v
and it will work for everything!)
Hi @domodwyer can u tell me what your os type and version, also your golang version pls, i want reproduce your result and prove my issue either...so that u can believe me..thanks.. i'm using go version go1.9.2 darwin/amd64
Hi @qfrank
So while playing with this we discovered that go 1.9.2
triggers the issue, but go 1.10.3
does not - we'll look into this but it suggests the code is relying on some undefined behaviour.
Thanks for your ticket!
Dom
Hi @domodwyer So maybe u could reopen my first pull request on github and considering accept it? i would be very happy then, thanks!
Hi @qfrank
My colleague @eminano is working on a patch that fixes the issue, but preserves the pooling behaviour to minimise the GC pressure (sharing the pool across all Bulk
instances, instead of a per-instance pool which doesn't provide the same benefit).
Thanks very much for the reporting the issue though, it's really appreciated!
Dom
Hi @domodwyer
okay, got it, TAT..But i just checked your colleague's PR, looks like it's still not safe enough in multi goroutine environment, say bulk1 in goroutine#1, bulk2 in goroutine#2, which won't be fit for my requirement..I know i can handle this by myself, just some thoughts on it~
i have a newbie question, do we really need using sync.Pool here? Can we just custom implementation of cache to minimise the GC pressure instead? looking forward your response, thanks
Hi @qfrank
An instance of Bulk
(including the original mgo, our existing implementation and @eminano's implementation) is not safe for concurrent access - we'll be updating the docs to make this clear. Basically this is not safe:
bulk := col1.Bulk()
go func() { bulk.Insert(...) }()
go func() { bulk.Insert(...) }()
// This is bad
However using different bulks concurrently is (and always has been) fine:
go func() {
bulk := col1.Bulk()
bulk.Insert(...)
}()
go func() {
bulk := col1.Bulk()
bulk.Insert(...)
}()
// This is OK!
However there has been a bug since #56 that meant reusing a bulk instance after a successful bulk.Run()
(as you were) would reference memory (in the Bulk.actions
slice) that had been returned to the pool, meaning the behaviour was non-deterministic due to the timing of GC runs - incorrect behaviour is triggered more frequently by allocating and using another bulk immediately as your test does too, because it modifies the data referenced by both instances.
We pool the bulkAction
type (and it's underlying arrays) to minimise the number of allocations when using many separate Bulk
instances, either sequentially or concurrently (such as if you made a Bulk
per file during batch data loading, etc). It's a popular pattern for data analytics (the whole "big data" thing) and using the pool helps minimise runtime and memory fragmentation because you don't have to perform one or more (usually more!) allocations and memory copies when adding more documents to a bulk as the docs
slice grows, and the GC mark-and-sweep doesn't have to spend as much time pausing the runtime or stealing CPU because there's less garbage. I hope this makes sense! If not, feel free to reply :)
Interestingly because of the pooling, after a successful bulk.Run()
the bulk no longer contains the operations that were added to it - this is different behaviour to the original mgo
which we usually avoid, but it was missed during #56. I'm of the opinion to keep our behaviour (a successful Run()
empties the Bulk
) because:
We'll update the documentation to make the above clear though!
Thanks @qfrank for the ticket, and your help - it's really appreciated!
Dom
I'll cut a release shortly!
say we have two bulk variables defined from different collections named "Bulk1" and "Bulk2". After running following code, collection "Bulk1" get 2 document inserted, but collection "Bulk2" also get 2 documents inserted! the reason for this is because actionPool defined in bulk.go is not safe in this situation. i just made a pull request to resolve this issue, hope it will get accepted, thanks