dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.58k stars 720 forks source link

Slow Joins (w/dask-cudf) #3413

Open quasiben opened 4 years ago

quasiben commented 4 years ago

In https://github.com/rapidsai/dask-cuda/pull/226#pullrequestreview-348129792 an interesting conversation is happening between folks which I'd like to bubble up as an issue unto itself -- that is, why are dask joins slow (at least with cuDF/dask-cuDF)?

In exploring and experimenting with ucx-py, @madsbk and @rjzamora have looked at performing explicit worker-worker communications which by-passes the dask-scheduler entirely. Performing this calculation with explicit communication is extremely performant and we are measuring a 4X speed up over the dask-main line implementation. Again, the question is, why ? Some targeted areas of identifying bottlenecks might include internal cuDF slowness ? Or perhaps the scheduler is slow ?

@kkraus14 has previously commented that there may be some improvements in cudf concat and perhaps some unnecessary deep copies.

Below is a link to @rjzamora's performance profile when running on 4 GPUs with 128 partitions for additional context

https://gistcdn.githack.com/quasiben/74115292d42fc0e96fcf097abeb4ae82/raw/628edba1f8685ce32e3bd0130151f11b880084fc/profile_4gpus_128chunks.html

cc @mrocklin

kkraus14 commented 4 years ago

There's definitely room for optimization in cuDF, but I imagine that the explicit comms would be hitting the same APIs and have the same overheads, no?

rjzamora commented 4 years ago

There's definitely room for optimization in cuDF, but I imagine that the explicit comms would be hitting the same APIs and have the same overheads, no?

Yes, but using explicit-comms will typically aggregate smaller partitions together when they are persisted on the same worker, thus improving GPU utilization for libcudf primitives.

mrocklin commented 4 years ago

@madsbk mentioned this:

but I just want to mentioned that I have seen benchmarks that generates more than 200k tasks and 1k partitions!

Yeah, so Dask's merge algorithms currently generate n log(n) tasks where n is the number of partitions. I'm a little surprised by generating 200k tasks from 1k partitions, but certainly at some point this becomes problematic.

(actually, I just checked and I got close to this)

In [1]: import dask                                                                                      

In [2]: dask.config.set(shuffle="tasks")                                                                 
Out[2]: <dask.config.set at 0x7f5b4bdacfd0>

In [3]: df = dask.datasets.timeseries().repartition(npartitions=1000)                                    

In [4]: df2 = df.set_index("id")                                                                         

In [5]: df2                                                                                              
Out[5]: 
Dask DataFrame Structure:
                    name        x        y
npartitions=1000                          
835               object  float64  float64
884                  ...      ...      ...
...                  ...      ...      ...
1118                 ...      ...      ...
1168                 ...      ...      ...
Dask Name: sort_index, 78730 tasks

At some point I agree that Dask's algorithms will probably stop scaling very nicely, but that point might be somewhat large, and we might be able to push it back a bit. On the other hand. I suspect that many cases will be of moderate size.

Regardless, before we jump into re-implementing all of these algorithms in a new system I think that we should do a bit of profiling. If there we are able to keep things in the core then there is a lot of value in that. We'll lose things like profiling, resilience, the dashboard, logging, plugins, and so on if we jump entirely outside of mainline. Additionally, there might be things that this group can do (like increasing scheduler speed) that would benefit all Dask workloads, not just dataframe merges.

rjzamora commented 4 years ago

We'll lose things like profiling, resilience, the dashboard, logging, plugins, and so on if we jump entirely outside of mainline. Additionally, there might be things that this group can do (like increasing scheduler speed) that would benefit all Dask workloads, not just dataframe merges.

Right - While I suspect something like an explicit-comms api might be useful to have long-term in dask/distributed, the goal is certainly to have all dataframe-related algorithms implemented in main-line dask. We have implemented merge and set_index to explore oportunities for optimization.

I think we would definitely prefer the long-term solution to be a more-inteligent scheduler. A significant disadvantage of the current explicit-comms solution is the fact that we effectively require synchronization between workers before a collective/optimized task is executed. This is fine for an isolated merge/sort benchmark, but is a problem if we are using an embarrassingly-parallel workflow (like parallel IO + preprocessing) to generate he initial data.

mrocklin commented 4 years ago

Right, so if we do end up needing something that avoids the scheduler (which I think we should verify that we absolutely need first) then it might be worth thinking about what sorts of things we could do that would be general and long-lived, rather than hand-crafting a custom merge algorithm.

For example, Spark's internal system creates keyed-buckets that workers can drop tasks into, and then Spark moves those data around when ready as larger chunks. Maybe we should think about adding something similar. (I think that we can probably find something better than this, but I bring it up as one kind of general solution).

I think that the explicit comms merge experiment has been awesome at demonstrating the kind of performance that we should expect. However, I would love it if we could pause for a moment and do a bit of general design work before devoting a of resources running down that path. I think that this issue and discussion are a start for that.

To help motivate that discussion I think that it would be great to get more information about what we're trying to do, and what is slow today. The first step for that, I think, is a performance_report that shows that things are slow with mainline Dask, and maybe some information about the equivalent fast solution?

rjzamora commented 4 years ago

@mrocklin - Here are some performance reports for both merge and set_index with 4 GPUs. Note that the explicit-comms version of both benchmarks take ~1/3 the time...

mrocklin commented 4 years ago

Oh awesome. I have a bunch of feedback on these plots and I think that there are probably some low hanging fruit. I'm happy to discuss that here, but I would also be happy to jump on a call with folks. My goal of the call would be for us to walk through these plots together and develop some intuition on how to interpret them and find performance issues.

Quick summary though, you're definitely not bottlenecked by scheduler overhead :)

(We have only a few hundred tasks, so Dask should be able to schedule those in tens to hundreds of milliseconds. And yet the computations are taking tens of seconds. Something else is clearly going on)

madsbk commented 4 years ago

I think it is a very good idea to have a meeting about this. I am available tomorrow and Thursday if that works for you guys?

mrocklin commented 4 years ago

I'm chatting with Ben in side-channels to find a time that works.

mrocklin commented 4 years ago

Also @rjzamora , are those profiles generated using ucx or tcp? I suspect that they're using TCP comms?

rjzamora commented 4 years ago

Right, TCP - This is not with UCX (to avoid the separate issues with NVlink)

Sent from my iPhone

On Jan 27, 2020, at 12:46 PM, Matthew Rocklin notifications@github.com wrote:

 Also @rjzamora , are those profiles generated using ucx or tcp? I suspect that they're using TCP comms?

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub, or unsubscribe.

mrocklin commented 4 years ago

There is some other excellent conversation on this topic happening here: https://github.com/rapidsai/ucx-py/issues/402#issuecomment-579904310

(including a full pair of profiles with UCX)

mrocklin commented 4 years ago

From the conversation there there are clearly a set of things still going on with Dask/UCX/cuDF interactions that make things oddly slow. It's interesting that we don't run into these with @madsbk's solution. This makes me curious on how these two systems differ.

For example, if we knew that @madsbk's solution did everything in a single thread then that might point us to some odd CUDA cross-thread contention issue. Is this the case by any chance? If so, we might try experimenting with dropping compute threads from Dask, and using a tornado.concurrent.DummyExecutor for computation to keep everything in a single thread.

Or, if @madsbk's solution does use threads, are there any other similar differences that we might test quickly?