dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

P2P shuffle is slow with string dtypes #7880

Open mrocklin opened 1 year ago

mrocklin commented 1 year ago
import coiled
import dask.dataframe as dd
from dask.distributed import wait

cluster = coiled.Cluster(
    n_workers=30,
    worker_cpu=4,
    region="us-east-2",  # start workers close to data to minimize costs
    arm=True,
)

client = cluster.get_client()
# this takes 1m21s
df = dd.read_parquet("s3://coiled-datasets/uber-lyft-tlc/")
df = df.set_index("request_datetime", shuffle="tasks").persist()
_ = wait(df)
# this takes 2m12s
df = dd.read_parquet("s3://coiled-datasets/uber-lyft-tlc/")
df = df.set_index("request_datetime", shuffle="p2p").persist()
_ = wait(df)

GIL contention is very high during the P2P shuffle (and also during the task-based shuffle), and CPU usage is at 100+%. This suggests, though it is not confirmed, that creating and deleting lots of Python objects is slowing us down considerably.
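For context on the Python-object hypothesis: with `string[python]` each value is its own Python `str`, whereas `string[pyarrow]` columns follow Arrow's variable-length layout, which is one contiguous UTF-8 data buffer plus an offsets buffer. The sketch below illustrates that layout and the row gather a shuffle performs when it splits a partition into output shards. `StringColumn` and `split_by_partition` are hypothetical names for illustration, not PyArrow, pandas, or Dask APIs, and the pure-Python loops stand in for what Arrow does in C++.

```python
from array import array
from collections.abc import Iterable, Sequence


class StringColumn:
    """Hypothetical Arrow-style string column: one UTF-8 data buffer plus offsets.

    Row i is data[offsets[i]:offsets[i + 1]]. This illustrates the layout only;
    it is not PyArrow's API.
    """

    def __init__(self, data: bytes, offsets: array):
        self.data = data
        self.offsets = offsets  # int64 offsets; len(offsets) == number of rows + 1

    @classmethod
    def from_strings(cls, values: Iterable[str]) -> "StringColumn":
        buf = bytearray()
        offsets = array("q", [0])
        for value in values:
            buf += value.encode("utf-8")
            offsets.append(len(buf))
        return cls(bytes(buf), offsets)

    def __len__(self) -> int:
        return len(self.offsets) - 1

    def take(self, rows: Sequence[int]) -> "StringColumn":
        # Gather rows by copying byte ranges into a new buffer. Arrow does this
        # in C++ as plain memory copies; this Python loop only mirrors the logic.
        buf = bytearray()
        offsets = array("q", [0])
        for i in rows:
            buf += self.data[self.offsets[i]:self.offsets[i + 1]]
            offsets.append(len(buf))
        return StringColumn(bytes(buf), offsets)

    def to_pylist(self) -> list[str]:
        # Decoding is the step that materializes one Python str per row.
        return [
            self.data[self.offsets[i]:self.offsets[i + 1]].decode("utf-8")
            for i in range(len(self))
        ]


def split_by_partition(
    column: StringColumn, partition_of_row: Sequence[int], n_partitions: int
) -> list[StringColumn]:
    """Split one input partition into n output shards, as a shuffle would (sketch)."""
    groups: list[list[int]] = [[] for _ in range(n_partitions)]
    for row, part in enumerate(partition_of_row):
        groups[part].append(row)
    return [column.take(rows) for rows in groups]


col = StringColumn.from_strings(["uber", "lyft", "via", "juno"])
shards = split_by_partition(col, [0, 1, 0, 1], 2)
print([shard.to_pylist() for shard in shards])
```

Each shard is again just two buffers; per-row `str` objects only appear when converting back to Python objects (here, `to_pylist`). Whether that conversion is what drives the GIL contention reported above is not established in this thread.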

cc @hendrikmakait @jrbourbeau

hendrikmakait commented 1 year ago

I'm wondering how best to address this. In your example, the string columns are of type string[python], so converting back to that type feels like the right thing to do, even if it's overly expensive. Maybe the right approach is to convert string[python] to string[pyarrow] during a P2P shuffle if dataframe.convert-string is set, and to raise a warning if we encounter a string[python] column and it's not set?
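The policy proposed above can be sketched as a small decision function. `plan_string_conversion` is a hypothetical helper, not part of Dask or distributed; it takes column dtypes as plain strings and a boolean standing in for the `dataframe.convert-string` setting.

```python
import warnings
from collections.abc import Mapping


def plan_string_conversion(dtypes: Mapping[str, str], convert_string: bool) -> dict[str, str]:
    """Return the dtype each column would be shuffled as under the proposal (sketch).

    Hypothetical helper: if convert_string is True, string[python] columns are
    switched to string[pyarrow]; otherwise they are kept and a warning is raised.
    """
    planned = dict(dtypes)
    python_cols = [name for name, dtype in dtypes.items() if dtype == "string[python]"]
    if not python_cols:
        return planned  # nothing to convert, nothing to warn about
    if convert_string:
        for name in python_cols:
            planned[name] = "string[pyarrow]"
    else:
        warnings.warn(
            f"Columns {python_cols} use string[python]; a P2P shuffle of them may be slow. "
            "Consider enabling dataframe.convert-string.",
            UserWarning,
            stacklevel=2,
        )
    return planned


print(plan_string_conversion({"driver": "string[python]", "fare": "float64"}, True))
```

As the next comment notes, the dtypes in this report were already string[pyarrow], so this policy alone would not change the timings above.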

mrocklin commented 1 year ago

Sorry, I had the config option set. Dtypes coming in were string[pyarrow].

mrocklin commented 1 year ago
import dask

dask.config.set({"dataframe.convert-string": True})  # use PyArrow strings by default
fjetter commented 12 months ago

Brief update on this in case somebody stumbles across it.

With current main, P2P performs about the same as the task-based shuffle. However, with disk disabled entirely (dask.config.set({"distributed.p2p.disk": False})), the P2P shuffle takes only 10s, so disk appears to slow us down much more than one would naively assume. This is being addressed in https://github.com/dask/distributed/pull/8323.

| Method | Duration |
|---|---|
| tasks | 1min 8s |
| p2p | 1min 15s |
| p2p (without disk) | 10s |
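To put the table in relative terms, the following sketch parses the reported durations and expresses each as a multiple of the in-memory P2P run. The `to_seconds` helper is hypothetical and only handles the `XminYs` / `Ys` format used in the table.

```python
import re

# Durations as reported in the table above.
REPORTED = {
    "tasks": "1min 8s",
    "p2p": "1min 15s",
    "p2p (without disk)": "10s",
}


def to_seconds(text: str) -> int:
    """Parse durations like '1min 15s' or '10s' into whole seconds."""
    match = re.fullmatch(r"(?:(\d+)\s*min)?\s*(?:(\d+)\s*s)?", text.strip())
    if match is None or not any(match.groups()):
        raise ValueError(f"unrecognized duration: {text!r}")
    mins, secs = (int(g) if g else 0 for g in match.groups())
    return 60 * mins + secs


seconds = {method: to_seconds(text) for method, text in REPORTED.items()}
baseline = seconds["p2p (without disk)"]
for method, secs in seconds.items():
    print(f"{method:<20} {secs:>3} s  {secs / baseline:.1f}x the in-memory P2P time")
```

Relative to the in-memory P2P run, the task-based shuffle takes 6.8x as long and disk-backed P2P takes 7.5x as long.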