pangeo-forge / pangeo-forge-recipes

Python library for building Pangeo Forge recipes.
https://pangeo-forge.readthedocs.io/
Apache License 2.0

fsspec HDF5 read issue with noaa-oisst-avhrr-only. #106

Open · sharkinsspatial opened this issue 3 years ago

sharkinsspatial commented 3 years ago

@rabernat I'll investigate a bit more, but I wanted to flag this here since you had logged a similar issue previously. When running a subset of the noaa-oisst-avhrr-only recipe, I hit the following error during processing with a single worker node:

Unexpected error: ValueError('I/O operation on closed file.')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 299, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.8/site-packages/rechunker/executors/prefect.py", line 30, in run
    return self.stage.func(key)
  File "/Users/seanharkins/projects/pangeo-forge-flow-registration/venv/lib/python3.8/site-packages/pangeo_forge/recipe.py", line 347, in _store_chunk
  File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.8/site-packages/pangeo_forge/recipe.py", line 454, in open_chunk
    dsets = [stack.enter_context(self.open_input(i)) for i in inputs]
  File "/usr/local/lib/python3.8/site-packages/pangeo_forge/recipe.py", line 454, in <listcomp>
    dsets = [stack.enter_context(self.open_input(i)) for i in inputs]
  File "/usr/local/lib/python3.8/contextlib.py", line 425, in enter_context
    result = _cm_type.__enter__(cm)
  File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
    return next(self.gen)
  File "/usr/local/lib/python3.8/site-packages/pangeo_forge/recipe.py", line 421, in open_input
    ds = xr.open_dataset(f, **self.xarray_open_kwargs)
  File "/usr/local/lib/python3.8/site-packages/xarray/backends/api.py", line 500, in open_dataset
    backend_ds = backend.open_dataset(
  File "/usr/local/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py", line 383, in open_dataset
    ds = store_entrypoint.open_dataset(
  File "/usr/local/lib/python3.8/site-packages/xarray/backends/store.py", line 25, in open_dataset
    vars, attrs, coord_names = conventions.decode_cf_variables(
  File "/usr/local/lib/python3.8/site-packages/xarray/conventions.py", line 512, in decode_cf_variables
    new_vars[k] = decode_cf_variable(
  File "/usr/local/lib/python3.8/site-packages/xarray/conventions.py", line 360, in decode_cf_variable
    var = times.CFDatetimeCoder(use_cftime=use_cftime).decode(var, name=name)
  File "/usr/local/lib/python3.8/site-packages/xarray/coding/times.py", line 522, in decode
    dtype = _decode_cf_datetime_dtype(data, units, calendar, self.use_cftime)
  File "/usr/local/lib/python3.8/site-packages/xarray/coding/times.py", line 139, in _decode_cf_datetime_dtype
    [first_n_items(values, 1) or [0], last_item(values) or [0]]
  File "/usr/local/lib/python3.8/site-packages/xarray/core/formatting.py", line 71, in first_n_items
    return np.asarray(array).flat[:n_desired]
  File "/usr/local/lib/python3.8/site-packages/numpy/core/_asarray.py", line 102, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/usr/local/lib/python3.8/site-packages/xarray/core/indexing.py", line 503, in __array__
    return np.asarray(self.array, dtype=dtype)
  File "/usr/local/lib/python3.8/site-packages/numpy/core/_asarray.py", line 102, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/usr/local/lib/python3.8/site-packages/xarray/core/indexing.py", line 568, in __array__
    return np.asarray(array[self.key], dtype=None)
  File "/usr/local/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py", line 44, in __getitem__
    return indexing.explicit_indexing_adapter(
  File "/usr/local/lib/python3.8/site-packages/xarray/core/indexing.py", line 857, in explicit_indexing_adapter
    result = raw_indexing_method(raw_key.tuple)
  File "/usr/local/lib/python3.8/site-packages/xarray/backends/h5netcdf_.py", line 54, in _getitem
    return array[key]
  File "/usr/local/lib/python3.8/site-packages/h5netcdf/core.py", line 157, in __getitem__
    return self._h5ds[key]
  File "h5py/_objects.pyx", line 54, in h5py._objects.with_phil.wrapper
  File "h5py/_objects.pyx", line 55, in h5py._objects.with_phil.wrapper
  File "/usr/local/lib/python3.8/site-packages/h5py/_hl/dataset.py", line 790, in __getitem__
    self.id.read(mspace, fspace, arr, mtype, dxpl=self._dxpl)
  File "h5py/_objects.pyx", line 54, in h5py._objects.with_phil.wrapper
  File "h5py/_objects.pyx", line 55, in h5py._objects.with_phil.wrapper
  File "h5py/h5d.pyx", line 192, in h5py.h5d.DatasetID.read
  File "h5py/_proxy.pyx", line 112, in h5py._proxy.dset_rw
  File "h5py/h5fd.pyx", line 162, in h5py.h5fd.H5FD_fileobj_read
  File "/usr/local/lib/python3.8/site-packages/fsspec/spec.py", line 1457, in readinto
    data = self.read(out.nbytes)
  File "/usr/local/lib/python3.8/site-packages/fsspec/spec.py", line 1442, in read
    raise ValueError("I/O operation on closed file.")
ValueError: I/O operation on closed file.
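For context, the failing call boils down to xarray reading an HDF5 file through an open fsspec file object, which is what pangeo_forge's open_input does. A minimal sketch of that access pattern (the bucket and key here are placeholders, not our actual cache paths):

import s3fs
import xarray as xr

# Open the cached NetCDF input as a file-like object and hand it to the
# h5netcdf backend. If the underlying fsspec file gets closed while h5py
# is still reading from it, the ValueError above is raised.
fs = s3fs.S3FileSystem(anon=False)
with fs.open("s3://example-cache-bucket/inputs/oisst-avhrr-v02r01.19810901.nc") as f:
    ds = xr.open_dataset(f, engine="h5netcdf")
    print(ds)
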
rabernat commented 3 years ago

Thanks for the issue.

Can you give a little more context? What code produced this error?

sharkinsspatial commented 3 years ago

Running this recipe, the failure occurs on open_dataset. I'm currently running on a cluster with a single worker node using the following versions (this PR branch of xarray is necessary to support the NetCDFToZarrRecipe use of safe_chunks):

"botocore==1.19.52",
"s3fs==0.6.0",
"boto3==1.16.52",
"xarray@git+https://github.com/pydata/xarray.git@refs/pull/5065/merge#egg=xarray",
"dask-cloudprovider==2021.3.1",
"rechunker@git+https://github.com/pangeo-data/rechunker#egg=rechunker",
"pangeo_forge@git+https://github.com/pangeo-forge/pangeo-forge#egg=pangeo_forge",
rabernat commented 3 years ago

Ok, and how are you "running" it? Via the Prefect executor? Can you share the code that is loading and executing the recipe?

We need to get to a reproducer that we can pass around, for example, to @martindurant, to dig into the fsspec part of this. Clearly the recipe works in other contexts, e.g. https://pangeo-forge.readthedocs.io/en/latest/tutorials/netcdf_zarr_sequential.html

sharkinsspatial commented 3 years ago

@rabernat This is being run using the PrefectPipelineExecutor against our AWS bakery, with a Prefect agent and a dynamically created Dask cluster running on our Fargate infrastructure and S3 storage. For @martindurant to view detailed logs, it may be most straightforward for me to add him to our development Prefect Cloud account directly. This is the code which loads and executes the recipe from the meta.yaml, using hardcoded values to instantiate the DaskExecutor:

import importlib.util
import os

import yaml
from prefect import storage
from prefect.executors import DaskExecutor
from prefect.run_configs import ECSRun
from rechunker.executors import PrefectPipelineExecutor
from s3fs import S3FileSystem

from pangeo_forge.storage import CacheFSSpecTarget, FSSpecTarget

# Base ECS task definition for the container that runs the Prefect flow.
definition = yaml.safe_load(
    """
    networkMode: awsvpc
    cpu: 1024
    memory: 2048
    containerDefinitions:
        - name: flow
    """
)
definition["executionRoleArn"] = "arn:aws:iam::552819999234:role/pangeo-forge-aws-bakery-p-prefectecstaskexecutionr-5ZGMH0A8LVXF"
worker_image = "552819999234.dkr.ecr.us-west-2.amazonaws.com/pangeo-forge-aws-bakery-worker"

with open("meta.yaml") as f:
    meta = yaml.load(f, Loader=yaml.FullLoader)
    for recipe_meta in meta["recipes"]:
        # Import the recipe module referenced by meta.yaml from its file path.
        module_path = os.path.abspath(recipe_meta["module"])
        spec = importlib.util.spec_from_file_location(recipe_meta["name"], module_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        recipe = module.recipe

        # Point the recipe's target and caches at S3, with fsspec read
        # caching disabled.
        fs = S3FileSystem(anon=False, default_cache_type="none", default_fill_cache=False)
        target_path = f"s3://pangeo-forge-aws-bakery-flowcachebucketpangeofor-196cpck7y0pbl/target/{recipe_meta['id']}"
        target = FSSpecTarget(fs, target_path)
        recipe.target = target
        cache_path = f"s3://pangeo-forge-aws-bakery-flowcachebucketpangeofor-196cpck7y0pbl/cache/{recipe_meta['id']}"
        cache_target = CacheFSSpecTarget(fs, cache_path)
        recipe.input_cache = cache_target
        recipe.metadata_cache = target

        pipeline = recipe.to_pipelines()

        # Each flow run creates its own Dask cluster on Fargate.
        dask_executor = DaskExecutor(
            cluster_class="dask_cloudprovider.aws.FargateCluster",
            cluster_kwargs={
                "image": worker_image,
                "vpc": "vpc-0e519fd83fa521d72",
                "cluster_arn": "arn:aws:ecs:us-west-2:552819999234:cluster/pangeo-forge-aws-bakery-pangeo-forge-dask-bakeryclusterpangeoforgedask71B831F8-BTL3Vmp8cuso",
                "task_role_arn": "arn:aws:iam::552819999234:role/pangeo-forge-aws-bakery-p-prefectecstaskrolepangeo-3R73K3Z1XU70",
                "execution_role_arn": "arn:aws:iam::552819999234:role/pangeo-forge-aws-bakery-p-prefectecstaskexecutionr-5ZGMH0A8LVXF",
                "security_groups": ["sg-0c4d6e997637c801d"],
                "n_workers": 1,
                "scheduler_cpu": 1024,
                "scheduler_mem": 2048,
                "worker_cpu": 1024,
                "worker_mem": 4096,
                "scheduler_timeout": "15 minutes",
                "tags": {
                    "Project": "pangeo-forge",
                    "Recipe": recipe_meta["id"],
                },
            },
        )

        # Compile the recipe pipeline into a Prefect flow and register it
        # with Prefect Cloud.
        executor = PrefectPipelineExecutor()
        flow = executor.pipelines_to_plan(pipeline)
        flow.storage = storage.S3(bucket="pangeo-forge-aws-bakery-flowstoragebucketpangeof-71w6gsnambj9")
        flow.run_config = ECSRun(
            image=worker_image,
            labels=["dask_test"],
            task_definition=definition,
            run_task_kwargs={
                "tags": [
                    {"key": "Project", "value": "pangeo-forge"},
                    {"key": "Recipe", "value": recipe_meta["id"]},
                ]
            },
        )
        flow.executor = dask_executor
        flow.name = recipe_meta["id"]
        flow.register(project_name="pangeo-forge-aws-bakery")

For testing purposes I have been running flows via the Prefect Cloud console to view aggregated logs, rather than using execute_plan.
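If it helps anyone reproduce locally, running the same compiled plan in-process rather than registering it would look roughly like this (a sketch, assuming the rechunker executor API used above):

# Execute the compiled Prefect flow directly instead of registering it
# with Prefect Cloud; execute_plan runs the flow in the current process.
executor = PrefectPipelineExecutor()
flow = executor.pipelines_to_plan(pipeline)
executor.execute_plan(flow)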

rabernat commented 3 years ago

Sorry to keep asking questions! ... but can you link me to the worker logs? Do you have the pangeo_forge logs turned on?

sharkinsspatial commented 3 years ago

@rabernat Let me kick off a new run with the log level set to debug and collect the worker logs for you.
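Something along these lines on the workers, assuming the package logger is named after the import path (pangeo_forge):

import logging

# Send pangeo_forge debug messages to stderr so they show up in the
# worker's CloudWatch log stream.
logger = logging.getLogger("pangeo_forge")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
logger.addHandler(handler)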

sharkinsspatial commented 3 years ago

@rabernat I attempted 4 more full runs without reproducing the error, so I'm unsure what the root cause may have been. Regardless, it was a good exercise, as I was able to investigate some of the logging points raised in https://github.com/pangeo-forge/pangeo-forge/issues/92 and develop a promising approach. I'll post my notes in that issue.

rabernat commented 3 years ago

Ok, glad it resolved, but nevertheless we should keep an eye on this. Intermittent bugs are the hardest to squash!

cisaacstern commented 3 years ago

Do we think https://github.com/pangeo-forge/pangeo-forge-recipes/pull/171 fixed this?