xarray-contrib / flox

Fast & furious GroupBy operations for dask.array
https://flox.readthedocs.io
Apache License 2.0
124 stars 17 forks source link

Using Flox with cubed #224

Open TomNicholas opened 1 year ago

TomNicholas commented 1 year ago

What would need to happen to use flox with cubed instead of dask?

(I see this question as part of fully solving https://github.com/pydata/xarray/issues/6807.)

In the code I see blockwise being used, but also dask.array.reductions._tree_reduce and custom HighLevelGraph objects being created. Is there any combination of arguments to flox that only uses blockwise? Could more of flox be made to work via the blockwise abstraction? Should we add ._tree_reduce to our list of useful computation patterns for chunked arrays that includes blockwise, map_blocks, and apply_gufunc?

Sorry if we already discussed this elsewhere.

EDIT: cc @tomwhite

dcherian commented 1 year ago

Should we add ._tree_reduce to our list of useful computation patterns for chunked arrays that includes blockwise, map_blocks, and apply_gufunc

It's implemented in both cubed and dask.array as reduction (https://tom-e-white.com/cubed/operations.html#reduction-and-arg-reduction, https://docs.dask.org/en/stable/generated/dask.array.reduction.html).

We should be able to "just" change _tree_reduce(...) to reduction(chunk=identity, ...) where

def identity(x):
    return x

With this, method="blockwise" and method="map-reduce" will work. cohorts will take some thinking. Right now it's built around a .blocks accessor so perhaps cubed could add that.

TomNicholas commented 1 year ago

So I tried to hack flox into accepting cubed.Array objects in #249, to see what exactly is missing to make it work. I'm running the example in https://github.com/tomwhite/cubed/issues/223. (Also requires https://github.com/pydata/xarray/pull/7941)

For method='cohorts'

I got to the point of cubed needing to implement .blocks as expected.

File ~/Documents/Work/Code/flox/flox/core.py:1454, in dask_groupby_agg(array, by, agg, expected_groups, axis, fill_value, method, reindex, engine, sort, chunks_cohorts)
   1452 for blks, cohort in chunks_cohorts.items():
   1453     index = pd.Index(cohort)
-> 1454     subset = subset_to_blocks(intermediate, blks, array.blocks.shape[-len(axis) :])
   1455     reindexed = dask.array.map_blocks(
   1456         reindex_intermediates, subset, agg=agg, unique_groups=index, meta=subset._meta
   1457     )
   1458     # now that we have reindexed, we can set reindex=True explicitlly

AttributeError: 'Array' object has no attribute 'blocks'

For method='map-reduce'

it seems cubed might need to implement concatenate as an argument to blockwise,as concatenate=False is always passed by flox:

File ~/Documents/Work/Code/flox/flox/core.py:1432, in dask_groupby_agg(array, by, agg, expected_groups, axis, fill_value, method, reindex, engine, sort, chunks_cohorts)
   1426 # Each chunk of `reduced`` is really a dict mapping
   1427 # 1. reduction name to array
   1428 # 2. "groups" to an array of group labels
   1429 # Note: it does not make sense to interpret axis relative to
   1430 # shape of intermediate results after the blockwise call
   1431 if method == "map-reduce":
-> 1432     reduced = tree_reduce(
   1433         intermediate,
   1434         combine=partial(combine, agg=agg),
   1435         aggregate=partial(aggregate, expected_groups=expected_groups, reindex=reindex),
   1436     )
   1437     if is_duck_dask_array(by_input) and expected_groups is None:
   1438         groups = _extract_unknown_groups(reduced, dtype=by.dtype)

TypeError: reduction() got an unexpected keyword argument 'concatenate'

For method='blockwise'

an error from what looks like a potential bug inside cubed.rechunk (from the call to rechunk_for_blockwise inside flox.core.groupby_reduce:

File ~/Documents/Work/Code/cubed/cubed/primitive/rechunk.py:124, in _setup_array_rechunk(source_array, target_chunks, max_mem, target_store_or_group, target_options, temp_store_or_group, temp_options, name)
    121     target_chunks = source_chunks
    123 if isinstance(target_chunks, dict):
--> 124     array_dims = _get_dims_from_zarr_array(source_array)
    125     try:
    126         target_chunks = _shape_dict_to_tuple(array_dims, target_chunks)

File ~/Documents/Work/Code/cubed/cubed/vendor/rechunker/api.py:21, in _get_dims_from_zarr_array(z_array)
     18 def _get_dims_from_zarr_array(z_array):
     19     # use Xarray convention
     20     # http://xarray.pydata.org/en/stable/internals.html#zarr-encoding-specification
---> 21     return z_array.attrs["_ARRAY_DIMENSIONS"]

AttributeError: 'LazyZarrArray' object has no attribute 'attrs
dcherian commented 1 year ago

t seems cubed might need to implement concatenate as an argument to blockwise,as concatenate=False is always passed by flox

Could delete it. The default in dask is None, so that line might not be doing anything.

TomNicholas commented 1 year ago

The default in dask is None

Is it? Looks like it's True to me?

Commenting out the concatenate kwarg in every place it occurs allows me to build a nice-looking graph for groupby using map-reduce:

image

This is for 120 chunks, using

mean = ds.groupby("time.dayofyear").mean(skipna=False, method='map-reduce')  # skipna=False to avoid eager load, see xarray issue #7243

Notice also that because this is the map-reduce strategy the output object has only one chunk, meaning the Spec has to be large.

This doesn't actually execute though (unsuprisingly), .compute fails with

--------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File ~/miniconda3/envs/cubed_xarray/lib/python3.9/site-packages/tenacity/__init__.py:382, in Retrying.__call__(self, fn, *args, **kwargs)
    381 try:
--> 382     result = fn(*args, **kwargs)
    383 except BaseException:  # noqa: B902

File ~/Documents/Work/Code/cubed/cubed/runtime/executors/python.py:10, in exec_stage_func(func, *args, **kwargs)
      8 @retry(stop=stop_after_attempt(3))
      9 def exec_stage_func(func, *args, **kwargs):
---> 10     return func(*args, **kwargs)

File ~/Documents/Work/Code/cubed/cubed/primitive/blockwise.py:67, in apply_blockwise(out_key, config)
     65     args.append(arg)
---> 67 result = config.function(*args)
     68 if isinstance(result, dict):  # structured array with named fields

File ~/Documents/Work/Code/cubed/cubed/primitive/blockwise.py:259, in fuse.<locals>.fused_func(*args)
    258 def fused_func(*args):
--> 259     return pipeline2.config.function(pipeline1.config.function(*args))

TypeError: <lambda>() got an unexpected keyword argument 'axis'

Because of the way cubed retries things the traceback doesn't give me any more useful context than this though. Is there a way to resurface the useful part of the traceback @tomwhite?

tomwhite commented 1 year ago

Because of the way cubed retries things the traceback doesn't give me any more useful context than this though. Is there a way to resurface the useful part of the traceback @tomwhite?

This might help: https://tenacity.readthedocs.io/en/latest/index.html#error-handling. If so we should add reraise=True everywhere in Cubed.

TomNicholas commented 1 year ago

Thanks for the rapid reply!

I tried adding that into cubed's python executor but the traceback still has me none the wiser as to where axis is being passed in erroneously.

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[17], line 1
----> 1 result['asn'].compute()

File ~/Documents/Work/Code/xarray/xarray/core/dataarray.py:1102, in DataArray.compute(self, **kwargs)
   1083 """Manually trigger loading of this array's data from disk or a
   1084 remote source into memory and return a new array. The original is
   1085 left unaltered.
   (...)
   1099 dask.compute
   1100 """
   1101 new = self.copy(deep=False)
-> 1102 return new.load(**kwargs)

File ~/Documents/Work/Code/xarray/xarray/core/dataarray.py:1076, in DataArray.load(self, **kwargs)
   1058 def load(self: T_DataArray, **kwargs) -> T_DataArray:
   1059     """Manually trigger loading of this array's data from disk or a
   1060     remote source into memory and return this array.
   1061 
   (...)
   1074     dask.compute
   1075     """
-> 1076     ds = self._to_temp_dataset().load(**kwargs)
   1077     new = self._from_temp_dataset(ds)
   1078     self._variable = new._variable

File ~/Documents/Work/Code/xarray/xarray/core/dataset.py:792, in Dataset.load(self, **kwargs)
    789 chunkmanager = get_chunked_array_type(*lazy_data.values())
    791 # evaluate all the chunked arrays simultaneously
--> 792 evaluated_data = chunkmanager.compute(*lazy_data.values(), **kwargs)
    794 for k, data in zip(lazy_data, evaluated_data):
    795     self.variables[k].data = data

File ~/Documents/Work/Code/cubed-xarray/cubed_xarray/cubedmanager.py:69, in CubedManager.compute(self, *data, **kwargs)
     66 def compute(self, *data: "CubedArray", **kwargs) -> tuple[np.ndarray, ...]:
     67     from cubed import compute
---> 69     return compute(*data, **kwargs)

File ~/Documents/Work/Code/cubed/cubed/core/array.py:410, in compute(executor, callbacks, optimize_graph, resume, *arrays, **kwargs)
    407         executor = PythonDagExecutor()
    409 _return_in_memory_array = kwargs.pop("_return_in_memory_array", True)
--> 410 plan.execute(
    411     executor=executor,
    412     callbacks=callbacks,
    413     optimize_graph=optimize_graph,
    414     resume=resume,
    415     array_names=[a.name for a in arrays],
    416     **kwargs,
    417 )
    419 if _return_in_memory_array:
    420     return tuple(a._read_stored() for a in arrays)

File ~/Documents/Work/Code/cubed/cubed/core/plan.py:202, in Plan.execute(self, executor, callbacks, optimize_graph, resume, array_names, **kwargs)
    197 if callbacks is not None:
    198     [
    199         callback.on_compute_start(dag, resume=resume)
    200         for callback in callbacks
    201     ]
--> 202 executor.execute_dag(
    203     dag,
    204     callbacks=callbacks,
    205     array_names=array_names,
    206     resume=resume,
    207     **kwargs,
    208 )
    209 if callbacks is not None:
    210     [callback.on_compute_end(dag) for callback in callbacks]

File ~/Documents/Work/Code/cubed/cubed/runtime/executors/python.py:22, in PythonDagExecutor.execute_dag(self, dag, callbacks, array_names, resume, **kwargs)
     20 if stage.mappable is not None:
     21     for m in stage.mappable:
---> 22         exec_stage_func(stage.function, m, config=pipeline.config)
     23         if callbacks is not None:
     24             event = TaskEndEvent(array_name=name)

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/site-packages/tenacity/__init__.py:289, in BaseRetrying.wraps.<locals>.wrapped_f(*args, **kw)
    287 @functools.wraps(f)
    288 def wrapped_f(*args: t.Any, **kw: t.Any) -> t.Any:
--> 289     return self(f, *args, **kw)

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/site-packages/tenacity/__init__.py:379, in Retrying.__call__(self, fn, *args, **kwargs)
    377 retry_state = RetryCallState(retry_object=self, fn=fn, args=args, kwargs=kwargs)
    378 while True:
--> 379     do = self.iter(retry_state=retry_state)
    380     if isinstance(do, DoAttempt):
    381         try:

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/site-packages/tenacity/__init__.py:325, in BaseRetrying.iter(self, retry_state)
    323     retry_exc = self.retry_error_cls(fut)
    324     if self.reraise:
--> 325         raise retry_exc.reraise()
    326     raise retry_exc from fut.exception()
    328 if self.wait:

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/site-packages/tenacity/__init__.py:158, in RetryError.reraise(self)
    156 def reraise(self) -> t.NoReturn:
    157     if self.last_attempt.failed:
--> 158         raise self.last_attempt.result()
    159     raise self

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/concurrent/futures/_base.py:439, in Future.result(self, timeout)
    437     raise CancelledError()
    438 elif self._state == FINISHED:
--> 439     return self.__get_result()
    441 self._condition.wait(timeout)
    443 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/concurrent/futures/_base.py:391, in Future.__get_result(self)
    389 if self._exception:
    390     try:
--> 391         raise self._exception
    392     finally:
    393         # Break a reference cycle with the exception in self._exception
    394         self = None

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/site-packages/tenacity/__init__.py:382, in Retrying.__call__(self, fn, *args, **kwargs)
    380 if isinstance(do, DoAttempt):
    381     try:
--> 382         result = fn(*args, **kwargs)
    383     except BaseException:  # noqa: B902
    384         retry_state.set_exception(sys.exc_info())  # type: ignore[arg-type]

File ~/Documents/Work/Code/cubed/cubed/runtime/executors/python.py:10, in exec_stage_func(func, *args, **kwargs)
      8 @retry(reraise=True, stop=stop_after_attempt(3))
      9 def exec_stage_func(func, *args, **kwargs):
---> 10     return func(*args, **kwargs)

File ~/Documents/Work/Code/cubed/cubed/primitive/blockwise.py:67, in apply_blockwise(out_key, config)
     64     arg = arr[chunk_key]
     65     args.append(arg)
---> 67 result = config.function(*args)
     68 if isinstance(result, dict):  # structured array with named fields
     69     for k, v in result.items():

File ~/Documents/Work/Code/cubed/cubed/primitive/blockwise.py:259, in fuse.<locals>.fused_func(*args)
    258 def fused_func(*args):
--> 259     return pipeline2.config.function(pipeline1.config.function(*args))

TypeError: <lambda>() got an unexpected keyword argument 'axis'
dcherian commented 1 year ago

Your chunk lambda identity function probably needs to accept axis and keepdims

tomwhite commented 1 year ago

The default in dask is None

Is it? Looks like it's True to me?

It's confusing because the default for concatenate in blockwise is None, but in reduction it's True.

Perhaps we could add a concatenate parameter to Cubed's reduction that only accepts None or False, and raises an exception for True?

tomwhite commented 1 year ago

For method='blockwise'

an error from what looks like a potential bug inside cubed.rechunk (from the call to rechunk_for_blockwise inside flox.core.groupby_reduce:

File ~/Documents/Work/Code/cubed/cubed/primitive/rechunk.py:124, in _setup_array_rechunk(source_array, target_chunks, max_mem, target_store_or_group, target_options, temp_store_or_group, temp_options, name)
    121     target_chunks = source_chunks
    123 if isinstance(target_chunks, dict):
--> 124     array_dims = _get_dims_from_zarr_array(source_array)
    125     try:
    126         target_chunks = _shape_dict_to_tuple(array_dims, target_chunks)

File ~/Documents/Work/Code/cubed/cubed/vendor/rechunker/api.py:21, in _get_dims_from_zarr_array(z_array)
     18 def _get_dims_from_zarr_array(z_array):
     19     # use Xarray convention
     20     # http://xarray.pydata.org/en/stable/internals.html#zarr-encoding-specification
---> 21     return z_array.attrs["_ARRAY_DIMENSIONS"]

AttributeError: 'LazyZarrArray' object has no attribute 'attrs

I've opened https://github.com/tomwhite/cubed/pull/226 to fix this.

tomwhite commented 1 year ago

For method='cohorts'

I got to the point of cubed needing to implement .blocks as expected.

From what I can tell, Flox only uses the shape of blocks, not the values themselves. So would it be possible to implement this using array.chunks?

dcherian commented 1 year ago

Flox only uses the shape of blocks, not the values themselves. So would it be possible to implement this using array.chunks

Maybe but I don't know that its worth it. cohorts currently uses a lot of internal dask implementation details and doesn't really work that well despite all the effort.

Perhaps with cubed we should just always have method="map-reduce" since memory use is less of a concern.

TomNicholas commented 1 year ago

Perhaps with cubed we should just always have method="map-reduce" since memory use is less of a concern.

Right now it would just be nice to make a groupby work on at least one case to test potential performance.

Your chunk lambda identity function probably needs to accept axis and keepdims

^ Thanks - changing this leads to some kind of cubed error:

File ~/Documents/Work/Code/cubed/cubed/primitive/blockwise.py:70, in apply_blockwise(out_key, config)
     69     for k, v in result.items():
---> 70         config.write.open().set_basic_selection(out_chunk_key, v, fields=k)
     71 else:

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/site-packages/zarr/core.py:1486, in Array.set_basic_selection(self, selection, value, fields)
   1485 else:
-> 1486     return self._set_basic_selection_nd(selection, value, fields=fields)

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/site-packages/zarr/core.py:1790, in Array._set_basic_selection_nd(self, selection, value, fields)
   1788 indexer = BasicIndexer(selection, self)
-> 1790 self._set_selection(indexer, value, fields=fields)

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/site-packages/zarr/core.py:1802, in Array._set_selection(self, indexer, value, fields)
   1792 def _set_selection(self, indexer, value, fields=None):
   1793 
   1794     # We iterate over all chunks which overlap the selection and thus contain data
   (...)
   1800 
   1801     # check fields are sensible
-> 1802     check_fields(fields, self._dtype)
   1803     fields = check_no_multi_fields(fields)

File ~/miniconda3/envs/cubed_xarray/lib/python3.9/site-packages/zarr/indexing.py:854, in check_fields(fields, dtype)
    853 if dtype.names is None:
--> 854     raise IndexError("invalid 'fields' argument, array does not have any fields")
    855 try:

IndexError: invalid 'fields' argument, array does not have any fields
tomwhite commented 1 year ago

changing this leads to some kind of cubed error

It's quite hard to debug without the code, but it's worth trying to turn off graph optimization (fuse) by passing optimize_graph=False to compute() to check that the problem doesn't lie there.

I can also take a closer look if there's some code you are able to share.

TomNicholas commented 1 year ago

I can also take a closer look if there's some code you are able to share.

https://gist.github.com/TomNicholas/c50b89eeb3ec9e8e49368f811689fd65

tomwhite commented 1 year ago

Thanks for the notebook @TomNicholas. I tried running it, and managed to reproduce the problem.

I think what's happening is that Dask doesn't care about the dtype or shape of intermediate values in the reduction (see e.g. this comment), whereas Cubed does, since it is materializing the intermediates as Zarr arrays, so the dtypes and shapes must be correct.

Another problem is that Dask's _tree_reduce and Cubed's reduction have slightly different semantics, which may also be causing problems.

I'm going to have a look at seeing what it would take to get a Flox unit test running with Cubed.

dcherian commented 1 year ago

think what's happening is that Dask doesn't care about the dtype or shape of intermediate values in the reduction (see e.g. this comment), whereas Cubed does, since it is materializing the intermediates as Zarr arrays, so the dtypes and shapes must be correct.

We would need to migrate to using structured arrays as the intermediates (which dask now supports IIRC). Or totally simplify by making the intermediates simple arrays. These would be very major changes, I would just move on with the blogpost.

tomwhite commented 1 year ago

I just reached the same conclusion after looking at it for a while!

dcherian commented 1 year ago

Sorry! I did have the same realization many months ago but didn't write it down and totally forgot about it !

tomwhite commented 1 year ago

No problem! It was quite instructive looking through the code.

A couple of things I noticed:

dcherian commented 10 months ago

I've been thinking about this some. I think blockwise and map-reduce can be made to work out of the box, if we adjust the intermediates to be structured arrays.

For "cohorts", I think we'll want to think about what makes sense for cubed. The current implementation really adapts to dask's preferences at the moment by (practically speaking) choosing to shuffle chunks, instead of subsets of a chunk. Perhaps for cubed a more explicit shuffle makes more sense: calculate the blockwise intermediates, but write them to multiple intermediate zarrs (one per cohort), and then tree-reduce those independently.

In any case, the fundamental idea is that some group labels tend to occur together (particularly for quasi-periodic time groupings groupby("time.hour"), groupby("time.dayofyear")), and we should be able to use that to reduce the number of reduction steps and communication overhead.

PS: find_group_cohorts is now substantially improved, and does not use the .blocks property any more.

tomwhite commented 10 months ago

I think blockwise and map-reduce can be made to work out of the box, if we adjust the intermediates to be structured arrays.

Great! Does Flox need to use the Array API for this to work, or is that not needed?

Perhaps for cubed a more explicit shuffle makes more sense: calculate the blockwise intermediates, but write them to multiple intermediate zarrs (one per cohort), and then tree-reduce those independently.

BTW I'm planning on adding tree_reduce to Cubed as a part of the optimization work (https://github.com/tomwhite/cubed/issues/339), which hopefully will make this easier.

dcherian commented 10 months ago

Does Flox need to use the Array API for this to work, or is that not needed?

Not needed AFAICT. I'm also OK to if ... else ... here.

adding tree_reduce to Cubed

Aside from algorithmic changes, isn't this just reduction with chunk=lambda x: x?

PS: It'd be nice if both dask and cubed allowed chunk=None to skip the blockwise step, since that could be arbitrarily complicated

tomwhite commented 10 months ago

Aside from algorithmic changes, isn't this just reduction with chunk=lambda x: x?

Yes

dcherian commented 6 months ago

FYI my original proposal for what is now "cohorts" was quite different. The idea came to me as "shuffling chunks" so that members of a group would be in nearby chunks, so that map-reduce would be very effective at reducing data early on.

https://gist.github.com/dcherian/6ccd76d2a6eaadb7844d61d197a8b3db

If cubed can do the 'block shuffling' without actually rewriting data, it might be a good idea still. The downside is that you may end up with massive chunks at the end of the reduction. For e.g. consider doing ds.groupby("time.dayofyear") when ds has chunksize=1 along time. You'll end up with output chunksizes being 365x bigger :D , this is why the current idea is more of "index out blocks with the members from the same groups", reduce, then concatenate.

Just though I'd bring it up since you are reimplementing these ideas.

EDIT: flox today can quickly detect when the chunking is "perfect" for this kind of shuffling. https://github.com/xarray-contrib/flox/blob/efd88e1b117db5f18ee1354d2edadafc9005b1b3/flox/core.py#L377