zarr-developers / zarr-python

An implementation of chunked, compressed, N-dimensional arrays for Python.
https://zarr.readthedocs.io
MIT License

Proof of concept: CloudFilesStore #767

Open · rabernat opened this issue 3 years ago

rabernat commented 3 years ago

We currently rely 100% on fsspec and its implementations for accessing cloud storage (s3fs, gcsfs, adlfs). Cloud storage is complicated, and for debugging purposes it could be useful to have an alternative. Since meeting @william-silversmith a few years ago, I have been curious about CloudFiles:

https://github.com/seung-lab/cloud-files/

CloudFiles was developed to access files from object storage without ever touching disk. The goal was to reliably and rapidly access a petabyte of image data broken down into tens to hundreds of millions of files being accessed in parallel across thousands of cores. The predecessor of CloudFiles, CloudVolume.Storage, the core of which is retained here, has been used to process dozens of images, many of which were in the hundreds of terabyte range. Storage has reliably read and written tens of billions of files to date.

Highlights

  1. Fast file access with transparent threading and optional multi-processing.
  2. Google Cloud Storage, Amazon S3, local filesystems, and arbitrary web servers, making hybrid or multi-cloud access easy.
  3. Robust to flaky network connections. Uses exponential random window retries to avoid network collisions on a large cluster. Validates md5 for gcs and s3.
  4. gzip, brotli, and zstd compression.
  5. Supports HTTP Range reads.
  6. Supports green threads, which are important for achieving maximum performance on virtualized servers.
  7. High efficiency transfers that avoid compression/decompression cycles.
  8. High speed gzip decompression using libdeflate (compared with zlib).
  9. Bundled CLI tool.
  10. Accepts iterator and generator input.

Today I coded up a quick CloudFiles-based store for Zarr:

from cloudfiles import CloudFiles

class CloudFilesMapper:
    """Minimal mapping-style wrapper around CloudFiles exposing the store methods Zarr expects."""

    def __init__(self, path, **kwargs):
        self.cf = CloudFiles(path, **kwargs)

    def clear(self):
        self.cf.delete(self.cf.list())

    def getitems(self, keys, on_error="none"):
        return {item['path']: item['content'] for item in self.cf.get(keys, raw=True)}

    def setitems(self, values_dict):
        self.cf.puts([(k, v) for k, v in values_dict.items()])

    def delitems(self, keys):
        self.cf.delete(keys)

    def __getitem__(self, key):
        return self.cf.get(key)

    def __setitem__(self, key, value):
        self.cf.put(key, value)

    def __iter__(self):
        for item in self.cf.list():
            yield item

    def __len__(self):
        raise NotImplementedError

    def __delitem__(self, key):
        self.cf.delete(key)

    def __contains__(self, key):
        return self.cf.exists(key)

    def listdir(self, key):
        for item in self.cf.list(key):
            # strip the directory prefix (str.lstrip removes characters, not a prefix)
            yield item[len(key):].lstrip('/') if item.startswith(key) else item

    def rmdir(self, prefix):
        self.cf.delete(self.cf.list(prefix=prefix))

In my test with GCS, it works just fine with Zarr, Xarray, and Dask: https://nbviewer.jupyter.org/gist/rabernat/dde8b835bb7ef0590b6bf4034d5e0b2f
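For reference, here is a minimal sketch of how such a mapper can be plugged in, using the CloudFilesMapper class above (the bucket path is a made-up example; zarr-python 2.x accepts any MutableMapping-like object as a store):

    import zarr
    import xarray as xr

    # hypothetical bucket/prefix; anything CloudFiles understands should work
    store = CloudFilesMapper("gs://my-bucket/my-dataset.zarr")

    # zarr-python 2.x treats any MutableMapping-like object as a store
    group = zarr.open_group(store, mode="r")

    # xarray reads through the same store interface
    ds = xr.open_zarr(store, consolidated=False)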

Distributed read performance was about 50% slower than gcsfs, but my benchmark is probably biased.

It might be useful to have the option to switch between the fsspec-based stores and this one. If folks are interested, we could think about adding this to zarr-python as some kind of optional alternative to fsspec.

william-silversmith commented 3 years ago

Thanks for this, Ryan! I'm having trouble running the notebook on my M1 (several dependencies are x86-only even though I have Rosetta 2), but I'll try again with my x86 machine later. I'm very curious to see what the profile says; I'm always interested in increasing performance.

One minor quick tip: setitems can be written with a generator expression to avoid the memory and generation-time overhead of building a list.

    def setitems(self, values_dict):
        self.cf.puts(( (k, v) for k, v in values_dict.items() ))

william-silversmith commented 3 years ago

Also, it just occurred to me that you mentioned distributed read performance. I've found on virtualized servers that Python threads jump around between the different cores in a way that doesn't happen on real hardware. Try using the green threads option and you might be pleasantly surprised. You can achieve similar results using taskset on Linux machines to pin a process to a single core. I hope at some point in the next 6 months to experiment more with an async/await implementation, which would avoid the need for the gevent library and would enable http/2 support.
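For anyone curious, a rough sketch of what the green-thread path could look like; the green keyword and the monkey-patching step are my assumptions about the CloudFiles API rather than something confirmed here, so check the cloud-files documentation first:

    # gevent monkey-patching has to happen before other modules touch sockets
    import gevent.monkey
    gevent.monkey.patch_all(threads=False)

    from cloudfiles import CloudFiles

    # green=True is assumed to switch CloudFiles to cooperative (green) threads;
    # verify the exact flag against the cloud-files documentation
    cf = CloudFiles("gs://my-bucket/my-dataset.zarr", green=True)

    # alternative on Linux: pin the process to one core, e.g.
    #   taskset -c 0 python my_script.py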

rabernat commented 3 years ago

> Also, it just occurred to me that you mentioned distributed read performance.

Specifically, Dask.distributed. In our group, we usually scale out via Dask. One point I don't understand is how CloudFiles' thread-based parallelism would interact with Dask's multi-faceted parallelism (workers, processes, threads, etc.).

william-silversmith commented 3 years ago

One way to cope with the additional threading would be to set n_threads=1. Then the Dask threads will determine how many connections are live.
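Since the proof-of-concept mapper above forwards its **kwargs straight to the CloudFiles constructor, that could look roughly like this (the n_threads keyword is taken verbatim from the comment above; double-check the exact parameter name against the cloud-files docs):

    # CloudFilesMapper is the proof-of-concept class from the first comment;
    # it passes **kwargs through to CloudFiles, so the internal thread pool can
    # be dialed down here and Dask's worker threads alone control concurrency
    store = CloudFilesMapper("gs://my-bucket/my-dataset.zarr", n_threads=1)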


shoyer commented 3 years ago

This looks like a great idea! 👍

Generally you want to use many more parallel threads for IO-bound work (like reading/writing Zarr chunks) than for compute-bound work (for which you want roughly one thread per core). So I think this nested parallelism is actually probably a good thing -- you want more threads for the IO tasks than for the compute tasks.
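A rough standard-library illustration of that rule of thumb (the multiplier is an arbitrary example, not a recommendation):

    import os
    from concurrent.futures import ThreadPoolExecutor

    n_cores = os.cpu_count() or 1

    # compute-bound work: roughly one thread per core
    compute_pool = ThreadPoolExecutor(max_workers=n_cores)

    # IO-bound work such as fetching Zarr chunks: many more threads than cores,
    # since each thread spends most of its time waiting on the network
    io_pool = ThreadPoolExecutor(max_workers=8 * n_cores)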

jakirkham commented 3 years ago

Somewhat related discussion about using multiple ThreadPoolExecutors per Dask Worker from earlier today here ( https://github.com/dask/distributed/issues/4655#issuecomment-854881294 )

martindurant commented 3 years ago

Note that fsspec uses asyncio to fetch multiple chunks concurrently, so you can greatly increase performance by making each Dask partition larger than the Zarr chunk size.
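As a hedged illustration (the dimension name and chunk sizes are hypothetical): if the on-disk Zarr chunks are 24 elements long along time, asking Dask for partitions of 240 lets fsspec fetch the 10 underlying chunks of each partition concurrently.

    import xarray as xr

    # suppose the Zarr store is chunked 24-long along "time"; requesting Dask
    # chunks of 240 makes each Dask task read 10 Zarr chunks, which fsspec can
    # fetch concurrently through its async getitems path
    ds = xr.open_zarr("gs://my-bucket/my-dataset.zarr", chunks={"time": 240})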