pydata / xarray

N-D labeled arrays and datasets in Python
https://xarray.dev
Apache License 2.0

Implement dask.sizeof for xarray.core.indexing.ImplicitToExplicitIndexingAdapter #5426

Open · mrocklin opened this issue 3 years ago

mrocklin commented 3 years ago

I'm looking at a Pangeo Gallery workflow that suffers from poor load balancing because objects of type xarray.core.indexing.ImplicitToExplicitIndexingAdapter are being interpreted as 48 B when, I suspect, they are actually significantly more expensive to move around.

I'm seeing "number of processing tasks" charts that look like the following, which is a common sign that the load balancer is not making good decisions, most often because of poor data size measurements:

[Screenshot: "number of processing tasks" chart, 2021-06-01 8:54 PM]
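For reference, the kind of dask.sizeof registration the issue title asks for could look roughly like the following. This is only a sketch, not a tested implementation: it assumes the pickled size is an acceptable proxy for transfer cost, and the helper name is made up.

```python
import cloudpickle
from dask.sizeof import sizeof
from xarray.core.indexing import ImplicitToExplicitIndexingAdapter


@sizeof.register(ImplicitToExplicitIndexingAdapter)
def _sizeof_indexing_adapter(obj):
    # The default fallback (sys.getsizeof) only sees the thin wrapper
    # object (~48 B); the pickled payload is a better proxy for how
    # expensive the object is to move between workers.
    return len(cloudpickle.dumps(obj))
```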

mrocklin commented 3 years ago

Thinking about this some more, it might be some other object, like a Zarr store, that lives on only a couple of these machines. I recall that we recently switched Zarr from being in every task to being in only a few tasks. The problem here might be the reverse: we may actually want to treat Zarr stores in this case as quite cheap to move.

cc @TomAugspurger who I think was actively making decisions around that time.

TomAugspurger commented 3 years ago

https://github.com/dask/dask/pull/6203 and https://github.com/dask/dask/pull/6773 are possibly the relevant PRs. I actually don't know whether those could have an effect here. I don't know (and a brief search couldn't confirm) whether or not xarray uses dask.array.from_zarr.

mrocklin commented 3 years ago

Do you run into poor load balancing as well when using Zarr with Xarray? My guess here is that there are a few tasks in the graph that report multi-TB sizes and so are highly resistant to being moved around. I haven't verified that, though.

shoyer commented 3 years ago

What is sizeof supposed to estimate? The size of the computed array or the size of the pickled lazy object?

Typically this object would end up in Dask graphs when something is read from an xarray storage backend, e.g., netCDF or Zarr. If the underlying files are accessible everywhere (e.g., as is the case for Zarr backed by a cloud object store), then a small size for the serialized object would be appropriate.

TomAugspurger commented 3 years ago

> Do you run into poor load balancing as well when using Zarr with Xarray?

The only thing that comes to mind is everything being assigned to one worker when there is a single node at the base of the task graph. But then work stealing kicks in and things level out (that was a while ago, though).

I haven't noticed any kind of systemic load balancing problem, but I can take a look at that notebook later.

shoyer commented 3 years ago

> dask/dask#6203 and dask/dask#6773 are possibly the relevant PRs. I actually don't know whether those could have an effect here. I don't know (and a brief search couldn't confirm) whether or not xarray uses dask.array.from_zarr.

Xarray uses dask.array.from_array but not from_zarr: https://github.com/pydata/xarray/blob/83eda1a8542a9dbd81bf0e08c8564c044df64c0a/xarray/core/variable.py#L1046-L1068
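For readers following along, here is a condensed paraphrase of that chunking path (not a verbatim copy of the linked lines, and the helper name is made up): backend arrays that speak xarray's explicit-indexing protocol get wrapped in ImplicitToExplicitIndexingAdapter before being handed to dask.array.from_array, which is how the adapter ends up at the base of the dask graph.

```python
import dask.array as da
import numpy as np
from xarray.core import indexing


def _chunk_backend_array(data, chunks, name=None, lock=False):
    # Paraphrased from Variable.chunk(): lazily indexed backend arrays are
    # wrapped so dask only ever performs explicit (outer) indexing on them.
    if isinstance(data, indexing.ExplicitlyIndexed):
        data = indexing.ImplicitToExplicitIndexingAdapter(
            data, indexing.OuterIndexer
        )
    # meta=np.ndarray tells dask the chunks behave like NumPy arrays.
    return da.from_array(data, chunks, name=name, lock=lock, meta=np.ndarray)
```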

shoyer commented 3 years ago

> The only thing that comes to mind is everything being assigned to one worker when there is a single node at the base of the task graph. But then work stealing kicks in and things level out (that was a while ago, though).

Right, so it might help to pipe an option for inline=True into Variable.chunk() (which is indirectly called via open_zarr when chunks are provided).
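For context, recent dask versions expose an inline_array flag on dask.array.from_array, which is roughly the knob being suggested here: it controls whether the source array lives in a single root task that every chunk-loading task depends on, or is embedded into each of those tasks. A minimal illustration, assuming a dask version that has the flag:

```python
import dask.array as da
import numpy as np

source = np.ones((1000, 1000))

# Default: the source array is a single node at the base of the graph,
# and every chunk-loading task depends on it.
rooted = da.from_array(source, chunks=(100, 100), inline_array=False)

# Inlined: the source array is embedded directly into each chunk-loading
# task, so there is no shared root node to place or steal.
inlined = da.from_array(source, chunks=(100, 100), inline_array=True)
```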

mrocklin commented 3 years ago

It may also be that we don't want to inline Zarr objects (the graph is likely to be cheaper to move if we don't inline them). However, we may want Zarr objects to report themselves as easy to move by defining their approximate size with sizeof. The ideal behavior here is that Dask treats Zarr stores (or whatever is at the bottom of this graph) as separate tasks, but also as movable tasks.

mrocklin commented 3 years ago

Ideally Dask would be robust to this kind of mis-assignment of object size, but it's particularly hard in this situation. We can't try to serialize these things to check, because if we're wrong and the size actually is massive, then we blow out the worker.

mrocklin commented 3 years ago

[Screenshot: dashboard chart, 2021-06-01 9:50 PM]

This is what it looks like in practice for me, FWIW.

mrocklin commented 3 years ago

Hrm, the root dependency does appear to be of type

xarray.core.indexing.ImplicitToExplicitIndexingAdapter with size 48 B

I'm not sure what's going on with it.

shoyer commented 3 years ago

> Hrm, the root dependency does appear to be of type
>
> xarray.core.indexing.ImplicitToExplicitIndexingAdapter with size 48 B
>
> I'm not sure what's going on with it.

Well, sys.getsizeof() is certainly an underestimate here, but I suspect the true size (e.g., if you pickle it) is measured in a handful of kB. I would be surprised if Dask were reluctant to serialize such objects.

mrocklin commented 3 years ago

Yeah, that size being very small shouldn't be a problem.

shoyer commented 3 years ago

When I pickle the adapter object from this example with cloudpickle, it looks like it's 6536 bytes.
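To make the mismatch concrete, one way to compare Dask's estimate with the serialized size is sketched below. The adapter here wraps a small in-memory array purely for illustration; the adapter in this issue wraps a lazy backend array, so the numbers will differ.

```python
import cloudpickle
import numpy as np
from dask.sizeof import sizeof
from xarray.core.indexing import (
    ImplicitToExplicitIndexingAdapter,
    NumpyIndexingAdapter,
    OuterIndexer,
)

adapter = ImplicitToExplicitIndexingAdapter(
    NumpyIndexingAdapter(np.zeros((10, 10))), OuterIndexer
)

# Dask falls back to sys.getsizeof for unregistered types: a few tens of bytes.
print(sizeof(adapter))
# The serialized size is what actually matters for moving the object.
print(len(cloudpickle.dumps(adapter)))
```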

mrocklin commented 3 years ago

I think that the next thing to do here is to try to replicate this locally and watch the stealing logic to figure out why these tasks aren't moving. At this point we're just guessing. @jrbourbeau, can I ask you to add this to the stack of issues for folks to look into?

mrocklin commented 3 years ago

Also cc'ing @gjoseph92

martindurant commented 2 years ago

The conversation here seems to have stalled, but I feel like it was productive. Did we land on any concrete actions?