apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[Python][C++] How to limit the memory consumption of to_batches() #33759

Open Treize44 opened 1 year ago

Treize44 commented 1 year ago

In order to get the unique values of a column of a 500 GB Parquet dataset (made of 13,000 fragments) on a computer with 12 GB of memory, I chose to use to_batches() as follows:

import pyarrow as pa
import pyarrow.dataset as ds

partitioning = ds.partitioning(
    pa.schema([(timestamp, pa.timestamp("us"))]),
    flavor="hive",
)
unique_values = set()
dataset = ds.dataset(path, format="parquet", partitioning=partitioning)
batch_it = dataset.to_batches(columns=[column_name])
for batch in batch_it:
    unique_values.update(batch.column(column_name).unique())

The problem is that the process quickly accumulates memory and exceeds the amount available. When I put a breakpoint on the line "for batch in batch_it", the process continues to accumulate memory until it crashes.

I understand that to_batches reads ahead, but I thought I could limit it with the "fragment_readahead" parameter. Is there a way to limit readahead? Is there a way to "free" memory after a batch has been consumed? Is there another way to go? My first try was using to_table(), but it needs 20 GB of memory in that case. It seems that to_batches would also need 20 GB.
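For illustration: to_batches() forwards its keyword arguments to the dataset scanner, which in recent pyarrow versions exposes batch_size, batch_readahead and fragment_readahead. Below is a minimal sketch of how one might try to bound the amount of data buffered ahead of the consumer, reusing path, column_name and partitioning from the snippet above; how much this actually lowers peak memory is not guaranteed.

import pyarrow.dataset as ds

dataset = ds.dataset(path, format="parquet", partitioning=partitioning)
unique_values = set()
# Smaller readahead values trade scan throughput for a lower memory ceiling:
# batch_readahead is the number of batches decoded ahead per fragment,
# fragment_readahead is the number of fragments read concurrently.
for batch in dataset.to_batches(
    columns=[column_name],
    batch_size=64 * 1024,
    batch_readahead=1,
    fragment_readahead=1,
):
    unique_values.update(batch.column(column_name).unique().to_pylist())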

Component(s)

Python

westonpace commented 1 year ago

Which version of pyarrow are you using?

Treize44 commented 1 year ago

I use pyarrow 10.0.1

westonpace commented 1 year ago

Hmm, backpressure should be applied then. Once you call to_batches, it should start to read in the background. Eventually, at a certain point, it should stop reading because too much data has accumulated; this is normally around a few GB. You mention there are 13k fragments; just to confirm, this is 13k files, right? How large is each file? How many row groups are in each file?
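One rough way to check whether that backpressure limit is being respected is to watch Arrow's own allocator while the batches are consumed; a sketch, reusing dataset and column_name from the snippet above:

import pyarrow as pa

# Report how much memory Arrow's default pool has allocated as the scan
# progresses; with working backpressure this should plateau at a few GB
# instead of growing without bound.
for i, batch in enumerate(dataset.to_batches(columns=[column_name])):
    if i % 100 == 0:
        print(f"batch {i}: {pa.total_allocated_bytes() / 2**20:.0f} MiB allocated by Arrow")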

Treize44 commented 1 year ago

Yes, there are 12,918 fragments to be precise. Each file contains 460 rows. I can provide the code to generate a comparable dataset.

westonpace commented 1 year ago

Code to generate a dataset would be helpful. We likely need some refinement on memory usage (and monitoring) in the execution engine in general but I don't know that I will be able to get to it soon.

Treize44 commented 1 year ago

While writing code to write and read the dataset, I noticed that my problem is linked to the size of the metadata of the schema; is there a limit? (See the second line of write_fragment().)

Here is my code:

import json
from typing import Any

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds

DATASET_HEADER = "dataset_header"
PARQUET_FLAVOR = "hive"
PARQUET_FORMAT = "parquet"
TIME_COLUMN = "gps_time_stamp"
DEFAULT_PARTITIONING: ds.Partitioning = ds.partitioning(
    pa.schema([(TIME_COLUMN, pa.timestamp("us"))]),
    flavor=PARQUET_FLAVOR,
)

dataset_path = "/tmp/dataset"
nb_data_values = 60000


def generate_dataframe():
    nb_rows = 400
    data = np.float32(np.random.normal(size=nb_data_values))
    return pd.DataFrame([{"ATTRIBUTE": i, "data": data} for i in range(nb_rows)])


def write_fragment(df, path):
    dataset_header: dict[str, Any] = {}
    dataset_header["sampling"] = np.arange(0, nb_data_values, 1).tolist()  # if removed, the problem disappears
    schema_with_metadata = pa.Schema.from_pandas(df).with_metadata(
        {DATASET_HEADER: json.dumps(dataset_header)}
    )
    ds.write_dataset(
        data=pa.Table.from_pandas(df).cast(schema_with_metadata),
        base_dir=path,
        format=PARQUET_FORMAT,
        partitioning=DEFAULT_PARTITIONING,
        existing_data_behavior="overwrite_or_ignore",
        file_options=ds.ParquetFileFormat().make_write_options(allow_truncated_timestamps=True),
    )


def write():
    trace_length_pd = pd.Timedelta(milliseconds=60000)
    first_timestamp = pd.Timestamp("2023-01-23 09:15:00.000000")
    nb_timestamps = 13000
    for i in range(nb_timestamps):
        df = generate_dataframe()
        df[TIME_COLUMN] = pd.Timestamp(first_timestamp + i * trace_length_pd)
        print("Generating data for timestamp ", i)
        write_fragment(df, dataset_path)


def read():
    dataset = ds.dataset(dataset_path, format="parquet", partitioning=DEFAULT_PARTITIONING)
    unique_values = set()
    for batch in dataset.to_batches(columns=[TIME_COLUMN]):
        unique_values.update(batch.column(TIME_COLUMN).unique().to_numpy())
    print(len(unique_values))


write()
read()

Treize44 commented 1 year ago

I think the problem is on my side: I misuse the per-fragment schema metadata to store a large amount of data (around 400 kB). I have centralised the metadata in a _metadata file (as in https://arrow.apache.org/docs/python/parquet.html#writing-metadata-and-common-metadata-files) and the problem disappears.
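For anyone hitting the same thing, here is a sketch of that workaround under the same assumptions as the reproducer above (DATASET_HEADER, dataset_header, df and dataset_path come from it): write the fragments with the plain pa.Schema.from_pandas(df), i.e. without with_metadata, and store the large application header once in a sidecar file instead of in every footer.

import json
import os

import pyarrow as pa
import pyarrow.parquet as pq

# Store the large header once for the whole dataset rather than in each fragment.
schema_with_header = pa.Schema.from_pandas(df).with_metadata(
    {DATASET_HEADER: json.dumps(dataset_header)}
)
pq.write_metadata(schema_with_header, os.path.join(dataset_path, "_common_metadata"))

# Readers can recover the header without paying for it in every fragment's footer.
common = pq.read_schema(os.path.join(dataset_path, "_common_metadata"))
header = json.loads(common.metadata[DATASET_HEADER.encode()])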

westonpace commented 1 year ago

Thanks for the explanation. I do think we cache the metadata per-file when opening a dataset. The original thought was that a user might open a dataset and then scan it multiple times. If we cache the data the first time then we can save time on future reads. However, if you have large metadata and many files then I think that becomes problematic. I will open a separate issue for that.
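As a back-of-the-envelope check: caching roughly 400 kB of key-value metadata for each of ~13,000 files is already about 5 GB before any row data is read. The per-fragment footprint can be inspected with something like the following sketch, reusing DEFAULT_PARTITIONING and the dataset path from the reproducer and assuming a local file-based dataset:

import pyarrow.dataset as ds
import pyarrow.parquet as pq

dataset = ds.dataset("/tmp/dataset", format="parquet", partitioning=DEFAULT_PARTITIONING)
one_file = dataset.files[0]  # any single fragment
schema = pq.read_schema(one_file)
kv_bytes = sum(len(k) + len(v) for k, v in (schema.metadata or {}).items())
print(f"~{kv_bytes / 1024:.0f} kB of key-value metadata per fragment")
print(f"~{kv_bytes * len(dataset.files) / 2**30:.1f} GiB if cached for every fragment")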

westonpace commented 1 year ago

I've opened #33888 to address this metadata caching problem.

rando-brando commented 1 year ago

I wanted to second this issue as I am having the same problem. In my case the problem stems from the Python package deltalake, which uses the Arrow format. We use deltalake to read from Delta with Arrow because Spark is less performant in many cases. However, when trying dataset.to_batches() it appears that all available memory is quickly consumed even if the dataset is not very large (e.g. 100M rows x 50 cols). I have reviewed the documentation and it's not clear what I can do to resolve the issue in its current state. Any suggestions or workarounds would be much appreciated. We are using pyarrow==10.0.1 and deltalake==0.6.3.

rando-brando commented 1 year ago

Also, for reference, #33624 is the same issue: a small 54 MB file results in GBs of memory usage.

westonpace commented 1 year ago

Do you also have many files with large amounts of metadata? If you do not then I suspect it is unrelated to this issue. I'd like to avoid umbrella issues of "sometimes some queries use more RAM than expected".

#33624 is (as far as I can tell) referring to I/O bandwidth and not total RAM usage, so it also sounds like a different situation. Perhaps you can open your own issue with some details about the dataset you are trying to read (how many files? what RAM consumption are you expecting? what RAM consumption are you seeing?).

rando-brando commented 1 year ago

I thought it was likely related, as both issues arise when using to_batches() on small data; the difference is that I am reading directly from a mounted disk while the OP is reading over the network. If the scanner is the cause, as some comments have suggested, a fix would resolve both our issues.

westonpace commented 1 year ago

OP's issue has been identified and they have found a workaround (don't store the full metadata in each file), and we have identified a long-term fix (#33888). That problem and fix do not have anything to do with #33624. In #33624 the total data transferred is larger than the on-disk size of the data, which would not be caused by Arrow retaining metadata in RAM.

rando-brando commented 1 year ago

Ok thank you for redirecting me to the appropriate issue number.