scikit-hep / awkward

Manipulate JSON-like data with NumPy-like idioms.
https://awkward-array.org
BSD 3-Clause "New" or "Revised" License

Use `pyarrow.dataset.parquet_dataset` in `ak.from_parquet` #886

Closed: martindurant closed this issue 2 years ago

martindurant commented 3 years ago

Unfortunately, pyarrow.parquet.ParquetFile does not support remote filesystems. You could use a construction like

import fsspec
import pyarrow.parquet as pq

with fsspec.open(url, 'rb', **storage_options) as f:
    q = pq.ParquetFile(f)

for the _metadata case or

pq.ParquetDataset(paths, filesystem=fs, **kwargs)

for the list-of-files case; this is what dask "legacy" does.

However, it seems that arrow will be using the new API only, so

q = pyarrow.dataset.parquet_dataset("s3://pivarski-princeton/millionsongs/_metadata", filesystem=fs)  # note the full path to metadata

where the returned object has a `.schema` and a list of fragments, `list(q.get_fragments())`, instead of the former `pieces`.
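
To make the shape of that API concrete, here is a minimal sketch (not Awkward's implementation; it assumes pyarrow 2.0+ and an fsspec filesystem whose backend, e.g. s3fs, is installed; the bucket path is hypothetical):

import fsspec
import pyarrow.dataset as ds
import awkward as ak

fs = fsspec.filesystem("s3")  # any fsspec filesystem would do
q = ds.parquet_dataset("s3://some-bucket/dataset/_metadata", filesystem=fs)
print(q.schema)                     # dataset-level schema
for fragment in q.get_fragments():  # one fragment per data file
    table = fragment.to_table()     # read this fragment as a pyarrow.Table
    array = ak.from_arrow(table)    # interpret it as an Awkward Array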

agoose77 commented 3 years ago

I've taken a look at this, and it's a bit more fiddly than I first anticipated. I'm keeping some notes here, in case anyone gets a chance to work on it before I do.

Awkward already does some boilerplate like finding the _metadata file for datasets, and in order to maintain the same interface we're going to need to do the same thing.
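
For flavour, that boilerplate amounts to something like the following (a sketch with a hypothetical helper name, assuming an fsspec filesystem; not Awkward's actual code):

def find_metadata(fs, root):
    # look for a dataset-level _metadata file directly under the root directory
    candidate = root.rstrip("/") + "/_metadata"
    return candidate if fs.exists(candidate) else None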

So I think the way we need to tackle this is:

Here's a hacky implementation https://github.com/scikit-hep/awkward-1.0/pull/949

martindurant commented 3 years ago

ignores the order of the row groups

This is surprising. I assume dask handles this already; it might serve as a template.

Handling URIs and filepaths will be a logistical challenge

Let fsspec do it for you?

fsspec expects POSIX paths

For local files, Windows paths work fine. Yes, they may be converted to POSIX internally, but you shouldn't have to worry about that.

dataset is a very general API, and if we're not careful we'll have lots of tricky cases to handle.

I'd love to know what you mean: what are the downsides, actual or potential?

Define default filesystem

Let fsspec do it? If you're going to be doing other file operations elsewhere in the project (i.e., where you don't explicitly require arrow), this seems like a good idea. Also, fsspec's local filesystem is guaranteed to be compatible with the other filesystems (s3, gcs, ...) that it provides.

  • other

I would make a minimal file-like check and error otherwise. Probably more hasattr(obj, "read") than isinstance(obj, io.IOBase).
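
(A minimal sketch of such a check; is_file_like is a hypothetical name:)

def is_file_like(obj):
    # duck-typing beats isinstance here: anything with read/seek will do
    return hasattr(obj, "read") and hasattr(obj, "seek")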

martindurant commented 3 years ago

(sorry, misread - that was "other" for path-like things, not file-like. Be aware that people will want to pass pathlib.Path objects at some point. fsspec is OK with this)

agoose77 commented 3 years ago

So, the issue is whether we rely on fsspec. Arrow has a superset of file systems (which do overlap with fsspec), so this is a design choice to be made.

We also need to normalize the paths because we need to combine them, and fsspec / arrow don't implement a join API.

The TL;DR is that this is a bit messy because of the need to support the existing API. I also stumbled across a few bugs when running the test suite, which make me think this is not going to be as easy as I had hoped.

martindurant commented 3 years ago

We also need to normalize the paths because we need to combine them, and fsspec / arrow don't implement a join API.

Easy to do! fsspec is at your service for this kind of thing. We release all the time.
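
(For instance, a sketch using pieces fsspec already provides; the URL is hypothetical:)

import posixpath
from fsspec.core import url_to_fs

fs, root = url_to_fs("s3://some-bucket/dataset")   # returns (filesystem, path with protocol stripped)
metadata_path = posixpath.join(root, "_metadata")  # fsspec paths are POSIX-style, so posixpath works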

Arrow has a superset of file systems (which do overlap with fsspec),

The other way around surely. Arrow only supports local, s3 and hdfs.

agoose77 commented 3 years ago

Arrow implements a wrapper over fsspec's file systems, so technically a superset ;) We have a dependency on Arrow because we have to, but I'm not sure whether we can depend upon fsspec internally vs support it via Arrow's wrapper file system. I think currently we're going via the latter route.
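
(Concretely, the wrapper route looks roughly like this; a sketch assuming pyarrow 2.0+, which provides pyarrow.fs.FSSpecHandler:)

import fsspec
import pyarrow.fs

fs = fsspec.filesystem("s3")  # any fsspec filesystem
arrow_fs = pyarrow.fs.PyFileSystem(pyarrow.fs.FSSpecHandler(fs))  # arrow-compatible view of it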

martindurant commented 3 years ago

fsspec has no dependencies of its own, so it is a very light dependency. It would be a natural place to add things you might need, like join. It already provides tools for URL inference and such that will be important. Dask uses it for this reason (and dask will, of course, be an important engine for awkward).

I am surprised that you explicitly depend on arrow. I understand there is overlap between awkward and arrow and that to/from conversions will be important to many, but I would still have thought it would be optional.

agoose77 commented 3 years ago

I am surprised that you explicitly depend on arrow.

Sorry, let me be more clear! We have a soft (runtime) dependency upon Arrow in order to implement Arrow/Parquet reading, but it's not required at install time.

Clearly we need to make some decisions here, and I'm not in the right headspace for that today! I'll take a look back here at some point this/next week. Thanks for the input so far!

jpivarski commented 3 years ago

Currently, Awkward Array's only required dependency is NumPy (1.13.1+). 90% of its functionality or more depends on NumPy—clearly, that needs to be a strict dependency.

The second most relevant library in terms of Awkward Array functionality is Numba: there's quite a lot of code for lowering array iteration in Numba. However, Numba is not a strict dependency: Awkward's Numba extensions are loaded through Numba's entry point if and when Numba is imported. It's probably still true that the majority of Awkward Array users don't use Numba, though I encourage it a lot.

The third most relevant library is probably pyarrow, though it only gets invoked in ak.from_arrow, ak.to_arrow, ak.to_arrow_table, ak.from_parquet, and ak.to_parquet. pyarrow is also not a strict dependency, and we only attempt to import it when somebody calls one of those functions. Although it's a soft dependency, it's a versioned soft dependency: Arrow is developing rapidly, and we require some of its newest features: the minimum is pyarrow 2.0.0.

There are a number of other libraries that are used in very specific functions, if they're available (e.g. ak.to_pandas requires Pandas). They're all soft dependencies, with an import triggered by the function call, and a message explaining how to install it with pip or conda if it's not there.

Using fsspec to get access to lots of remote file systems would be great, though I only see it coming in through ak.from_parquet, and then there are cases in which it is not needed: local files, S3, and HDFS. When should we invoke the "You can do this, but please install fsspec" error? On any call of ak.from_parquet? The majority of users of this function (right now) are probably using local files. On any non-local file? Or on anything that's not local, S3, or HDFS? It could be coarse or fine-grained.

For the user experience, there's an advantage to grouping dependencies to ask for them all at once (coarse), rather than the "death by a thousand cuts" of install this, get a little further, install that, get a little further, etc. We don't have that problem yet because Awkward's soft dependencies are in different functions: ak.to_pandas needs Pandas and ak.from_arrow needs Arrow, but neither of these functions need both. If ak.from_parquet sometimes needs both, maybe it should always need both, so that nobody's workflow gets interrupted twice by the same ak.* function call. This would be like Uproot's soft dependency on lz4, which also requires xxhash because all ROOT files with an LZ4-compressed block also have an xxhash checksum to check. The error message asking users to install lz4 asks them to install xxhash at the same time, saving an extra step.

Since fsspec has no dependencies, I'm inclined to do that for the ak.from_parquet/ak.to_parquet pair (likely used together, so the same rule should apply for simplicity). Calling either one of these functions should require pyarrow 2.0+ and fsspec at the same time, and the error message should say how to install them both with pip or with conda.
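
A minimal sketch of that grouped check (a hypothetical helper, not Awkward's actual internals):

import importlib

def import_required(module_name, minversion=None):
    # import a soft dependency, or explain how to install it
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        raise ImportError(
            f"ak.from_parquet/ak.to_parquet need {module_name}; install it with\n\n"
            f"    pip install {module_name}\n\nor\n\n    conda install -c conda-forge {module_name}"
        )
    if minversion is not None:
        # naive version parse; enough for a sketch
        have = tuple(int(x) for x in module.__version__.split(".")[:3])
        want = tuple(int(x) for x in minversion.split(".")[:3])
        if have < want:
            raise ImportError(f"{module_name} {minversion}+ is required, found {module.__version__}")
    return module

# ask for both up front, so nobody's workflow is interrupted twice by the same call
pyarrow = import_required("pyarrow", minversion="2.0.0")
fsspec = import_required("fsspec")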

However, ak.from_arrow, ak.to_arrow, and ak.to_arrow_table really don't need fsspec and shouldn't be asking for it. Currently, these and the ak.from_parquet/ak.to_parquet pair all use the same helper function, so that would have to be refactored with an argument (e.g. require_fsspec=False). Awkward Array users who only read and write local files don't, strictly speaking, need fsspec, but we should ask for it anyway to keep the soft dependencies from getting too fine-grained.

This could be elevated to a general rule: any ak.* function that might require any of a set of third-party packages must attempt to import (with "how to install" on failure) all of those third-party packages at the beginning of the function, before any other error-handling. This case is the first that satisfies the rule. I might be over-eager in defining this rule, since we might find before this PR is done that s3fs is only needed if the filesystem is S3, rados is only needed if the filesystem is Ceph, etc., and then ak.from_parquet would (according to this rule) require a whole lot of third-party packages, most of them never used by a given user. If that's the case, I'll back down. On the other hand, if these libraries (s3fs, rados, etc.) are required at the time that the filesystem object is created by fsspec, not at the time that it's used by pyarrow to read the file, then that would be ideal: it would communicate the fact that such-and-such a library is needed because of the requested filesystem, not because of ak.from_parquet.

martindurant commented 3 years ago

It all depends on whether you want fsspec to handle your URLs for you and generate filesystem objects. If you need changes in the third-party dependency, you will find fsspec much faster to respond than arrow. I would argue that the future of awkward has to be in great part in the cloud, with data on cloud servers of various flavours. I realise that's not as relevant to actual HEP, where people likely prefer HPC systems and mounted disks, but I really want to see awkward as the de-facto nested-processing engine for Python. But you might want to load ROOT files remotely too, no?

(by the way, all users are working with local files right now, because that's the only thing that works! :) )

drahnreb commented 3 years ago

I would argue that the future of awkward has to be in great part in the cloud, with data on cloud servers or various flavours. […] I really want to see awkward as the de-facto nested processing engine for python.

Agree. If there is a way awkward can become fsspec-compliant, I would love to see it. I am using Azure, and support for it is limited right now.

Maybe this could be a small step towards the concept of a dask integration that @jpivarski presented in this talk: An Awkward Dask Collection?

I'm keeping an eye on this issue and the recent changes in #935 etc. I'm eager to help.

martindurant commented 3 years ago

Certainly, dask integration is coming! It will take some planning and development. Splitting awkward processing tasks into daskable chunks and accessing remote filesystems are separate concerns, but they will, of course, play well together.

jpivarski commented 3 years ago

I'd be fine with saying that every Awkward function that might take a URL should require fsspec. I think the only such functions that might take URLs right now are ak.from_parquet and ak.to_parquet (assuming we can write to URLs).

The ak.from_json function can take a filename, though it's a bad API because the same string can also be a string of JSON text. The C++ layer that this is based on is being rewritten in Python, which would make ak.from_json(data) exactly equivalent to ak.from_iter(json.loads(data)) or ak.from_iter(json.load(open(data))), in which case, we can deprecate ak.from_json/ak.to_json with a stub telling people how to write that one-liner.

Other than that, no other functions deal with files. Zarr integration is planned. That would be a second case, and I suppose fsspec would be needed for that, too.

martindurant commented 3 years ago

Further down the line, I would suggest also considering avro, xml and maybe other nested-schema formats, as well as line-serialised arrow. I'm not sure how much of that could be directly deferred to arrow, we will see.

Zarr integration only for >=v3, right?

jpivarski commented 3 years ago

For Avro, I can generate AwkwardForth that's considerably faster than the fastavro library (https://arxiv.org/abs/2102.13516). Awkward Array can become the fastest way to read Avro data without an extra compilation step. I have some background in Avro and could write the Avro schema → AwkwardForth function, though that's a matter of time and would probably have to be motivated by a use-case.

I think line-serialized Arrow should go through pyarrow. (Don't we already get it for free? Isn't there a pyarrow function to load line-serialized Arrow as a pyarrow.array?)

I've talked about Zarr v3 because we have a chance of adding a "must be read by Awkward" flag to the format as some kind of extension. I know that it's possible to serialize into and out of v2 now (using ak.from_buffers/ak.to_buffers), but it's the issue of forcing the collection of buffers to be interpreted as an ak.Array that I'm thinking of. The same could be said of HDF5; there's no flag saying that the group of 1-dimensional datasets is a low-level view that needs to be interpreted in a particular way. I haven't made any steps toward influencing the Zarr v3 specification, other than talking about it, though.
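
For reference, the buffer round-trip being described looks roughly like this (a minimal sketch, assuming Awkward 1.2+, where ak.to_buffers/ak.from_buffers exist):

import awkward as ak

array = ak.Array([[1, 2, 3], [], [4, 5]])
form, length, container = ak.to_buffers(array)  # container: dict of named 1-d buffers
# the buffers could be stored as Zarr v2 arrays (or HDF5 datasets) plus the form as JSON,
# but nothing in those formats flags that they must be reassembled like this:
roundtrip = ak.from_buffers(form, length, container)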

martindurant commented 3 years ago

Awkward Array can become the fastest way to read Avro data

Yes, what I was thinking exactly. Fastavro is very dict-oriented, so no good for this. My library uavro might also be a good starting point, for being more python-oriented (but completely ignoring nesting!). The avro format is actually quite nice to work with, so long as the in-memory buffers can grow as the data is read.

adding a "must be read by Awkward"

Agreed, I thought this was still the plan, and it sounds good - we can be in a position to add our extension when v3 comes out.

Don't we already get it for free (arrow serialisation)

Probably yes, but I imagine we'd want to at least make an explicit example of this.

martindurant commented 3 years ago

(btw: fastparquet contains a new very fast thrift reader that would also be a good starting point for avro, since the two are so similar. That code makes nested python dicts, but it's all in the same field roughly. I don't think we care particularly about thrift-serialised data, but we could cover all avro/thrift/protobuf/msgpack for only a little extra effort)

deeplook commented 3 years ago

This is the only ticket I see mentioning protobuf. What is the current/expected state of support for such formats, especially protobuf?

jpivarski commented 3 years ago

@deeplook I wasn't planning on ak.from_protobuf or ak.to_protobuf, but that would be a good application of AwkwardForth (since it's not a columnar format). It's not on any road maps but can be if there's a strong enough use-case for it. The place to start would be to file a feature request.

deeplook commented 3 years ago

I'm not familiar with AwkwardForth, but I'm interested in operating on nested structures in ways like flatten-dict, but on binary formats, especially protobuf, with some NumPy-like interface and the performance promised by Awkward. ;)

martindurant commented 3 years ago

Right, this thread seems to have meandered quite far from the original intent! I do agree that we should have in mind a list of specific IO that the project is interested in, to go into an explicit issue discussing this. For consideration:

jpivarski commented 2 years ago

In v2, ak.from_parquet uses fsspec (though not Dataset, for reasons explained there): #1338.