Open dcherian opened 1 year ago
Oops I think I missed this one when creating the ChunkManagerEntrypoint
in https://github.com/pydata/xarray/pull/7019. Looks like calling xarray's ffill
or bfill
on an array with chunked core dimensions will attempt to apply dask.array.cumreduction
along the core dims even if the underlying array is a cubed.Array
.
Thanks @dcherian - great suggestion. It would be interesting to see how we could implement this in Cubed.
The Python Array API spec has a proposal for cumulative_sum
, which is a special case of this operation.
Also, I noticed that bfill
in Xarray uses flip
twice, so #114 may be relevant (although it might be more efficient to avoid flip
in this case, due to Zarr's regular chunks requirement - flip
will violate this since the final chunk will become the first chunk, so the whole array needs rewriting).
I think the relevant part of the Nvidia doc is "39.2.4 Arrays of Arbitrary Size", which explains how to apply the algorithm to chunked (or blocked) arrays.
We could implement this by using the NumPy cumsum
operation (or equivalent for the generalised operation) on the blocks, and then create the auxiliary arrays of sums (SUMS
using the Nvidia terminology) and cumulative sums (INCR
).
Naively, INCR
could have chunks of size one so that it could be mapped using a blockwise operation with the scanned blocks, but this would not scale well since if there were say 1000 chunks, then a single task would have to write 1000 tiny chunk files when storing INCR
as a Zarr file.
Instead we could write INCR
as a Zarr file with a single chunk, then a task processing a scanned block i
would load the entire INCR
file, but just use the i
-th value to add to its chunk. This should work as long as the object store could handle many concurrent reads. Cubed has a map_direct
operation that allows access to side inputs (INCR
) for cases like this.
Ah yes that figure in "39.2.4" is what I remember. Such a cool algorithm!
Here's the dask PR where I learnt of this: https://github.com/dask/dask/pull/6675
I'm trying to do this to learn more about cubed. :)
The start is easy:
import numpy as np
scan = np.cumsum
scanned = blockwise(
scan,
"ij",
array,
"ij",
axis=-1,
)
# TODO could also return {"scan": np.cumsum(array), "reduce": np.sum(array)} in `scanned`
reduced = blockwise(
np.sum,
"ij",
array,
"ij",
axis=-1,
adjust_chunks={"j": 1},
keepdims=True,
)
But I'm stuck at scanning the intermediate sums which have chunksize=1 along the scanned axis (here the last axis). It seems like at this point, we'd want to merge_chunks
to some suitable size.
In general, we'll still have multiple chunks after merging, so we'd want the balanced tree algorithm in any case (Section 39.2.2), similar to tree_reduce
/partial_reduce
. Correct? In which case, we might as well run the balanced tree from the beginning? One thing I don't understand is where the I/O will end up happening for this algorithm. Help!
I guess for a simpler start, I could run a sequential scan after merge_chunks
.
I'm trying to do this to learn more about cubed. :)
Yay!
similar to
tree_reduce/partial_reduce
. Correct?
I think so?
we'd want to
merge_chunks
to some suitable sizeOne thing I don't understand is where the I/O will end up happening for this algorithm.
merge_chunks
is a special case of rechunk
(see https://cubed-dev.github.io/cubed/operations.html#rechunk for visual), which means the IO will end up happening each time merge_chunks
is called. The blockwise
operations are embarrassingly parallel, and so don't require IO (they will be fused with other blockwise operations until they hit the non-blockwise merge_chunks
). So basically each round of the tree reduction will require one stage of IO.
(correct me if any of that is wrong @tomwhite )
That's great Deepak!
We definitely want a multi-level merge chunks function. I think that Cubed's reduction
function is almost what we want - rather than reducing along an axis we just keep the array the same size and change the chunking. I'll have a go at changing this in a branch so you've got something to work with.
I guess for a simpler start, I could run a sequential scan after
merge_chunks
.
Maybe - although is there not a problem with having to combine block i with block i+1 (section 39.2.4) - so it's not a true blockwise function?
See https://github.com/cubed-dev/cubed/tree/tree-merge-chunks. The trick is to use a combine_size
of -1 along an axis to mean the size shouldn't change.
Sorry this wasn't clear.
I'm at the scan-block-sums stage (middle of the image).
At this point all individual block sums are in separate blocks. Now we could
(1) Compute this scan sequentially which would be a slow blocking task in the middle.
(2) merge_chunks
to a suitable intermediate chunking determined by memory limits. And then recursively apply the scan. Terminate when merge_chunks
merges to one chunk along the axis. At that point compute the scan in memory on one worker and return.
I think that In the general case, you cannot merge the whole thing into one chunk along the core-dimension. Imagine if the array size (even after blockwise sum reduction) was > individual node memory. [Is this right?]
Maybe - although is there not a problem with having to combine block i with block i+1 (section 39.2.4) - so it's not a true blockwise function?
Yes, but we can pad the identity element to the beginning of the SUMS
array for which the chunksize is 1 prior to merge_chunks
I think I'll experiment with (2) tonight.
@tomwhite Cubed is quite impressive! :clap: :clap:
I got it to work! Used some recursion, but no padding. Just some trickery in map_direct
.
from functools import partial
import cubed.array_api as xp
import numpy as np
from cubed.core.ops import (
blockwise,
general_blockwise,
map_direct,
merge_chunks,
reduction,
)
from cubed.pad import pad
from cubed.primitive.blockwise import key_to_slices
def wrapper_binop(out: np.ndarray, left, right, *, binop, block_id, axis, identity):
left_slicer = key_to_slices(block_id, left)
right_slicer = list(left_slicer)
# For the first block, we add the identity element
# For all other blocks `k`, we add the `k-1` element along `axis`
right_slicer[axis] = slice(block_id[axis]-1, block_id[axis])
right_slicer = tuple(right_slicer)
right_ = right[right_slicer] if block_id[axis] > 0 else identity
return binop(left[left_slicer], right_)
def scan(array, scan_func, reduce, binop, identity, axis):
# Blelloch (1990) out-of-core algorithm.
# 1. First, scan blockwise
scanned = blockwise(
scan_func,
"ij",
array,
"ij",
axis=axis,
)
# If there is only a single chunk, we can be done
if array.numblocks[-1] == 1:
return scanned
# 2. Calculate the blockwise reduction
# TODO: could also merge(1,2) by returning {"scan": np.cumsum(array), "reduce": np.sum(array)} in `scanned`
reduced = blockwise(
reduce,
"ij",
array,
"ij",
axis=axis,
adjust_chunks={"j": 1},
keepdims=True,
)
# 3. Now scan `reduced` to generate the increments for each block of `scanned`.
# Here we diverge from Blelloch, who assumes that `reduced` is small enough for an in-memory scan.
# We generalize that by recursively applying the scan to `reduced`.
# 3a. First we merge to a decent intermediate chunksize since reduced.chunksize[axis] == 1
new_chunksize = min(reduced.shape[axis], reduced.chunksize[axis] * 5)
new_chunks = reduced.chunksize[:-1] + (new_chunksize,)
merged = merge_chunks(reduced, new_chunks)
# 3b. Recursively scan this merged array to generate the increment for each block of `scanned`
increment = scan(
merged, scan_func, reduce=reduce, binop=binop, identity=identity, axis=axis
)
# 4. Back to Blelloch. Now that we have the increment, add it to the blocks of `scanned`.
# Use map_direct since the chunks of increment and scanned aren't aligned anymore.
#. Bada-bing, bada-boom.
assert increment.shape[axis] == scanned.numblocks[axis]
return map_direct(
partial(wrapper_binop, binop=binop, axis=axis, identity=identity),
scanned,
increment,
shape=scanned.shape,
dtype=scanned.dtype,
chunks=scanned.chunks,
extra_projected_mem=scanned.chunkmem * 2, # arbitrary
)
result = scan(
array, reduce=np.sum, scan_func=np.cumsum, binop=np.add, identity=0, axis=-1
)
print(result)
print(result.compute())
np.testing.assert_equal(result, np.cumsum(array.compute(), axis=-1))
Nice! What does the graph look like? (result.visualize(optimize_graph=True/False)
)
Amazing!
You certainly picked one of the most difficult functions to implement for your first contribution @dcherian. It's the first use of recursion to build a Cubed graph 😄
use of recursion to build a Cubed graph
I'm still not sure this is a good idea.
I re-read Blelloch:
the tree technique is then used to prescan the processor sums.
I don't understand if this kind of tree will work well with cubed. It is what dask does: https://github.com/dask/dask/blob/8dcf306e831ad91df32c0e45bd1670c93bc25b89/dask/array/reductions.py#L1364.
Nice! What does the graph look like?
Forcing at least two layers of recursion (I think):
If you're looking for something to do :), then scans would be a good thing to add.
Dask calls this "cumreduction" (terrible name!) : and its a quite useful primitive (xarray uses it for
ffill
,bfill
). It's also a fun algorithm to think about: https://developer.nvidia.com/gpugems/gpugems3/part-vi-gpu-computing/chapter-39-parallel-prefix-sum-scan-cuda see the blelloch, 1990 section)