dask / dask

Parallel computing with task scheduling
https://dask.org
BSD 3-Clause "New" or "Revised" License

pivot_table on a lazy Dask dataframe has significant performance cliffs #6562

Open sm-Fifteen opened 4 years ago

sm-Fifteen commented 4 years ago

What happened: I was using pivot_table to convert a series of large, vertically stored ("long") CSV files into a "wide" table. I ran into memory limitations with pandas, so I turned to Dask. Dask turned out to have surprisingly poor performance when running pivot_table after read_csv (from 15 seconds to 10 minutes with the MCVE below), with fairly low CPU, memory, and disk I/O usage, even though one would expect at least one of those three to be at maximum capacity.

Looking at the client's timing and profiling data, I found most of the runtime to be taken up by pivot_table_count (in purple) and pivot_table_sum (in green):

[screenshot: Dask dashboard task overview]

Furthermore, most of both tasks' runtime was taken up by something to do with boxing dates, which are considered to be extension dtypes:

[flame graphs for pivot_table_count (purple) and pivot_table_sum (green)]

I'm not entirely sure what's causing these tasks to take so long (GIL contention, maybe?), but it's making pivot_table perform far worse than it should. The long yellow line on both flamegraphs is from a slow is_unique operation on the categorical column, so there's another similar performance problem there even if we remove the dates.

What you expected to happen: The count and sum stages of the pivot aggregation should not be taking up as much of the computation's runtime as they currently do. These unexpected performance cliffs make Dask perform significantly worse than pandas loading and manipulating each file one after the other, mostly because Dask appears unable to make proper use of the system's resources for this operation and acts as a bottleneck for the entire task graph. Given how pivot_table appears to work, this could be impacting other groupby operations as well.

Minimal Complete Verifiable Example:

Creating data and testing performance in-memory:

import pandas as pd
import numpy as np
import dask.dataframe as dd
big_matrix = np.zeros((500,50000))
pd_matrix = pd.DataFrame(data=big_matrix)
pd_matrix_v = pd_matrix.unstack().reset_index()
pd_matrix_v['level_0'] = pd.to_datetime(pd_matrix_v['level_0'] + 1_000_000_000, unit='s', origin='unix')
dask_matrix_v = dd.from_pandas(pd_matrix_v, npartitions=10)
dask_matrix_v = dask_matrix_v.categorize(columns='level_1')

# Fast (10-15s)
pivot_task = dd.reshape.pivot_table(dask_matrix_v, index='level_0', columns='level_1', values=0)
pivot_task.compute()

# Fast (10-15s)
pd_matrix_v.pivot_table(index='level_0', columns='level_1', values=0)

dd.to_csv(dask_matrix_v, "./test.csv")

Loading data from disk and testing performance for lazy dataframes:

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client
client = Client() # For monitoring

dask_matrix_v = dd.read_csv("./test.csv/*.part", usecols=['level_0', 'level_1', '0'], dtype={'level_1': 'int32', '0': 'float32'}, parse_dates=['level_0'])
dask_matrix_v = dask_matrix_v.categorize(columns='level_1')

# Very slow (10 minutes)
dask_matrix_v = dd.reshape.pivot_table(dask_matrix_v, index='level_0', columns='level_1', values='0')
dask_matrix_v.compute()

# Just slow without parse_dates (3 minutes)
dask_matrix_v = dd.read_csv("./test.csv/*.part", usecols=['level_0', 'level_1', '0'], dtype={'level_0': 'object', 'level_1': 'int32', '0': 'float32'})

dask_matrix_v = dask_matrix_v.categorize(columns='level_1')
dask_matrix_v = dd.reshape.pivot_table(dask_matrix_v, index='level_0', columns='level_1', values='0')
dask_matrix_v.compute()

Anything else we need to know?: Using the same example as above, this is the timing graph once the datetime values are replaced by strings. There is still a bottleneck on the categorical axis causing pivot_table_count and pivot_table_sum to stand out (though not as much), but unlike the dates, this one can't be avoided, since Dask's pivot_table implementation requires the columns axis to be categorized.

[screenshot: Dask dashboard task overview with dates stored as strings]

Environment:

TomAugspurger commented 4 years ago

Thanks for the report. It'd be great if you could dig into things a little further to simplify.

I'd start by figuring out why pivot_table_count, which is just calling pandas' pivot_table, takes longer in the case where the data is read from CSV. Something strange is going on there.
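
Something along these lines might help isolate the pandas-level call (just a sketch, reusing the paths and column names from the MCVE above; repeating it with a partition from the from_pandas version would give the baseline for comparison):

import time
import pandas as pd
import dask.dataframe as dd

# Pull one already-categorized partition out of the CSV-backed dataframe so
# the pandas-level pivot_table call (what pivot_table_count/_sum wrap) can be
# timed on its own.
part = (
    dd.read_csv("./test.csv/*.part",
                usecols=['level_0', 'level_1', '0'],
                dtype={'level_1': 'int32', '0': 'float32'},
                parse_dates=['level_0'])
      .categorize(columns='level_1')
      .get_partition(0)
      .compute()
)

start = time.perf_counter()
pd.pivot_table(part, index='level_0', columns='level_1',
               values='0', aggfunc='count')
print("pandas pivot_table on one CSV-backed partition:",
      time.perf_counter() - start, "seconds")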

klnrdknt commented 3 years ago

I have a similar problem with pivot_table, except that for me it crashes the worker. The laptop I work on has 16 GB of RAM. I'm trying to import around 8 files of 5-6 GB each (long format) and pivot them. I would have believed that, the way Dask is designed, this is exactly the kind of problem it was built to tackle.

The pivot_table situation is a little frustrating, since pandas seems to be able to handle it: I can use a chunked import, pivot each chunk, and then concat all chunks at the end. I was just hoping that Dask would speed this process up, since pandas can of course only use a single process.
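
For reference, this is roughly what that chunked pandas workaround looks like (a sketch; the file pattern, chunk size, and column names are placeholders for my real data):

import glob
import pandas as pd

# Pivot each chunk on its own...
pieces = []
for path in sorted(glob.glob("data_*.csv")):
    for chunk in pd.read_csv(path, chunksize=5_000_000, parse_dates=["Time"]):
        pieces.append(chunk.pivot_table(index="Time", columns="Id", values="Values"))

# ...then concat the partial pivots; the same Time value can show up in
# several chunks, so aggregate again after concatenating.
wide = pd.concat(pieces).groupby(level=0).mean()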

Is there any recommended way to get around this?

TomAugspurger commented 3 years ago

I think we're still looking for someone to investigate the issue @klnrdknt. Are you interested?

klnrdknt commented 3 years ago

Well, apart from the fact that this is probably outside my expertise, I can try to generate some more info. An in-depth investigation is probably beyond me, unless you give me some hints on where to look and which tools to use for it.

First I'll try to provide a more in-depth description of my issue.

CHAPTER 1:

For a machine of 16 GB RAM I created a test dataset using:

import numpy as np
import pandas as pd
from datetime import datetime
import dask

# generate n random timestamps between start (inclusive) and end (exclusive)
def pp(start, end, n):
    start_u = start.value//10**9
    end_u = end.value//10**9

    return pd.DatetimeIndex((10**9*np.random.randint(start_u, end_u, n, dtype=np.int64)).view('M8[ns]'))

for i in range(9):
    ids = np.random.randint(0, 50, size=int(120e6))
    data = np.random.randn(int(120e6))
    index = pp(pd.to_datetime("2020-01-01"), pd.to_datetime("2020-03-30"), int(120e6))
    df = pd.DataFrame()
    df["Id"] = ids
    df["Values"] = data
    df["Time"] = index
    df.to_csv(f"test_{i}.csv", index=False)

This creates a couple of files of around 5.1 GB each, which is roughly what I use in my real example. The row counts and contents are also about the same, except that with real-world data there would be some rough ordering to the datetime index. However, it also did not make sense to create a strictly linear index, since we want some timestamps to overlap.

These are the operations I perform on the data:

from dask.distributed import Client
client = Client()
client


import dask.dataframe as dd
df = dd.read_csv("test_*.csv", assume_missing=True)
df["Time"] = dd.to_datetime(
    df["Time"],
    format="%Y-%m-%d %H:%M:%S"
)
df = df.dropna(subset=["Time"])
df["Time"] = df["Time"].dt.round("S")
df = df.categorize(columns=["Id"])
df = df.pivot_table(
    index="Time",
    columns="Id",
    values="Values"
)
df = df.groupby("Time").mean()
df.columns = list(df.columns)
df = df.reset_index().set_index("Time")
df = df.resample("10S").mean()
df = df.dropna(how="all")
print(df)

Using the standard worker configuration provided by the call to Client(), this crashes even before pivoting, without any call to compute() or persist().


I've tried avoiding this error by using fewer threads and increasing the memory limit. That makes everything a whole lot slower and doesn't change when it crashes.
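
For completeness, this is roughly what I mean by that; the numbers are just what I tried on this 16 GB machine, not a recommendation:

from dask.distributed import Client

# Fewer threads per worker plus an explicit per-worker memory limit
# (example values only).
client = Client(n_workers=2, threads_per_worker=1, memory_limit="7GB")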

I'm using my own PC rather than the one at work, however, so the setup might be a little different.

Thinking about this, I guess the main difference between this dataset and the one at work is that the timestamps here aren't ordered at all, while the dates in the real dataset tend to be almost in the correct order, with some overlaps here and there.

CHAPTER 2

So I'm going to try a different approach: ordering the index before pivoting.

df = dd.read_csv("test_*.csv", assume_missing=True)
df["Time"] = dd.to_datetime(
    df["Time"],
    format="%Y-%m-%d %H:%M:%S"
)
df = df.dropna(subset=["Time"])
df["Time"] = df["Time"].dt.round("S")
# added this line ########
df = df.set_index("Time").reset_index()
##########################
df = df.categorize(columns=["Id"])
df = df.pivot_table(
    index="Time",
    columns="Id",
    values="Values"
)
df = df.groupby("Time").mean()
df.columns = list(df.columns)
df = df.reset_index().set_index("Time")
df = df.resample("10S").mean()
df = df.dropna(how="all")
print(df)

I'm again using the standard worker configuration for my machine, as chosen by Dask.

This performs a lot of operations, making me believe it would work well this time, but... it crashed on the read_csv operations.


The error, however, says:

[screenshot of the error message]

So I don't really know... Does using Parquet help?

klnrdknt commented 3 years ago

CHAPTER 3

Trying other stuff with parquet. I modified the generator script to produce a faster testable example.

It now only exports two rather small Parquet files. Want a spoiler? This also leads to a crash.

n = int(120e4)
for i in range(2):
    ids = np.random.randint(0, 50, size=n)
    data = np.random.randn(n)
    index = pp(pd.to_datetime("2020-01-01"), pd.to_datetime("2020-03-30"), n)
    df = pd.DataFrame()
    df["Id"] = ids
    df["Values"] = data
    df["Time"] = index
    # df.to_csv(f"test_{i}.csv", index=False)
    df.to_parquet(f"test_{i}.parquet", index=False, engine="pyarrow")

In Jupyter, I then also split the operations into parts to see what works and what does not.

I changed the import to:

# df = dd.read_csv("test_*.csv", assume_missing=True)
df = dd.read_parquet("test_*.parquet", engine="pyarrow")
df["Time"] = dd.to_datetime(
    df["Time"],
    format="%Y-%m-%d %H:%M:%S"
)
df = df.dropna(subset=["Time"])
df["Time"] = df["Time"].dt.round("S")
# added this line ########
df = df.set_index("Time").reset_index()

This part is suuuuuper fast. Then:

df = df.categorize(columns=["Id"])
df = df.pivot_table(
    index="Time",
    columns="Id",
    values="Values"
)
print(df)

This also works, but then everything comes crashing back down to earth:

df = df.compute()
#### killed it

KilledWorker: ("('pivot_table_sum-chunk-f827f0c57d7b6267a0d334b04a9cc465', 0, 0, 0)", <Worker 'tcp://127.0.0.1:38857', name: 3, memory: 0, processing: 2>)

klnrdknt commented 3 years ago

CHAPTER 4

One thing that seems to work well (same data as the post before) is to use groupby instead of pivot_table.

# df = dd.read_csv("test_*.csv", assume_missing=True)
df = dd.read_parquet("test_*.parquet", engine="pyarrow")
df["Time"] = dd.to_datetime(
    df["Time"],
    format="%Y-%m-%d %H:%M:%S"
)
df = df.dropna(subset=["Time"])
df["Time"] = df["Time"].dt.round("S")
# added this line ########
df = df.set_index("Time").reset_index()
df = df.groupby(["Time", "Id"]).mean()
df = df.compute()

For the smaller data set this runs flawlessly, and I can then simply do .unstack().
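
Concretely, the unstack on the computed result would look something like this (a sketch, continuing from the snippet above):

# df is the pandas DataFrame returned by .compute() above, indexed by the
# (Time, Id) MultiIndex produced by the groupby.
wide = df.unstack("Id")                      # one column per Id, like pivot_table
wide = wide.resample("10S").mean().dropna(how="all")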

Trying this out with bigger data...

EDIT: Fails on bigger data too, around 10 files with 100e5 points each.

sm-Fifteen commented 3 years ago

Ah, I had started doing some profiling for this, but ran into issues with my FunctionTrace dumps being too large to be read correctly; by then I already had a working (if slow) pandas script, so I kind of forgot about it.

I'll see if I can dig up the test bench I had and get useful diagnostic data out of it as well.

sm-Fifteen commented 3 years ago

OK, so I killed the profiler after a couple of minutes instead of letting it finish (and still ended up with 1 GB of JSON profiling data, which fits within what the Firefox Profiler viewer can handle). Judging by what I can see:

[screenshot: flame graph from the Firefox Profiler]

The problem with dates mainly appears to be that all threads are trying to run a very hot loop in pandas' code that calls two Python functions repeatedly: tz and freq. Both are trivial, but the threading appears to cause enough GIL contention to make them pause for hundreds of milliseconds at a time.
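
If boxing the datetime values really is the hot spot, one workaround I can imagine (just a sketch, I haven't verified that it fixes the timings) is to leave level_0 as plain strings while pivoting and only parse the pivoted index to datetimes at the end:

import pandas as pd
import dask.dataframe as dd

# Keep level_0 as raw strings so pivot_table never has to box Timestamp
# objects, then parse the (much smaller) pivoted index once at the end.
ddf = dd.read_csv("./test.csv/*.part",
                  usecols=['level_0', 'level_1', '0'],
                  dtype={'level_0': 'object', 'level_1': 'int32', '0': 'float32'})
ddf = ddf.categorize(columns='level_1')

wide = dd.reshape.pivot_table(ddf, index='level_0', columns='level_1', values='0').compute()
wide.index = pd.to_datetime(wide.index)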

I would share the profiling dump, but most dumps I can generate where the process takes long enough for the problem to be apparent (making the CSVs "too small" causes processing to complete too quickly) are far too large for me to send. The code I was profiling is the same as in the original post, just with the "fast" bits removed and an import of functiontrace:

Generating the files:

import pandas as pd
import numpy as np
import dask.dataframe as dd
big_matrix = np.zeros((350,50000))
pd_matrix = pd.DataFrame(data=big_matrix)
pd_matrix_v = pd_matrix.unstack().reset_index()
pd_matrix_v['level_0'] = pd.to_datetime(pd_matrix_v['level_0'] + 1_000_000_000, unit='s', origin='unix')
dask_matrix_v = dd.from_pandas(pd_matrix_v, npartitions=10)
dask_matrix_v = dask_matrix_v.categorize(columns='level_1')

dd.to_csv(dask_matrix_v, "./test.csv")

Pivoting everything:

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

import functiontrace
functiontrace.trace()

if __name__ == '__main__':
    #client = Client()

    dask_matrix_v = dd.read_csv("./test.csv/*.part", usecols=['level_0', 'level_1', '0'], dtype={'level_1': 'int32', '0': 'float32'}, parse_dates=['level_0'])
    dask_matrix_v = dask_matrix_v.categorize(columns='level_1')

    # Very slow (10 minutes)
    dask_matrix_v = dd.reshape.pivot_table(dask_matrix_v, index='level_0', columns='level_1', values='0')

    my_mat = dask_matrix_v.compute()

sm-Fifteen commented 3 years ago

Could this issue actually be related to file I/O being what causes the GIL contention and slows the threads down? It's apparently a known problem with basic file reading in Python and, while I know that Dask defers its file I/O to pandas, I figured that, given that the issue from the original post happens when reading from disk but not in memory, and that concurrent disk I/O is apparently a nightmare, the problem could reside there.

Whether or not this is actually the case is another story, given that I'm not 100% sure how to benchmark it effectively and my understanding of pandas' code is all surface-level. From a quick glance, pandas' C file reader (which its CSV reader is built on top of; the C engine seems to be the default one?) releases the GIL while tokenizing and reading data from disk, so I take this to mean that the disk I/O wouldn't by itself block other tasks and that the problem might really just be GIL contention with no blocking.
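
One way to test the GIL-contention hypothesis (a sketch; I haven't run this variant yet) would be to force single-threaded workers, so tasks within a process never fight over the GIL, and compare the timings against the default configuration:

from dask.distributed import Client

# All parallelism via separate processes, one thread each: if the slowdown is
# pure GIL contention, this setup should land much closer to the in-memory timing.
client = Client(n_workers=4, threads_per_worker=1)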

Integrated GIL monitoring (#6391) would probably be useful for this.