uber / petastorm

Petastorm library enables single machine or distributed training and evaluation of deep learning models from datasets in Apache Parquet format. It supports ML frameworks such as Tensorflow, Pytorch, and PySpark and can be used from pure Python code.
Apache License 2.0
1.78k stars 285 forks source link

Make use of the new pyarrow.dataset functionality instead of ParquetDataset #613

Open jorisvandenbossche opened 3 years ago

jorisvandenbossche commented 3 years ago

In the Apache Arrow project, we have been working the last year on a new Dataset API. (original design document: https://docs.google.com/document/d/1bVhzifD38qDypnSjtf8exvpP3sSB5x_Kw9m-n66FB2c/edit, current python docs: https://arrow.apache.org/docs/python/dataset.html).

The dataset API has the goal to handle (scan/materialize) data sources larger than memory, and specifically provide:

Think: the current pyarrow.parquet.ParquetDataset functionality, but then not specific to parquet (currently also Feather ands CSV are supported), not tied to python (so for example the R bindings of arrow also use this), and with more features (better schema normalization, more partitioning schemes, predicate pushdown / row group filtering, etc).

A small illustrative example of how it can look like in python using the new generic API:

import pyarrow.dataset as ds
dataset = ds.dataset("path/to/partitioned/dataset",
                     format="parquet", partitioning="hive", filesystem=..)
table = dataset.to_table(columns=['col1', 'col2'],
                         filter=(ds.field('key2') == 1) & (ds.field('key2'== 2))

In addition, we also support using it from the existing pyarrow.parquet API with a keyword:

import pyarrow.parquet as pq
pq.ParquetDataset("path/to/partitioned/dataset", use_legacy_dataset=False)

This ParquetDataset "shim", however, does not support the full existing API. So for example, it does not support the .pieces and ParquetDatasetPiece etc, which are specifically APIs that dask is using.

Long term, we would like to have this new Datasets implementation replace the python implementation of pyarrow.parquet.ParquetDataset (the basic pyarrow.parquet.read/write_table and pyarrow.parquet.ParquetFile are normally there to stay, but the Python ParquetDataset implementation hopefully not, long term, as it is duplicating the new Datasets implementation).

Given that pyarrow.ParquetDataset eventually might go away (in its current form, we probably want to keep something similar, possibly with the same name, but probably not with an exactly identical "pieces" API), such a change would certainly have an impact on petastorm, which seems to be heavy user of the current python ParquetDataset APIs.

For that reason, I wanted to bring this up, as those changes certainly will impact petastorm, but at the same time feedback on the new APIs (whether they are useful / sufficient / ... for petastorms use cases) is very valuable.

selitvin commented 3 years ago

@jorisvandenbossche : thanks for the detailed message!

Petastorm takes accesses a single row-group in a given file (by wrapping it with a piece object: path + rowgroup-sequence-number). I am not sure it would be possible to implement current petastorm logic on top of pyarrow if this kind of low-level access is hidden from us. I'll start looking at the new API - it's a bit concerning as according to what you say, it stops being parquet specific (which is great - but maybe not so great for petastorm :) )

aperiodic commented 2 years ago

@selitvin have you come to any conclusions about the feasibility of moving Petastorm to use the new Dataset API? The new API has functionality that my team would like to make use of (specifically, filtering rows by non-partition-key columns). If it's possible to move Petastorm to the new API, I would love to contribute development effort to help that happen.

selitvin commented 2 years ago

@aperiodic , unfortunately, I did not spend time on this. Feel free to take a look. It has been awhlie since the original post. I was not tracking recent developments in pyarrow, so perhaps the original API has evolved and it would be possible to access pieces. Would be happy to support if you can do some investigation.

aperiodic commented 2 years ago

I'm going on vacation for a week starting tomorrow, but once I return, I'll start reading through Petastorm and write up a proposal to maintain its functionality with the new PyArrow dataset API.

RobindeGrootNL commented 2 years ago

@jorisvandenbossche pointed me here from a PR about exposing parquet encryption in pyarrow. Since parquet encryption is only exposed through pyarrow.dataset and not through pq.ParquetDataset (which is what petastorm currently uses), I am also very interested in petastorm implementing the new PyArrow dataset API (I want to run large scale Deep Learning on sensitive data that I want to have encrypted without a big performance overhead).

@aperiodic if you could let me know when you've had a look that would be great! Then I can see if/how I can help out here.

aperiodic commented 2 years ago

@RobindeGrootNL I've had a cursory look through, and confirmed @selitvin's initial suspicion that moving to the new API is not possible in its current state, because it does not offer the same functionality as pq.ParquetDataset. I'm currently cataloguing all the Parquet-specific functionality that Petastorm relies on, and my plan is to start a discussion with the PyArrow maintainers about a Parquet-specific extension of pyarrow.dataset that extends the new, generic Dataset API with the Parquet-specific functionality that Petastorm and other users of PyArrow need in order to read giant Parquet datasets with maximal efficiency.

I'm hoping to start that conversation with the PyArrow maintainers early next week. In the meantime, I would be grateful if you'd join the Arrow dev mailing list, so you can add your voice in support of a Parquet-specific extension when I start that conversion.

RobindeGrootNL commented 2 years ago

@aperiodic A shame that it's not possible with the current implementation, but great that you're taking the time and effort to figure out what functionality is missing so we can discuss its addition to PyArrow with the folks over there. I have signed up for the mailing list and I'm looking forward to discussing it with them and getting pyarrow.dataset to support what we need from it.

jorisvandenbossche commented 2 years ago

Note that since I opened this issue, we have added several functionalities to the pyarrow.dataset API to work with (parquet-specific) fragments (eg split fragments by row groups, expose statistics, open a dataset from a metadata file, ..). The main driver has been the needs for dask, and dask was able to completely move from the legacy ParquetDataset to the new API (for their use case). Of course, there might be still missing pieces for your use cases. That was the goal of opening this issue originally, hoping to get feedback on API requirements from petastorm.

But so I am happy answer any questions and discuss possibilities to extend the API where needed, here or on the mailing list.

RobindeGrootNL commented 2 years ago

That's good to know! Then I hope that quite some of the parquet-specific functionalities that @aperiodic is cataloguing will already have been implemented with only a few still to be added, but let's see if that comes true. Since I am not very knowledgeable about the inner workings of petastorm nor pyarrow, I'll see what @aperiodic finds and then where I can eventually help out!

aperiodic commented 2 years ago

I've catalogued all of the ways that Petastorm uses ParquetDataset, and now I'm trying to figure out equivalents in the new API. The good news is that a lot of the usage was to determine row group information or to split by row groups, which can also be done via the new Dataset API (as @jorisvandenbossche mentioned). So the major concern that @selitvin originally had with the new API has been resolved.

However, I'm a bit confused by how Petastorm uses two different kinds of metadata: "metadata" and "common metadata." @selitvin, can you explain the motivation for that? Is "metadata" typically generated by Spark while "common metadata" is created by Petastorm, so "metadata" is used when possible, and other metadata that Petastorm needs but Spark doesn't generate is put in "common metadata" (where Petastorm can put whatever it needs to without worrying that it'll be overwritten)?

selitvin commented 2 years ago

What I can recall from going through the code is that:

Bottom line: petastorm will read unischema from _common_metadata and maybe read row-group counts if is not already available from _metadata.

The code looks a bit convoluted. I think it's worth refactoring a bit before the conversion, so it's clearer what's going on. Can help with that.

RobindeGrootNL commented 2 years ago

Great to hear that the new API supports the most important aspects that petastorm relies on! Is the (common) metadata then the only remaining (potential) issue with the new API or are there more aspects that inhibit a transition to the new API? And is the metadata issue less of a problem in make_batch_reader() (which does not use any Petastorm specific metadata) than in materialize_dataset() and make_reader(). If so, it could be an idea to first focus on the make_batch_reader() function since that is likely to cover a large part of the user base of Petastorm already?

aperiodic commented 2 years ago

Thanks for the overview, @selitvin. Since common_metadata is a petastorm-specific thing, I think we'll have to wrap the new PyArrow dataset in a simple class that adds back common_metadata (and also the root path of the dataset), but that should be straightforward.

@RobindeGrootNL common metadata is one gap, but there's also no "metadata" equivalent in the new PyArrow dataset API; possibly because it's spark-specific? I'm going to spend some time today digging into what exactly Petastorm uses the .metadata attribute for, and based on result of that, either have a plan to , or ask the PyArrow maintainers about the possibility of supporting .metadata in the new API.

dmcguire81 commented 2 years ago

@aperiodic it hasn't been the default in Spark since 2.0.0, according SPARK-15719.

selitvin commented 2 years ago

Focusing on make_batch_reader is probably a good idea. I am not sure how widely the petastorm-specific extensions (ndarrays) + make_reader API are used by the community (not have a good idea how to find this out). My hunch is that the ndarrays part of petastorm is of less importance. Wonder what you guys think?

aperiodic commented 2 years ago

If we're focusing on make_batch_reader, then here's the exhaustive list of functionality we need to implement with the new API in order to get that to work:

Functionality Dependent on the Dataset Instance

Functionality Dependent on a Piece/Fragment of the Dataset

If we're okay with deferring support for rowgroup_selector, and possibly having suboptimal performance when the user passes a shuffle_row_drop_partitions value, then I think everything listed above is achievable with the new PyArrow API.

Plan for Implementation

Assuming that we are okay with the above two compromises, then I think I'm ready to get started on the actual coding. My plan is to do the migration in two parts.

The first part is to:

  1. Create internal wrapper classes for the Dataset and the Piece/Fragment, which take a flag for whether to use the new PyArrow API or the deprecated API (and raise NotImplementedError if the flag selects the new API).
  2. Add methods to the classes to cover all the functionality I listed above using the deprecated API, exactly like the code does now.
  3. Ensure that all tests still pass, and open a PR to get the new wrapper classes merged in.

The first part provides us the scaffolding to move functionality to the new API only by implementing methods in the above classes, in order to isolate the rest of the codebase from this migration, and to allow us to implement things with the new API bit-by-bit (in order to spread the work across multiple contributors).

The second part is to:

  1. Implement one or more methods in the wrapper classes using the new PyArrow API (if the caller asked for it by setting the flag to True).
  2. Add duplicates / extend the parametrization of all existing tests that exercise affected parts of the codebase to rerun with the wrappers classes' new API implementations, to verify that the implementation from the previous step has identical behavior & functionality as the implementation using the deprecated API.
  3. Open a PR with the new implementation.
  4. Repeat until all methods have implementations using the new API.

The second part can be worked on in parallel.

@selitvin, does this sound like a good plan to you? Any feedback, suggestions for improvements, or potential holes or issues that I missed?

selitvin commented 2 years ago

That's a detailed plan - cool!

I think it's unlikely that there are many users that are using rowgroup_selector and shuffle_row_drop_partitions (but I don't have a solid proof that my assumption is correct). Once we start moving with your plan, perhaps we should start issuing a deprecation message in the code and see if there is any pushback from the community.

The plan looks solid to me.

jorisvandenbossche commented 2 years ago

common metadata is one gap, but there's also no "metadata" equivalent in the new PyArrow dataset API; possibly because it's spark-specific?

@aperiodic both the metadata and common_metadata attributes are in principle small wrappers around reading a file if present.

For example, using a small partitioned dataset with a _metadata path:

>>> base_path = "test_partitioned_with_metadata"
>>> dataset = pq.ParquetDataset(base_path, use_legacy_dataset=True)
>>> dataset.metadata_path
'test_partitioned_with_metadata/_metadata'
>>> dataset.metadata
<pyarrow._parquet.FileMetaData object at 0x7f77e6df39a0>
....

and if you want to do the same manually with the new interface, this would be something like (in pseudo-code):

metadata_path = base_path + "_metadata"
if exists(metadata_path):
    metadata = pq.read_metadata(metadata_path)

I think the main thing that the new interface does not really do is keeping track of the "basepath" from where you can easily get the (common) metadata path (so that's something you will have to do yourself at the moment to store the path specified by the user).

For now, we didn't add this to the new interface, because those two attributes were also not really used internally anyway. Note that it is possible to get the Parquet FileMetadata for each file / piece in the dataset with the new interface as well:

>>> dataset = pq.ParquetDataset("test_partitioned_with_metadata", use_legacy_dataset=False)
>>> dataset.fragments[0]
<pyarrow.dataset.ParquetFileFragment path=test_partitioned_with_metadata/part=A/part.0.parquet partition=[part=A]>
>>> dataset.fragments[0].metadata
Out[40]: 
<pyarrow._parquet.FileMetaData object at 0x7f77e6e9fbd0>
...

And thus you have access to the number of row groups etc this way.

aperiodic commented 2 years ago

Oh good, I didn't notice that there's still functionality for reading metadata files. That clears up most of my remaining concerns about moving over to the new API. Thanks for the tips, @jorisvandenbossche!

I think it's unlikely that there are many users that are using rowgroup_selector and shuffle_row_drop_partitions (but I don't have a solid proof that my assumption is correct). Once we start moving with your plan, perhaps we should start issuing a deprecation message in the code and see if there is any pushback from the community.

That sounds reasonable to me, at least initially. Once there is a way to use the new PyArrow API for make_batch_reader, we can get some feedback from users and decide whether it makes more sense to focus on moving everything else over to the new API or to backfill support for those two other arguments of make_batch_reader when the new API is being used.

The plan looks solid to me.

Great! I'll get started on the wrapper class this week, and will hopefully have a PR up for that sometime next week.

aperiodic commented 2 years ago

I've got a PR pretty much ready to go to add the wrapper class, but I just learned today that there is a new review process for OSS submissions that I have to put this through first. That could add as much a month, based on historical review times. Hopefully it'll go much faster than that because this is pretty small and clearly unrelated to our IP, but I'll keep y'all updated.

selitvin commented 2 years ago

@aperiodic : do you have any updates on this? Would you be able to publish a PR?

RobindeGrootNL commented 1 year ago

@aperiodic any updates on the OSS submissions process? It would be great if this could be implemented, both for ending the warning about deprecation and for the new features it allows petastorm to have (e.g. columnar encryption).