uber / petastorm

The Petastorm library enables single-machine or distributed training and evaluation of deep learning models directly from datasets in Apache Parquet format. It supports ML frameworks such as TensorFlow, PyTorch, and PySpark and can be used from pure Python code.
Apache License 2.0

Support for Azure Blob Storage and Azure Data Lake #571

Open · upendrarv opened this issue 4 years ago

upendrarv commented 4 years ago

Hi,

Any plans to add support for Azure Blob Storage and/or Azure Data Lake?

Thanks for the great work!

jsgoller1 commented 4 years ago

Hi there @upendrarv, thanks for opening this issue. Can you clarify specifically what you're asking?

selitvin commented 4 years ago

This is not something we were planning to work on at the moment.

For this to work, we would need proper Azure blob storage support from pyarrow. I am not sure what Arrow's progress is on supporting these Azure filesystems. The only thing I see in their documentation is this. It might be possible to do something with that technique in petastorm, but it might get hairy since we use pyarrow's ParquetDataset class.
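For reference, here is a minimal sketch (not necessarily the technique referenced above) of reading an Azure-hosted Parquet dataset with pyarrow and adlfs directly, outside of petastorm. The account, key, and container/path values are placeholders, and the fsspec bridge (pyarrow.fs.FSSpecHandler) requires a reasonably recent pyarrow:

# Sketch only: bridge an fsspec filesystem (adlfs) into pyarrow and read Parquet.
import pyarrow.parquet as pq
from pyarrow.fs import PyFileSystem, FSSpecHandler
from adlfs import AzureBlobFileSystem

abfs = AzureBlobFileSystem(account_name="myaccount", account_key="...")  # placeholders
arrow_fs = PyFileSystem(FSSpecHandler(abfs))  # expose the fsspec FS through pyarrow's FileSystem API
table = pq.read_table("mycontainer/petastorm/parquet", filesystem=arrow_fs)
print(table.num_rows)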

upendrarv commented 4 years ago

@jsgoller1: Currently I am looking to read Petastorm datasets stored in Azure Data Lake Storage and Azure Blob Storage. There is already support for S3 and GCS (Case 5 and Case 6 in [fs_utils.py](https://github.com/uber/petastorm/blob/c370bac366e86ca07c051cbd0daffacf866ecde1/petastorm/fs_utils.py)). Similarly, I am looking for support for the Azure Blob File System.

@selitvin: Thanks for the reply. In Python, the adlfs package is available. We can use adlfs to access the Azure Blob File System instead of taking a dependency on pyarrow. I have created prototype code to access Petastorm datasets stored in the Azure Blob File System. With the code below, I am able to read the dataset and train my ML model.

Below is the sample code:

import os
import posixpath

from adlfs import AzureBlobFileSystem
from petastorm.reader import Reader
from pyarrow.filesystem import FileSystem, DaskFileSystem  # legacy (deprecated) pyarrow filesystem interface
from pyarrow.util import implements, _stringify_path

class ABFSWrapper(DaskFileSystem):
    """Expose an adlfs AzureBlobFileSystem through pyarrow's legacy DaskFileSystem interface."""

    @implements(FileSystem.isdir)
    def isdir(self, path):
        path = _sanitize_azure_path(_stringify_path(path))
        try:
            contents = self.fs.ls(path)
            if len(contents) == 1 and contents[0] == path:
                return False
            else:
                if not any(ftype in path for ftype in
                           ['parquet', 'parq', 'metadata']):
                    raise ValueError('Directory is not a partition of'
                                     ' *.parquet. Try passing a globstring.')
                return True
        except OSError:
            return False

    @implements(FileSystem.isfile)
    def isfile(self, path):
        path = _sanitize_azure_path(_stringify_path(path))
        try:
            contents = self.fs.ls(path)
            return len(contents) == 1 and contents[0] == path
        except OSError:
            return False

    def walk(self, path, invalidate_cache=True, refresh=False):
        """
        Directory tree generator, like os.walk.

        Generator version of the flattened listing returned by adlfs.
        """
        path = _sanitize_azure_path(_stringify_path(path))
        directories = set()
        files = set()

        for key in list(self.fs.ls(path, invalidate_cache=invalidate_cache)):
            if self.isdir(key):
                directories.add(key)
            elif self.isfile(key):
                files.add(key)

        # Yield basenames (os.walk semantics), but keep full paths for recursion.
        file_names = sorted(posixpath.split(f)[1] for f in files
                            if f not in directories)
        dir_names = sorted(posixpath.split(d)[1] for d in directories)

        yield path, dir_names, file_names

        for directory in sorted(directories):
            for tup in self.walk(directory, invalidate_cache=invalidate_cache,
                                 refresh=refresh):
                yield tup


def _sanitize_azure_path(path):
    if path.startswith('adl://'):
        return path.replace('adl://', '')
    else:
        return path

abfs = AzureBlobFileSystem(account_name="xxxxxxxxxxxxxxxxxx", account_key="xxxxxxxxxxxxxxxxx")
wrapped_fs = ABFSWrapper(abfs)

dataset_url_without_protocol_prefix = "/petastorm/parquet"
with Reader(dataset_path=dataset_url_without_protocol_prefix, pyarrow_filesystem=wrapped_fs) as reader:
    ...  # training code elided

The same piece of code could be added to the Petastorm library, similar to the gcsfs_helpers folder. Let me know your thoughts.
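For illustration, here is a rough sketch of how abfs:// URLs could be resolved to an adlfs filesystem, loosely modeled on the existing S3/GCS cases in fs_utils.py; the function name, the scheme list, and the credential handling are assumptions rather than petastorm API:

from urllib.parse import urlparse

from adlfs import AzureBlobFileSystem

def _filesystem_for_abfs_url(dataset_url, account_name=None, account_key=None):
    # Hypothetical helper: map abfs://container/path to (filesystem, path-in-account),
    # mirroring how s3:// and gs:// URLs are handled in fs_utils.py.
    parsed = urlparse(dataset_url)
    if parsed.scheme not in ('abfs', 'abfss'):
        raise ValueError('Unsupported URL scheme: %s' % parsed.scheme)
    fs = AzureBlobFileSystem(account_name=account_name, account_key=account_key)
    # netloc carries the container name; path is relative to the container root.
    return fs, parsed.netloc + parsed.path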

selitvin commented 4 years ago

This is nice! Would you like to take a shot at a PR that adds this to Petastorm?

sam-h-bean commented 2 years ago

@upendrarv Do it! I am facing this right now, trying to instantiate a batch reader from a Parquet table in Azure Blob Storage.
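For anyone landing here now, a hedged usage sketch that reuses the ABFSWrapper prototype from the earlier comment together with petastorm's Reader (the only filesystem injection point shown in this thread). Whether the Arrow-based batch reader accepts a filesystem the same way depends on the petastorm version; the account and dataset path values are placeholders:

from adlfs import AzureBlobFileSystem
from petastorm.reader import Reader

abfs = AzureBlobFileSystem(account_name="myaccount", account_key="...")  # placeholders
wrapped_fs = ABFSWrapper(abfs)  # wrapper class from the prototype above

with Reader(dataset_path="/mycontainer/petastorm/parquet",
            pyarrow_filesystem=wrapped_fs) as reader:
    for row in reader:
        pass  # feed rows into the training loop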