DaanVanVugt / h5pickle

Wrapper for h5py with pickle capabilities
MIT License

How to use this with Dask array? #6

Closed rzu512 closed 6 years ago

rzu512 commented 6 years ago

I want something like:

import dask.array as da
x = da.from_hdf5(hdf5_filename, key)  # hypothetical read API
x.compute()

And it would open the file at beginning of computation and close it when the computation graph doesn't have to read the array anymore.

DaanVanVugt commented 6 years ago

Typically I would open the HDF5 file first and then use dask with it. I've created a utility function to automatically wrap an HDF5 dataset in a dask array:

"""Wrap a dask.array around something array-like

This module defines a wrapper, wrap_dask, for array values like h5py.Dataset.
Chunking can be performed automatically or with a user-defined function

Example:
    To create a dask array from a h5py dataset with automatic chunking use:

        wrap_dask(h5py.File('file.h5', 'r')['/x'])

    The chunksize can also be set manually

        wrap_dask(h5py.File('file.h5', 'r')['/x'], chunks=(100,100))

    Finally we can set the threshold for reading later or now, to have some
    control over I/O overhead. For instance, to always read lazily:

        wrap_dask(h5py.File('file.h5', 'r')['/x'], lazy_threshold=0)

"""

import numpy as np
try:
    import dask.array
except ImportError as e:
    raise ImportError("The dask package is required to use this wrapper") from e
else:

    def chunk_one_dimension(dims, dim=-1):
        """Define chunking along every but the last dimension of an array.

        This function creates chunks along the whole of the last dimension, i.e.
        an array of shape (10,20,1000) would get chunks of size (1,1,1000).

        Args:
            dims (tuple): A tuple of sizes along each dimension, like a.shape
            dim (int): Which dimension to keep in a single chunk (the last by default)

        Returns:
            tuple: Chunk size
        """
        chunksize = [1 for d in dims]
        chunksize[dim] = dims[dim]
        return tuple(chunksize)

    def dask_wrapper(lazy_threshold=10000, chunks=chunk_one_dimension, **kwargs_1):
        """Return a function wrapping an array-like value with dask.array.

        Args:
            lazy_threshold (int): Minimum number of elements for lazy loading;
                smaller arrays are read into memory immediately
            chunks: Tuple of chunk sizes, or a function mapping a shape to one
            **kwargs_1: Default keyword arguments for dask.array.from_array

        Returns:
            function: wrap_dask, which turns an array-like value into a dask array
        """
        def wrap_dask(value, **kwargs_2):
            """Wrap an array with dask.array, immediately if it is small or lazily if it is large.

            Create a (possibly delayed) dask array from some input.
            The input must have a .shape attribute (like np.ndarray).

            Args:
                value: The input to wrap. Can be an h5py.Dataset or another array-like source
                **kwargs_2: Passed to dask.array.from_array, overriding kwargs_1

            Returns:
                dask.array object
            """
            kwargs = dict(kwargs_1, **kwargs_2)
            if callable(chunks):
                kwargs['chunks'] = chunks(value.shape)
            else:
                kwargs['chunks'] = chunks

            # If it is a 'small' array read it directly, otherwise just pass the input
            if np.prod(value.shape) < lazy_threshold:
                return dask.array.from_array(np.asarray(value), **kwargs)
            else:
                return dask.array.from_array(value, **kwargs)
        return wrap_dask
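To make the automatic chunking concrete, here is the chunk_one_dimension helper from the module above in isolation, applied to a couple of shapes:

```python
def chunk_one_dimension(dims, dim=-1):
    # One-element chunks along every dimension except `dim`,
    # which is kept whole in a single chunk.
    chunksize = [1 for _ in dims]
    chunksize[dim] = dims[dim]
    return tuple(chunksize)

print(chunk_one_dimension((10, 20, 1000)))         # (1, 1, 1000)
print(chunk_one_dimension((10, 20, 1000), dim=0))  # (10, 1, 1)
```

So by default each dask task reads one contiguous run along the last axis, which suits row-major (C-ordered) HDF5 datasets.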

I hope this helps.
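The lazy_threshold decision only needs the dataset's .shape, so it can be illustrated without h5py or dask. This is a minimal sketch; FakeDataset and is_small are hypothetical names standing in for h5py.Dataset and the size check inside wrap_dask:

```python
import math

class FakeDataset:
    """Hypothetical stand-in for h5py.Dataset; only .shape is needed here."""
    def __init__(self, shape):
        self.shape = shape

def is_small(value, lazy_threshold=10000):
    # Mirrors the np.prod(value.shape) < lazy_threshold test in wrap_dask:
    # small datasets are read into memory up front, large ones stay lazy.
    return math.prod(value.shape) < lazy_threshold

print(is_small(FakeDataset((100, 50))))   # 5000 elements  -> True (read now)
print(is_small(FakeDataset((100, 200))))  # 20000 elements -> False (read lazily)
```

Reading small datasets eagerly avoids scheduling a dask task (and an open file handle) for data that fits comfortably in memory anyway.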