pydata / xarray

Expose xarray's h5py serialization capabilities as public API? #4242

rabernat commented 4 years ago

Xarray has a magic ability to serialize h5py datasets. We should expose this somehow and allow it to be used outside of xarray.

Consider the following example:

import s3fs
import h5py
import dask.array as dsa
import xarray as xr
import cloudpickle

# a public GOES-16 NetCDF file on S3
url = 'noaa-goes16/ABI-L2-RRQPEF/2020/001/00/OR_ABI-L2-RRQPEF-M6_G16_s20200010000216_e20200010009524_c20200010010034.nc'
fs = s3fs.S3FileSystem(anon=True)
f = fs.open(url)
ds = h5py.File(f, mode='r')
data = dsa.from_array(ds['RRQPE'])  # wrap the h5py dataset in a dask array
_ = cloudpickle.dumps(data)

This raises TypeError: h5py objects cannot be pickled.

However, if I read the file with xarray...

ds = xr.open_dataset(f, chunks={})  # chunks={} makes xarray wrap the data in dask
data = ds['RRQPE'].data
_ = cloudpickle.dumps(data)

It works just fine. This has come up in several places (e.g. https://github.com/dask/s3fs/issues/337, https://github.com/dask/distributed/issues/2787).

It seems like the ability to pickle these arrays is broadly useful, beyond xarray.

  1. How does our magic work?
  2. What would it look like to break this magic out and expose it as a public API (or inside another package)?

shoyer commented 4 years ago

The secret is our CachingFileManager object: https://github.com/pydata/xarray/blob/b1c7e315e8a18e86c5751a0aa9024d41a42ca5e8/xarray/backends/file_manager.py

This class is essential to writing a performant / serializable backend for xarray/dask, so we definitely should expose it as public API as part of our backends refactor. I would not object to breaking it out into another package if someone is interested enough to make that happen.
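
Roughly, a minimal sketch of the idea (the file name 'data.h5' and dataset name "X" are placeholders): the manager serializes the opener and its arguments rather than an open handle, so it survives a pickle round-trip and reopens the file lazily in the new process.

import pickle

import h5py
from xarray.backends import CachingFileManager

# pickling the manager serializes (opener, args, kwargs), not the open file;
# the h5py.File handle is recreated on first access after unpickling
manager = CachingFileManager(h5py.File, 'data.h5', mode='r')
restored = pickle.loads(pickle.dumps(manager))
with restored.acquire_context() as f:
    print(f['X'].shape)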

jakirkham commented 2 years ago

FWIW this sounds similar to what h5pickle does. Maybe it is worth improving that package with whatever logic Xarray has?
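
For reference, a hedged sketch of what that looks like with h5pickle (assuming its h5py-compatible File wrapper; file and dataset names are placeholders):

import cloudpickle
import h5pickle  # drop-in h5py wrapper that adds pickle support

# the wrapped dataset records path, mode, and name, so it can be
# re-opened after unpickling, much like xarray's CachingFileManager
f = h5pickle.File('data.h5', 'r')
restored = cloudpickle.loads(cloudpickle.dumps(f['X']))
print(restored.shape)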

ilan-gold commented 5 months ago

I would be extremely interested in, at a minimum, documenting this publicly. For example, this is what I think should work based on looking at the code:

import dask.array as da
import dask.distributed as dd
import h5py
import numpy as np
from xarray.backends import CachingFileManager

X = np.random.randn(100, 100)
# create the file through the manager and write a dataset into it
manager = CachingFileManager(h5py.File, 'data.h5', mode='w')
manager.acquire().create_dataset("X", (100, 100), 'f', data=X)
with dd.LocalCluster(n_workers=1, threads_per_worker=1) as cluster:
    with dd.Client(cluster) as client:
        with manager.acquire_context(needs_lock=False) as root:  # or needs_lock=True
            X_h5 = root['X']
            da.from_array(X_h5)[...].compute()

Or something along these lines; I am not really sure how to use CachingFileManager properly. I would like to be able to lazily load HDF5 files with dask.distributed.

Thanks!

ilan-gold commented 5 months ago

Update:


import dask.array as da
import dask.distributed as dd
import h5py
import numpy as np
from xarray.backends import CachingFileManager

chunksize = 10
numchunks = 10
size = (chunksize * numchunks, chunksize * numchunks)
X = np.random.randn(*size)

# write the test file and close it before handing it to the manager
with h5py.File('data.h5', 'w') as f:
    f.create_dataset("X", size, 'f', data=X)

manager = CachingFileManager(h5py.File, 'data.h5', mode='r')

def get_chunk(block_id=None):
    # the manager reopens the file lazily on each worker, so this closure
    # pickles cleanly into the task graph
    with manager.acquire_context(needs_lock=False) as f:
        x = block_id[0] * chunksize
        y = block_id[1] * chunksize
        chunk = f['X'][x:x + chunksize, y:y + chunksize]
    return chunk

with dd.LocalCluster(n_workers=1, threads_per_worker=1) as cluster:
    with dd.Client(cluster) as client:
        res = da.map_blocks(
            get_chunk,
            chunks=((chunksize,) * numchunks, (chunksize,) * numchunks),
            meta=np.array([]),
            dtype=X.dtype,
        ).compute()

print(res)

This works.
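
If I read the mechanics right, it works because the manager itself is what gets pickled into the task graph: each worker deserializes the opener-plus-arguments recipe and lazily reopens data.h5 on its first acquire_context call, and needs_lock=False is fine here since all access is read-only.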