modin-project / modin

Modin: Scale your Pandas workflows by changing a single line of code
http://modin.readthedocs.io
Apache License 2.0

to_parquet() needs an option for how many files to create, or something like Ray's num_rows_per_file #7362

Open Liquidmasl opened 1 month ago

Liquidmasl commented 1 month ago

Currently, loading a .parquet folder with multiple .parquet files is kinda broken with read_parquet().

But generally, it is harder for Ray to manage a few big files than many small files, as the RAM requirement goes up.

So loading a large dataset can only be done by loading small batches and concatenating them: https://github.com/modin-project/modin/issues/6639#issuecomment-2209027785. This is all well and good.
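
For context, that batched workaround looks roughly like this (the folder path is just an example):

import glob

import modin.pandas as pd

# Read the part files one by one and concatenate, instead of pointing
# read_parquet() at the whole folder at once.
parts = sorted(glob.glob("dataset.parquet/*.parquet"))
df = pd.concat([pd.read_parquet(p) for p in parts], ignore_index=True)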

BUT the current to_parquet() implementation does not support choosing how many parts to write, or how big those parts of the saved DataFrame may be. It is (as it seems) just the number of logical processors. Also, repartitioning before writing to parquet does not change this.
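
As far as I can tell, the part count comes from Modin's NPartitions config, which seems to default to the number of CPUs:

import modin.config as cfg

# NPartitions defaults to the CPU count on the Ray engine, which would
# explain why the file count tracks the number of logical processors.
print(cfg.NPartitions.get())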

So while I can load a large dataset a million rows at a time and concatenate the partial DataFrames, I can (on my PC) only save it as 20 parts in the parquet folder. Those parts are then too big to load again on the same machine.

Ray's implementation of write_parquet gives the option to suggest the number of rows per file: https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.write_parquet.html
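
For reference, the Ray API is used roughly like this (paths and the row count are just illustrative):

import ray

# num_rows_per_file is only a suggestion; Ray may write
# somewhat more or fewer rows per file.
ds = ray.data.read_parquet("dataset.parquet/")
ds.write_parquet("output.parquet/", num_rows_per_file=1_000_000)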

This would give more control over the size of parquet partitions, and maybe even enable loading the same data again (as soon as read_parquet() works again).

Retribution98 commented 1 month ago

Hi @Liquidmasl

The number of files in the output depends on the number of partitions in the DataFrame.

If you want to customize the number of files, you need to change NPartitions and call _repartition with axis=0 on the DataFrame:

import modin.config as cfg

# Repartition along the row axis so the DataFrame ends up with exactly
# NPartitions row partitions; to_parquet() writes one file per partition.
with cfg.context(NPartitions=1):
    df = df._repartition(axis=0)

df.to_parquet(...)

Be careful: _repartition requires loading the full DataFrame into RAM. If you do not have enough memory, you can customize NPartitions before creating the DataFrame:

import modin.config as cfg
import modin.pandas as pd

# Set NPartitions before the DataFrame is created, so the data is
# split into the target number of row partitions from the start.
cfg.NPartitions.put(1)

df = pd.DataFrame(...)
df.to_parquet(...)

Liquidmasl commented 1 month ago

Hi there, good morning, and thank you for your numerous responses to my cries for help. Also, sorry for them.

I did try changing the number of partitions and repartitioning before saving, without success. But among everything I have tried I might have made a mistake. I will try again.

Although I still think some separate logic here would be nice. It might not be practical to use that many partitions while processing a dataset, but small .parquet parts can still be necessary for transferring the data, or for further processing on less powerful machines (in case repartitioning does not work because of RAM limitations).

Ray's documentation states that the num_rows_per_file parameter is a suggestion that might be over- or undershot; a similar approach for Modin would be neat. This logic could still apply to each partition separately, so as not to hurt parallelism: if a partition runs out of rows for its last .parquet part, that part would simply contain fewer rows.
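
In the meantime, the behaviour I am after can be approximated by hand, roughly like this sketch (the helper name and the default chunk size are made up, and _to_pandas() is a Modin-internal method):

def to_parquet_chunked(df, folder, rows_per_file=1_000_000):
    # df is a Modin DataFrame; write fixed-size row slices as separate
    # .parquet files, independent of how df is partitioned in memory.
    for i, start in enumerate(range(0, len(df), rows_per_file)):
        chunk = df.iloc[start:start + rows_per_file]._to_pandas()
        chunk.to_parquet(f"{folder}/part-{i:05d}.parquet")

Each slice only needs chunk-sized RAM at write time, which is the whole point of keeping the parts small.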