dask / dask-image

Distributed image processing
http://image.dask.org/en/latest/
BSD 3-Clause "New" or "Revised" License
210 stars 47 forks source link

unnable to wrap function with da.as_gufunc for cupy array #275

Open lrlunin opened 1 year ago

lrlunin commented 1 year ago

Describe the issue: I was happy to use dask-image with processing many images with CPU. Now I want to look whether this task can be rather accelerated with GPU. Previously I defined my custom function in this way:

@da.as_gufunc(signature=f"(i,j),(),()->(i,j)", output_dtypes=np.int16, vectorize=True)
def cpu_convolve_peak(img, th, dis):
    mask_hor_np = np.ones([1,2])
    clusters = np.fftconvolve(img, mask_hor_np, mode='same')
    # doing some further stuff with clusters as local peak search etc
    return clusters

and applied it to images loaded with dask_image.imread.imread("...", arraytype="numpy" ) and it was perfekt.

Now I loading my images with cp_images = dask_image.imread.imread("...", arraytype="cupy" ) and have a python function which doing some sort of complex things inside using GPU-only functions inside.

@da.as_gufunc(signature=f"(i,j),(),()->(i,j)", output_dtypes=cp.int16, vectorize=True)
def gpu_convolve_peak(img, th, dis):
    mask_hor_cp = cp.ones([1,2])
    clusters = cuscipy.fftconvolve(img, mask_hor_cp, mode='same')
    # doing some further stuff with clusters as local peak search etc
    return clusters

When now I try to execute the following code:

gpu_convolve_peak(cp_images, 100, 2).compute()

or either apply more functions at the end, for example:

gpu_convolve_peak(cp_images, 100, 2).sum(axis=0).compute()

I will get an error: TypeError: Implicit conversion to a NumPy array is not allowed. Please use `.get()` to construct a NumPy array explicitly.

It seems like dask inside tries to handle the array as numpy array despite it is cupy array. I would also ask whether my approach is correct in a sense of CUDA architecture. When I executing a cupy function it will work only in a "single core mode" on the GPU, right? I mean that GPU doesn't parallelize it itself and I need to run more instances to get this kind of "GPU multicore supremacy" with all these 2000+ CUDA cores on my GPU. I would be very thankful if you correct me and tell what the right way would be.

Thank you so much for your beautiful project and nice code!

Environment:

GenevieveBuckley commented 1 year ago

Can you provide the full error traceback (i.e. everything else that gets printed above that TypeError message)? That would help to figure out where things are going wrong.

Also, just to check, can you run your gpu function on a single cupy input array?

lrlunin commented 1 year ago

executing:

images_cp = dask_image.imread.imread("/home/lrlunin/50/*", arraytype="cupy")
gpu_convolved = gpu_convolve_peak(images_cp, 100, 1)
gpu_convolved_comp = gpu_convolved.compute()

results

TypeError                                 Traceback (most recent call last)
Cell In [15], line 1
----> 1 gpu_convolved_comp = gpu_convolved.compute()

File ~/miniconda3/envs/rapids-22.10/lib/python3.8/site-packages/dask/base.py:315, in DaskMethodsMixin.compute(self, **kwargs)
    291 def compute(self, **kwargs):
    292     """Compute this dask collection
    293 
    294     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    313     dask.base.compute
    314     """
--> 315     (result,) = compute(self, traverse=False, **kwargs)
    316     return result

File ~/miniconda3/envs/rapids-22.10/lib/python3.8/site-packages/dask/base.py:600, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    597     keys.append(x.__dask_keys__())
    598     postcomputes.append(x.__dask_postcompute__())
--> 600 results = schedule(dsk, keys, **kwargs)
    601 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/miniconda3/envs/rapids-22.10/lib/python3.8/site-packages/distributed/client.py:3057, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3055         should_rejoin = False
   3056 try:
-> 3057     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3058 finally:
   3059     for f in futures.values():

File ~/miniconda3/envs/rapids-22.10/lib/python3.8/site-packages/distributed/client.py:2226, in Client.gather(self, futures, errors, direct, asynchronous)
   2224 else:
   2225     local_worker = None
-> 2226 return self.sync(
   2227     self._gather,
   2228     futures,
   2229     errors=errors,
   2230     direct=direct,
   2231     local_worker=local_worker,
   2232     asynchronous=asynchronous,
   2233 )

File ~/miniconda3/envs/rapids-22.10/lib/python3.8/site-packages/distributed/utils.py:339, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    337     return future
    338 else:
--> 339     return sync(
    340         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    341     )

File ~/miniconda3/envs/rapids-22.10/lib/python3.8/site-packages/distributed/utils.py:406, in sync(loop, func, callback_timeout, *args, **kwargs)
    404 if error:
    405     typ, exc, tb = error
--> 406     raise exc.with_traceback(tb)
    407 else:
    408     return result

File ~/miniconda3/envs/rapids-22.10/lib/python3.8/site-packages/distributed/utils.py:379, in sync.<locals>.f()
    377         future = asyncio.wait_for(future, callback_timeout)
    378     future = asyncio.ensure_future(future)
--> 379     result = yield future
    380 except Exception:
    381     error = sys.exc_info()

File ~/miniconda3/envs/rapids-22.10/lib/python3.8/site-packages/tornado/gen.py:762, in Runner.run(self)
    759 exc_info = None
    761 try:
--> 762     value = future.result()
    763 except Exception:
    764     exc_info = sys.exc_info()

File ~/miniconda3/envs/rapids-22.10/lib/python3.8/site-packages/distributed/client.py:2089, in Client._gather(self, futures, errors, direct, local_worker)
   2087         exc = CancelledError(key)
   2088     else:
-> 2089         raise exception.with_traceback(traceback)
   2090     raise exc
   2091 if errors == "skip":

File ~/miniconda3/envs/rapids-22.10/lib/python3.8/site-packages/dask/optimization.py:990, in __call__()
    988 if not len(args) == len(self.inkeys):
    989     raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

File ~/miniconda3/envs/rapids-22.10/lib/python3.8/site-packages/dask/core.py:149, in get()
    147 for key in toposort(dsk):
    148     task = dsk[key]
--> 149     result = _execute_task(task, cache)
    150     cache[key] = result
    151 result = _execute_task(out, cache)

File ~/miniconda3/envs/rapids-22.10/lib/python3.8/site-packages/dask/core.py:119, in _execute_task()
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File ~/miniconda3/envs/rapids-22.10/lib/python3.8/site-packages/numpy/lib/function_base.py:2328, in __call__()
   2325     vargs = [args[_i] for _i in inds]
   2326     vargs.extend([kwargs[_n] for _n in names])
-> 2328 return self._vectorize_call(func=func, args=vargs)

File ~/miniconda3/envs/rapids-22.10/lib/python3.8/site-packages/numpy/lib/function_base.py:2402, in _vectorize_call()
   2400 """Vectorized call to `func` over positional `args`."""
   2401 if self.signature is not None:
-> 2402     res = self._vectorize_call_with_signature(func, args)
   2403 elif not args:
   2404     res = func()

File ~/miniconda3/envs/rapids-22.10/lib/python3.8/site-packages/numpy/lib/function_base.py:2428, in _vectorize_call_with_signature()
   2424 if len(args) != len(input_core_dims):
   2425     raise TypeError('wrong number of positional arguments: '
   2426                     'expected %r, got %r'
   2427                     % (len(input_core_dims), len(args)))
-> 2428 args = tuple(asanyarray(arg) for arg in args)
   2430 broadcast_shape, dim_sizes = _parse_input_dimensions(
   2431     args, input_core_dims)
   2432 input_shapes = _calculate_shapes(broadcast_shape, dim_sizes,
   2433                                  input_core_dims)

File ~/miniconda3/envs/rapids-22.10/lib/python3.8/site-packages/numpy/lib/function_base.py:2428, in <genexpr>()
   2424 if len(args) != len(input_core_dims):
   2425     raise TypeError('wrong number of positional arguments: '
   2426                     'expected %r, got %r'
   2427                     % (len(input_core_dims), len(args)))
-> 2428 args = tuple(asanyarray(arg) for arg in args)
   2430 broadcast_shape, dim_sizes = _parse_input_dimensions(
   2431     args, input_core_dims)
   2432 input_shapes = _calculate_shapes(broadcast_shape, dim_sizes,
   2433                                  input_core_dims)

File cupy/_core/core.pyx:1473, in cupy._core.core._ndarray_base.__array__()

TypeError: Implicit conversion to a NumPy array is not allowed. Please use `.get()` to construct a NumPy array explicitly.

Execution the same function without dask annotation

def gpu_convolve_peak_no_dask(img, th, dis):
    mask_hor_cp = cp.ones([1,2])
    clusters_scipy_hor = cuscipy.fftconvolve(img, mask_hor_cp, mode='same')
    # doing some further stuff
    return clusters_scipy_hor 

on a single image cupy array works without problem.

But I noticed interesting thing. Function wrapped with @da.as_gufunc applied on a single cupy array:

single_img = imread("/home/lrlunin/50/220421_run_d0_f0_50_f00001.tiff")
ped = imread('/home/lrlunin/20220421_data_evaluation/ped_48.tiff')
single_img_min_ped = single_img - ped
single_cp_img = cp.array(single_img_min_ped)
res = gpu_convolve_peak(single_cp_img, 100, 1)
print(res)

gives the following output: dask.array<transpose, shape=(400, 400), dtype=int16, chunksize=(400, 400), chunktype=cupy.ndarray> Also cupy.ndarray as expected.

GenevieveBuckley commented 1 year ago

Did a bit of digging here, and I think it's possible the two np.vectorize() calls in dask/array/gufunc.py (here and here) could be causing problems.

I tried a quick test to check this theory, replacing np.vectorize with cupy.vectorize at those places... and got a message that NotImplementedError: cupy.vectorize does not supportsignatureoption currently. So this might not be a quick fix, I'm afraid. This is with cupy-cuda11x version 11.0.0 from PyPI.

GenevieveBuckley commented 1 year ago

I think we should transfer this issue to the main dask repository, for two reasons:

  1. This issue relates mostly to the code in dask.array.gufunc.py, which lives in the main dask repository, and
  2. There are more people over there with an interest in combining cupy + dask, and it would be good to get more eyes on this issue (cc @quasiben and @jakirkham from the NVIDIA team)

@jakirkham can you transfer this issue across to dask/dask? (Apparently you have to have write access to transfer issues, and I have "triage" level access instead)

GenevieveBuckley commented 1 year ago

Issue summary

Dask generalized ufuncs do not work correctly when given Dask arrays containing cupy chunks.

Minimal reproducible example

You can run this quick test on Google Colab, if you don't have a GPU on your local machine.

  1. From the Google Colab "Runtime" menu, click "Change Runtime Type" and check the runtime type is set to "GPU".
  2. Install cupy into the notebook environment. Copy-paste !pip install cupy-cuda11x into a notebook cell and execute it.

I used this docstring example to make a minimal, reproducible test.

It works as expected with a numpy backed dask array:

import dask.array as da
import numpy as np

def outer_product(x, y):
    return np.einsum("i,j->ij", x, y)

a = da.random.normal(size=(20,30), chunks=(10, 30))
b = da.random.normal(size=(10, 1,40), chunks=(5, 1, 40))
c = da.apply_gufunc(outer_product, "(i),(j)->(i,j)", a, b, vectorize=True)
c.compute().shape
# Expected output: (10, 20, 30, 40)
# Works as expected

Fails with a cupy backed dask array:

import dask.array as da
import cupy as cp

def outer_product(x, y):
    return cp.einsum("i,j->ij", x, y)

data_a = cp.random.normal(size=(20,30))
a = da.from_array(data_a, chunks=(10, 30))
data_b = cp.random.normal(size=(10, 1,40))
b = da.from_array(data_b, chunks=(5, 1, 40))
c = da.apply_gufunc(outer_product, "(i),(j)->(i,j)", a, b, vectorize=True)
c.compute().shape
# Expected output: (10, 20, 30, 40)
# TypeError: Implicit conversion to a NumPy array is not allowed. Please use `.get()` to construct a NumPy array explicitly.

Notably, this other docstring example does work with a dask array containing cupy chunks. This example does not use the vectorize keyword argument, which is probably why we don't see the problem here.

Hypothesis

There are several lines in dask/array/gufunc.py that have numpy specific functions (np.vectorize and np.newaxis, see here, here, and here). These lines might be causing this problem for dask arrays containing cupy chunks.

What we've tried so far

To test this idea, I tried switching out the np.vectorize functions for the cupy.vectorize function.

Unfortunately, cupy gives me this error:

NotImplementedError: cupy.vectorize does not support `signature` option currently.

See here in the cupy source code.

What to do next?

We could make a feature request issue in the cupy repository, for supporting the signature keyword argument in cupy.vectorize. I don't know where that request would fit in to their other priorities.

If that was implemented, Dask could then consider replacing the three numpy-specific lines in dask/array/gufunc.py with some sort of dispatching solution, so that np.vectorize is used by numpy backed dask arrays, and cupy.vectorize is used by cupy backed dask arrays.

In the meantime, @lrlunin might investigate if there's a way to avoid using da.as_gufunc and get the same result. I know that's not very a satisfying answer @lrlunin, but this might not be a quick fix overall. We do appreciate you bringing this to our attention, though!

GenevieveBuckley commented 1 year ago

In the meantime, @lrlunin might investigate if there's a way to avoid using da.as_gufunc and get the same result. I know that's not very a satisfying answer @lrlunin, but this might not be a quick fix overall. We do appreciate you bringing this to our attention, though!

If I understand your problem correctly, one possible workaround might be to use da.map_blocks (possibly in combination with the drop_axis or new_axis keyword arguments). It looks like you have one dask chunk per image frame, and want to find the peaks/clusters for each of these separately before combining the results. If you were using gufuncs mostly to manage the expected input/output dimensions, then the drop_axis/new_axis kwargs could help. I wrote a bit about how to handle awkward output shapes in this blogpost, which might also be helpful information.

GenevieveBuckley commented 1 year ago

Created issue at https://github.com/dask/dask/issues/9714

(I know this is only 4 days since my last comment and two of them were the weekend. But I figured I might forget to check back in on this, so it might be best if I did it now while I remember)