rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0
8.45k stars 908 forks source link

Make a plan for sort_values/set_index #2272

Closed mrocklin closed 4 years ago

mrocklin commented 5 years ago

It would be nice to be able to use the set_index method to sort the dataframe by a particular column.

There are currently two implementations for this, one in dask.dataframe and one in dask-cudf which uses a batcher sorting net. While most dask-cudf code has been removed in favor of the dask.dataframe implementations this sorting code has remained, mostly because I don't understand it fully, and don't know if there was a reason for this particular implementation.

Why was this implementation chosen? Was this discussed somewhere? Alternatively @sklam, do you have any information here?

cc @kkraus14 @randerzander

mrocklin commented 5 years ago

NVIDIA folks have asked if there is some way to integrate an MPI or NCCL enabled multi-gpu sort into Dask for improved efficiency. My initial reaction to this is that it's likely to be difficult to integrate smoothly in a way that respects other Dask management like resilience, spilling to disk, load balancing, and so on. Lets expand on this.

First, if we have a multi-node sorting algorithm, we can always treat it how we treat XGBoost. Dask gives up control to some other higher performance system, it does its thing, we claim control back. If anything fails during this stage then we just retry the whole thing. We give up on any kind of memory management or load balancing during this process and just hope that the external system can handle things well without blowing up.

Second question is if we just have a single-node multi-GPU system, maybe we can use that? This is also a bit tricky currently, but we might be able to make structural changes to Dask to make it less tricky. The cost-benefit analysis of those changes might make this undesirable though. Currently the approach most people seem to be using with Dask and GPUs is to have one Dask worker per GPU. Currently Dask workers don't have any knowledge of other Dask workers on the same node, so there isn't anything built up to handle local collective action. We would be doing something similar to what is done above where we would more or less stop Dask from doing its normal task-scheduling thing, hand-write a bunch of control flow, hope nothing breaks, run custom code, and then have Dask take back control when we're done.

Both are totally doable, but would require us to build something like dask-xgboost, and raise general concerns around memory management, resilience, diagnostics, load balancing, spilling to disk, and so forth. We lose a lot of Dask's management when we switch into this mode.

So maybe Dask should start thinking more about collective actions. This is coming up often enough that it probably deserves more attention. That's a large conversation though and probably requires dedicated time from someone somewhat deeply familiar with Dask scheduler internals.

I think that, short term, we should continue with the current approach of using efficient single-core/gpu sorts and shuffling techniques currently done in dask.dataframe. We should tune these to the extent that we can both by making the single-gpu sort algorithms faster, the book keeping operations faster, and the communication faster. If this isn't enough then we should investigate collective actions as discussed above, but that should be part of a larger effort than just sorting.

datametrician commented 5 years ago

I agree with this approach. Thrust has a good single GPU sorting, and I think UCX should help tremendously. Any thoughts on book keeping, or was that more of a placeholder for if we need it?

mrocklin commented 5 years ago

There are two forms of book keeping that are relevant here:

  1. The slicing and concatenation of cudf dataframes. This is largely a test of memory management.
  2. The administrative tracking of tasks within the scheduler. This becomes more of an issue as we scale out to more nodes.
mrocklin commented 5 years ago

There are currently two implementations for this, one in dask.dataframe and one in dask-cudf which uses a batcher sorting net. While most dask-cudf code has been removed in favor of the dask.dataframe implementations this sorting code has remained, mostly because I don't understand it fully, and don't know if there was a reason for this particular implementation.

Why was this implementation chosen? Was this discussed somewhere? Alternatively @sklam, do you have any information here?

@sklam can you expand on the motivation behind your use of batcher sort net? Why was this decided on rather than the approach take in the mainline dask dataframe codebase?

sklam commented 5 years ago

@mrocklin, I was trying to avoid the following of the mainline sort/shuffle:

I chose the sorting network because:

mrocklin commented 5 years ago

OK, so these concerns seem similar to the concerns we had while designing the various shuffle algorithms for dask dataframe/bag. I'm not seeing anything here that is highly specific to GPU computation. (please correct me if I'm wrong).

My inclination then is to try to unify both CPU and GPU around a single implemenatation, probably starting with the one in dask.dataframe, but then maybe we would consider batcher-sorting-networks as an alternative for both, rather than just for dask-cudf, after doing some more extensive benchmarking.

Thanks for the information @sklam , I really appreciate the continuity here.

mrocklin commented 5 years ago

I recently took a look at Dask dataframe's task-based shuffle and improved docstrings here in order to help others dive in: https://github.com/dask/dask/pull/4674/files

I think that we need the following:

mrocklin commented 5 years ago

Also, just to direct people, I think that the core function that we'll have to make work is rearrange_by_column_tasks. I think that all of set_index/merge/sort_values can be made to rely on functionality there.

mtjrider commented 5 years ago

@mrocklin @datametrician I have a vested interest in seeing this work succeed, and will begin by implementing the solution path you've outlined above. If we need to get into the specifics of MPI/NCCL, we can cross that bridge later. I think a general solution which puts Dask first is going to help the ecosystem the most.

mrocklin commented 5 years ago

A minimal test would look something like the following:

import cudf, dask.dataframe as dd

# Make a dataframe, we'd like to divide the data by y
df = cudf.DataFrame({'x': [1, 2, 3, 4, 5, 6], 'y': [0, 4, 0, 4, 0, 4]})

# Split it up into a few partitions with Dask
ddf = dd.from_pandas(df, npartitions=3)

# Try to create a new dataframe with two partitions sorted on the y column, split by y 0->2 and 2->4
out = dd.shuffle.rearrange_by_divisions(ddf, column='y', divisions=[0, 2, 4], shuffle='tasks')

# compute in a single thread so it's easy to use %pdb and %debug
out.compute(scheduler='single-threaded')
mrocklin commented 5 years ago

I just tried this and ran into an minor problem of cudf.DataFrame.drop not supporting the axis= keyword. As a suggestion, these errors are easier to identify if you remove some of Dask's error reporting with the following diff

diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py
index 6a08af9..894fba6 100644
--- a/dask/dataframe/core.py
+++ b/dask/dataframe/core.py
@@ -3736,8 +3736,8 @@ def _emulate(func, *args, **kwargs):
     Apply a function using args / kwargs. If arguments contain dd.DataFrame /
     dd.Series, using internal cache (``_meta``) for calculation
     """
-    with raise_on_meta_error(funcname(func), udf=kwargs.pop('udf', False)):
-        return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
+    kwargs.pop('udf')
+    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
mrocklin commented 5 years ago

https://github.com/rapidsai/cudf/issues/1074

mrocklin commented 5 years ago

I imagine that, like with the groupby aggregations work, this will end up triggering many small PRs in cudf.

mrocklin commented 5 years ago

Potential fix here: https://github.com/rapidsai/cudf/pull/1396

mrocklin commented 5 years ago

Next thing I run into, searchsorted

In [1]: import cudf, dask.dataframe as dd
   ...:
   ...: # Make a dataframe, we'd like to divide the data by y
   ...: df = cudf.DataFrame({'x': [1, 2, 3, 4, 5, 6], 'y': [0, 4, 0, 4, 0, 4]})
   ...:
   ...: # Split it up into a few partitions with Dask
   ...: ddf = dd.from_pandas(df, npartitions=3)
   ...:
   ...: # Try to create a new dataframe with two partitions sorted on the y column, split by y 0->2 and 2->4
   ...: dd.shuffle.rearrange_by_divisions(ddf, column='y', divisions=[0, 2, 4], shuffle='tasks')
Out[1]: <dask_cudf.DataFrame | 40 tasks | 2 npartitions>

In [2]: _.compute(scheduler='single-threaded')
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
... <removed for clarity>
TypeError: can't compute boolean for <class 'cudf.dataframe.series.Series'>

In [3]: debug
> /home/nfs/mrocklin/cudf/python/cudf/dataframe/series.py(325)__bool__()
    323         into a boolean.
    324         """
--> 325         raise TypeError("can't compute boolean for {!r}".format(type(self)))
    326
    327     def values_to_string(self, nrows=None):

ipdb> up
> /home/nfs/mrocklin/miniconda/envs/cudf/lib/python3.7/site-packages/pandas/core/series.py(2337)searchsorted()
   2335             sorter = ensure_platform_int(sorter)
   2336         result = self._values.searchsorted(Series(value)._values,
-> 2337                                            side=side, sorter=sorter)
   2338
   2339         return result[0] if is_scalar(value) else result

ipdb>
> /home/nfs/mrocklin/dask/dask/dataframe/shuffle.py(434)set_partitions_pre()
    432
    433 def set_partitions_pre(s, divisions):
--> 434     partitions = pd.Series(divisions).searchsorted(s, side='right') - 1
    435     partitions[(s >= divisions[-1]).values] = len(divisions) - 2
    436     return partitions

ipdb>
mrocklin commented 5 years ago

Searchsorted work is happening here: https://github.com/rapidsai/cudf/pull/2156

rjzamora commented 5 years ago

Just a minor update here - After #2156 goes through, the next issue we run into is the very next line of set_partitions_pre:

~/workspace/cudf-dask-devel/dask/dask/dataframe/shuffle.py in set_partitions_pre(s, divisions)
    546
    547 def set_partitions_pre(s, divisions):
    548     partitions = pd.Series(divisions).searchsorted(s, side='right') - 1
--> 549     partitions[(s >= divisions[-1]).values] = len(divisions) - 2
    550     return partitions

AttributeError: 'Series' object has no attribute 'values'

Since values is not a cudf.Series property, perhaps the solution is as simple as making it one (and just returning the column values on host)?

rjzamora commented 5 years ago

Adding a simple values property to cudf.Series does seem to get us through the small test. Changes to cudf/python/cudf/cudf/dataframe/series.py:

@property
def values(self):
    return self._values

@property
def _values(self):
    return self.to_pandas().values

Test:

In [1]: import cudf, dask.dataframe as dd 
   ...:  
   ...: # Make a dataframe, we'd like to divide the data by y 
   ...: df = cudf.DataFrame({'x': [1, 2, 3, 4, 5, 6], 'y': [0, 4, 0, 4, 0, 4]}) 
   ...:  
   ...: # Split it up into a few partitions with Dask 
   ...: ddf = dd.from_pandas(df, npartitions=3) 
   ...:  
   ...: # Try to create a new dataframe with two partitions sorted on the y column, split by y 0->2 and 2->4 
   ...: out = dd.shuffle.rearrange_by_divisions(ddf, column='y', divisions=[0, 2, 4], shuffle='tasks') 
   ...:  
   ...: # compute in a single thread so it's easy to use %pdb and %debug 
   ...: print(out.compute(scheduler='single-threaded'))                                                                                                                                  
   x  y
0  1  0
2  3  0
4  5  0
1  2  4
3  4  4
5  6  4

@kkraus14 - Is there a reason we might want to avoid adding a values property to Series (especially one that returns a non-device array)?

ayushdg commented 5 years ago

@rjzamora Just tried a small example of set_index this morning and saw the same issue of missing the values property. #2395

jrhemstad commented 5 years ago

@kkraus14 - Is there a reason we might want to avoid adding a values property to Series (especially one that returns a non-device array)?

So copying an entire column from device to host? That would be very expensive.

ayushdg commented 5 years ago
548     partitions = pd.Series(divisions).searchsorted(s, side='right') - 1

One issue I see above is a hard dependency on pandas.

549 partitions[(s >= divisions[-1]).values] = len(divisions) - 2

If partitions were to be a cudf series instead of a pandas series , values would not need to be a non-device array

rjzamora commented 5 years ago

@jrhemstad Sorry - That was a silly question!

@ayushdg - Right. I guess I am just unsure of the best way (if possible) to handle both pandas and cudf using the same logic in Dask, but I haven't exactly tried much yet :)

mrocklin commented 5 years ago

Some options to explore:

  1. Can .values return something like a cupy array (if it is installed)?
  2. Do we strictly need to use .values? Is this because the result of searchsorted is a numpy array rather than a Series?

    def set_partitions_pre(s, divisions):
        partitions = pd.Series(divisions).searchsorted(s, side='right') - 1
        partitions[(s >= divisions[-1]).values] = len(divisions) - 2
        return partitions

I agree that it would be good not to call .to_pandas() here and try to keep things on the device.

See also some conversation about Series.values here: https://github.com/rapidsai/cudf/issues/1824

ayushdg commented 5 years ago

Also see: #2373 which attempts to add .values support by moving to host. Not sure if it's the best option. There is some discussion echoing this point on that pr as well.

rjzamora commented 5 years ago

@mrocklin I explored some options to avoid copying the entire column from device to host. I made this simple gist with some experiments.

For a dataframe with 1e8 rows, the dask+pandas version takes about 13.5s, while the dask+cudf version takes about 7.6s. If I use a to_pandas()-based values property, the operation takes ~11s

Note that the gist also shows that the cudf version of searchsorted is a bit different than the pandas version (it returns a series, rather than a numpy array). However, this seems reasonable to me..

kkraus14 commented 5 years ago

7.6s sounds very high, what's taking the time in this situation? Could you share a profile?

rjzamora commented 5 years ago

7.6s sounds very high, what's taking the time in this situation? Could you share a profile?

@kkraus14 agreed - I'll take a closer look and collect a profile

mrocklin commented 5 years ago

Note that the gist also shows that the cudf version of searchsorted is a bit different than the pandas version (it returns a series, rather than a numpy array). However, this seems reasonable to me..

So lets take a look at the set_partitions_pre function for a moment

https://github.com/dask/dask/blob/e0a77233fd4c5c3fc0ea8d533145778289c69078/dask/dataframe/shuffle.py#L547-L550

def set_partitions_pre(s, divisions):
    partitions = pd.Series(divisions).searchsorted(s, side="right") - 1
    partitions[(s >= divisions[-1]).values] = len(divisions) - 2
    return partitions

The reason that .values has come up is in the second line, where presumably we're trying to index into partitions with something. If it's a numpy array (as is the case when s is a pandas.Series) then we apparently need a .values call (although I would be a surprised if NumPy couldn't figure things out here). (for background, I think that this is saying "Hey! Every row that is targetting something beyond the last parittion, go to the last partition instead!" I have no idea why we need that, but there is probably some test for it that would fail that comes up in some corner case (this is worth checking to make sure though).

So, as you say, we might have a type issue in that cudf returns a series rather than an array-like. This seems to me, as you say, a sensible choice of cudf's part. So if we focus on just this function we might do a few things:

  1. Implement .values and coerce things to an array-like
  2. Not implement .values but keep things as a series all the way through, perhaps with an if is_arraylike(partitions) branch
  3. ??

To figure out the right approach we probably need to zoom out a bit and look at the context in which this is called, and what happens next to these objects. Are the next operations also array/dataframe agnostic, or will we need to coerce at some point anyway?

It looks like the next thing that happens is that they get assigned back into the dataframe. So if we go the array route it would be good to ensure that this is possible. If we go the series route then we might not care.

https://github.com/dask/dask/blob/e0a77233fd4c5c3fc0ea8d533145778289c69078/dask/dataframe/shuffle.py#L180

        df2 = df.assign(_partitions=partitions)

I'm curious how much works if we add an is_arraylike condition like the following:

def set_partitions_pre(s, divisions):
    partitions = pd.Series(divisions).searchsorted(s, side="right") - 1
    index = s > divisions[-1]
    if is_arraylike(partitions):
        index = index.values
    partitions[index] = len(divisions) - 2
    return partitions

Or if we can remove the .values call entirely, or if this causes something to break, what is it, and should we fix that instead?

def set_partitions_pre(s, divisions):
    partitions = pd.Series(divisions).searchsorted(s, side="right") - 1
    partitions[s > divisions[-1]] = len(divisions) - 2
    return partitions
rjzamora commented 5 years ago

Thanks for the feedback @mrocklin! Just to provide a bit more information: For the "cupy" experiment I shared, I am using a custom set_partitions_pre_cupy_series function, which takes in divisions as a series (e.g. cupy.Series):

def set_partitions_pre_cupy_series(s, divisions):
    partitions = (divisions.searchsorted(s, side="right") - 1).values_cupy
    partitions[(s >= divisions.iloc[-1]).values_cupy] = len(divisions) - 2
    return partitions

This ultimately returns partitions as a cupy array. To do this, I added a (rough) values_cupy property to Series:

@property
def values_cupy(self):
     try:
         import cupy
         return cupy.asarray(self._column._data.mem)
    except ImportError:
        return self.to_array()

Overall, I am thinking it is likely best to do something like the is_arraylike(partitions) branch you mentioned. It may also make sense to remove the explicit pandas dependency by changing the divisions input type to be a series. The question I am struggling with at the moment is how to efficienly reset the partitions elements in a general way

I have no idea why we need that, but there is probably some test for it that would fail that comes up in some corner case (this is worth checking to make sure though).

This actually happens for every value belonging to the last partition here. So, might not be much of a corner case.

Regarding the values property: #2373 added this today. So, we don't have to worry about implementing a host-based values definition if it is needed.

Regarding performance:

As far as I can tell, a good chunk of time is likely python/dask. Otherwise, there seems to be a good chunk of time (~45%) in Dask's shuffle_group, which does make some pandas and numpy-specific calls.

rjzamora commented 5 years ago

Just a followup - The test I described above uses cupy to calculate the new partition of each row. Unfortunately, the next phase of actually sorting the dataframe into these new partitions requires everything to be copied to host anyway.

The actual splitting of the dataframe into new partitions (within each of the original partitions) is accmplished in shuffle_group (and shuffle_group_2). My profiling indeed shows that these functions account for much of the rearrange_by_divisions call. Here is shuffle_group (shuffle_group_2 is similar):

def shuffle_group(df, col, stage, k, npartitions):
    """ Splits dataframe into groups ..."""
    if col == "_partitions":
        ind = df[col] # True in our case
    else:
        ind = hash_pandas_object(df[col], index=False)

    c = ind._values  # Need `_values` property in cudf series for this
    typ = np.min_scalar_type(npartitions * 2)

    c = np.mod(c, npartitions).astype(typ, copy=False)
    np.floor_divide(c, k ** stage, out=c)
    np.mod(c, k, out=c)

    indexer, locations = groupsort_indexer(c.astype(np.int64), k)  # Numpy/pandas specific
    df2 = df.take(indexer)
    locations = locations.cumsum()
    parts = [df2.iloc[a:b] for a, b in zip(locations[:-1], locations[1:])]

    return dict(zip(range(k), parts))

Like set_partitions_pre, this function introduces a couple issues for cudf:

Overall, the groupsort_indexer function is pretty simple. It takes in an array of groups (partition numbers in our case), and the total number of groups (number of partitions). The output is the new group-sorted index, and an array with the number of items in each group. @kkraus Does a similar algorithm already exist in cudf?

mrocklin commented 5 years ago

cc'ing @kkraus14 (note the 14 in his username)

Repeating the question here:

Overall, the groupsort_indexer function is pretty simple. It takes in an array of groups (partition numbers in our case), and the total number of groups (number of partitions). The output is the new group-sorted index, and an array with the number of items in each group. @kkraus Does a similar algorithm already exist in cudf?

mrocklin commented 5 years ago

I think that in private conversation Keith also mentioned that there may be some operation in cudf that does several of these steps all at once, which may be another approach

On Wed, Jul 31, 2019 at 1:59 PM Richard (Rick) Zamora < notifications@github.com> wrote:

Just a followup - The test I described above uses cupy to calculate the new partition of each row. Unfortunately, the next phase of actually sorting the dataframe into these new partitions requires everything to be copied to host anyway.

The actual splitting of the dataframe into new partitions (within each of the original partitions) is accmplished in shuffle_group (and shuffle_group_2). My profiling indeed shows that these functions account for much of the rearrange_by_divisions call. Here is shuffle_group ( shuffle_group_2 is similar):

def shuffle_group(df, col, stage, k, npartitions): """ Splits dataframe into groups ...""" if col == "_partitions": ind = df[col] # True in our case else: ind = hash_pandas_object(df[col], index=False)

c = ind._values  # Need `_values` property in cudf series for this
typ = np.min_scalar_type(npartitions * 2)

c = np.mod(c, npartitions).astype(typ, copy=False)
np.floor_divide(c, k ** stage, out=c)
np.mod(c, k, out=c)

indexer, locations = groupsort_indexer(c.astype(np.int64), k)  # Numpy/pandas specific
df2 = df.take(indexer)
locations = locations.cumsum()
parts = [df2.iloc[a:b] for a, b in zip(locations[:-1], locations[1:])]

return dict(zip(range(k), parts))

Like set_partitions_pre, this function introduces a couple issues for cudf:

  • We are assuming a _values property exists for the series/column being used for partitioning. This is not true for cudf (I believe even

    2373 https://github.com/rapidsai/cudf/pull/2373 left this out).

  • Pandas' groupsort_indexer function requires that a numpy array be input for the first argument. This is fine if we return a numpy array for _values, but not if we use a zero-copy cupy array

Overall, the groupsort_indexer function https://github.com/pandas-dev/pandas/blob/c7a1321029e07ee6d7ea30036649b488b2e362f7/pandas/_libs/algos.pyx#L151 is pretty simple. It takes in an array of groups (partition numbers in our case), and the total number of groups (number of partitions). The output is the new group-sorted index, and an array with the number of items in each group. @kkraus https://github.com/kkraus Does a similar algorithm already exist in cudf?

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/rapidsai/cudf/issues/2272?email_source=notifications&email_token=AACKZTBLIIHEKHAFSZ45R2TQCH4JLA5CNFSM4ID3GGRKYY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGOD3IRHYQ#issuecomment-517018594, or mute the thread https://github.com/notifications/unsubscribe-auth/AACKZTDDFE3WSK5FT2TUBLLQCH4JLANCNFSM4ID3GGRA .

kkraus14 commented 5 years ago

We currently only have a hash partitioner function which does the equivalent of hash the input dataframe based on some key column(s), run a modulo against the number of partitions N, split the dataframe into N dataframes based on the hash % N.

This doesn't seem quite the same, no?

mrocklin commented 5 years ago

No, not quite the same.

At this point we have a dataframe with a new column that has the output partition of each row. We now want to split our dataframe up into many small dataframes, one for each output partition.

We do this by creating an indexer which will effectively sort things by that column, and then calling take to do the reordering, and then calling iloc many times to split things apart (though this could be a bulk operation in preferred

    c = ... # the output partition locations
    indexer, locations = groupsort_indexer(c.astype(np.int64), k)  # link to this above in Rick's comment
    df2 = df.take(indexer)
    locations = locations.cumsum()
    parts = [df2.iloc[a:b] for a, b in zip(locations[:-1], locations[1:])]
jrhemstad commented 5 years ago

At this point we have a dataframe with a new column that has the output partition of each row. We now want to split our dataframe up into many small dataframes, one for each output partition.

So in libcudf we already have a gdf_hash_partition algorithm that does this, but uses the hash values of DF rows to determine the partition mapping. (It would appear the shuffle_group algorithm has the option to do this in the else branch).

It would appear that here you're instead using an arbitrary column of values to determine the partition mapping.

It would be fairly trivial to split up and generalize the gdf_hash_partiton function to do what you need here.

All of this code could be replaced with a single libcudf function that I'm guessing will be orders of magnitude faster.

    c = np.mod(c, npartitions).astype(typ, copy=False)
    np.floor_divide(c, k ** stage, out=c)
    np.mod(c, k, out=c)

    indexer, locations = groupsort_indexer(c.astype(np.int64), k)  # Numpy/pandas specific
    df2 = df.take(indexer)
    locations = locations.cumsum()
    parts = [df2.iloc[a:b] for a, b in zip(locations[:-1], locations[1:])]
mrocklin commented 5 years ago

Right, we're being a little bit careful about where things go. Sometimes we move them around in multiple stages, or have particular destinations in mind based on values.

Putting all of those lines into a single operation sounds fine from my perspective.

On Thu, Aug 1, 2019 at 1:38 PM Jake Hemstad notifications@github.com wrote:

At this point we have a dataframe with a new column that has the output partition of each row. We now want to split our dataframe up into many small dataframes, one for each output partition.

So in libcudf we already have a gdf_hash_partition algorithm that does this, but uses the hash values of DF rows to determine the partition mapping.

It would appear that here you're instead using an arbitrary column of values to determine the partition mapping.

It would be fairly trivial to split up and generalize the gdf_hash_partiton function to do what you need here.

All of this code could be replaced with a single libcudf function that I'm guessing will be orders of magnitude faster.

c = np.mod(c, npartitions).astype(typ, copy=False)
np.floor_divide(c, k ** stage, out=c)
np.mod(c, k, out=c)

indexer, locations = groupsort_indexer(c.astype(np.int64), k)  # Numpy/pandas specific
df2 = df.take(indexer)
locations = locations.cumsum()
parts = [df2.iloc[a:b] for a, b in zip(locations[:-1], locations[1:])]

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/rapidsai/cudf/issues/2272?email_source=notifications&email_token=AACKZTH5N4HMEBLJMOFCEHLQCNCV3A5CNFSM4ID3GGRKYY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGOD3L2LFY#issuecomment-517449111, or mute the thread https://github.com/notifications/unsubscribe-auth/AACKZTBMW7OP43M7K467QMDQCNCV3ANCNFSM4ID3GGRA .

jrhemstad commented 5 years ago

Right, we're being a little bit careful about where things go. Sometimes we move them around in multiple stages, or have particular destinations in mind based on values. Putting all of those lines into a single operation sounds fine from my perspective.

I'm a bit confused because that sounds different than what you originally described.

To be clear, what I'm envisioning is an algorithm that does the following:

Inputs:

Outputs:

This is more generic than I originally envisioned. There's opportunity for optimizing this further by combining in the step of first generating the partition_map.

mrocklin commented 5 years ago

That sounds good to me

jrhemstad commented 5 years ago

Out of curiosity, how is the partition_map being generated for this particular use case? Obviously it has to do with which partition a row maps to based on the sorting, but I'm curious how that's being determined.

rjzamora commented 5 years ago

Thanks for the input @jrhemstad @kkraus14 @mrocklin! This is super useful. Sorry - I didn't see this recent discussion until now.

rjzamora commented 5 years ago

Out of curiosity, how is the partition_map being generated for this particular use case? Obviously it has to do with which partition a row maps to based on the sorting, but I'm curious how that's being determined.

If I follow the question correctly, the partition map (partitions) is currently determined using searchsorted:

partitions = pd.Series(divisions).searchsorted(s, side="right") - 1

where divisions is a list of divisions along the index/column (s) that we want to partition on.

jrhemstad commented 5 years ago

So it sounds like the size of partitions is equal to the number of desired partitions. That's not quite the same thing as the partition_map I described, which is equal in size to the number of rows in the DF being partitioned.

How do you determine the mapping of each row in the DF to a particular partition? (this is what the partition_map I was describing is)

rjzamora commented 5 years ago

In the case I described, the size of partitions is indeed equal to the number of rows in the DF being partitioned, because s corresponds to a full column.

jrhemstad commented 5 years ago

~What is the size of divisions? I think I'm misunderstanding how searchsorted works.~

Nevermind, I understand now. divisions is equal in size to the number of partitions, and you're using search_sorted to generate the partition_map from divisions.

For my own elucidation, how is divisions determined?

It would actually be very useful if you could just provide a high level description of the distributed sorting algorithm.

It would also be helpful to understand how many partitions there will be, a rough range of orders of magnitude would be sufficient.

rjzamora commented 5 years ago

Thats right - divisions is just a list with length (npartitions + 1). There are ways divisions can be calculated, but you can really just think of divisions as a user input for now. For example, the user can specify that they want to partition along some column y using divisions = [0,0.3,0.6].

To do this, they might call:

df  = pd.DataFrame( {'x': [i for i in range(size)], 'y': np.random.randn(size)} )
ddf = dd.from_pandas(df, npartitions=3)

out = dd.shuffle.rearrange_by_divisions(ddf, column='y', divisions=[0,0.3,0.6], shuffle='tasks')
result_df = out.compute()

rearrange_by_divisions will start by calculating the partition_map (partitions) you described (using search_sorted). The function looks like this:

def set_partitions_pre(s, divisions):
    partitions = pd.Series(divisions).searchsorted(s, side="right") - 1
    partitions[(s >= divisions[-1]).values] = len(divisions) - 2
    return partitions

After we have partitions, we need call rearrange_by_column to perform the actual shuffling (either in disk or in memory). On a high level, the in-memory version (rearrange_by_column_tasks) does two things:

My understanding is that the number of partitions is typically orders of magnitude smaller than the size of the dataframe in practice (in order for each worker/task to have sufficient data/computation for efficiency)

@mrocklin feel free to correct me or expand.

jrhemstad commented 5 years ago

Thanks @rjzamora that's very helpful.

I'm trying to think what the right primitive(s) is for libcudf/cuDF to expose to accelerate this operation.

We have a few options:

  1. We could make a primitive that could entirely replace rearrange_by_divisions
    • You give us a DF and a set of divisions, and we'll partition up the DF
    • This would likely be the fastest
  2. We could keep Dask's rearrange_by_division, and instead replace shuffle_group

Curious what @kkraus14 @harrism think the right primitive to expose is here.

rjzamora commented 5 years ago

I might be missunderstanding, but since rearrange_by_column_tasks operates on a dask dataframe, the libcudf/cuDF primitives would most likely need to be more fine-grained (along the lines of option 2). It would be great to be wrong about this :)

From my perspective, it seems useful to replace the set_partitions_pre function (shown above) with a libcudf/cuDF primitive, as well as shuffle_group, and possibly shuffle_group_2. The latter two are both based on pandas' groupsort_indexer at the moment. I will look through these functions in detail to get a better idea of if/how the logic can be combined.

mrocklin commented 5 years ago

In order to avoid centralized overhead, dask.dataframe will sometimes move things around in a couple of stages. What we need is the in-memory operation that occurs in each stage. This operation is shuffle_group. Note that it includes a bit of finicky math here:

    c = ind._values
    typ = np.min_scalar_type(npartitions * 2)

    c = np.mod(c, npartitions).astype(typ, copy=False)
    np.floor_divide(c, k ** stage, out=c)
    np.mod(c, k, out=c)

And then the actual splitting

    indexer, locations = groupsort_indexer(c.astype(np.int64), k)
    df2 = df.take(indexer)
    locations = locations.cumsum()
    parts = [df2.iloc[a:b] for a, b in zip(locations[:-1], locations[1:])]

    return dict(zip(range(k), parts))

I think that a single function in cudf that did all of this would be great. I would also understand if cudf only wanted to implement the second part, which is a bit more generally useful.

jrhemstad commented 5 years ago

@mrocklin can you explain the finicky math part?

It sounds like the partition mapping returned by set_partitions_pre isn't the final mapping and requires some additional modification.

mrocklin commented 5 years ago

I've explained the motivation here as a PR with a nicer docstring. If you need the specific approach let me know. It'll probably take some time to write down, so I haven't done it yet. If you're just curious here then I'll pass for now in the interests of time. If it's important for solving this problem then I'll prioritize this appropriately.

Next time we're together perhaps we can find a whiteboard, go through it, and then I can pester you to write it up into docs somewhere? :)

On Fri, Aug 2, 2019 at 8:59 AM Jake Hemstad notifications@github.com wrote:

@mrocklin https://github.com/mrocklin can you explain the finicky math part?

It sounds like the partition mapping returned by set_partitions_pre isn't the final mapping and requires some additional modification.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/rapidsai/cudf/issues/2272?email_source=notifications&email_token=AACKZTBFWPMOCJSTLSK53Y3QCRKW5A5CNFSM4ID3GGRKYY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGOD3OE7FA#issuecomment-517754772, or mute the thread https://github.com/notifications/unsubscribe-auth/AACKZTFLQF2DZ2TXUT7UBILQCRKW5ANCNFSM4ID3GGRA .