dask / dask

Parallel computing with task scheduling
https://dask.org
BSD 3-Clause "New" or "Revised" License

Multiply reduces partitions #3403

Open ckaz02 opened 6 years ago

ckaz02 commented 6 years ago

I have a pandas dataframe that looks like this:

  type  value1  value2
0    a    0.50     0.6
1    b    0.25     0.2
2    c    0.25     0.2

Then a dask dataframe that looks like this:

  type  value1  value2
0    a       8       9
1    a       7       6
2    b       2      10
3    b       4       5
4    c       2       7
5    c       1       6

Desired output:

      value1  value2
a       4.00     5.4
a       3.50     3.6
b       0.50     2.0
b       1.00     1.0
c       0.50     1.4
c       0.25     1.2

I want to set the type column of both dataframes as the index and then do dask_df.mul(pandas_df), so the values in the pandas dataframe are multiplied across all the matching rows in the dask dataframe. This gives me the result I want, but after the multiply the partitions in my dask dataframe are reduced from 124 to 2, and any operation after the multiply then fails with a memory error. Is there a way to prevent this repartitioning, or another approach that gives the desired output without reducing the partitions?
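
For reference, the same multiply in pure pandas reproduces the desired output, so the question is purely about dask's partitioning. A minimal sketch with the toy frames above (variable names are mine):

import pandas as pd

small = pd.DataFrame({'value1': [0.5, 0.25, 0.25],
                      'value2': [0.6, 0.2, 0.2]},
                     index=['a', 'b', 'c'])
big = pd.DataFrame({'type': ['a', 'a', 'b', 'b', 'c', 'c'],
                    'value1': [8, 7, 2, 4, 2, 1],
                    'value2': [9, 6, 10, 5, 7, 6]}).set_index('type')

# Each row of `big` aligns with the row of `small` that shares its label.
print(big.mul(small))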

mrocklin commented 6 years ago

Thanks for the bug report, @ckaz02. Can I ask you for a reproducible example that a developer can copy-paste to easily see the issue you're having? This makes it more likely that a volunteer will take a closer look at your problem.

joergdietrich commented 6 years ago

This reproduces the report above:

import numpy as np
import pandas as pd
import dask.dataframe as dd

df1 = pd.DataFrame({'value1': [0.5, 0.25, 0.25],
                    'value2': [0.6, 0.2, 0.2]},
                   index=['a', 'b', 'c'])

df2 = pd.DataFrame({'value1': np.random.randint(100, size=1_000_000),
                    'value2': np.random.randint(100, size=1_000_000),
                    'type': np.random.choice(['a', 'b', 'c'], size=1_000_000)})

ddf = dd.from_pandas(df2, npartitions=124)
ddf = ddf.set_index('type')
assert ddf.npartitions == 124
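# Elementwise multiply against the small pandas frame; this is the
# step that triggers alignment between the two frames.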
ddm = ddf.mul(df1)
ddm.npartitions
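# Per the original report, this evaluates to 2 rather than 124: the
# multiply's alignment step collapsed the partitions.
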
mrocklin commented 6 years ago

Thank you for the excellent reproducer, @joergdietrich! This really helps.

I think that this can be tracked down to this line:

https://github.com/dask/dask/blob/daeaaaa3bcf65aab753c5a7b24d7c6f4cb92ddc7/dask/dataframe/multi.py#L107

I think that the reason why we see a decrease in the number of partitions is that the division values are not unique. When multiple dataframes interact we first align them so that they have the same division structure (for example so that all rows indexed with 'a' are in the same partition). When index values are the same across division boundaries (as will necessarily be the case when you have fewer unique values than partitions) then this becomes hard.
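
To see this in the reproducer above (a sketch; the exact division values depend on how set_index split the data):

print(ddf.npartitions)          # 124
print(len(set(ddf.divisions)))  # at most 3 distinct values: 'a', 'b', 'c'
# 125 division boundaries drawn from only 3 unique labels, so the same
# label necessarily repeats across many partition boundaries.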

We have no way to ensure that corresponding rows are in corresponding partitions without constraining partitions to have disjoint index values.
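
Not discussed in the thread, but given this diagnosis one possible way around the alignment step is to broadcast the small pandas frame to each partition with map_partitions, so nothing is repartitioned. This is only a sketch, assuming the small frame's index covers every label in the dask index; multiply_partition is a hypothetical helper name:

def multiply_partition(part, small):
    # Reindex the small frame to this partition's (possibly repeated)
    # index labels and column order, then multiply through an ndarray
    # so pandas never tries to re-align the two frames.
    return part * small.reindex(index=part.index, columns=part.columns).values

ddm = ddf.map_partitions(multiply_partition, df1)
assert ddm.npartitions == ddf.npartitions  # partition count is preserved

Each partition only ever sees the handful of rows of df1 it needs, so the per-partition work stays small and this should avoid the memory error that followed the collapse to 2 partitions.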