gjoseph92 opened this issue 3 years ago
The documentation for shuffle says:

> Either 'disk' for single-node operation or 'tasks' for distributed operation. Will be inferred by your current scheduler.

Which would suggest to me that this is already the case?
You give the docs too much credit :)

In this case I think "single-node" just means "not using a distributed Client" and "distributed" means "using a distributed Client". As that section of the code I referenced shows, it's not smart enough to recognize when the distributed Client is connected to a cluster that's just running on one machine.
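Concretely, the observable behavior is something like this (a sketch, not the exact code path; the `shuffle` config key is the one set by the client.py lines linked later in this thread):

```python
import dask
from distributed import Client

# Without a Client, no "shuffle" config is set and the dataframe code falls
# back to the disk-based shuffle for the local threaded/process schedulers.
print(dask.config.get("shuffle", None))  # typically None

# With any Client -- even one backed by a single-machine LocalCluster --
# the default becomes "tasks"; only "Client vs. no Client" is considered.
client = Client()
print(dask.config.get("shuffle", None))  # "tasks"
```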
I wholeheartedly support @gjoseph92's suggestion. I use dask on a multi-core, single-node cluster which I start with Client() (i.e., LocalCluster). If I had known that I could use shuffle='disk' for merge(), join(), and set_index(), I think that would have spared me from numerous cluster deadlocks.
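For example, opting in explicitly could look something like this (a minimal sketch; the data is made up, and I'm assuming the shuffle= keyword these methods accept, which newer dask versions spell shuffle_method=):

```python
import dask.dataframe as dd
from distributed import Client

client = Client()  # LocalCluster: many cores, one node

left = dd.from_dict({"key": list(range(1000)), "x": list(range(1000))}, npartitions=10)
right = dd.from_dict({"key": list(range(1000)), "y": list(range(1000))}, npartitions=10)

# Explicitly request the disk-based shuffle instead of the inferred "tasks" default
merged = left.merge(right, on="key", shuffle="disk")
result = merged.set_index("key", shuffle="disk").compute()
```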
If @gjoseph92's suggestion is adopted, I would also suggest that when shuffle = 'disk', the temporary_directory (which is used by shuffle and is defined in dask.config) defaults to local_directory (which is used by LocalCluster) to make sure we don't run out of disk space. Perhaps we could even suggest somewhere in the documentation that users may consider installing the python-snappy package in order to compress shuffled data (and further save on disk space). That compression can be turned on with dask.config.set({"dataframe.shuffle-compression": 'Snappy'}).
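Something along these lines, where the path is just a placeholder for whatever local_directory the LocalCluster is using:

```python
import dask

# Spill shuffle data to the same location the LocalCluster workers use,
# and compress it with Snappy (requires the python-snappy package).
dask.config.set({
    "temporary_directory": "/scratch/dask-worker-space",  # placeholder path
    "dataframe.shuffle-compression": "Snappy",
})
```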
> I think it would be better to keep using the disk-based shuffle if the Client is connected to a LocalCluster (not sure how to tell this).
A cluster could broadcast some metadata to a client upon connect. In this particular case, it could be something like "workers-share-disk": True. I don't know if this can be true for other cluster types, but this way it would be relatively generic and the mechanism could be used for other functionality as well. One example that comes to mind is our synchronization primitives, which could also have alternative implementations.

We have something like this already with Client._update_scheduler_info, but that doesn't include any cluster information yet.
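A rough sketch of what the client side could then do (everything here is hypothetical; neither a "workers-share-disk" flag nor a cluster-metadata field exists in distributed today):

```python
def choose_default_shuffle(scheduler_info: dict) -> str:
    """Pick a shuffle default from (hypothetical) cluster metadata.

    ``scheduler_info`` stands in for whatever Client._update_scheduler_info
    would receive if the scheduler also broadcast cluster-level metadata.
    """
    cluster_meta = scheduler_info.get("cluster-metadata", {})  # hypothetical key
    if cluster_meta.get("workers-share-disk", False):
        return "disk"
    return "tasks"


# A LocalCluster could advertise that its workers share a filesystem:
assert choose_default_shuffle({"cluster-metadata": {"workers-share-disk": True}}) == "disk"
assert choose_default_shuffle({}) == "tasks"
```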
> I would also suggest that when shuffle = 'disk', the temporary_directory (which is used by shuffle and is defined in dask.config) defaults to local_directory (which is used by LocalCluster) to make sure we don't run out of disk space. Perhaps we could even suggest to users somewhere in documentation that they may consider installing python-snappy package in order to compress shuffled data (and further save on disk space). That compression can be turned on with dask.config.set({"dataframe.shuffle-compression": 'Snappy'}).
That all makes sense. It would be helpful if you could create another issue for this. Changing defaults and/or documentation should be relatively straightforward compared to the "detect cluster type" change.
@gjoseph92 and @fjetter, I have found that merging two Dask DataFrames is 5x slower with shuffle='disk' compared to shuffle='tasks'. Is that expected behavior? If not, I could try to write an MVE and submit another issue.
> Is that expected behavior? If not, I could try to write an MVE and submit another issue.
Not necessarily. It depends on the size of the shuffle, the data involved, and most importantly the disk you have on your machine. If you can produce an MVE, that would be helpful.
@fjetter and @gjoseph92, I have created a new issue (#5554) with an MVE that shows how merging is much slower when using shuffle='disk'.
Reposting from https://github.com/dask/dask/pull/9826#issuecomment-1398500767:

> The reason why I dislike picking tasks for the one operation but disk for the other is that the performance characteristics of both algorithms depend more or less exclusively on the data distribution and the number of partitions.
I agree that it feels messy to use a different shuffle default for different algorithms. With that said, I do think it is safe to consider the groupby case to be somewhat "special". The difference is that the true default behavior for groupby is to not include any shuffle at all. It just so happens that it is possible to boost the performance of groupby(...).agg(..., split_out>1) by leveraging a task-based shuffle (but not a disk-based shuffle).

Is your benchmark code/data still available somewhere?
I don't think you need a particularly unusual data distribution to show that the shuffle-based groupby is only better than the conventional ACA groupby when shuffle is not set to "disk". For example, I'm sure the performance of shuffle="disk" would be pretty low across the board for Ian's results shared in #9302 (comment). Another example (a high-cardinality groupby where a shuffle-based reduction should outperform shuffle=False):

```python
import time

from dask.datasets import timeseries


def run(shuffle="tasks"):
    ddf = timeseries(
        id_lam=int(1e18),
        start='2000-01-01',
        end='2001-06-01',
        freq='1s',
        seed=15,
    )
    agg = ddf.groupby(
        "id",
        sort=False,
    ).agg(
        {"x": "sum", "y": "mean"},
        split_out=4,
        shuffle=shuffle,
    )
    t0 = time.time()
    result = agg.compute()
    total_time = time.time() - t0
    print(f"shuffle: {shuffle}\ntime: {total_time} sec\nnum-rows: {len(result)}\n")


run(shuffle="tasks")  # time: 10.846901416778564 sec
run(shuffle=False)    # time: 18.84290647506714 sec
run(shuffle="disk")   # time: 26.46630048751831 sec
```
Here we see why we probably shouldn't use shuffle="disk" over shuffle=False as a default.
In feedback for https://github.com/dask/dask/pull/8223, I've noticed users trying to do large shuffles on single-machine clusters: https://github.com/dask/dask/pull/8223#issuecomment-961610095, https://github.com/dask/dask/issues/8294#issuecomment-961301624.
When creating any default Client, the default shuffle mode automatically gets set to "tasks": https://github.com/dask/distributed/blob/69814b4aa7459476dcefa133341b566a5ed4e24a/distributed/client.py#L727-L730

However, on a single machine, a disk-based shuffle is likely to be a lot more efficient, and it puts much less load on the scheduler.
I think it would be better to keep using the disk-based shuffle if the Client is connected to a LocalCluster (not sure how to tell this). Most users don't know about the different shuffle modes, and shouldn't have to.
Client detects whether or not it is connected to a LocalCluster and sets the default shuffle config to disk.
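A sketch of what that detection could look like, assuming client.cluster is populated whenever the Client created (or was handed) a cluster object, and reusing the same top-level shuffle config key that the client.py lines above set to "tasks" (this is not actual distributed code):

```python
import dask
from distributed import Client, LocalCluster


def set_default_shuffle_for(client: Client) -> None:
    # Sketch only: prefer the disk-based shuffle when the Client owns a
    # LocalCluster. A real implementation would also need to handle Clients
    # that connect to a bare scheduler address and have no cluster object.
    if isinstance(getattr(client, "cluster", None), LocalCluster):
        dask.config.set(shuffle="disk")
    else:
        dask.config.set(shuffle="tasks")


client = Client()                # single-machine LocalCluster
set_default_shuffle_for(client)  # -> default shuffle becomes "disk"
```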