jbusecke / xMIP

Analysis ready CMIP6 data in python the easy way with pangeo tools.
https://cmip6-preprocessing.readthedocs.io/en/latest/
Apache License 2.0

correct_units fails on CMIP6 historical tos data #322

Closed tessjacobson closed 5 months ago

tessjacobson commented 8 months ago

I'm trying to preprocess SST data in all the historical CMIP6 runs and am running into an issue with combined_preprocessing. This happens with any of the models/members, but is shown below for a single model/member. It seems to be happening in the correct_units step. I'm using pint v0.21 and xmip v0.7.1.

from xmip.preprocessing import combined_preprocessing
import intake 
import dask

url = "https://storage.googleapis.com/cmip6/pangeo-cmip6.json"
col = intake.open_esm_datastore(url)

query = dict(
    activity_id="CMIP",
    experiment_id="historical",
    variable_id=["tos"],
    table_id="Omon",
    source_id = ['MPI-ESM-1-2-HAM'],
    member_id = 'r3i1p1f1',
    grid_label = 'gn'
)

cat_mon = col.search(**query)

z_kwargs = {'consolidated': True, 'decode_times':False}

with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    dset_dict = cat_mon.to_dataset_dict(zarr_kwargs=z_kwargs, preprocess=combined_preprocessing)
jbusecke commented 5 months ago

Hi @tessjacobson. Thanks for using xMIP and reporting this issue. Apologies for the long wait on this.

I just ran this on the LEAP-Pangeo hub and got this:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File /srv/conda/envs/notebook/lib/python3.10/site-packages/intake_esm/source.py:244, in ESMDataSource._open_dataset(self)
--> 244 datasets = dask.compute(*datasets)
[... dask scheduling frames omitted ...]
File /srv/conda/envs/notebook/lib/python3.10/site-packages/intake_esm/source.py:79, in _open_dataset(urlpath, varname, xarray_open_kwargs, preprocess, ...)
     78 if preprocess is not None:
---> 79     ds = preprocess(ds)
File /srv/conda/envs/notebook/lib/python3.10/site-packages/xmip/preprocessing.py:458, in combined_preprocessing(ds)
    457 # fix the units
--> 458 ds = correct_units(ds)
File /srv/conda/envs/notebook/lib/python3.10/site-packages/xmip/preprocessing.py:219, in correct_units(ds)
    218 # exclude salinity from the quantification (see https://github.com/jbusecke/xmip/pull/160#issuecomment-878627027 for details)
--> 219 quantified = ds.pint.quantify(_unit_overrides)
File /srv/conda/envs/notebook/lib/python3.10/site-packages/pint_xarray/accessors.py:1085, in PintDatasetAccessor.quantify(self, units, unit_registry, **unit_kwargs)
-> 1085 new_units[name] = _decide_units(unit, registry, attr)
File /srv/conda/envs/notebook/lib/python3.10/site-packages/pint_xarray/accessors.py:138, in _decide_units(units, registry, unit_attribute)
--> 138 units = registry.parse_units(unit_attribute)
File /srv/conda/envs/notebook/lib/python3.10/site-packages/pint/facets/plain/registry.py:1127, in GenericPlainRegistry.parse_units(self, input_string, as_delta, case_sensitive)
File /srv/conda/envs/notebook/lib/python3.10/site-packages/pint/util.py:764, in ParserHelper.from_string(cls, input_string, non_int_type)
File /srv/conda/envs/notebook/lib/python3.10/site-packages/pint/pint_eval.py:146, in EvalTreeNode.evaluate(self, define_op, bin_op, un_op)

TypeError: unsupported operand type(s) for -: 'ParserHelper' and 'int'

The above exception was the direct cause of the following exception:

ESMDataSourceError                        Traceback (most recent call last)
Cell In[1], line 23
     22 with dask.config.set(**{'array.slicing.split_large_chunks': True}):
---> 23     dset_dict = cat_mon.to_dataset_dict(zarr_kwargs=z_kwargs, preprocess=combined_preprocessing)
[... intake-esm, dask, and concurrent.futures frames omitted ...]
File /srv/conda/envs/notebook/lib/python3.10/site-packages/intake_esm/source.py:264, in ESMDataSource._open_dataset(self)
    263 except Exception as exc:
--> 264     raise ESMDataSourceError(

ESMDataSourceError: Failed to load dataset with key='CMIP.HAMMOZ-Consortium.MPI-ESM-1-2-HAM.historical.Omon.gn'
You can use `cat['CMIP.HAMMOZ-Consortium.MPI-ESM-1-2-HAM.historical.Omon.gn'].df` to inspect the assets/files for this key.

I confirmed that this is introduced by combined_preprocessing:

import xarray as xr

ds = xr.open_dataset('gs://cmip6/CMIP6/CMIP/HAMMOZ-Consortium/MPI-ESM-1-2-HAM/historical/r3i1p1f1/Omon/tos/gn/v20191218/', engine='zarr', chunks={}, **z_kwargs)
combined_preprocessing(ds)

The error message is quite hard to read, but I think I have a solution:

ds_fixed = ds.copy()
for var in ds_fixed.variables:
    unit = ds_fixed[var].attrs.get('units', None)
    if isinstance(unit, int):
        # drop integer-valued units attributes (e.g. 1), which pint cannot parse
        del ds_fixed[var].attrs['units']
    print(f"{var} {unit}")
combined_preprocessing(ds_fixed)

This works! It turns out that the original dataset had a units attribute of 1 (an integer), which does not make a lot of sense.

This should be easily fixable. I'll consult with the pint crowd (@TomNicholas, @keewis) about the best way to attack this. Is this something that I should/could change in the unit registry, or do you think it is better to just delete all integer unit attributes like above?

TomNicholas commented 5 months ago

I think dimensionless numbers in pint are just supposed to be represented with a unit of ''. I guess you probably could create some new definition in the registry, but if you're already changing dodgy metadata in the CMIP data, then maybe simplest to just change this dodgy metadata too?
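
For reference, a minimal sketch with plain pint (outside of xmip/xarray) of how those spellings behave; the empty string and the string "1" both parse to dimensionless, while the bare integer is what blows up in the traceback above:

import pint

ureg = pint.UnitRegistry()
print(ureg.parse_units(""))    # dimensionless
print(ureg.parse_units("1"))   # also dimensionless
# ureg.parse_units(1)          # the bare integer raises, as in the traceback above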

keewis commented 5 months ago

yeah, you can put "" or "dimensionless" there. I believe the string "1" also works (but if you have an integer it will fail).

Note that "1" appears to be explicitly mentioned in the CF conventions as valid. So if you want to fix the units, just run str on it?

jbusecke commented 5 months ago

That seems like a nice alternative to ripping it out! Thanks

jbusecke commented 5 months ago

I just added #331, but that did not fix the issue above. I think I misdiagnosed this. It seems to be the time / time_bnds variables that are upsetting pint.

from xmip.preprocessing import combined_preprocessing
import intake 
import dask

url = "https://storage.googleapis.com/cmip6/pangeo-cmip6.json"
col = intake.open_esm_datastore(url)

query = dict(
    activity_id="CMIP",
    experiment_id="historical",
    variable_id=["tos"],
    table_id="Omon",
    source_id = ['MPI-ESM-1-2-HAM'],
    member_id = 'r3i1p1f1',
    grid_label = 'gn'
)

cat_mon = col.search(**query)

z_kwargs = {'consolidated': True, 'decode_times':False}

with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    dset_dict = cat_mon.to_dataset_dict(zarr_kwargs=z_kwargs)

from xmip.preprocessing import correct_units

# grab the single dataset from the dictionary built above
ds = dset_dict['CMIP.HAMMOZ-Consortium.MPI-ESM-1-2-HAM.historical.Omon.gn']
ds_test = ds.drop_vars(['time', 'time_bnds'])
correct_units(ds_test)
ds_test

This works as intended! So it is the undecoded time units that cause the failure.
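
To illustrate why decoding sidesteps this (a self-contained toy example, not the CMIP6 data itself): once xarray decodes the CF times, the "days since ..." string moves from .attrs into .encoding, so pint never sees it.

import numpy as np
import xarray as xr

# toy dataset with an undecoded CF time coordinate
raw = xr.Dataset(
    {"tos": ("time", np.zeros(3))},
    coords={"time": ("time", [0, 31, 59], {"units": "days since 1850-01-01", "calendar": "standard"})},
)
decoded = xr.decode_cf(raw)

print(raw["time"].attrs["units"])             # 'days since 1850-01-01' -> this is what pint chokes on
print(decoded["time"].attrs.get("units"))     # None: moved into .encoding during decoding
print(decoded["time"].encoding["units"])      # 'days since 1850-01-01'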

As a quick fix for @tessjacobson: could you just decode the times? Or was there a specific reason not to do that?

from xmip.preprocessing import combined_preprocessing
import intake 
import dask

url = "https://storage.googleapis.com/cmip6/pangeo-cmip6.json"
col = intake.open_esm_datastore(url)

query = dict(
    activity_id="CMIP",
    experiment_id="historical",
    variable_id=["tos"],
    table_id="Omon",
    source_id = ['MPI-ESM-1-2-HAM'],
    member_id = 'r3i1p1f1',
    grid_label = 'gn'
)

cat_mon = col.search(**query)

z_kwargs = {'consolidated': True, 'decode_times':True}

with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    dset_dict = cat_mon.to_dataset_dict(zarr_kwargs=z_kwargs, preprocess=combined_preprocessing)

works as intended.

A more high-level question for @keewis and @TomNicholas:

The units upsetting pint seem to be

time:      units = 'hours since 1850-01-16 12:00:00.000000' (<class 'str'>)
time_bnds: units = 'days since 1850-01-01' (<class 'str'>)

is there a way pint-xarray could/should detect encoded time dimensions and leave them alone?

Another question for the whole group: Is #331 still generally useful? Or should I close this and wait until we actually find a wrongly typed unit in the wild?

dcherian commented 5 months ago

pint-xarray could/should detect encoded time dimensions and leave them alone?

I don't understand. How is pint-xarray seeing these units if decode_times=True? Xarray should be handling them.

Or should I close this and wait until we actually find a wrongly typed unit in the wild?

I would happily add this to cf_xarray.units (https://github.com/xarray-contrib/cf-xarray/blob/main/cf_xarray/units.py).

jbusecke commented 5 months ago

I don't understand. How is pint-xarray seeing these units if decode_times=True? Xarray should be handling them.

It is not. The issue here is that @tessjacobson explicitly set decode_times=False (which is often useful for debugging etc, and ideally should not break the preprocessing).

But since xarray still 'knows' about the time dimension I feel we should be able to just leave them as is.

dcherian commented 5 months ago

@keewis is there a way to have cf_xarray's unit registry ignore these time units if they exist?

keewis commented 5 months ago

I'm not sure. We'd need to be able to tell pint to return None for these, as that will tell pint-xarray not to do anything with that particular variable. However, since time is directly supported in xarray (with a unit string of "{units} since {date}") I think it would be justified to do this directly in pint-xarray (which I would expect to be a lot easier).

See also #279
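
For what it's worth, a rough sketch of that detection (my own regex, only an approximation of the CF "<units> since <reference>" pattern, not what pint-xarray implements):

import re

CF_TIME_UNITS = re.compile(r"^\s*\w+\s+since\s+.+", flags=re.IGNORECASE)

def looks_like_time_units(units):
    # True for strings such as 'days since 1850-01-01', which should be left to xarray
    return isinstance(units, str) and CF_TIME_UNITS.match(units) is not None

print(looks_like_time_units("days since 1850-01-01"))                  # True
print(looks_like_time_units("hours since 1850-01-16 12:00:00.000000")) # True
print(looks_like_time_units("degC"))                                   # False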

keewis commented 5 months ago

Is #331 still generally useful? Or should I close this and wait until we actually find a wrongly typed unit in the wild?

Actually, it might be better to move this to cf-xarray's preprocessors? I forgot where we had that discussion, but I do remember hearing about units of 1 before, so this appears to be somewhat common.

Edit: that was in xarray-contrib/cf-xarray#238

That would cast everything to a str before applying anything else. Unfortunately, pint does the weird thing of inserting the "%" → " percent" substitution as the first preprocessor somewhere in the constructor, so we can't just insert str into the existing list of preprocessors in cf-xarray. However, this works:

import pint

ureg = pint.UnitRegistry()
# prepend str so that non-string units attributes (like the integer 1) are
# stringified before pint's own preprocessors run
ureg.preprocessors.insert(0, str)
ureg.parse_units(1)
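
Inserting str at index 0 matters because the integer is then converted before pint's own "%" → " percent" replacement (which assumes a string input) runs, so parse_units(1) comes back as dimensionless instead of raising.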
jbusecke commented 5 months ago

Just to clarify, I think the 1 unit was actually not a problem at all. I think I just misdiagnosed this in the beginning. So this really just is a problem with the encoded time dimensions, AFAICT.

keewis commented 5 months ago

the datetime unit issue should be fixed by the PR on pint-xarray, and I will make sure to issue a bugfix release very soon (I apparently neglected the project a bit regarding that).

For the 1 unit see the PR I just opened on cf-xarray.

jbusecke commented 5 months ago

Amazing. Thank you so much @keewis.

jbusecke commented 5 months ago

I just tested this on the LEAP-Pangeo hub against the main branches of cf-xarray and pint-xarray

pip install git+https://github.com/xarray-contrib/cf-xarray.git git+https://github.com/xarray-contrib/pint-xarray.git

and this ran without error:

from xmip.preprocessing import combined_preprocessing
import intake 
import dask

url = "https://storage.googleapis.com/cmip6/pangeo-cmip6.json"
col = intake.open_esm_datastore(url)

query = dict(
    activity_id="CMIP",
    experiment_id="historical",
    variable_id=["tos"],
    table_id="Omon",
    source_id = ['MPI-ESM-1-2-HAM'],
    member_id = 'r3i1p1f1',
    grid_label = 'gn'
)

cat_mon = col.search(**query)

z_kwargs = {'consolidated': True, 'decode_times':False}

with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    dset_dict = cat_mon.to_dataset_dict(zarr_kwargs=z_kwargs, preprocess=combined_preprocessing)

I will close this issue now. Please feel free to reopen if this does not work for you, @tessjacobson.

keewis commented 5 months ago

you might also want to close #279

keewis commented 3 weeks ago

it took me a while, but the fix in pint-xarray just hit PyPI

jbusecke commented 3 weeks ago

Awesome. Thanks @keewis