coiled / dask-community

Issue tracker for the Dask community team
MIT License

[Stack Overflow] Core dimension error when running numba ufunc on dask array #721

Open github-actions[bot] opened 2 years ago

github-actions[bot] commented 2 years ago

I'm trying to run custom numba vectorized/ufunc functions in a lazy dask pipeline.

When I run the code below I get a ValueError: Core dimension 'm' consists of multiple chunks. I don't understand why m is considered a core dimension. Any idea how I c…


Would you like to know more?

Read the full question on Stack Overflow:

https://stackoverflow.com/questions/71526597/core-dimension-error-when-running-numba-ufunc-on-dask-array
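A likely cause (my reading of the error, not stated explicitly in the thread): in a gufunc signature such as `(k,m,n)->(m,n)`, every labelled dimension is a core dimension, and dask requires each core dimension to live in a single chunk unless `allow_rechunk=True` is passed. A minimal sketch of rechunking the core dimensions by hand:

```python
import dask.array as da

# With signature "(k,m,n)->(m,n)", k, m, and n are all core dimensions,
# so each must fit in one chunk. The original array is chunked along m
# and n, which triggers the "consists of multiple chunks" ValueError.
arr = da.random.random((10, 200, 200), chunks=(10, 50, 50))

# -1 means "collapse this axis into a single chunk"
arr_single = arr.rechunk({1: -1, 2: -1})
print(arr_single.chunks)  # ((10,), (200,), (200,))
```

Passing `allow_rechunk=True` to `da.as_gufunc` / `da.apply_gufunc` (as in the answers below) asks dask to do this rechunking automatically, at the cost of potentially large chunks.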

pavithraes commented 2 years ago
import numpy as np
import dask.array as da
# import numba
# from numba import float64

# Define ufunc that directly takes a 3D array and mean reduce along axis 0
# @numba.guvectorize([(float64[:,:,:], float64[:,:])], '(k,m,n)->(m,n)')
@da.as_gufunc(signature="(k,m,n)->(m,n)", output_dtypes=float, vectorize=True, allow_rechunk=True)
def reduce_mean(x):
    """Mean reduce a 3D array along the first dimension (axis 0)"""
    nrows = x.shape[0]
    out = np.empty((x.shape[1], x.shape[2]))
    for idx in range(x.shape[1]):
        for idy in range(x.shape[2]):
            col_sum = np.sum(x[:,idx,idy])
            out[idx,idy] = np.divide(col_sum, nrows)
    return out

# Apply ufunc on dask array
arr = da.random.random((10,200,200), chunks=(10,50,50)).astype(np.float64)
arr_reduced = reduce_mean(arr)
arr_reduced.compute()

Ref: https://blog.dask.org/2019/04/09/numba-stencil
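As a sanity check (not part of the original comment): for the mean specifically, dask's built-in reduction produces the same values as the custom gufunc, with no rechunking concerns:

```python
import numpy as np
import dask.array as da

# Same array as in the snippet above
arr = da.random.random((10, 200, 200), chunks=(10, 50, 50)).astype(np.float64)

# Built-in reduction along axis 0; works chunk-by-chunk, no gufunc needed
reduced = arr.mean(axis=0)
result = reduced.compute()

# The graph is deterministic, so recomputing arr gives the same values
np.testing.assert_allclose(result, arr.compute().mean(axis=0))
```

This is worth reaching for first when the reduction already exists in `dask.array`; the gufunc route matters when the per-element kernel is genuinely custom.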

pavithraes commented 2 years ago

map_blocks:

import numpy as np
import dask.array as da
import numba
from numba import float64

# Define ufunc that directly takes a 3D array and mean reduce along axis 0
@numba.guvectorize([(float64[:,:,:], float64[:,:])], '(k,m,n)->(m,n)')
def reduce_mean(x, out):
    """Mean reduce a 3D array along the first dimension (axis 0)"""
    nrows = x.shape[0]
    for idx in range(x.shape[1]):
        for idy in range(x.shape[2]):
            col_sum = np.sum(x[:,idx,idy])
            out[idx,idy] = np.divide(col_sum, nrows)

# Apply ufunc on dask array
arr = da.random.random((10,200,200), chunks=(10,50,50)).astype(np.float64)
arr_reduced = arr.map_blocks(reduce_mean, chunks=(arr.chunks[1][0], arr.chunks[2][0]), drop_axis=0, dtype=np.float64)
arr_reduced.compute()
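The same `map_blocks` pattern can be checked with a plain NumPy reduction in place of the numba kernel (a sketch that leaves numba out, so only the chunk bookkeeping is exercised):

```python
import numpy as np
import dask.array as da

def block_mean(block):
    # Each (10, 50, 50) block collapses to a (50, 50) block
    return block.mean(axis=0)

arr = da.random.random((10, 200, 200), chunks=(10, 50, 50))

# chunks describes the per-block output shape; drop_axis=0 removes the
# reduced axis from the output's dimensionality
out = arr.map_blocks(block_mean, chunks=(50, 50), drop_axis=0, dtype=arr.dtype)
res = out.compute()
assert res.shape == (200, 200)
```

Note this only works because the reduced axis (axis 0) is not split across chunks here; if it were, a `map_blocks` per-block mean would be wrong and a tree reduction (or `da.mean`) would be needed instead.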
scharlottej13 commented 2 years ago

solutions (snippets above):

docs improvements: