ecmwf / earthkit-data

A format-agnostic Python interface for geospatial data
Apache License 2.0
47 stars 9 forks source link

dask.distributed does not work with grib files opened by earthkit #378

Open malmans2 opened 1 month ago

malmans2 commented 1 month ago

What happened?

I think it's the same problem reported in #375, but I'm opening a new issue as I'm not 100% sure. I'm not able to use dask.distributed with grib files opened by earthkit-data with dask.

I get this warning when I open the data with dask:

In `to_xarray`, overriding the default value (chunks=None) with chunks={} is not recommended. [NOTE: the source of the warning was lost in extraction — presumably earthkit's `to_xarray`/xarray `open_dataset`; confirm against the original issue]

Is that intentional? Are we not supposed to use dask with grib files?

What are the steps to reproduce the bug?

import earthkit.data
import dask.distributed

client = dask.distributed.Client()

collection_id = "reanalysis-era5-single-levels"
request = {
    "variable": "2t",
    "product_type": "reanalysis",
    "date": "2012-12-01",
    "time": "12:00",
}
earthkit_ds = earthkit.data.from_source("cds", collection_id, **request)
xr_ds = earthkit_ds.to_xarray(xarray_open_dataset_kwargs={"chunks": {}})
xr_ds.to_netcdf("test.nc")  # TypeError

Version

0.7.0

Platform (OS and architecture)

Linux eqc-quality-tools.eqc.compute.cci1.ecmwf.int 5.14.0-362.8.1.el9_3.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Nov 8 17:36:32 UTC 2023 x86_64 x86_64 x86_64 GNU/Linux

Relevant log output

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/protocol/pickle.py:63, in dumps(x, buffer_callback, protocol)
     62 try:
---> 63     result = pickle.dumps(x, **dump_kwargs)
     64 except Exception:

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/earthkit/data/readers/grib/xarray.py:38, in IndexWrapperForCfGrib.__getstate__(self)
     37 def __getstate__(self):
---> 38     return dict(index=serialise_state(self.index), ignore_keys=self.ignore_keys)

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/earthkit/data/utils/serialise.py:26, in serialise_state(obj)
     25 LOG.info("serialise %s", fullname)
---> 26 return (fullname, SERIALISATION[fullname][0](obj))

KeyError: ('earthkit.data.readers.grib.reader', 'GRIBReader')

During handling of the above exception, another exception occurred:

KeyError                                  Traceback (most recent call last)
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/protocol/pickle.py:68, in dumps(x, buffer_callback, protocol)
     67 buffers.clear()
---> 68 pickler.dump(x)
     69 result = f.getvalue()

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/earthkit/data/readers/grib/xarray.py:38, in IndexWrapperForCfGrib.__getstate__(self)
     37 def __getstate__(self):
---> 38     return dict(index=serialise_state(self.index), ignore_keys=self.ignore_keys)

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/earthkit/data/utils/serialise.py:26, in serialise_state(obj)
     25 LOG.info("serialise %s", fullname)
---> 26 return (fullname, SERIALISATION[fullname][0](obj))

KeyError: ('earthkit.data.readers.grib.reader', 'GRIBReader')

During handling of the above exception, another exception occurred:

KeyError                                  Traceback (most recent call last)
File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/protocol/serialize.py:363, in serialize(x, serializers, on_error, context, iterate_collection)
    362 try:
--> 363     header, frames = dumps(x, context=context) if wants_context else dumps(x)
    364     header["serializer"] = name

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/protocol/serialize.py:78, in pickle_dumps(x, context)
     76     writeable.append(not f.readonly)
---> 78 frames[0] = pickle.dumps(
     79     x,
     80     buffer_callback=buffer_callback,
     81     protocol=context.get("pickle-protocol", None) if context else None,
     82 )
     83 header = {
     84     "serializer": "pickle",
     85     "writeable": tuple(writeable),
     86 }

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/protocol/pickle.py:81, in dumps(x, buffer_callback, protocol)
     80     buffers.clear()
---> 81     result = cloudpickle.dumps(x, **dump_kwargs)
     82 except Exception:

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/cloudpickle/cloudpickle.py:1479, in dumps(obj, protocol, buffer_callback)
   1478 cp = Pickler(file, protocol=protocol, buffer_callback=buffer_callback)
-> 1479 cp.dump(obj)
   1480 return file.getvalue()

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/cloudpickle/cloudpickle.py:1245, in Pickler.dump(self, obj)
   1244 try:
-> 1245     return super().dump(obj)
   1246 except RuntimeError as e:

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/earthkit/data/readers/grib/xarray.py:38, in IndexWrapperForCfGrib.__getstate__(self)
     37 def __getstate__(self):
---> 38     return dict(index=serialise_state(self.index), ignore_keys=self.ignore_keys)

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/earthkit/data/utils/serialise.py:26, in serialise_state(obj)
     25 LOG.info("serialise %s", fullname)
---> 26 return (fullname, SERIALISATION[fullname][0](obj))

KeyError: ('earthkit.data.readers.grib.reader', 'GRIBReader')

The above exception was the direct cause of the following exception:

TypeError                                 Traceback (most recent call last)
Cell In[1], line 15
     13 earthkit_ds = earthkit.data.from_source("cds", collection_id, **request)
     14 xr_ds = earthkit_ds.to_xarray(xarray_open_dataset_kwargs={"chunks": {}})
---> 15 xr_ds.to_netcdf("test.nc")

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/xarray/core/dataset.py:2298, in Dataset.to_netcdf(self, path, mode, format, group, engine, encoding, unlimited_dims, compute, invalid_netcdf)
   2295     encoding = {}
   2296 from xarray.backends.api import to_netcdf
-> 2298 return to_netcdf(  # type: ignore  # mypy cannot resolve the overloads:(
   2299     self,
   2300     path,
   2301     mode=mode,
   2302     format=format,
   2303     group=group,
   2304     engine=engine,
   2305     encoding=encoding,
   2306     unlimited_dims=unlimited_dims,
   2307     compute=compute,
   2308     multifile=False,
   2309     invalid_netcdf=invalid_netcdf,
   2310 )

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/xarray/backends/api.py:1348, in to_netcdf(dataset, path_or_file, mode, format, group, engine, encoding, unlimited_dims, compute, multifile, invalid_netcdf)
   1345 if multifile:
   1346     return writer, store
-> 1348 writes = writer.sync(compute=compute)
   1350 if isinstance(target, BytesIO):
   1351     store.sync()

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/xarray/backends/common.py:297, in ArrayWriter.sync(self, compute, chunkmanager_store_kwargs)
    294 if chunkmanager_store_kwargs is None:
    295     chunkmanager_store_kwargs = {}
--> 297 delayed_store = chunkmanager.store(
    298     self.sources,
    299     self.targets,
    300     lock=self.lock,
    301     compute=compute,
    302     flush=True,
    303     regions=self.regions,
    304     **chunkmanager_store_kwargs,
    305 )
    306 self.sources = []
    307 self.targets = []

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/xarray/namedarray/daskmanager.py:249, in DaskManager.store(self, sources, targets, **kwargs)
    241 def store(
    242     self,
    243     sources: Any | Sequence[Any],
    244     targets: Any,
    245     **kwargs: Any,
    246 ) -> Any:
    247     from dask.array import store
--> 249     return store(
    250         sources=sources,
    251         targets=targets,
    252         **kwargs,
    253     )

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/dask/array/core.py:1236, in store(***failed resolving arguments***)
   1234 elif compute:
   1235     store_dsk = HighLevelGraph(layers, dependencies)
-> 1236     compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
   1237     return None
   1239 else:

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/dask/base.py:402, in compute_as_if_collection(cls, dsk, keys, scheduler, get, **kwargs)
    400 schedule = get_scheduler(scheduler=scheduler, cls=cls, get=get)
    401 dsk2 = optimization_function(cls)(dsk, keys, **kwargs)
--> 402 return schedule(dsk2, keys, **kwargs)

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/client.py:3259, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3186 def get(
   3187     self,
   3188     dsk,
   (...)
   3200     **kwargs,
   3201 ):
   3202     """Compute dask graph
   3203
   3204     Parameters
   (...)
   3257     Client.compute : Compute asynchronous collections
   3258     """
-> 3259     futures = self._graph_to_futures(
   3260         dsk,
   3261         keys=set(flatten([keys])),
   3262         workers=workers,
   3263         allow_other_workers=allow_other_workers,
   3264         resources=resources,
   3265         fifo_timeout=fifo_timeout,
   3266         retries=retries,
   3267         user_priority=priority,
   3268         actors=actors,
   3269     )
   3270     packed = pack_data(keys, futures)
   3271     if sync:

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/client.py:3155, in Client._graph_to_futures(self, dsk, keys, workers, allow_other_workers, internal_priority, user_priority, resources, retries, fifo_timeout, actors)
   3152 from distributed.protocol import serialize
   3153 from distributed.protocol.serialize import ToPickle
-> 3155 header, frames = serialize(ToPickle(dsk), on_error="raise")
   3157 pickled_size = sum(map(nbytes, [header] + frames))
   3158 if pickled_size > parse_bytes(
   3159     dask.config.get("distributed.admin.large-graph-warning-threshold")
   3160 ):

File /data/common/miniforge3/envs/wp3/lib/python3.11/site-packages/distributed/protocol/serialize.py:389, in serialize(x, serializers, on_error, context, iterate_collection)
    387     except Exception:
    388         raise TypeError(msg) from exc
--> 389     raise TypeError(msg, str_x) from exc
    390 else:  # pragma: nocover
    391     raise ValueError(f"{on_error=}; expected 'message' or 'raise'")

TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x7feabb21bf90>\n 0. 140646138612736\n>')

Accompanying data

No response

Organisation

B-Open / CADS-EQC

sandorkertesz commented 1 month ago

Thank you for reporting this issue. I looked into it and it does not seem to be related to #375.

sandorkertesz commented 1 month ago

The reason for this failure is that serialisation for this GRIB data is yet to be implemented.