visualfabriq / bquery

A query and aggregation framework for Bcolz (W2013-01)
https://www.visualfabriq.com
BSD 3-Clause "New" or "Revised" License

Multi-core support for bquery #17

Open ARF1 opened 9 years ago

ARF1 commented 9 years ago

After missing a groupby for bcolz for some time, I was excited to find this interesting project. To get started, I looked at the unique method.

I found some interesting timing results with a 12-character string column in my database:

import blaze
import bquery
import bcolz
from multiprocessing import Pool
p = Pool()

db = bquery.open(...)

%%timeit
db.unique('my_col')
--> 5.5 sec (uses only one core)

db = bcolz.open(...)
d = blaze.Data(db)

%%timeit
blaze.compute(d['my_col'].distinct(), map=p.map)
--> 3.32 sec (using 2 cores on my dual core machine)

%%timeit
blaze.compute(d['my_col'].distinct())
--> 7.69 sec (using only one core)

It appears that parallel processing with blaze provides a fairly significant speedup on my database, even given blaze's inherent overhead. Would it be conceivable to parallelise the bquery code?

FrancescElies commented 9 years ago

Hi,

thanks for your input. In the following PR https://github.com/visualfabriq/bquery/pull/19 you can find a first implementation of how this could work. Please keep in mind that it is not deeply tested. If you could try it out and make any improvements or suggestions, that would be great :)

Regards & thanks again for your post

ARF1 commented 9 years ago

Wow, thanks for the rapid response!

At first I thought that the PR brought practically no real speedup. Then I realised that you are parallelising by column rather than by data chunk:

# Original bquery
In [3]: %%timeit
   ...: db.unique('col1')
   ...: 
1 loops, best of 3: 5.51 s per loop

# With PR #19
In [2]: %%timeit
   ...: db.unique('col1')
   ...: 
1 loops, best of 3: 4.71 s per loop

Using two columns, the speedup becomes apparent:

# Original bquery
In [5]: %%timeit
   ...: db.unique(['col1', 'col2'])
   ...: 
1 loops, best of 3: 10.6 s per loop

# With PR #19
In [5]: %%timeit
   ...: db.unique(['col1', 'col2'])
   ...: 
1 loops, best of 3: 6.03 s per loop

So PR #19 works, but it is not quite what blaze does: blaze achieves the speedup even if only a single column is specified.

Blaze chunks the column into bite-sized pieces and applies its reduction operation, e.g. unique, to each chunk, resulting in a set of unique values per chunk. These sets of "unique values" are then joined and the reduction operation is applied again to create a "global" list of unique values: essentially a map-reduce over chunks, as sketched below.
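For illustration, a minimal toy version of that chunk-level map-reduce in plain Python with multiprocessing (my own sketch, not blaze's internals; parallel_unique and the on-disk rootdir layout are assumptions, and each worker reopens the carray instead of pickling it):

import numpy as np
import bcolz
from multiprocessing import Pool

def _chunk_unique(args):
    # "map" step: reduce one bite-sized slice to its unique values
    rootdir, start, stop = args
    col = bcolz.open(rootdir, mode='r')  # each worker reopens the on-disk carray
    return np.unique(col[start:stop])

def parallel_unique(rootdir, chunklen=2**20):
    col = bcolz.open(rootdir, mode='r')
    ranges = [(rootdir, i, min(i + chunklen, len(col)))
              for i in range(0, len(col), chunklen)]
    pool = Pool()
    try:
        partials = pool.map(_chunk_unique, ranges)
    finally:
        pool.close()
    # "reduce" step: join the per-chunk sets and deduplicate once more
    return np.unique(np.concatenate(partials))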

Before I submitted this issue, I looked a bit at the code and concluded that parallelising it was something I did not know how to implement in cython. My gut instinct was that the following would be necessary (taking string columns as an example):

Clearly this is a non-trivial effort but it would have three advantages I can see:

What do you think?

Note: cython.parallel would probably be required... I have no idea how to use it, especially with shared state!
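For anyone else reading along, a toy cython.parallel example of the basic pattern (mine, not bquery code; needs OpenMP at compile time, e.g. -fopenmp):

# toy_parallel.pyx -- illustrative only, not bquery code
from cython.parallel import prange

def count_nonzero(long[:] data):
    cdef Py_ssize_t i
    cdef Py_ssize_t n = data.shape[0]
    cdef long total = 0
    # the loop body runs with the GIL released; because `total` is only
    # updated in-place with +=, Cython treats it as a per-thread reduction
    for i in prange(n, nogil=True):
        if data[i] != 0:
            total += 1
    return total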

CarstVaartjes commented 9 years ago

Interesting!! This is a good read too: http://archive.euroscipy.org/file/9030/raw/cython_parallel.pdf I'm not such a big fan of multiprocessing in many use cases, because it tends to break when run as part of something like celery or uwsgi, and creating a pool can take quite a while (a second or so), which negates the speed advantage a bit. So cython.parallel looks much more promising, at least.

So what would be needed:

Shall we start with the first one, who feels up for that? ;)

FrancescElies commented 9 years ago

@ARF1 thanks for the beautiful analysis and the suggestions you make; I see things exactly as you wrote them. Per-chunk instead of per-column parallelization would have many advantages, but as you pointed out it requires more effort than the previous PR. Still, it would be really nice if we could make it work. As for cython.parallel, I have never used it, but it seems to me the right thing to use too.

@CarstVaartjes regarding the chunk iterator: I explored this idea a while ago; I will recover some commits and prepare them as a PR for bcolz. I'll close the previous PR, as it was only a first idea of what could be done. cython.parallel sounds like a much better way to go.

ARF1 commented 9 years ago

@CarstVaartjes Thanks for the detailed reply. I am not too familiar with the bcolz code base, so I fear I am not the best person for the job.

That being said, I (think I) managed to get a proof-of-concept working, parallelising _factorize_str_helper using cython.parallel. The speed-up is nothing spectacular yet: on my 2-core machine, x2 for uncompressed bcolz but only x1.6 for compressed bcolz. I am not thrilled, because by optimizing the single-threaded code I had already achieved a speed-up of x1.5...

There is plenty of cleaning up and optimizing to be done. What shall I do with the code? Put it in a fork on github, make a pull request, create a branch? I am not too familiar with the github workflow.


And while I have your attention, can you tell me why modifying:

chunk_ = carray_.chunks[i]
chunk_._getitem(0, chunklen, in_buffer_ptr)

to

carray_.chunks[i]._getitem(0, chunklen, in_buffer_ptr)

results in the following error at run-time?

AttributeError: 'bcolz.carray_ext.chunk' object has no attribute '_getitem'

I currently have to serialise execution of the above-quoted code block, which I suspect is the reason the parallelised version runs slowly with compressed bcolz.
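My own guess, which could well be wrong: _getitem is a cdef method, so it is only reachable through a variable Cython knows to be of type chunk, whereas carray_.chunks[i] on its own is an untyped Python object, so ._getitem becomes a Python attribute lookup and fails at run-time. In fragment form (assuming bquery's usual cimports; not compilable on its own):

# fragment only, assuming: from bcolz.carray_ext cimport carray, chunk
cdef chunk chunk_
chunk_ = carray_.chunks[i]                   # typed intermediate: C-level call possible
chunk_._getitem(0, chunklen, in_buffer_ptr)  # cdef method resolved at compile time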

CarstVaartjes commented 9 years ago

Hi,

the chunk iteration in bcolz needs some attention/love; see the discussion here: https://groups.google.com/forum/#!topic/bcolz/C_FClZM9I6A Once it's implemented it will greatly reduce the complexity of the code (more readable, no separate loop for leftover arrays anymore, and hopefully great performance). I hope we can have it implemented soon (Francesc's code works, but as you can see, I think the bcolz team first needs to decide on what they want and how). I can imagine that if you're reading the chunks serialised at the moment, you won't see any performance increase yet compared to a single-threaded approach. But very soon you should. A rough sketch of what such an iterator could look like is below.
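As a rough sketch of what such a chunk iterator could look like from the caller's side (built purely on the public slicing API; iter_chunk_arrays is a made-up name, and this is not Francesc's actual code):

import bcolz

def iter_chunk_arrays(ca):
    # yields one numpy block per full chunk, then the leftover tail,
    # so callers no longer need a separate loop for the leftovers
    chunklen = ca.chunklen
    nchunks = len(ca) // chunklen
    for i in range(nchunks):
        yield ca[i * chunklen:(i + 1) * chunklen]
    if len(ca) % chunklen:
        yield ca[nchunks * chunklen:]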

Could you share your parallel approach and changes? I would be really interested to learn from them (and additionally: any 1.5x improvement sounds fantastic to implement, regardless of the second multi-threading step).

BR

Carst

ARF1 commented 9 years ago

@CarstVaartjes

I put the code with the performance improvements to the "single-threaded" string processing into PR #21. The speedup was not an intentional optimization but an offshoot of making _factorize_str_helper nogil-compatible as a prerequisite for parallel processing; a toy illustration of that constraint follows below.

"Single-threaded" in quotes as I observed my 2-core CPU with more than 50% load when running this code. It seems that about 1/3 of the performance improvement is due to implicit parallelisation by python.

Possible additional optimizations (but probably fairly minor):


Uncompressed bcolz timings on my machine:

bquery master:
In [3]: %timeit -r 10 a.cache_factor(['mycol'], refresh=True)
1 loops, best of 10: 2.46 s per loop

pull request:
In [3]: %timeit -r 10 a.cache_factor(['mycol'], refresh=True)
1 loops, best of 10: 1.59 s per loop

==> Factor: 1.5

Compressed bcolz timings on my machine:

bquery master:
In [3]: %timeit -r 10 a.cache_factor(['mycol'], refresh=True)
1 loops, best of 10: 4.03 s per loop

pull request:
In [3]: %timeit -r 10 a.cache_factor(['mycol'], refresh=True)
1 loops, best of 10: 3.13 s per loop

==> Factor: 1.3

ARF1 commented 9 years ago

@CarstVaartjes The parallelised string processing is in PR #22.