roocs / clisops

Climate Simulation Operations
https://clisops.readthedocs.io/en/latest/

How do our operations handle "larger than memory" requests? #27

Closed agstephens closed 4 years ago

agstephens commented 4 years ago

Sooner or later a user will make a "larger than memory" request.

We need to implement an appropriate level of Dask chunking in open dataset operations so that we avoid memory errors.

This needs some thought, but this example may be of use:

import os
import xarray as xr


def _setup_env():
    """Limit the threading libraries to a single thread each, i.e.:

        export OMP_NUM_THREADS=1
        export MKL_NUM_THREADS=1
        export OPENBLAS_NUM_THREADS=1

    (Note: these may need to be set before numpy/xarray are imported to
    take full effect.)
    """
    os.environ['OMP_NUM_THREADS'] = '1'
    os.environ['MKL_NUM_THREADS'] = '1'
    os.environ['OPENBLAS_NUM_THREADS'] = '1'


_setup_env()


def main():
    dr = '/badc/cmip6/data/CMIP6/HighResMIP/MOHC/HadGEM3-GC31-HH/control-1950/r1i1p1f1/day/ta/gn/v20180927'
    print(f'[INFO] Working on: {dr}')

    # Open lazily; open_mfdataset returns Dask-backed arrays.
    ds = xr.open_mfdataset(f'{dr}/*.nc')

    chunk_rule = {'time': 4}
    chunked_ds = ds.chunk(chunk_rule)

    # unify_chunks() returns a new object rather than modifying in place.
    chunked_ds = chunked_ds.unify_chunks()

    print(f'[INFO] Chunk rule: {chunk_rule}')

    OUTPUT_DIR = '/gws/nopw/j04/cedaproc/astephen/ag-zarr-test'
    output_path = f'{OUTPUT_DIR}/test.zarr'

    # Although we won't use Zarr in clisops - NetCDF should work fine too.
    chunked_ds.to_zarr(output_path)
    print(f'[INFO] Wrote: {output_path}')


if __name__ == '__main__':
    main()
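
One possible variation (an untested assumption on my part, not something verified on these files): ask for the chunking directly in open_mfdataset, so the data comes back already chunked along time:

import xarray as xr

dr = '/badc/cmip6/data/CMIP6/HighResMIP/MOHC/HadGEM3-GC31-HH/control-1950/r1i1p1f1/day/ta/gn/v20180927'

# 'auto' lets Dask size the time chunks from its array.chunk-size setting;
# parallel=True opens the many files concurrently.
ds = xr.open_mfdataset(f'{dr}/*.nc', chunks={'time': 'auto'}, parallel=True)
print(ds.ta.chunks)
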
agstephens commented 4 years ago

See also: https://github.com/agstephens/object-store-play/blob/master/xarray-zarr/CMIP6-Xarray-Zarr-Play-1.md

cehbrecht commented 4 years ago

Maybe use the rechunker package: https://rechunker.readthedocs.io/en/latest/
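
A minimal sketch of what that could look like, assuming rechunker's rechunk()/execute() API; the array shape, chunk sizes, memory budget and store paths below are purely illustrative:

import dask.array as da
from rechunker import rechunk

# Illustrative source: thin time-slice chunks that we would like to consolidate.
source = da.random.random((10000, 500, 500), chunks=(10, 500, 500))

plan = rechunk(
    source,
    target_chunks=(1000, 100, 100),  # desired chunking (illustrative)
    max_mem='256MB',                 # per-worker memory budget for the rechunk
    target_store='target.zarr',      # hypothetical output store
    temp_store='rechunk-tmp.zarr',   # intermediate store rechunker writes to
)
plan.execute()  # run the staged rechunking plan with Dask
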

cehbrecht commented 4 years ago

See memory usage example: https://github.com/cehbrecht/jupyterlab-notebooks/blob/master/xarray-demo/memory-usage.ipynb

cehbrecht commented 4 years ago

The chunks are managed by Dask. One can use the 'auto' chunk size option for a dimension, which will use the configured chunk size:

import dask

dask.config.get('array.chunk-size')   # -> '128MiB' (the default)

chunked_ds = ds.chunk({'time': 'auto'})
ta = chunked_ds.ta.unify_chunks()     # returns a new DataArray; keep the result

See: https://docs.dask.org/en/latest/array-chunks.html

The default chunk size can be changed at runtime with dask.config.set, or via a Dask config file or environment variable:

dask.config.set({'array.chunk-size': '256MiB'})

See: https://docs.dask.org/en/latest/configuration.html
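
For reference, a sketch of the config-file and environment-variable routes as well; the names follow Dask's documented conventions (DASK_ prefix with double-underscore nesting, YAML files on Dask's config search path such as ~/.config/dask/), so please verify against the configuration docs linked above:

import dask

# Environment-variable route (set in the shell or batch script before Python starts):
#   export DASK_ARRAY__CHUNK_SIZE=256MiB
#
# Config-file route (e.g. ~/.config/dask/dask.yaml):
#   array:
#     chunk-size: 256MiB

# Either way, the running process should then report the new default:
print(dask.config.get('array.chunk-size'))
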

The available memory for a subset (etc.) operation could be configured via Slurm (--mem is in megabytes by default):

salloc --mem=1024

See: https://slurm.schedmd.com/documentation.html

agstephens commented 4 years ago

@cehbrecht if I understand your findings correctly, the simplest solution is:

  1. Set a memory limit we think is sensible (per process running inside our WPS); we can do this with dask.config.set({'array.chunk-size': .....})
  2. Set the same memory limit, or slightly larger, on the Slurm job request that executes the process.
  3. Dask will do the rest (see the sketch below). :-)

Sounds great.
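
A minimal sketch of how steps 1 and 2 could fit together; run_subset, the 256MiB chunk size and the 1024 MB Slurm request are illustrative values, not agreed limits:

import dask
import xarray as xr

# Step 1: cap Dask's chunk size for everything this process does.
dask.config.set({'array.chunk-size': '256MiB'})


def run_subset(paths, time_slice, output_path):
    """Open lazily, subset in time, and write; Dask streams the chunks."""
    ds = xr.open_mfdataset(paths, chunks={'time': 'auto'})
    result = ds.sel(time=time_slice)
    result.to_netcdf(output_path)


# Step 2 (outside Python): request a matching, or slightly larger, memory
# limit on the Slurm job that executes this process, e.g.:
#   salloc --mem=1024        (or: sbatch --mem=1024 job.sh)
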

agstephens commented 4 years ago

But we also need rules on:

  1. The size limit for output netCDF files
  2. A file-naming convention for chunked output files

agstephens commented 4 years ago

Closing - now being implemented in other issues.