dask / dask

Parallel computing with task scheduling
https://dask.org
BSD 3-Clause "New" or "Revised" License
12.58k stars 1.71k forks source link

Unexpected dataframe shuffle for sorted index #3135

Open bnaul opened 6 years ago

bnaul commented 6 years ago

Following up on https://stackoverflow.com/questions/48592049/dask-dataframe-groupby-apply-efficiency/48592529 with an example.

Read data w/ a sorted index column and perform a groupby; shouldn't require a shuffle:

from uuid import uuid4
import dask.dataframe as dd
from distributed import Client
import numpy as np
import pandas as pd

N = int(1e4); n = 50
uids = sorted(str(uuid4()) for i in range(N))
df = pd.DataFrame({'uid': np.repeat(uids, n), 'x': np.random.random(N * n),
                   'y': np.random.random(N * n)})
df.sort_values(by='uid', inplace=True)
df.to_parquet('/tmp/test.parquet', compression=None)
del df

c = Client('tcp://192.168.1.42:8786')
ddf = dd.read_parquet('/tmp/test.parquet')
ddf = c.persist(ddf.set_index('uid'))
#ddf = c.persist(ddf.repartition(npartitions=100))  # no shuffle if we don't repartition
future = ddf.groupby('uid').apply(sum).compute()
screenshot 2018-02-02 21 07 03

Looks good! But re-running with the repartition uncommented:

screenshot 2018-02-02 21 08 45

Based on my reading of the docs it doesn't seem like repartition should be so problematic if my index is sorted; am I just misinterpreting or is unexpected behavior?

mrocklin commented 6 years ago

So first, lack of shuffling on a single partitition dataframe should be expected. There is only one partition so everything can be done within that one pandas dataframe.

I'm going to walk through this problem narrating my thoughts. Hopefully this helps in future debugging.

One problem here is that the first call to read_parquet doesn't seem to collect it's min/max index values, which is unfortunate (cc @martindurant):

In [12]: ddf = dd.read_parquet('/tmp/test.parquet')
In [13]: ddf.divisions  # None means unknown here
Out[13]: (None, None)

And so when we repartition we get another dataframe, also with unknown partitions

In [14]: ddf2 = ddf.repartition(npartitions=100)

In [15]: ddf2.divisions[:10]
Out[15]: (None, None, None, None, None, None, None, None, None, None)

Of course, we would still expect the eventual set_index operation to check the values for sortedness, and a quick check shows that yes, the data is sorted.

In [16]: s = ddf2.uid.compute()

In [17]: s.is_monotonic_increasing
Out[17]: True

When I call the set index operation it does appear to do the right thing, and the dataframe with the index set has only a linear amount of extra tasks (3x in this case) not quadratic or n*log(n)

In [18]: ddf3 = ddf2.set_index('uid')

In [19]: ddf3
Out[19]: 
Dask DataFrame Structure:
                                            x        y
npartitions=100                                       
0008cc68-408d-4359-876b-d24bc7bbf124  float64  float64
02f51302-785d-460d-b6f9-5038c50e7e14      ...      ...
...                                       ...      ...
fd6a1eeb-5613-40d1-9e4b-0b9fc75e8eda      ...      ...
fffa93f8-4802-4194-bc1d-76cf0372949d      ...      ...
Dask Name: sort_index, 302 tasks  # <<------- there are hundreds of tasks here, not thousands

Similarly for the groupby-apply

In [23]: ddf3.groupby('uid').apply(sum)
/home/mrocklin/Software/anaconda/bin/ipython:1: UserWarning: `meta` is not specified, inferred from partial data. Please provide `meta` if the result is unexpected.
  Before: .apply(func)
  After:  .apply(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .apply(func, meta=('x', 'f8'))            for series result
  #!/home/mrocklin/Software/anaconda/bin/python
Out[23]: 
Dask DataFrame Structure:
                                            x        y
npartitions=100                                       
0008cc68-408d-4359-876b-d24bc7bbf124  float64  float64
02f51302-785d-460d-b6f9-5038c50e7e14      ...      ...
...                                       ...      ...
fd6a1eeb-5613-40d1-9e4b-0b9fc75e8eda      ...      ...
fffa93f8-4802-4194-bc1d-76cf0372949d      ...      ...
Dask Name: _groupby_slice_apply, 402 tasks

So in short, I can't reproduce. But hopefully by going through the process above you can identify where our paths diverge.

In [25]: c.get_versions()['client']
Out[25]: 
{'host': [('python', '3.6.2.final.0'),
  ('python-bits', 64),
  ('OS', 'Linux'),
  ('OS-release', '4.13.0-26-generic'),
  ('machine', 'x86_64'),
  ('processor', 'x86_64'),
  ('byteorder', 'little'),
  ('LC_ALL', 'None'),
  ('LANG', 'en_US.UTF-8'),
  ('LOCALE', 'en_US.UTF-8')],
 'packages': {'optional': [('numpy', '1.14.0'),
   ('pandas', '0.22.0'),
   ('bokeh', '0.12.14rc1'),
   ('lz4', '0.10.1'),
   ('blosc', '1.5.1')],
  'required': [('dask', '0.16.1+42.gc736c53'),
   ('distributed', '1.20.2+60.gfd9a68c'),
   ('msgpack', '0.4.8'),
   ('cloudpickle', '0.5.2'),
   ('tornado', '4.5.2'),
   ('toolz', '0.9.0')]}}
bnaul commented 6 years ago

Think I may have (partially) cracked it. I tried the steps you posted above and I see:

In [10]: ddf = dd.read_parquet('/tmp/fastparquet.parquet')
In [11]: print(ddf.divisions)
(0, 499999)

In[12]: ddf2 = ddf.repartition(npartitions=100)
In[13]: print(ddf2.divisions[:10])
(0, 4999, 9999, 14999, 19999, 24999, 29999, 34999, 39999, 44999)

In[14]: ddf3 = ddf2.set_index('uid')
In[15]: ddf3.groupby('uid').apply(sum)
Out[15]:
Dask DataFrame Structure:
                                            x        y
npartitions=100
0000ea10-7114-4f33-8559-676920266aca  float64  float64
02b24265-fe22-4cfc-aa6f-88caa0cba5d1      ...      ...
...                                       ...      ...
fd971227-cdac-4b7d-ad74-5550a230166b      ...      ...
fffa32cc-6cee-47e8-81a3-91cf873234f7      ...      ...
Dask Name: _groupby_slice_apply, 3401 tasks

Quite different from your result even though my package versions seem comparable (np==1.14, dask==0.16.1, distributed==1.20.2). But I realized I didn't specify the parquet engine in my example; this was using fastparquet so I tried again with pyarrow:

In[23]: dd.read_parquet('/tmp/pyarrow.parquet').divisions
Out[23]: (None, None)  # etc.

So I guess it's related to the storage engine? Not sure if that's expected behavior or not (also not sure if using compression would affect anything but I specified None in my example).

martindurant commented 6 years ago

Generally, fastparquet does compute the max/min of every column (although you can choose not to, at least in the direct fastparquet API), and if you use a column with ordered max/min as the index on load, then you should get known divisions.

I don't know about arrow.

I would have thought that having known divisions should, if anything, reduce the number of tasks rather than increase them. Certainly compression and other encoding options will have no effect on the graph planning, only on how long one given data-loading task might take.

xhochy commented 6 years ago

parquet-cpp also computes by default min/max on every column. Probably we need to add more Python interfaces to access these values but in general, the divisions "discovery" should be engine-independent code.

mrocklin commented 6 years ago

syntax tip

You can use ddf.persist() rather than c.persist(ddf) if desired.

Actual problem at hand

OK, I'm explicitly specifying engine='fastparquet' now in both the to_parquet and read_parquet operations.

In [12]: ddf.divisions
Out[12]: (0, 499999)
In [13]: ddf = c.persist(ddf.set_index('uid'))
In [14]: ddf.divisions
Out[14]: 
('00013b78-2281-4eba-8433-1319150a2248',
 'fffc1e9a-4674-4a6f-9f18-8b62a51b87e9')

OK, so now we repartition to 100 partitions. Typically here we would just interpolate between the existing division values. Unfortunately, we don't have nice code to do interpolation on strings, and so we give up and resort to unknown divisions.

In [15]: ddf = c.persist(ddf.repartition(npartitions=100))  # no shuffle if we d
In [16]: ddf.divisions[:10]
Out[16]: (None, None, None, None, None, None, None, None, None, None)

Any groupby-apply on this will result in a full shuffle.

In [27]: ddf.groupby('uid').apply(sum)
Dask DataFrame Structure:
                       x        y
npartitions=100                  
                 float64  float64
                     ...      ...
...                  ...      ...
                     ...      ...
                     ...      ...
Dask Name: _groupby_slice_apply, 3400 tasks  # <<-- note thousands of tasks

Ironically we could avoid this by resetting and then re-setting the index. This would force Dask to look at the values in the column.

In [30]: ddf2 = ddf.reset_index()
    ...: ddf2 = ddf2.set_index('uid')
    ...: ddf2
Dask DataFrame Structure:
                                            x        y
npartitions=100                                       
00013b78-2281-4eba-8433-1319150a2248  float64  float64
026170b9-b54a-4838-b295-4f27c12babf7      ...      ...
...                                       ...      ...
fd818866-8b2c-43af-ab20-b4df229ee9ac      ...      ...
fffc1e9a-4674-4a6f-9f18-8b62a51b87e9      ...      ...
Dask Name: sort_index, 400 tasks

Possible solutions for Dask

  1. We could develop code to interpolate between string index values. See code here

    ```python if (df.known_divisions and (np.issubdtype(divisions.dtype, np.datetime64) or np.issubdtype(divisions.dtype, np.number))): if np.issubdtype(divisions.dtype, np.datetime64): divisions = divisions.values.astype('float64') if isinstance(divisions, pd.Series): divisions = divisions.values n = len(divisions) divisions = np.interp(x=np.linspace(0, n, npartitions + 1), xp=np.linspace(0, n, n), fp=divisions) if np.issubdtype(original_divisions.dtype, np.datetime64): divisions = pd.Series(divisions).astype(original_divisions.dtype).tolist() elif np.issubdtype(original_divisions.dtype, np.integer): divisions = divisions.astype(original_divisions.dtype) if isinstance(divisions, np.ndarray): divisions = divisions.tolist() divisions = list(divisions) divisions[0] = df.divisions[0] divisions[-1] = df.divisions[-1] return df.repartition(divisions=divisions) else: ratio = npartitions / df.npartitions split_name = 'split-%s' % tokenize(df, npartitions) dsk = {} last = 0 j = 0 for i in range(df.npartitions): new = last + ratio if i == df.npartitions - 1: k = npartitions - j else: k = int(new - last) dsk[(split_name, i)] = (split_evenly, (df._name, i), k) for jj in range(k): dsk[(new_name, j)] = (getitem, (split_name, i), jj) j += 1 last = new divisions = [None] * (npartitions + 1) return new_dd_object(merge(df.dask, dsk), new_name, df._meta, divisions) ```
  2. We could have groupby-apply's compute sortedness on the column before returning. This would make them semi-immediate, which is something that we avoid. This would be helpful in this case but bad in all others
  3. We could track index-sortedness separately from divisions. This would mean adding an additional piece of metadata throughout all computations (I'm somewhat against this for maintenance reasons).
  4. ...