pangeo-forge / pangeo-forge-recipes

Python library for building Pangeo Forge recipes.
https://pangeo-forge.readthedocs.io/
Apache License 2.0
126 stars 54 forks source link

Error opening zipped tiff collection via OpenURLWithFSSpec recipe #659

Open thodson-usgs opened 11 months ago

thodson-usgs commented 11 months ago

I was unable to open a collection of zipped tiffs using the OpenURLWithFSSpec recipe. I tested the semantics with fsspec and xarray, and everything worked in my test notebook but failed when I built them into a pangeo-forge recipe:

SystemError: <class 'rasterio._err.CPLE_OpenFailedError'> returned a result with an exception set [while running 'Create|OpenURLWithFSSpec|OpenWithXarray|Preprocess|StoreToZarr/OpenWithXarray/Open with Xarray']

In the end, I was able to work around and open the tiffs directly with rioxarray (recipe.py); however, I believe it would be better if the recipes worked as intended.

Here's an example that will generate the error. I believe I have isolated the problem to OpenURLWithFSSpec, because avoiding OpenWithXarray will yield the same error, so the problem seems to be with the former or with xarray.

from datetime import date

import apache_beam as beam
import pandas as pd
import xarray as xr

from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.transforms import Indexed, OpenURLWithFSSpec, OpenWithXarray, StoreToZarr, T

# note the the filepattern differs from the working example
input_url_pattern = (
    'https://edcintl.cr.usgs.gov/downloads/sciweb1/shared/uswem/web/'
    'conus/eta/modis_eta/daily/downloads/'
    'det{yyyyjjj}.modisSSEBopETactual.zip'
)

start = date(2001, 1, 1)
end = date(2022, 10, 7)
dates = pd.date_range(start, end, freq='1D')

def make_url(time: pd.Timestamp) -> str:
    return input_url_pattern.format(yyyyjjj=time.strftime('%Y%j'))

pattern = FilePattern(make_url,
                      ConcatDim(name='time', keys=dates, nitems_per_file=1))
pattern = pattern.prune()

class Preprocess(beam.PTransform):
    """Preprocessor transform."""

    @staticmethod
    def _preproc(item: Indexed[T]) -> Indexed[xr.Dataset]:
        import numpy as np

        index, f = item
        time_dim = index.find_concat_dim('time')
        time_index = index[time_dim].value
        time = dates[time_index]

        da = rioxarray.open_rasterio(f.open()).drop('band')
        da = da.rename({'x': 'lon', 'y': 'lat'})
        ds = da.to_dataset(name='aet')
        ds = ds.expand_dims(time=np.array([time]))

        return index, ds

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        return pcoll | beam.Map(self._preproc)

recipe = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec(open_kwargs={'compression': 'zip'})
    | OpenWithXarray(xarray_open_kwargs={'engine': 'rasterio'})
    | Preprocess()
    | StoreToZarr(
        store_name='us-ssebop.zarr',
        target_root='.',
        combine_dims=pattern.combine_dim_keys,
        target_chunks={'time': 1, 'lat': int(2834 / 2), 'lon': int(6612 / 6)},
    )
)

with beam.Pipeline() as p:
    p | recipe
thodson-usgs commented 11 months ago

If someone picks this up, I'd also be curious what tricks they use to debug Beam.

ranchodeluxe commented 9 months ago

maybe @moradology can help look into this too @thodson-usgs (as we talked about at today's meeting)