ESMValGroup / ESMValCore

ESMValCore: A community tool for pre-processing data from Earth system models in CMIP and running analysis scripts.
https://www.esmvaltool.org
Apache License 2.0

Processing ERA5 (project native6) leads to huge dask graphs #2375

Closed: schlunma closed this issue 5 months ago

schlunma commented 5 months ago

The following very simple recipe leads to a very large number of dask tasks, which slows down the calculations considerably. This becomes especially bad if additional custom preprocessor steps are used.

documentation:
  description: Test
  authors:
    - schlund_manuel
  title: Test.

datasets:
  - {dataset: ERA5, project: native6, type: reanaly, version: v1, tier: 3}

diagnostics:
  test:
    scripts:
      null
    variables:
      ta:
        mip: Amon

These are the dask graph sizes before and after each preprocessing step:

Step                          Before   After
load                          -        2107
fix_metadata                  2107     10363
concatenate                   10363    17587
cmor_check_metadata           17587    17587
fix_data                      17587    17587
cmor_check_data               17587    17587
add_supplementary_variables   17587    17587
save                          17587    -

As you can see, fix_metadata and concatenate each add roughly 7000-8000 tasks to the graph. @ESMValGroup/technical-lead-development-team
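
For reference, graph sizes like these can be read off a cube's lazy data with something like the following (a minimal sketch, assuming the cube still holds lazy data; this is not the exact instrumentation used above):

import iris

cube = iris.load_cube("era5_temperature_1979_monthly.nc")  # hypothetical input file
# core_data() returns the underlying dask array while the data is still lazy;
# the length of its graph is the number of tasks dask has to schedule.
print(len(cube.core_data().__dask_graph__()))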

valeriupredoi commented 5 months ago

has the data changed? i.e., could it be that there are now a lot more storage (HDF5) chunks that Dask is struggling to distribute?

valeriupredoi commented 5 months ago

or, conversely, iris IO has changed

schlunma commented 5 months ago

No, I think nothing in particular has changed. I am pretty sure the problem has existed for a long time; I only found some time now to investigate this further.

valeriupredoi commented 5 months ago

aha - interesting, then it's maybe worth investigating ERA5's chunking; I have a hunch it's made up of really tiny HDF5 chunks :bread:

schlunma commented 5 months ago

What's the straightforward way to check that? 😁

valeriupredoi commented 5 months ago

if you install kerchunk from conda-forge, then you can use this utility to convert the ERA5 netCDF4 to a Zarr reference file, and you can then count the chunks - chunks are stored with indices like (x, y, z) depending on the number of dims:

import ujson
import fsspec
from kerchunk.hdf import SingleHdf5ToZarr

def _correct_compressor_and_filename(content, varname):
    """
    Correct the compressor type as it comes out of Kerchunk (>=0.2.4; pinned):
    if Kerchunk placed the compressor in the "filters" list, move it back to
    the "compressor" entry of the Zarr array metadata. (The file-name
    correction for "s3://"-prefixed buckets from the original helper is
    omitted here, since we read a local file.)
    """
    new_content = content.copy()

    # preliminary assembly
    try:
        new_zarray = ujson.loads(new_content['refs'][f"{varname}/.zarray"])
        group = False
    except KeyError:
        new_zarray = ujson.loads(new_content['refs'][f"{varname}/{varname}/.zarray"])
        group = True

    # re-add the correct compressor if it's in the "filters" list
    if new_zarray["compressor"] is None and new_zarray["filters"]:
        for zfilter in new_zarray["filters"]:
            if zfilter["id"] == "zlib":
                new_zarray["compressor"] = zfilter
                new_zarray["filters"].remove(zfilter)

        if not group:
            new_content['refs'][f"{varname}/.zarray"] = ujson.dumps(new_zarray)
        else:
            new_content['refs'][f"{varname}/{varname}/.zarray"] = ujson.dumps(new_zarray)

    return new_content

def create_kerchunks(file_url, varname, outf):
    """Translate a netCDF4/HDF5 file to a Zarr reference file (JSON)."""
    fs = fsspec.filesystem('')
    with fs.open(file_url, 'rb') as local_file:
        try:
            h5chunks = SingleHdf5ToZarr(local_file, file_url,
                                        inline_threshold=0)
        except OSError as exc:
            raiser_1 = f"Unable to open file {file_url}. "
            raiser_2 = "Check if file is netCDF3 or netCDF-classic"
            print(raiser_1 + raiser_2)
            raise exc

        with fs.open(outf, 'wb') as f:
            content = h5chunks.translate()
            content = _correct_compressor_and_filename(content, varname)
            f.write(ujson.dumps(content).encode())

    zarray = ujson.loads(content['refs'][f"{varname}/.zarray"])
    zattrs = ujson.loads(content['refs'][f"{varname}/.zattrs"])

    return outf, zarray, zattrs

file_url is just the full path to the file (provided you are on a local POSIX FS), varname is the name of the variable that needs looking at (the short name, like tas), and outf is a JSON file you are writing to - anything, as long as it has a .json extension; or, you can point me to an ERA5 file and I can run PyActive on it and investigate the chunks :beer:
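
A minimal usage sketch (the file path, variable short name, and output name below are placeholders, not the actual ERA5 file):

outf, zarray, zattrs = create_kerchunks(
    file_url="/path/to/era5_temperature_1979_monthly.nc",  # placeholder path
    varname="ta",                                          # assumed short name
    outf="era5_ta_reference.json",                         # any .json output file
)
# "chunks" equal to "shape" (or very few chunk keys in the JSON) indicates a
# contiguous or very coarsely chunked dataset.
print(zarray["shape"], zarray["chunks"])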

valeriupredoi commented 5 months ago

you don't even need to correct the compressor - that's just so the metadata is nice when you write to the json file :+1:

schlunma commented 5 months ago

An example would be here (on Levante): /work/bd0854/DATA/ESMValTool2/RAWOBS/Tier3/ERA5/v1/mon/ta/era5_temperature_1979_monthly.nc. Cheers V :beers:

valeriupredoi commented 5 months ago

I'll run PyActive on that file tomorrow, bud - am very curious to see its B-tree and chunks etc.; I'm quite confident it's the chunking that's affecting Dask performance

schlunma commented 5 months ago

Something that might be related to that: I tried to use iris' new CHUNK_CONTROL feature on that data and ran into an error, see https://github.com/SciTools/iris/issues/5864.
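
For context, a rough sketch of the kind of call that was attempted (the variable and dimension names here are assumptions; see the linked iris issue for the actual error):

import iris
from iris.fileformats.netcdf.loader import CHUNK_CONTROL

# Ask iris to use specific dask chunk sizes for a variable at load time
# (names and sizes are illustrative only).
with CHUNK_CONTROL.set("ta", time=1):
    cube = iris.load_cube("era5_temperature_1979_monthly.nc")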

bouweandela commented 5 months ago

17 000 tasks shouldn't be too dramatic (at 1 ms per item that would be 17 s), but I can imagine that you run into trouble if the number of chunks then increases considerably further through temporal statistics that are implemented using iris.cube.Cube.aggregated_by (https://github.com/SciTools/iris/issues/5455).

I looked into this and found the following:

valeriupredoi commented 5 months ago

OK gents, so here's some low level info about this nasty file:

so the file is not storage (HDF5)-chunked: it comes as one big beautiful contiguous dataset of shape (12, 37, 721, 1440), and by the looks of it uncompressed too. No wonder Dask is having a hard time dealing with it; as @bouweandela mentions, it probably tries to rechunk it (chunk it, in actuality)

Looking at its time of creation

// global attributes:
        :Conventions = "CF-1.6" ;
        :history = "2020-04-23 08:58:03 GMT by grib_to_netcdf-2.16.0: /opt/ecmwf/eccodes/bin/grib_to_netcdf -S param -o /cache/data9/adaptor.mars.internal-1587632239.0829659-3867-37-6f7c8649-12e9-4051-9c55-d5fe2fdaba2f.nc /cache/tmp/6f7c8649-12e9-4051-9c55-d5fe2fdaba2f-adaptor.mars.internal-1587632239.083458-3867-14-tmp.grib"

this tells me the file was converted to netCDF from a GRIB file with grib_to_netcdf; why they used netCDF-3 standards is beyond me!
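
The same can be confirmed directly with netCDF4-python (a quick sketch; the variable name "t" is an assumption for the temperature variable in this file):

import netCDF4

ds = netCDF4.Dataset("era5_temperature_1979_monthly.nc")  # hypothetical local path
var = ds.variables["t"]   # assumed netCDF short name of the temperature variable
print(ds.data_model)      # shows whether this is a netCDF-3 style file
print(var.shape)          # (12, 37, 721, 1440) for this file
print(var.chunking())     # "contiguous" means no HDF5 storage chunking at all
ds.close()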

malininae commented 5 months ago

@schlunma by any chance, can you also look at hourly data? @Karen-A-Garcia used the cmorizer on the hourly ERA data and it was flying, but we've never looked at the graphs. As a heavy user of hourly data, I would be interested to see that.

schlunma commented 5 months ago

@malininae did you use 3D data or 2D data? For me, reading and processing 2D data was not an issue, only the much bigger 3D variables were a problem.

malininae commented 5 months ago

> @malininae did you use 3D data or 2D data? For me, reading and processing 2D data was not an issue, only the much bigger 3D variables were a problem.

Well, on a one or two year scale it is fine, but I'm using like 84 years of hourly ERA5 data, which gets trickier :cold_sweat:

schlunma commented 5 months ago

But you were saying that you didn't have any problems with hourly data? Also for the 84 years?

Also, Bouwe's comment suggests that these graph sizes are to be expected, so I think we can close this issue.

bouweandela commented 5 months ago

In my (limited) experience, you can expect an overloaded scheduler (I think this is called 'scheduler contention' in the dask dashboard) when the task graph has a size of 1 million or more, because at 1 ms per task that would be 1000 seconds just to keep track of all the tasks, and then the optimization of the graph also takes time, of course.

schlunma commented 5 months ago

Closing this, feel free to reopen if necessary.

valeriupredoi commented 5 months ago

at least this helped identify the issue in that iris issue eh - and now they have a fix :grin: