dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Slow execution time on very large k8s-backed dask cluster #5384

Open bolliger32 opened 2 years ago

bolliger32 commented 2 years ago

What happened:

I'm trying to push the bounds of scaling kubernetes-backed dask clusters (using dask-gateway), and the other day I started trying with a 4000-worker cluster, where 7 workers are packed onto each node in our GCP cluster. I noticed that tasks started running ~1.5-2x slower than with a more reasonably sized cluster (a couple hundred workers). This doesn't really surprise me at a high level - more workers means more time spent communicating with the scheduler, etc. But digging in a bit more, the slowdown happens within the execution of the tasks themselves. I understand that communication could bog down the transmission of tasks through the scheduler, but I'm surprised that once a task begins executing, the task itself would take longer on a very large cluster.

What you expected to happen: I expected tasks to execute in roughly the same amount of time regardless of cluster size and the time it takes for client-scheduler and scheduler-worker comms.

Minimal Complete Verifiable Example:

import time

import numpy as np

# `client` is a distributed.Client connected to the dask-gateway cluster

def test(x):
    st = time.time()
    for i in range(100_000_000):
        pass
    return time.time() - st

print(np.mean(client.gather(client.map(test, range(4000)))))

# ~1.9 on a 100-worker cluster
# ~3.75 on a 4000-worker cluster

Anything else we need to know?: This is kind of a follow-on to my general "best practices for large clusters" issue #5164 (for which I know I owe a blog post... working on it 😬). But I figured starting a new issue specific to this particular scaling challenge was better than continuing to post to that thread.

Environment:

bolliger32 commented 2 years ago

I dug into this a bit more to just look at the scaling properties and came up with this graph showing how the mean runtime of this test function changes with the number of workers:

[plot: mean runtime of the test function vs. number of workers, scaling the cluster up]

I also noticed that runtime does not decrease when scaling down from a big number. Here's that same plot where instead of slowly scaling the cluster up, I start w/ 5000 and slowly scale it down:

[plot: mean runtime of the test function vs. number of workers, starting at 5000 and scaling down]

fjetter commented 2 years ago

I would try to find out how many workers are on the same host and how the runtime is affected by this (I haven't tested the code):

import time

import pandas as pd
from dask import delayed
import dask.dataframe as dd
from distributed.worker import get_worker
from distributed.comm.addressing import get_address_host_port

@delayed
def task():
    w = get_worker()
    host, _ = get_address_host_port(w.address)
    st = time.time()
    for i in range(100_000_000):
        pass
    duration = time.time() - st
    return pd.DataFrame({"host": [host], "address": [w.address], "task_runtime": [duration]})

ddf = dd.from_delayed(
    [task() for _ in range(5000)],
    meta={"host": str, "address": str, "task_runtime": float},
)
df = ddf.compute()
# number of distinct worker addresses and mean task runtime per host
df.groupby("host").agg({"address": pd.Series.nunique, "task_runtime": "mean"})

my suspicion is that colocated workers throttle each other for some reason (maybe insufficient CPU?)

bolliger32 commented 2 years ago

@fjetter thanks for the tip. I tried this and they all seem to be getting unique hosts. The address count per host in that dataframe is always 1, and the task_runtime increases with the number of workers as before.

fjetter commented 2 years ago

I don't know a lot about how k8s works, but I assume every k8s pod receives its own host IP even though pods may be spawned on the same physical host. I'm just wondering if they are actually properly isolated or if they can interfere with each other. The reason I'm emphasizing this is that a worker is only aware of other workers if it needs to communicate with them. The scheduler will eventually bottleneck if there are too many workers, but a worker should be agnostic to cluster size.

What I would expect: the runtime of a single task (the hot loop) stays constant, while the runtime of the overall computation (scheduling 4k of these tasks) degrades due to the scheduler becoming a bottleneck.

bolliger32 commented 2 years ago

I don't know a lot about how k8s works but I assume every k8s pod receives its own host IP even though they are spawned on the same physical host

Correct (I believe). Our setup is such that 7 single-threaded dask worker pods can fit on one physical host, and it appears that they each have their own host IP.

I'm just wondering if they are actually properly isolated or if they can interfere with each other.

I don't think it's necessarily just a function of pods on the same host interfering with each other. If I spawn just 7 workers, all on the same node, I see normal performance. It's only when we reach a high number of workers that performance degrades, and again this degradation seems to occur within the hot loop, not the scheduler (there may be degradation related to the scheduler as well, but there is definitely degradation within the single task).

The reason why I'm emphasizing this is that a worker is only aware of other workers if it needs to communicate with it. the scheduler will eventually bottleneck if there are two many workers but a worker should be agnostic.

That was my impression, which is why this is a little surprising

What I would expect: the runtime of a single task (the hot loop) stays constant, while the runtime of the overall computation (scheduling 4k of these tasks) degrades due to the scheduler becoming a bottleneck.

I would expect the same, but that doesn't appear to be the case. Perhaps it's some sort of GCP throttle when we have many resources running? But I would expect that to show up as a cap rather than somehow artificially slowing down or limiting the number of CPUs available on a given node. Still puzzled. Or perhaps it's our network policy (Calico) doing something? That's a little beyond my understanding, but I'll investigate a bit.

One thing I do see is that each worker seems to be running below its CPU request. We have 1 vCPU requested for each worker pod, which gets fully used when we only have a small number of workers. However, with a large number of workers, usage drops to ~0.7 or 0.8. Not exactly sure why yet.

fjetter commented 2 years ago

We have 1 vCPU requested for each worker pod, which gets fully used when we only have a small number of workers. However, with a large number of workers, usage drops to ~0.7 or 0.8. Not exactly sure why yet

That sounds like the culprit. I doubt that dask is capable of doing anything like this.

I'm wondering if the same happens if you have fewer pods but the individual pods have more threads. My gut feeling is to generally recommend "bigger but fewer workers" if possible, instead of many single-threaded ones; that should help reduce all sorts of comm-related performance problems. Especially if you know that you end up deploying 7 pods on the same machine anyhow, I suggest one pod with 7 vCPUs / 1 worker with 7 nthreads instead. I'm wondering if this problem also shows up in that configuration.
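
For example, a rough sketch of what that could look like from the client side with dask-gateway (untested; whether a worker_cores option is exposed, and under what name, depends on how your gateway backend is configured):

from dask_gateway import Gateway

gateway = Gateway()
options = gateway.cluster_options()
options.worker_cores = 7  # hypothetical option name; depends on the gateway deployment
cluster = gateway.new_cluster(options)
cluster.scale(572)  # ~572 seven-thread workers instead of ~4000 single-threaded ones
client = cluster.get_client()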

bolliger32 commented 2 years ago

That sounds like the culprit. I doubt that dask is capable of doing anything like this.

I might have been a little unclear: it's not that we change our requests to .7 vCPU; we still request 1 vCPU. But the usage goes from the full request down to only .7 or .8 of it.

I do wonder about the threads vs. pods approach. The problem with the threads approach is the GIL. When I run the snippet you proposed with just a single 7-vCPU worker, I get an average task completion time of ~10.2s, and the total CPU usage never really gets above 100% (where 700% would be full usage of all 7 vCPUs requested). I assume this is due to the GIL.

fjetter commented 2 years ago

It's not that we change our requests to .7 vCPU, we still request 1 vCPU. But the usage goes from using the full request to only using .7 or .8 of the full request

Yes, I get that. I'm just wondering if there is throttling going on by the provider, resulting in you not being able to utilize the entire vCPU.
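
One way to check for kernel-level CPU throttling (e.g. if the pods also have CPU limits set) would be to read the cgroup CPU stats on every worker. Untested sketch; the exact path depends on cgroup v1 vs. v2 and on the container runtime:

def cpu_throttle_stats():
    # cgroup v1 path; on cgroup v2 the file is /sys/fs/cgroup/cpu.stat
    try:
        with open("/sys/fs/cgroup/cpu/cpu.stat") as f:
            return dict(line.split() for line in f)
    except FileNotFoundError:
        return None

# look at nr_throttled / throttled_time per worker
stats = client.run(cpu_throttle_stats)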

I do wonder about the threads vs. pods approach. The problem with the threads approach is the GIL.

Yes, I briefly ignored this yesterday 😅. For a lot of use cases this is actually not a dealbreaker: pandas, numpy, etc. do a good job of releasing the GIL for heavy computations. But I overlooked that your example runs plain Python.

FWIW, dask can also use a process pool executor instead of a thread pool executor, but the deployment is not straightforward yet (xref https://github.com/dask/distributed/pull/5063). You'll need to initialize the process pool yourself, e.g. via a WorkerPlugin or something like that. For the hot loop you're running this would likely be the best approach, but for real applications processes are often a bit too slow, especially if there is a lot of output data involved, so this might go in the wrong direction.
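
A very rough sketch of the WorkerPlugin route (untested; it assumes the worker's executors dict and the "executor" task annotation behave this way in your distributed version):

from concurrent.futures import ProcessPoolExecutor

import dask
from distributed import WorkerPlugin

class ProcessPoolPlugin(WorkerPlugin):
    # register a process pool on each worker under the name "processes"
    def setup(self, worker):
        worker.executors["processes"] = ProcessPoolExecutor(max_workers=worker.nthreads)

client.register_worker_plugin(ProcessPoolPlugin())

# route the pure-Python hot loop to the process pool instead of the thread pool
# (assumes the "executor" annotation is honored for these tasks)
with dask.annotate(executor="processes"):
    results = dask.compute(*[dask.delayed(test)(i) for i in range(100)])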

bolliger32 commented 2 years ago

Sorry for dropping this for a while as other things came up, but I am back to running some of these same workflows and seeing the same scaling issues. FWIW, I have updated dask/distributed to 2021.11.1 in case anything recent improved this behavior, but I'm still seeing similar results. I want to amend what I said earlier: in the real-world workflow, I'm not actually seeing the drop to .7-.8 CPU usage - that was only happening in this hot-loop example. But in either case, I'm still seeing slowdowns of up to 2x in task execution time, seemingly independent of any additional scheduler overhead. Curious if you (@fjetter) have any additional thoughts, or if anyone out there might have suggestions. I could try the process pool executor, but my hunch is that something else is going on. Is it possible that the nanny process is somehow eating up cycles waiting to communicate with the scheduler, and that this is being recorded as task execution time?

bolliger32 commented 2 years ago

NOTE: I think I spoke too soon about this being the reason for the original slowdown I was seeing (see the follow-up comment below).

OK, upon further investigation, I think I was deceived by the k8s scheduler earlier. I believe this is simply an issue with hyperthreading on our GKE nodes, and it would be solved by your proposed approach of using larger workers with multiple threads per worker (if the workflows are sufficiently numpy-based to release the GIL).

To summarize, we use nodes with 8 virtual CPUs, which I believe means 4 physical cores with hyperthreading. There's some overhead, so if we want single-vCPU workers, we can in theory pack 7 of them onto each node. When I do that and ensure that all 7 are instantiated on the same node, I see close to a 2x slowdown in execution time, similar to the maximum on the graph I showed above. I think that 2x is meaningful because if a single physical core with hyperthreading enabled tries to execute two processes, each one will presumably take about 2x longer. So the fact that the earlier graph maxed out at a 2x slowdown is, I think, because that point corresponded to fully saturated nodes, while at lower worker counts the under-the-hood k8s autoscaling algorithm left us with under-saturated nodes.

The weird thing is that even if we only put, say, 3 1-vCPU workers onto a node, we can still observe a slowdown. I assume this is because whatever k8s is doing under the hood might allocate 2 of those pods to a single physical core... though I'll admit I don't totally understand if, why, or how this would happen.
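
One thing I could try to sanity-check this (a Linux-only sketch, not yet run) is asking each worker which logical CPUs its process is allowed to run on, to see whether colocated pods end up on overlapping hyperthread siblings:

import os

def cpu_affinity():
    # set of logical CPU ids this worker process may be scheduled on
    return sorted(os.sched_getaffinity(0))

affinities = client.run(cpu_affinity)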

I guess the solution is to either (a) use larger workers with multiple threads per worker (if the workflow is appropriate for this), or (b) just accept that tasks will execute 2x slower because there are really only half as many physical cores as "vCPUs".

If anyone has further insights I'd love to hear them, but I'm also happy if you want to close this issue as I believe after all this time that this is indeed a k8s challenge, and not dask-related. Sorry for the confusion!

bolliger32 commented 2 years ago

I guess this isn't the full story, actually. The hyperthreading component might be what's responsible for the slowdown I'm seeing in the MRE above, but it doesn't explain the more complex real-world workflow I'm working with. In this workflow, running the task with a single 1-CPU worker takes ~240s.

Mapping this task across 7 inputs, using 7 1-CPU workers all loaded onto the same physical node, takes ~310s each.

However, running it on 3000 workers (mapped across 3000 inputs) takes highly variable amounts of time, with a mean around ~450s.

Still a little baffled by what's going on here. Could it be that each individual processor is somehow waiting for communication with the scheduler while executing the task thread, such that with more workers it spends more time waiting on the scheduler rather than doing work on the task? If anyone has any thoughts on where to look to start debugging, let me know!

I'll note that, unlike the MRE I included originally, the slowdown is not capped at ~2x the runtime of a single worker executing the task on a single input. Some of the longer-running tasks in the 3000-worker example take much longer than 2x the original time. I'll also note that the structure of this workflow is such that there shouldn't be any meaningful differences in the task's runtime across the various inputs I map over.

I've also looked at the profile for each case (1 task vs. 3000 simultaneous tasks), and it's clear that there isn't one section within the overall function causing the slowdown. The profile graphs look roughly identical, but every call takes longer with 3000 workers executing simultaneously than with just one worker.
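
One diagnostic I may try next (a sketch, not yet run) is comparing wall-clock time to process CPU time inside the task: if wall time inflates while CPU time stays flat, the worker process is being descheduled or is waiting on something; if both inflate together, the core itself is effectively slower (e.g. hyperthread contention):

import time

def timed(x):
    wall0, cpu0 = time.time(), time.process_time()
    for i in range(100_000_000):
        pass
    # wall includes time spent descheduled; cpu counts only time actually on-CPU
    return {"wall": time.time() - wall0, "cpu": time.process_time() - cpu0}

results = client.gather(client.map(timed, range(3000)))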