fsspec / s3fs

S3 Filesystem
http://s3fs.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Support for S3 Select #155

Open ytsaig opened 5 years ago

ytsaig commented 5 years ago

I'm not sure if this question / feature request belongs here or elsewhere in the dask ecosystem, so apologies if this is not the right place. I was wondering whether there are plans to support S3 Select for server-side filtering of data.

An example use case is reading a subset of columns/rows from a large parquet file. I haven't tested it, but I can imagine that server-side filtering would improve performance substantially, since it avoids transferring the full data over the network prior to filtering.

martindurant commented 5 years ago

This is certainly something that could live in s3fs, either as its own method, or as optional arguments to open() - I believe it should also return a file-like object. The doc sounds a little complicated, so I'd be relying on your implementation, and I doubt if moto, the testing/mocking library, supports it - so it would have to be an untested (in the sense of CI) feature.

If you were hoping to call this from Dask, that might be a little trickier: I don't know how you'd go about passing the arguments from a call to something like dd.read_csv through to whichever method you implement on s3fs.

dvukolov commented 5 years ago

S3 Select currently supports Parquet as input, but not as output. The data would have to go through an intermediary format (either CSV or JSON), with the corresponding serialization/deserialization and a loss of information about data types in the process. I am not sure whether such logic is permissible for a filesystem layer.
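For illustration, a request of the kind described above might be assembled as follows. This is only a sketch: the helper name `build_select_request`, the bucket, the key and the column names are made up, while the keyword names are those accepted by boto3's `select_object_content`. Note the Parquet input paired with CSV output.

```python
def build_select_request(bucket, key, expression):
    """Assemble keyword arguments for boto3's ``select_object_content``.

    Hypothetical helper, not part of s3fs. Parquet is requested as the
    input format; the output has to be CSV or JSON.
    """
    return {
        "Bucket": bucket,
        "Key": key,
        "ExpressionType": "SQL",
        "Expression": expression,
        "InputSerialization": {"Parquet": {}},
        "OutputSerialization": {"CSV": {}},
    }


request = build_select_request(
    "example-bucket",        # placeholder bucket name
    "data/example.parquet",  # placeholder object key
    "SELECT s.col_a, s.col_b FROM S3Object s WHERE s.col_a > 10",
)
# With a real client this would be passed on as:
#   boto3.client("s3").select_object_content(**request)
```

Building the arguments separately from the call keeps the sketch runnable without AWS credentials.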

An alternative to S3 Select in a special case of reading a subset of columns from a large Parquet file is to minimize the amount of reading libraries do, for example:

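import pyarrow.parquet as pq
import s3fs

# Turn off cache filling and shrink the block size to limit read-ahead
# beyond the bytes pyarrow requests.
s3 = s3fs.S3FileSystem(default_fill_cache=False, default_block_size=1)
# "{bucket}/{path}" is a placeholder for the location of the dataset.
dataset = pq.ParquetDataset("{bucket}/{path}", filesystem=s3, validate_schema=False)
# columns is the list of column names to load.
df = dataset.read(columns=columns).to_pandas()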

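This may not be as performant as S3 Select, but it still tries to take advantage of the fact that Parquet is a columnar store, namely by way of the options set in the snippet above (default_fill_cache, default_block_size and validate_schema).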

One may experiment with those options to see how they affect timing and data transfer for a particular file.

martindurant commented 5 years ago

@dvukolov, your method sounds reasonable.

I still don't have any principled objection to someone providing a SELECT endpoint via s3fs, but indeed I know little about the performance or serialisation issues. The output, the thing actually being accessed, would have to behave like a file. If using parquet, I imagine it is useful for prefiltering on rows, when wanting to download only a small fraction of the original.
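To make the two points above concrete (file-like output, and the loss of type information with CSV), here is a small sketch. The payload is a made-up sample standing in for what S3 Select might return with CSV output; nothing here touches S3.

```python
import csv
import io

# Made-up bytes of the kind a CSV-output query could return.
payload = b"1,2.5,true\n2,3.5,false\n"

# Wrapping the bytes in BytesIO gives an object that behaves like a file.
f = io.BytesIO(payload)
rows = list(csv.reader(io.TextIOWrapper(f, encoding="utf-8", newline="")))

# Every value comes back as a string: the original column types are gone
# and would have to be restored by the caller.
all_strings = all(isinstance(value, str) for row in rows for value in row)
```

Any reader that accepts a file object could consume `f` in the same way, but the types would still need to be reapplied afterwards.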

dvukolov commented 5 years ago

Implementing S3 Select as a separate method would look something like this:

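import io

def select(self, path, expression, input_format, output_format):
    # Request arguments are elided here.
    response = self.s3.select_object_content(...)

    # Accumulate the streamed record payloads in an in-memory buffer.
    f = io.BytesIO()
    for event in response["Payload"]:
        if "Records" in event:
            data = event["Records"]["Payload"]
            f.write(data)
        elif "End" in event:
            # The stream is complete: rewind so the caller reads from the start.
            f.seek(0)
            return f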

An alternative would be to take a BytesIO() object as an argument, rather than create it inside the method:

def select(self, buf, path, expression, input_format, output_format):
    ...

The second option makes a lot of sense as it allows:

If this is something you are willing to consider, I can provide a draft implementation.
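As a rough idea of what the buffer-as-argument variant could look like in isolation, here is a sketch that runs without AWS access. The function name `collect_select_payload` and the `fake_events` list are illustrative assumptions; `fake_events` merely imitates the shape of the events found in `response["Payload"]`.

```python
import io


def collect_select_payload(buf, events):
    """Write the "Records" payloads of an S3 Select event stream into ``buf``.

    ``events`` stands in for ``response["Payload"]``; the name and
    signature of this function are illustrative only.
    """
    for event in events:
        if "Records" in event:
            buf.write(event["Records"]["Payload"])
        elif "End" in event:
            break
    # Rewind so the caller can read from the beginning of the buffer.
    buf.seek(0)
    return buf


# Stand-in for a real event stream.
fake_events = [
    {"Records": {"Payload": b"1,a\n"}},
    {"Stats": {}},
    {"Records": {"Payload": b"2,b\n"}},
    {"End": {}},
]
result = collect_select_payload(io.BytesIO(), fake_events)
```

Because the caller supplies `buf`, anything with `write` and `seek` methods could be passed in place of `io.BytesIO()`.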

manugarri commented 5 years ago

Adding my 2 cents.

In my company we use gzip jsonlines as the input. Then we use dask for transformations and export to parquet.

Usually when we do ETL we need to filter by fields (which can be nested), so using S3 Select with dask would reduce our processing time/cost significantly.

Are there any ideas on how we could implement this? I see that s3fs follows the fsspec interfaces. This means the get method is actually defined in fsspec. Interestingly, this method supports args and kwargs (which would be great for this use case) but does not use them.