rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0
8.43k stars 903 forks source link

[QUESTION] Merge algorithm is not optimal if merging dataframes of non-matching partitions/indices not being join keys #6757

Closed roe246 closed 3 years ago

roe246 commented 3 years ago

Merging data frames from multiple non-matching partitions of each data frame creates a lot of “shuffles” that may aggregate processing into one GPU vs distributed uniformly, and may spills to CPU or disk, which are extremely slow like in Spark.

It is hard to create a bug report because it is a scalability issue where the data needs to be very large and complex, with all sorts of prior dataframes being in different partitions with different indices than the join keys.

The expected behavior would be something like Spark/SQL handles, even without explicit partition info or index info, the merge() can be done faster in a more clear way, with the option broadcast small dataframes enabled.

Trying to align indices and partitions across the board is computation intense on its own and takes significant amount of time as well. Not sure how to deal with joins from dask-cudf's with non-matching index or partitions by join keys.

Pain points -

  1. Even with simple data transformation, joining multiple tables sequentially are processed with the implied task stream looks extremely complex and not intuitive. The task graph includes a ton of shuffle-split, shuffle-group, shuffle-join, reset_index, repartition, shuffle-shuffle etc.. going on during a complex join where not much matching between the df's
  2. Shuffles may aggregate processing into one GPU, not distributed uniformly, and may spills to CPU or disk, which are extremely slow like in Spark

Additionally, our upstream data production process is likely hesitant to change their process in Spark, especially given different requirements of using their production data.

rjzamora commented 3 years ago

Thanks for opening this issue @roe246

My intuition is that you are correct that certain workflows will benefit from a broadcast-based algorithm in lieu of the shuffle-based approach. Right now, dask_cudf relies on dask.dataframe for most of the merge logic. The upstream routine will use a broadcast-based approach when one of the DataFrame collections comprises a single partition. Otherwise, if the merge is not happening on an index with known divisions, it requires a shuffle. I do think it would make sense to allow the user to request a broadcast-based merge in other cases. For example, it may still make sense to broadcast a 10-partition collection in some cases.

The ideal place for a change like this to go in is dask.dataframe. However, it may make sense to experiment in dask_dataframe a bit before trying to move the changes upstream.

github-actions[bot] commented 3 years ago

This issue has been marked rotten due to no recent activity in the past 90d. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed.

github-actions[bot] commented 3 years ago

This issue has been marked stale due to no recent activity in the past 30d. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be marked rotten if there is no activity in the next 60d.

rjzamora commented 3 years ago

@roe246 - dask/dask#7143 was recently merged. That PR adds the broadcast= option to dask.dataframe.DataFrame.merge (and therfore to dask_cudf). The broadcast-algorithm will be used by default in many cases where one of your collections is much larger than the other, but you can force the broadcast (for supported conditions) with broadcast=True. Let us know if that change addresses this issue (if so, we can close this).

kkraus14 commented 3 years ago

Closing as this has been addressed.