databricks / koalas

Koalas: pandas API on Apache Spark

pivot_table performance is extremely slow compared to Pandas #2179

Open danielknight opened 2 years ago

danielknight commented 2 years ago

Issue:

I am noticing that the koalas DataFrame.pivot_table is performing much slower than the pandas version. The pandas pivot_table works almost instantaneously, whereas the fastest I can get the koalas pivot_table to run on my local machine is roughly 8 and a half minutes.

Problem Background

I am trying to create a pivot table from some data that looks roughly like:

The data has shape roughly (5800, 3) and looks like the following:

   destination_vm  port  num_sources
0  vm1               80       0.0234
1  vm2               88       0.1230
3  vm3             3100       0.0045

The desired result is something like the following, but with a much larger shape (2058, 1146):

port                80     88    3100
destination_vm
vm1             0.0234      0       0
vm2                  0  0.123       0
vm3                  0      0  0.0045

To transform the data into a sparse matrix like the above, I run the following pivot:

spark = (
    SparkSession.builder.appName("my_job")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "6g")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

ks.set_option('compute.default_index_type', 'distributed-sequence')

self._logger.info("Creating pivot table")

df_listening_vms_by_port = df_listening_for_distribution.pivot_table(
    values=["num_sources"],
    index=['destination_vm'],
    columns='port',
    aggfunc={'num_sources': 'sum'},
    fill_value=0
)

self._logger.info("Done creating pivot table")

Output:

2021-07-23 16:56:57,914 - feature_engineering - INFO - Creating pivot table
2021-07-23 17:05:28,163 - feature_engineering - INFO - Done creating pivot table

The ~8m30s gap seems like a very long time to pivot a DF that is not too large. The output shape is (2058, 1146), so the size has gotten much bigger. However, Pandas does a similar pivot almost instantaneously.
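For reference, this is roughly the pandas call I am comparing against (a sketch; df_pandas here stands for a pandas copy of the same input, obtained via to_pandas() just for the comparison):

# Sketch: the equivalent pandas pivot, on a pandas copy of the same (5800, 3) input.
df_pandas = df_listening_for_distribution.to_pandas()
df_pandas_pivot = df_pandas.pivot_table(
    values='num_sources',
    index='destination_vm',
    columns='port',
    aggfunc='sum',
    fill_value=0,
)  # this is the call that completes almost instantaneously in pandas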

Things I've tried

I have made sure to use the distributed-sequence default index, but this doesn't seem to improve the performance. In local mode, I have tried doing a local_checkpoint on the input DF to try and clear out overhead from previous processing steps. I have tried running this in cluster mode with 8 executors with 4GB ram each, but it doesn't speed up much, which makes me think there is some Window bottleneck or something.
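For reference, the local_checkpoint I tried looks roughly like this (a sketch; I am assuming the koalas spark accessor's local_checkpoint() truncates the lineage built up by the earlier processing steps):

# Sketch: cut the lineage from earlier steps so the pivot is measured in isolation.
# Assumes koalas' DataFrame.spark.local_checkpoint() accessor is available.
df_listening_for_distribution = df_listening_for_distribution.spark.local_checkpoint(eager=True)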

I've also tried exporting the input DF as a csv, reading it into a koalas DF in a local pyspark shell, and then watching the job via the Spark UI. The pivot job finishes quickly, but the actual call doesn't return in the pyspark shell for a long time, and there is a long gap in between where it appears nothing is happening.

[Screenshot from 2021-07-23 18-22-54: Spark UI jobs view]

Here is more detail on the DAG when running the pivot in a local pyspark shell:

[Screenshot from 2021-07-23 18-33-44: DAG visualization]

Is there something obvious I am missing here to help speed this up?

danielknight commented 2 years ago

Am I doing something obviously wrong that would cause the behavior I am seeing, where there is a long gap between the pivot job and the later toPandas job (see the first, smaller screenshot)? Is there some way I can gain visibility into what is happening in the interim?

If I set the sparkContext loglevel to DEBUG, there appears to be nothing happening for long periods of time.

I am noticing this pattern across my application, not just with the pivot_table operation. Can you help me understand what triggers a toPandas job, and why there might be long gaps where no jobs are scheduled at all before the toPandas jobs suddenly appear? This happens even when running on a cluster with 4 executors, 8 cores each, and the default number of partitions (200). My data size is not very large.
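For what it's worth, the only extra visibility I have found so far is printing the plan koalas has built up before the action fires (a sketch; I am assuming the spark accessor's explain() just prints the underlying Spark plan, and `kdf` here is whichever koalas DataFrame is about to be materialized):

# Sketch: dump the extended Spark plan behind a koalas DataFrame
# before any job is triggered, to see what the eventual toPandas will run.
kdf.spark.explain(extended=True)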

Consider the following snippet:

self._logger.info('Concatenating kdf1 and kdf2 frames')
ks.set_option('compute.ops_on_diff_frames', True)
df_all_port_features = ks.concat([kdf1, kdf2], sort=False, axis=1)
ks.reset_option('compute.ops_on_diff_frames')
self._logger.info('Filling na')
df_all_port_features.fillna(0, inplace=True)
self._logger.info('foo')

The time between the "Concatenating..." message and the "Filling na" message is ~10 minutes, with nothing happening in the Spark UI in the interim. Here are the relevant dependencies of the container running the application; maybe I have something misconfigured:

pip3 install kafka-python requests==2.25.1 kubernetes pyOpenSSL protobuf sklearn==0.0 scikit-learn==0.24.2 scipy==1.5.4 numpy==1.19.5 pandas==1.1.5 xgboost==1.4.2 pyarrow==4.0.0 pyspark==3.0.1 koalas==1.8.0

danielknight commented 2 years ago

cc @xinrong-databricks and @itholic; maybe you guys need more information from me?

I have also tried with spark 2.4, and get the same results.

itholic commented 2 years ago

Hi @danielknight, sorry for the late reply.

I just tried to reproduce the issue, and I ran the code below:

from databricks import koalas as ks
import random
num_data = 5800

df_listening_for_distribution = ks.DataFrame(
    {
        "destination_vm": [f"vm{random.randrange(1, 2058)}" for _ in range(num_data)],
        "port": [
            random.randrange(10000, 11146)
            for _ in range(num_data)
        ],
        "num_sources": [random.random() for _ in range(num_data)],
    }
)

result_df = df_listening_for_distribution.pivot_table(
    values=["num_sources"],
    index=["destination_vm"],
    columns="port",
    aggfunc={"num_sources": "sum"},
    fill_value=0,
)  # took 1.85 minutes

result_df.shape  # (1931, 1138)

It took around 2 minutes to compute the pivot_table (which still doesn't seem fast, though).

The shape of the input table is the same as in your example, (5800, 3), and the shape of the result table, (1931, 1138), looks similar to your example case.

I used 8 executors with 8GB RAM, though.

Could you take a look at my code and check whether I reproduced your case correctly?

danielknight commented 2 years ago

I agree that your code is similar to the pivot I am doing. Running your code exactly as written in a local pyspark shell in local mode, it returns after ~10 minutes.

On my cluster, the most recent run with 8 executors (4G memory each) ran against a KDF with shape (28618, 3) and took around 3m17s, which I can infer from these log messages:

2021-07-29 22:17:17,237 - feature_engineering - INFO - Creating pivot table
2021-07-29 22:20:34,545 - feature_engineering - INFO - Done creating pivot table

Obviously (28618, 3) is a larger data size than the (5800, 3) pivots we both tried, so it is at least a good sign that it only costs an additional minute and a half, even with less RAM per executor. So maybe I am getting the best performance possible out of pivot_table for the time being?

I am encountering some additional issues later in this same feature engineering code, though. After I build the KDFs from the pivot_table operations, I need to concatenate them along axis=1, and this operation takes another 13.8 minutes. Later I do a fillna.

self._logger.info('Concatenating TCP and UDP frames')
ks.set_option('compute.ops_on_diff_frames', True)
df_all_port_features = ks.concat([TCP_one_hot, UDP_one_hot], sort=False, axis=1)
ks.reset_option('compute.ops_on_diff_frames')
# this takes 13.8 minutes
# some other stuff happens that potentially creates NA values, but the data shape doesn't change.
self._logger.info('Filling na')
df_all_port_features.fillna(0, inplace=True)
# this takes another 11.3 minutes

For the concat, I am probably experiencing the cost of doing a large join. Is there any way around this? I will try a merge when I have availability to test it, and will post my findings.
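The merge I plan to try would look roughly like this (a sketch, assuming both pivoted frames keep destination_vm as their index):

# Sketch: join the two pivoted frames on their shared destination_vm index
# instead of concatenating along axis=1.
df_all_port_features = TCP_one_hot.merge(
    UDP_one_hot, left_index=True, right_index=True, how='outer'
)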

For the fillna, I am probably experiencing the cost of moving the big concatenated DF into a single partition, as suggested by the warning in the docs here: https://koalas.readthedocs.io/en/latest/reference/api/databricks.koalas.DataFrame.fillna.html ("This leads to move all data into single partition in single machine and could cause serious performance degradation. Avoid this method against very large dataset.")
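One workaround I may try is doing the fill on the underlying Spark DataFrame, which stays distributed, and wrapping the result back into koalas (a sketch; I am assuming the destination_vm index and the column labels survive the round trip via index_col):

# Sketch: fill NAs on the Spark side instead of using koalas' fillna.
sdf = df_all_port_features.to_spark(index_col='destination_vm')
sdf = sdf.na.fill(0)
df_all_port_features = sdf.to_koalas(index_col='destination_vm')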

In general, maybe I am running up against the performance constraints of koalas? I am not using this in a notebook environment, but rather trying to make a POC of a production service. It will run as a k8s-based pyspark job on a fixed schedule, so it is important to us that it is both reliable and performant, so as not to hog resources from the cluster. Maybe I will encounter the same overhead in raw pyspark, and our use case will require more resources regardless of whether we use koalas or raw pyspark.
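If I do end up rewriting the hot path in raw pyspark, my understanding is the pivot itself would look roughly like this (a sketch, not yet tested at scale; it sidesteps the koalas default-index machinery entirely):

# Sketch: the same pivot expressed directly against the Spark DataFrame.
sdf = df_listening_for_distribution.to_spark()
sdf_pivot = (
    sdf.groupBy('destination_vm')
       .pivot('port')
       .sum('num_sources')
       .na.fill(0)
)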