pydata / xarray

N-D labeled arrays and datasets in Python
https://xarray.dev
Apache License 2.0

Thread safety issue with open_mfdataset, Zarr, and dask #8815

Open csubich opened 7 months ago

csubich commented 7 months ago

What happened?

I have a large-ish weather dataset (a mirrored subset of the WeatherBench2 data), stored on disk as a collection of Zarr datasets segregated by time. Because processing is intensive and GPU-related, I handle the data in parallel: the stores are opened together with open_mfdataset, sliced with lower-level positional indexing[^1], and the result is computed on a dask distributed cluster, roughly as sketched below.
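
A minimal sketch of that access pattern, for orientation; the paths, worker counts, and slice bounds are illustrative assumptions rather than the code from the original report:

```python
import glob

import xarray as xr
from dask.distributed import Client

# Multi-threaded dask workers; the errors described below only appear with
# more than one thread per worker.
client = Client(n_workers=4, threads_per_worker=8)

# Open the time-segregated Zarr stores as one lazy, concatenated dataset.
ds = xr.open_mfdataset(
    sorted(glob.glob("/data/weatherbench2/*.zarr")),  # hypothetical path
    engine="zarr",
    parallel=True,
)

# Lower-level positional slicing of the shared dataset (instead of .sel),
# then a parallel compute on the cluster; this is where the exceptions surface.
sample = ds["geopotential"].isel(time=slice(62, 63)).compute()
```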

When doing so, I intermittently receive two categories of exceptions from dask. The first is a SystemError:

2024-03-08 13:17:51,404 - distributed.worker - WARNING - Compute Failed
Key:       ('concatenate-open_dataset-geopotential-getitem-1694daf6b6f554dadb58cb1c6a0ab42c', 0, 0, 0, 0)
Function:  execute_task
args:      ((<function getitem at 0x154c850e02c0>, (<function getitem at 0x154c850e02c0>, (<function getitem at 0x154c850e02c0>, (subgraph_callable-ecb9cb81-62ff-42b4-bfdc-571bccb1dbf0, ImplicitToExplicitIndexingAdapter(array=CopyOnWriteArray(array=LazilyIndexedArray(array=<xarray.backends.zarr.ZarrArrayWrapper object at 0x153c5968e140>, key=BasicIndexer((slice(None, None, None), slice(None, None, None), slice(None, None, None), slice(None, None, None)))))), (slice(62, 63, None), slice(0, 37, None), slice(0, 721, None), slice(0, 1440, None))), (array([0]), slice(None, None, None), slice(None, None, None), slice(None, None, None))), (array([0]), slice(None, None, None), slice(None, None, None), slice(None, None, None))), (slice(None, None, None), array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
       17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33,
       34, 35, 36]), slice(None, None, None), slice(None, None, None))))
kwargs:    {}
Exception: "SystemError('/home/conda/feedstock_root/build_artifacts/python-split_1696328470153/work/Objects/tupleobject.c:927: bad argument to internal function')"

while the second is a KeyError:

2024-03-08 13:17:51,429 - distributed.worker - ERROR - Exception during execution of task ('broadcast_to-ea76769e6c2e9cb7113e9f1824307da6', 0, 0, 0, 0, 0, 0).
Traceback (most recent call last):
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/zict/buffer.py", line 184, in __getitem__
    return self.fast[key]
           ~~~~~~~~~^^^^^
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/zict/common.py", line 127, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/zict/lru.py", line 117, in __getitem__
    result = self.d[key]
             ~~~~~~^^^^^
KeyError: ('getitem-4ef33614b5428a711600ad275d8a3687', 0, 0, 0, 0)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/distributed/worker.py", line 2383, in _prepare_args_for_execution
    data[k] = self.data[k]
              ~~~~~~~~~^^^
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/distributed/spill.py", line 216, in __getitem__
    return super().__getitem__(key)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/zict/common.py", line 127, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/zict/buffer.py", line 186, in __getitem__
    return self.slow_to_fast(key)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/zict/buffer.py", line 153, in slow_to_fast
    value = self.slow[key]
            ~~~~~~~~~^^^^^
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/zict/common.py", line 127, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/zict/cache.py", line 67, in __getitem__
    gen = self._last_updated[key]
          ~~~~~~~~~~~~~~~~~~^^^^^
KeyError: ('getitem-4ef33614b5428a711600ad275d8a3687', 0, 0, 0, 0)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/distributed/worker.py", line 2235, in execute
    args2, kwargs2 = self._prepare_args_for_execution(ts, args, kwargs)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/csu001/data/ppp5/conda_env/gforecast/lib/python3.11/site-packages/distributed/worker.py", line 2387, in _prepare_args_for_execution
    data[k] = Actor(type(self.state.actors[k]), self.address, k, self)
                         ~~~~~~~~~~~~~~~~~^^^
KeyError: ('getitem-4ef33614b5428a711600ad275d8a3687', 0, 0, 0, 0)

The computation will often succeed on a subsequent attempt if I catch the exception raised by .compute and re-execute the call.
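
A sketch of that retry workaround, continuing the example above (the retry count and the broad exception handling are my own assumptions):

```python
# Hypothetical retry loop around the failing call; the report says a
# subsequent attempt often succeeds after catching the exception from .compute().
for attempt in range(3):
    try:
        sample = ds["geopotential"].isel(time=slice(62, 63)).compute()
        break
    except Exception as err:  # SystemError / KeyError surfaced from the workers
        print(f"compute attempt {attempt + 1} failed: {err!r}")
else:
    raise RuntimeError("compute failed after 3 attempts")
```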

This second error seems to disappear[^2] if I pass cache=False to open_mfdataset, but the first persists.

Both errors seem to disappear if I use only a single worker thread[^3] or if I restrict the data being used to only one file's worth (that is, still open all the files with open_mfdataset, but only use entries that I know come from one of the source Zarr trees).
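
Hedged sketches of those two mitigations, continuing the example above (paths and worker counts remain illustrative):

```python
# Mitigation 1: pass cache=False through open_mfdataset to the backend;
# this made the KeyError variant disappear, but not the SystemError.
ds_nocache = xr.open_mfdataset(
    sorted(glob.glob("/data/weatherbench2/*.zarr")),  # hypothetical path
    engine="zarr",
    parallel=True,
    cache=False,
)

# Mitigation 2: a single thread per worker avoids both errors, at a
# significant cost in throughput.
client = Client(n_workers=4, threads_per_worker=1)
```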

[^1]: I was previously using .sel to slice the data in a more semantic way, but I switched to the lower-level slicing to rule out any thread-unsafety inside the non-delayed-computing parts of Xarray when dealing with a shared dataset.

[^2]: The errors are random in exactly the annoying way that threading errors are. One factor that makes them more likely is adding debugging printouts, which seems to disrupt thread scheduling enough to trigger them more often.

[^3]: Unfortunately, this results in unacceptably slow performance. Loading the data in parallel really does improve throughput. I haven't yet tried using the asynchronous model for the dask client.

What did you expect to happen?

No response

Minimal Complete Verifiable Example

No response

MVCE confirmation

Relevant log output

No response

Anything else we need to know?

No response

Environment

```
INSTALLED VERSIONS
------------------
commit: None
python: 3.11.8 | packaged by conda-forge | (main, Feb 16 2024, 20:53:32) [GCC 12.3.0]
python-bits: 64
OS: Linux
OS-release: 4.18.0-240.el8.x86_64
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: en_US.UTF-8
LOCALE: ('en_US', 'UTF-8')
libhdf5: 1.14.3
libnetcdf: 4.9.2
xarray: 2024.1.0
pandas: 2.2.1
numpy: 1.26.4
scipy: 1.12.0
netCDF4: 1.6.5
pydap: None
h5netcdf: 1.3.0
h5py: 3.10.0
Nio: None
zarr: 2.16.1
cftime: 1.6.3
nc_time_axis: None
iris: 3.7.0
bottleneck: None
dask: 2024.1.0
distributed: 2024.1.0
matplotlib: 3.8.3
cartopy: 0.22.0
seaborn: None
numbagg: None
fsspec: 2023.12.2
cupy: None
pint: None
sparse: 0.15.1
flox: None
numpy_groupies: None
setuptools: 69.1.1
pip: 24.0
conda: None
pytest: None
mypy: None
IPython: 8.20.0
sphinx: None
```

max-sixty commented 6 days ago

I suspect this is a lack of thread safety in dask. In particular, the tracebacks are in internal dask data structures. Is there anything to suggest this is something we could fix from the xarray side?

dcherian commented 3 days ago

cc @fjetter @phofl for awareness