pangeo-forge / pangeo-forge-recipes

Python library for building Pangeo Forge recipes.
https://pangeo-forge.readthedocs.io/
Apache License 2.0

copy_to_local in OpenWithXarray not working on Dataflow #722

Open jbusecke opened 5 months ago

jbusecke commented 5 months ago

As suggested by @rabernat in the meeting earlier today, I ran a test of my (M)RE (see #715) with copy_to_local set to True.

This failed on Google Dataflow with several errors similar to the following (from the Dataflow worker logs of this job):

FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmpphbwtfmv' [while running 'Creating CMIP6.CMIP.CMCC.CMCC-ESM2.historical.r1i1p1f1.3hr.pr.gn.v20210114|OpenURLWithFSSpec|OpenWithXarray|Preprocessor|StoreToZarr/StoreToZarr/Rechunk/MapTuple(combine_fragments)-ptransform-46']
Traceback (most recent call last):
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/backends/file_manager.py", line 211, in _acquire_with_cache_info
    file = self._cache[self._key]
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/backends/lru_cache.py", line 56, in __getitem__
    value = self._cache[key]
KeyError: [<class 'netCDF4._netCDF4.Dataset'>, ('/tmp/tmpphbwtfmv',), 'r', (('clobber', True), ('diskless', False), ('format', 'NETCDF4'), ('persist', False)), '0cfa0ad6-23bc-4bb4-852a-213acee52f2f']

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 637, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/tmp/ab3ce7b5c6a409796cee8c934cce7cfd4b3d5546a1c4bd716293176fd8cfc1a1nwgi2awr/lib/python3.10/site-packages/apache_beam/transforms/core.py", line 2040, in <lambda>
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/pangeo_forge_recipes/rechunking.py", line 240, in combine_fragments
    ds_combined = xr.combine_nested(dsets_to_concat, concat_dim=concat_dims_sorted)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/combine.py", line 577, in combine_nested
    return _nested_combine(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/combine.py", line 356, in _nested_combine
    combined = _combine_nd(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/combine.py", line 232, in _combine_nd
    combined_ids = _combine_all_along_first_dim(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/combine.py", line 267, in _combine_all_along_first_dim
    new_combined_ids[new_id] = _combine_1d(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/combine.py", line 290, in _combine_1d
    combined = concat(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/concat.py", line 250, in concat
    return _dataset_concat(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/concat.py", line 631, in _dataset_concat
    combined_var = concat_vars(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/variable.py", line 2962, in concat
    return Variable.concat(variables, dim, positions, shortcut, combine_attrs)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/variable.py", line 1718, in concat
    data = duck_array_ops.concatenate(arrays, axis=axis)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/duck_array_ops.py", line 356, in concatenate
    return _concatenate(as_shared_dtype(arrays), axis=axis)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/duck_array_ops.py", line 232, in as_shared_dtype
    arrays = [asarray(x, xp=xp) for x in scalars_or_arrays]
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/duck_array_ops.py", line 232, in <listcomp>
    arrays = [asarray(x, xp=xp) for x in scalars_or_arrays]
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/duck_array_ops.py", line 219, in asarray
    return data if is_duck_array(data) else xp.asarray(data)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 806, in __array__
    return np.asarray(self.get_duck_array(), dtype=dtype)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 809, in get_duck_array
    self._ensure_cached()
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 803, in _ensure_cached
    self.array = as_indexable(self.array.get_duck_array())
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 760, in get_duck_array
    return self.array.get_duck_array()
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 630, in get_duck_array
    array = array.get_duck_array()
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/coding/variables.py", line 81, in get_duck_array
    return self.func(self.array.get_duck_array())
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 623, in get_duck_array
    array = self.array[self.key]
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/backends/netCDF4_.py", line 101, in __getitem__
    return indexing.explicit_indexing_adapter(
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/core/indexing.py", line 987, in explicit_indexing_adapter
    result = raw_indexing_method(raw_key.tuple)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/backends/netCDF4_.py", line 113, in _getitem
    original_array = self.get_array(needs_lock=False)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/backends/netCDF4_.py", line 92, in get_array
    ds = self.datastore._acquire(needs_lock)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/backends/netCDF4_.py", line 412, in _acquire
    with self._manager.acquire_context(needs_lock) as root:
  File "/usr/local/lib/python3.10/contextlib.py", line 135, in __enter__
    return next(self.gen)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/backends/file_manager.py", line 199, in acquire_context
    file, cached = self._acquire_with_cache_info(needs_lock)
  File "/opt/apache/beam-venv/beam-venv-worker-sdk-0-0/lib/python3.10/site-packages/xarray/backends/file_manager.py", line 217, in _acquire_with_cache_info
    file = self._opener(*self._args, **kwargs)
  File "src/netCDF4/_netCDF4.pyx", line 2469, in netCDF4._netCDF4.Dataset.__init__
  File "src/netCDF4/_netCDF4.pyx", line 2028, in netCDF4._netCDF4._ensure_nc_success
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmpphbwtfmv'

So this is not a fix for the issue in #715 yet.

I am not entirely sure how I should go about debugging this further. Any suggestions welcome.

jbusecke commented 5 months ago

I am wondering if my workers had any local storage to begin with:

[screenshot]

The n1 machine family seems to generally have local SSD access?

[screenshot]

But I am wondering if that needs to be specified as an option?

rabernat commented 5 months ago

I think you also have to set load=True. It's looking for a local file that isn't there.

https://github.com/pangeo-forge/pangeo-forge-recipes/blob/fe036507d728d10cf4c926fedcc5d0ca5234f7f3/pangeo_forge_recipes/openers.py#L242-L246
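
For reference, a minimal sketch of a recipe configured that way (the transform names match the pipeline in the log above; pattern and the StoreToZarr arguments are stand-ins for whatever the real CMIP6 recipe uses):

import apache_beam as beam
from pangeo_forge_recipes.transforms import (
    OpenURLWithFSSpec,
    OpenWithXarray,
    StoreToZarr,
)

# Sketch only: `pattern` stands in for the recipe's FilePattern; the relevant
# change for this issue is copy_to_local=True together with load=True.
recipe = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec()
    | OpenWithXarray(
        file_type=pattern.file_type,
        copy_to_local=True,  # download each input to a worker-local temp file
        load=True,           # eagerly read the data so the dataset no longer
                             # references that temp file after serialization
    )
    | StoreToZarr(
        store_name="output.zarr",
        combine_dims=pattern.combine_dim_keys,
    )
)

With load=False, the opened dataset only holds lazy references to the local temp path, which is exactly what breaks once the fragment is shipped to a different worker.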

jbusecke commented 5 months ago

Just opened https://github.com/pangeo-forge/pangeo-forge-runner/pull/183 to test if I can set larger HDD on the dataflow workers.
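
For context, worker disk size on Dataflow is ultimately a Beam worker option; a hedged sketch of setting it directly with plain Beam pipeline options (values are illustrative, and how pangeo-forge-runner exposes this is what that PR is about):

from apache_beam.options.pipeline_options import PipelineOptions

# Illustrative values only; project, region, temp_location, etc. omitted.
options = PipelineOptions(
    runner="DataflowRunner",
    disk_size_gb=100,             # persistent disk per worker, in GB
    machine_type="n1-highmem-4",  # an n1-family machine type
)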

jbusecke commented 5 months ago

OK, so that seems to generally work, but it might be useful to somehow check whether the current worker has any storage attached. Not sure if this is possible in general, but it would certainly increase the usability of that feature.
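
Purely to illustrate that idea (this is not an existing pangeo-forge feature), a worker-side guard could check free space in the temp directory that copy_to_local writes to, using only the standard library:

import shutil
import tempfile

def assert_local_scratch(min_free_gb: float = 10.0) -> None:
    # Hypothetical helper: fail early if the worker's temp directory, where
    # copy_to_local places its files, has almost no free space.
    scratch = tempfile.gettempdir()
    free_gb = shutil.disk_usage(scratch).free / 1e9
    if free_gb < min_free_gb:
        raise RuntimeError(
            f"Only {free_gb:.1f} GB free in {scratch}; "
            "copy_to_local likely needs a larger worker disk."
        )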

jbusecke commented 5 months ago

I think you also have to set load=True. It's looking for a local file that isn't there.

https://github.com/pangeo-forge/pangeo-forge-recipes/blob/fe036507d728d10cf4c926fedcc5d0ca5234f7f3/pangeo_forge_recipes/openers.py#L242-L246

Should this be an error, rather than a warning? I did not see this anywhere in the logs on Dataflow.

I guess this depends on where this is run.
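
In the meantime, one way to surface that kind of warning loudly when testing locally is Python's warnings filter, which escalates matching warnings to exceptions (the message pattern below is a placeholder, not the exact text emitted by openers.py):

import warnings

# Turn any warning mentioning copy_to_local into a hard error so it cannot be missed.
warnings.filterwarnings("error", message=".*copy_to_local.*")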

rabernat commented 5 months ago

but it might be useful to somehow check if the current worker has any storage attached?

If the worker has no storage attached, this would fail on OpenWithXarray. Instead, it is failing on a combine step, after the fragment has been moved to another worker.
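
That failure mode can be reproduced outside Beam with a small sketch (hypothetical file and variable names, assuming the default netCDF4 engine): a lazily opened dataset keeps only a reference to the local path, so once the temp file is gone any deferred read raises the same FileNotFoundError as the traceback above, whereas load=True would have pulled the values into memory first.

import os
import tempfile

import numpy as np
import xarray as xr

# Write a tiny netCDF file standing in for the copy_to_local temp file.
path = os.path.join(tempfile.mkdtemp(), "fragment.nc")
xr.Dataset({"pr": ("time", np.arange(3.0))}).to_netcdf(path)

ds = xr.open_dataset(path)  # lazy: the variable still points at the file on disk
ds.close()                  # drop the cached file handle, as after serialization
os.remove(path)             # the combining worker never had this temp file

ds["pr"].values             # deferred read tries to reopen the path -> FileNotFoundError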

jbusecke commented 5 months ago

So OK, I now ran another test. For the above test case, I am doing the following:

And I am getting this sort of error trace:

[screenshot of the error trace]

Sorry for the ugly screenshot, but I am not even sure what could be happening here...

I am fairly confident I can rule out workers OOMing: the memory usage is very low, and each worker could hold the entire dataset (all files) in memory.