rapidsai / ucx-py

Python bindings for UCX
https://ucx-py.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Dask-cudf multi partition merge slows down with `ucx` #402

Closed VibhuJawa closed 3 years ago

VibhuJawa commented 4 years ago

Dask-cudf merge seems to slow down with ucx.

Wall time: (15.4 seconds on tcp) vs (37.8 s on ucx) (exp-01)

In the attached example we see a slowdown with ucx vs. just using tcp.

Wall Times on exp-01

UCX Time

CPU times: user 19.3 s, sys: 1.97 s, total: 21.2 s
Wall time: 38.4 s
2945293

CPU times: user 16.7 s, sys: 1.71 s, total: 18.4 s
Wall time: 37.8 s
2943379

TCP times

CPU times: user 10.8 s, sys: 815 ms, total: 11.6 s
Wall time: 15.7 s
2944022

CPU times: user 10.9 s, sys: 807 ms, total: 11.7 s
Wall time: 15.4 s
2943697 

Repro Code:

Helper Function to create distributed dask-cudf frame


import dask_cudf
import cudf
import os
import time
import dask.dataframe as dd
import dask.array as da

from dask_cuda import LocalCUDACluster
from dask.distributed import Client,wait
from dask.utils import parse_bytes

def create_random_data(n_rows=1_000,n_parts = 10, n_keys_index_1=100_000,n_keys_index_2=100,n_keys_index_3=100, col_prefix = 'a'):

    chunks = n_rows//n_parts

    df = dd.concat([
        da.random.random(n_rows, chunks = chunks).to_dask_dataframe(columns= col_prefix + '_non_merge_1'),
        da.random.random(n_rows, chunks = chunks).to_dask_dataframe(columns= col_prefix + '_non_merge_2'),
        da.random.random(n_rows, chunks = chunks).to_dask_dataframe(columns= col_prefix + '_non_merge_3'),
        da.random.randint(0, n_keys_index_1, size=n_rows,chunks = chunks ).to_dask_dataframe(columns= col_prefix + '_0'),
        da.random.randint(0, n_keys_index_2, size=n_rows, chunks = chunks ).to_dask_dataframe(columns= col_prefix +'_1'),
        da.random.randint(0, n_keys_index_3, size=n_rows, chunks = chunks ).to_dask_dataframe(columns= col_prefix +'_2'),

    ], axis=1).persist()

    gdf = df.map_partitions(cudf.from_pandas)
    gdf =  gdf.persist()
    _ = wait(gdf)
    return gdf

RMM Setup:

def setup_rmm_pool(client):
    client.run(
        cudf.set_allocator,
        pool=True,
        initial_pool_size= parse_bytes("26GB"),
        allocator="default"
    )
    return None

setup_rmm_pool(client)

Merge Code:

The slow down happens on the merge step.

rows_1, parts_1 = 140_176_770, 245
rows_2, parts_2 = 21_004_393, 171

df_1 = create_random_data(n_rows= rows_1, n_parts = parts_1, col_prefix = 'a')
df_2 = create_random_data(n_rows= rows_2,  n_parts = parts_2, col_prefix = 'b')

merged_df = df_1.merge(df_2, left_on = ['a_0','a_1','a_2'], right_on = ['b_0','b_1','b_2'])
%time len(merged_df)
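
`%time` is an IPython magic, so the measurement above only works in a notebook. A minimal sketch of the same measurement in plain Python follows. `time_runs` is a hypothetical helper (not part of Dask or ucx-py), and it assumes the callable forces the computation, as `len(merged_df)` does here.

```python
import time


def time_runs(fn, n_runs=3):
    """Call fn() n_runs times and return the wall time of each call in seconds."""
    timings = []
    for _ in range(n_runs):
        start = time.perf_counter()
        fn()
        timings.append(time.perf_counter() - start)
    return timings


# Hypothetical usage with the merge above (needs the cluster and the data):
# timings = time_runs(lambda: len(merged_df))
# print([f"{t:.2f} s" for t in timings])
```

Since `merged_df` is not persisted, each call should redo the merge, so repeated runs give a rough sense of run-to-run variance.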

Additional Context:

There has been discussion about this on our internal Slack channel; please see that for more context.

jakirkham commented 4 years ago

Thanks Peter! No worries. It's better to be confident before it proliferates.

Well there's also still the overhead of the send/recv as Ben has pointed out as well. Have a few things in the works (most of which I think you've seen), which should help.

jakirkham commented 4 years ago

Added PR ( https://github.com/rapidsai/cudf/pull/4077 ) and PR ( https://github.com/dask/distributed/pull/3442 ), which should cut out the overhead involved in serializing as seen in the worker administrative profile earlier.

pentschev commented 4 years ago

> Added PR ( rapidsai/cudf#4077 ) and PR ( dask/distributed#3442 ), which should cut out the overhead involved in serializing as seen in the worker administrative profile earlier.

Did you happen to profile that? Could you share some numbers, preferably for this workflow?

jakirkham commented 4 years ago

Not yet no. We have been trying a few approaches for the cuDF change based on feedback, impact, etc. So probably a little early to profile, but agree that would be a good thing to do.

jakirkham commented 4 years ago

Should add I'm guessing PR ( https://github.com/dask/distributed/pull/3453 ) will have more impact, but will need to work on it a bit more before we have something usable. As mentioned earlier the plan is to work with @madsbk on profiling this.

kkraus14 commented 4 years ago

@jakirkham I would also add that if that PR turns out to be a nightmare, there are likely options at the cudf serialization level / the algorithm level where we can make things return into one contiguous allocation that can be sent, instead of breaking it down to the Column / Buffer level like we currently do.

jakirkham commented 4 years ago

Sorry for not updating this yet @kkraus14.

I think PR ( https://github.com/rapidsai/cudf/pull/4101 ) might be more approachable near term. Just waiting for CI to report. If you have a chance to look, that would be great (no pressure though). 🙂

mrocklin commented 4 years ago

So about a week ago we had a meeting where people decided that it would be interesting to make sure that we were using RMM in the ucx-py code. Did someone run that experiment? If so, what was the result? It seems like people have moved on from that so I suspect that there was a finding. I would be curious if anyone has results to share.

jakirkham commented 4 years ago

Previously we were seeing Numba popping up quite a bit in the worker administrative profile in Dask. Mainly this was related to grabbing the context. This happened during things like grabbing __cuda_array_interface__ (with Distributed and UCX-Py) and serialization of cuDF data (as it creates Numba DeviceNDArrays internally to send). It also happened in spilling (due to costly creation and destruction), but was less obvious (at least from the profile).

We have since fixed the spilling issue by reading and writing directly to RMM DeviceBuffers that have much less overhead to work with. ( https://github.com/rapidsai/dask-cuda/pull/235 ) This is now in dask-cuda nightlies. As to cuDF serialization, there is a PR to make cuDF Buffer objects themselves serializable ( https://github.com/rapidsai/cudf/pull/4101 ), which eliminates the remaining Numba overhead.

After quickly running some tests, Numba is not featured in the worker profile at all. This isn't to say there are no more improvements to be made here (suspect there is still plenty of room to improve). That said, figured this step forward was worth sharing. It would be good if people can start profiling with these changes and report back.

Edit: We've now extended these improvements to StringColumns. ( https://github.com/rapidsai/cudf/pull/4111 )

mrocklin commented 4 years ago

I would love to see a performance report with the changes if anyone has access to a machine and the time to run the previous benchmark.

jakirkham commented 4 years ago

Yep that's a good idea @mrocklin. Just have been pushing in the last related fixes and getting some nightlies out to test with.

Have rerun on a DGX-1 with the latest nightlies and a ucx-py fix that @madsbk recently made (should produce nightlies soon). Also set UCX_RNDV_THRESHOLD=8192 for the UCX case. Ran using a notebook included in the Gist (please point out any errors if you see them 🙂). Here are the profiles I got for TCP and UCX.

TCP Profile

UCX Profile

pentschev commented 4 years ago

What about the time to execute the merge operation? I would be interested in seeing that before anything else.

jakirkham commented 4 years ago

Should add when looking at the UCX profile, I'm seeing a lot of time spent in .is_closed(...). Admittedly this might just be hiding other things happening at the C level, but wanted to point it out in case there was something else we should be doing here 🙂

[Screenshot of the UCX profile, 2020-02-11]

jakirkham commented 4 years ago

> What about the time to execute the merge operation? I would be interested in seeing that before anything else.

What do you mean? The overall runtime? If so, UCX remains a lot slower than TCP. So then the question is, "why?"

jakirkham commented 4 years ago

Should add I ran this on a DGX-1 as the DGX-2s are pretty heavily occupied ATM.

pentschev commented 4 years ago

> What do you mean? The overall runtime? If so, UCX remains a lot slower than TCP. So then the question is, "why?"

Yes, I mean the overall/wall time on a DGX-2. I think this will give us some insight on potential speedups achieved, and hopefully present no regressions on performance either. My current best time runs around 16 seconds on a DGX-2 with NVLink and UCX_RNDV_THRESH=8192, which is still around 1 second slower than TCP.

jakirkham commented 4 years ago

Really? Only 1 second slower? Maybe I'm doing something wrong. Do you mind looking at my notebook briefly?

jakirkham commented 4 years ago

Have rerun on a DGX-2 using the same setup as before.


Here are the overall runtimes (based on %time in the notebook):

| Protocol | CPU time (user) | CPU time (sys) | CPU time (total) | Wall time |
|---|---|---|---|---|
| TCP | 11.7 s | 965 ms | 12.6 s | 14.6 s |
| UCX | 4min 37s | 20.5 s | 4min 58s | 25min 5s |
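
To put the two wall times on the same scale, the `%time`-style strings can be converted to seconds. This is a small sketch; `to_seconds` is a hypothetical helper written for this comparison and only handles the unit spellings listed in it.

```python
import re

# Seconds per unit, for the unit spellings that show up in %time output.
_UNIT_SECONDS = {"h": 3600.0, "min": 60.0, "s": 1.0,
                 "ms": 1e-3, "us": 1e-6, "µs": 1e-6, "ns": 1e-9}


def to_seconds(text):
    """Convert a duration such as '4min 37s' or '965 ms' to seconds."""
    parts = re.findall(r"([\d.]+)\s*(h|min|ms|us|µs|ns|s)\b", text)
    if not parts:
        raise ValueError(f"no duration found in {text!r}")
    return sum(float(value) * _UNIT_SECONDS[unit] for value, unit in parts)


tcp_wall = to_seconds("14.6 s")
ucx_wall = to_seconds("25min 5s")
print(f"UCX wall time is {ucx_wall / tcp_wall:.0f}x the TCP wall time")
```

A gap of roughly two orders of magnitude is far larger than the earlier 15 s vs. 38 s difference.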

Here are the profiles I got for TCP and UCX:

TCP Profile

UCX Profile

pentschev commented 4 years ago

Running again, it's actually 2 seconds slower:

TCP Merge time: 14.165126085281372
UCX+NVLink Merge time Run 1: 16.27089762687683
UCX+NVLink Merge time Run 2: 15.756562232971191
UCX+NVLink Merge time Run 3: 16.542734384536743

Your code seems right, but 25 minutes as above seems absolutely off. I honestly don't know what could have happened, but I would say anything above 1 minute (and that's already very stretched) has something wrong in it.

EDIT: Just for a reminder from https://github.com/rapidsai/ucx-py/issues/402#issuecomment-579986636, my previous best without UCX_RNDV_THRESH=8192 was around 22 seconds.

jakirkham commented 4 years ago

Based on @pentschev's debugging internally, it appears my Conda environment was hosed. We're working on new reports.

pentschev commented 4 years ago

As discussed offline, it seems that @jakirkham's environment has something off. I've tested that same environment and could confirm UCX was taking absurdly long.

I now created a new environment as follows:

conda create -n rapids-nightly-0.13 -c rapidsai-nightly -c nvidia -c conda-forge -c defaults cudatoolkit=10.1 rapids=0.13 python=3.7

And for the first time I saw better results for UCX (with NVLink) compared to TCP:

TCP Merge time Run 1: 15.855406522750854
TCP Merge time Run 2: 15.03106141090393
TCP Merge time Run 3: 14.64222264289856
UCX+NVLink Merge time Run 1: 13.793559312820435
UCX+NVLink Merge time Run 2: 13.05189847946167
UCX+NVLink Merge time Run 3: 13.032773733139038

Here are Dask reports for that:

Dask Report TCP Dask Report UCX+NVLink
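
As a quick summary of the six runs above, the per-protocol means and their ratio can be computed directly from the reported numbers. With only three runs each, this is indicative rather than conclusive.

```python
from statistics import mean

# Merge times in seconds, copied from the runs reported above.
tcp = [15.855406522750854, 15.03106141090393, 14.64222264289856]
ucx = [13.793559312820435, 13.05189847946167, 13.032773733139038]

tcp_mean = mean(tcp)
ucx_mean = mean(ucx)
speedup = tcp_mean / ucx_mean  # > 1 means UCX+NVLink is faster

print(f"TCP mean:        {tcp_mean:.2f} s")
print(f"UCX+NVLink mean: {ucx_mean:.2f} s")
print(f"Speedup of UCX+NVLink over TCP: {speedup:.2f}x")
```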

mrocklin commented 4 years ago

To me it looks like we're still spending a lot of time creating and destroying rmm.DeviceBuffers in the UCX runs. Does that match your understanding as well?

mrocklin commented 4 years ago

Did we ever try enabling RMM at the top of ucp/__init__.py to make sure that it is always active?

jakirkham commented 4 years ago

Does appear that way. Not sure about people's experience. Maybe others can comment?

@pentschev added an RMM plugin to dask-cuda, which I'm guessing he's using here (though I could be wrong). ( https://github.com/rapidsai/dask-cuda/pull/236 ) Even without that people have been pretty good about enabling RMM in all workflows. So don't think it is an issue of not having RMM enabled.

More likely (to parrot @kkraus14 😉) this is showing us that RMM's pool allocator (CNMeM) is experiencing degrading performance due to lots of allocations/deallocations. Here are the numbers to back that up.

pentschev commented 4 years ago

Yes, we're always using the RMM pool. Without the RMM pool there are basically two scenarios:

  1. End up OOMing (implies UCX_CUDA_IPC_CACHE=y); or
  2. It's incredibly slow (implies UCX_CUDA_IPC_CACHE=n).

mrocklin commented 4 years ago

> More likely (to parrot @kkraus14 😉) this is showing us that RMM's pool allocator (CNMeM) is experiencing degrading performance due to lots of allocations/deallocations. Here are the numbers to back that up.

Ah, I now see the reason to bulk allocate in the Dask UCX comm

jakirkham commented 4 years ago

That's one way to go about it at least. Another would be some sort of optimization at the graph level to avoid the need for as much communication to begin with ( https://github.com/dask/dask/issues/5809 ). Perhaps you have other ideas still? 😉

mrocklin commented 4 years ago

I think that that optimization is generally a good idea, but that won't affect the particular profile results we're seeing here, right?

On Tue, Feb 11, 2020, 4:08 PM jakirkham notifications@github.com wrote:

> That's one way to go about it at least. Another would be some sort of optimization at the graph level to avoid the need for as much communication to begin with ( dask/dask#5809 ). Perhaps you have other ideas still? 😉


jakirkham commented 4 years ago

Honestly I haven't looked into that thread in any detail. So I can't say.

Anyways open to other ideas if something occurs to you 🙂

VibhuJawa commented 4 years ago

> I think that that optimization is generally a good idea, but that won't affect the particular profile results we're seeing here, right?

Yup, it really should not affect these profile results.

That's just to prevent redundant columns from being transferred over, which is not the case here.

jakirkham commented 4 years ago

Sure though redundant columns being transferred would exacerbate the problem we are seeing here.

jakirkham commented 4 years ago

We can also get a sense that the memory pool is degrading in performance by looking at the first allocations made when copying in the data from host compared to later allocations during transfers.


Initial allocations:

[Screenshot of the profile for the initial allocations, 2020-02-11]

Allocations for transfers:

[Screenshot of the profile for the allocations made during transfers, 2020-02-11]

kkraus14 commented 4 years ago

The other option we have is to possibly dispatch to some different cuDF methods in the case of the shuffle functions / other memory allocation heavy functions to contiguous_* APIs that return multiple DataFrames all backed by a single RMM allocation.

It should relieve the allocation pressure on the sender and then we can also look at optimizing the sends / receives to send entire DataFrames as one contiguous chunk of memory instead of going down inside of Columns.
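
The idea of backing several buffers with one allocation can be sketched in host memory as follows. This is only an analogy using `bytearray` and `memoryview`; it is not the cuDF `contiguous_*` API, and `pack_contiguous` / `unpack_views` are hypothetical names.

```python
def pack_contiguous(buffers):
    """Copy several byte buffers into one allocation.

    Returns the packed bytearray and a list of (offset, length) pairs.
    """
    total = sum(len(b) for b in buffers)
    packed = bytearray(total)  # the single backing allocation
    layout = []
    offset = 0
    for b in buffers:
        packed[offset:offset + len(b)] = b
        layout.append((offset, len(b)))
        offset += len(b)
    return packed, layout


def unpack_views(packed, layout):
    """Return zero-copy views into the packed allocation, one per original buffer."""
    view = memoryview(packed)
    return [view[off:off + n] for off, n in layout]


# Three made-up "columns"; one of them is empty.
columns = [b"\x01\x02\x03\x04", b"", b"\x05\x06"]
packed, layout = pack_contiguous(columns)
views = unpack_views(packed, layout)
```

With this layout a sender would make one allocation and one send per frame instead of one per column, at the cost of shipping the small `layout` table alongside the data.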

harrism commented 4 years ago

Another option is to optimize RMM... cnmem uses a free list which requires linear search. I may be able to resurrect my prototype allocator which is easier to modify to use a set or other tree-based data structure.
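
A toy model of why the search strategy matters: with an unsorted free list, finding a block that fits can require scanning every entry, whereas a size-ordered structure can be searched in logarithmic time. The sketch below tracks sizes only and is not how cnmem or RMM are implemented. A sorted Python list also still pays O(n) for insertion and removal, which a tree or set would avoid.

```python
import bisect


def first_fit_linear(free_sizes, request):
    """Scan an unsorted free list.

    Returns (index, comparisons) for the first block that fits,
    or (None, comparisons) if no block is large enough.
    """
    for i, size in enumerate(free_sizes):
        if size >= request:
            return i, i + 1
    return None, len(free_sizes)


def best_fit_sorted(sorted_sizes, request):
    """Binary-search a size-sorted free list for the smallest block that fits."""
    i = bisect.bisect_left(sorted_sizes, request)
    return i if i < len(sorted_sizes) else None


# Made-up fragmented pool: many small free blocks and one large one at the end.
free_sizes = [64] * 10_000 + [4096]
idx, comparisons = first_fit_linear(free_sizes, 1024)
sorted_idx = best_fit_sorted(sorted(free_sizes), 1024)
print(f"linear scan needed {comparisons} comparisons")
```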

jakirkham commented 4 years ago

Yep these are all good ideas.

Using bigger allocations less frequently generally seems helpful (where possible). Perhaps we will see the downside of that if we go too far (as Keith mentioned to me offline)?

Improving the allocator's performance also seems really useful.


While I think our working theory is reasonable, I'd like us to consider a few other possible theories/poke holes in our existing one (we may rule them out quickly):

  1. What if we are doing a ton of spilling?
  2. What if we are having a lot of failed transmissions?
  3. ?

If 1 is true, we also have a lot of allocations happening when we restore spilled data. So could also degrade RMM performance over time. Is this (not) happening? If so, what denies/confirms that hypothesis?

If 2 is true, we could get a lot of allocations due to trying to receive data and failing for some reason only to try again with a fresh allocation. Is there any indication we are seeing this?

Finally what other things might be causing frequent allocations/deallocations? The answer might be none, but it's worth pausing for a moment and making sure this is true. 🙂

harrism commented 4 years ago

For my efforts on RMM, can you help me get some data about the allocations?

  1. Total number of allocations / deallocations
  2. Size distribution: min, max, mean
  3. Any ordering information you can provide, e.g. FIFO, LIFO, completely random, etc.

Any or all of the above will be helpful. Total number is most important and probably easiest for you to provide.

Thanks!

kkraus14 commented 4 years ago

1) Do you want the number of allocations / deallocations per process or total across all the processes? I'd assume the former but want to be sure.

2) This may be a bit difficult to get as Python doesn't track the allocations anywhere outside of normal reference counting.

3) Generally random with a skew towards FIFO. In general many of the allocations are likely temporaries that are freed relatively quickly, but things can stay alive arbitrarily because of the nature of Python + reference counting.

harrism commented 4 years ago

  1. For a process where you think RMM allocation / free is expensive.

  2. Use RMM logging. Need to set this variable to true (https://github.com/harrism/rmm/blob/153224aa51da27fc1d6478a8997d31d2a5d9e48a/include/rmm/rmm.hpp#L55), recompile, and then enable logging in the RMM initialization. Then you would need to call rmm.csv_log() (can do that from a notebook and then analyze in Pandas/cuDF if you want).

  3. I will just go with random.

harrism commented 4 years ago

I have a simple benchmark that allocates N random blocks from 1 to k bytes, freeing with a certain probability at each iteration, or when the maximum memory size is reached. I can use this to profile and optimize, I think. With 100,000 allocations of at most 2MB and a max allocated size of 87% of 16GB, it takes 30s with cnmem. If you think this is a sufficiently close comparison, I'll just use it. Or you can give me different parameters.
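
The access pattern described above can be sketched as a size-only simulation. No memory is allocated here, so this shows the pattern rather than the cost. The free probability is not stated in the thread, so `free_prob=0.5` is an assumption, as are the function name and the reading of "16GB" as 16 GiB.

```python
import random


def simulate_allocations(n_allocs, max_block, capacity, free_prob, seed=0):
    """Simulate random allocations and frees; return (peak_live_blocks, total_frees).

    Assumes max_block <= capacity. Only block sizes are tracked.
    """
    rng = random.Random(seed)
    live = []          # sizes of currently live blocks
    in_use = 0         # total bytes held by live blocks
    peak_live = 0
    total_frees = 0
    for _ in range(n_allocs):
        size = rng.randint(1, max_block)
        # Free random blocks until the new block fits under the capacity.
        while live and in_use + size > capacity:
            in_use -= live.pop(rng.randrange(len(live)))
            total_frees += 1
        live.append(size)
        in_use += size
        peak_live = max(peak_live, len(live))
        # With probability free_prob, also free one random live block.
        if live and rng.random() < free_prob:
            in_use -= live.pop(rng.randrange(len(live)))
            total_frees += 1
    return peak_live, total_frees


peak, frees = simulate_allocations(
    n_allocs=100_000,
    max_block=2 * 2**20,               # at most 2 MiB per block
    capacity=int(0.87 * 16 * 2**30),   # 87% of 16 GiB
    free_prob=0.5,                     # assumed, not given in the thread
)
print(f"peak live blocks: {peak}, frees: {frees}")
```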

kkraus14 commented 4 years ago

That sounds good to me. Additionally I wrote a quick Python script with much more synthetic behavior: https://github.com/rapidsai/ucx-py/issues/402#issuecomment-580776895 which could be good to use for seeing performance across allocations versus frees as well.

jakirkham commented 4 years ago

As we are always using DeviceBuffer, I think we can wrap it with an object that returns us info about the allocation that occurred. Also could store timestamps on each allocation/deallocation as well. Should give us plenty of details. Will run through this workflow tomorrow and update with what I find. Please let me know if you see any issues with this or if I've left anything out 🙂
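
A rough sketch of such a wrapper is below. `AllocationLog` is a hypothetical class, and `bytearray` stands in for the buffer factory so the example runs without a GPU. In the real workflow the factory would presumably construct an `rmm.DeviceBuffer`, which is not shown here.

```python
import time


class AllocationLog:
    """Record size and timestamp for buffers created through allocate()."""

    def __init__(self, factory, clock=time.perf_counter):
        self._factory = factory   # callable taking a size in bytes
        self._clock = clock
        self._sizes = {}          # buffer id -> size of live buffers
        self._next_id = 0
        self.events = []          # (timestamp, "alloc" | "free", buffer id, size)

    def allocate(self, size):
        """Create a buffer and log the allocation; returns (buffer id, buffer)."""
        buf = self._factory(size)
        buf_id = self._next_id
        self._next_id += 1
        self._sizes[buf_id] = size
        self.events.append((self._clock(), "alloc", buf_id, size))
        return buf_id, buf

    def free(self, buf_id):
        """Log the deallocation; the caller is expected to drop the buffer itself."""
        size = self._sizes.pop(buf_id)
        self.events.append((self._clock(), "free", buf_id, size))


log = AllocationLog(bytearray)   # bytearray is a host-memory stand-in
buf_id, buf = log.allocate(16)
log.free(buf_id)
```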

harrism commented 4 years ago

RMM logging will give you the sizes, device IDs, pointers, timestamps, and the source (file and line) of the call (probably all would be DeviceBuffer since it only goes one level up the call stack), but your own logging can give you more context if it helps, I guess.

jakirkham commented 4 years ago

Thanks for that info. Yeah this is pretty close to what I'd want.

We'd want to get some more context about what happens in Python (like line numbers in Python files). In particular am hoping to discover how much was due to things like allocating buffers for receiving data vs. spilling.

jakirkham commented 4 years ago

> For my efforts on RMM, can you help me get some data about the allocations?
>
>   1. Total number of allocations / deallocations

At least for the MRE given above we are looking at 298948 allocations and deallocations.

>   2. Size distribution: min, max, mean

Interestingly there are a lot of 0 size allocations (as discussed offline). So that's the min. Of the non-zero values though the smallest is 4 (somewhat surprising to me at least).

The max is 4776520.

The mean including 0s is 363015 (rounded) and without 0s is 471108 (so about 10% of the max).

The standard deviation including 0s is 864456 (rounded) and without 0s is 958580 (so about 20% of the max).
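
For reference, statistics like these can be computed from a list of allocation sizes as sketched below. The sample sizes are made up, and whether the figures above use the population or the sample standard deviation is not stated (`pstdev` is an assumption). The last lines use the two reported means: both share the same byte total, so their ratio estimates the fraction of non-zero allocations.

```python
from statistics import mean, pstdev


def summarize(sizes):
    """Summary statistics for allocation sizes, with and without zero-size entries."""
    nonzero = [s for s in sizes if s > 0]
    return {
        "count": len(sizes),
        "zero_count": len(sizes) - len(nonzero),
        "min_nonzero": min(nonzero),
        "max": max(sizes),
        "mean_all": mean(sizes),
        "mean_nonzero": mean(nonzero),
        "std_all": pstdev(sizes),
        "std_nonzero": pstdev(nonzero),
    }


# Made-up sample sizes in bytes, for illustration only.
sample = [0, 0, 4, 1024, 4096, 1_000_000]
stats = summarize(sample)

# Reported (rounded) means from the comment above.
mean_all, mean_nonzero = 363015, 471108
zero_fraction = 1 - mean_all / mean_nonzero
print(f"estimated share of zero-size allocations: {zero_fraction:.1%}")
```

Because the reported means are rounded, the resulting share is only an estimate.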

>   3. Any ordering information you can provide, e.g. FIFO, LIFO, completely random, etc.

Will see if I can extract this from the data, but I think Keith is right that this has a FIFO skew.

kkraus14 commented 4 years ago

> At least for the MRE given above we are looking at 298948 allocations and deallocations.

Do you happen to have any info about what the largest number of allocations we had alive at any given time is? I think that's important as well because if we're just doing 1000 allocations, 1000 deallocations repeatedly we wouldn't have performance problems.
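
The distinction matters because the same totals can hide very different peaks. A minimal sketch of computing the peak number of live allocations from an ordered event trace follows; the traces are made up and `peak_live` is a hypothetical helper.

```python
def peak_live(events):
    """Return the largest number of simultaneously live allocations.

    events is an iterable of "alloc" / "free" strings in the order they occurred.
    """
    live = 0
    peak = 0
    for kind in events:
        if kind == "alloc":
            live += 1
            peak = max(peak, live)
        elif kind == "free":
            live -= 1
        else:
            raise ValueError(f"unknown event kind: {kind!r}")
    return peak


# Two made-up traces with identical totals (4 allocations, 4 frees).
churn = ["alloc", "free"] * 4            # allocate and free immediately
burst = ["alloc"] * 4 + ["free"] * 4     # allocate everything, then free

print(peak_live(churn), peak_live(burst))
```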

harrism commented 4 years ago

Thanks for this. My random benchmark isn't too far off. But answering Keith's question would really help.

jakirkham commented 4 years ago

Certainly I'll poke at the time info next 🙂

harrism commented 4 years ago

Not time. Maximum number of active allocations.

jakirkham commented 4 years ago

Adding some plots below to hopefully give more context. This is for one worker, but other workers look similar.

The first plot shows a histogram of the number of allocations for a particular number of bytes. The second plot shows how many allocations are alive over "time steps" (one step per allocation).

Allocations (#) vs  Size (bytes)

Alive Allocations (#) vs  Time (Operations)
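
Since the plot images are not reproduced here, the following is a sketch of how a size histogram like the first plot could be built from logged allocation sizes. Power-of-two buckets are an assumption made for readability (the original binning is unknown), and the sizes are made up.

```python
from collections import Counter


def size_histogram(sizes):
    """Count allocations per power-of-two bucket.

    Key 0 holds zero-size allocations; key b >= 1 covers sizes in [2**(b-1), 2**b).
    """
    return Counter(int(s).bit_length() for s in sizes)


# Made-up sizes in bytes, for illustration only.
hist = size_histogram([0, 0, 4, 5, 1024])
for bucket in sorted(hist):
    label = "0" if bucket == 0 else f"[{2 ** (bucket - 1)}, {2 ** bucket})"
    print(f"{label:>14} bytes: {hist[bucket]}")
```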

kkraus14 commented 4 years ago

This looks like we're keeping the number of allocations to a very reasonable amount.