modin-project / modin

Modin: Scale your Pandas workflows by changing a single line of code
http://modin.readthedocs.io
Apache License 2.0

Poor performance of df.insert and df.to_parquet #7325

Open yx367563 opened 1 week ago

yx367563 commented 1 week ago

Modin version checks

Reproducible Example

import ray
# import pandas as pd
import modin.pandas as pd
import time

@ray.remote
def process_data():
    modin_df_list = []  # rolling window of about 60 Modin dataframes
    # `size` and `select_cols` are defined elsewhere in the original script
    for i in range(60, size):
        t = time.perf_counter()
        modin_df = pd.read_csv("xxx.csv")
        modin_df_list.append(modin_df)

        source_df = pd.concat(modin_df_list)
        print(f'cost: {time.perf_counter() - t:.8f}s')
        source_df["xxx"].apply(round)
        print(f'cost: {time.perf_counter() - t:.8f}s')
        target_df = source_df.groupby(['xxx', 'yyy'])[select_cols].sum().reset_index()
        print(f'cost: {time.perf_counter() - t:.8f}s')
        target_df.insert(1, 'new-line', 'test')
        print(f'cost: {time.perf_counter() - t:.8f}s')
        target_df.to_parquet("xxx.parquet")
        print(f'cost: {time.perf_counter() - t:.8f}s')
        del modin_df_list[0]

if __name__ == '__main__':
    ray.init()
    ray.get(process_data.remote())

Issue Description

When performing concat, groupby, and map operations, performance is 2-3 times faster than pandas, but a significant degradation occurs on the to_parquet and insert operations, and a large number of small Parquet files is produced. Note: each input file is around 400 MB. The Ray cluster has eight 32-core machines.

Expected Behavior

At a minimum, performance should not be much worse than native pandas.

Error Logs

No response

Installed Versions

modin: 0.30.1
ray: 2.23.0
YarShev commented 6 days ago

Hi @yx367563, thanks for filing this issue! You should not wrap process_data in a ray.remote decorator; Modin itself takes care of distributing the computation. If you remove the ray.remote decorator and still see performance worse than pandas, check out these posts on how to boost performance (1, 2).
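A minimal sketch of the suggested shape (no ray.remote wrapper); the paths are placeholders carried over from the issue, and the body is simplified rather than the exact pipeline:

```python
import ray
import modin.pandas as pd  # Modin distributes the work itself

def process_data():
    # Runs in the driver process; Modin parallelizes each operation over Ray.
    df = pd.read_csv("xxx.csv")              # placeholder path
    out = df.groupby(df.columns[0]).sum()
    out.to_parquet("xxx.parquet")            # placeholder path

if __name__ == "__main__":
    ray.init()       # Modin reuses the already-initialized Ray instance
    process_data()   # a plain call, not ray.get(process_data.remote())
```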

yx367563 commented 5 days ago

@YarShev Thank you for your reply. In fact, I only added ray.remote at the very beginning so that the initial procedure would be executed on a non-head node. I found that the performance optimizations provided are mainly for Modin dataframe computations; is it possible to improve the performance of operations like to_parquet? I think a possible reason for the slowness is the generation of too many small files.

YarShev commented 4 days ago

@yx367563, can you try calling this before to_parquet?

import modin.config as cfg

# temporarily force a single row partition so the write produces one file
with cfg.context(NPartitions=1):
    df = df._repartition(axis=0)

df.to_parquet(...)

This should write a single parquet file.

yx367563 commented 4 days ago

@YarShev I tried executing the following code but it still generates a lot of small files

with cfg.context(NPartitions=1):
    df._repartition(axis=0)
    df.to_parquet("xxx.parquet")
YarShev commented 4 days ago

@yx367563, sorry, I didn't put the code correctly. Please see the updated comment above.

yx367563 commented 4 days ago

@YarShev Yes, now only one part file is generated in the folder, but the performance is still very poor. Can I assume that Modin performs poorly for operations such as writing to storage and inserting columns, and is better suited to computationally intensive operations?

YarShev commented 4 days ago

@yx367563, operations such as writing to storage and inserting columns should also perform well, depending on the data size. It would be great if you could share the exact script and data you are using so we could reproduce the performance issue.

yx367563 commented 4 days ago

@YarShev I'm sorry, I can't provide the exact data file, but I can tell you it's a simple file with 100 columns and about 1 million rows, roughly 500 MB in size. The operation is to read the file into a Modin dataframe, select 50 columns, concat 60 such dataframes, and then execute:

source_df["xxx"].apply(round)
target_df = source_df.groupby(['xxx', 'yyy'])[select_cols].sum().reset_index()
target_df.insert(1, 'new-column', date)
with cfg.context(NPartitions=1):
    target_df = target_df._repartition(axis=0)
    target_df.to_parquet("xxx.parquet")
yx367563 commented 4 days ago

@YarShev There is another very strange phenomenon. I timed each call and found that if the column-insert operation is present, to_parquet is very fast (about 2 seconds), but if the insert is removed, to_parquet takes much longer (more than 30 seconds).

yx367563 commented 4 days ago

In addition, I found that the total number of CPUs in the Ray cluster is used to set NPartitions in the initialize_ray method. Will it affect performance when there are too many CPUs? Can other configurations modify this value? Would you happen to have any suggestions or best practices?

YarShev commented 4 days ago

In addition, I found that the total number of CPUs in the Ray cluster is used to set NPartitions in the initialize_ray method. Will it affect performance when there are too many CPUs? Can other configurations modify this value? Would you happen to have any suggestions or best practices?

NPartitions is intentionally set to the number of CPUs in the Ray cluster for maximum performance. If the CPU count is low (e.g., <=4), you will have only a few partitions to process in parallel. You can modify the number of partitions yourself with the following config.

import modin.config as cfg

cfg.NPartitions.put(<N>)

Note that if you set a value that is much greater than the number of CPUs, you will get a dataframe that is overpartitioned, which also affects performance. You can find some performance tips on Optimizations Notes page.
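If it is more convenient than calling put in code, the same setting can be supplied through an environment variable before the script starts; Modin config options map to MODIN_* variables (the value 32 and the script name below are placeholders):

```shell
# equivalent to cfg.NPartitions.put(32), applied before Modin initializes
export MODIN_NPARTITIONS=32
python your_script.py
```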

YarShev commented 4 days ago

@YarShev There is another very strange phenomenon. I tested the time taken to call each method. I found that if the operation of inserting columns is added, the time taken by to_parquet will be very short (about 2 seconds), but if the operation of inserting columns is deleted, the time taken by to_parquet will be very long (more than 30 seconds).

We will try to reproduce your performance issue with a generated file of the size you mentioned.

The operation is to read the file into a modin dataframe, select 50 columns, and concat 60 such dataframes, and then execute:

Could you provide a script with the operations you perform? That would be helpful because your sentence and the script in the issue description seem to have some discrepancies.

YarShev commented 4 days ago

Could you also share the following info?

yx367563 commented 4 days ago

Could you provide a script with the operations you perform? That would be helpful because your sentence and the script in the issue description seem to have some discrepancies.

@YarShev The general logic is the same. The script provided at the beginning slightly simplifies some unimportant parts. You can refer to this script

import ray
# import pandas as pd
import modin.pandas as pd
import time

@ray.remote
def process_data():
    modin_df_list = []  # rolling window of 59 Modin dataframes; read/preprocess logic as below
    for i in range(59, size):  # `size` is defined elsewhere in the original script
        # Read file and do some preprocess
        modin_df = pd.read_csv("xxx.csv")
        select_cols = list(modin_df.columns[-40:])
        select_cols_plus = select_cols[:]
        select_cols_plus.extend(['a', 'b', 'c'])
        modin_df = modin_df[select_cols_plus]
        modin_df_list.append(modin_df)

        # Start data analysis
        source_df = pd.concat(modin_df_list)
        source_df["a"].apply(round)
        target_df = source_df.groupby(['a', 'b'])[select_cols].sum().reset_index()
        target_df.insert(1, 'new-line', 'test')
        target_df.to_parquet("xxx.parquet")
        del modin_df_list[0]

if __name__ == '__main__':
    ray.init()
    ray.get(process_data.remote())

Python version: 3.10.12
Modin version: 0.30.1
Engine: Ray 2.23.0
Number of CPUs: 256

yx367563 commented 4 days ago

@YarShev If I have a large number of CPUs in the cluster, say around 300, and the processing logic is as described above, will I still achieve better performance by setting it to 300? In addition, if I enable auto-scaling in Ray cluster, then the number of CPUs will change dynamically with different loads, how should I configure it better?

YarShev commented 3 days ago

How many nodes do you have in the Ray cluster?

@YarShev If I have a large number of CPUs in the cluster, say around 300, and the processing logic is as described above, will I still achieve better performance by setting it to 300?

Actually, you should adjust this value for each concrete case.

  1. If you are running on a single node, it is not always beneficial to use all CPU cores; it depends on the workload.
  2. If you are running in a cluster, there may be data transfers across nodes, which can affect performance: the fewer data transfers occur, the better the performance you get. Modin relies on Ray's scheduling mechanism, which is quite mature and should take data placement into account. You could try using RayInitCustomResources and RayTaskCustomResources to control task parallelism and which Ray nodes tasks are scheduled on.
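A sketch of the custom-resources approach; the resource name "modin_slot" and the counts are placeholders, not values from this thread:

```python
import modin.config as cfg

# Register a custom resource when Modin initializes Ray, then make every
# Modin task request one unit of it; this caps Modin's task parallelism
# and pins tasks to nodes that expose the resource.
cfg.RayInitCustomResources.put({"modin_slot": 16})  # placeholder count
cfg.RayTaskCustomResources.put({"modin_slot": 1})

import modin.pandas as pd  # import after the config so initialization sees it
```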

In addition, if I enable auto-scaling in Ray cluster, then the number of CPUs will change dynamically with different loads, how should I configure it better?

There are no definite recommendations here either. You should tune the CPU and NPartitions values for each concrete case.

yx367563 commented 3 days ago

@YarShev OK. In Ray Cluster I used 8~10 nodes, each with 32 cores.

yx367563 commented 3 days ago

@YarShev By the way, is there a way to turn off Modin's parallelism while keeping the Modin dataframe data structure, and if so, would the performance be the same as pandas?

YarShev commented 3 days ago

@yx367563, we are currently working on this in https://github.com/modin-project/modin/pull/7259 to allow for execution with native pandas under the hood. You could try to set cfg.NPartitions.put(1) but it still involves data transfer between processes for a single partition.

yx367563 commented 2 days ago

@YarShev It is not necessary to call the implementation logic of pandas. Just keep the dataframe data structure of modin, but remove the parallel logic and execute it serially on a single core. Is this possible?

YarShev commented 2 days ago

When you set NPartitions to 1, you have a single-partition Modin dataframe, and thus it will be processed on a single core (but in a worker process). Note that this requires Ray to pull the data onto that worker process to be able to process the operation. We are trying to avoid this overhead in #7258 and operate directly on a pandas dataframe in the main process.

Retribution98 commented 22 hours ago

Hi @yx367563, thanks for the details. I was able to reproduce your problem, but unfortunately I don't have a solution for you.

First of all, "achieving performance at least as good as native pandas" is the goal for Modin, but right now it doesn't always hold for some operations or pipelines. The reason for your slowdown and the phenomena you observed is, as you said, the materialization of data from the workers. (This happens when it is necessary for correct execution, and it wastes a lot of time.) In your pipeline it happens on the insert operation, and if you remove that, it moves into the to_parquet operation.

The next point I would like to draw your attention to is running on a Ray cluster. Modin expects to run on the head node of the cluster, which allows it to manage resources correctly. Using Modin inside a remote function may cause slowdowns, so we highly recommend not doing so. If you want to avoid executing on the local machine, you can create a head node on a remote machine and connect to it via ssh or ray submit. You can find more information about using Modin in a cluster here.
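For reference, attaching a driver script to an existing Ray head node (rather than wrapping the work in a remote function) can be sketched as follows; the address value and the path are placeholders:

```python
import ray

# Connect to the running cluster instead of starting a local Ray instance;
# "auto" resolves the head node when run from a machine inside the cluster.
ray.init(address="auto")

import modin.pandas as pd  # Modin picks up the connected cluster

df = pd.read_csv("xxx.csv")  # placeholder path from the issue
```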

Will it affect performance when there are too many CPUs?

Since using a cluster increases the slowdowns caused by data transfer to/from workers, we recommend using one only if local execution is not possible (for example, if the data is very large). The same applies to the number of CPUs: if there are too many partitions, the overhead increases, and sometimes fewer CPUs give better performance. Just try different options and find the optimal configuration for your pipeline.

yx367563 commented 21 hours ago

@Retribution98 In fact, I set the CPU count of the head node to 0 in the Ray cluster and turned on the autoscaler, so if I run directly on the head node I hit a KeyError: 'CPU' error, because Modin determines the number of partitions from the number of CPUs in the cluster at initialization. To work around this I had to move the corresponding logic to a worker node. This Ray cluster configuration (a zero-CPU head node) is officially recommended and many users rely on it. Maybe you could fix this problem and consider the Ray autoscaler scenario in your optimizations. In any case, thank you for your feedback and efforts!
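One possible workaround sketch for the zero-CPU head node, assuming a known worker CPU total (256 here, matching the cluster described above); whether this avoids the KeyError depends on the Modin version, so treat it as an assumption to verify:

```python
import modin.config as cfg

# Pin the values Modin would otherwise derive from the cluster's CPU count,
# so initialization does not depend on the head node reporting CPUs.
cfg.CpuCount.put(256)      # assumption: total CPUs across worker nodes
cfg.NPartitions.put(256)

import modin.pandas as pd  # first Modin use happens after the config is set
```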