rapidsai / dask-cuda

Utilities for Dask and CUDA interactions
https://docs.rapids.ai/api/dask-cuda/stable/
Apache License 2.0

Update explicit-comms for dask-expr support #1323

Closed rjzamora closed 7 months ago

rjzamora commented 8 months ago

Makes a few ~small~ changes to explicit-comms to support dask-expr.

EDIT: The changes are no longer "small".

quasiben commented 8 months ago

I gave this a try and I'm seeing errors. It's been a while since I've run this query so it could be user error:

python local_cudf_merge.py -p ucx -d 0,1,2,3,4,5,6,7 -c 10_000_000

2024-03-20 12:49:49,925 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 637a8bdfdc46dc6083f254fce7023e81 initialized by task ('shuffle-transfer-637a8bdfdc46dc6083f254fce7023e81', 0) executed on worker ucx://127.0.0.1:50385
2024-03-20 12:49:50,535 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-transfer-637a8bdfdc46dc6083f254fce7023e81', 6)
Function:  shuffle_transfer
args:      (              key  shuffle  payload  _partitions
0        60000000        1  9459992            1
1        60000001        5  5005508            5
2        60000002        3  3183193            3
3        60000003        1   536845            1
4        60000004        3  8484645            3
...           ...      ...      ...          ...
9999995  69999995        3  9745632            3
9999996  69999996        5  8371927            5
9999997  69999997        3  8066013            3
9999998  69999998        1  3762504            1
9999999  69999999        7  9411065            7

[10000000 rows x 4 columns], '637a8bdfdc46dc6083f254fce7023e81', 6, 8, '_partitions', Empty DataFrame
Columns: [key, shuffle, payload, _partitions]
Index: [], {0, 1, 2, 3, 4, 5, 6, 7}, True, True)
kwargs:    {}
Exception: "RuntimeError('P2P shuffling 637a8bdfdc46dc6083f254fce7023e81 failed during transfer phase')"

And with explicit-comms:

python local_cudf_merge.py -p ucx -d 0,1,2,3,4,5,6,7 -c 10_000_000 -b explicit-comms

2024-03-20 12:51:47,067 - distributed.worker.state_machine - WARNING - Async instruction for <Task cancelled name="execute(('shuffle-transfer-637a8bdfdc46dc6083f254fce7023e81', 5))" coro=<Worker.execute() done, defined at /datasets/bzaitlen/miniconda3/envs/dask-expr-rapids-24.06/lib/python3.10/site-packages/distributed/worker_state_machine.py:3615>> ended with CancelledError
Traceback (most recent call last):
  File "/datasets/bzaitlen/miniconda3/envs/dask-expr-rapids-24.06/lib/python3.10/site-packages/distributed/shuffle/_core.py", line 494, in handle_transfer_errors
    yield
  File "/datasets/bzaitlen/miniconda3/envs/dask-expr-rapids-24.06/lib/python3.10/site-packages/distributed/shuffle/_shuffle.py", line 79, in shuffle_transfer
    return get_worker_plugin().add_partition(
  File "/datasets/bzaitlen/miniconda3/envs/dask-expr-rapids-24.06/lib/python3.10/site-packages/distributed/shuffle/_worker_plugin.py", line 346, in add_partition
    return shuffle_run.add_partition(
  File "/datasets/bzaitlen/miniconda3/envs/dask-expr-rapids-24.06/lib/python3.10/site-packages/distributed/shuffle/_core.py", line 343, in add_partition
    shards = self._shard_partition(data, partition_id)
  File "/datasets/bzaitlen/miniconda3/envs/dask-expr-rapids-24.06/lib/python3.10/site-packages/distributed/shuffle/_shuffle.py", line 521, in _shard_partition
    out = split_by_worker(
  File "/datasets/bzaitlen/miniconda3/envs/dask-expr-rapids-24.06/lib/python3.10/site-packages/distributed/shuffle/_shuffle.py", line 339, in split_by_worker
    t = to_pyarrow_table_dispatch(df, preserve_index=True)
  File "/datasets/bzaitlen/miniconda3/envs/dask-expr-rapids-24.06/lib/python3.10/site-packages/dask/utils.py", line 772, in __call__
    meth = self.dispatch(type(arg))
  File "/datasets/bzaitlen/miniconda3/envs/dask-expr-rapids-24.06/lib/python3.10/site-packages/dask/utils.py", line 766, in dispatch
    raise TypeError(f"No dispatch for {cls}")
TypeError: No dispatch for <class 'cudf.core.dataframe.DataFrame'>
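
The final TypeError means that no implementation of to_pyarrow_table_dispatch was registered for cudf.DataFrame in this environment. As a rough illustration of how a type-keyed dispatch fails in this way, here is a minimal sketch. The Dispatch class and the two frame classes are hypothetical stand-ins written for this example, not dask's actual implementation.

```python
class Dispatch:
    """Tiny type-keyed registry, loosely modeled on the calls in the traceback."""

    def __init__(self):
        self._lookup = {}

    def register(self, cls):
        """Return a decorator that registers a function for `cls`."""
        def decorator(func):
            self._lookup[cls] = func
            return func
        return decorator

    def dispatch(self, cls):
        # Walk the MRO so subclasses inherit a parent's implementation.
        for base in cls.__mro__:
            if base in self._lookup:
                return self._lookup[base]
        raise TypeError(f"No dispatch for {cls}")

    def __call__(self, arg, *args, **kwargs):
        meth = self.dispatch(type(arg))
        return meth(arg, *args, **kwargs)


to_table = Dispatch()


class HostFrame:
    """Hypothetical CPU-backed frame with a registered converter."""


class DeviceFrame:
    """Hypothetical GPU-backed frame with nothing registered."""


@to_table.register(HostFrame)
def _host_to_table(df):
    return "table"


def try_convert(df):
    """Return the converted value, or the error message if dispatch fails."""
    try:
        return to_table(df)
    except TypeError as err:
        return str(err)
```

Calling try_convert(DeviceFrame()) returns a "No dispatch for ..." message, which is the same shape of failure as above: the shuffle code path asks for a converter that the GPU-backed type never registered.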

rjzamora commented 8 months ago

I gave this a try and I'm seeing errors.

Hmm - I actually am expecting errors in this PR. (I just realized that I never addressed the fact that we literally patch/wrap a bunch of deprecated shuffling code to automatically dispatch to explicit-comms). That said, I wasn't expecting a local-cudf-merge problem, so I'll try to reproduce.

rjzamora commented 8 months ago

It looks like explicit-comms is going to be a pretty big headache to migrate over to dask-expr :/

I have the basic benchmarks working locally, but there is no longer a clean way to support the "explicit-comms" config that we used to support by monkey-patching dask.dataframe. Dask-expr no longer uses a monolithic function (like rearrange_by_column) to inject a shuffle operation for sorting and joining. Instead, these operations are now a composition of abstract expressions. It may be necessary to deprecate/prohibit this behavior when query-planning is enabled (for now).
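
To make the monkey-patching point concrete, here is a minimal sketch of the legacy pattern, assuming a single shuffle entry point that every sort/join calls. Everything in it (legacy_shuffle, config, explicit_comms_shuffle, legacy_merge, patch_shuffle) is a hypothetical stand-in for illustration, not the real dask or dask-cuda code.

```python
import types

# Hypothetical stand-in for the legacy shuffle module; not the real dask API.
legacy_shuffle = types.SimpleNamespace()
config = {"explicit-comms": False}
calls = []  # records which backend handled each shuffle


def rearrange_by_column(df, column, npartitions):
    """Default backend: every legacy sort/join funnels through this function."""
    calls.append("tasks")
    return df


def explicit_comms_shuffle(df, column, npartitions):
    """Hypothetical alternative backend."""
    calls.append("explicit-comms")
    return df


legacy_shuffle.rearrange_by_column = rearrange_by_column


def legacy_merge(left, right, on, npartitions=2):
    """Legacy-style join: looks up the shuffle function at call time."""
    left = legacy_shuffle.rearrange_by_column(left, on, npartitions)
    right = legacy_shuffle.rearrange_by_column(right, on, npartitions)
    return left, right


def patch_shuffle(module):
    """Wrap the single entry point so a config flag can reroute it."""
    original = module.rearrange_by_column

    def wrapper(df, column, npartitions):
        if config["explicit-comms"]:
            return explicit_comms_shuffle(df, column, npartitions)
        return original(df, column, npartitions)

    module.rearrange_by_column = wrapper


patch_shuffle(legacy_shuffle)
config["explicit-comms"] = True
legacy_merge([1, 2], [3, 4], on="key")
```

The wrapper only works because there is exactly one function to wrap. Once sorting and joining are built from composed expression objects, there is no equivalent single hook to replace.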

pentschev commented 8 months ago

It looks like explicit-comms is going to be a pretty big headache to migrate over to dask-expr :/

I have the basic benchmarks working locally, but there is no longer a clean way to support the "explicit-comms" config that we used to support by monkey-patching dask.dataframe. Dask-expr no longer uses a monolithic function (like rearrange_by_column) to inject a shuffle operation for sorting and joining. Instead, these operations are now a composition of abstract expressions. It may be necessary to deprecate/prohibit this behavior when query-planning is enabled (for now).

Thanks for debugging this Rick. I must admit I'm not of any help when it comes to either Dask Dataframe or explicit-comms, on the other hand @madsbk may have great ideas here.

madsbk commented 8 months ago

Hmm, maybe it is time to deprecate the monkey patching of dask and tell users to call shuffle() directly? And maybe create a basic merge() function that uses shuffle() people can use?

Do you guys know how many are using explicit-comms?
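
For reference, the "basic merge() that uses shuffle()" idea could look roughly like the sketch below: hash-partition both inputs on the join key, then join matching partitions locally. It is a pure-Python illustration over lists of dicts, and the shuffle() and merge() signatures here are assumptions made for the example, not the dask-cuda API.

```python
from collections import defaultdict


def shuffle(rows, on, npartitions):
    """Hash-partition rows so equal keys land in the same output partition."""
    parts = [[] for _ in range(npartitions)]
    for row in rows:
        parts[hash(row[on]) % npartitions].append(row)
    return parts


def merge(left, right, on, npartitions=4):
    """Inner join: shuffle both sides on `on`, then join partition by partition."""
    out = []
    left_parts = shuffle(left, on, npartitions)
    right_parts = shuffle(right, on, npartitions)
    for lpart, rpart in zip(left_parts, right_parts):
        # Build a per-partition lookup of right-hand rows by key.
        index = defaultdict(list)
        for rrow in rpart:
            index[rrow[on]].append(rrow)
        for lrow in lpart:
            for rrow in index.get(lrow[on], []):
                out.append({**lrow, **rrow})
    return out


left = [{"key": 1, "a": "x"}, {"key": 2, "a": "y"}]
right = [{"key": 2, "b": "z"}, {"key": 3, "b": "w"}]
result = merge(left, right, on="key")
```

Because both sides are partitioned with the same hash, rows with equal keys always meet in the same partition, so the join needs no further communication after the shuffle.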

rjzamora commented 8 months ago

Hmm, maybe it is time to deprecate the monkey patching of dask and tell users to call shuffle() directly? And maybe create a basic merge() function that uses shuffle() people can use?

Yes, I think this route makes the most sense for now. I have a feeling that there are clean ways to introduce an "official" explicit-comms shuffle using the new API, but I'd like to take some time to get it right.

Do you guys know how many are using explicit-comms?

The only "official" user that I personally know of is NeMo Curator. However, (1) they are not using the monkey-patching approach, and (2) they do not develop against the latest RAPIDS version. cc @VibhuJawa @ayushdg

quasiben commented 8 months ago

I'm not aware of any external usage. It is useful to have for benchmarking and for continued exploration of shuffling generally. With that said, I agree with @rjzamora that we should take our time getting it right.

rjzamora commented 8 months ago

CI failures without the hacky test-skip logic are due to a bad/old dask-expr version being pulled (https://github.com/dask/distributed/issues/8574)

rjzamora commented 7 months ago

@pentschev - The upstream nightly situation should be resolved now. You approved this PR a while back when it looked pretty different. Did you have time for another pass?

rjzamora commented 7 months ago

/merge