dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org

Undulating cluster auto-scaling #5931

Open orf opened 2 years ago

orf commented 2 years ago

What happened:

After nearing the end of a simple but long-running job with ~1200 workers at peak, the scheduler's auto-scaler seems to become confused and starts launching and destroying large numbers of workers.

What's concerning is that new workers joining the cluster seem to be expensive: the scheduler CPU spikes, and large numbers of workers connecting at the same time have caused us instability in the past. So this seems like a perfect storm: a long-running, expensive job starts to finish, and then the scheduler essentially begins to DDoS itself by launching and destroying hundreds of workers every minute or so.

The lower worker count is probably the correct one, given the number of tasks remaining:

[screenshot: dashboard showing the remaining task counts]

But the scheduler will spin up another ~400 workers that sit idle before tearing them down again:

https://user-images.githubusercontent.com/1027207/157775544-88f5dca5-3f97-4287-b9d1-712b48ef803a.mov

You can see this on Grafana, even though the numbers are slightly off and the resolution doesn't capture every peak and dip:

[Grafana graph of the worker count over time]

The task graph isn't complex: it reads from parquet, repartitions into small chunks, runs map_partitions, then splits the results again before writing back to parquet:

import dask.dataframe as ddf

df = ddf.read_parquet("s3://...")
df = df.repartition(npartitions=4_462)
# Runtime for each partition: ~1 hour
df = df.map_partitions(expensive_function)
# The partitions produced are _big_, which can lead to OOM when writing parquet, so split them up again
df = df.repartition(npartitions=df.npartitions * 3)
df.to_parquet("s3://...", compute=False).compute()

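As an aside, an alternative I've considered (but not tried) for the final split is sizing the output partitions by bytes rather than multiplying the partition count; a hypothetical one-liner below, where the 100MB budget is just a placeholder:

    # Hypothetical alternative to the fixed multiplier above: size output partitions
    # by bytes and let dask pick the partition count (the 100MB budget is a placeholder).
    df = df.repartition(partition_size="100MB")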

Anything else we need to know?:

I'm not able to run a cluster state dump. I know this isn't helpful, but this is a very expensive, long-running job and I can't take any chances that doing so might disrupt it as it nears completion. I can supply raw Dask scheduler logs via email or another method. I also can't get a raw client connection to the scheduler without a fair bit of effort.

I can probably run this job once more to collect specific debug information if you tell me exactly what is needed: for example, would dumping the cluster state at the end of the run be OK, or does it need to happen while the scheduler is exhibiting this issue?
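
If a dump at the end of the run is acceptable, my rough plan would be the sketch below, assuming I can get a client connection and that the distributed version we have pinned already ships Client.dump_cluster_state (the scheduler address and filename are placeholders):

    from distributed import Client

    # Connect to the existing scheduler; the address is a placeholder.
    client = Client("tcp://scheduler-address:8786")

    # Dump scheduler and worker state to a compressed msgpack file, assuming our
    # pinned distributed version already includes Client.dump_cluster_state.
    client.dump_cluster_state("cluster-dump", format="msgpack")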

Environment:

orf commented 2 years ago

After a bit of digging, it seems like the scheduler is trying to bring up workers to handle the ~15 repartition tasks on this one worker:

[screenshot: dashboard showing the ~15 remaining repartition tasks on a single worker]
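
For what it's worth, my (possibly wrong) mental model of how the adaptive scaler picks its worker target is the simplified sketch below. It is not the actual scheduler implementation, and the 5-second target duration is an assumption on my part, but it would explain how a handful of hour-long tasks can still request a large number of workers:

    import math

    def rough_worker_target(task_durations_s, target_duration_s=5.0):
        # Simplified sketch of an occupancy-based autoscaling target; this is NOT
        # the real scheduler logic. Estimate how many workers would be needed to
        # finish the remaining work within target_duration_s, capped by the number
        # of runnable tasks.
        total_occupancy = sum(task_durations_s)  # estimated seconds of work left
        by_occupancy = math.ceil(total_occupancy / target_duration_s)
        return min(by_occupancy, len(task_durations_s))

    # ~15 remaining repartition tasks at roughly an hour each:
    print(rough_worker_target([3600] * 15))
    # The occupancy term alone asks for 15 * 3600 / 5 = 10800 workers; only the
    # task-count cap brings that back to 15, so noisy duration estimates (or a cap
    # that isn't applied consistently) could plausibly produce the waves of idle
    # workers described above.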

However, the new workers seem to join, not acquire any tasks, and then shut down. We set a few possibly relevant configuration options by default:

            "DASK_DISTRIBUTED__COMM__SOCKET-BACKLOG": "4096",
            "DASK_DISTRIBUTED__SCHEDULER__BANDWIDTH": "1000000000",  # 1gb/s

which I assumed would tell the scheduler that it's fine to transfer this data between workers.
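
If this is just the adaptive scaler being too eager near the end of the job, the mitigation I'd reach for is dampening the scaling itself rather than tuning bandwidth. A rough sketch, assuming our deployment tooling lets us call adapt() on the cluster object ourselves; all of the numbers below are placeholders:

    from distributed import LocalCluster

    # LocalCluster is only a stand-in here; our real deployment builds a different
    # Cluster subclass, but adapt() should accept the same dampening knobs either way.
    cluster = LocalCluster(n_workers=0)
    cluster.adapt(
        minimum=10,             # never scale below this many workers
        maximum=1200,           # cap at our observed peak
        interval="30s",         # re-evaluate the scaling target less frequently
        wait_count=10,          # require this many consecutive removal suggestions
                                # before actually retiring a worker
        target_duration="15m",  # accept a longer estimated completion time before
                                # scaling up aggressively
    )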