
map_blocks returning pd.DataFrame fails with block_info parameter #11127

Open joshua-gould opened 5 months ago

joshua-gould commented 5 months ago
import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd

np.random.seed(42)
a = da.random.random(size=(100, 100), chunks=(10, 10))

def test1(x):
    return pd.DataFrame({"x": np.arange(4), "y": np.arange(4), "z": ["a", "b", "c", "d"]})

def test2(x, block_info=None):
    # identical to test1, except it accepts the block_info keyword
    return pd.DataFrame({"x": np.arange(4), "y": np.arange(4), "z": ["a", "b", "c", "d"]})

meta = dd.utils.make_meta([("x", np.int64), ("y", np.int64), ("z", object)])
df1 = da.map_blocks(test1, a, drop_axis=1, meta=meta).compute()  # works
df2 = da.map_blocks(test2, a, drop_axis=1, meta=meta).compute()  # fails with AttributeError: 'DataFrame' object has no attribute 'chunks'
quasiben commented 5 months ago

Thanks @joshua-gould. I can easily reproduce with what you have above. It seems that when block_info is provided, dask takes a code path that assumes an Array collection: https://github.com/dask/dask/blob/484fc3f1136827308db133cd256ba74df7a38d8c/dask/array/core.py#L901-L926

This is obviously a problem, as meta coerces the resulting collection into a DataFrame. I don't think we can just swap out partitions for chunks here.
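
For comparison, block_info behaves as documented when the mapped function returns an ndarray, so the output stays an Array collection. A rough, untested sketch (shift_by_origin is just an illustrative name; the "array-location" entry is the per-axis (start, stop) span described in the map_blocks docstring):

import dask.array as da

a = da.random.random((100, 100), chunks=(10, 10))

def shift_by_origin(x, block_info=None):
    # block_info[0] describes the input block; "array-location" gives the
    # (start, stop) span of this block along each axis of the full array
    (row_start, _), (col_start, _) = block_info[0]["array-location"]
    return x + row_start + col_start

# works: the function returns an ndarray, so the result is still a dask Array
shifted = da.map_blocks(shift_by_origin, a).compute()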

Would it be possible to convert the array to a dataframe and call map_partitions instead?

a.to_dask_dataframe().map_partitions(test2)
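
Something like the following, roughly (untested; test2_partition is a stand-in here, since map_partitions hands the function a pandas DataFrame partition rather than a NumPy block, and there is no block_info equivalent):

import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd

a = da.random.random(size=(100, 100), chunks=(10, 10))

def test2_partition(part):
    # part is one pandas DataFrame partition of the converted array
    return pd.DataFrame({"x": np.arange(4), "y": np.arange(4), "z": ["a", "b", "c", "d"]})

meta = dd.utils.make_meta([("x", np.int64), ("y", np.int64), ("z", object)])
df = a.to_dask_dataframe().map_partitions(test2_partition, meta=meta).compute()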

joshua-gould commented 5 months ago

I'm trying to extract the indices and values of a 2-D array wherever the value is greater than 0. I have a working solution below using dask.delayed:

import dask
import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask.array.core import slices_from_chunks

x = da.random.random((24, 24), chunks=(5, 6))
# numpy solution
_x = x.compute()
_indices = np.where(_x > 0)
df = pd.DataFrame({"value": _x[_indices], "y": _indices[0], "x": _indices[1]})

@dask.delayed
def process_chunk(a, offset):
    # offset is the (row, col) start of this chunk within the full array
    indices = np.where(a > 0)
    y = indices[0] + offset[0]
    x = indices[1] + offset[1]
    return pd.DataFrame({"value": a[indices], "y": y, "x": x})

output = []
for s in slices_from_chunks(x.chunks):
    r = process_chunk(x[s], (s[0].start, s[1].start))
    output.append(r)
meta = dd.utils.make_meta([("value", x.dtype), ("y", np.int64), ("x", np.int64)])
ddf = dd.from_delayed(output, meta=meta).compute()

# compare with numpy
df = df.sort_values(["y", "x", "value"]).reset_index(drop=True)
ddf = ddf.sort_values(["y", "x", "value"]).reset_index(drop=True)
pd.testing.assert_frame_equal(df, ddf)
quasiben commented 5 months ago

Hmm, could you instead do this with nonzero and a mask?

arr = np.array([[-1, 2, 0], [4, -5, 6], [0, 0, 7]])
arr = da.from_array(arr)
indices = da.nonzero(arr > 0)  # or rely on dispatching with np.nonzero
arr[arr > 0]
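
Roughly (untested), the two results line up element-for-element, although both come back with unknown chunk sizes until computed; dask.compute here just materializes them together:

import dask
import dask.array as da
import numpy as np

arr = da.from_array(np.array([[-1, 2, 0], [4, -5, 6], [0, 0, 7]]), chunks=2)
rows, cols = da.nonzero(arr > 0)  # lazy coordinate arrays; chunk sizes are unknown
values = arr[arr > 0]             # lazy flat array of the positive values; chunk sizes also unknown
rows, cols, values = dask.compute(rows, cols, values)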
joshua-gould commented 5 months ago

Thanks for your response. I'm not sure how to create a dask dataframe using this approach. I tried:

x = da.random.random((24, 24), chunks=(5, 6))
indices = da.where(x > 0)
vals = x.reshape(-1)[indices[0] * x.shape[1] + indices[1]]

ddf = dd.concat(
    [
        dd.from_array(vals, columns=["value"]),
        dd.from_array(da.stack(indices, axis=1, allow_unknown_chunksizes=True), columns=["y", "x"]),
    ],
    axis=1,
).compute()

But I get the warnings:

dask/array/slicing.py:1089: PerformanceWarning: Increasing number of chunks by factor of 20
  p = blockwise(
dask_expr/_concat.py:146: UserWarning: Concatenating dataframes with unknown divisions.
We're assuming that the indices of each dataframes are aligned. This assumption is not generally safe.