rwynn / gtm

gtm (go tail mongo) is a MongoDB event listener
MIT License

Multithread direct reads #12

Closed - makhdumi closed this 6 years ago

makhdumi commented 6 years ago

Direct reads seem to be running off of a single cursor, using $gte query + sort pagination on _id.

This can be pretty inefficient for sharded clusters/collections. For those, it's faster to query on chunk ranges in parallel. The mongo-hadoop connector does this here.

I imagine the same applies to non-sharded collections too, provided the _id range is "guess-chunked" well (i.e. approximately the same number of documents per chunk range).

Usually, if I query all documents in, say, a 10M document collection, fetching with a single cursor is pretty slow even though none of the mongod server's resources (network, CPU, disk) come close to being maxed out. I think this is typical of TCP transfers - e.g. HTTP "download accelerators" get 4x-6x the speed of a single thread by using multiple threads with different Range headers.

For some perspective: I'm currently getting (on the monstache machine) ~2 Mbit/s average with a ~23 Mbit/s peak using DirectRead in monstache on the same local virtual network in Azure, compared to ~35 Mbit/s average with a 90 Mbit/s peak from poorly written (badly chunked) multithreaded/multi-cursor .NET code reading the same collection over the Internet.

If it's agreed that adding multi-cursor/threaded direct reads would be good, I'd be happy to submit a pull request, but I'm unsure how best to make it configurable (do you make multi-cursor DirectRead optional? how do you specify the chunking method? etc.).
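
To make that concrete, here is a minimal sketch of the kind of multi-cursor range read I have in mind, assuming the mgo driver and a precomputed list of _id boundaries (readRanges and bounds are hypothetical names, not existing gtm code):

package sketch

import (
    "sync"

    mgo "gopkg.in/mgo.v2"
    "gopkg.in/mgo.v2/bson"
)

// readRanges reads one collection with len(bounds)-1 concurrent range cursors.
// bounds holds ascending, precomputed _id boundaries; picking them well is the
// open question (config.chunks, splitVector, or plain guessing).
func readRanges(session *mgo.Session, db, coll string, bounds []interface{}) {
    var wg sync.WaitGroup
    for i := 0; i+1 < len(bounds); i++ {
        lo, hi := bounds[i], bounds[i+1]
        wg.Add(1)
        go func(lo, hi interface{}) {
            defer wg.Done()
            s := session.Copy() // separate socket per reader
            defer s.Close()
            sel := bson.M{"_id": bson.M{"$gte": lo, "$lt": hi}}
            iter := s.DB(db).C(coll).Find(sel).Iter()
            var doc bson.M
            for iter.Next(&doc) {
                // hand doc off to whatever consumes the direct read
            }
            iter.Close()
        }(lo, hi)
    }
    wg.Wait()
}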

rwynn commented 6 years ago

Hi, the direct read impl was changed recently in this commit:

https://github.com/rwynn/gtm/commit/58b0adee97f178370f7e0bc671cbdeae8f0bb1a0

The change was prompted by feedback in this issue, with respect to a 10 million document collection:

https://github.com/rwynn/monstache/issues/44

It went from multiple cursors each with coordinated skips and limits to what you see today with a gte query.

The report was closed on feedback that the situation got better for the large 10m collection after the change. It would be nice to go back to concurrent readers but keep the improvements for the other fella.

I will be happy to look at a PR. Maybe you could base it off the code prior to the commit referenced above. At that point, multi-cursor was optional via DirectReadersPerCol. As for the chunking method, maybe you could specify that in the Options passed in.

rwynn commented 6 years ago

By the way, with regard to sharded vs non-sharded collections, gtm should be connecting directly to the shards rather than going through a mongos connection, so I'm not sure the query logic needs to take sharding into account. The example suggests connecting to the config server, getting the list of shards, using a MultiContext (which would be a session for each shard), and setting up a listener on the config server oplog to handle new shards being added. Maybe I'm missing something?
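
For illustration, the shard-discovery part could look something like this sketch (not gtm's actual MultiContext code; it just reads the config.shards collection and dials each shard directly):

package sketch

import (
    "strings"

    mgo "gopkg.in/mgo.v2"
    "gopkg.in/mgo.v2/bson"
)

type shardInfo struct {
    ID   string `bson:"_id"`
    Host string `bson:"host"` // e.g. "rs0/hostA:27017,hostB:27017"
}

// shardSessions enumerates shards via the config server and opens one
// session per shard so reads go straight to the shards, not through mongos.
func shardSessions(configSession *mgo.Session) ([]*mgo.Session, error) {
    var shards []shardInfo
    err := configSession.DB("config").C("shards").Find(nil).All(&shards)
    if err != nil {
        return nil, err
    }
    var sessions []*mgo.Session
    for _, sh := range shards {
        addr := sh.Host
        if i := strings.Index(addr, "/"); i != -1 {
            addr = addr[i+1:] // drop the replica set name prefix
        }
        s, err := mgo.Dial(addr)
        if err != nil {
            return nil, err
        }
        sessions = append(sessions, s)
    }
    return sessions, nil
}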

makhdumi commented 6 years ago

The gte change from skip+limit makes sense. With multiple cursors, I was thinking of multiple range cursors with gte and lt.

Knowing the sharding info just helps with determining the ranges to query on (using the chunks collection) and what the shard key is.

Otherwise, if you can't tell that the collection is sharded, I guess you have to query off of _id, i.e. treat it as an unsharded collection. You also have to more or less guess good ranges: a simple method is to use the total collection size and the min/max _id (I use this for unsharded collections and it performs badly in our case, since the _ids are not at all evenly distributed). Another way would be to "binary search" for each range boundary.

When I use the simple method that results in bad chunking, I get the ~35 Mbit/s average. But for sharded collections, when I use the ranges already available in the chunks collection, I get a pretty stable ~80-90 Mbit/s average.

I didn't think about handling chunks being added or rebalanced though, which is important (it just wasn't an issue in my specific situation). I think that makes it a lot trickier.
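
As a sketch of the sharded case, the ranges can be pulled straight from config.chunks (this assumes the 3.x-era schema where chunk documents carry an ns field; newer servers key chunks by collection UUID instead):

package sketch

import (
    mgo "gopkg.in/mgo.v2"
    "gopkg.in/mgo.v2/bson"
)

type chunk struct {
    Min bson.M `bson:"min"`
    Max bson.M `bson:"max"`
}

// chunkRanges returns one {min,max} pair per chunk for the namespace,
// ready to be turned into $gte/$lt selectors on the shard key.
func chunkRanges(configSession *mgo.Session, ns string) ([]chunk, error) {
    var chunks []chunk
    err := configSession.DB("config").C("chunks").
        Find(bson.M{"ns": ns}).Sort("min").All(&chunks)
    return chunks, err
}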

rwynn commented 6 years ago

I'm going down the path of trying https://docs.mongodb.com/manual/reference/command/parallelCollectionScan/ with a configurable number of cursors, each read in a separate goroutine. I'll need to feature-check this and only make it available on MongoDB 2.6+.
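
For reference, driving that command from mgo would look roughly like this (a sketch, not the gtm code; the response shape follows the server docs, and Collection.NewIter wraps each returned cursor):

package sketch

import (
    "sync"

    mgo "gopkg.in/mgo.v2"
    "gopkg.in/mgo.v2/bson"
)

// parallelScan runs parallelCollectionScan and reads each returned cursor
// in its own goroutine on its own copied session.
func parallelScan(session *mgo.Session, db, coll string, numCursors int) error {
    var result struct {
        Cursors []struct {
            Cursor struct {
                FirstBatch []bson.Raw `bson:"firstBatch"`
                ID         int64      `bson:"id"`
            } `bson:"cursor"`
        } `bson:"cursors"`
    }
    cmd := bson.D{
        {Name: "parallelCollectionScan", Value: coll},
        {Name: "numCursors", Value: numCursors},
    }
    if err := session.DB(db).Run(cmd, &result); err != nil {
        return err
    }
    var wg sync.WaitGroup
    for _, c := range result.Cursors {
        wg.Add(1)
        go func(firstBatch []bson.Raw, id int64) {
            defer wg.Done()
            s := session.Copy() // one connection per cursor
            defer s.Close()
            iter := s.DB(db).C(coll).NewIter(nil, firstBatch, id, nil)
            var doc bson.M
            for iter.Next(&doc) {
                // consume doc
            }
            iter.Close()
        }(c.Cursor.FirstBatch, c.Cursor.ID)
    }
    wg.Wait()
    return nil
}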

makhdumi commented 6 years ago

I was totally unaware of parallelCollectionScan!

Knowing about this helps a lot with my own stuff too, thank you. I'm looking forward to the implementation in gtm.

rwynn commented 6 years ago

The problem is it doesn't seem to work very well with WiredTiger: it looks like it always returns 1 cursor for that engine. And I'm actually seeing slightly better results on 1 million docs with the current code. I guess I need to try other engines to see whether it helps in any case.

https://jira.mongodb.org/browse/SERVER-17688

rwynn commented 6 years ago

mmapv1 is working great though. Multiple cursors get returned and it is pretty fast.

rwynn commented 6 years ago

@makhdumi the parallelCollectionScan stuff has been committed to master now. Please let me know how that goes. Of course, it only works for mmapv1 currently, but should transparently upgrade when WiredTiger is enhanced.

rwynn commented 6 years ago

Just added commit 453cc542c407bc353070ad798398c965aa38104b which puts each cursor on a separate connection.

This gives pretty good throughput for 10 million tiny docs. Syncing to ES is turned off in this run, so it's just MongoDB read performance via mmapv1.

time go run monstache.go -direct-read-namespace test.test -exit-after-direct-reads -direct-read-cursors 100
INFO 2018/03/16 21:35:08 Successfully connected to MongoDB version 3.6.3
INFO 2018/03/16 21:35:08 Parallel collection scan command returned 17/100 cursors requested for test.test
INFO 2018/03/16 21:35:08 Starting 17 go routines to read test.test
10000000
INFO 2018/03/16 21:35:58 Shutting down

real    0m52.016s
user    2m9.244s
sys     0m24.640s

With the code path that doesn't use the parallel collection scan, it takes 3x as long.

time go run monstache.go -direct-read-namespace test.test -exit-after-direct-reads -direct-read-cursors 1
INFO 2018/03/16 21:40:39 Successfully connected to MongoDB version 3.6.3
INFO 2018/03/16 21:40:39 Parallel collection scan command returned 1/1 cursors requested for test.test
INFO 2018/03/16 21:40:39 Reverting to single-threaded collection read
10000000
INFO 2018/03/16 21:43:21 Shutting down

real    2m43.619s
user    2m37.740s
sys     0m35.496s

rwynn commented 6 years ago

@makhdumi

You might want to give monstache another try. Recent changes have removed the storage of metadata in MongoDB for the purposes of later deletion. This may have been what was slowing it down for you.

makhdumi commented 6 years ago

That might have been it, thank you. I actually had to abandon monstache after I was seeing ~70 ms between each oplog read, but I didn't have time to dig further, so I'm not sure if it was my setup or monstache itself.

Thank you for the initial dump improvements as well. I hadn't seen the latest one.

rwynn commented 6 years ago

Thanks. I'm looking into adding splitVector support. It seems parallelCollectionScan is going away when mmapv1 does.

rwynn commented 6 years ago

@makhdumi thank you for pointing me to the mongo-hadoop-connector code. I've added split vector support to monstache and gtm now because of that. I'll take a look into the oplog read lag at some point.

rwynn commented 6 years ago

gtm now supports splitting collections and reading them in multiple goroutines over separate connections using range selectors.
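
For anyone finding this later, here is a rough sketch of the splitVector-based range selection (not gtm's exact code; parameter and field names follow the splitVector command, and which database the command must run against can vary by server version):

package sketch

import (
    mgo "gopkg.in/mgo.v2"
    "gopkg.in/mgo.v2/bson"
)

// splitRanges asks splitVector for _id split points and turns them into
// half-open range selectors: (-inf, k1), [k1, k2), ..., [kn, +inf).
func splitRanges(session *mgo.Session, db, coll string, maxChunkBytes int) ([]bson.M, error) {
    var result struct {
        SplitKeys []bson.M `bson:"splitKeys"`
    }
    cmd := bson.D{
        {Name: "splitVector", Value: db + "." + coll},
        {Name: "keyPattern", Value: bson.M{"_id": 1}},
        {Name: "maxChunkSizeBytes", Value: maxChunkBytes},
    }
    // Depending on server version this may need to run against "admin"
    // rather than the collection's database.
    if err := session.DB(db).Run(cmd, &result); err != nil {
        return nil, err
    }
    keys := result.SplitKeys
    if len(keys) == 0 {
        return []bson.M{{}}, nil // nothing to split on: one full read
    }
    sels := []bson.M{{"_id": bson.M{"$lt": keys[0]["_id"]}}}
    for i := 1; i < len(keys); i++ {
        sels = append(sels, bson.M{"_id": bson.M{
            "$gte": keys[i-1]["_id"], "$lt": keys[i]["_id"]}})
    }
    sels = append(sels, bson.M{"_id": bson.M{"$gte": keys[len(keys)-1]["_id"]}})
    return sels, nil
}

Each selector can then be handed to its own reader goroutine, as in the range-read sketch earlier in the thread.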