dask / dask

Parallel computing with task scheduling
https://dask.org
BSD 3-Clause "New" or "Revised" License

Dask crashes or hangs during out-of-core dataframes sort #7613

Open stephanie-wang opened 3 years ago

stephanie-wang commented 3 years ago

This is essentially the same issue as this one on dask/community, but I thought it would be worth a try to see if anyone can help here. Please let me know if I should close one of these!

What happened: An out-of-core sort of a large dataframe hangs or crashes.

What you expected to happen: The sort should finish.

Minimal Complete Verifiable Example:

I set up a single-node dask distributed cluster on a machine with 32 vCPUs and 128GB RAM. I started a dask worker with this command (note that I used nprocs=8 because this seemed to help reduce some of the errors compared to nprocs=32):

dask-worker localhost:8786 --nthreads 1 --nprocs=8 --memory-limit=10000000000  # 10GB.

Then, I ran this script, which is meant to benchmark a full sort of a large dataframe. The script generates partitions on-disk, then shuffles and blocks on the result using this line:

print(df.set_index('a').head(10, npartitions=-1))  # Force `head` to wait on all partitions.
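
Roughly, the shape of the benchmark is: write random partitions to disk, read them back lazily, then force the shuffle with the line above. A minimal sketch of that structure (the column names, row counts, and paths here are illustrative, not the exact script):

import numpy as np
import pandas as pd
import dask.dataframe as dd

npartitions = 100
rows_per_partition = 1_000_000  # illustrative sizing only

# Generate the input partitions on disk so the sort is genuinely out-of-core.
for i in range(npartitions):
    part = pd.DataFrame({"a": np.random.rand(rows_per_partition),
                         "b": np.random.rand(rows_per_partition)})
    part.to_parquet(f"/data/part-{i}.parquet")

# Read lazily, shuffle on column 'a', and block on the result.
df = dd.read_parquet("/data/part-*.parquet")
print(df.set_index('a').head(10, npartitions=-1))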

On 10GB with 100 (100MB) partitions, I started to get a lot of errors with nprocs=32 (stdout here). Then I tried nprocs=8 and the script finished successfully (results here).

However, I haven't been able to get the script to work with 100GB yet. Here is an example of the output. So far, in addition to changing nprocs and nthreads, I've also tried partitions=100 and 1000 and lowering the target and memory config parameters described here.

Anything else we need to know?:

Environment:

quasiben commented 3 years ago

I ran with the following and did not see any errors:

python test-sort.py --dask-nprocs=8 --dask-nthreads=1 --dask-memlimit=10000000000 --data-dir /tmp/bzaitlen
python test-sort.py --dask-nprocs=32 --dask-nthreads=1 --dask-memlimit=10000000000 --data-dir /tmp/bzaitlen

results:

num_nodes,nbytes,npartitions,dask_nprocs,dask_nthreads,dask_memlimit,duration
1,1000000,100,8,1,10000000000,1.0982487201690674
1,1000000,100,8,1,10000000000,0.9893317222595215
1,1000000,100,8,1,10000000000,1.1653563976287842
1,1000000,100,8,1,10000000000,1.0320801734924316
1,1000000,100,8,1,10000000000,1.0052003860473633
1,1000000,100,8,1,10000000000,1.0402514934539795
1,1000000,100,8,1,10000000000,1.110060214996338
1,1000000,100,8,1,10000000000,1.0987215042114258
1,1000000,100,8,1,10000000000,1.0460119247436523
1,1000000,100,8,1,10000000000,1.1311535835266113
1,1000000,100,32,1,10000000000,1.01358962059021
1,1000000,100,32,1,10000000000,0.9143905639648438
1,1000000,100,32,1,10000000000,0.8085291385650635
1,1000000,100,32,1,10000000000,0.7534053325653076
1,1000000,100,32,1,10000000000,0.796821117401123
1,1000000,100,32,1,10000000000,0.7891464233398438
1,1000000,100,32,1,10000000000,0.7381155490875244
1,1000000,100,32,1,10000000000,0.7748186588287354
1,1000000,100,32,1,10000000000,0.7413227558135986
1,1000000,100,32,1,10000000000,0.7646501064300537

This was done on a DGX-1, which has 1.5TB of RAM and two Intel Xeons.

stephanie-wang commented 3 years ago

Thanks for looking into this, @quasiben! A few things:

  1. Can you try running the script again with --nbytes=$(( 10 ** 11 )) for 100GB? The default actually runs with only 1GB (you can check by looking at the nbytes column in the CSV).
  2. Are you starting Dask separately? The dask args for the script I provided are a bit misleading; they're just for recording and don't actually set the dask-worker parameters.
  3. I'm not sure if the results will be the same since your machine has 10x the amount of RAM as the one I was testing on. Maybe you could also try scaling up the script to 1TB if you don't see any errors still with 100GB?
quasiben commented 3 years ago

Unfortunately, I don't have more time today to test. You could try changing the other memory config values and see if that prevents workers from dying. For example, you could set the workers to spill earlier. This can be done with an inline environment variable when starting the dask-worker, DASK_DISTRIBUTED__WORKER__MEMORY__SPILL=0.5, or with the additional methods outlined here.
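
For example, a minimal sketch assuming a LocalCluster started in the same process (the 0.5 here is just the example spill threshold from above):

import dask
from dask.distributed import Client, LocalCluster

# Spill to disk at 50% of the per-worker memory limit instead of the default 0.7.
# This has to be set before the workers are created in order to take effect.
dask.config.set({"distributed.worker.memory.spill": 0.5})

cluster = LocalCluster(n_workers=8, threads_per_worker=1, memory_limit=1e10)
client = Client(cluster)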

stephanie-wang commented 3 years ago

Thanks! Previously I tried lowering the "target" and "spill" values described here (I think to 30% and 40% respectively), but that didn't seem to change anything. Is that the same as what you're suggesting?

jakirkham commented 3 years ago

It looks like the Dask version being used here is quite old. Have you tried with a more recent version of Dask + Distributed?

stephanie-wang commented 3 years ago

It looks like the Dask version being used here is quite old. Have you tried with a more recent version of Dask + Distributed?

Ah, my bad! I just upgraded to 2021.4.1 and tried again. The error messages are different but it seems like the overall behavior is about the same (exception on 10GB with nprocs=32, succeeds on 10GB with nprocs=8, exception on 100GB with nprocs=8).

I updated the issue description accordingly. It does look like the script now crashes reliably instead of hanging. I'll try adjusting the memory parameters for this version of dask too and report back.

jakirkham commented 3 years ago

Sounds good.

Something else to note is we do use pickle protocol 5 on Python 3.8+ (or Python 3.7 with the pickle5 backport package). This allows for zero-copy serialization of objects using pickle (like Pandas DataFrames), which impacts spilling and communication. Would suggest either using Python 3.8+ (if it is an option for you) or installing pickle5 on Python 3.7.
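
As a rough illustration of what protocol 5 buys you (the NumPy array here stands in for the blocks backing a pandas DataFrame):

import pickle
import numpy as np

arr = np.arange(1_000_000, dtype="float64")

# With protocol 5, the large array body travels out-of-band as zero-copy
# buffers instead of being copied into the pickle byte stream.
buffers = []
payload = pickle.dumps(arr, protocol=5, buffer_callback=buffers.append)

restored = pickle.loads(payload, buffers=buffers)
assert (restored == arr).all()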

stephanie-wang commented 3 years ago

Hmm yes I saw the same error for 100GB, nprocs=8 after changing the memory target parameters. Just to confirm, I'm doing this by writing this in the ~/.config/dask/distributed.yaml file (I tried target=10%, spill=20% and target=30%, spill=40%):

distributed:
  worker:
    # Fractions of worker memory at which we take action to avoid memory blowup
    # Set any of the lower three values to False to turn off the behavior entirely
    memory:
      target: 0.10  # target fraction to stay below
      spill: 0.20  # fraction at which we spill to disk
      pause: 0.80  # fraction at which we pause worker threads
      terminate: 0.95  # fraction at which we terminate the worker

I'm also trying the 100GB sort with 1000 100MB partitions instead of 100 1GB partitions right now (nprocs=8, memory-limit=10GB). I don't see any errors, but it's taken >1h so far. Does that seem right?

Thanks for the tip on pickle5, I'll try to get a version running with 3.8!

jakirkham commented 3 years ago

Let's wait to dig in further until pickle protocol 5 is used. Wouldn't be surprised if additional copies created when pickling with earlier protocols are causing us issues

stephanie-wang commented 3 years ago

Unfortunately I'm still seeing similar errors after switching to python=3.8.8. I was seeing that data was getting spilled to disk (I think ~20GB before the crash).

This was with nprocs=8, nthreads=1, memory-limit=10GB and the default parameters for target memory. Should I try a different configuration?

rjzamora commented 3 years ago

Thank you for sharing your benchmarking efforts here @stephanie-wang! It is great to see.



~~I am not surprised that set_index is a bit fragile for a 10x bytes-to-memory ratio.~~ We have pushed on memory optimizations a bit more for the shuffle code path compared to set_index. However, both of these shuffle-based algorithms are likely to become dominated by spilling once the total size of the dataset is ~~significantly~~ larger than the global memory. With that said, you should be able to get the 100GB case working with ~~10GB~~ 80GB of memory (although performance will be poor ~~if your disk/file-system are slow~~).



If you are having memory problems with the default settings of dask-worker (killed workers, etc.), then:

(1) First try reducing your partition size. If 1GB partitions are giving you problems, try 500MB or 250MB.
(2) If going down to 250MB is still fragile, you should then try decreasing the default spilling thresholds. For example, something like memory_target_fraction=0.2, memory_spill_fraction=0.4, and memory_pause_fraction=0.6 would be quite conservative (and should probably work) ~~if your partition size is small enough to hold 5 partitions in memory on each worker~~.
(3) Check that you are using a fast/reliable storage space for spilling by setting local_directory.



Note that I typically use LocalCluster to simplify things when I am testing on a single machine. So, I have something like the following in mind for the cluster:

from dask.distributed import LocalCluster

cluster = LocalCluster(
    n_workers=8,
    threads_per_worker=1,
    memory_limit=1e10,
    memory_target_fraction=0.2,
    memory_spill_fraction=0.4,
    memory_pause_fraction=0.6,
    local_directory="/datasets/rzamora/scratch/dask-space",
)

I’ll try to test things out locally soon and let you know what I find.

jakirkham commented 3 years ago

This might also be a use case that benefits from some of the spilling optimizations that we've done in RAPIDS (cc @madsbk 🙂)

stephanie-wang commented 3 years ago

Thanks, this is really helpful! I think this is approximately what I've been trying, but it's good to get confirmation that this is the right way to go about it.

Just to clarify (since I'm not very familiar with the dask-worker settings), regarding this:

I am not surprised that set_index is a bit fragile for a 10x bytes-to-memory ratio. We have pushed on memory optimizations a bit more for the shuffle code path compared to set_index. However, both of these shuffle-based algorithms are likely to become dominated by spilling once the total size of the dataset is significantly larger than the global memory. With that said, you should be able to get the 100GB case working with 10GB of memory (although performance will be poor if your disk/file-system are slow).

I thought that by running dask-worker --nprocs=8 --nthreads=1 --memory-limit=$(( 10 ** 10 )), this would give each process a memory limit of 10GB, for a total of 80GB. Or does this actually mean a total of 10GB for all 8 workers? I've since lost the stdout, but I think I do remember seeing several messages about a worker process being started with ~9GB of memory, so I've thought it was the former.
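
(One way to double-check this from a connected client, I think, is scheduler_info, which reports the limit per worker:)

from dask.distributed import Client

client = Client("localhost:8786")
for addr, info in client.scheduler_info()["workers"].items():
    # memory_limit is reported per worker process, in bytes.
    print(addr, info["memory_limit"])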

If you do get around to running this script, here is the command I've been using for 100GB, 100 partitions (note that the version of the script that I posted assumes that dask is already started with the same worker parameters):

python test_sort.py --nbytes=$(( 10 ** 11 )) --npartitions=100 --data-dir=/data --dask-nprocs=8 --dask-nthreads=1 --dask-memlimit=$(( 10 ** 10 ))
rjzamora commented 3 years ago

this would give each process a memory limit of 10GB, for a total of 80GB

Ah right - My mistake! You are correct that you are giving each worker a 10GB limit.

stephanie-wang commented 3 years ago

Just a quick update on this. I tried 100GB with nprocs=32, 1000 partitions (since 100 wasn't working), and the recommended memory thresholds. I got a slightly different error this time. It still exited with a KilledWorker exception, but there were also some messages about the scheduler becoming unresponsive. Hopefully this is helpful! You can see the relevant output here.

GenevieveBuckley commented 3 years ago

@stephanie-wang just to let you know, there's quite a bit of work going on behind the scenes on improving Dask dataframe shuffles. I don't think https://github.com/dask/dask/pull/8223 is intended to be runnable (is that right @gjoseph92?) but you might like to subscribe to that PR for updates.

gjoseph92 commented 3 years ago

Hey @stephanie-wang, nice to see you again! Yeah, I think https://github.com/dask/dask/pull/8223 will be interesting to you. We also have a blog post about some of the technical details: https://coiled.io/blog/better-shuffling-in-dask-a-proof-of-concept/. Could be a natural fit to recreate some of this with actors, though locality might be awkward. I'd be curious how it compares to the multi-output shuffle in ray (probably a lot slower right now, but hopefully it would at least succeed).

stephanie-wang commented 3 years ago

Hey all, thanks for the update! The blog post that you linked looks very interesting. We've taken a pretty different approach in Ray where we are trying to implement shuffle natively with tasks, without "bypassing" the core system. I agree, it would be great to see how these compare.

We ended up writing a blog post about some of those earlier results here. I'd be happy to update it or link to your blog post once you have results that you're happy with in the new Dask-based shuffle!

gjoseph92 commented 3 years ago

Yeah, it would be much nicer to not bypass the core system. The one thing with tasks is that a shuffle is inherently O(N^2) in graph size (there's no way to describe a key as having multiple outputs in the dask graph spec). So with, say, a 100,000-partition DataFrame (large but possible) you'd have 10 billion tasks for the shuffle. That's clearly way beyond distributed's capacity, and I'd imagine even Ray would struggle a bit at that scale? In general, graphs are just an annoying way to represent a shuffle.

Though for Ray, you could probably keep an O(N) graph size by eliminating the multiple_return_gets from the graph, and instead launching tasks-within-tasks, which lets you hide the N^2 part from dask by taking more advantage of Ray's multiple-return support, at the expense of a little more indirection.

from __future__ import annotations  # keep the illustrative type hints unevaluated

import ray
from dask.dataframe.core import _concat
from dask.dataframe.shuffle import shuffle_group

remote_shuffle_group = ray.remote(shuffle_group)

def split(part: DataFrame) -> Tuple[ObjectID, ...]:
    # One Ray task per input partition; Ray hands back n_parts_out object refs, one per output split.
    return remote_shuffle_group.options(num_returns=n_parts_out).remote(part)

def group(i: int, object_ids: List[Tuple[ObjectID, ...]]) -> DataFrame:
    # Fetch the i-th split from every input partition and concatenate them into one output partition.
    return _concat(ray.get([ids[i] for ids in object_ids]))

dsk = {}
for i in range(npartitions_input):
    dsk[(shuffle_split_name, i)] = (split, (name_input, i))  # reference the i-th input partition key
all_split_objectids = list(dsk)  # the keys of all the split tasks
for i in range(n_parts_out):
    dsk[(shuffle_group_name, i)] = (group, i, all_split_objectids)

You'd still have O(N^2) ObjectIDs, but only O(N) TaskIDs, which maybe would be preferable?

Just writing this up because it's maybe an interesting route we could consider in distributed as well (tasks-within-tasks in some way), which might be a little less "bypass-y" than https://github.com/dask/dask/pull/8223.