apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[Docs][Python] Improved Tooling/Documentation on Constructing Larger than Memory Parquet #30316

Open asfimport opened 2 years ago

asfimport commented 2 years ago

I have ~800 GB of CSVs distributed across ~1200 files and a mere 32 GB of RAM. My objective is to incrementally build a parquet dataset holding the collection. I can only hold a small subset of the data in memory.

Following the docs as best I could, I was able to hack together a workflow that will do what I need, but it seems overly complex. I hope my problem is not out of scope, so I would love it if there was an effort to:

1) streamline the APIs to make this more straightforward
2) better documentation on how to approach this problem
3) out-of-the-box CLI utilities that would do this without any effort on my part

Expanding on 3), I was imagining something like a parquet-cat, parquet-append, parquet-sample, parquet-metadata or similar that would allow interacting with these files from the terminal. As it is, they are just blobs that require additional tooling to get even the barest sense of what is within.
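For reference, pyarrow.parquet already covers part of this from Python today, even if not as standalone command-line tools. A minimal sketch of inspecting an existing file (the path below is a placeholder, not from my setup):

import pyarrow.parquet as pq

# "example.parquet" is a placeholder path
pf = pq.ParquetFile("example.parquet")

# roughly what a parquet-metadata tool would print: row counts, row groups, schema
print(pf.metadata)   # FileMetaData: num_rows, num_row_groups, created_by, ...
print(pf.schema)     # the file's Parquet schema

# roughly what a parquet-cat tool would do: stream the contents batch by batch
for batch in pf.iter_batches(batch_size=1024):
    print(batch.num_rows)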

Reproducible example below. Happy to hear what I missed that would have made this more straightforward, or that would also generate the parquet metadata at the same time.

EDIT: made the example generate random dataframes so it can be run directly. It was too close to my use case, where I was reading files from disk.


import itertools

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds

def gen_batches():
    NUM_CSV_FILES = 15
    NUM_ROWS = 25
    for _ in range(NUM_CSV_FILES):
        dataf = pd.DataFrame(np.random.randint(0, 100, size=(NUM_ROWS, 5)), columns=list("abcde"))

        # pyarrow's dataset writer only consumes an iterable of record batches
        for batch in pa.Table.from_pandas(dataf).to_batches():
            yield batch

batches = gen_batches()

# using the write_dataset method requires providing the schema, which is not accessible from a batch?
peek_batch = next(batches)
# needed to build a table to get to the schema
schema = pa.Table.from_batches([peek_batch]).schema

# consumed the first entry of the generator, rebuild it here
renew_gen_batches = itertools.chain([peek_batch], batches)

ds.write_dataset(renew_gen_batches, base_dir="parquet_dst.parquet", format="parquet", schema=schema)
# attempting write_dataset with an iterable of Tables threw: pyarrow.lib.ArrowTypeError: Could not unwrap RecordBatch from Python object of type 'pyarrow.lib.Table'
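
For what it's worth, a RecordBatch does carry its schema (peek_batch.schema), and newer pyarrow releases also accept a RecordBatchReader in write_dataset, which would avoid the peek-and-rechain dance above. A minimal sketch, assuming a reasonably recent pyarrow:

import itertools

import pyarrow as pa
import pyarrow.dataset as ds

batches = gen_batches()                      # generator from the snippet above
first = next(batches)
schema = first.schema                        # the schema is available on a batch

# wrap the rechained generator in a reader that carries the schema with it
reader = pa.RecordBatchReader.from_batches(schema, itertools.chain([first], batches))
ds.write_dataset(reader, base_dir="parquet_dst.parquet", format="parquet")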

Reporter: Damien Ready

Related issues:

Note: This issue was originally created as ARROW-14781. Please see the migration documentation for further details.

asfimport commented 2 years ago

Joris Van den Bossche / @jorisvandenbossche: [~ludicrous_speed] A question about your example: you have a generator that produces Arrow batches. I suppose in your real use case this generator yields batches from reading the individual CSV files? In case you don't know, ds.dataset(..) also supports reading CSV files, which in principle should allow you to write those to parquet as:


csv_dataset = ds.dataset("...", format="csv")
ds.write_dataset(csv_dataset, "parquet_dst.parquet", format="parquet")
asfimport commented 2 years ago

Damien Ready: Sorry, I lost the forest for the trees a bit in my write-up.

Yes, I am generating the batches from a function that uses pandas to pull each CSV from disk, does some light munging, and then yields a batch. I was inspired by this blurb from the documentation, https://arrow.apache.org/docs/python/dataset.html#writing-large-amounts-of-data, which made me think this was going to be the proper way to create a full parquet collection.

What I failed to communicate is that I was hoping there was something of a higher-level interface for what feels like a common interaction: getting a big pile of data into parquet. I can already use pandas to sequentially make individual .parquet files:


df = pd.read_csv("filename.csv")
df.to_parquet("local.parquet")

My thinking was that converting each file into a parquet file was naive, and that the dataset interface would take care of some book-keeping if I informed it there was a stream of data. That is, it would create the appropriate metadata files and generate N parquet blocks of standard size (e.g. 256 MB). In my particular use case, each CSV file fits comfortably in RAM, so using the pyarrow interface directly rather than letting pandas handle the transition is more overhead.
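For the metadata part specifically, pyarrow.parquet can already do some of that book-keeping via a metadata collector. A minimal sketch of per-file conversion plus a dataset-level _metadata file, assuming every CSV shares the same schema (file names below are placeholders):

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

csv_files = ["a.csv", "b.csv"]    # placeholder list of input files
root_path = "parquet_dst"         # placeholder output directory

metadata_collector = []
for path in csv_files:
    table = pa.Table.from_pandas(pd.read_csv(path))
    # write_to_dataset appends this file's FileMetaData to the collector
    pq.write_to_dataset(table, root_path, metadata_collector=metadata_collector)

# combine the collected per-file metadata into a dataset-level _metadata file
pq.write_metadata(table.schema, root_path + "/_metadata",
                  metadata_collector=metadata_collector)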

As an aside, I just checked out the ds.dataset function and the docstring does not mention “csv” as a valid format.

asfimport commented 2 years ago

Weston Pace / @westonpace:

As an aside, I just checked out the ds.dataset function and the docstring does not mention “csv” as a valid format.

Thanks for the heads up. This is definitely an oversight. Did you look at Joris' solution:


csv_dataset = ds.dataset("...", format="csv")
ds.write_dataset(csv_dataset, "parquet_dst.parquet", format="parquet")

Does this work for you or not? It should be doing roughly the same thing as your snippet.

My thinking was that converting each file into a parquet file was naive, and that the dataset interface would take care of some book-keeping if I informed it there was a stream of data. That is, it would create the appropriate metadata files and generate N parquet blocks of standard size (e.g. 256 MB).

I'm in the process of adding some control over this to write_dataset. Take a look at ARROW-14426 for more details. Right now I believe write_dataset will create one giant parquet file for each partition with one row group for each input file. In the future these should both be configurable.

asfimport commented 2 years ago

Weston Pace / @westonpace: I've created ARROW-14931 to add the missing format options.

asfimport commented 2 years ago

Damien Ready: Yes, quickly testing Joris' solution, it does create a parquet file from a csv.

Would be interested in getting some better control on the write_dataset functionality. In my current situation, the individual CSV files are a reasonable size, but I can imagine cases where each file is small (say 1 MB) or large (10+ GB) and I do not want to manage the splits/aggregation myself. In the event I wanted to use this new functionality (automatically evenly sized parquet files), would the above snippet I provided still be the correct approach, or is there a different API that would streamline this?

asfimport commented 2 years ago

Weston Pace / @westonpace: As part of 7.0.0 I am working on ARROW-14426 / ARROW-14427 which should offer the customization you are after (assuming I understand your question correctly).
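A hedged sketch of what that customization might look like once those issues land; the parameter names below come from the proposals and may differ in the released API:

import pyarrow.dataset as ds

csv_dataset = ds.dataset("csv_dir/", format="csv")   # placeholder source directory

# aim for evenly sized files and row groups instead of one giant file per partition
ds.write_dataset(
    csv_dataset,
    "parquet_dst",
    format="parquet",
    max_rows_per_file=1_000_000,    # proposed in ARROW-14426
    max_rows_per_group=100_000,     # proposed in ARROW-14427
)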

asfimport commented 2 years ago

Weston Pace / @westonpace: Ah, also ARROW-13703