fsspec / s3fs

S3 Filesystem
http://s3fs.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Question: Using dask/s3fs with non-standard iterator #289

Open · MatinF opened this issue 4 years ago

MatinF commented 4 years ago

Hi all,

We're hoping to use dask/s3fs for the below use case:

1) We have many large binary data files stored on S3 which we hope to process
2) Our aim is to load parts of the data into dask dataframes using s3fs and an iterator

We're able to get our iterator to work when the non-standard files are stored locally. However, it seems that dask/s3fs can only easily load data from S3 when the files are csv, parquet or json.

Is there a way to use a custom iterator function to load non-standard binary data stored on s3 into a dask dataframe using s3fs? Any examples/guidance would be highly appreciated.
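For illustration, the rough pattern we have in mind is sketched below; the bucket name, object paths and the custom_iterator helper are hypothetical stand-ins for our real binary format and iterator:

import dask.bag as db
import s3fs

fs = s3fs.S3FileSystem(anon=True)

def custom_iterator(f, chunk_size=1024):
    # stand-in for our real binary iterator: yield fixed-size chunks
    while True:
        chunk = f.read(chunk_size)
        if not chunk:
            break
        yield chunk

def parse_records(path):
    # open the remote object as a file-like object and run the iterator over it
    with fs.open(path, "rb") as f:
        return list(custom_iterator(f))

# hypothetical objects in a fictional bucket
paths = ["example-bucket/file1.bin", "example-bucket/file2.bin"]

# one partition per file; each task opens and parses its own file
bag = db.from_sequence(paths).map(parse_records).flatten()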

Martin

martindurant commented 4 years ago

s3fs allows you to read specific byte ranges from the target file, which is probably all you really need for an iterator. The builtin text mode, for instance, can already iterate by line terminator or any other character (this is what the CSV and JSON loaders use, although encoding can foul this logic). The other example, parquet, does not iterate over the file at all, but loads a set of byte offsets out of the file metadata in the footer.

In theory, s3fs files aspire to be compatible with the local file experience, so anything missing could be considered a bug. If your function accepts a python file-like object, it ought to work with an s3fs file. However, in practice you will need to consider the implications of caching bytes in memory, which can make random access much more complicated, or at least inefficient.
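As a minimal sketch of what reading byte ranges looks like (the bucket and key below are hypothetical):

import s3fs

fs = s3fs.S3FileSystem(anon=True)
path = "example-bucket/data.bin"   # hypothetical object

# seek/read on the file-like object only fetches the requested range
with fs.open(path, "rb") as f:
    f.seek(1024)
    header = f.read(256)

# fsspec filesystems also expose read_block for an offset/length read
# (optionally aligned to a delimiter) without keeping a file object open
block = fs.read_block(path, offset=0, length=2**20, delimiter=b"\n")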

MatinF commented 4 years ago

Thanks for the input, Martin. Below is a very minimal example of what we're doing locally versus what we're trying via s3fs:

Locally: This local example uses our iterator to open a csv file

from pathlib import Path
import dask.bag as db

data_path = Path(__file__).parent / "data.csv"
iterator = create_CAN_iterator_from_object(data_path)
b = db.from_sequence(iterator)

Via s3fs: This example attempts the same via s3fs, using a public bucket for illustration

import pandas as pd
import s3fs
import dask.bag as db

fs = s3fs.S3FileSystem(anon=True)
with fs.open("general-editor-schemas/data.csv", "rb") as f:
    df = pd.read_csv(f)     # note: this works
    iterator = create_CAN_iterator_from_object(f)
    b = db.from_sequence(iterator)

In the above example we've used a csv file rather than a non-standard binary file, purely for illustration purposes. For the real use case we'd replace this with a binary file and a matching iterator.

My expectation would be that we'd need something similar to the parquet solution. Our hope was that there would be a native way via s3fs to "load" the file into the iterator and then into dask - similar to what we can achieve locally. However, in the above example we get the following error:

Traceback (most recent call last):
  File "C:\Users\marti\Documents\dashboard\main.py", line 138, in <module>
    main()
  File "C:\Users\marti\Documents\dashboard\main.py", line 36, in main
    b = db.from_sequence(iterator)
  File "C:\Users\marti\AppData\Local\Programs\Python\Python37\lib\site-packages\dask\bag\core.py", line 1755, in from_sequence
    seq = list(seq)
TypeError: 'NoneType' object is not iterable

I'd be really happy for any thoughts on the above - e.g. any examples or similar that we can check out.

Best, Martin

MatinF commented 4 years ago

To enable replication, I've added a minimal working example below:

basic_example.zip

I realize the error says that the data fetched via s3fs is NoneType, though I'm unsure whether this reflects some silly mistake on my end, or whether performing this type of operation via s3fs requires a more extensive implementation.

Martin

martindurant commented 4 years ago

In your function below, you only create the iterator for Path objects (with an explicit use of the builtin open at self._handle = open(next(self.files), "r") in CAN_CSV_Iterator), and otherwise return None. S3FileSystem.open produces a file-like object directly, not a Path, so your function falls through and returns None.

def create_CAN_iterator_from_object(argument):
    from pathlib import Path

    result = None

    # If the argument is a path, treat it as a file.
    if isinstance(argument, Path):
        # Check the file ending.
        if argument.suffix.lower() == ".csv":
            from CAN_CSV_Iterator import CAN_CSV_Iterator
            result = CAN_CSV_Iterator(argument)
        pass
    return result
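As a minimal sketch of one way to handle this, assuming CAN_CSV_Iterator (or a variant of it) can be adapted to consume an already-open file-like object rather than a path:

from pathlib import Path

def create_CAN_iterator_from_object(argument):
    # paths keep the existing behaviour: dispatch on the file ending
    if isinstance(argument, Path):
        if argument.suffix.lower() == ".csv":
            from CAN_CSV_Iterator import CAN_CSV_Iterator
            return CAN_CSV_Iterator(argument)
        return None

    # anything with a read() method (e.g. the object returned by
    # S3FileSystem.open) is treated as an already-open file
    if hasattr(argument, "read"):
        from CAN_CSV_Iterator import CAN_CSV_Iterator
        # assumes CAN_CSV_Iterator accepts an open file-like object
        return CAN_CSV_Iterator(argument)

    return None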

MatinF commented 4 years ago

Thanks Martin, I'll give this a further look with the team.

Interestingly, one of the next things I was going to test was to use an existing iterator from the asammdf project to see if that would work natively with s3fs - and it seems to be the case:

import dask.bag as db
import s3fs
from asammdf import MDF

# local example MDF4 (works)
iterator = MDF("data.mf4").iter_to_dataframe()
b = db.from_sequence(iterator)

# s3fs example MDF4 (works)
fs = s3fs.S3FileSystem(anon=True)
with fs.open("general-editor-schemas/data.mf4", "rb") as f:
    iterator = MDF(f).iter_to_dataframe(chunk_ram_size=2097152)
    b = db.from_sequence(iterator)

As such, it looks like it is doable directly with an iterator and the non-standard format we're looking to use in the end (MDF4). It's currently running fairly slow, but we'll look further into that.

Thanks for your inputs on this!

martindurant commented 4 years ago

Right, it should work wherever the function expects a file-like object.

running fairly slow

Performance will depend on the behaviour of the cache, and will usually be quite bad for true random access, which requires many fetches from the remote API.
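As a rough sketch, the caching behaviour can be tuned when opening the file; the cache type, block size and object key below are illustrative, and the best settings depend on the access pattern:

import s3fs

fs = s3fs.S3FileSystem(anon=True)

# larger blocks plus a read-ahead cache suit mostly-sequential iteration;
# the "bytes" or "block" cache types can help when the reader seeks around a lot
with fs.open(
    "example-bucket/data.mf4",   # hypothetical object
    "rb",
    block_size=4 * 2**20,        # fetch 4 MiB per request
    cache_type="readahead",
) as f:
    header = f.read(64)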