h2oai / db-benchmark

reproducible benchmark of database-like ops
https://h2oai.github.io/db-benchmark
Mozilla Public License 2.0

Dask benchmark improvements #144

Closed ravwojdyla closed 4 years ago

ravwojdyla commented 4 years ago

Hi! I would like to propose a first batch of improvements to the Dask benchmark. To summarize, the important changes are just 2 lines of code:

In a later PR I would also like to modify nlargest to use aggregation instead of groupby-apply, and I believe there is also an issue with the last test. In some cases these changes produce up to 4x improvements on my machine (8 cores) compared to the numbers currently published on the website.

ravwojdyla commented 4 years ago

@jangorecki thanks for the prompt response, please see my comments above. Waiting for another run sounds good.

jangorecki commented 4 years ago

I started the benchmark and it is still running, but here are some initial observations on the 1e7 data size.

I am getting tons of warnings like this:

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 2.65 GB -- Worker memory limit: 3.38 GB

If those warnings are nothing to worry about, how can I disable them? I track the number of lines written to stderr, as well as the exit code returned, because it helps to detect issues in the various benchmark scripts. In the following output of the benchmark launcher, each "finish" line is suffixed by the exit code (0 and 1 below) and the number of lines in stderr.

start:  dask groupby G1_1e7_1e2_0_0
finish: dask groupby G1_1e7_1e2_0_0: 0: stderr 69
start:  dask groupby G1_1e7_1e1_0_0
finish: dask groupby G1_1e7_1e1_0_0: 1: stderr 199
start:  dask groupby G1_1e7_2e0_0_0
finish: dask groupby G1_1e7_2e0_0_0: 1: stderr 6306

Before it was

start:  dask groupby G1_1e7_1e2_0_0
finish: dask groupby G1_1e7_1e2_0_0: 0
start:  dask groupby G1_1e7_1e1_0_0
finish: dask groupby G1_1e7_1e1_0_0: 0: stderr 19
start:  dask groupby G1_1e7_2e0_0_0
finish: dask groupby G1_1e7_2e0_0_0: 0: stderr 19

Aside from the number of lines in stderr, the 1e7_1e1 and 1e7_2e0 scripts now fail to finish (exit code 1), while before they finished successfully. We need to address that as well.
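
The "finish" lines are regular enough to check mechanically. Below is a minimal sketch, assuming the line format shown in the launcher output above; the `parse_finish` helper and its return shape are hypothetical and not part of the benchmark launcher.

```python
import re

# Matches lines such as:
#   finish: dask groupby G1_1e7_1e1_0_0: 1: stderr 199
#   finish: dask groupby G1_1e7_1e2_0_0: 0
FINISH_RE = re.compile(
    r"^finish:\s+(?P<solution>\S+)\s+(?P<task>\S+)\s+(?P<data>\S+):\s+"
    r"(?P<code>\d+)(?::\s+stderr\s+(?P<stderr>\d+))?\s*$"
)


def parse_finish(line):
    """Return (data, exit_code, stderr_lines), or None for non-finish lines."""
    m = FINISH_RE.match(line)
    if m is None:
        return None
    # A missing "stderr N" suffix is treated as zero stderr lines.
    return m.group("data"), int(m.group("code")), int(m.group("stderr") or 0)


log = """\
start:  dask groupby G1_1e7_1e2_0_0
finish: dask groupby G1_1e7_1e2_0_0: 0: stderr 69
start:  dask groupby G1_1e7_1e1_0_0
finish: dask groupby G1_1e7_1e1_0_0: 1: stderr 199
"""

results = [r for r in map(parse_finish, log.splitlines()) if r is not None]
failed = [data for data, code, _ in results if code != 0]
print(failed)
```

Comparing `results` between two runs would flag both newly failing scripts and jumps in stderr volume.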


tornado.application - ERROR - Exception in Future <Future cancelled> afte
r timeout
Traceback (most recent call last):
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/tornado/gen.py", line 970, in error_callback
    future.result()
concurrent.futures._base.CancelledError
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarti
ng
distributed.nanny - WARNING - Worker process 32591 was killed by unknown 
signal
Traceback (most recent call last):
  File "./dask/groupby-dask.py", line 307, in <module>
distributed.nanny - WARNING - Restarting worker
    ans = x.groupby(['id1','id2','id3','id4','id5','id6']).agg({'v3':'sum
', 'v1':'count'}).compute()
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/dask/base.py", line 166, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/dask/base.py", line 437, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/distributed/client.py", line 2318, in get
    direct=direct)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/distributed/client.py", line 1652, in gather
    asynchronous=asynchronous)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/distributed/client.py", line 670, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/distributed/utils.py", line 277, in sync
    six.reraise(*error[0])
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/six.py", line 693, in reraise
    raise value
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/distributed/utils.py", line 262, in f
    result[0] = yield future
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/tornado/gen.py", line 1133, in run
    value = future.result()
es/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/distributed/client.py", line 1497, in _gather
    traceback)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/six.py", line 693, in reraise
    raise value
distributed.scheduler.KilledWorker: ("('aggregate-chunk-aggregate-agg-b4e
de103bc26350ec5823a6b32e1e135', 0)", 'tcp://127.0.0.1:41269')
distributed.process - WARNING - reaping stray process <ForkServerProcess(
ForkServerProcess-16, started daemon)>
distributed.nanny - WARNING - Worker process 32602 was killed by unknown 
signal
tornado.application - ERROR - Exception in Future <Future cancelled> afte
r timeout
Traceback (most recent call last):
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/tornado/gen.py", line 970, in error_callback
    future.result()
concurrent.futures._base.CancelledError
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarti
ng
distributed.nanny - WARNING - Worker process 8999 was killed by unknown s
ignal
distributed.nanny - WARNING - Restarting worker
...
distributed.nanny - WARNING - Worker process 8896 was killed by unknown s
ignal
Traceback (most recent call last):
  File "./dask/groupby-dask.py", line 248, in <module>
    ans = x[['id6','v3']].groupby(['id6']).apply(lambda x: x.nlargest(2, 
columns='v3'), meta={'id6': 'int64', 'v3': 'float64'})[['v3']].compute()
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/dask/base.py", line 166, in compute
distributed.nanny - WARNING - Restarting worker
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/dask/base.py", line 437, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/distributed/client.py", line 2318, in get
distributed.worker - WARNING - Memory use is high but worker has no data 
to store to disk.  Perhaps some other process is leaking memory?  Process
 memory: 3.06 GB -- Worker memory limit: 3.38 GB
    direct=direct)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/distributed/client.py", line 1652, in gather
    asynchronous=asynchronous)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/distributed/client.py", line 670, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/distributed/utils.py", line 277, in sync
    six.reraise(*error[0])
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/six.py", line 693, in reraise
    raise value
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/distributed/utils.py", line 262, in f
    result[0] = yield future
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/distributed/client.py", line 1497, in _gather
    traceback)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/six.py", line 693, in reraise
    raise value
distributed.scheduler.KilledWorker: ("('shuffle-shuffle-join-shuffle-spli
t-getitem-f9bfef7320a356107f92a7576bcb8bde', 0)", 'tcp://127.0.0.1:37875'
)
distributed.process - WARNING - reaping stray process <ForkServerProcess(
ForkServerProcess-14, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(
ForkServerProcess-48, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(
ForkServerProcess-15, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(
ForkServerProcess-28, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(
ForkServerProcess-34, started daemon)>
distributed.nanny - WARNING - Worker process 38798 was killed by unknown 
signal
distributed.nanny - WARNING - Worker process 8923 was killed by unknown s
ignal
distributed.nanny - WARNING - Worker process 8925 was killed by unknown s
ignal
distributed.nanny - WARNING - Worker process 9475 was killed by unknown s
ignal
tornado.application - ERROR - Exception in Future <Future cancelled> afte
r timeout
Traceback (most recent call last):
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/tornado/gen.py", line 970, in error_callback
    future.result()
concurrent.futures._base.CancelledError
distributed.nanny - WARNING - Worker process 9127 was killed by unknown s
ignal
ravwojdyla commented 4 years ago

Thanks for the update @jangorecki! We definitely should not ignore low memory warnings (before we look into other issues). So you have a machine with 40 cores and 125GB? With default Client settings you should have ended up with 8 workers (5 threads each) and a ~15GB memory limit for each worker. But for some reason you ended up with 3.38GB per worker (which is roughly 125/40). Are you passing the number of workers somewhere? Which version of dask are you running?
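
The arithmetic behind those two figures can be sketched as follows. This assumes the automatic limit is simply total memory divided evenly among worker processes, which is an assumption about the default rather than something confirmed in this thread; `per_worker_limit_gb` is a hypothetical helper.

```python
def per_worker_limit_gb(total_gb, n_workers):
    """Even split of machine memory across worker processes (assumed rule)."""
    return total_gb / n_workers


TOTAL_GB = 125  # machine memory mentioned above

# 8 workers is the expected default; 40 corresponds to one worker per core.
for n_workers in (8, 40):
    print(n_workers, "workers ->", round(per_worker_limit_gb(TOTAL_GB, n_workers), 2), "GB each")
```

Under that assumption, the observed 3.38GB limit is much closer to the one-worker-per-core split than to the 8-worker split.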

ravwojdyla commented 4 years ago

@jangorecki also could you please run this:

>>> from dask import distributed
>>> n_workers, nthreads = distributed.deploy.local.nprocesses_nthreads()
>>> distributed.worker.parse_memory_limit("auto", 1, n_workers)
8431729664
>>> (n_workers, nthreads)
(4, 2)

This will show us what the defaults are on your machine.

jangorecki commented 4 years ago

I got the timings as well. In general there is a nice speed-up in high-cardinality questions (except q10, for some reason), which is something dask was quite bad at. On the other hand, other questions got slower, and some questions now do not complete at all. Fewer questions got faster than got slower or now fail, but the speed-ups are bigger than the slowdowns. It generally looks to be the way to go, but it sacrifices a lot, so I am not sure it should be merged, unless of course we can tweak it so that all the questions that were answered before are still answered. Otherwise, if merging, I would prefer to have an open issue for each query that will start to fail, as documentation and as a way to track updates on each issue. Detailed timings below (first run), with extra +/- markers for the most significant changes:

ravwojdyla commented 4 years ago

@jangorecki thanks for the numbers. A couple of things I want to point out:

jangorecki commented 4 years ago

So you have a machine with 40 cores and 125GB?

yes

With default Client settings you should have ended up with 8 workers (5 threads each) and a ~15GB memory limit for each worker. But for some reason you ended up with 3.38GB per worker (which is roughly 125/40). Are you passing the number of workers somewhere?

no, only changes as in your PR

Which version of dask are you running?

2.15.0

also could you please run this

from dask import distributed
n_workers, nthreads = distributed.deploy.local.nprocesses_nthreads()
#Traceback (most recent call last):
#  File "<stdin>", line 1, in <module>
#NameError: name 'n_workers' is not defined

looks like some part was missing

Maybe we should include a dummy warm up question so it's a bit more fair?

Most of the tools suffer to some degree from warm-up on the first query; there was a good discussion about warm-ups in #69.

ravwojdyla commented 4 years ago
from dask import distributed
n_workers, nthreads = distributed.deploy.local.nprocesses_nthreads()
#Traceback (most recent call last):
#  File "<stdin>", line 1, in <module>
#NameError: name 'n_workers' is not defined

That is very weird. Are you running this in a Python shell? What is the output of this:

>>> from dask import distributed
>>> distributed.deploy.local.nprocesses_nthreads()

Regarding warm up, fine with me, given that the 2nd run will be faster on the 1st ever question anyway.

jangorecki commented 4 years ago

Yes, Python console. I only source the dask virtual env, then start python.

#Python 3.6.7 (default, Oct 25 2018, 09:16:13) 
#[GCC 5.4.0 20160609] on linux
#Type "help", "copyright", "credits" or "license" for more information
from dask import distributed
distributed.deploy.local.nprocesses_nthreads()
#Traceback (most recent call last):
#  File "<stdin>", line 1, in <module>
#AttributeError: module 'distributed.deploy.local' has no attribute 'nprocesses_nthreads'
ravwojdyla commented 4 years ago

🤔

➜  db-benchmark git:(rav/dask_imp_1) ✗ source dask/py-dask/bin/activate
(py-dask) ➜  db-benchmark git:(rav/dask_imp_1) ✗ python
Python 3.6.9 (default, Apr 18 2020, 01:56:04)
[GCC 8.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from dask import distributed
>>> distributed.deploy.local.nprocesses_nthreads()
(4, 2)
>>>
(py-dask) ➜  db-benchmark git:(rav/dask_imp_1) ✗ pip freeze
click==7.1.2
cloudpickle==1.4.1
dask==2.15.0
distributed==2.15.2
fsspec==0.7.3
HeapDict==1.0.1
locket==0.2.0
msgpack==1.0.0
numpy==1.18.3
pandas==1.0.3
partd==1.1.0
pkg-resources==0.0.0
psutil==5.7.0
python-dateutil==2.8.1
pytz==2020.1
PyYAML==5.3.1
six==1.14.0
sortedcontainers==2.1.0
tblib==1.6.0
toolz==0.10.0
tornado==6.0.4
zict==2.0.0
jangorecki commented 4 years ago
(py-dask) jan@mr-dl11:~/git/db-benchmark⟫ pip freeze
Click==7.0
cloudpickle==0.6.1
dask==2.15.0
distributed==1.25.0
fastparquet==0.3.2
fsspec==0.4.1
HeapDict==1.0.0
llvmlite==0.30.0
locket==0.2.0
msgpack==0.6.0
numba==0.46.0
numpy==1.15.4
pandas==0.23.4
partd==0.3.9
pkg-resources==0.0.0
psutil==5.4.8
python-dateutil==2.7.5
pytz==2018.7
PyYAML==3.13
six==1.12.0
sortedcontainers==2.1.0
tblib==1.3.2
thrift==0.13.0
toolz==0.9.0
tornado==5.1.1
zict==0.1.3

It looks like upgrading dask does not upgrade distributed. This command is used every time:

python -m pip install --upgrade dask
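
A guard against this kind of mismatch could compare the two pins in the `pip freeze` output. The sketch below assumes that a matching major.minor version between dask and distributed is the right criterion, which is an illustrative rule and not an official compatibility policy; all helper names are hypothetical.

```python
def parse_freeze(text):
    """Map lower-cased package name -> version string from `pip freeze` output."""
    pins = {}
    for line in text.splitlines():
        name, sep, version = line.strip().partition("==")
        if sep:  # skip lines that are not simple name==version pins
            pins[name.lower()] = version
    return pins


def major_minor(version):
    """Return the (major, minor) integers of a dotted version string."""
    parts = version.split(".")
    return int(parts[0]), int(parts[1])


def versions_in_step(pins, a="dask", b="distributed"):
    """True when both packages share the same major.minor (assumed rule)."""
    return major_minor(pins[a]) == major_minor(pins[b])


# Pins copied from the two `pip freeze` listings in this thread.
stale = parse_freeze("dask==2.15.0\ndistributed==1.25.0\n")
fresh = parse_freeze("dask==2.15.0\ndistributed==2.15.2\n")
print(versions_in_step(stale), versions_in_step(fresh))
```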
jangorecki commented 4 years ago

After upgrading dask[distributed]

from dask import distributed
n_workers, nthreads = distributed.deploy.local.nprocesses_nthreads()
distributed.worker.parse_memory_limit("auto", 1, n_workers)
#16882002944
(n_workers, nthreads)
#(8, 5)

should I tweak something, or is it ready to re-run?

ravwojdyla commented 4 years ago

Yes - that looks much better. Thank you! If you could please rerun the benchmark, then we can double check which questions still have issues, and fix them.

ravwojdyla commented 4 years ago

Btw, about tweaking things: there might be an opportunity to adjust the number of threads. I still fear that GIL cost might kick in at 5 threads per Python process. In my case, with 8 threads per Python process (the default thread-pool based scheduler), there is a noticeable GIL cost. But that is something you would need to measure to make an educated decision, so maybe for now we can stick to the defaults on your machine and check the results?
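
To make the tuning space concrete, here is a small sketch that lists every way to split 40 cores into workers and threads, with the memory each worker would get. It does not measure GIL contention; it only enumerates the configurations one might benchmark. The total memory figure assumes 8 workers times the per-worker limit reported earlier in this thread, and `worker_thread_splits` is a hypothetical helper.

```python
def worker_thread_splits(n_cores):
    """All (n_workers, threads_per_worker) pairs whose product is n_cores."""
    return [(w, n_cores // w) for w in range(1, n_cores + 1) if n_cores % w == 0]


# Assumption: total memory = 8 workers x the 16882002944-byte limit reported above.
TOTAL_BYTES = 8 * 16882002944

for workers, threads in worker_thread_splits(40):
    # Fewer threads per process means less GIL sharing but less memory per worker.
    print(workers, "workers x", threads, "threads,", TOTAL_BYTES // workers, "bytes each")
```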

jangorecki commented 4 years ago

It looks better: fewer failures and fewer stderr lines.

start:  dask groupby G1_1e7_1e2_0_0
finish: dask groupby G1_1e7_1e2_0_0: 0
start:  dask groupby G1_1e7_1e1_0_0
finish: dask groupby G1_1e7_1e1_0_0: 0
start:  dask groupby G1_1e7_2e0_0_0
finish: dask groupby G1_1e7_2e0_0_0: 0
start:  dask groupby G1_1e7_1e2_0_1
finish: dask groupby G1_1e7_1e2_0_1: 0
finish: dask groupby G1_1e8_1e2_0_0: 1: stderr 497
start:  dask groupby G1_1e8_1e1_0_0
finish: dask groupby G1_1e8_1e1_0_0: 1: stderr 277
start:  dask groupby G1_1e8_2e0_0_0
finish: dask groupby G1_1e8_2e0_0_0: 1: stderr 185
start:  dask groupby G1_1e8_1e2_0_1
finish: dask groupby G1_1e8_1e2_0_1: 1: stderr 550
start:  dask groupby G1_1e9_1e2_0_0
finish: dask groupby G1_1e9_1e2_0_0: 1: stderr 2817
start:  dask groupby G1_1e9_1e1_0_0
finish: dask groupby G1_1e9_1e1_0_0: 1: stderr 1619
start:  dask groupby G1_1e9_2e0_0_0
finish: dask groupby G1_1e9_2e0_0_0: 1: stderr 886
start:  dask groupby G1_1e9_1e2_0_1
finish: dask groupby G1_1e9_1e2_0_1: 1: stderr 3542

stderr lines are mostly

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 12.74 GB -- Worker memory limit: 16.88 GB

some

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.worker - WARNING - gc.collect() took 1.135s. This is usually a sign that some tasks handle too many Python objects at the same time. Rechunking the work into smaller tasks might help.

and script fails on error

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
Traceback (most recent call last):
  File "./dask/groupby-dask.py", line 307, in <module>
distributed.nanny - WARNING - Restarting worker
    ans = x.groupby(['id1','id2','id3','id4','id5','id6']).agg({'v3':'sum', 'v1$
:'count'}).compute()
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/das$
/base.py", line 166, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dask
/base.py", line 437, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dist
ributed/client.py", line 2632, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dist
ributed/client.py", line 1931, in gather
    asynchronous=asynchronous,
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dist
ributed/client.py", line 780, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dist
ributed/utils.py", line 347, in sync
    raise exc.with_traceback(tb)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dist
ributed/utils.py", line 331, in f
    result[0] = yield future
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/torn
ado/gen.py", line 1133, in run
    value = future.result()
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dist
ributed/client.py", line 1790, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ("('aggregate-combine-18e2e5ea858f43d1c5c712
9687e32544', 2, 0, 0)", <Worker 'tcp://127.0.0.1:35033', name: 0, memory: 0, pro
cessing: 1>)
distributed.nanny - WARNING - Restarting worker

or

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
Traceback (most recent call last):
  File "./dask/groupby-dask.py", line 108, in <module>
distributed.nanny - WARNING - Restarting worker
    ans = x.groupby(['id3']).agg({'v1':'sum', 'v3':'mean'}).compute()
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dask
/base.py", line 166, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dask
/base.py", line 437, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dist
ributed/client.py", line 2632, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dist
ributed/client.py", line 1931, in gather
    asynchronous=asynchronous,
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dist
ributed/client.py", line 780, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dist
ributed/utils.py", line 347, in sync
    raise exc.with_traceback(tb)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dist
ributed/utils.py", line 331, in f
    result[0] = yield future
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/torn
ado/gen.py", line 1133, in run
    value = future.result()
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dist
ributed/client.py", line 1790, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ("('aggregate-combine-ad2de86e24e72088f3ff46
cb33bb175e', 2, 0, 0)", <Worker 'tcp://127.0.0.1:34995', name: 3, memory: 0, pro
cessing: 1>)
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker

and on 1e9 data (parquet)

distributed.worker - WARNING - gc.collect() took 2.271s. This is usually a sign that some tasks handle too many Python objects at the same time. Rechunking the work into smaller tasks might help.
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.worker - ERROR - Worker stream died during communication: tcp://127.
0.0.1:42647
Traceback (most recent call last):
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dist
ributed/comm/tcp.py", line 184, in read
    n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dist
ributed/worker.py", line 1954, in gather_dep
    self.rpc, deps, worker, who=self.address
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dist
ributed/worker.py", line 3224, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dist
ributed/utils_comm.py", line 390, in retry_operation
    operation=operation,
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dist
ributed/utils_comm.py", line 370, in retry
    return await coro()
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/worker.py", line 3211, in _get_data
    max_connections=max_connections,
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/core.py", line 533, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dist
ributed/comm/tcp.py", line 199, in read
    convert_stream_closed_error(self, e)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dist
ributed/comm/tcp.py", line 121, in convert_stream_closed_error
    raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: ConnectionResetError: [E
rrno 104] Connection reset by peer
distributed.worker - WARNING - Memory use is high but worker has no data to stor
e to disk.  Perhaps some other process is leaking memory?  Process memory: 13.00
 GB -- Worker memory limit: 16.88 GB
distributed.worker - ERROR - Worker stream died during communication: tcp://127.
0.0.1:42647
Traceback (most recent call last):
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dist
ributed/comm/tcp.py", line 193, in read
    n = await stream.read_into(frame)
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:
...

This "During handling of the above exception, another exception occurred:" message looks like a bug somewhere, doesn't it?
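
For context, that sentence is what Python itself prints for implicit exception chaining: when a new exception is raised inside an `except` block, the traceback shows both exceptions joined by that line. A minimal sketch follows, with hypothetical stand-in functions rather than the actual distributed code paths:

```python
import traceback


def read_frame():
    # Stand-in for a low-level read that fails when the peer goes away.
    raise ConnectionResetError(104, "Connection reset by peer")


def gather():
    try:
        read_frame()
    except ConnectionResetError as exc:
        # Raising inside an except block chains implicitly: the new
        # exception's __context__ is set to the one being handled.
        raise RuntimeError("comm closed: %s" % exc)


try:
    gather()
except RuntimeError as err:
    chained = err.__context__
    report = "".join(traceback.format_exception(type(err), err, err.__traceback__))
    print(report)
```

So the message by itself only says that error handling raised a second exception. Whether the underlying worker restart is a bug is a separate question.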


Timings on the upgraded version. Any idea how we could tweak G1_1e8_2e0_0_0? The 1e7 and 1e8 data are in-memory; 1e9 is parquet.

              data iquestion   master       PR
 1: G1_1e7_1e2_0_0         1    0.126    0.263
 2: G1_1e7_1e2_0_0         2    0.219    0.758
 3: G1_1e7_1e2_0_0         3    1.569    1.681
 4: G1_1e7_1e2_0_0         4    0.213    0.503
 5: G1_1e7_1e2_0_0         5    0.310    1.008
 6: G1_1e7_1e2_0_0         7    1.664    1.435
 7: G1_1e7_1e2_0_0         8  295.221   23.788
 8: G1_1e7_1e2_0_0        10   13.358   19.429
 9: G1_1e7_1e1_0_0         1    0.084    0.196
10: G1_1e7_1e1_0_0         2    0.146    0.478
11: G1_1e7_1e1_0_0         3   15.711   14.924
12: G1_1e7_1e1_0_0         4    0.224    0.627
13: G1_1e7_1e1_0_0         5    1.361    3.608
14: G1_1e7_1e1_0_0         7   15.189   15.110
15: G1_1e7_1e1_0_0         8 2725.947  202.638
16: G1_1e7_1e1_0_0        10       NA   31.315
17: G1_1e7_2e0_0_0         1    0.076    0.156
18: G1_1e7_2e0_0_0         2    0.165    0.442
19: G1_1e7_2e0_0_0         3   42.794   39.416
20: G1_1e7_2e0_0_0         4    0.227    0.536
21: G1_1e7_2e0_0_0         5    3.446    7.298
22: G1_1e7_2e0_0_0         7   45.658   35.978
23: G1_1e7_2e0_0_0         8       NA 1059.461
24: G1_1e7_2e0_0_0        10       NA   50.430
25: G1_1e7_1e2_0_1         1    0.085    0.163
26: G1_1e7_1e2_0_1         2    0.174    0.409
27: G1_1e7_1e2_0_1         3    1.743    1.581
28: G1_1e7_1e2_0_1         4    0.240    0.597
29: G1_1e7_1e2_0_1         5    0.318    1.061
30: G1_1e7_1e2_0_1         7    1.649    1.523
31: G1_1e7_1e2_0_1         8  296.673   23.879
32: G1_1e7_1e2_0_1        10   13.151   17.911
              data iquestion   master       PR
 1: G1_1e8_1e2_0_0         1    0.688    7.809
 2: G1_1e8_1e2_0_0         2    1.557    1.135
 3: G1_1e8_1e2_0_0         3  146.356   48.492
 4: G1_1e8_1e2_0_0         4    1.778    1.068
 5: G1_1e8_1e2_0_0         5    7.787    8.941
 6: G1_1e8_1e2_0_0         7  141.030   40.728
 7: G1_1e8_1e2_0_0         8 3731.613  477.391
 8: G1_1e8_1e1_0_0         1    0.682   10.642
 9: G1_1e8_1e1_0_0         2    1.210    0.674
10: G1_1e8_1e1_0_0         3  521.989  317.810
11: G1_1e8_1e1_0_0         4    1.880    2.420
12: G1_1e8_1e1_0_0         5   27.728   34.290
13: G1_1e8_1e1_0_0         7  487.942  314.352
14: G1_1e8_1e1_0_0         8       NA 3861.374
15: G1_1e8_2e0_0_0         1    0.587   11.253
16: G1_1e8_2e0_0_0         2    1.087    0.719
17: G1_1e8_2e0_0_0         3  942.296       NA
18: G1_1e8_2e0_0_0         4    1.718       NA
19: G1_1e8_2e0_0_0         5   65.000       NA
20: G1_1e8_2e0_0_0         7  919.055       NA
21: G1_1e8_2e0_0_0         8       NA       NA
22: G1_1e8_1e2_0_1         1    0.649    8.167
23: G1_1e8_1e2_0_1         2    1.198    0.702
24: G1_1e8_1e2_0_1         3  147.078   48.040
25: G1_1e8_1e2_0_1         4    1.751    1.100
26: G1_1e8_1e2_0_1         5    7.464    8.626
27: G1_1e8_1e2_0_1         7  141.842   42.098
28: G1_1e8_1e2_0_1         8 3649.284  385.563
              data iquestion   master      PR
 1: G1_1e9_1e2_0_0         1  741.038 128.884
 2: G1_1e9_1e2_0_0         2  746.965 116.202
 3: G1_1e9_1e2_0_0         3 5440.679      NA
 4: G1_1e9_1e1_0_0         1  790.948 125.173
 5: G1_1e9_1e1_0_0         2  790.832 128.427
 6: G1_1e9_1e1_0_0         3       NA      NA
 7: G1_1e9_2e0_0_0         1  783.267 136.433
 8: G1_1e9_2e0_0_0         2  791.165 135.211
 9: G1_1e9_2e0_0_0         3       NA      NA
10: G1_1e9_1e2_0_1         1  728.491 128.775
11: G1_1e9_1e2_0_1         2  752.626 110.685
12: G1_1e9_1e2_0_1         3 5710.615      NA
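
One way to read these tables is as a master/PR ratio per row, treating NA as "no result". The sketch below uses a few rows copied from the tables above; the `speedup` helper is hypothetical, and the values carry whatever unit the tables use.

```python
# (data, question, master, PR); None marks an NA entry in the tables above.
rows = [
    ("G1_1e7_1e2_0_0", 1, 0.126, 0.263),
    ("G1_1e7_1e2_0_0", 8, 295.221, 23.788),
    ("G1_1e8_2e0_0_0", 3, 942.296, None),
    ("G1_1e9_1e2_0_0", 1, 741.038, 128.884),
]


def speedup(master, pr):
    """master / pr; above 1 means the PR is faster. None if either side is NA."""
    if master is None or pr is None:
        return None
    return master / pr


for data, question, master, pr in rows:
    ratio = speedup(master, pr)
    label = "NA" if ratio is None else "%.2fx" % ratio
    print(data, question, label)
```

Rows where master has a value and the PR is NA are the regressions that would need tracking issues if the change is merged.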
jangorecki commented 4 years ago

Is there a way to disable warnings that are just operational messages and cannot really be addressed?
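
One generic option is the standard-library `logging` module, since the messages are prefixed with logger names such as `distributed.worker` and `distributed.nanny`. The sketch below is hedged in two ways: it only changes loggers in the process where it runs, so workers started as separate processes may need their own logging configuration, and it would also hide genuine low-memory warnings. `quiet_loggers` is a hypothetical helper.

```python
import logging


def quiet_loggers(names, level=logging.ERROR):
    """Raise the threshold of the named loggers so lower-severity records are dropped."""
    for name in names:
        logging.getLogger(name).setLevel(level)


# Logger names taken from the prefixes of the messages quoted in this thread.
quiet_loggers(["distributed.worker", "distributed.nanny"])

# WARNING records are now below the threshold; ERROR records still pass.
print(logging.getLogger("distributed.worker").isEnabledFor(logging.WARNING))
```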

ravwojdyla commented 4 years ago

@jangorecki thanks for the update, I appreciate it! Let me see if I can reproduce those errors, and I will get back to you tomorrow.

ravwojdyla commented 4 years ago

@jangorecki just a quick update: I can reproduce the issues in the 1e8 tests, and I know what the issue is. I will push an update to this PR by Friday.

ravwojdyla commented 4 years ago

@jangorecki I updated the PR. Changes:

Unfortunately I can't exactly reproduce your numbers and your issues without the same VM spec, so next time you have a chance, could you please share numbers for the most recent change?

jangorecki commented 4 years ago

It is running; the first observation (1e7 rows) is that it uses fewer than 10 cores on average, while 40 are available.

jangorecki commented 4 years ago

timings

              data iquestion   master      PR
 1: G1_1e7_1e2_0_0         1    0.126   0.238
 2: G1_1e7_1e2_0_0         2    0.219   0.708
 3: G1_1e7_1e2_0_0         3    1.569   1.499
 4: G1_1e7_1e2_0_0         4    0.213   0.637
 5: G1_1e7_1e2_0_0         5    0.310   1.134
 6: G1_1e7_1e2_0_0         7    1.664   1.315
 7: G1_1e7_1e2_0_0         8  295.221  59.235
 8: G1_1e7_1e2_0_0        10   13.358  18.873
 9: G1_1e7_1e1_0_0         1    0.084   0.208
10: G1_1e7_1e1_0_0         2    0.146   0.380
11: G1_1e7_1e1_0_0         3   15.711  15.317
12: G1_1e7_1e1_0_0         4    0.224   0.589
13: G1_1e7_1e1_0_0         5    1.361   3.603
14: G1_1e7_1e1_0_0         7   15.189  12.879
15: G1_1e7_1e1_0_0         8 2725.947 524.082
16: G1_1e7_1e1_0_0        10       NA  30.310
17: G1_1e7_2e0_0_0         1    0.076   0.187
18: G1_1e7_2e0_0_0         2    0.165   0.400
19: G1_1e7_2e0_0_0         3   42.794  45.395
20: G1_1e7_2e0_0_0         4    0.227   0.616
21: G1_1e7_2e0_0_0         5    3.446   7.999
22: G1_1e7_2e0_0_0         7   45.658  39.465
23: G1_1e7_2e0_0_0         8       NA      NA
24: G1_1e7_2e0_0_0        10       NA      NA
25: G1_1e7_1e2_0_1         1    0.085   0.200
26: G1_1e7_1e2_0_1         2    0.174   0.395
27: G1_1e7_1e2_0_1         3    1.743   1.702
28: G1_1e7_1e2_0_1         4    0.240   0.666
29: G1_1e7_1e2_0_1         5    0.318   0.982
30: G1_1e7_1e2_0_1         7    1.649   1.402
31: G1_1e7_1e2_0_1         8  296.673  58.995
32: G1_1e7_1e2_0_1        10   13.151  17.919
              data iquestion   master      PR
 1: G1_1e8_1e2_0_0         1    0.688   8.412
 2: G1_1e8_1e2_0_0         2    1.557   1.177
 3: G1_1e8_1e2_0_0         3  146.356 107.825
 4: G1_1e8_1e2_0_0         4    1.778   2.613
 5: G1_1e8_1e2_0_0         5    7.787  16.147
 6: G1_1e8_1e2_0_0         7  141.030  45.905
 7: G1_1e8_1e2_0_0         8 3731.613 392.796
 8: G1_1e8_1e1_0_0         1    0.682   9.317
 9: G1_1e8_1e1_0_0         2    1.210   0.666
10: G1_1e8_1e1_0_0         3  521.989      NA
11: G1_1e8_1e1_0_0         4    1.880      NA
12: G1_1e8_1e1_0_0         5   27.728      NA
13: G1_1e8_1e1_0_0         7  487.942      NA
14: G1_1e8_1e1_0_0         8       NA      NA
15: G1_1e8_2e0_0_0         1    0.587  18.541
16: G1_1e8_2e0_0_0         2    1.087   0.951
17: G1_1e8_2e0_0_0         3  942.296 533.915
18: G1_1e8_2e0_0_0         4    1.718   1.841
19: G1_1e8_2e0_0_0         5   65.000  40.562
20: G1_1e8_2e0_0_0         7  919.055 772.520
21: G1_1e8_2e0_0_0         8       NA      NA
22: G1_1e8_1e2_0_1         1    0.649   8.494
23: G1_1e8_1e2_0_1         2    1.198   0.706
24: G1_1e8_1e2_0_1         3  147.078 100.848
25: G1_1e8_1e2_0_1         4    1.751   2.557
26: G1_1e8_1e2_0_1         5    7.464  11.442
27: G1_1e8_1e2_0_1         7  141.842  45.652
28: G1_1e8_1e2_0_1         8 3649.284 399.192
              data iquestion   master      PR
 1: G1_1e9_1e2_0_0         1  741.038 109.802
 2: G1_1e9_1e2_0_0         2  746.965 114.268
 3: G1_1e9_1e2_0_0         3 5440.679      NA
 4: G1_1e9_1e1_0_0         1  790.948 111.814
 5: G1_1e9_1e1_0_0         2  790.832 110.161
 6: G1_1e9_1e1_0_0         3       NA      NA
 7: G1_1e9_2e0_0_0         1  783.267      NA
 8: G1_1e9_2e0_0_0         2  791.165      NA
 9: G1_1e9_2e0_0_0         3       NA      NA
10: G1_1e9_1e2_0_1         1  728.491 157.512
11: G1_1e9_1e2_0_1         2  752.626 124.105
12: G1_1e9_1e2_0_1         3 5710.615      NA
jangorecki commented 4 years ago

exit codes and stderr

start:  dask groupby G1_1e7_1e2_0_0
finish: dask groupby G1_1e7_1e2_0_0: 0
start:  dask groupby G1_1e7_1e1_0_0
finish: dask groupby G1_1e7_1e1_0_0: 0
start:  dask groupby G1_1e7_2e0_0_0
finish: dask groupby G1_1e7_2e0_0_0: 0: stderr 21
start:  dask groupby G1_1e7_1e2_0_1
finish: dask groupby G1_1e7_1e2_0_1: 0
start:  dask groupby G1_1e8_1e2_0_0
finish: dask groupby G1_1e8_1e2_0_0: 1: stderr 22
start:  dask groupby G1_1e8_1e1_0_0
finish: dask groupby G1_1e8_1e1_0_0: 1: stderr 22
start:  dask groupby G1_1e8_2e0_0_0
finish: dask groupby G1_1e8_2e0_0_0: 1: stderr 21
start:  dask groupby G1_1e8_1e2_0_1
finish: dask groupby G1_1e8_1e2_0_1: 1: stderr 22
start:  dask groupby G1_1e9_1e2_0_0
finish: dask groupby G1_1e9_1e2_0_0: 1: stderr 35
start:  dask groupby G1_1e9_1e1_0_0
finish: dask groupby G1_1e9_1e1_0_0: 1: stderr 51
start:  dask groupby G1_1e9_2e0_0_0
finish: dask groupby G1_1e9_2e0_0_0: 1: stderr 341
start:  dask groupby G1_1e9_1e2_0_1
finish: dask groupby G1_1e9_1e2_0_1: 1: stderr 126

It is great that the number of stderr lines has been reduced so much, thanks. It makes maintenance much easier.
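
For readers who have not seen the launcher, the bookkeeping behind the "finish" lines can be sketched roughly as below. This is a minimal Python sketch and not the project's actual launcher code. The function names `format_finish` and `run_and_summarize` are made up for illustration, and the suffix rule is inferred from the log above (the return code is always shown, the stderr count only when it is non-zero).

```python
import subprocess
import sys


def format_finish(label, returncode, stderr_text):
    """Build a 'finish' line: exit code always, stderr line count only if non-zero."""
    n_lines = len(stderr_text.splitlines())
    line = f"finish: {label}: {returncode}"
    if n_lines:
        line += f": stderr {n_lines}"
    return line


def run_and_summarize(label, cmd):
    """Run cmd, capture its stderr, and return the summary line for it."""
    proc = subprocess.run(
        cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True
    )
    return format_finish(label, proc.returncode, proc.stderr)


# Stand-in child process: writes two lines to stderr and exits with code 1.
child = [sys.executable, "-c", "import sys; print('a\\nb', file=sys.stderr); sys.exit(1)"]
print(run_and_summarize("dask groupby demo", child))
```

Counting lines rather than inspecting their content keeps the check cheap, which is why noisy but harmless warnings matter for this kind of monitoring.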

jangorecki commented 4 years ago

exceptions:

Those with KeyboardInterrupt are due to the timeout in the script launcher.
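
To illustrate why a timeout shows up as `KeyboardInterrupt`: if a launcher interrupts a Python process with SIGINT once the time limit is reached, Python raises `KeyboardInterrupt` wherever the script happens to be waiting. The sketch below assumes a POSIX system and assumes that the launcher sends SIGINT; the helper name `run_with_sigint_timeout` is hypothetical and the sleeping child merely stands in for a slow query.

```python
import signal
import subprocess
import sys


def run_with_sigint_timeout(cmd, timeout_s):
    """Run cmd; if it exceeds timeout_s, interrupt it with SIGINT (like Ctrl+C)."""
    proc = subprocess.Popen(cmd, stderr=subprocess.PIPE, text=True)
    try:
        _, err = proc.communicate(timeout=timeout_s)
        timed_out = False
    except subprocess.TimeoutExpired:
        proc.send_signal(signal.SIGINT)  # Python turns this into KeyboardInterrupt
        _, err = proc.communicate()      # collect the traceback the child prints
        timed_out = True
    return proc.returncode, err, timed_out


# Stand-in for a long-running benchmark query.
child = [sys.executable, "-c", "import time; time.sleep(60)"]
code, err, timed_out = run_with_sigint_timeout(child, timeout_s=2)
print("timed out:", timed_out)
print(err.strip().splitlines()[-1:])  # last stderr line, if any
```

The traceback therefore points at whatever call was blocking at the moment of the interrupt, not at the cause of the slowness.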

::::::::::::::
out/run_dask_groupby_G1_1e7_2e0_0_0.err
::::::::::::::
Traceback (most recent call last):
  File "./dask/groupby-dask.py", line 257, in <module>
    ans = x[['id6','v3']].groupby(['id6']).apply(lambda x: x.nlargest(2, columns='v3'), meta={'id6': 'int64', 'v3': 'float64'})[['v3']].compute()
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dask/base.py", line 166, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dask/base.py", line 437, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/client.py", line 2632, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/client.py", line 1931, in gather
    asynchronous=asynchronous,
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/client.py", line 780, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/utils.py", line 344, in sync
    e.wait(10)
  File "/usr/lib/python3.6/threading.py", line 551, in wait
    signaled = self._cond.wait(timeout)
  File "/usr/lib/python3.6/threading.py", line 299, in wait
    gotit = waiter.acquire(True, timeout)
KeyboardInterrupt
command '/bin/bash -c "source ./dask/py-dask/bin/activate && ./_launcher/solution.R --solution=dask --task=groupby --nrow=1e7 --k=2e0 --na=0 --sort=0
 --out=time.csv > out/run_dask_groupby_G1_1e7_2e0_0_0.out 2> out/run_dask_groupby_G1_1e7_2e0_0_0.err"' timed out after 3600s
::::::::::::::
out/run_dask_groupby_G1_1e8_1e1_0_0.err
::::::::::::::
Traceback (most recent call last):
  File "./dask/groupby-dask.py", line 117, in <module>
    ans = x.groupby(['id3']).agg({'v1':'sum', 'v3':'mean'}, split_every=False).compute()
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dask/base.py", line 166, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dask/base.py", line 437, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/client.py", line 2632, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/client.py", line 1931, in gather
    asynchronous=asynchronous,
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/client.py", line 780, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/utils.py", line 347, in sync
    raise exc.with_traceback(tb)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/utils.py", line 331, in f
    result[0] = yield future
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/client.py", line 1790, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ("('aggregate-agg-672bfa0f0a2a77ddf6fda8bcf5af2b06', 0)", <Worker 'tcp://127.0.0.1:33067', name: 4, memory: 0, pr
ocessing: 1>)
::::::::::::::
out/run_dask_groupby_G1_1e8_1e2_0_0.err
::::::::::::::
Traceback (most recent call last):
  File "./dask/groupby-dask.py", line 316, in <module>
    ans = x.groupby(['id1','id2','id3','id4','id5','id6']).agg({'v3':'sum', 'v1':'count'}).compute()
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dask/base.py", line 166, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dask/base.py", line 437, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/client.py", line 2632, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/client.py", line 1931, in gather
    asynchronous=asynchronous,
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/client.py", line 780, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/utils.py", line 347, in sync
    raise exc.with_traceback(tb)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/utils.py", line 331, in f
    result[0] = yield future
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/client.py", line 1790, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ("('aggregate-combine-18e2e5ea858f43d1c5c7129687e32544', 2, 0, 0)", <Worker 'tcp://127.0.0.1:42889', name: 1, mem
ory: 0, processing: 1>)
::::::::::::::
out/run_dask_groupby_G1_1e8_1e2_0_1.err
::::::::::::::
Traceback (most recent call last):
  File "./dask/groupby-dask.py", line 316, in <module>
    ans = x.groupby(['id1','id2','id3','id4','id5','id6']).agg({'v3':'sum', 'v1':'count'}).compute()
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dask/base.py", line 166, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dask/base.py", line 437, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/client.py", line 2632, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/client.py", line 1931, in gather
    asynchronous=asynchronous,
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/client.py", line 780, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/utils.py", line 347, in sync
    raise exc.with_traceback(tb)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/utils.py", line 331, in f
    result[0] = yield future
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/client.py", line 1790, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ("('aggregate-combine-58b9869579d438b2048591a4aee1be03', 2, 0, 0)", <Worker 'tcp://127.0.0.1:42177', name: 5, mem
ory: 0, processing: 1>)
::::::::::::::
out/run_dask_groupby_G1_1e8_2e0_0_0.err
::::::::::::::
Traceback (most recent call last):
  File "./dask/groupby-dask.py", line 257, in <module>
    ans = x[['id6','v3']].groupby(['id6']).apply(lambda x: x.nlargest(2, columns='v3'), meta={'id6': 'int64', 'v3': 'float64'})[['v3']].compute()
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dask/base.py", line 166, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dask/base.py", line 437, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/client.py", line 2632, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/client.py", line 1931, in gather
    asynchronous=asynchronous,
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/client.py", line 780, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/utils.py", line 344, in sync
    e.wait(10)
  File "/usr/lib/python3.6/threading.py", line 551, in wait
    signaled = self._cond.wait(timeout)
  File "/usr/lib/python3.6/threading.py", line 299, in wait
    gotit = waiter.acquire(True, timeout)
KeyboardInterrupt
command '/bin/bash -c "source ./dask/py-dask/bin/activate && ./_launcher/solution.R --solution=dask --task=groupby --nrow=1e8 --k=2e0 --na=0 --sort=0
 --out=time.csv > out/run_dask_groupby_G1_1e8_2e0_0_0.out 2> out/run_dask_groupby_G1_1e8_2e0_0_0.err"' timed out after 7200s
::::::::::::::
out/run_dask_groupby_G1_1e9_1e1_0_0.err
::::::::::::::
Traceback (most recent call last):
  File "./dask/groupby-dask.py", line 117, in <module>
    ans = x.groupby(['id3']).agg({'v1':'sum', 'v3':'mean'}, split_every=False).compute()
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dask/base.py", line 166, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/dask/base.py", line 437, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/client.py", line 2632, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/client.py", line 1931, in gather
    asynchronous=asynchronous,
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/client.py", line 780, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/utils.py", line 347, in sync
    raise exc.with_traceback(tb)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/utils.py", line 331, in f
    result[0] = yield future
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/client.py", line 1790, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ("('aggregate-agg-ad0fe21eaa4d7ae4f7e7fbe1d7ed44f8', 0)", <Worker 'tcp://127.0.0.1:38067', name: 1, memory: 0, pr
ocessing: 1>)
distributed.diskutils - ERROR - Failed to remove '/home/jan/git/db-benchmark/dask-worker-space/worker-6ryaox5m/storage/%28%27aggregate-chunk-ad0fe21e
aa4d7ae4f7e7fbe1d7ed44f8%27%2C%200%2C%20422%2C%200%29' (failed in <built-in function unlink>): [Errno 2] No such file or directory: '%28%27aggregate-
chunk-ad0fe21eaa4d7ae4f7e7fbe1d7ed44f8%27%2C%200%2C%20422%2C%200%29'
........[ many: distributed.diskutils - ERROR - Failed to remove ].....
distributed.diskutils - ERROR - Failed to remove '/home/jan/git/db-benchmark/dask-worker-space/worker-6ryaox5m/storage' (failed in <built-in function
 rmdir>): [Errno 2] No such file or directory: 'storage'
distributed.diskutils - ERROR - Failed to remove '/home/jan/git/db-benchmark/dask-worker-space/worker-6ryaox5m' (failed in <built-in function rmdir>)
: [Errno 2] No such file or directory: '/home/jan/git/db-benchmark/dask-worker-space/worker-6ryaox5m'
/usr/lib/python3.6/contextlib.py:88: UserWarning: Creating scratch directories is taking a surprisingly long time. This is often due to running worke
rs on a network file system. Consider specifying a local-directory to point workers to write scratch data to a local disk.
  next(self.gen)
::::::::::::::
out/run_dask_groupby_G1_1e9_1e2_0_0.err
::::::::::::::
[ same as above ]
::::::::::::::
out/run_dask_groupby_G1_1e9_1e2_0_1.err
::::::::::::::
distributed.worker - ERROR - failed during get data with tcp://127.0.0.1:33259 -> tcp://127.0.0.1:45177
Traceback (most recent call last):
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/comm/tcp.py", line 242, in write
    await future
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/worker.py", line 1249, in get_data
    compressed = await comm.write(msg, serializers=serializers)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/comm/tcp.py", line 246, in write
    convert_stream_closed_error(self, e)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/comm/tcp.py", line 121, in convert_stream_closed_error
    raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: ConnectionResetError: [Errno 104] Connection reset by peer
distributed.worker - ERROR - failed during get data with tcp://127.0.0.1:36725 -> tcp://127.0.0.1:45177
Traceback (most recent call last):
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/comm/tcp.py", line 184, in read
    n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:
....
[ many more exception during handling exception ]
::::::::::::::
out/run_dask_groupby_G1_1e9_2e0_0_0.err
::::::::::::::
distributed.worker - ERROR - Worker stream died during communication: tcp://127.0.0.1:35557
Traceback (most recent call last):
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/comm/core.py", line 232, in connect
    _raise(error)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packages/distributed/comm/core.py", line 213, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'tcp://127.0.0.1:35557' after 10 s: connect() didn't finish in time

During handling of the above exception, another exception occurred:
...
[ many more exception during handling exception ]
jangorecki commented 4 years ago

Could we address this somehow?

distributed.diskutils - ERROR - Failed to remove '/home/jan/git/db-benchmark/dask-worker-space/worker-6ryaox5m/storage/%28%27aggregate-chunk-ad0fe21e
aa4d7ae4f7e7fbe1d7ed44f8%27%2C%200%2C%20422%2C%200%29' (failed in <built-in function unlink>): [Errno 2] No such file or directory: '%28%27aggregate-
chunk-ad0fe21eaa4d7ae4f7e7fbe1d7ed44f8%27%2C%200%2C%20422%2C%200%29'
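
One possible stopgap, shown here only as a sketch: the prefix of the message (`distributed.diskutils`) looks like a standard Python logger name, so its records could be filtered with the `logging` module. The example uses only the standard library and an in-memory handler. It assumes the messages are emitted through that logger in the process where the level is set; worker processes may need the same setting applied separately, and this hides the symptom without fixing the cause.

```python
import io
import logging

# Route log records to an in-memory buffer so the effect is easy to inspect.
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter("%(name)s - %(levelname)s - %(message)s"))
logging.getLogger().addHandler(handler)

noisy = logging.getLogger("distributed.diskutils")
noisy.error("Failed to remove 'storage'")  # recorded: ERROR passes the default level

noisy.setLevel(logging.CRITICAL)           # drop everything below CRITICAL
noisy.error("Failed to remove 'storage'")  # no longer recorded

recorded = buffer.getvalue().count("Failed to remove")
print(recorded)  # 1
```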
ravwojdyla commented 4 years ago

@jangorecki thanks for the update! Will see what can be fixed.

jangorecki commented 4 years ago

@ravwojdyla any update on that? If it is not on your radar anymore, please let me know as well.

ravwojdyla commented 4 years ago

@jangorecki updated. I am sorry that I need to ask you to rerun the benchmarks, but some of the errors are hard to reproduce (especially in the 1e9 tests; my VM can't handle this size). Could you please rerun the benchmark for the most recent change? I believe it should handle both high and low cardinality of groups (at least for the tested 1e7-1e8 inputs), and I'm curious to see the results for 1e9.

jangorecki commented 4 years ago

@ravwojdyla no worries about re-running. At h2o we have a dedicated machine for this project, and any improvements that can be incorporated are reason enough to keep it running. I will report back when it finishes.

jangorecki commented 4 years ago

Setting the index caused all scripts on 1e9 to fail with the same error:

Traceback (most recent call last):
  File "./dask/groupby-dask.py", line 48, in <module>
    x = x.set_index("id3")
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/dask/dataframe/core.py", line 3617, in set_index
    **kwargs
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/dask/dataframe/shuffle.py", line 85, in set_index
    divisions, sizes, mins, maxes, optimize_graph=False
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/dask/base.py", line 444, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/distributed/client.py", line 2632, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direc
t)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/distributed/client.py", line 1931, in gather
    asynchronous=asynchronous,
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/distributed/client.py", line 780, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/distributed/utils.py", line 347, in sync
    raise exc.with_traceback(tb)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/distributed/utils.py", line 331, in f
    result[0] = yield future
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/distributed/client.py", line 1790, in _gather
    raise exception.with_traceback(traceback)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/dask/utils.py", line 894, in __call__
    return getattr(obj, self.method)(*args, **kwargs)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/pandas/core/generic.py", line 9613, in stat_func
    numeric_only=numeric_only)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/pandas/core/series.py", line 3225, in _reduce
    filter_type=filter_type, **kwds)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/pandas/core/arrays/categorical.py", line 2068, in _reduce
    return func(numeric_only=numeric_only, **kwds)
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/pandas/core/arrays/categorical.py", line 2109, in max
    self.check_for_ordered('max')
  File "/home/jan/git/db-benchmark/dask/py-dask/lib/python3.6/site-packag
es/pandas/core/arrays/categorical.py", line 1472, in check_for_ordered
    "Categorical to an ordered one\n".format(op=op))
TypeError: Categorical is not ordered for operation max
you can use .as_ordered() to change the Categorical to an ordered one
ravwojdyla commented 4 years ago

@jangorecki yes, that failure needs to be fixed (sorry, I could not test 1e9 on my VM). Essentially, one solution is to change the type of id3 from categorical to string so that set_index works. By the way, I know we might not be able to go with the set_index solution (given your comments above), but have you been able to run this version for the 1e7-1e8 tests and record the times? I'm curious about the results.
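
The traceback above ends in a `max` call on a categorical column, reached from `set_index` in `dask/dataframe/shuffle.py` (the frame that passes `mins` and `maxes`). That step can be reproduced in plain pandas. The sketch below uses a tiny made-up column in place of the real id3 data and shows two possible ways around the error; whether either is acceptable for the benchmark is a separate question.

```python
import pandas as pd

# A small stand-in for the id3 column, stored as an unordered categorical.
id3 = pd.Series(["id03", "id01", "id02"], dtype="category")

try:
    id3.max()
except TypeError as exc:
    print("unordered categorical:", exc)

# Option 1: cast to plain strings, as suggested above.
as_string_max = id3.astype(str).max()
print(as_string_max)

# Option 2: keep the categorical dtype but mark it as ordered.
as_ordered_max = id3.cat.as_ordered().max()
print(as_ordered_max)
```

Casting to string gives up the memory savings of the categorical dtype, while `as_ordered()` keeps them but orders values by category order.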

jangorecki commented 4 years ago
              data iquestion   master      PR PRindex
 1: G1_1e8_1e2_0_0         1    0.688   8.412  10.585
 2: G1_1e8_1e2_0_0         2    1.557   1.177   0.949
 3: G1_1e8_1e2_0_0         3  146.356 107.825   6.423
 4: G1_1e8_1e2_0_0         4    1.778   2.613   1.013
 5: G1_1e8_1e2_0_0         5    7.787  16.147   6.634
 6: G1_1e8_1e2_0_0         7  141.030  45.905   5.177
 7: G1_1e8_1e2_0_0         8 3731.613 392.796 504.179

With the index, q3 is more than 16 times faster than the PR without it (group by id3), and q7 is almost 9 times faster (group by id3). Some queries are slower, but the difference is not big.
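
The ratios quoted here can be recomputed from the table. The short sketch below copies the G1_1e8_1e2_0_0 timings and reports each query's speedup of PRindex relative to PR and relative to master; the helper name `speedup` is only for illustration.

```python
# Timings in seconds for G1_1e8_1e2_0_0, copied from the table above.
timings = {
    1: {"master": 0.688, "PR": 8.412, "PRindex": 10.585},
    2: {"master": 1.557, "PR": 1.177, "PRindex": 0.949},
    3: {"master": 146.356, "PR": 107.825, "PRindex": 6.423},
    4: {"master": 1.778, "PR": 2.613, "PRindex": 1.013},
    5: {"master": 7.787, "PR": 16.147, "PRindex": 6.634},
    7: {"master": 141.030, "PR": 45.905, "PRindex": 5.177},
    8: {"master": 3731.613, "PR": 392.796, "PRindex": 504.179},
}


def speedup(question, baseline="PR", candidate="PRindex"):
    """Return how many times faster candidate is than baseline (>1 means faster)."""
    row = timings[question]
    return row[baseline] / row[candidate]


for q in timings:
    print(f"q{q}: {speedup(q):.2f}x vs PR, {speedup(q, 'master'):.2f}x vs master")
```

A value below 1 means the indexed version is slower for that query, which is the case for q1 and q8 when compared with the PR column.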

@ravwojdyla for comparison, in data.table setting a key on "id3" reduces the q3 timing from 1.53s to 0.322s (using 50% of available threads). If one wants to explore timings with indexes and statistics on the data, that should be done as a separate task, so that it is fair to all solutions. Unless, of course, it is something that happens internally by default on each query.

ravwojdyla commented 4 years ago

Thanks for the numbers. I see your point. Let's see if I can figure out another way to work around this issue with a large number of small groups. I might need to submit some fixes to Dask.

jangorecki commented 4 years ago

We can always do follow-up improvements once Dask adds the features you are missing now. This PR has already been waiting for a month, so let's have it merged soon. AFAIU the most optimal revision was the one before using the index? You force-pushed to the PR, so I cannot easily investigate that myself. Could we go back to that? @ravwojdyla

ravwojdyla commented 4 years ago

👋 @jangorecki sure, no problem, I can revert to the previous version.

jangorecki commented 4 years ago

@ravwojdyla I had to push this forward because I am planning to be on a break soon. I managed to make the diff quite small by using a separate script that checks for __name__ and then reads the actual script from another file. This way we can still run the actual script interactively, line by line, which is very important for debugging. I checked timings on 1e8 and they look consistent with what your PR gave before the index. I would appreciate it if you could look at the current state of the PR and see if it looks OK to you.
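
For anyone reading along, the wrapper idea can be sketched as follows. This is not the code from the PR: the function name `run_script` and the file name `groupby_body.py` are hypothetical, and passing `__name__` into the executed namespace is an assumption. The point is only that the entry script holds the `__name__` check while the body lives in a separate file that can still be stepped through by hand.

```python
import os
import tempfile


def run_script(path):
    """Read the script at path and execute it in a fresh namespace."""
    with open(path) as f:
        source = f.read()
    # Assumption: the body should see itself as the main module.
    namespace = {"__name__": "__main__"}
    exec(compile(source, path, "exec"), namespace)
    return namespace


if __name__ == "__main__":
    # Hypothetical stand-in for the real benchmark script.
    with tempfile.TemporaryDirectory() as tmp:
        script = os.path.join(tmp, "groupby_body.py")
        with open(script, "w") as f:
            f.write("ans = sum(range(10))\nprint(ans)\n")
        run_script(script)
```

Compiling with the real path keeps file names and line numbers in tracebacks pointing at the body script, which helps when a query fails.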

jangorecki commented 4 years ago

I will merge for now. @ravwojdyla your feedback is of course still very welcome.

ravwojdyla commented 4 years ago

👋 @jangorecki, thanks for moving forward with this PR. Sorry, I was out for some time and then lost track of it. If I find some time to work on this issue further, I will keep you updated!