Open rabernat opened 3 years ago
Should be
openfile = fsspec.open(url, mode='rb')
with openfile as f:
dsgcs = xr.open_dataset(f, chunks=3000)
If I use your recommended syntax, it doesn't work with distributed because the file is closed.
from dask.distributed import Client
client = Client()
dsgcs.surface.mean().compute()
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-25-136bf6160302> in <module>
1 url = 'gs://ldeo-glaciology/bedmachine/BedMachineAntarctica_2019-11-05_v01.nc'
2 with fsspec.open(url, mode='rb') as openfile:
----> 3 dsgcs = xr.open_dataset(openfile, chunks=3000)
4
5 get_ipython().run_line_magic('time', 'dsgcs.surface.mean().compute()')
/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/api.py in open_dataset(filename_or_obj, group, decode_cf, mask_and_scale, decode_times, autoclose, concat_characters, decode_coords, engine, chunks, lock, cache, drop_variables, backend_kwargs, use_cftime, decode_timedelta)
543
544 with close_on_error(store):
--> 545 ds = maybe_decode_store(store)
546
547 # Ensure source filename always stored in dataset object (GH issue #2550)
/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/api.py in maybe_decode_store(store, lock)
449
450 def maybe_decode_store(store, lock=False):
--> 451 ds = conventions.decode_cf(
452 store,
453 mask_and_scale=mask_and_scale,
/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/conventions.py in decode_cf(obj, concat_characters, mask_and_scale, decode_times, decode_coords, drop_variables, use_cftime, decode_timedelta)
598 decode_timedelta=decode_timedelta,
599 )
--> 600 ds = Dataset(vars, attrs=attrs)
601 ds = ds.set_coords(coord_names.union(extra_coords).intersection(vars))
602 ds._file_obj = file_obj
/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/dataset.py in __init__(self, data_vars, coords, attrs)
541 coords = coords.variables
542
--> 543 variables, coord_names, dims, indexes, _ = merge_data_and_coords(
544 data_vars, coords, compat="broadcast_equals"
545 )
/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/merge.py in merge_data_and_coords(data, coords, compat, join)
465 explicit_coords = coords.keys()
466 indexes = dict(_extract_indexes_from_coords(coords))
--> 467 return merge_core(
468 objects, compat, join, explicit_coords=explicit_coords, indexes=indexes
469 )
/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/merge.py in merge_core(objects, compat, join, combine_attrs, priority_arg, explicit_coords, indexes, fill_value)
592 coerced, join=join, copy=False, indexes=indexes, fill_value=fill_value
593 )
--> 594 collected = collect_variables_and_indexes(aligned)
595
596 prioritized = _get_priority_vars_and_indexes(aligned, priority_arg, compat=compat)
/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/merge.py in collect_variables_and_indexes(list_of_mappings)
276 append_all(coords, indexes)
277
--> 278 variable = as_variable(variable, name=name)
279 if variable.dims == (name,):
280 variable = variable.to_index_variable()
/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/variable.py in as_variable(obj, name)
158 "dimensions." % (name, obj.dims)
159 )
--> 160 obj = obj.to_index_variable()
161
162 return obj
/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/variable.py in to_index_variable(self)
524 def to_index_variable(self):
525 """Return this variable as an xarray.IndexVariable"""
--> 526 return IndexVariable(
527 self.dims, self._data, self._attrs, encoding=self._encoding, fastpath=True
528 )
/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/variable.py in __init__(self, dims, data, attrs, encoding, fastpath)
2345 # Unlike in Variable, always eagerly load values into memory
2346 if not isinstance(self._data, PandasIndexAdapter):
-> 2347 self._data = PandasIndexAdapter(self._data)
2348
2349 def __dask_tokenize__(self):
/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in __init__(self, array, dtype)
1387
1388 def __init__(self, array: Any, dtype: DTypeLike = None):
-> 1389 self.array = utils.safe_cast_to_index(array)
1390 if dtype is None:
1391 if isinstance(array, pd.PeriodIndex):
/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/utils.py in safe_cast_to_index(array)
102 if hasattr(array, "dtype") and array.dtype.kind == "O":
103 kwargs["dtype"] = object
--> 104 index = pd.Index(np.asarray(array), **kwargs)
105 return _maybe_cast_to_cftimeindex(index)
106
/srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order)
81
82 """
---> 83 return array(a, dtype, copy=False, order=order)
84
85
/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in __array__(self, dtype)
558 def __array__(self, dtype=None):
559 array = as_indexable(self.array)
--> 560 return np.asarray(array[self.key], dtype=None)
561
562 def transpose(self, order):
/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py in __getitem__(self, key)
26
27 def __getitem__(self, key):
---> 28 return indexing.explicit_indexing_adapter(
29 key, self.shape, indexing.IndexingSupport.OUTER_1VECTOR, self._getitem
30 )
/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py in explicit_indexing_adapter(key, shape, indexing_support, raw_indexing_method)
843 """
844 raw_key, numpy_indices = decompose_indexer(key, shape, indexing_support)
--> 845 result = raw_indexing_method(raw_key.tuple)
846 if numpy_indices.tuple:
847 # index the loaded np.ndarray
/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py in _getitem(self, key)
36 with self.datastore.lock:
37 array = self.get_array(needs_lock=False)
---> 38 return array[key]
39
40
/srv/conda/envs/notebook/lib/python3.8/site-packages/h5netcdf/core.py in __getitem__(self, key)
144
145 def __getitem__(self, key):
--> 146 return self._h5ds[key]
147
148 def __setitem__(self, key, value):
h5py/_objects.pyx in h5py._objects.with_phil.wrapper()
h5py/_objects.pyx in h5py._objects.with_phil.wrapper()
/srv/conda/envs/notebook/lib/python3.8/site-packages/h5py/_hl/dataset.py in __getitem__(self, args)
571 mspace = h5s.create_simple(mshape)
572 fspace = selection.id
--> 573 self.id.read(mspace, fspace, arr, mtype, dxpl=self._dxpl)
574
575 # Patch up the output for NumPy
h5py/_objects.pyx in h5py._objects.with_phil.wrapper()
h5py/_objects.pyx in h5py._objects.with_phil.wrapper()
h5py/h5d.pyx in h5py.h5d.DatasetID.read()
h5py/_proxy.pyx in h5py._proxy.dset_rw()
h5py/_proxy.pyx in h5py._proxy.H5PY_H5Dread()
h5py/defs.pyx in h5py.defs.H5Dread()
h5py/h5fd.pyx in h5py.h5fd.H5FD_fileobj_read()
/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/spec.py in readinto(self, b)
1407 """
1408 out = memoryview(b).cast("B")
-> 1409 data = self.read(out.nbytes)
1410 out[: len(data)] = data
1411 return len(data)
/srv/conda/envs/notebook/lib/python3.8/site-packages/fsspec/spec.py in read(self, length)
1392 length = self.size - self.loc
1393 if self.closed:
-> 1394 raise ValueError("I/O operation on closed file.")
1395 logger.debug("%s read: %i - %i" % (self, self.loc, self.loc + length))
1396 if length == 0:
ValueError: I/O operation on closed file.
In that case you take responsibility for the file
openfile = fsspec.open(url, mode='rb').open()
dsgcs = xr.open_dataset(openfile, chunks=3000)
...
openfile.close()
I don't understand what you mean.
Surely you understand what I want to do here: open the file and compute on it with a distributed cluster. If I am doing it wrong, please tell me the recommended way to do this.
I thought that fsspec was just dispatching to gcsfs based on url matching. Help me understand why that is not the case.
I would also still be interested in getting an answer to this question.
I just came across this while trying various opening options via intake-xarray (https://github.com/intake/intake-xarray/issues/88).
In that case you take responsibility for the file
@martindurant. I'm struggling with how best to do this for the intake-xarray case. If I understand correctly the issue is needing to use a context manager to have access to OpenFile methods, right? But how do we do that in the case of intake-xarray, given the current syntax? This issue is not unique to GCSFS, it also applies to S3 and HTTP - the following snippet leads to the same traceback in the first comment:
import intake
uri = 's3://its-live-data.jpl.nasa.gov/icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5'
ds = intake.open_netcdf(uri,
xarray_kwargs=dict(group='gt1l/land_ice_segments', engine='h5netcdf'),
storage_options=dict(anon=True)
).to_dask()
print(ds.h_li.mean())
/srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/backends/api.py in _get_engine_from_magic_number(filename_or_obj)
113 magic_number = filename_or_obj[:8]
114 else:
--> 115 if filename_or_obj.tell() != 0:
116 raise ValueError(
117 "file-like object read/write pointer not at zero "
AttributeError: 'OpenFile' object has no attribute 'tell'
A partial workaround is using simplecache::
, but that only works for a LocalCluster, not a distributed cluster like dask gateway:
import intake
uri = 'simplecache::s3://its-live-data.jpl.nasa.gov/icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5'
ds = intake.open_netcdf(uri,
chunks=dict(delta_time=20000),
xarray_kwargs=dict(group='gt1l/land_ice_segments', engine='h5netcdf'),
storage_options=dict(s3={'anon': True}, #default_cache_type='all',
#simplecache=dict(cache_storage="/tmp/atl06", same_names=True),
)
).to_dask()
# GatewayCluster
with Client(cluster) as client:
result = ds['h_li'].mean().compute()
/srv/conda/envs/notebook/lib/python3.8/site-packages/h5py/_hl/files.py in make_fid()
171 if swmr and swmr_support:
172 flags |= h5f.ACC_SWMR_READ
--> 173 fid = h5f.open(name, flags, fapl=fapl)
174 elif mode == 'r+':
175 fid = h5f.open(name, h5f.ACC_RDWR, fapl=fapl)
h5py/_objects.pyx in h5py._objects.with_phil.wrapper()
h5py/_objects.pyx in h5py._objects.with_phil.wrapper()
h5py/h5f.pyx in h5py.h5f.open()
OSError: Unable to open file (unable to open file: name = '/tmp/tmphulzch2c/96968a7bb03c66de5724914b4116a866819162a33560f08134284e08f670ad38', errno = 2, error message = 'No such file or directory', flags = 0, o_flags = 0)
One possibility is using a globally available S3:// scratch location for the cache: simplecache=dict(cache_storage="s3://pangeo-scratch/tmp/atl06")
. But it does seem like AttributeError: 'OpenFile' object has no attribute 'tell'
could be addressed in some way to avoid needing caching...
Was this situation working before? I expect that xarray's open with fsspec paths of file objects must have been working for some time, so I'm not sure why this is different. We should figure out why intake-xarray is apparently getting this wrong.
One possibility is using a globally available S3:// scratch location for the cache
I wouldn't mind making this happen - most of the code would stay the same, but we would need to be careful not to read/write cache metadata too often.
Was this situation working before? I expect that xarray's open with fsspec paths of file objects must have been working for some time, so I'm not sure why this is different. We should figure out why intake-xarray is apparently getting this wrong.
I'm not sure. I'm not sure remote HDF or NetCDF files without OpenDAP was really explored or tested before. I opened a PR in intake-xarray to explore possible fixes https://github.com/intake/intake-xarray/pull/93
I wouldn't mind making this happen - most of the code would stay the same, but we would need to be careful not to read/write cache metadata too often.
Seems like this could be particularly powerful, especially for rechunker or pangeo-forge conversion workflows for datasets on legacy servers.
To avoid getting too side-tracked with intake
from the original issue here, I think this is the key question, what is the difference between using fsspec.open()
(doesn't work) versus s3fs.open()
or gcsfs.open()
(works!) with xarray:
import xarray as xr
import fsspec
import s3fs
# Works
s3 = s3fs.S3FileSystem(anon=True)
url = 's3://its-live-data.jpl.nasa.gov/icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5'
openfile = s3.open(url)
ds = xr.open_dataset(openfile, group='gt1l/land_ice_segments', engine='h5netcdf', chunks={})
# AttributeError: 'OpenFile' object has no attribute 'tell'
url = 's3://its-live-data.jpl.nasa.gov/icesat2/alt06/rel003/ATL06_20181230162257_00340206_003_01.h5'
openfile = fsspec.open(url)
ds = xr.open_dataset(openfile, group='gt1l/land_ice_segments', engine='h5netcdf', chunks={})
what is the difference between using fsspec.open() (doesn't work) versus s3fs.open() or gcsfs.open() (works!) with xarray
fsspec.open('s3://...').open() == s3.open('s3://...')
fsspec.open('s3://...')
-> OpenFile, which is a context manager that produces the underlying file-like object in a with
clause. I can also wrap that file-like in a text or compression wrapper - or indeed other file-systems like simplecache; all are flushed and closed when exiting the context. It is designed for serialisation.
The file-like objects are also context managers: they are auto-closed when leaving the context.
I'm facing what I think is the same issue. The simplecache::
workaround also seems to work here.
I would like to open a gcs path using fsspec's url resolver and then read it with xarray:
this raises the following error:
However, if I do the same thing with gcsfs, it works
This feels like a bug. And it breaks my mental model of how fsspec works. I thought that fsspec was just dispatching to gcsfs based on url matching. Help me understand why that is not the case.
xref https://github.com/pydata/xarray/issues/4591, which helped me discover this (but is about something different)