rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0

[FEA] Can't run full broadcast join a big cudf with a small series/list/sequence as a new column #6755

Closed roe246 closed 2 years ago

roe246 commented 3 years ago

I wish I could join a large cuDF DataFrame with a small series/list/sequence, in the sense of a SQL full join, or better yet with the small series/list broadcast for the full join as in Spark SQL, even though there is no join key in the small series/list/sequence.

I want a large cuDF DataFrame A to full join a small series/list/sequence of size n, so that the values of the small series populate a new column of A, with A repeated n times.

For example, DF = {'id': ['a','b','c'], 'time': ['1', '2', '3']} full joined with the series/list/sequence S = ['x', 'y', 'z'] would return DF = {'id': ['a','b','c','a','b','c','a','b','c'], 'time': ['1', '2', '3', '1', '2', '3', '1', '2', '3'], 'new_col': ['x', 'x', 'x', 'y', 'y', 'y', 'z', 'z', 'z']}. When DF is very large, we can broadcast S to avoid huge shuffles in the full join.

Currently, I have to loop over S and append a DataFrame for each segment, which is not efficient: runtime grows linearly with the size of S, there is little parallelization, and even lazy evaluation takes an enormous amount of time to plan out the Dask graph.
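For concreteness, here is a minimal single-GPU sketch of that loop-and-append workaround (the names df, s, and new_col are illustrative, and plain cudf is used instead of Dask for brevity):

import cudf

df = cudf.DataFrame({"id": ["a", "b", "c"], "time": ["1", "2", "3"]})
s = ["x", "y", "z"]

pieces = []
for value in s:
    piece = df.copy()          # copy the large frame once per element of S
    piece["new_col"] = value   # broadcast the scalar into the new column
    pieces.append(piece)
result = cudf.concat(pieces, ignore_index=True)  # runtime grows linearly with len(s)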

In Spark/sparklyr, we establish a small sequence of size n:

seq <- sdf_seq(sc, from = 1, to = n, repartition = 1L)

and then simply full join, with seq broadcast to every worker:

.data %>% full_join(sdf_broadcast(seq), by = character())

beckernick commented 3 years ago

Thanks for filing an issue. The "outer" join type in Dask can be thought of as a full outer join, but it appears that this doesn't go down the broadcast join code path. It might make sense to convert this issue to a feature request for broadcastable full outer joins on the Dask/dask repository (cc @yidong72).

The provided example is a full join with no constraints, which is also known as a cross join or Cartesian join. The following example illustrates how to do this as a broadcast join, using a temporary key column with a constant value.

import cudf
import dask_cudf
import numpy as np
from dask_cuda import LocalCUDACluster
from distributed import Client, wait

cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="0,1")
client = Client(cluster)

nrows = 10000000
ncols = 10
bigdf = cudf.DataFrame(
    {f"col_{x}": np.random.randint(0, 200, (nrows,)) for x in range(ncols)}
)

choices = ["w", "x", "y", "z", "q"]
smalldf = cudf.DataFrame({"relevant": choices})

bigdf["mutual_key"] = 0 # temporary key
smalldf["mutual_key"] = 0 # temporary key

When small_ddf has one partition, we will get the broadcast join.

big_ddf = dask_cudf.from_cudf(bigdf, 5).persist()
small_ddf = dask_cudf.from_cudf(smalldf, 1).persist()
wait([big_ddf, small_ddf]);
%time len(big_ddf.merge(small_ddf, on=["mutual_key"]))
CPU times: user 124 ms, sys: 16.6 ms, total: 141 ms
Wall time: 376 ms
50000000

When small_ddf has more than one partition, we get a slower shuffle join.

big_ddf = dask_cudf.from_cudf(bigdf, 5).persist()
small_ddf = dask_cudf.from_cudf(smalldf, 2).persist()
wait([big_ddf, small_ddf]);
%time len(big_ddf.merge(small_ddf, on=["mutual_key"]))
CPU times: user 73.2 ms, sys: 3.21 ms, total: 76.4 ms
Wall time: 634 ms
50000000
beckernick commented 3 years ago

Will this be closed by https://github.com/dask/dask/pull/7143 @rjzamora ?

rjzamora commented 3 years ago

> Will this be closed by dask/dask#7143 @rjzamora ?

Perhaps. The new broadcast_join code path does not apply to an "outer" merge, but it will cover the default ("inner") merge case if you manually add a "mutual_key" column (as in your example).
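For illustration, a minimal sketch of that covered case, assuming the broadcast keyword argument to merge proposed in dask/dask#7143 (the argument name is an assumption here, and the small frame must fit in one partition):

import cudf
import dask_cudf

bigdf = cudf.DataFrame({"id": ["a", "b", "c"], "time": ["1", "2", "3"]})
smalldf = cudf.DataFrame({"new_col": ["x", "y", "z"]})
bigdf["mutual_key"] = 0    # temporary constant key, as in the example above
smalldf["mutual_key"] = 0  # temporary constant key

big_ddf = dask_cudf.from_cudf(bigdf, 5)
small_ddf = dask_cudf.from_cudf(smalldf, 1)  # single partition, so it can be broadcast

# Default "inner" merge; broadcast=True asks for the broadcast-join code path
result = big_ddf.merge(small_ddf, on=["mutual_key"], broadcast=True)
print(len(result))  # 9 rows: the 3 x 3 cross product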

beckernick commented 3 years ago

Thanks for the summary!

github-actions[bot] commented 3 years ago

This issue has been labeled inactive-30d due to no recent activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be labeled inactive-90d if there is no activity in the next 60 days.

rjzamora commented 2 years ago

I am going to close this issue because it has not been active for a long time, and it is technically possible to get the desired behavior with a temporary "mutual_key" column.

With that said, the desired merge type is now called a "cross" merge in pandas (added in 1.2). Therefore, I raised a separate issue to add this option: https://github.com/rapidsai/cudf/issues/12023
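For reference, a minimal sketch of that cross merge using pandas >= 1.2 (plain pandas rather than cudf, since cudf support is exactly what the linked issue tracks):

import pandas as pd

df = pd.DataFrame({"id": ["a", "b", "c"], "time": ["1", "2", "3"]})
s_df = pd.DataFrame({"new_col": ["x", "y", "z"]})

# how="cross" forms the Cartesian product directly, with no temporary key column
result = df.merge(s_df, how="cross")
print(len(result))  # 9 rows, matching the example in the original request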