wence- opened this issue 1 year ago.
There is a "hidden dashboard" at <dashboard-address>/shuffle which exposes some instrumentation. I'd be interested to see what this shows. If you are actually seeing memory peaks that are caused by the shuffle, that means our buffers are not flushing fast enough, and this should be visible there.
Otherwise, a peak at the very end could mean this is the to_parquet serialization, which likely runs in parallel. In your setup, every worker should have 1 CPU and 25GB of RAM? I'm wondering how large an output partition actually is. Is it possible that the distribution of key is so severely skewed that there are partitions of significant size such that a worker may run OOM?
The key/partition distribution can be calculated with
ddf = ddf.shuffle('key', shuffle="p2p")
len_per_partition = ddf.map_partitions(len).compute()
which just counts the length of every partition. If you can run this without hitting an OOM, it's likely the parquet serialization that is causing you trouble.
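If that full shuffle is already too expensive, the skew of the key itself can be inspected cheaply before shuffling. A minimal sketch, assuming the shuffle column is called 'key' and inputdir points at the parquet files:
import dask.dataframe as dd

ddf = dd.read_parquet(inputdir)
# Count how many rows belong to each key value; a heavily skewed result
# means some output partitions will be much larger than others.
key_counts = ddf["key"].value_counts().compute()
print(key_counts.head(10))                   # most frequent key values
print(key_counts.max() / key_counts.sum())   # fraction of rows held by the hottest key
A single value accounting for several percent of all rows translates directly into one oversized output partition.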
Lastly, in which "stage" are the workers dropping? Still during transfer (keys: shuffle-transfer-foo) or during unpack (keys: shuffle-p2p-bar)?
Is it possible that the distribution of key is so severely skewed that there are partitions of significant size such that a worker may run OOM?
There is one key that is much more prevalent than others, so yes, this could be a cause. Am seeing if the shuffle + map_partitions completes.
Actually, it looks like I don't get as far as the shuffle proper at all: a worker gets killed while the assign tasks are still processing. Is the problem here that the entire dataset is being persisted into memory?
If I run with split_row_groups=True then I do start the shuffle proper, so that may be the issue.
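For reference, the difference is only in how read_parquet maps files to partitions; a quick sketch, assuming inputdir holds the ~300 parquet files:
# One partition per file: a file that expands to several GiB in memory
# becomes one several-GiB partition.
ddf = dd.read_parquet(inputdir, split_row_groups=False)

# One partition per row group: the same data arrives as many smaller
# partitions, which keeps the pre-shuffle tasks within the per-worker budget.
ddf = dd.read_parquet(inputdir, split_row_groups=True)
print(ddf.npartitions)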
With split_row_groups=True and upping the default disconnect timeout from 30s, the shuffle aspect seems to be progressing without a large memory footprint on any of the workers (peaking at around 3-3.5GB/worker).
Eventually that approach died in the map_partitions call (or somewhere just before that, when the shuffle was finishing and trying to bring the files back into memory [this is a guess]). This might well be due to the one large partition (which probably doesn't fit in 25GB).
The actual exception above is a bit hidden:
2022-12-08 09:06:13,957 - distributed.worker - WARNING - Compute Failed
Key: ('shuffle-p2p-ddfa5cb26b70b7d1e54cf12e3b82b044', 370)
Function: shuffle_unpack
args: ('06e936888527f1380d9e3c9f5f1027b4', 370, None)
kwargs: {}
Exception: "AssertionError('Shuffle worker restrictions misbehaving')"
cc @hendrikmakait
My guess is that this is due to a single large partition which dask is trying to materialise on a single worker, blowing out the memory budget. Since I know this, I can manually split that partition and, rather than calling DataFrame.shuffle directly, call rearrange_by_column with my newly determined partitioning column. Will give that a go and see where we get to.
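For illustration, the splitting idea looks roughly like the sketch below: give the dominant key value a random suffix so its rows spread over several output partitions, then shuffle on the derived column. This is a simplified variant that stays on the public shuffle API rather than rearrange_by_column; the column name, suffix count, and hot-key value are all made up.
import numpy as np
import pandas as pd

NSPLITS = 16                      # pieces to cut the dominant key into (illustrative)
HOT_KEY = "the-dominant-value"    # placeholder for the key value known to dominate

def add_split_key(df):
    # Rows of the dominant key get one of NSPLITS random suffixes; every other
    # key keeps suffix 0 and therefore still maps to a single output partition.
    salt = np.where(df["key"] == HOT_KEY,
                    np.random.randint(0, NSPLITS, size=len(df)),
                    0)
    return df.assign(
        split_key=df["key"].astype(str) + "_" + pd.Series(salt, index=df.index).astype(str)
    )

ddf = ddf.map_partitions(add_split_key)
ddf = ddf.shuffle("split_key", shuffle="p2p")
The trade-off is that any downstream merge or groupby on the original key would then see that key split across partitions.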
Maybe that's indeed due to a memory error, but we would expect a different exception. The exception you got is something that should not be possible with https://github.com/dask/distributed/pull/7326
Can you tell us which commit you were using?
Sorry, I screwed up and was running on c9f75b12; let me try again, this time with 3c90f508665.
Here's an updated log, same kind of failure, but slightly different traceback.
The dashboard errors are more or less expected. We haven't fixed the dashboard for the case where a worker is dying. A worker is definitely dying during the shuffle. We haven't implemented restart logic for it yet. See https://github.com/dask/distributed/issues/7353
Is the problem here that the entire dataset is being persisted into memory?
The entire dataset doesn't need to fit into memory. A single partition has to fit, of course (plus a little room in case pandas/arrow decides to copy stuff). If you are failing already at a task that is before the actual shuffle, you might have a partition that is too large. I would recommend then to run the same example with fewer workers, such that the memory limit of an individual worker is bigger.
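On a single machine that can just mean lowering n_workers: with the default memory_limit, LocalCluster divides the machine's memory between workers. A sketch (worker count illustrative):
from distributed import Client, LocalCluster

# On the 40-core, 1TB machine described here, 10 workers with 4 threads each
# should get a ~100GB memory limit by default, instead of ~25GB with 40
# single-threaded workers.
cluster = LocalCluster(n_workers=10, threads_per_worker=4)
client = Client(cluster)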
You can also run a sanity check to ensure there is nothing super weird going on, e.g.
ddf = dd.read_parquet(inputdir, split_row_groups=False)
len(ddf)
This loads the entire dataset once and returns the row count. If this already fails, your machine is too small, or rather your workers are too small.
The data exists on disk as ~300 parquet files (that each expand to around 300MB in size)
...is what I'm trying to say. Maybe there is one bad file that expands to 20-30GB. IDK.
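One way to rule that out is to check each input file's decompressed size directly; a sketch using pyarrow (paths illustrative):
from pathlib import Path
import pyarrow.parquet as pq

for path in sorted(Path(inputdir).glob("*.parquet")):
    table = pq.read_table(path)
    # nbytes is the decompressed, in-memory Arrow size of this file
    print(path.name, table.num_rows, f"{table.nbytes / 2**30:.2f} GiB")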
I assume this is not a public dataset, is it?
Unfortunately not, AFAIK.
I know the input files all expand to sane sizes (have tested this) [updated initial comment, it's always in the 2-4GiB range], so parquet-wise the inputs are sane. My reading of the errors, and perhaps you disagree, is that one (or maybe a few) output partitions are large (to the point that one output partition doesn't fit in a single worker's memory). I will try with fewer workers to see if upping the individual memory limit helps.
Possibly. One important thing to note is that at the moment the entire dataset has to fit onto disk but not in memory. Maybe your disk size is too small? I am surprised you do not see a better exception. We might have been a bit too zealous with stripping "irrelevant" traces in https://github.com/dask/distributed/pull/7326
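Checking that is cheap; a sketch, assuming the workers spill to the default temporary directory:
import shutil
import dask

# P2P writes the shuffled data to the workers' local storage, so free space
# there needs to be roughly the size of the whole dataset.
tmpdir = dask.config.get("temporary-directory") or "/tmp"
usage = shutil.disk_usage(tmpdir)
print(f"free: {usage.free / 2**40:.2f} TiB of {usage.total / 2**40:.2f} TiB")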
We might have been a bit too zealous with stripping "irrelevant" traces in https://github.com/dask/distributed/pull/7326
It sure looks like it. The way that I read these logs, a worker gets restarted because it is running out of memory, which causes the scheduler extension to fail the shuffle. This in turn triggers a cascade of errors in the shuffle_unpack tasks once we propagate the shuffle failure to workers; these would be the errors that we see in the logs.
I tried overcommitting memory in the workers and eventually we nearly completed, but I think one worker (which needed to read 49GiB from disk) was removed because it didn't heartbeat for 5 mins:
Dask version: 2022.12.0
Distributed version: 2022.12.0+95.gbc317e20
2022-12-15 09:23:02,459 - distributed.scheduler - WARNING - Worker failed to heartbeat within 300 seconds. Closing: <WorkerState 'tcp://127.0.0.1:36461', name: 19, status: running, memory: 119, processing: 105>
Remind me how I up that limit; I think I should then manage to get this to complete. DASK_DISTRIBUTED__SCHEDULER__WORKER__TTL=3600s?
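The setting behind that timeout is distributed.scheduler.worker-ttl (the scheduler's heartbeat timeout, 5 minutes by default); it can also be raised from Python before the cluster is created, e.g.:
import dask

# Allow workers to go up to an hour without a heartbeat before the scheduler
# closes them (useful while a worker is busy reading one huge partition).
dask.config.set({"distributed.scheduler.worker-ttl": "3600s"})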
I assume this is not a public dataset, is it?
Unfortunately not, AFAIK.
I'm curious: would you be comfortable creating a mimic if this functionality existed? https://github.com/dask/dask/issues/9766
In principle I have no objection to doing this, but would need to read the EULA on the data carefully to know exactly what that would need to cover.
In this particular instance, I have a hunch about what is causing things to fall over, and so I think I can by hand construct a synthetic dataset that will provoke the issue.
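For anyone wanting to reproduce the shape of the problem without the real data, a sketch of such a synthetic dataset (the ~5% hot-key fraction mirrors the skew reported below; sizes, paths, and column names are made up):
from pathlib import Path
import numpy as np
import pandas as pd

outdir = Path("input")
outdir.mkdir(exist_ok=True)

def make_file(i, nrows=1_000_000, hot_fraction=0.05, nkeys=10_000):
    # ~5% of all rows share a single hot key value; the rest are spread
    # uniformly, so one output partition ends up far bigger than the others.
    rng = np.random.default_rng(i)
    hot = rng.random(nrows) < hot_fraction
    key = np.where(hot, "hot-key", rng.integers(0, nkeys, size=nrows).astype(str))
    df = pd.DataFrame({"shuffle_key": key, "value": rng.random(nrows)})
    df.to_parquet(outdir / f"part-{i:03d}.parquet")

for i in range(300):  # ~300 files, as in the real dataset
    make_file(i)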
To follow up here, I was able to get the following script to run to completion:
This was on a machine with 40 physical cores and 1TB of RAM.
I needed to set:
export DASK_DISTRIBUTED__SCHEDULER__WORKER__TTL=3600s
export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT=3600s
export DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP=3600s
(Probably they didn't need to be that high, but belt-and-braces)
I also needed to overcommit the memory limit for each worker to 100GiB.
The reason for this, and the previous failures, is that this dataset has a very skewed distribution for the shuffle key. In particular, there is a single key value that corresponds to around 5% of the total rows (this leads to one worker peaking at 80GiB memory usage when performing the len calculation, where all others sit comfortably around 4GiB).
The dataset has 2879987999 total rows, and the largest output partition has 132603535 rows.
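As a rough sanity check, that single partition is consistent with the memory numbers above, assuming the ~1TB in-memory size of the whole dataset:
total_rows = 2_879_987_999
largest_partition_rows = 132_603_535
dataset_bytes = 1e12  # ~1TB for the whole dataset, per the issue description

print(largest_partition_rows / total_rows)
# ~0.046, i.e. roughly the 5% hot key
print(largest_partition_rows / total_rows * dataset_bytes / 2**30)
# ~43 GiB for that one partition, before any pandas/arrow copies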
In this particular instance, I know that downstream I don't need to do a merge of the dataset on this key (it's just a pre-sorting step), and so with prior knowledge of the skewed key distribution I could write code to manually construct a better partitioning key. I wonder to what extent that might be automated. One could imagine extending the interface to allow the user to provide a prior on the key distribution that allows the shuffling mechanism to make sensible decisions.
In any case, having figured out the issues, I can, if it is interesting, construct a synthetic dataset that would allow you to test things too (I think one can also replicate the problem at a smaller scale by just doing the same thing but with tighter worker limits).
from pathlib import Path
import dask
import dask.dataframe as dd
import distributed
from distributed import Client, LocalCluster

if __name__ == "__main__":
    print("Dask version:", dask.__version__)
    print("Distributed version:", distributed.__version__)
    cluster = LocalCluster(n_workers=20, memory_limit="100GiB")
    client = Client(cluster)
    inputdir = Path(".../input/")
    outputdir = Path(".../shuffled/")
    ddf = dd.read_parquet(inputdir, split_row_groups=True)
    ddf = ddf.shuffle('shuffle_key', shuffle="p2p")
    final_partition_sizes = ddf.map_partitions(len).compute()
    print(f"Num out partitions = {len(final_partition_sizes)}")
    print(final_partition_sizes.max(), final_partition_sizes.min())
    print(final_partition_sizes)
I'm attempting to use the p2p shuffle implementation (using the branch proposed for merge in #7326) to shuffle a ~1TB dataset. The data exists on disk as ~300 parquet files (that each expand to around 2GiB [edited] in size, with 23 columns) and I'm trying to shuffle into around 300 output partitions and write to parquet. The key column is a string (although I can convert to int or datetime if that would help); the other columns are a mix of string, int, and float.
This is on a machine with 1TB RAM and 40 cores. I run like so:
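Roughly along these lines; this is an illustrative sketch of the described setup (paths and the key column name are placeholders, and the cluster sizing follows the 40 workers with ~25GB each mentioned earlier), not the exact script:
import dask.dataframe as dd
from distributed import Client, LocalCluster

if __name__ == "__main__":
    cluster = LocalCluster(n_workers=40, memory_limit="25GB")
    client = Client(cluster)

    # Read the ~300 parquet files, shuffle on the string key column with the
    # p2p backend, and write the result back out as parquet.
    ddf = dd.read_parquet("input/")
    ddf = ddf.shuffle("key", shuffle="p2p")
    ddf.to_parquet("shuffled/")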
This progresses quite well for a while, with peak memory usage hitting ~600GB; at some point, though, some workers reach 95% of their memory limits and are then killed by the nanny.
Am I configuring things wrong? Do I need to switch on anything else? Or should I not be expecting this to work right now?