COSIMA / cosima-recipes

A cookbook of recipes (i.e., examples) for analysing ocean and sea ice model output
https://cosima-recipes.readthedocs.io
Apache License 2.0
46 stars 66 forks source link

Using intake-esm to load an ensemble: real-world problem that might lead to a tutorial/example #444

Open jemmajeffree opened 2 months ago

jemmajeffree commented 2 months ago

What I want to do is this:

def filepath(m,var,freq,time='185001-201412'):
    ''' Intentded use filepath(1,'pr','Amon')'''
    return '/g/data/fs38/publications/CMIP6/CMIP/CSIRO/ACCESS-ESM1-5/historical/r'+m+'i1p1f1/'+freq+'/'+var+'/gn/latest/'+var+'_'+freq+'_ACCESS-ESM1-5_historical_r'+m+'i1p1f1_gn_'+time+'.nc'

pr = xr.open_mfdataset([filepath(str(m),'pr','Amon') for m in range(1,41)],
                  # coords='minimal',
                  # compat='override',
                  combine = 'nested',
                  concat_dim = ('SMILE_M',),
                  preprocess = lambda x: x['pr'],
                  parallel = True,
                 ).assign_coords(SMILE_M = np.array([str(m) for m in range(1,41)]))

but I understand that intake-esm should make these type of things easier and less hard-coded. So I tried to do this:

import intake
catalog = intake.cat.access_nri
catalog.search(model="ACCESS-ESM1-5",variable='pr')

cat = catalog["cmip6_fs38"].search(variable_id='pr',
                                      experiment_id='historical',
                                      frequency='mon',
                                      file_type="l",
                                      institution_id='CSIRO')

cat.esmcat.aggregation_control.groupby_attrs = ['file_type',
 'project_id',
 'institution_id',
 'source_id',
 'experiment_id',
 #'member_id',
 'frequency',
 'realm',
 'table_id',
 'variable_id',
 'grid_label',
#version]

cat.to_dask(xarray_open_kwargs={'combine':'nested','concat_dim':('M',),'parallel':True},)

Which threw me errors, as did any variant on to_dask I tried (mostly about being unsure of how to combine the ensemble members together)

navidcy commented 2 months ago

[tip: when adding code in within triple backticks, if you add ```python at the top instead of a plan ```, then GitHub adds some python-coloring. (works also for ```julia or ```bash, and possibly others!)

I edited the post above to add that ;) ]

Thomas-Moore-Creative commented 2 months ago

Thanks for documenting this use-case @jemmajeffree. Can we specify what the "typical" cluster size / resources should be to constrain this problem? For example "how much compute resource does the typical COSIMA affiliated student or post-doc have"?

jemmajeffree commented 2 months ago

Thanks @navidcy

@Thomas-Moore-Creative, I was running this on a XL ARE job, which is a bit more than I'd usually be using for data crunching but I was feeling impatient. Most of the work I do is piggybacking pre-existing model runs, which is fairly low compute — as a ballpark estimate, I've used 10 kSU for my PhD so far which involved analysing 9 different CMIP runs, but I understand that this is almost nothing for anyone running models

edoddridge commented 2 months ago

@Thomas-Moore-Creative I'm not sure there's an easy answer to that question. I'd say most people within COSIMA have access to pretty generous compute, and because the cost of running the models far exceeds the cost of the analysis, we tend not to spend a lot of time thinking about the compute resources for analysis. A modelling based PhD student would likely use hundreds of kSU over the course of their degree.

As an example, running the 0.25° version of ACCESS-OM2 with BGC consumes 13.1 kSU/year. So a 60 year run is ~0.8 MSU. The 0.1° version is much more expensive.

Whereas a 24 hour session of the XL ARE cluster that @jemmajeffree mentioned is 420 SU. In general, the problem for analysis is human time, not compute time.

anton-seaice commented 2 months ago

Hi Gemma - apologies I thought this was a dask question. You can use 'to_dataset_dict() on your search:

search = cat["cmip6_fs38"].search(
           variable_id='pr',
           experiment_id='historical',
           frequency='mon',
           file_type="l",
           source_id="ACCESS-ESM1-5"
                                 )

ds_dict = search.to_dataset_dict()

ds = xr.concat(
    ds_dict.values(),
    dim=ds_dict.keys()
)

Your resulting dataarray has silly chunks:

Screenshot 2024-09-05 at 8 09 05 AM

These are basically defined by the source files, in this case you can't improve the opening time and the dataset is small, so you could just .load() the whole dataset. Or you can rechunk after opening (using .chunk())

jemmajeffree commented 2 months ago

Thanks Anton

With intake loading data now, I have a couple more comments on efficiency. I can get xr.open_mfdataset to work more than twice as fast as intake. To my understanding, intake wraps around open_dataset not open_mfdataset, so the optimisations for open_mfdataset don't transfer. Is that correct? In which case, the tiny chunks are a problem. I'd rather not pull everything into memory, though I agree that in the small example I've provided here it is a solution. Even doing that, the data loads faster with open_mfdataset and different chunking than intake.

pr_daily = xr.open_mfdataset([[filepath(str(m),'pr','day',time) for m in range(1,41)] for time in ('18500101-18991231','19000101-19491231','19500101-19991231','20000101-20141231')],
                  coords='minimal', #Grab medium speed-up by not checking coordinates align
                  compat='override', #Grab medium speed-up by not checking coordinates align
                  combine = 'nested',
                  concat_dim = ('time','SMILE_M',),
                  preprocess = lambda x: x['pr'], #Often doing something more complicated in here that gets bonus efficiency
                  parallel = True,
                  chunks={'SMILE_M':1,'time':365,'lat':-1,'lon':-1}, #Chunks are big enough that dask scheduling overhead goes away
                 ).assign_coords(SMILE_M = np.array([str(m) for m in range(1,41)]))

Am I missing any intake optimisations to achieve comparable performance?

Direct comparisons of speed for loading and example computations are here: /scratch/nf33/jj8842/intake comparison.ipynb

I can see that I should have put this on hive to begin with, and it's probably more valuable there digging into efficiencies with intake - would you recommend moving it or is that overcomplicating things?

Thomas-Moore-Creative commented 2 months ago

file_type="l"

SIDEWAYS question, @anton-seaice et al.
I feel I've searched NCI documentation at length to understand the file_type="l" vs file_type="f" with no luck at understanding the detail in this choice.

I'm assuming "l" stands for "link" and "f" for file and my anecdotal experience is results can be different and not specifying leads to duplication.

Anyone have more insight and link to authoritative NCI documentation?

jemmajeffree commented 2 months ago

@Thomas-Moore-Creative, empirically, using "l" gets you just the most recent ACCESS-ESM1-5 data files (stored in a version file "latest" symlink not the actual most recent version) rather than other bits and pieces, but I couldn't tell you what it means

Thomas-Moore-Creative commented 2 months ago

@Thomas-Moore-Creative, empirically, using "l" gets you just the most recent ACCESS-ESM1-5 data files (stored in a version file "latest" symlink not the actual most recent version) rather than other bits and pieces, but I couldn't tell you what it means

Thanks for your experience on this. Sounds like you've tested this? Do we know if NCI documents this anywhere?

The very useful ACCESS-NRI intake catalog docs from @dougiesquire use "f" in the example here

cmip6_datastore_filtered = cmip6_datastore.search(
    source_id="ACCESS-ESM1-5", 
    table_id="Omon", 
    variable_id="tos", 
    experiment_id="historical", 
    file_type="f"
)

cmip6_datastore_filtered

but that example might not be practice if you want the latest CMIP file versions?

Also, on the chunking related settings for the intake workflow, further from the ACCESS-NRI documentation "Chunking tutorial" notes:

Intake-ESM uses xarray.open_dataset to open datasets. By default, the argument chunks={} is used,

and that this approach on chunking could be used:

xarray_open_kwargs = {"chunks": {"time": -1, "st_ocean": 7, "xt_ocean": 400, "yt_ocean": 300}}

ds = esm_datastore_filtered.to_dask(xarray_open_kwargs=xarray_open_kwargs)

I use this with ACCESS-ESM1.5 data in ways like this:

xarray_open_kwargs = {"chunks": {"time": 12, "lev": 50, "i" : 360,'j' :300}}
DS_dict = cmip6_fs38_cat_filtered.to_dataset_dict(progressbar=False,xarray_open_kwargs=xarray_open_kwargs)

I will find some time to run your example .ipynb file.

Without looking at it I wonder if the list of file paths you are sending to .open_mfdataset and .open_dataset are the same list of files?

I also note that if you use intake you can simply pump out a list of file paths using an approach like cmip6_fs38_cat_filtered.unique().path.

Apologies if these comments are obvious or unhelpful given the work you've already done.

jemmajeffree commented 2 months ago

Starting from the bottom: While you can pump out a list of paths from intake, you lose the information about how those filepaths fit together (ie, what ensemble member, and it's kinda stupid to manually pull that back out of the filepath again). Anyway, this is probably what I would do in future if I didn't know how the data I needed was stored I suspect they're the same list of files If you're passing chunks to open_dataset, I would expect that it gets sad about (or fails silently on) having chunks bigger than individual files, which is what I want here

Thomas-Moore-Creative commented 2 months ago

While you can pump out a list of paths from intake, you lose the information about how those filepaths fit together (ie, what ensemble member, and it's kinda stupid to manually pull that back out of the filepath again)

Depending on your needs, it might be. I probably do kinda stupid things all the time. :smile:

anton-seaice commented 1 month ago
  • Is there a better way to fix up the ensemble members than manually relabelling them (currently the concat_dim axis is something like l.CMIP.CSIRO.ACCESS-ESM1-5.historical.r6i1p1f1.mon.atmos.Amon.pr.gn.v20200529 when I only want the r6i1p1f1 bit)?

Not that I know of, you could use something like this:

for i in search.keys():
    print(re.findall(r"r[^/.]*", i)[1])

Or write some better regex to get a better match :) (Interestingly set(search.df.member_id) returns the same result !, although I'm not sure I totally trust that to always be in the same order as .to_dataset_dict())

  • Would the ensemble members always be read/combined in the same order? I'd be hesitant to trust this, but I can't see a way of putting them in numerical order except for sorting the entire dataarray by the concat_dim axis? (Do you know if xr.sortby actually does anything in memory or just changes the wrapper?)

It looks like they are read in alphabetical order. Its safe to assume sortby will just change the order of the index, and not move anything else around in memory.

Am I missing any intake optimisations to achieve comparable performance?

Sorry - I missed this the first time. This dataset has more than one timestep per file ! So you can supply a time chunksize which reduces the number of times the file has to be opened / read and improves caching efficiency:

e.g. for the monthly data:

ds_dict = search.to_dataset_dict(xarray_open_kwargs={'chunks':{'lat':-1,'lon':-1, 'time':-1}})

for the daily data, those chunks would be too big, but size of 365 could be good:

ds_dict = search.to_dataset_dict(xarray_open_kwargs={'chunks':{'lat':-1,'lon':-1, 'time':365}})

Direct comparisons of speed for loading and example computations are here: /scratch/nf33/jj8842/intake comparison.ipynb

The training fairies has been cleaning up nf33, so I can't see this! Note that caching makes good time comparisons hard!

I feel I've searched NCI documentation at length to understand the file_type="l" vs file_type="f" with no luck at understanding the detail in this choice.

I'm assuming "l" stands for "link" and "f" for file and my anecdotal experience is results can be different and not specifying leads to duplication.

The only time I looked, the link just point to the file and they were the same. But I didn't look exhaustively at all.

Anyone have more insight and link to authoritative NCI documentation?

My only suggestion would be the NCI helpdesk

jemmajeffree commented 1 month ago

@anton-seaice Can you see x77? g40? v45? I put it in nf33 because it was the only directory I knew you would have access to

anton-seaice commented 1 month ago

v45 is ok but I think you can use /scratch/public

jemmajeffree commented 1 month ago

It's now in /scratch/public/jj8842

anton-seaice commented 1 month ago

It's now in /scratch/public/jj8842

The timing is not really reliable for performance comparisons, its an indication but changed wildly. i.e. I ran this notebook with the cells in the opposite order, and open_mfdataset was slower.

Adding

xarray_open_kwargs={'chunks':{'time':365,'lat':-1,'lon':-1}}

as an argument in to_dask() will help,

you also could supply

xarray_combine_by_kwargs in to_dask()

(e.g.

dict( coords='minimal', compat='override',  combine = 'nested',

)

but i dont think its necessary

jemmajeffree commented 1 month ago

With those additional keywords, to_dataset_dict is working faster than xr.open_mfdataset. The concatenate a dictionary and then reorder dimensions is annoying, but I can work around it.

For any future readers of this thread: the documentation for what arguments to_dataset_dict takes is here https://intake-esm.readthedocs.io/en/stable/reference/api.html (I couldn't find it at the time I was originally trying to read in data, and had no idea xarray_combine_by_kwargs existed)

Thomas-Moore-Creative commented 1 month ago

@anton-seaice - re: using %%time or %%timeit cell magics for evaluating performance in a notebook. My anecdotal experience and poorly tested opinion is that running garbage collection, deleting existing objects, and restarting the kernel between testing (A) vs (B) can make testing and evaluating performance in a notebook more accurate. Do you agree with that in any way?

adele-morrison commented 1 month ago

@Thomas-Moore-Creative I found %%timeit is not useful if you are timing opening data from Intake or the cookbook, because the first time is very slow and repeat tests are very fast, so averaging over multiple tests does not give an accurate time estimate of the first opening.

Thomas-Moore-Creative commented 1 month ago

@jemmajeffree are you happy with where you landed in your "intake comparison" example? I'm going to compare it to my current approach for ACCESS-ESM1.5, see what I can learn from it, and possibly add to it.

Thomas-Moore-Creative commented 1 month ago

@Thomas-Moore-Creative I found %%timeit is not useful if you are timing opening data from Intake or the cookbook, because the first time is very slow and repeat tests are very fast, so averaging over multiple tests does not give an accurate time estimate of the first opening.

😃 You can't restart the kernel in a %%timeit cell but you could include clearing key objects and garbage collection which might address some of the issues - hence looking for @anton-seaice's views above. 🤔

jemmajeffree commented 1 month ago

@jemmajeffree are you happy with where you landed in your "intake comparison" example? I'm going to compare it to my current approach for ACCESS-ESM1.5, see what I can learn from it, and possibly add to it.

I'm still not really happy with the way the ensemble members are combined, but I've ceased messing with it or trying to improve the approach.

anton-seaice commented 1 month ago

@anton-seaice - re: using %%time or %%timeit cell magics for evaluating performance in a notebook. My anecdotal experience and poorly tested opinion is that running garbage collection, deleting existing objects, and restarting the kernel between testing (A) vs (B) can make testing and evaluating performance in a notebook more accurate. Do you agree with that in any way?

I don't really have an informed view ...

I've found it can be totally arbitrary. One day its 20 seconds, the next day its 2 minutes for no apparent reason.

I suspect restarting the kernel is a reasonable approach, I don't know to what extent / where data may be cached at a more fundamental level.

Thomas-Moore-Creative commented 1 month ago

I don't really have an informed view ...

I do think my "view" is more a "feeling" and "opinion" ( feelpinion? ) from trial and error rather than a fundamental understanding of how things like dask and the NCI nodes and network actually work.

access-hive-bot commented 1 month ago

This issue has been mentioned on ACCESS Hive Community Forum. There might be relevant details there:

https://forum.access-hive.org.au/t/join-in-investigating-analysis-ready-data-ard-strategies-to-increase-impact-of-ocean-and-climate-model-archives-at-nci/3723/1

Thomas-Moore-Creative commented 3 weeks ago

@jemmajeffree, wondering if you settled on how to best retain the ensemble member name labels on loading and if you feel comfortable on how robust / reliable it is?

I am trying the below approach ( which is likely very similar to @anton-seaice suggestion above ) based on the dataset_dict.keys. I've only done light testing for robustness. I'm still building basic utilities that better allow me to use the catalog and discover data and haven't even gotten to the "how fast can this be" part yet.

dataset_dict = catalog_search.to_dataset_dict(progressbar=False,xarray_open_kwargs=xarray_open_kwargs)
member_names = [key.split('.')[5] for key in dataset_dict.keys()]
# Concatenate the datasets along the 'member' dimension and retain the member names
ds = xr.concat(
    dataset_dict.values(), 
    dim=xr.DataArray(member_names, dims="member", name="member"))
# Extract the numeric part from each member name and sort based on it
sorted_member_indices = np.argsort([int(member[1:].split('i')[0]) for member in ds['member'].values])
# Reorder the dataset based on the sorted member indices
ds_sorted = ds.isel(member=sorted_member_indices)
jemmajeffree commented 3 weeks ago

@Thomas-Moore-Creative I'm afraid not; I went back to what I was originally doing reading from individual filepaths What you've got looks good, though

Thomas-Moore-Creative commented 1 week ago

@anton-seaice after some head-banging I did reach out to NCI about "f" vs "l". Apparently "f" means file and "l" does mean link but it can't be relied on to deliver only the latest logical links that are provided in the "latest" directories?

@jemmajeffree if you or others are still using and relying on file_type =, I'm chatting about this here - just FYI. https://forum.access-hive.org.au/t/cmip6-data-analysis-at-nci-file-type-issue/3842