ska-sa / katdal

Data access library for the MeerKAT radio telescope
BSD 3-Clause "New" or "Revised" License

SPR1-1944: dask 2022.01.1 breaks katdal due to missing da.core.getem #351

Closed: ludwigschwardt closed this issue 2 years ago

ludwigschwardt commented 2 years ago

The latest dask has renamed da.core.getem to da.core.graph_from_arraylike with a different interface (see dask/dask#7417). Revisit the ChunkStore.{get|put}_dask_array machinery to replace getem.

For the "get" case we stick with getem in older dasks, and switch to graph_from_arraylike in newer ones. For the "get" case we revert to da.from_array with a custom arraylike object and getter function.

For the "put" case we switch to da.map_blocks all around (but we would ideally like to use blockwise in the future).

More rationale follows 😄

ludwigschwardt commented 2 years ago

Why did we use da.core.getem?

It provides a convenient way to build a graph around a getter/putter and a chunk specification, in order to simulate an actual array on top of our zarr-like chunk store. We designed our getters and putters to accept dask-style array names and slice specifications as inputs, in order to have synergy with the underlying dask graphs.

There are two use cases: getting a dask array from the chunk store, and putting a dask array into the chunk store.

1. The “get” use case

def ChunkStore.get_dask_array(array_name, chunks, dtype, ...):

The chunk store interface relies on a get_chunk(array_name, slices, dtype) function to retrieve individual chunks. The dtype is partial’ed in, leaving array_name and slices as main parameters. For example,

chunk = getter('x', (slice(2, 4), slice(3, 6)))

will retrieve a 2-by-3 chunk of array 'x'. The name and slice specification are combined into a URL to be retrieved from the underlying object store behind the scenes.
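
(In other words, the getter is essentially the chunk store's get_chunk method with the dtype baked in. A rough sketch, with store standing in for the ChunkStore instance; the real wiring in katdal may differ:)

from functools import partial

# Bind the dtype so that the remaining signature matches a dask slicing task,
# i.e. getter(array_name, slices). `store` is assumed to be the ChunkStore.
getter = partial(store.get_chunk, dtype=dtype)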

This is very similar to da.core.getter, with the array x represented by its name instead of the actual array.

The relevant code in get_dask_array is:

graph = da.core.getem(array_name, chunks, getter)
array = da.Array(graph, array_name, chunks, dtype)
return array

It produces a basic graph like

# Called as: dx = store.get_dask_array('x', ((2, 2),), int)
# This is dx.dask.layers['x'].mapping
{
  ('x', 0): (getter, 'x', (slice(0, 2),)),
  ('x', 1): (getter, 'x', (slice(2, 4),)),
}

Note that 'x' is overloaded to be the name of the dask array, high-level graph, materialised layer and part of the URL name for the custom getter... [dask 2022 doesn't like it though 😂 - we'll revisit it later.]

Let's try the suggested da.from_array route, which is more public/stable. It needs an actual arraylike object, so let's fake it:

class DummyArray:
    def __init__(self, chunks, dtype):
        self.shape = tuple(sum(c) for c in chunks)
        self.ndim = len(self.shape)
        self.dtype = np.dtype(dtype)
dummy_array = DummyArray(chunks, dtype)
array = da.from_array(dummy_array, chunks, array_name, getitem=getter)

The dummy_array will provide .shape, .ndim and more importantly .dtype. Since it won’t be of type np.ndarray, it will use the custom getter instead. (Hint: namedtuple is not good either since it is a subclass of tuple, which is converted to np.ndarray internally by from_array.)
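
(A quick aside to illustrate that namedtuple hint; this is not katdal code:)

from collections import namedtuple

import numpy as np

# A namedtuple is still a tuple, so from_array would treat it as raw data and
# convert it with np.asarray instead of calling the custom getter -- hence the
# plain DummyArray class above.
Dummy = namedtuple("Dummy", ["shape", "ndim", "dtype"])
fake = Dummy((4,), 1, np.dtype(int))
print(isinstance(fake, tuple))  # True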

Unfortunately its graph also contains the dummy array, which throws a spanner in the works:

{
  ('x', 0): (getter, 'x', (slice(0, 2),)),
  ('x', 1): (getter, 'x', (slice(2, 4),)),
  'x': dummy_array,
}

The custom getter will now be called on the dummy array, instead of the expected array name.

Can you just pop this key from the underlying mapping? Probably not.

What about HighLevelGraph.from_collections? Give it a shot, following the example in the docstring:

layer = {
    (array_name, i): (getter, array_name, slices)
    for i, slices in enumerate(da.core.slices_from_chunks(chunks))
}
graph = HighLevelGraph.from_collections(array_name, layer)
array = da.Array(graph, array_name, chunks, dtype)

Unfortunately this only works for 1-D arrays. The index i needs to be multidimensional, and constructing the keys like in getem:

keys = product([out_name], *(range(len(bds)) for bds in chunks))

really begs for the restoration of that function or the newer graph_from_arraylike. Oh well, let’s create a shim to choose between the two APIs then…
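
(For the record, here is roughly what getem did with those keys, using the names from the snippets above; this is a sketch, not the actual dask source:)

from itertools import product

import dask.array as da

# Pair the N-D keys with the matching slice specs, chunk by chunk.
keys = product([out_name], *(range(len(bds)) for bds in chunks))
slices = da.core.slices_from_chunks(chunks)
layer = {key: (getter, array_name, slc) for key, slc in zip(keys, slices)}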

The nice thing is that graph_from_arraylike works pretty much like getem. The arr parameter may be a placeholder string instead of an actual arraylike object, which is critical for us. It should really be called graph_from_chunks… 😂 We can also use inline_array=True to avoid adding the array itself to the graph.

Here is our shim (we are keeping the simpler getem-based API). We also need a unique out_name that differs from array_name, otherwise the latest dask optimises away the entire graph... 😅

def _graph_from_chunks(array_name, chunks, getter, out_name):
    """Create dask graph from chunk spec like the older :func:`da.core.getem`.

    The basic graph structure (for a 1-D array) is:

      {
        (out_name, 0): (getter, array_name, (slice(start_0, end_0),)),
        (out_name, 1): (getter, array_name, (slice(start_1, end_1),)),
        ...
      }
    """
    try:
        return da.core.graph_from_arraylike(
            array_name,
            chunks,
            shape=tuple(sum(c) for c in chunks),
            name=out_name,
            getitem=getter,
            inline_array=True,
        )
    except AttributeError:
        return da.core.getem(array_name, chunks, getter, out_name=out_name)
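
With the shim in place, get_dask_array does essentially what it did before. A sketch (the 'get-' prefix for out_name is made up; it just has to differ from array_name):

# out_name must differ from array_name, or the latest dask optimises the graph away
out_name = 'get-' + array_name
graph = _graph_from_chunks(array_name, chunks, getter, out_name)
array = da.Array(graph, out_name, chunks, dtype)
return array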

ludwigschwardt commented 2 years ago

2. The “put” use case

def ChunkStore.put_dask_array(array_name, array, ...):

Here we want to store the chunks of array as objects in the chunk store. The one advantage of this use case is that we start with a dask array, instead of having to create one from scratch. We basically just need to add another layer that will send each chunk to a custom putter function.

The chunk store interface provides a put_chunk(array_name, slices, chunk) function. This is very similar to the “get” case, just adding the actual array to be stored in the object store as a third parameter, chunk. So we might have

success = putter('x', (slice(2, 4), slice(3, 6)), chunk)

where chunk is an ndarray of shape (2, 3). This is a bit like __setitem__ but with the array name instead of the actual array as first parameter. As before, we don’t want dask to substitute 'x' with its value in this case.

We could have considered da.store but that requires a direct setitem interface on a custom arraylike to achieve our ends. Instead, we construct a custom graph. Unlike store we also return the success of the put operation per chunk (None or a caught Exception, styled into an ndarray of the correct dimensions), instead of the data itself (who wants that? 😁 ).
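
(So a caller would consume the result along these lines; a sketch, with the error handling invented:)

# Each element of the computed result is None (success) or the caught exception
# for the corresponding chunk, so surface any failures after compute().
success = store.put_dask_array('x', dx)
errors = [err for err in success.compute().ravel() if err is not None]
if errors:
    raise errors[0]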

The relevant code in put_dask_array is:

# Construct output graph on same chunks as input, but with new name
graph = da.core.getem(array_name, array.chunks, putter, out_name=out_name)
# Set chunk parameter of put_chunk() to corresponding key in input array
graph = {k: v + ((array.name,) + k[1:],) for k, v in graph.items()}
dask_graph = dask.highlevelgraph.HighLevelGraph.from_collections(out_name, graph, [array])
# The success array has one element per chunk in the input array
out_chunks = tuple(len(c) * (1,) for c in array.chunks)
return da.Array(dask_graph, out_name, out_chunks, object)

We introduce a new out_name derived from array_name (which forms part of the name of the object in the object store). The graph contains slicing tasks, each of which is then modified to include a reference to the originating chunk. This layer is slapped on top of the existing array in a HighLevelGraph, and finally wrapped in an Array object.

The resultant graph looks something like this:

# Called as:
# dx = da.from_array(np.arange(4), (2, 2))
# ds = store.put_dask_array('x', dx)
# This has two layers - the original dx array ('array' layer):
{
  ('array', 0): array([0, 1]),
  ('array', 1): array([2, 3]),
}
# And the putter layer ('store-x'):
{
  ('store-x', 0): (putter, 'x', (slice(0, 2),), ('array', 0)),
  ('store-x', 1): (putter, 'x', (slice(2, 4),), ('array', 1)),
}

Another option is da.map_blocks, which will apply the putter function to each chunk in the dask array and return success per chunk:

def map_putter(chunk, block_info=None, store_name=None):
    slices = tuple(slice(*loc) for loc in block_info[0]["array-location"])
    return putter(store_name, slices, chunk)
return da.map_blocks(map_putter, array, chunks=(1,) * array.ndim,
                     dtype=object, store_name=array_name)

This has a similar graph, with the “putter” layer replaced by a Blockwise.

The map_blocks version is a more intuitive replacement for the original getem version, so we’ll go with that.

There are two downsides though…

1. The map_blocks route results in massive serialisations and pickles on the way to a tiny hash string, causing put_dask_array to take more than a minute to create the array.
2. Even after the array is created, it is about 18% slower to transfer data with map_blocks compared to getem.

What about da.blockwise to go directly to a Blockwise layer?

This provides a way to call a putter function on each chunk, but no way to pass along the block info / slices spec at the same time (that’s what map_blocks adds to basic blockwise…).

Let’s go back and look at the internals of da.core.graph_from_arraylike. This calls the lower-level dask.blockwise.blockwise function, which we will also call core_blockwise as da.core does, to avoid confusion with the higher-level da.blockwise. This suggests the following route to assemble the three parameters to our putter function:

from dask.blockwise import blockwise as core_blockwise
from dask.highlevelgraph import HighLevelGraph
from dask.layers import ArraySliceDep

out_ind = tuple(range(array.ndim))
layer = core_blockwise(
    # Function signature: putter(array_name, slices, chunk)
    putter,
    # Output: we want all output chunks
    out_name, out_ind,
    # Arg 1: the same array name is passed to all tasks, so no out_ind
    array_name, None,
    # Arg 2: produce slice spec per chunk on the fly
    ArraySliceDep(array.chunks), out_ind,
    # Arg 3: this represents a chunk of `array`
    array.name, out_ind,
    # We need to tell blockwise about the external array's shape
    numblocks={array.name: array.numblocks},
)
# This part is the same as before
dask_graph = HighLevelGraph.from_collections(out_name, layer, [array])
out_chunks = tuple(len(c) * (1,) for c in array.chunks)
return da.Array(dask_graph, out_name, out_chunks, object)

This indeed works well, and gives some of the best benchmark results! 😁

The catch is that ArraySliceDep was only introduced in dask/dask#7417 in January 2022, so it’s really bleeding edge.
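
(If we ever wanted to use it opportunistically, the same try/except guard as the get shim would do; a sketch:)

# Hypothetical guard, mirroring _graph_from_chunks above: fall back to the
# map_blocks route if this dask does not ship ArraySliceDep yet.
try:
    from dask.layers import ArraySliceDep
except ImportError:
    ArraySliceDep = None  # too old - stick to map_blocks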

Since put_dask_array is not that critical compared to get_dask_array, I'm sticking with the more bog standard map_blocks for now.

ian-r-rose commented 2 years ago

Thanks for the ping @ludwigschwardt, this was a good read!

I have a question about your attempt to use da.from_array(): it seems like your primary objection is that, by default, from_array inserts a copy of your dummy array into the graph. But later down you use inline_array=True with your invocation of graph_from_arraylike(). Would it also work to thread inline_array=True into da.from_array()? I don't know enough about your custom getter to know whether that is a good idea, but this is exactly the type of scenario for which inline_array is intended: where the array-ish class is some lazy representation of a larger store somewhere else.

gjoseph92 commented 2 years ago

The custom getter will now be called on the dummy array, instead of the expected array name.

As before, we don’t want dask to substitute 'x' with its value in this case

It seems like in both get and put, you want the name of the array (like 'x') to be passed as a literal string argument to your function, as opposed to? in addition to? the object at key 'x' being substituted in. I'm assuming you need that string-name in order to come up with a URL in your getter/putter function.

Doing those sorts of substitutions is kind of dask's whole job. Instead, I'd think about other ways to get that information into the function. There are two strategies that come to mind:

  1. When you control the object at key 'x' (it's a DummyArray), just give it a name field
  2. When you don't control the object (it's an arbitrary NumPy array), use functools.partial(putter, name=...) to pre-pass-in the name value.
class DummyArray:
    def __init__(self, chunks, dtype, name):
        self.name = name
        self.shape = tuple(sum(c) for c in chunks)
        self.ndim = len(self.shape)
        self.dtype = np.dtype(dtype)

    def __getitem__(self, idx):
        return getter(self.name, idx)

    def __setitem__(self, idx, value):
        return putter(self.name, idx, value)

dummy_array = DummyArray(chunks, dtype, array_name)
array = da.from_array(dummy_array, chunks, array_name, inline_array=False)
... # do stuff to array
output = DummyArray(result.chunks, result.dtype, "result-name")
da.store(result, output)  # or continue using blockwise if you need it to remain an array

Even after the array is created, it is about 18% slower to transfer data with map_blocks compared to getem.

That's interesting, and surprising. I'd expect the map_blocks and blockwise graphs to be nearly identical, the only difference being that map_blocks inefficiently generates a materialized graph because https://github.com/dask/dask/pull/7686 still hasn't happened.

it results in massive serialisations and pickles on the way to a tiny hash string, causing put_dask_array to take more than a minute to create the array

Also surprised by this. Yes, the map_putter function would be pickled once to tokenize it, but I don't see why that would add a full minute. Could be something else going on (again, the inefficient graph generation due to using block_info could be an issue).

ludwigschwardt commented 2 years ago

Thanks for taking the time to read this and for the helpful comments! I hope it gave you a glimpse of how folks are (ab)using dask... 😁

It seems like your primary objection is that, by default, from_array inserts a copy of your dummy array into the graph.

My primary objection is that I cannot return to this idyllic two-liner 🙂

graph = da.core.getem(array_name, chunks, getter)
array = da.Array(graph, array_name, chunks, dtype)

Ideally I want to avoid yet another dummy class layer, unless it has a long-term benefit of greater maintainability. That said, I completely missed inline_array on da.from_array in all my meandering... 🤦🏼‍♂️ It's even got a great docstring!

It seems like in both get and put, you want the name of the array (like 'x') to be passed as a literal string argument to your function, as opposed to? in addition to? the object at key 'x' being substituted in.

As opposed to. It just happened to work in the old regime to have the same string for the object name and the getter argument, but now I need to have a different out_name anyway.

I basically made the getter function signature compatible with a dask slicing task and then leveraged getem to set up the calls.

I'm assuming you need that string-name in order to come up with a URL in your getter/putter function.

Yes.

There are two strategies that come to mind...

Thanks a lot, I'll give DummyArray another shot (both with and without inline_array).

That's interesting, and surprising. I'd expect the map_blocks and blockwise graphs to be nearly identical.

I first thought it was due to the extra contortions I added to support map_blocks but it's not that. Maybe the block_info machinery? Here's a stripped-down example where the put call does nothing:

import dask.array as da
import numpy as np

dx = da.zeros(1000000, chunks=1000)

def _dummy_put(slices, chunk):
    return np.atleast_1d(None)

# OPTION 1: map_blocks

def _put_map_blocks(chunk, block_info=None):
    slices = tuple(slice(*loc) for loc in block_info[0]["array-location"])
    return _dummy_put(slices, chunk)

px = da.map_blocks(_put_map_blocks, dx, dtype=object)
%timeit px.compute()   # 148 ms

# OPTION 2: blockwise

from dask.layers import ArraySliceDep
from dask.highlevelgraph import HighLevelGraph
from dask.blockwise import blockwise as core_blockwise
out_name = "y"
out_ind = (0,)

layer = core_blockwise(
    _dummy_put,
    # Function signature: success = _dummy_put(slices, chunk)
    out_name, out_ind,
    # Parameter 1: slices
    ArraySliceDep(dx.chunks), out_ind,
    # Parameter 2: chunk
    dx.name, out_ind,
    numblocks={dx.name: dx.numblocks},
)
dask_graph = HighLevelGraph.from_collections(out_name, layer, [dx])
out_chunks = tuple(len(c) * (1,) for c in dx.chunks)
py = da.Array(dask_graph, out_name, out_chunks, object)
%timeit py.compute()   # 112 ms

I get a consistent 25% improvement with blockwise, until the chunks become few and large enough. Bear in mind that the actual data transfer will be much slower, so this is really nitpicking about the potentially minimal graph overhead 🙂

Also surprised by this. Yes, the map_putter function would be pickled once to tokenize it, but I don't see why that would add a full minute.

Have a look at this:

In [14]: import numpy as np

In [15]: import dask.array as da

In [16]: class Store:
    ...:     def __init__(self, x):
    ...:         self.x = x
    ...: 
    ...:     def put(self, slices, chunk):
    ...:         self.x[slices] = chunk

In [17]: s = Store(np.zeros((100, 1000, 1000)))  # store 1 GB of data

In [18]: s.put
Out[18]: <bound method Store.put of <__main__.Store object at 0x121dea310>>

In [19]: %time da.core.tokenize(s.put)
CPU times: user 44.7 s, sys: 38.3 s, total: 1min 23s
Wall time: 1min 28s
Out[19]: 'affa99f676ec67eb789e6dbd2048a531'

[As an aside, I foolishly tried to copy'n'paste the above code block verbatim into IPython and it actually ran (at least up to In[18]) 🤯 😁 ]

Interrupting the process reveals that it calls cloudpickle and/or normal pickle on s.put, which seems to serialise the entire 1 GB store in the process. Not a big deal in practice, because we only use these kinds of in-memory stores in the unit tests. But the unit tests do suffer...

Mmm... Rerunning the tokenisation this morning now only takes 15-20 seconds. Still too long for my liking, but at least not a minute anymore.

ludwigschwardt commented 2 years ago

Thanks @gjoseph92 and @ian-r-rose for clearing up my mind!

I've gone full circle back to da.from_array for the "get" use case:

class _ArrayLikeGetter:
    """Present an array-like interface to chunk getter function."""
    def __init__(self, getter, array_name, chunks, dtype, **kwargs):
        self.getter = getter
        self.array_name = array_name
        self.kwargs = kwargs
        # Basic array-like attributes needed by :func:`dask.array.from_array`
        self.shape = tuple(sum(c) for c in chunks)
        self.ndim = len(self.shape)
        self.dtype = np.dtype(dtype)

    def __getitem__(self, slices, asarray=False, lock=None):
        """Call chunk getter on slices set up by :func:`dask.array.from_array`."""
        return self.getter(self.array_name, slices, self.dtype, **self.kwargs)

getter_shim = _ArrayLikeGetter(getter, array_name, chunks, dtype, **kwargs)
return da.from_array(getter_shim, chunks, out_name, asarray=False)

The performance is similar, probably because we swapped a functools.partial (inserting the dtype) for a __getitem__ wrapper function. Since the array_name does not pass through dask's paws, the need for inline_array has also disappeared. 🙂

The “put” use case is still using map_blocks instead of using this dummy array class with da.store because __setitem__ does not lend itself to returning errors.

Unless we patch da.core.load_store_chunk like this...

- def load_store_chunk(x, out, index, lock, return_stored, load_stored):
+ def load_store_chunk(x, out, index, lock, return_stored, load_stored, return_errors_of_type=UnusedException):
      result = None
      if return_stored and not load_stored:
          result = out

      if lock:
          lock.acquire()
      try:
          if x is not None:
              if is_arraylike(x):
                  out[index] = x
              else:
                  out[index] = np.asanyarray(x)
          if return_stored and load_stored:
              result = out[index]
+     except return_errors_of_type as err:
+         result = err
      finally:
          if lock:
              lock.release()

      return result

You pass in the base class of errors you want to return in return_errors_of_type (ChunkStoreError in our case). The above function then returns either None (i.e. success) or the exception object, without crashing the whole thing.

Just thinking out loud 🙂

ludwigschwardt commented 2 years ago

Ugh... The saga is still not resolved.

It turns out that dask 2022.03.0 works fine, but 2022.01.0 and even 2021.12.0 don't.

Our underlying ChunkStore.get_chunk function expects slices that match the stored NPY file shape, as a safety check.

The get_dask_array function allows a slicing index to select part of the stored array. With our new Array getter, the chunk slices and the top-level index slice are fused during optimisation, and the fused slicing task then calls get_chunk with incorrect slices. Interestingly, the latest dask does not fuse this scenario like it did before.
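
To make the failure mode concrete, here is an invented example (chunk sizes, key names and slices are all illustrative):

# Store chunks are ((2, 2),); the user then asks for dx[1:3].
# Unfused, the graph fetches whole chunks and subslices them afterwards:
{
  ('x', 0): (getter, 'x', (slice(0, 2),)),
  ('getitem', 0): (operator.getitem, ('x', 0), (slice(1, 2),)),
}
# After fuse, the two slicing steps collapse into a single task like
#   ('getitem', 0): (getter, 'x', (slice(1, 2),))
# and get_chunk rejects (slice(1, 2),) because it matches no stored chunk.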

Some clues from the dask objects:

dask 2022.01.0 (broken)

HighLevelGraph with 2 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x7f60581e6af0>
 0. x-5fe0ffd8d63a013fd260312ee9aa78e7              # standard Array slices (non-inline)
 1. getitem-1c1b4ff098d8ac7419c760adfd98d224        # further slices

dask 2022.03.0 (works)

HighLevelGraph with 3 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x12831d340>
 0. original-x-2c31ed0a37106da99c07bb5f8baebbe4     # just the ArrayLikeGetter
 1. x-2c31ed0a37106da99c07bb5f8baebbe4              # Blockwise layer
 2. getitem-d6a9e4af0dc4fb7ebcdbc8f07aacce6e        # further slices

I guess our array-like breaks a basic assumption: that it is arbitrarily sliceable.

I am loath to just call compute(optimize_graph=False). Any suggestions welcome!

ludwigschwardt commented 2 years ago

Hi @gjoseph92 and @ian-r-rose, is there a way to disable graph optimisation for a layer? I want to prevent fuse from gobbling up my carefully crafted slices inside an Array. Passing fuse_keys to compute() seems suboptimal, since array creation and the actual compute call are miles apart in the code.

Never mind... It looks like providing an explicit getitem to from_array works, since it won't be in the da.optimization.GETTERS tuple. Thanks for the tip in https://github.com/dask/dask/issues/8753#issuecomment-1072792646.

It looks like dask 2022.04.0 also requires inline_array=True to use a custom getter...

ludwigschwardt commented 2 years ago

Hopefully the final issues:

The _ArrayLikeGetter scheme only works properly on dask 2022.03.0, and then only because of a regression (see dask/dask#8753).

When the index parameter of get_dask_array is used to slice the array into a smaller one, dask's fuse graph optimisation merges slices of slices into single slices. This is incompatible with the chunk store, which expects very specific slices: the chunk store dask array has a fixed set of chunks and is not arbitrarily sliceable, as fuse assumes.

The solution is to pass an explicit getitem to da.from_array, even though it is just the _ArrayLikeGetter.__getitem__ that would have been used anyway. This prevents further optimisation of those getter tasks.

However, the plot thickens... The custom getter is not called by the latest dask 2022.04.0 unless you also set inline_array=True. This parameter first appeared in dask 2021.01.0, which also happens to be one of the last releases that still supports Python 3.6, so it's all good.

I now call from_array like this:

getter_shim = _ArrayLikeGetter(getter, array_name, chunks, dtype, **kwargs)
array = da.from_array(
    getter_shim,
    chunks,
    out_name,
    # Don't cast chunks to ndarray because some of them may be `PlaceholderChunk`s
    asarray=False,
    # Set custom getter anyway to prevent slice fusion during graph optimisation,
    # which ends up calling chunk store with wrong chunks.
    getitem=_ArrayLikeGetter.__getitem__,
    # Dask 2022.04.0 ignores the custom getter unless this is True
    inline_array=True
)
return array[index]
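
For illustration, a call might now look like this (argument names as used in this thread; treating index as a keyword is an assumption):

# Hypothetical usage: lazily fetch a sub-array of the stored 'x'
dx = store.get_dask_array('x', chunks=((2, 2),), dtype=int, index=(slice(1, 3),))
print(dx.compute())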

ludwigschwardt commented 2 years ago

Ready for review again, @KimMcAlpine and @richarms 🙂

ludwigschwardt commented 2 years ago

At least the code now works on all dasks back to 1.2.1, which is a good sign for compatibility 🙂