dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Can we get rid of sharding? #7824

Open crusaderky opened 1 year ago

crusaderky commented 1 year ago

The distributed.comm.shard setting, which defaults to 64 MiB, is supposed to split any buffer larger than the shard size into multiple smaller buffers. The intent is to prevent issues that commonly occur beyond 2 GiB with various compression and network protocols.
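Conceptually, sharding just slices any frame above the threshold into shard-sized pieces before compression and the network ever see it. A minimal sketch of the idea (split_frame is a hypothetical helper, not distributed's actual implementation):

SHARD_SIZE = 64 * 2**20  # mirrors the 64 MiB default of distributed.comm.shard

def split_frame(frame, shard_size=SHARD_SIZE):
    """Slice a buffer into zero-copy views of at most shard_size bytes each."""
    view = memoryview(frame).cast("B")  # flatten to a 1-D view of raw bytes
    return [view[i : i + shard_size] for i in range(0, view.nbytes, shard_size)]

The slicing itself is cheap, since memoryview slices are zero-copy; the costs discussed below arise later, in compression and reassembly.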

This setting does nothing for dask.dataframe:

>>> import numpy
>>> import pandas
>>> from distributed.protocol import serialize_bytelist
>>> a = numpy.random.random((2**23, 2))  # 128 MiB
>>> frames = serialize_bytelist(a)
>>> [memoryview(f).nbytes for f in frames]
[32, 236, 67108864, 67108864]
>>> frames = serialize_bytelist(pandas.DataFrame(a))
>>> [memoryview(f).nbytes for f in frames]
[40, 123, 554, 134217728, 0]
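If the pandas path respected the setting, the 134217728-byte frame would come out as two 64 MiB shards, just like the numpy frames above. Continuing the session with the split_frame sketch from earlier:

>>> [s.nbytes for s in split_frame(frames[3])]
[67108864, 67108864]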

This should be a straightforward bug to fix. However, the fact that it has gone unnoticed strongly suggests that the whole system solves a purely hypothetical problem: to my knowledge, no dask.dataframe user has ever complained about crashes caused by oversized buffers.

Do we have an inventory of protocols that break beyond 2 GiB, and/or create temporary deep copies of whole buffers while they work on them?

Sharding causes major performance problems with compression: any algorithm that doesn't offer a decompress_into function is plagued by deep copies and memory spikes because of it. That means all currently supported algorithms; the only libraries I know of that provide one are cramjam and blosc2.
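To make the copy behavior concrete, here is a small sketch; zlib stands in for any codec without decompress_into, and the codec.decompress_into in the comment is a hypothetical API in the style of cramjam/blosc2:

import zlib

# Two compressed shards standing in for one large frame split at the
# shard boundary.
shards = [zlib.compress(b"x" * 1024), zlib.compress(b"y" * 1024)]

# Without decompress_into (the status quo): every shard decompresses into
# a fresh bytes object, and the join() then copies all of them again into
# the final buffer, so the full payload is held twice at the peak.
result = b"".join(zlib.decompress(s) for s in shards)

# With a decompress_into-style API (cramjam and blosc2 provide one), each
# shard would instead land directly in its slice of a single preallocated
# buffer. Sketched with a hypothetical codec.decompress_into(src, dst)
# returning the number of bytes written:
#
#     out = bytearray(total_nbytes)
#     offset = 0
#     for s in shards:
#         offset += codec.decompress_into(s, memoryview(out)[offset:])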

Sharding also adds complexity to the already bloated serialization layer.

CC @milesgranger

jacobtomlinson commented 1 year ago

cc @pentschev @quasiben @madsbk

pentschev commented 1 year ago

UCX has an internal mechanism for this, controlled via the UCX_TCP_{RX,TX}_SEG_SIZE environment variables, so I don't believe sharding is of much relevance for UCX. I'm also not aware of any issues with messages larger than 2 GiB, with or without UCX. To be fair, though, I rarely see dataframes (or rather, series) larger than a couple hundred MB being transferred in Dask, so I don't know how common large transfers are in the wild.
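For illustration, those variables have to be set before UCX initializes in the process; the 8M value here is an arbitrary example, not a recommendation:

import os

# Cap UCX's TCP segmentation for both receive and transmit paths.
# Must be set before UCX is initialized.
os.environ["UCX_TCP_RX_SEG_SIZE"] = "8M"
os.environ["UCX_TCP_TX_SEG_SIZE"] = "8M"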