pangeo-data / distributed-array-examples

Means of quadratic quantities #2

Open dcherian opened 1 year ago

dcherian commented 1 year ago

From https://github.com/pydata/xarray/issues/6709

This example calculates the time means of anom_u**2, anom_v**2, and anom_u * anom_v all at the same time.


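import dask.array as da
import xarray as xr
from distributed import Client
from distributed.diagnostics import MemorySampler

# MemorySampler samples a running distributed cluster; a local one is assumed here.
client = Client()
ms = MemorySampler()

ds = xr.Dataset(
    dict(
        anom_u=(["time", "face", "j", "i"], da.random.random((5000, 1, 987, 1920), chunks=(10, 1, -1, -1))),
        anom_v=(["time", "face", "j", "i"], da.random.random((5000, 1, 987, 1920), chunks=(10, 1, -1, -1))),
    )
)

# Squares of both anomalies, plus their cross product.
quad = ds**2
quad["uv"] = ds.anom_u * ds.anom_v
mean = quad.mean("time")

# Record cluster memory while all three means are computed together.
with ms.sample():
    mean.compute()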

With dask, we get not-so-great memory use. (Colors are for different values of "worker saturation")

image
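
For reference, the reduction itself only needs a running sum per output field, so in principle its peak memory should not depend on the number of time chunks. Below is a minimal NumPy sketch of that idea, not how dask actually schedules the work (dask uses a tree reduction). The helper name streaming_quadratic_means and the small shapes are assumptions made for illustration.

```python
import numpy as np


def streaming_quadratic_means(u_chunks, v_chunks):
    """Accumulate time means of u**2, v**2 and u*v one time chunk at a time."""
    sums = None
    n_time = 0
    for u, v in zip(u_chunks, v_chunks):
        # Each chunk is reduced along time immediately, so only the
        # running sums (one 2D field per quantity) stay in memory.
        partial = {
            "anom_u": (u**2).sum(axis=0),
            "anom_v": (v**2).sum(axis=0),
            "uv": (u * v).sum(axis=0),
        }
        if sums is None:
            sums = partial
        else:
            for name in sums:
                sums[name] += partial[name]
        n_time += u.shape[0]
    return {name: total / n_time for name, total in sums.items()}


rng = np.random.default_rng(0)
# Hypothetical small shapes: 5 chunks of 10 time steps, 1 face, 4 x 6 grid.
u_chunks = [rng.random((10, 1, 4, 6)) for _ in range(5)]
v_chunks = [rng.random((10, 1, 4, 6)) for _ in range(5)]
means = streaming_quadratic_means(u_chunks, v_chunks)
```

Each input chunk can be discarded as soon as its three partial sums are added, which is the behaviour one would hope a scheduler approximates for this graph.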

TomNicholas commented 1 year ago

Cubed completed this workload using only 1.5GB of RAM!

https://gist.github.com/TomNicholas/8366c917349b647d87860a20a257a3fb

TomNicholas commented 1 year ago

I would like to try this problem with cubed using real data instead of random data. @dcherian (/anyone) if you know, can you explain a little more about the context of this issue please? So that I understand if/how I might be able to use some publicly available zarr data to create a representative benchmark case that includes I/O. Something about anomalies of GCM data... :sweat_smile:

dcherian commented 1 year ago

I would like to try this problem with cubed using real data instead of random data.

cc @robin-cls who opened the original xarray issue

fjetter commented 1 year ago

FYI I could track this problem down to the way dask performs the topological sort / prioritization of tasks, see https://github.com/dask/dask/issues/10384

This example should work trivially when either of the following is true:

  1. Only one of the arrays is calculated, e.g. mean['uv'].compute()
  2. The xarray dataset is transformed to a dask DataFrame using mean.to_dask_dataframe() (the DataFrame graph looks slightly different and is handled well by dask.order)
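
A minimal sketch of the first option, computing the variables one at a time instead of in a single mean.compute() call. The small shapes are an assumption to keep the example cheap, and the DataFrame route is not shown here. Note the trade-off: each separate compute() runs its own graph, so the inputs are generated (or, with real data, loaded) more than once.

```python
import dask.array as da
import xarray as xr

# Hypothetical small shapes; the issue uses (5000, 1, 987, 1920).
dims = ["time", "face", "j", "i"]
shape, chunks = (40, 1, 8, 16), (10, 1, -1, -1)

ds = xr.Dataset(
    dict(
        anom_u=(dims, da.random.random(shape, chunks=chunks)),
        anom_v=(dims, da.random.random(shape, chunks=chunks)),
    )
)

quad = ds**2
quad["uv"] = ds.anom_u * ds.anom_v
mean = quad.mean("time")

# Compute each variable in its own call rather than all three together.
results = {name: mean[name].compute() for name in mean.data_vars}
```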

TomNicholas commented 1 year ago

Only one of the arrays is calculated, e.g. mean['uv'].compute()

Anecdotally I think the performance is much better when you only compute one array, yes.

fjetter commented 11 months ago

Just a heads up: I'm working on a fix for this in dask/dask, see https://github.com/dask/dask/pull/10535

Preliminary results look very promising

image

This graph shows the memory usage for a couple of runs with increasing size along the time dimension. This essentially increases the number of tasks but keeps the individual chunks and the algorithm constant.
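
To illustrate what "more tasks, same chunks" means, here is a rough sketch that builds the uv mean for several time lengths and counts the tasks in each graph. It is not the benchmark used for the plot: the helper uv_mean, the small grid, and the chosen time lengths are assumptions, and only one of the three means is built.

```python
import dask.array as da


def uv_mean(n_time, chunk_time=10, grid=(8, 16)):
    """Build (lazily) the time mean of u * v for a given number of time steps."""
    shape = (n_time, 1, *grid)
    chunks = (chunk_time, 1, -1, -1)
    u = da.random.random(shape, chunks=chunks)
    v = da.random.random(shape, chunks=chunks)
    return u, (u * v).mean(axis=0)


task_counts = {}
chunk_sizes = {}
for n_time in (100, 200, 400):
    u, result = uv_mean(n_time)
    # Number of tasks in the (unoptimized) graph; nothing is computed here.
    task_counts[n_time] = len(result.__dask_graph__())
    chunk_sizes[n_time] = u.chunksize
    print(n_time, chunk_sizes[n_time], task_counts[n_time])
```

The chunk shape stays the same for every run while the task count grows with the time length, which is the setup in which memory use would ideally stay flat.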

image

This was far away from the spilling threshold (yellow line) so the constant memory was indeed due to better scheduling, not spilling or anything like that.

I'm also looking at other workloads. If you are aware of other stuff that should be constant or near-constant in memory usage but isn't, please let me know!