UBC-MOAD / Reshapr

Command-line tool based on Xarray and Dask for extraction of model variable time series from model products like SalishSeaCast and HRDPS
https://reshapr.readthedocs.io/en/latest/
Apache License 2.0

KeyError in reshapr.core.extract.write_netcdf() #121

Open douglatornell opened 8 months ago

douglatornell commented 8 months ago

Intermittently, the SalishSeaNowcast make_averaged_dataset worker fails with a KeyError for one of the model variables in the dataset it is writing. Example traceback below.

I suspect this is related to an issue I read about a while ago, in which opened dataset files appear to get closed before dask is finished with them (perhaps because xarray.open_mfdataset() is called in a context manager?).

If this can't be resolved in Reshapr, it should at least be handled as a critical error in the make_averaged_dataset worker, so that the worker logs a critical message instead of failing.
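
A minimal sketch of that fallback, assuming a hypothetical wrapper around the extraction call. The names `extract_or_log_critical` and `extract` are illustrative only and are not the actual worker code:

```python
import logging

logger = logging.getLogger("make_averaged_dataset")


def extract_or_log_critical(extract, *args):
    """Call extract(*args) and log a critical message if it raises KeyError.

    Hypothetical helper: `extract` stands in for the worker's call to
    reshapr.api.v1.extract.extract_netcdf().

    Returns the extraction result, or None if a KeyError was raised.
    """
    try:
        return extract(*args)
    except KeyError as exc:
        # exc.args[0] is the missing key; in the traceback in this issue
        # that is the model variable name
        logger.critical(
            f"dataset extraction failed: KeyError for variable {exc.args[0]!r}"
        )
        return None
```

The caller would then treat a `None` result as a failed extraction and report it through whatever failure path the worker already has, rather than letting the exception propagate.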

Example traceback:

Traceback (most recent call last):
  File "/SalishSeaCast/NEMO_Nowcast/nemo_nowcast/worker.py", line 391, in _do_work
    checklist = self.worker_func(
                ^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/SalishSeaNowcast/nowcast/workers/make_averaged_dataset.py", line 189, in make_averaged_dataset
    nc_path = _extract_netcdf(reshapr_config, reshapr_config_yaml)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/tenacity/__init__.py", line 289, in wrapped_f
    return self(f, *args, **kw)
           ^^^^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/tenacity/__init__.py", line 379, in __call__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/tenacity/__init__.py", line 314, in iter
    return fut.result()
           ^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/tenacity/__init__.py", line 382, in __call__
    result = fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/SalishSeaNowcast/nowcast/workers/make_averaged_dataset.py", line 211, in _extract_netcdf
    nc_path = reshapr.api.v1.extract.extract_netcdf(reshapr_config, reshapr_config_yaml)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/Reshapr/reshapr/api/v1/extract.py", line 49, in extract_netcdf
    return extract.api_extract_netcdf(config, config_yaml)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/Reshapr/reshapr/core/extract.py", line 84, in api_extract_netcdf
    write_netcdf(extracted_ds, nc_path, encoding, nc_format, unlimited_dim)
  File "/SalishSeaCast/Reshapr/reshapr/core/extract.py", line 1220, in write_netcdf
    extracted_ds.to_netcdf(
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/xarray/core/dataset.py", line 1911, in to_netcdf
    return to_netcdf(  # type: ignore  # mypy cannot resolve the overloads:(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/xarray/backends/api.py", line 1226, in to_netcdf
    writes = writer.sync(compute=compute)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/xarray/backends/common.py", line 172, in sync
    delayed_store = da.store(
                    ^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/dask/array/core.py", line 1237, in store
    compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/dask/base.py", line 341, in compute_as_if_collection
    return schedule(dsk2, keys, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/distributed/client.py", line 3213, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/distributed/client.py", line 2348, in gather
    return self.sync(
           ^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/distributed/utils.py", line 349, in sync
    return sync(
           ^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/distributed/utils.py", line 416, in sync
    raise exc.with_traceback(tb)
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/distributed/utils.py", line 389, in f
    result = yield future
             ^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/tornado/gen.py", line 769, in run
    value = future.result()
            ^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/distributed/client.py", line 2211, in _gather
    raise exception.with_traceback(traceback)
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/dask/optimization.py", line 990, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
    ^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/dask/core.py", line 149, in get
    result = _execute_task(task, cache)
    ^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
    ^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/dask/core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
    ^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
    ^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/dask/core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
    ^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/dask/core.py", line 113, in _execute_task
    return [_execute_task(a, cache) for a in arg]
    ^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/dask/core.py", line 113, in <listcomp>
    return [_execute_task(a, cache) for a in arg]
    ^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
    ^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/dask/array/core.py", line 126, in getter
    c = np.asarray(c)
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/xarray/core/indexing.py", line 470, in __array__
    return np.asarray(self.array, dtype=dtype)
    ^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/xarray/core/indexing.py", line 634, in __array__
    return np.asarray(self.array, dtype=dtype)
    ^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/xarray/core/indexing.py", line 535, in __array__
    return np.asarray(array[self.key], dtype=None)
    ^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/xarray/coding/variables.py", line 73, in __array__
    return self.func(self.array)
    ^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/xarray/coding/variables.py", line 145, in _apply_mask
    data = np.asarray(data, dtype=dtype)
      ^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/xarray/core/indexing.py", line 535, in __array__
    return np.asarray(array[self.key], dtype=None)
    ^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/xarray/backends/netCDF4_.py", line 91, in __getitem__
    return indexing.explicit_indexing_adapter(
    ^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/xarray/core/indexing.py", line 826, in explicit_indexing_adapter
    result = raw_indexing_method(raw_key.tuple)
      ^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/xarray/backends/netCDF4_.py", line 103, in _getitem
    original_array = self.get_array(needs_lock=False)
    ^^^^^^^^^^^^^^^^^
  File "/SalishSeaCast/nowcast-env/lib/python3.11/site-packages/xarray/backends/netCDF4_.py", line 83, in get_array
    variable = ds.variables[self.variable_name]
    ^^^^^^^^^^^^^^^^^
KeyError: 'particulate_organic_nitrogen'

douglatornell commented 6 months ago

This appears to be caused by the way the netCDF4 library behaves when dask workers use multiple threads. It has been described both as a thread-safety issue (https://github.com/xCDAT/xcdat/issues/561#issuecomment-1969470260) and as a file locking issue (https://github.com/pydata/xarray/discussions/8925#discussioncomment-9139316). Either way, the solution is to set the number of threads per dask worker to 1.

When I changed the persistent dask cluster on salish to use dask worker ... --nthreads=1 ... the problem disappeared for the SalishSeaNowcast make_averaged_dataset worker.
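
To guard against the cluster being restarted without that option, the per-worker thread counts could be checked before an extraction starts. A minimal sketch, assuming the mapping of worker address to thread count comes from `distributed.Client.nthreads()`. The helper `multithreaded_workers` is hypothetical and not part of Reshapr:

```python
def multithreaded_workers(nthreads_by_worker):
    """Return the sorted addresses of workers that run more than 1 thread.

    nthreads_by_worker maps worker address to thread count, e.g. the dict
    returned by distributed.Client.nthreads().
    """
    return sorted(
        addr for addr, nthreads in nthreads_by_worker.items() if nthreads > 1
    )


# Possible usage against a running cluster (assumes dask.distributed is
# installed; scheduler_address is a placeholder for the cluster's scheduler):
#
#   from distributed import Client
#
#   client = Client(scheduler_address)
#   bad_workers = multithreaded_workers(client.nthreads())
#   if bad_workers:
#       raise RuntimeError(f"workers with more than 1 thread: {bad_workers}")
```

An empty list means every worker is single-threaded, which is the configuration that made the KeyError disappear here.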