Open jrbourbeau opened 1 year ago
How would you like the code to be written/contributed?
I think some combo of groupby/groupby-anomaly (exercise shuffling, reductions); then rolling/rolling anomaly (to exercise map_overlap) is probably what we want.
@aulemahal / @huard do you have a prototype xclim workflow that would be good to test?
How would you like the code to be written/contributed?
Ideally there's some Jupyter notebook somewhere that someone could point us to. Maybe you already have a notebook or two in mind?
Ultimately we want to add the workflow in this repo as a pytest
-style test (here's an example of an existing embarrassingly parallel workflow). These tests are run regularly and used in AB tests as we're developing dask
and distributed
. Eventually these workflows will probably graduate into an example repo with notebooks that people could checkout, play with, and adapt to their own use cases.
Idealized dataset? Presumably some kind of small/big version of the problem, controlled by a small number of parameters?
We're able to use large-scale clusters on the cloud here, so large, realistic datasets are welcome. For example, that embarrassingly parallel workflow processes ~3 TB of data on AWS.
In terms of complexity we'd love something in between 20-200 lines of code. We're trying to strike a balance between these workflows being representative of what large Dask user groups actual do, while also making them approachable (i.e. we don't want people's eyes to glaze over when they see 1000 lines of code).
Can xarray be a dependency?
Yeah, definitely. Workflows using dask
or xarray
(backed by dask
) are totally welcome.
Hi @jrbourbeau - this sounds like a good idea.
We're able to use large-scale clusters on the cloud here, so large, realistic datasets are welcome.
What about requester-pays data?
Can xarray be a dependency?
Yeah, definitely. Workflows using dask or xarray (backed by dask) are totally welcome.
How about xGCM or xhistogram? They both ultimately just wrap xarray / dask, but produce task graphs with structures that conventional xarray usage doesn't. (If not that's okay, it would just make a neat example trickier.)
@jbusecke might also have some suggestions
Hi @jrbourbeau - this sounds like a good idea.
We're able to use large-scale clusters on the cloud here, so large, realistic datasets are welcome.
What about requester-pays data?
Can xarray be a dependency?
Yeah, definitely. Workflows using dask or xarray (backed by dask) are totally welcome.
How about xGCM or xhistogram? They both ultimately just wrap xarray / dask, but produce task graphs with structures that conventional xarray usage doesn't. (If not that's okay, it would just make a neat example trickier.)
@jbusecke might also have some suggestions
What about requester-pays data?
Yeah, requester-pays is fine. We can spin up a cluster and pay for data access anywhere on AWS (and GCP soon). For example, that embarassingly parallel example I referenced earlier is processing ~3TB with requester_pays=True
.
How about xGCM or xhistogram?
Good question. I'm inclined to stick with just dask
and xarray
dependencies at the moment as they're more broadly used
@dcherian I don't think we have a xclim workflow that would be straightforward and simple enough for what I understand is the goal of this issue.
On the other hand, it would be easy to come up with one. Sticking with xarray and dask, I guess one could pull data from the CMIP6 AWS bucket and then reduce it with rolling
and resample
calls. In my experience, these two would be the main climate workflow bottlenecks ? The workflow would be scalable by parametrizing the size of the spatial subset and model selection.
I am imagining something like this https://medium.com/pangeo/easy-ipcc-part-1-multi-model-datatree-469b87cf9114, but stripped down to use only xarray and with a more complex function instead of temperature anomalies, like "yearly number of heat wave days", where "heat wave" is spells of N days above a given threshold. I take this example because it involves both a rolling and a resample.
I support @TomNicholas suggestions from above. I think including some sort of vector calculus operator like div/curl would be incredibly valuable in the long run. This sort of operation usually works fine with toy examples but can turn out to be really tricky when running under real-world conditions due to:
Yeah, requester-pays is fine. We can spin up a cluster and pay for data access anywhere on AWS (and GCP soon). For example, that embarassingly parallel example I referenced earlier is processing ~3TB with requester_pays=True.
Okay that's great.
@jrbourbeau can we specify the resources we want the workload to be tested on? i.e. specify that the total RAM available is larger than a chunk but smaller than the full calculation.
Good question. I'm inclined to stick with just dask and xarray dependencies at the moment as they're more broadly used
I think including some sort of vector calculus operator like div/curl would be incredibly valuable in the long run.
@jbusecke do you think we could make a simplified version of the first step of the hero calculation using just xarray & dask? I'm thinking we use LLC4320, but call xarray.pad
explicitly then broadcast metrics across the larger time-dependent arrays. That should create a task graph that has the same gotchas but without importing xGCM. The histogram workload is maybe less important to capture because that literally just calls dask.blockwise
.
can we specify the resources we want the workload to be tested on?
Yeah, definitely. If you look here at the coiled.Cluster
API docs, we can specify a specific AWS instance type or the amount of memory / CPU we want workers to have and Coiled will pick an appropriate instance type.
these two would be the main climate workflow bottlenecks That should create a task graph that has the same gotchas
Just to clarify, for these workflows we're primarily interested in common operations that Pangeo folks want to do. In particular, I'm hopeful that these are representative in general so:
dask
or distributed
, we'll be able to have a sense for what broad impact it might have on Pangeo folksIf these common geospatial operations also happen to stress Dask and fail in some cases, then that's also really useful information. I just want to make sure that we include operations that are representative of what geospatial user groups often want to perform, not necessarily cases where Dask falls over.
Looking through some of the existing pangeo example, I'm wondering if something like https://gallery.pangeo.io/repos/pangeo-gallery/physical-oceanography/01_sea-surface-height.html or https://gallery.pangeo.io/repos/pangeo-gallery/cmip6/global_mean_surface_temp.html would be considered representative. @dcherian @TomNicholas @jbusecke do those notebooks look like common operations the pangeo community cares about?
Those seem too "easy" at a quick glance.
Here's one https://github.com/pangeo-data/distributed-array-examples/issues/2 . Gabe and I discussed this a bit in https://github.com/pydata/xarray/issues/6709
This point
I think there is a small but important increase in complexity here because we do
ds.u.mean(), ds.v.mean(), (ds.u*ds.v).mean()
all together so each chunk of ds.u and ds.v is used for two different outputs.
is definitely a common "feature" of Pangeo workloads.
EDIT: We could add some pad
and diff
operations to make it look like what Tom's talking about I think
This one was fixed by worker saturation for a single year of data. Not sure how well it does for all 60 years of data.
The example there uses 20 years, it's taking quite a while for the computation to start.
EDIT: This example takes a loooooooong time to show up on the dashboard. There are lots of tasks. But once it starts, it feels snappy, though I do see some white space in the task stream.
Finally a pangeo classic, anomalies with respect to a group mean: https://github.com/pangeo-data/distributed-array-examples/issues/4 .
I don't have time to try it now, but excited to see what you coilies figure out :P
@jbusecke do you think we could make a simplified version of the first step of the hero calculation using just xarray & dask? I'm thinking we use LLC4320, but call xarray.pad explicitly then broadcast metrics across the larger time-dependent arrays. That should create a task graph that has the same gotchas but without importing xGCM. The histogram workload is maybe less important to capture because that literally just calls dask.blockwise.
Yeah I think that should be totally fine. I do not think that at this point there is a big difference in the computation via xgcm (because it all runs though apply_ufunc), you would basically just have to hardcode some of the things that xgcm does for you. So yes I think that is a reasonable way forward. I do unfortunately have limited time to work on this on my end though.
If these common geospatial operations also happen to stress Dask and fail in some cases, then that's also really useful information. I just want to make sure that we include operations that are representative of what geospatial user groups often want to perform, not necessarily cases where Dask falls over.
That makes sense, but I think that many of the 'typical cases' are also the ones were dask tends to fall over. So I think a simplified example as @TomNicholas suggests serves double duty here. I agree with @dcherian that the global means etc are too easy and not really all too 'geo' specific.
Thanks folks. I'm curious, the benchmarks currently run on AWS. Are there AWS variants of the datasets linked to above by any chance? We can also run on GCP, it's just more flags/keywords to set and billing accounts to keep track of.
If I take https://github.com/coiled/benchmarks/issues/770#issuecomment-1506254519 from @dcherian above and swap out 20 years for 1 year it seems not terrible. It's about a 20s startup time (which I suspect may have as much to do with da.random.random
as anything else).
It's pretty much entirely network bound:
import pandas as pd
import xarray as xr
import dask.array
from dask.distributed import wait
import flox.xarray
dims = ("time", "level", "lat", "lon")
# nyears is number of years, adjust to make bigger,
# full dataset is 60-ish years.
nyears = 1
shape = (nyears * 365 * 24, 37, 721, 1440)
chunks = (24, 1, -1, -1)
ds = xr.Dataset(
{
"U": (dims, dask.array.random.random(shape, chunks=chunks)),
"V": (dims, dask.array.random.random(shape, chunks=chunks)),
"W": (dims, dask.array.random.random(shape, chunks=chunks)),
"T": (dims, dask.array.random.random(shape, chunks=chunks)),
},
coords={"time": pd.date_range("2001-01-01", periods=shape[0], freq="H")},
)
zonal_means = ds.mean("lon")
anomaly = ds - zonal_means
anomaly['uv'] = anomaly.U*anomaly.V
anomaly['vt'] = anomaly.V*anomaly.T
anomaly['uw'] = anomaly.U*anomaly.W
temdiags = zonal_means.merge(anomaly[['uv','vt','uw']].mean("lon"))
# note method="blockwise" uses flox
temdiags = temdiags.resample(time="D").mean(method="blockwise")
wait(temdiags.persist())
I'd be curious to learn if this is necessary network traffic (in which case we should improve our handling of network) or if it's unnecessary network traffic (in which case we should improve our task placement).
In either case it seems like a reasonable and easy benchmark to add. With 50 workers it looks like it'll take me 10 minutes or so (although this should probably decrease once we swap in the new hardware defaults). We can tune this down easily if we want to.
If I were to ask for an improvement on this it would be to run on a real dataset rather than something random.
Well, not entirely network bound I guess. It's oddly balanced between network and CPU.
Those seem too "easy" at a quick glance.
There are a couple of different overlapping goals here:
These are different goals, but our hope is that these benchmarks/workflows can serve both. We hope that by making things more demo-able that that'll force us more towards what people actually do, and away from artifical examples that highlight a particular technical challenge (There is probably an analogy here between playing sports and targetting specific muscle groups while working out at a gym. We're looking for sports).
What I like about sea surface altitude is that the results are interpretable and pretty. It also makes us sensitive to issues in Zarr, reading from cloud storage, s3 flakiness, etc..
@dcherian a lot of your recent examples are great because they're easy to run, but they lack some of the breadth of the real-world examples. Thoughts?
I've only looked through the last few comments. I'm going to start working my way up the thread now.
Well, not entirely network bound I guess. It's oddly balanced between network and CPU.
I suspect a fair amount of the ~20% system cpu util is network-related, so most likely network bound and network also explains higher CPU util (but also CPU util is in there range where I don't think CPU is bottleneck)
Also FWIW I probably disagree a little with @jrbourbeau about external dependencies. Mostly we want to make sure that the workload looks familiar to Pangeo users and represents their needs well. If tools like xGCM or xhistogram or flox aren't likely to bias our thinking then I don't really care about whether or not their active. If we find ourselves focusing a lot of engineering time one what ended up being an xGCM or xhistogram issue though then I think that we will have missed.
@ntabris https://cloud.coiled.io/clusters/192751/details?account=dask-engineering if you want to go deeper
(please anyone else let me know if you want access to Coiled to look at metrics. It's easy to add you all to a team where we can look at this stuff together, or where you can run things)
What I like about sea surface altitude is that the results are interpretable and pretty. It also makes us sensitive to issues in Zarr, reading from cloud storage, s3 flakiness, etc..
@dcherian a lot of your recent examples are great because they're easy to run, but they lack some of the breadth of the real-world examples. Thoughts?
These are all based on real-world use cases but with random data swapped in for actual datasets. I can add those pieces back if you want "end-to-end" demos with pretty pictures.
If I take https://github.com/coiled/benchmarks/issues/770#issuecomment-1506254519 from @dcherian above and swap out 20 years for 1 year it seems not terrible.
Also on this one I noted a large increase in head node memory use before it started computing. That killed my cloud session a couple of times before I realized and then sized up.
These are all based on real-world use cases but with random data swapped in for actual datasets. I can add those pieces back if you want "end-to-end" demos with pretty pictures.
Pretty pictures and also we get to see the impact of reading real data, which I suspect will be significant. For example, maybe there's some GIL-holding thing happening in computation which is affecting fsspec's ability to read data. We want to see those kinds of interactions.
There is a large portion of
dask.array
users who use Dask in geospatial / climatology / earth science / etc. use cases. We should make sure these use cases are represented here in our benchmarks.@TomNicholas @dcherian we're trying to establish a set of benchmarks for common workloads faced by Dask users. Ideally these are more realistic and less toy examples. These get used day-to-day by engineers as they run AB tests on various changes that they want to make. Workloads in this suite will naturally be optimized over time. What are 1-3 workflows you'd most like to see run regularly as part of Dask's benchmark suite?
xref https://github.com/coiled/coiled-runtime/issues/725