observingClouds / slkspec

fsspec filesystem for stronglink tape archive

Issues with intake-esm #6

Open observingClouds opened 1 year ago

observingClouds commented 1 year ago
On Levante:
import intake
import json
import pandas as pd
with open("/pool/data/Catalogs/dkrz_cmip6_disk.json") as f:
    catconfig=json.load(f)
testcat=intake.open_esm_datastore(esmcol_obj=pd.read_csv("/home/k/k204210/intake-esm/catalogs/dkrz_cmip6_archive.csv.gz"),
                                  esmcol_data=catconfig)
subset=testcat.search(source_id="MPI-ESM1-2-LR",
                      experiment_id="ssp370",
                      variable_id="tas",
                      table_id="Amon")
import os
if "slk" not in os.environ["PATH"]:
    os.environ["PATH"]=os.environ["PATH"]+":/sw/spack-levante/slk-3.3.67-jrygfs/bin/:/sw/spack-levante/openjdk-17.0.0_35-k5o6dr/bin"
SLK_CACHE="/scratch/k/k204210/INTAKE"
%env SLK_CACHE={SLK_CACHE}
subset.to_dataset_dict()

This calls 33 slk retrieve commands, which in turn call 33 /sw/spack-levante/slk-3.3.67-jrygfs/lib/slk-cli-tools-3.3.67.jar retrieve processes (66 processes in total) for only 10 unique tars from the same directory on the HSM. That can't be right.

Originally posted by @wachsylon in https://github.com/observingClouds/slkspec/issues/3#issuecomment-1340613830

observingClouds commented 1 year ago

@wachsylon, how many results does the search find? Are these results independent, i.e. are there no results that would request the same tar file?
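
To check, something like this would do (a rough sketch, reusing the subset from the snippet above and assuming the uri column follows the tar://<member>::slk://<archive>.tar pattern used in this catalog):

uris = subset.df["uri"]
# The part after "::" is the containing tar archive on the HSM.
tar_archives = uris.str.split("::").str[-1]
print(len(uris), "catalog entries,", tar_archives.nunique(), "unique tar archives")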

observingClouds commented 1 year ago

This seems to be an issue with how intake-esm requests individual datasets when using to_dataset_dict(). The datasets are loaded independently (different from, e.g., xr.open_mfdataset):

        with concurrent.futures.ThreadPoolExecutor(max_workers=dask.system.CPU_COUNT) as executor:
            future_tasks = [
                executor.submit(_load_source, key, source) for key, source in sources.items()
            ]

[from intake_esm/core.py]

Due to these independent jobs, the slk retrievals do not know anything about each other. One might think about allowing the queue for slkspec to be set externally, similar to the get_client of swiftspec.
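
A rough sketch of that idea (the names here are invented, this is not slkspec's current API): a module-level queue that a user or a library like intake-esm could replace, so that independent tasks feed their requests into one batched retrieval:

import queue
from typing import Optional

_retrieval_queue: Optional[queue.Queue] = None

def set_retrieval_queue(q: queue.Queue) -> None:
    # Let the user (or e.g. intake-esm) inject a shared queue before any
    # file is opened, so all retrieval requests end up in one place.
    global _retrieval_queue
    _retrieval_queue = q

def get_retrieval_queue() -> queue.Queue:
    # Fall back to a default queue on first use.
    global _retrieval_queue
    if _retrieval_queue is None:
        _retrieval_queue = queue.Queue()
    return _retrieval_queue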

observingClouds commented 1 year ago

@wachsylon could you give access to the catalog or point me to a public catalog where the same problem occurs?

PermissionError: [Errno 13] Permission denied: '/home/k/k204210/intake-esm/catalogs/dkrz_cmip6_archive.csv.gz'

wachsylon commented 1 year ago

@observingClouds

Sorry, I have put the catalog at /work/ik1017/Catalogs/dkrz_cmip6_archive.csv.gz

I either ran into

OSError: 
            Failed to open netCDF/HDF dataset.

            *** Arguments passed to xarray.open_dataset() ***:

            - filename_or_obj: <fsspec.implementations.tar.TarContainedFile object at 0x7ffddaec9120>
            - kwargs: {'chunks': {}}

            *** fsspec options used ***:

            - root: ./ScenarioMIP/MPI-M/MPI-ESM1-2-LR/ssp370/r7i1p1f1/Amon/tas/gn/v20190710/tas_Amon_MPI-ESM1-2-LR_ssp370_r7i1p1f1_gn_209501-210012.nc
            - protocol: tar

            ********************************************

Or with subset.to_dataset_dict(cdf_kwargs=dict(engine="h5netcdf")):

File ~/.conda/envs/slkspecenv/lib/python3.10/site-packages/h5py/_hl/dataset.py:741, in Dataset.__getitem__(self, args, new_dtype)
    740 try:
--> 741     return self._fast_reader.read(args)
    742 except TypeError:

File h5py/_selector.pyx:370, in h5py._selector.Reader.read()

File h5py/h5fd.pyx:160, in h5py.h5fd.H5FD_fileobj_read()

File ~/.conda/envs/slkspecenv/lib/python3.10/site-packages/fsspec/implementations/tar.py:175, in TarContainedFile.seek(self, to, whence)
    174     raise ValueError("Whence must be (0, 1, 2)")
--> 175 self.of.seek(to)

File /work/ik1017/CMIP6/meta/packems3/slkspec/slkspec/core.py:182, in SLKFile.seek(self, target)
    181     self._cache_files()
--> 182 return self._file_obj.seek(target)

ValueError: seek of closed file

The above exception was the direct cause of the following exception:

It seems like fsspec.open is run twice here.

observingClouds commented 1 year ago

@wachsylon I had to adjust your example a bit. The arguments you were using do not exist in the current intake-esm version:

import intake
import json
import pandas as pd
with open("/pool/data/Catalogs/dkrz_cmip6_disk.json") as f:
    catconfig=json.load(f)
df=pd.read_csv("/work/ik1017/Catalogs/dkrz_cmip6_archive.csv.gz")
testcat=intake.open_esm_datastore(obj={"esmcat":catconfig,"df":df})
subset=testcat.search(source_id="MPI-ESM1-2-LR",
                      experiment_id="ssp370",
                      variable_id="tas",
                      table_id="Amon")
subset.to_dataset_dict(xarray_open_kwargs=dict(engine="h5netcdf"))

The issue here is how intake-esm creates the datasets. As mentioned in https://github.com/observingClouds/slkspec/issues/6#issuecomment-1341326470, intake-esm opens every catalog entry independently. Because several entries of the subset refer to the same tar archive, one tar file might be opened by several intake-esm calls at the same time.

In [4]: testcat['ScenarioMIP.MPI-ESM1-2-LR.ssp370.Amon.gn'].df.iloc[0]["uri"]
Out[4]: 'tar://./ScenarioMIP/MPI-M/MPI-ESM1-2-LR/ssp370/r10i1p1f1/Amon/cct/gn/v20190710/cct_Amon_MPI-ESM1-2-LR_ssp370_r10i1p1f1_gn_201501-203412.nc::slk:///arch/ik1017/cmip6/CMIP6/ScenarioMIP_3964.tar'

In [5]: testcat['ScenarioMIP.MPI-ESM1-2-LR.ssp370.Amon.gn'].df.iloc[1]["uri"]
Out[5]: 'tar://./ScenarioMIP/MPI-M/MPI-ESM1-2-LR/ssp370/r10i1p1f1/Amon/cct/gn/v20190710/cct_Amon_MPI-ESM1-2-LR_ssp370_r10i1p1f1_gn_203501-205412.nc::slk:///arch/ik1017/cmip6/CMIP6/ScenarioMIP_3964.tar'

In [6]: testcat['ScenarioMIP.MPI-ESM1-2-LR.ssp370.Amon.gn'].df.iloc[2]["uri"]
Out[6]: 'tar://./ScenarioMIP/MPI-M/MPI-ESM1-2-LR/ssp370/r10i1p1f1/Amon/cct/gn/v20190710/cct_Amon_MPI-ESM1-2-LR_ssp370_r10i1p1f1_gn_205501-207412.nc::slk:///arch/ik1017/cmip6/CMIP6/ScenarioMIP_3964.tar'
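
For illustration, grouping the entries by their containing archive shows how many members each tar serves (a sketch, same catalog and column layout as above):

df = testcat['ScenarioMIP.MPI-ESM1-2-LR.ssp370.Amon.gn'].df
by_archive = df.groupby(df["uri"].str.split("::").str[-1])["uri"].apply(list)
for archive, members in by_archive.items():
    print(archive, "->", len(members), "members")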

In addition, and this is certainly something to fix upstream, a local tar file cannot be opened with intake-esm either, independent of slkspec. After retrieving the tar files, e.g. with slkspec, remove the slkspec protocol from the uri and load them directly from /scratch with:

subset.df["uri"] = subset.df["uri"].replace("slk://","file:///scratch/<path>/<to>/<SLK-CACHE>")
subset.to_dataset_dict()

Error message
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/intake_esm/source.py:240, in ESMDataSource._open_dataset(self)
    220 datasets = [
    221     _open_dataset(
    222         record[self.path_column_name],
   (...)
    237     for _, record in self.df.iterrows()
    238 ]
--> 240 datasets = dask.compute(*datasets)
    241 if len(datasets) == 1:

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/dask/base.py:600, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    598     postcomputes.append(x.__dask_postcompute__())
--> 600 results = schedule(dsk, keys, **kwargs)
    601 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/dask/threaded.py:89, in get(dsk, keys, cache, num_workers, pool, **kwargs)
     87         pool = MultiprocessingPoolExecutor(pool)
---> 89 results = get_async(
     90     pool.submit,
     91     pool._max_workers,
     92     dsk,
     93     keys,
     94     cache=cache,
     95     get_id=_thread_get_id,
     96     pack_exception=pack_exception,
     97     **kwargs,
     98 )
    100 # Cleanup pools associated to dead threads

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/dask/local.py:511, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    510     else:
--> 511         raise_exception(exc, tb)
    512 res, worker_id = loads(res_info)

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/dask/local.py:319, in reraise(exc, tb)
    318     raise exc.with_traceback(tb)
--> 319 raise exc

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/dask/local.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    223 task, data = loads(task_info)
--> 224 result = _execute_task(task, data)
    225 id = get_id()

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/dask/utils.py:71, in apply(func, args, kwargs)
     70 if kwargs:
---> 71     return func(*args, **kwargs)
     72 else:

File ~/.conda/envs/slkspec_dev/lib/python3.10/site-packages/intake_esm/source.py:67, in _open_dataset(urlpath, varname, xarray_open_kwargs, preprocess, requested_variables, additional_attrs, expand_dims, data_format)
     66 # Handle multi-file datasets with `xr.open_mfdataset()`
---> 67 if '*' in url or isinstance(url, list):
     68     # How should we handle concat_dim, and other xr.open_mfdataset kwargs?
     69     xarray_open_kwargs.update(preprocess=preprocess)

TypeError: argument of type 'TarContainedFile' is not iterable

Maybe you can come up with a minimal example and raise an issue upstream. Once that is fixed, we can see what we are still missing here.
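
Something along these lines could serve as a starting point (an untested sketch; the tar path is a placeholder, and the membership check mirrors the intake-esm line quoted in the traceback above):

import fsspec

# Placeholder: any local tar archive containing a netCDF file, e.g. one that
# was already staged into the SLK_CACHE directory.
uri = "tar://./member.nc::file:///scratch/k/k204210/INTAKE/example.tar"

with fsspec.open(uri) as f:
    # intake-esm's _open_dataset ends up with such a file-like object as `url`
    # and evaluates `'*' in url`, which fails because TarContainedFile does
    # not support membership tests:
    "*" in f  # TypeError: argument of type 'TarContainedFile' is not iterable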

observingClouds commented 1 year ago

The queuing actually seems to work correctly now when patched with #18. Therefore, I am changing the title.

wachsylon commented 1 year ago

The arguments you were using do not exist in the current intake-esm version:

I am sorry - the most recent version has changed the argument keywords, and I will eventually have to update the entire intake-esm workflow, which will be... annoying... It is actually another reason to rather support intake-xarray than intake-esm.

one tar-file might be opened by several intake-esm calls at the same time.

But how can that be a problem if it works when I apply your idea replace("slk://", "file://....")? Isn't intake-esm opening the tars several times in that case as well?

observingClouds commented 1 year ago

But how can that be a problem if it works when I apply your idea replace("slk://", "file://....")? Isn't intake-esm opening the tars several times in that case as well?

Sorry, maybe I wasn't clear. I meant that this is not working either and is just an option to test intake-esm with tar files. Or are you saying that this is working with your intake-esm version?

wachsylon commented 1 year ago

It is working.

wachsylon commented 1 year ago

@observingClouds

After the retrieval, when the tars exist, this is working:

import intake
import json
import pandas as pd
with open("/pool/data/Catalogs/dkrz_cmip6_disk.json") as f:
    catconfig=json.load(f)
testcat=intake.open_esm_datastore(esmcol_obj=pd.read_csv("/work/ik1017/Catalogs/dkrz_cmip6_archive.csv.gz"),
                                  esmcol_data=catconfig)
subset=testcat.search(source_id="MPI-ESM1-2-LR",
                      experiment_id="ssp370",
                      variable_id="tas",
                      table_id="Amon")
import os
if "slk" not in os.environ["PATH"]:
    os.environ["PATH"]=os.environ["PATH"]+":/sw/spack-levante/slk-3.3.67-jrygfs/bin/:/sw/spack-levante/openjdk-17.0.0_35-k5o6dr/bin"
SLK_CACHE="/scratch/k/k204210/INTAKE"
%env SLK_CACHE={SLK_CACHE}
subset._df["uri"]=subset._df["uri"].str.replace("slk","file").str.replace("/arch","/scratch/k/k204210/INTAKE/arch")
subset.to_dataset_dict(cdf_kwargs=dict(engine="h5netcdf"))

wachsylon commented 1 year ago

which is the same old outdated code but with the extra line

subset._df["uri"]=subset._df["uri"].str.replace("slk","file").str.replace("/arch","/scratch/k/k204210/INTAKE/arch")

observingClouds commented 1 year ago

Interesting! It fails for me. Which version of intake-esm are you using? Could you check whether this still works with 2022.9.18?

wachsylon commented 1 year ago

With the recent version, I end up with

---> 67 if '*' in url or isinstance(url, list):
     68     # How should we handle concat_dim, and other xr.open_mfdataset kwargs?
     69     xarray_open_kwargs.update(preprocess=preprocess)

TypeError: argument of type 'TarContainedFile' is not iterable

annoying

observingClouds commented 1 year ago

Great! Well, great that you can reproduce my issue; not so great that the feature we need here used to work in the past. Would you mind opening an issue at intake-esm and linking it here? Which version did work for you?

wachsylon commented 1 year ago

2021.8.17

but extrapolating the version releases of intake-esm, we can expect the next one in 2024 :(

wachsylon commented 1 year ago

I will open up an issue there!

Nevertheless, we could also try to get rid of

File ~/.conda/envs/slkspecenv/lib/python3.10/site-packages/fsspec/implementations/tar.py:175, in TarContainedFile.seek(self, to, whence)
    174     raise ValueError("Whence must be (0, 1, 2)")
--> 175 self.of.seek(to)

File /work/ik1017/CMIP6/meta/packems3/slkspec/slkspec/core.py:182, in SLKFile.seek(self, target)
    181     self._cache_files()
--> 182 return self._file_obj.seek(target)

ValueError: seek of closed file
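
One direction could be to make SLKFile.seek tolerant of a closed handle, roughly like this (a sketch only; `_file_obj` and `_cache_files` appear in the traceback above, while `_local_path` and `mode` are guessed attribute names, not slkspec's actual implementation):

def seek(self, target: int) -> int:
    if self._file_obj is None:
        self._cache_files()
    if self._file_obj.closed:
        # Re-open the locally cached copy instead of failing with
        # "ValueError: seek of closed file".
        self._file_obj = open(self._local_path, self.mode)
    return self._file_obj.seek(target)
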
observingClouds commented 1 year ago

@wachsylon I went ahead and raised an issue upstream.

antarcticrainforest commented 1 year ago

The queuing actually seems to work correctly now when patched with #18. Therefore, I am changing the title.

Is that really true? The GIL should take care of the thread lock. If the thread lock doesn't work properly, we'll have to find a better way of implementing it.

observingClouds commented 1 year ago

Yes, this is true. I think we should create additional tests for all these cases that don't work yet. As a first step, it would be okay if those tests require data on Levante and can only be executed there.
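
For example, a pytest marker could gate those tests so they are skipped everywhere else (a sketch; the path check is just one possible way to detect Levante):

import os
import pytest

on_levante = os.path.exists("/pool/data/Catalogs/dkrz_cmip6_disk.json")
requires_levante = pytest.mark.skipif(not on_levante, reason="requires data on Levante")

@requires_levante
def test_to_dataset_dict_from_tar_archives():
    # Placeholder for a test exercising intake-esm + slkspec on real archives.
    ...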