dask / dask

Parallel computing with task scheduling
https://dask.org
BSD 3-Clause "New" or "Revised" License

Dask-expr - Extremely slow when using .compute #11260

Open NCSUFeNiX opened 1 month ago

NCSUFeNiX commented 1 month ago

I installed the basic dask version using "pip install dask". When running, I receive a FutureWarning:

Dask dataframe query planning is disabled because dask-expr is not installed. You can install it with 'pip install dask[dataframe]' or 'conda install dask'. This will raise in a future version.

I proceeded to install with the suggested pip command. However, after doing so, my .compute() calls went from under 1 second to about 22 seconds (sometimes longer in later parts of the code). These computes are small slices/excerpts of the full dataframe, used to build a summary of certain events and run some calculations. Each slice is only about 15 rows, so I am just extracting it to pandas to get rid of any partitioning before calculating.

Code excerpt (part of a for loop): I split the dataframe slicing and the compute step into separate statements to figure out where the problem lies. For background, ddf is a Dask dataframe that, in its final state, will consist of about 250,000 partitions, one for each CSV file read in.

    event_a = ddf.loc[start_index:end_index]
    print('Partial dataframe into event_i.', '---- Process Time: %s ----' % (datetime.now() - time_start_j))
    event_i = event_a.compute()
    print('Computed event_i to pandas dataframe.', '---- Process Time: %s ----' % (datetime.now() - time_start_j))
    event_i['Event ID'] = eventid
    print('Finished collecting Event DataFrame.', '---- Process Time: %s ----' % (datetime.now() - time_start_j))

Output (without dask-expr):

    Starting ID 1
    Partial dataframe into event_i. ---- Process Time: 0:00:00.017307 ----
    Computed event_i to pandas dataframe. ---- Process Time: 0:00:00.133990 ----
    Finished collecting Event DataFrame. ---- Process Time: 0:00:00.134989 ----

Output (with dask-expr):

    Starting ID 1
    Partial dataframe into event_i. ---- Process Time: 0:00:00.041680 ----
    Computed event_i to pandas dataframe. ---- Process Time: 0:00:21.894825 ----
    Finished collecting Event DataFrame. ---- Process Time: 0:00:21.894825 ----
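
One way to check whether the extra time is spent building and optimizing the task graph, rather than executing it, is to materialize the graph explicitly and time that step on both backends. This is a diagnostic sketch rather than part of the original report; __dask_graph__ is part of the standard Dask collections protocol:

    import time

    t0 = time.perf_counter()
    graph = event_a.__dask_graph__()  # build the task graph without executing it
    t1 = time.perf_counter()
    print('Graph has %d tasks, built in %.3fs' % (len(graph), t1 - t0))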

phofl commented 1 month ago

Can you provide something that is reproducible, and give us the Dask version that you are running?

phofl commented 1 month ago

Could you create an example that doesn't rely on external files, i.e. something that we can copy-paste? See https://matthewrocklin.com/minimal-bug-reports for a more detailed description.

NCSUFeNiX commented 1 month ago

OK, I deleted my previous post per your most recent request and am providing the requested information below.

Version: Dask 2024.7.1

I have supplied a copy-and-paste version of the code below, as requested. The results I received with and without dask-expr for this code are also shown below. The problem does seem to scale with the size of the Dask dataframe, even though I am only querying it once. Two things stand out about the code below versus the code I previously supplied (which read example CSV files):

  1. There is still a significant process-time increase with dask-expr present.
  2. While there is still a notable time increase at the .compute() step, the largest increase moved to the earlier .loc slicing of the data in this version.

I am not sure whether that is because this latest version is entirely self-contained (i.e. it does not read CSV files), or because my previously supplied example had many different and intermixed datatypes within the columns (i.e. explicit "NaN" strings are present in rows where data is missing, and some cells are blank).
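
For anyone reproducing this, both code paths can be compared in a single environment by toggling Dask's query-planning flag instead of installing or uninstalling dask-expr. A minimal sketch, assuming Dask >= 2024.3; the flag must be set before dask.dataframe is imported for the first time:

    import dask

    # Must run before the first `import dask.dataframe` in the process.
    # Equivalently, set the environment variable DASK_DATAFRAME__QUERY_PLANNING.
    dask.config.set({"dataframe.query-planning": False})  # legacy backend
    # dask.config.set({"dataframe.query-planning": True})  # dask-expr backend

    import dask.dataframe as dd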

Output (without dask-expr):

    Starting Program.
    Finished Reading ---- Process Time: 0:00:08.307470 ----
    Start Add continuous index.
    Finished adding continuous index. ---- Process Time: 0:00:03.352517 ----
    Starting event slicing.
    Partial dataframe into event_i. ---- Process Time: 0:00:00.017952 ----
    Computed event_i to pandas dataframe. ---- Process Time: 0:00:00.029939 ----
    Finished collecting Event DataFrame. ---- Process Time: 0:00:00 ----

Output (with dask-expr):

    Starting Program.
    Finished Reading ---- Process Time: 0:00:05.408973 ----
    Start Add continuous index.
    Finished adding continuous index. ---- Process Time: 0:00:00.037657 ----
    Starting event slicing.
    Partial dataframe into event_i. ---- Process Time: 0:00:10.001254 ----
    Computed event_i to pandas dataframe. ---- Process Time: 0:00:01.642147 ----
    Finished collecting Event DataFrame. ---- Process Time: 0:00:00 ----

#!/usr/bin/env python3
from datetime import datetime
import pandas as pd
import dask.dataframe as dd
import numpy as np
# ---------------------------------------------------------
# Input
# ---------------------------------------------------------
lst_colnames = ['Rand ' + str(i) for i in range(47)]  # 47 column names
lst_dfs = [] # List of pandas dataframes to be populated
cnt_dfs = 160 # Number of dataframes to be generated
# ---------------------------------------------------------
# Read Files into Dask DataFrame
# ---------------------------------------------------------
time_start = datetime.now()
print('Starting Program.')
for i in range(cnt_dfs):  # populate the dataframe list
    cnt_rows = np.random.randint(8000, 50000)
    df_i = pd.DataFrame(np.random.random(size=(cnt_rows, 47)), columns=lst_colnames)
    lst_dfs.append(dd.from_pandas(df_i, npartitions=1))
ddf = dd.concat(lst_dfs)
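# 160 single-partition frames become one Dask dataframe with 160 partitions
# (the real workload described above uses ~250,000 partitions).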
print('Finished Reading', '---- Process Time: %s ----' % (datetime.now() - time_start))
# ---------------------------------------------------------
# Add Continuous Index across all partitions (important for later)
# ---------------------------------------------------------
time_start = datetime.now()
print('Start Add continuous index.')
ddf['x'] = 1
ddf['x'] = ddf.x.cumsum()
ddf = ddf.set_index('x', drop=True)
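# Note: no explicit divisions are passed here, so Dask has to work out the
# partition boundaries from the data itself. The timings above suggest the
# legacy backend pays that cost eagerly at this step (~3s), while dask-expr
# appears to defer it, shifting work into the later .loc / .compute() steps.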
print('Finished adding continuous index.', '---- Process Time: %s ----' % (datetime.now() - time_start))
# ---------------------------------------------------------
# Process a random chunk
# ---------------------------------------------------------
start_index = 8651
end_index = 8665
eventid = 7
print('Starting event slicing.')
time_start = datetime.now()
event_a = ddf.loc[start_index:end_index]
print('Partial dataframe into event_i.', '---- Process Time: %s ----' % (datetime.now() - time_start))
time_start = datetime.now()
event_i = event_a.compute()
print('Computed event_i to pandas dataframe.', '---- Process Time: %s ----' % (datetime.now() - time_start))
time_start = datetime.now()
event_i['Event ID'] = eventid
print('Finished collecting Event DataFrame.', '---- Process Time: %s ----' % (datetime.now() - time_start))
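
Since each chunk's row count is known before the Dask collection is built, one possible workaround (a sketch, not something from the issue itself) is to attach the continuous index to each pandas chunk up front. dd.from_pandas then records known, sorted divisions from the index, dd.concat preserves them because they don't overlap, and .loc can prune straight to the partition(s) covering the requested range:

    import numpy as np
    import pandas as pd
    import dask.dataframe as dd

    lst_colnames = ['Rand ' + str(i) for i in range(47)]
    cnt_dfs = 160
    lst_dfs = []
    offset = 0
    for i in range(cnt_dfs):
        cnt_rows = np.random.randint(8000, 50000)
        df_i = pd.DataFrame(np.random.random(size=(cnt_rows, 47)), columns=lst_colnames)
        df_i.index = pd.RangeIndex(offset, offset + cnt_rows, name='x')  # continuous index, assigned eagerly
        offset += cnt_rows
        lst_dfs.append(dd.from_pandas(df_i, npartitions=1))  # divisions inferred from the sorted index
    ddf = dd.concat(lst_dfs)  # non-overlapping known divisions are preserved

With known divisions, ddf.loc[start_index:end_index] only needs to touch the partition(s) covering that index range, so the expensive global set_index step disappears entirely.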