dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Merging using `shuffle='disk'` is 25x slower than when using `shuffle='tasks'` #5554

Open bsesar opened 2 years ago

bsesar commented 2 years ago

What happened: When using `shuffle='disk'`, merging took 50 minutes, compared to 2 minutes when using `shuffle='tasks'`. The Dask dashboard also showed very low CPU utilization when using `shuffle='disk'`.

What you expected to happen: I expected merging with `shuffle='disk'` to be faster than merging with `shuffle='tasks'`, since the load on the scheduler is supposed to be lower in that case.

Minimal Complete Verifiable Example:

from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd
import numpy as np
import os
from time import time
import pandas.util.testing
import dask.config

# folder in which data will be saved
data_folder = 'path_to_folder_for_saving_data'

# enable shuffle compression
# NOTE: you need to install the python-snappy package in order to use Snappy compression!
dask.config.set({
  "dataframe.shuffle-compression": 'Snappy',
  'temporary_directory': data_folder
  })
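
# optional guard, not part of the original reproducer: fail fast if the
# python-snappy package backing the 'Snappy' codec configured above is missing
try:
    import snappy  # noqa: F401  (module provided by the python-snappy package)
except ImportError:
    raise RuntimeError('python-snappy is not installed; install it or drop '
                       'the "dataframe.shuffle-compression" setting above')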

# start a local Dask cluster
# (note: I used a single machine with 14 cores and started the cluster with n_workers=14 and threads_per_worker=1)
client = Client(local_directory=data_folder)
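# equivalently, to match the setup described in the note above explicitly:
# client = Client(n_workers=14, threads_per_worker=1, local_directory=data_folder)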

# create the folder for fake data for the right side of the merge
os.mkdir('%s/right_ddf' % data_folder)

# number of partitions for the right side of the merge
N_partitions_right = 40

# number of rows per partition
Nrows_right = 1038790

# create data for the right side of the merge
for partition_id in range(N_partitions_right):
  df = pd.DataFrame({'ID':pandas.util.testing.rands_array(16, Nrows_right, dtype='O'),
                     'Date':pd.Timestamp('2019-12-01')})
  df.to_parquet('%s/right_ddf/%d.parquet' % (data_folder, partition_id))

# create the folder for fake data for the left side of the merge
os.mkdir('%s/left_ddf' % data_folder)

# number of partitions for the left side of the merge
N_partitions_left = 1325

# number of rows per partition
Nrows_left = 99063

# to create the left side of the merge, sample Nrows_left rows from a
# randomly selected partition on the right side
# (np.random.randint excludes `high`, so pass N_partitions_right itself so
# that every right-side partition can be selected)
partitions_to_sample = np.random.randint(0, high=N_partitions_right, size=N_partitions_left)
for partition_id, partition_to_load in enumerate(partitions_to_sample):
  df = pd.read_parquet('%s/right_ddf/%d.parquet' % (data_folder, partition_to_load))
  df = df.sample(Nrows_left, replace=True)
  df.to_parquet('%s/left_ddf/%d.parquet' % (data_folder, partition_id))

# benchmark merging using tasks and disk shuffle
for shuffle_type in ['tasks', 'disk']:

  # load data
  left_ddf = dd.read_parquet('%s/left_ddf' % data_folder)
  right_ddf = dd.read_parquet('%s/right_ddf' % data_folder)

  # merge
  left_ddf = left_ddf.merge(right_ddf, on='ID', how='inner', shuffle=shuffle_type)

  t0 = time()
  left_ddf = left_ddf.persist().head()
  elapsed_time = (time() - t0)/60.
  print('Merging using %s takes %.1f minutes' % (shuffle_type, elapsed_time))
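
One caveat on the timing: `persist().head()` can return once the first output partition is ready, so it may under-measure the full merge. A sketch that blocks until every output partition is materialized, reusing `left_ddf`, `right_ddf`, and `shuffle_type` from the loop above, and `wait` from `dask.distributed`:

```python
from dask.distributed import wait

t0 = time()
merged = left_ddf.merge(right_ddf, on='ID', how='inner',
                        shuffle=shuffle_type).persist()
wait(merged)  # block until all output partitions are in worker memory
print('Full merge using %s took %.1f minutes'
      % (shuffle_type, (time() - t0) / 60.))
```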

EDIT: If the above MVE is too much for your machine, use Nrows_right = 1038, Nrows_left = 99, and N_partitions_left = 600. Merging with shuffle='tasks' then takes 0.4 minutes, and merging with shuffle='disk' takes 2.4 minutes, a factor of 6 difference (with n_workers=14 and threads_per_worker=1).

Environment:

fjetter commented 2 years ago

Thanks for this example. I'm currently trying to reproduce, but a few notes up front:

  1. I realized your example is extremely asymmetric: the right dataframe has a few large files, while the left dataframe has many partitions with few rows. This kind of asymmetry typically causes less-than-optimal performance, since dask cannot fix this distribution itself. I would strongly recommend repartitioning the left DF before shuffling, e.g. left_ddf = left_ddf.repartition(npartitions=40), as sketched after this list (40 is somewhat arbitrary; you might even be better off with a smaller value since there are fewer rows left).
  2. The minimal example you provided is already relatively large. I'm executing it on a small-ish notebook, but even the task-based shuffle is just swapping and I am disk-bound. Do you see the same problem if you scale the problem down?
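
For reference, a minimal sketch of the repartitioning suggested in point 1, reusing `data_folder` from the reproducer above (the target of 40 partitions is illustrative):

```python
import dask.dataframe as dd

# rebalance the left side before the shuffle so both inputs have
# comparably sized partitions
left_ddf = dd.read_parquet('%s/left_ddf' % data_folder)
right_ddf = dd.read_parquet('%s/right_ddf' % data_folder)
left_ddf = left_ddf.repartition(npartitions=40)  # illustrative target
merged = left_ddf.merge(right_ddf, on='ID', how='inner', shuffle='disk')
```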
bsesar commented 2 years ago

Thank you for the comments, @fjetter. My answers are below:

  1. The example is extremely asymmetric because my real data are asymmetric :-) My real left dataframe has 1300 partitions of 100 MB each, and the right dataframe has 40 partitions of 100 MB each. Yes, I could repartition the data, but then I risk running out of memory (per worker); a size-targeted repartition, sketched after this list, might bound that risk. Besides, even though the dataframes are asymmetric, merging with shuffle='tasks' does not seem to have a problem with them (i.e., merging takes about 2 minutes in that case). That said, halving the number of partitions in the left dataframe does halve the merging time, so shuffle='disk' seems quite sensitive to the number of partitions, while shuffle='tasks' is not as sensitive (or is simply too fast for the difference to be notable).
  2. Yes, I see the same problem if I set Nrows_right = 1038, Nrows_left = 99, and N_partitions_left = 600 in the MVE code. Merging with shuffle='tasks' takes 0.4 minutes, and merging with shuffle='disk' takes 2.4 minutes, a factor of 6 difference.
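
For completeness, the size-targeted repartition mentioned in point 1, as a sketch assuming dask's `repartition(partition_size=...)` keyword; the '100MB' target mirrors the partition size of my real data:

```python
# target a partition size instead of a partition count, so no partition
# should grow beyond what a single worker can comfortably hold
# ('100MB' mirrors the real partition sizes described above)
left_ddf = left_ddf.repartition(partition_size='100MB')
```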

In the end, my point is that on the same machine with the same data and disk, merging with shuffle='tasks' is much faster than merging with shuffle='disk'.