xCDAT / xcdat

An extension of xarray for climate data analysis on structured grids.
https://xcdat.readthedocs.io/en/latest/
Apache License 2.0

[Exploration]: How Dask works and how it is utilized in xarray #352

Closed: tomvothecoder closed this issue 1 year ago

tomvothecoder commented 2 years ago

Experiment with bounds and edges:

Performance factors:

jasonb5 commented 2 years ago

Here's a small intro based on what I can recall from when I was working with Xarray and Dask a lot.

Most of my experience with these libraries came from working with the ESGF Compute Service. This service would translate WPS requests into an Xarray DAG and then execute it on a Dask cluster allocated using Dask Gateway. The service also tried to utilize Xarray's ability to read Zarr-formatted datasets from S3 stores to improve throughput for parallelized operations.

Here's a quick intro to Dask. Anything built with a Dask array, bag, dataframe, delayed, or future is turned into a task graph; the scheduler can optimize the graph and finally assigns the tasks to workers. To answer the first question, the communication depends on the scheduler. There's either a single-machine or a distributed scheduler. For single-machine you have single-threaded, multi-threaded, or process-based execution. Multi-threaded is pretty straightforward since it can use shared variables within its thread pool, but the process-based scheduler actually uses cloudpickle to serialize/deserialize the messages and data passed between processes. The same serialize/deserialize pattern is used when running distributed on local or remote clusters.
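
To make that concrete, here's a minimal sketch (not from the original comment) of building a task graph with dask.array and running it on the single-machine schedulers mentioned above:

```python
# Minimal sketch: build a task graph with dask.array and execute it on
# different single-machine schedulers. Nothing is computed until .compute().
import dask.array as da

x = da.random.random((4_000, 4_000), chunks=(1_000, 1_000))
y = (x + x.T).mean(axis=0)  # builds a task graph only

# Threaded scheduler: tasks share memory within a single process.
y.compute(scheduler="threads")

# Process-based scheduler: inputs/results are serialized with cloudpickle and
# passed between worker processes, the same pattern used by distributed
# clusters (local or remote).
y.compute(scheduler="processes")
```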

In my experience, chunking is recommended when dealing with out-of-core operations. I remember losing performance with small datasets when chunking on a LocalCluster due to the communication overhead. Chunking works best when you have an independent dimension (one you're not reducing over), e.g. if you're averaging over time you could chunk by lat, lon, lev, or some combination. You can still benefit from chunking even if some of the tasks are not operating on an independent dimension, e.g. when building large task graphs. An issue I ran into when working on the Compute service was that using groupby functions would cause all the data to load prematurely; I think there was a Dask/Xarray issue about this. Another time to utilize chunking is when operating with large task graphs where the same chunk of data has multiple operations performed on it.
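
For illustration, here's a rough sketch of that chunking pattern (the file path, variable name, and chunk sizes are placeholders):

```python
# Averaging over time while chunking along the spatial dimensions, so each
# chunk's reduction is independent of the others. Placeholder path/variable.
import xarray as xr

ds = xr.open_dataset("tas_data.nc", chunks={"lat": 45, "lon": 90})
time_mean = ds["tas"].mean(dim="time")  # lazy; one reduction per spatial chunk

time_mean.load()  # explicitly trigger the computation
```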

jasonb5 commented 2 years ago

Here are some related links.

tomvothecoder commented 1 year ago

An issue I ran into when working on the Compute service was that using groupby functions would cause all the data to load prematurely; I think there was a Dask/Xarray issue about this.

It is good to know that groupby operations are potentially eager rather than lazy, since xcdat's temporal averaging APIs use groupby internally.

I found the related xarray issue: https://github.com/pydata/xarray/issues/2852. Comments from that issue:

It is very hard to make this sort of groupby lazy, because you are grouping over the variable label itself. Groupby uses a split-apply-combine paradigm to transform the data. The apply and combine steps can be lazy. But the split step cannot. Xarray uses the group variable to determine how to index the array, i.e. which items belong in which group. To do this, it needs to read the whole variable into memory. -- https://github.com/pydata/xarray/issues/2852#issuecomment-476678007

The current design of GroupBy.apply() in xarray is entirely ignorant of dask: it simply uses a for loop over the grouped variable to build up a computation with high level array operations.

This makes operations that group over large keys stored in dask inefficient. This could be done efficiently (dask.dataframe does this, and might be worth trying in your case) but it's a more challenging distributed computing problem, and xarray's current data model would not know how large of a dimension to create for the returned arrays (doing this properly would require supporting arrays with unknown dimension sizes). -- https://github.com/pydata/xarray/issues/2852#issuecomment-478415169
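
A small illustration of the behavior described in those comments (synthetic data, not taken from the issue): the grouping labels must live in memory for the split step, while the apply/combine steps stay lazy.

```python
# Grouping a dask-backed DataArray: the "time" coordinate labels are already
# in memory (xarray loads coordinates eagerly), which is what the split step
# needs, while the mean itself remains a lazy dask computation.
import numpy as np
import pandas as pd
import xarray as xr

time = pd.date_range("2000-01-01", periods=365)
da = xr.DataArray(
    np.random.rand(365, 4, 4),
    coords={"time": time},
    dims=["time", "lat", "lon"],
).chunk({"time": 30})

monthly = da.groupby("time.month").mean()
print(type(monthly.data))  # dask array -- the reduction has not run yet
```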

Workarounds:

I think these comments suggest that xcdat's temporal averaging is either partially lazy or not lazy at all. More investigation needs to be done here to confirm.
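
A rough sketch of how that could be checked (the file path and variable name are placeholders): open a dataset with chunks so the variables are dask-backed, run a temporal average, and see whether the output is still a dask array.

```python
# Hypothetical laziness check for xcdat's temporal averaging. The file path
# and the "ts" variable name are placeholders.
import dask.array
import xcdat

ds = xcdat.open_dataset("path/to/dataset.nc", chunks={"time": 12})
result = ds.temporal.average("ts")

print(isinstance(result["ts"].data, dask.array.Array))
# True  -> the average is lazy (still dask-backed)
# False -> the data was loaded into memory somewhere along the way
```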

Action Items:


In xarray, Dask arrays are not loaded into memory unexpectedly (an exception is raised instead). In xcdat, we load Dask arrays into memory in specific spots.

When you load data as a Dask array in an xarray data structure, almost all xarray operations will keep it as a Dask array; when this is not possible, they will raise an exception rather than unexpectedly loading data into memory. Converting a Dask array into memory generally requires an explicit conversion step. One notable exception is indexing operations: to enable label based indexing, xarray will automatically load coordinate labels into memory. -- https://docs.xarray.dev/en/stable/user-guide/dask.html#using-dask-with-xarray
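
A minimal illustration of the quoted behavior (placeholder file path and variable name):

```python
# Dask-backed variables stay lazy until explicitly converted, while coordinate
# labels are loaded eagerly so label-based indexing works.
import xarray as xr

ds = xr.open_dataset("tas_data.nc", chunks={"time": 120})

subset = ds["tas"].sel(time="2000-01")  # label-based indexing; data still lazy
subset.load()       # explicit conversion; loads in place and returns the object
# subset.compute()  # same conversion, but returns a new object
```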

More investigation items