fsspec / kerchunk

Cloud-friendly access to archival data
https://fsspec.github.io/kerchunk/
MIT License

Despite more times being available, dataset read from `MultiZarrToZarr` returns a specific number of time steps #343

Closed. kthyng closed this issue 1 year ago.

kthyng commented 1 year ago

This is a weird error but here goes...

I am going through the kerchunk steps for ROMS model output files and it works. BUT, no matter how many single-file zarr metadata files I pass to MultiZarrToZarr, the dataset I read in from the resulting translated product stops at about a year's worth of time steps. Is there a built-in limit somewhere that I need to adjust? These are local files.

The basic code is below; json_list includes the single-file metadata JSONs for the 1999 and 2000 output files.

import kerchunk.hdf
import fsspec
import pathlib
from kerchunk.combine import MultiZarrToZarr
import ujson

fs2 = fsspec.filesystem('')  #local file system to save final jsons to

so = dict(default_fill_cache=False, default_cache_type='first')

# `urls` is a list of pathlib.Path objects pointing to the local NetCDF files,
# and `out_base` is the output directory for the single-file reference JSONs
for u in urls:
    outf = out_base / pathlib.Path(u.stem.split(".")[-1]).with_suffix(".json")
    if not outf.exists():
        with fsspec.open(u, **so) as inf:
            h5chunks = kerchunk.hdf.SingleHdf5ToZarr(inf, str(u), inline_threshold=100)
            with fs2.open(outf, 'wb') as f:
                f.write(ujson.dumps(h5chunks.translate()).encode());

json_list = fs2.glob("ciofs_kerchunk2/1999*.json") # combine single json files
json_list += fs2.glob("ciofs_kerchunk2/2000*.json") # combine single json files

mzz = MultiZarrToZarr(
    json_list,
    concat_dims=["ocean_time"],
    identical_dims= ['lat_rho', 'lon_rho', "lon_psi", "lat_psi",
                    "lat_u", "lon_u", "lat_v", "lon_v", 
                    "Akk_bak","Akp_bak","Akt_bak","Akv_bak","Cs_r","Cs_w",
                    "FSobc_in","FSobc_out","Falpha","Fbeta","Fgamma","Lm2CLM",
                    "Lm3CLM", "LnudgeM2CLM", "LnudgeM3CLM", "LnudgeTCLM",
                    "LsshCLM", "LtracerCLM", "LtracerSrc", "LuvSrc",
                    "LwSrc", "M2nudg", "M2obc_in", "M2obc_out", "M3nudg",
                    "M3obc_in", "M3obc_out", "Tcline", "Tnudg","Tobc_in", "Tobc_out",
                    "Vstretching", "Vtransform", "Znudg", "Zob", "Zos", "angle",
                    "dstart", "dt", "dtfast", "el", "f", "gamma2", "grid", "h",
                    "hc", "mask_psi", "mask_rho", "mask_u", "mask_v", "nHIS", "nRST",
                    "nSTA", "ndefHIS",  "ndtfast", "ntimes", "pm", "pn", "rdrg", 
                    "rdrg2", "rho0", "spherical", "theta_b", "theta_s", "xl",
                    ],
)

# to keep in memory
out = mzz.translate()

import xarray as xr
ds = xr.open_dataset(
    "reference://", engine="zarr",
    backend_kwargs={
        "storage_options": {
            "fo": out,
        },
        "consolidated": False
    }
)
martindurant commented 1 year ago

I recommend you run mzz.first_pass() and have a look at the coords attribute to see what it populated there. If you have suggestions on how we could give more logging feedback, I would happily oblige.
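For example, something like this (a rough sketch, assuming the collected values end up in the coos attribute keyed by coordinate name) shows how many distinct ocean_time values were actually gathered:

mzz.first_pass()

# how many distinct time values were collected across all inputs,
# and what the ends of the time axis look like
values = sorted(mzz.coos["ocean_time"])
print(len(values), values[:3], values[-3:])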

kthyng commented 1 year ago

Ok thanks I'll take a detailed look there.

kthyng commented 1 year ago

I understand the problem now.

ncdump -v ocean_time /mnt/vault/ciofs/HINDCAST/1999/axiom.ciofs.fields.hindcast.1999_0001.nc gives

 ocean_time = 3600, 7200, 10800, 14400, 18000, 21600, 25200, 28800, 32400,
    36000, 39600, 43200, 46800, 50400, 54000, 57600, 61200, 64800, 68400,
    72000, 75600, 79200, 82800, 86400 ;

and ocean_time has units

 ocean_time:units = "seconds since 1999-01-01 00:00:00" ;

and

ncdump -v ocean_time /mnt/vault/ciofs/HINDCAST/2000/axiom.ciofs.fields.hindcast.2000_0001.nc gives

 ocean_time = 3600, 7200, 10800, 14400, 18000, 21600, 25200, 28800, 32400,
    36000, 39600, 43200, 46800, 50400, 54000, 57600, 61200, 64800, 68400,
    72000, 75600, 79200, 82800, 86400 ;

and ocean_time has units

 ocean_time:units = "seconds since 2000-01-01 00:00:00" ;

Is there a way to account for this in the kerchunk set up?

kthyng commented 1 year ago

I'm planning to fix this problem using NCO if I can't get around it with kerchunk directly. If kerchunk can't account for it, a warning that the concatenation variable's units have to stay the same would be great.

As an example, the following modifies the units. I also need to go change the values:

ncap2 -O -s 'ocean_time@units="seconds since 1999-01-01 00:00:00"' axiom.ciofs.fields.hindcast.2000_0001.nc out.nc
martindurant commented 1 year ago

Ah, good old CF. kerchunk takes the values of the coordinates at face value unless you tell it to try CF decoding. From MultiZarrToZarr's docstring on selectors (the arguments to coo_map=):

            - "cf:{var}", interpret the value of var using cftime, returning a datetime.
              These will be automatically re-encoded with cftime, *unless* you specify an
              "M8[*]" dtype for the coordinate, in which case a conversion will be attempted.
kthyng commented 1 year ago

@martindurant Awesome, this worked. For reference, the MultiZarrToZarr call now looks like this and accurately maps the times according to both their values and their units:

json_list = fs2.glob("ciofs_kerchunk2/1999_0[2,3]??.json") # combine single json files
json_list += fs2.glob("ciofs_kerchunk2/2000*.json") # combine single json files
json_list = sorted(json_list)
mzz = MultiZarrToZarr(
    json_list,
    concat_dims=["ocean_time"],
    identical_dims= ['lat_rho', 'lon_rho', "lon_psi", "lat_psi",
                    "lat_u", "lon_u", "lat_v", "lon_v", 
                    "Akk_bak","Akp_bak","Akt_bak","Akv_bak","Cs_r","Cs_w",
                    "FSobc_in","FSobc_out","Falpha","Fbeta","Fgamma","Lm2CLM",
                    "Lm3CLM", "LnudgeM2CLM", "LnudgeM3CLM", "LnudgeTCLM",
                    "LsshCLM", "LtracerCLM", "LtracerSrc", "LuvSrc",
                    "LwSrc", "M2nudg", "M2obc_in", "M2obc_out", "M3nudg",
                    "M3obc_in", "M3obc_out", "Tcline", "Tnudg","Tobc_in", "Tobc_out",
                    "Vstretching", "Vtransform", "Znudg", "Zob", "Zos", "angle",
                    "dstart", "dt", "dtfast", "el", "f", "gamma2", "grid", "h",
                    "hc", "mask_psi", "mask_rho", "mask_u", "mask_v", "nHIS", "nRST",
                    "nSTA", "ndefHIS",  "ndtfast", "ntimes", "pm", "pn", "rdrg", 
                    "rdrg2", "rho0", "spherical", "theta_b", "theta_s", "xl",
                    ],
    coo_map = {"ocean_time": "cf:ocean_time"},
)

Thank you!!

kyle-hinson commented 4 months ago

Thanks @martindurant and @kthyng for this thread; it has been very helpful as I've tried to work through a similar issue! I'm encountering a slightly different error when I try to use a similar argument for coo_map=. Running mzz.first_pass() gives me the error message below.

mzz.first_pass()

---------------------------------------------------------------------------
OverflowError                             Traceback (most recent call last)
Cell In[255], line 1
----> 1 mzz.first_pass()

File ~/micromamba/lib/python3.11/site-packages/kerchunk/combine.py:258, in MultiZarrToZarr.first_pass(self)
    256 z = zarr.open_group(fs.get_mapper(""))
    257 for var in self.concat_dims:
--> 258     value = self._get_value(i, z, var, fn=self._paths[i])
    259     if isinstance(value, np.ndarray):
    260         value = value.ravel()

File ~/micromamba/lib/python3.11/site-packages/kerchunk/combine.py:234, in MultiZarrToZarr._get_value(self, index, z, var, fn)
    232 units = datavar.attrs.get("units")
    233 calendar = datavar.attrs.get("calendar", "standard")
--> 234 o = cftime.num2date(o, units=units, calendar=calendar)
    235 if self.cf_units is None:
    236     self.cf_units = {}

File src/cftime/_cftime.pyx:617, in cftime._cftime.num2date()

File src/cftime/_cftime.pyx:414, in cftime._cftime.cast_to_int()

OverflowError: time values outside range of 64 bit signed integers

The units for the ocean_time variables are 'seconds since 1975-01-01 00:00:00', so there shouldn't be overlapping values for this dimension among the different files. If I don't specify arguments to coo_map=, then I can only extract a dataset with 54 time steps (it should be ~30,000 from the nearly 10,000 JSON files), all with bizarre values:

ds.ocean_time
array([-2.753701e+091, -2.897806e+079, -7.635061e-147,            nan,
        4.940656e-324,  7.905050e-323,  1.009376e-320,  5.215020e-310,
        5.215020e-310,  5.215020e-310,  5.215020e-310,  5.215020e-310,
        5.215020e-310,  6.951729e-310,  6.951729e-310,  6.951729e-310,
        6.951729e-310,  6.951729e-310,  6.951730e-310,  6.951730e-310,
        6.951730e-310,  6.951730e-310,  1.433150e-287,  1.458349e-287,
        1.483547e-287,  1.508745e-287,  1.533943e-287,  1.559141e-287,
        1.584339e-287,  1.609537e-287,  1.634735e-287,  1.659934e-287,
        1.685132e-287,  1.710330e-287,  1.735528e-287,  1.760726e-287,
        1.785924e-287,  1.811122e-287,  1.836320e-287,  1.861518e-287,
        1.886717e-287,  1.911915e-287,  1.937113e-287,  1.962311e-287,
        1.987509e-287,  2.012707e-287,  2.037905e-287,  2.063103e-287,
        2.088302e-287,  2.113500e-287,  2.138698e-287,  2.163896e-287,
        3.587487e+180,  5.812385e+180])

Would appreciate any suggestions about how to work through either of these issues, thanks so much!

For reference, here is my full code that I applied to create a set of JSON files and link them together after pulling NetCDFs from a remote repository.

from tempfile import TemporaryDirectory

import fsspec
import ujson
import xarray as xr
from kerchunk.combine import MultiZarrToZarr
from kerchunk.hdf import SingleHdf5ToZarr
from tqdm import tqdm

base_root = "s3://swem-scholarly-storage/hinson_data_archive_2023/Continuous/ChesROMS-ECB_Outputs/LongRun_"

# All directories from data archive
year_choices = ["1980_1984", "1985_1989", "1990_1994",
                "1995_1999", "2000_2004", "2005_2009",
                "2010_2014", "2015_2019", "2020_2024",
                "2025_2029", "2030_2034", "2035_2039",
                "2040_2044", "2045_2049", "2050_2054",
                "2055_2059", "2060_2064"]

# Grab all files from each 5-year directory
frange = range(0,61)
rfiles = []
for i in frange:
    iday = i + 1
    rfiles.append("roms_avg_" + f"{iday:04}" + ".nc")

base_urls = []
for ygrab in year_choices:
    for j in rfiles:
        base_urls.append(base_root + ygrab +"/" + j)

# Anonymous S3 filesystem for reading the source NetCDFs (used by generate_json_reference below)
fs_read = fsspec.filesystem("s3", anon=True)

so = dict(mode="rb", anon=True, default_fill_cache=False, default_cache_type="first")
output_dir = "./"

# We are creating a temporary directory to store the .json reference
# files. Alternately, you could write these to cloud storage.
td = TemporaryDirectory()
temp_dir = td.name

# Use Kerchunk's `SingleHdf5ToZarr` method to create a `Kerchunk` index
# from a NetCDF file.

def generate_json_reference(u, temp_dir: str):
    with fs_read.open(u, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, u, inline_threshold=300)
        fname = u.split("/")[-2] + "_" + u.split("/")[-1].strip(".nc")
        outf = f"{fname}.json"  # note: written to the current directory; temp_dir is not actually used here
        with open(outf, "wb") as f:
            f.write(ujson.dumps(h5chunks.translate()).encode())
        return outf

# Iterate through filelist to generate Kerchunked files
output_files = []
for fil in tqdm(base_urls):
    outf = generate_json_reference(fil, temp_dir)
    output_files.append(outf)

mzz = MultiZarrToZarr(
    output_files,
    concat_dims=["ocean_time"],
    identical_dims=["tracer", "boundary", "s_rho", "s_w", "eta_rho", "xi_rho",
                   "eta_u", "xi_u", "eta_v", "xi_v", "eta_psi", "xi_psi"],
    coo_map = {"ocean_time": "cf:ocean_time"},
)
martindurant commented 4 months ago

The units for the ocean_time variables are 'seconds since 1975-01-01 00:00:00'

Can you verify that this is indeed the case throughout the dataset? My suspicion is that the epoch changes from time to time, and you are seeing integer numbers of seconds somehow interpreted as floats. You should check that the times of the individual (pre-combine) datasets are interpreted correctly throughout the range before trying to convert them.
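For example, a rough spot-check like this (an untested sketch reusing the reference-filesystem pattern from earlier in the thread; output_files is the list of single-file JSONs from your snippet) would show whether the decoded times of a sample of files stay in a sensible range:

import xarray as xr

# untested sketch: decode a sample of the pre-combine reference sets and
# print their first and last times, to spot any file with a surprising epoch
for fn in output_files[::500]:
    ds = xr.open_dataset(
        "reference://", engine="zarr",
        backend_kwargs={
            "storage_options": {
                "fo": fn,
                "remote_protocol": "s3",
                "remote_options": {"anon": True},
            },
            "consolidated": False,
        },
    )
    print(fn, ds.ocean_time.values[0], ds.ocean_time.values[-1])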

kyle-hinson commented 4 months ago

Thanks @martindurant for your response! I'm not able to find any discrepancies between the time units as they are read in and those in the original files.

The NetCDF file shows this information in ncdump -h roms_avg_0001.nc

        double ocean_time(ocean_time) ;
                ocean_time:long_name = "averaged time since initialization" ;
                ocean_time:units = "seconds since 1975-01-01 00:00:00" ;
                ocean_time:calendar = "proleptic_gregorian" ;
                ocean_time:field = "time, scalar, series" ;

Below is the output from ncdump -v ocean_time roms_avg_0001.nc

 ocean_time = 157809600, 157896000, 157982400, 158068800, 158155200, 
    158241600, 158328000, 158414400, 158500800, 158587200, 158673600, 
    158760000, 158846400, 158932800, 159019200, 159105600, 159192000, 
    159278400, 159364800, 159451200, 159537600, 159624000, 159710400, 
    159796800, 159883200, 159969600, 160056000, 160142400, 160228800, 
    160315200 ;

These values look fine for multiple files, all with the same attributes. Using Python to read in a single file with the commands below, it looks to me like the additional attributes for ocean_time (e.g. units and calendar) are already accounted for in the decoding, but aren't listed explicitly?

fs = s3fs.S3FileSystem(anon=True)

fileObj = fs.open(base_urls[0], mode="rb")
ds = xr.open_dataset(fileObj, engine='h5netcdf', chunks={'ocean_time': 1})

ds.ocean_time
array(['1980-01-01T12:00:00.000000000', '1980-01-02T12:00:00.000000000',
       '1980-01-03T12:00:00.000000000', '1980-01-04T12:00:00.000000000',
       '1980-01-05T12:00:00.000000000', '1980-01-06T12:00:00.000000000',
       '1980-01-07T12:00:00.000000000', '1980-01-08T12:00:00.000000000',
       '1980-01-09T12:00:00.000000000', '1980-01-10T12:00:00.000000000',
       '1980-01-11T12:00:00.000000000', '1980-01-12T12:00:00.000000000',
       '1980-01-13T12:00:00.000000000', '1980-01-14T12:00:00.000000000',
       '1980-01-15T12:00:00.000000000', '1980-01-16T12:00:00.000000000',
       '1980-01-17T12:00:00.000000000', '1980-01-18T12:00:00.000000000',
       '1980-01-19T12:00:00.000000000', '1980-01-20T12:00:00.000000000',
       '1980-01-21T12:00:00.000000000', '1980-01-22T12:00:00.000000000',
       '1980-01-23T12:00:00.000000000', '1980-01-24T12:00:00.000000000',
       '1980-01-25T12:00:00.000000000', '1980-01-26T12:00:00.000000000',
       '1980-01-27T12:00:00.000000000', '1980-01-28T12:00:00.000000000',
       '1980-01-29T12:00:00.000000000', '1980-01-30T12:00:00.000000000'],
      dtype='datetime64[ns]')

Attributes:
long_name : averaged time since initialization
field : time, scalar, series

The ocean_time values that I find for any number of individual files look fine to me as well. Previously, I was using the code below to open multiple files, and the ocean_time field worked without issue; it just takes a very long time and seems to fail with this many files.

fs = s3fs.S3FileSystem(anon=True)

fObjects = []

for kurl in base_urls:
    fileObj = fs.open(kurl, mode="rb")
    fObjects.append(fileObj)

ds = xr.open_mfdataset(fObjects, engine='h5netcdf', chunks={'ocean_time': 1})
martindurant commented 4 months ago

Are these files public?

kyle-hinson commented 4 months ago

Yes, they're all publicly available here under Continuous/ChesROMS-ECB_Outputs/

martindurant commented 4 months ago

Trying it with just the first two files produced no problem:

out = kerchunk.hdf.SingleHdf5ToZarr("roms_avg_0001.nc").translate()
out2 = kerchunk.hdf.SingleHdf5ToZarr("roms_avg_0002.nc").translate()
mzz = MultiZarrToZarr(
    [out, out2],
    concat_dims=["ocean_time"],
    identical_dims=["tracer", "boundary", "s_rho", "s_w", "eta_rho", "xi_rho",
                   "eta_u", "xi_u", "eta_v", "xi_v", "eta_psi", "xi_psi"],
    coo_map = {"ocean_time": "cf:ocean_time"},
)
final = mzz.translate()
ds = xr.open_dataset(final, engine="kerchunk")

producing this ocean_time:

<xarray.DataArray 'ocean_time' (ocean_time: 60)>
array(['1980-01-01T12:00:00.000000000', '1980-01-02T12:00:00.000000000',
       '1980-01-03T12:00:00.000000000', '1980-01-04T12:00:00.000000000',
       '1980-01-05T12:00:00.000000000', '1980-01-06T12:00:00.000000000',
...
martindurant commented 4 months ago

Adding in the first file from the second tranche (1985) still works ok, despite the gap in the timeline.

kyle-hinson commented 4 months ago

Thanks Martin for testing this! Your code also works fine for me if I download the files to my machine first. Is SingleHdf5ToZarr still intended to work with the JSON files that I've created? Below is some code that I've modified based on @kthyng's example. This still throws me the error OverflowError: time values outside range of 64 bit signed integers.

fs2 = fsspec.filesystem('')  #local file system to save final jsons to

so = dict(default_fill_cache=False, default_cache_type='first')

for u in base_urls:
    outf = pathlib.Path(u.split("/")[-2] + "_" + u.split("/")[-1].strip(".nc")).with_suffix(".json")
    if not outf.exists():
        with fsspec.open(u, **so) as inf:
            h5chunks = kerchunk.hdf.SingleHdf5ToZarr(inf, str(u), inline_threshold=100)
            with fs2.open(outf, 'wb') as f:
                f.write(ujson.dumps(h5chunks.translate()).encode());

json_list = fs2.glob("LongRun*.json") # combine single json files

# Concatenate JSON files
mzz = MultiZarrToZarr(
    json_list,
    concat_dims=["ocean_time"],
    identical_dims=["tracer", "boundary", "s_rho", "s_w", "eta_rho", "xi_rho",
                   "eta_u", "xi_u", "eta_v", "xi_v", "eta_psi", "xi_psi"],
    coo_map = {"ocean_time": "cf:ocean_time"},
)
final = mzz.translate()
ds = xr.open_dataset(final, engine="kerchunk")

Is there something in my code that could be altering ocean_time when I try and create the JSON files? Or is this the wrong way to go about creating the JSON files that I then feed into the MultiZarrToZarr function?

kthyng commented 4 months ago

examples.zip

Not sure how helpful this will be, but here are two example scripts I have for creating kerchunk files for two separate ROMS ocean models.

martindurant commented 4 months ago

Is SingleHdf5ToZarr still intended to work with the JSON files that I've created?

Yes, it should work whether the source file or reference file is local or remote.

Is there something in my code that could be altering ocean_time

I don't immediately see anything. You should look at the references "ocean_time/0", "ocean_time/.zattrs", "ocean_time/.zarray" for some of the reference sets to see if there's a discrepancy.

I notice you have inline_threshold set, so the data reference ("ocean_time/0") should be some inlined bytes; they evaluate to floats ("<f8"), not ints like 157809600. I wonder if the "cf:" mapper assumes they are int. Also, the chunk size is (512,), but only 30 values are valid (shape=(30,)); the rest of the chunk is filled with 9.96920997e+36. I'm not sure if this is a problem - it still worked for me - but it's certainly odd.
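Concretely, something like this (a rough sketch; the file names are placeholders for whichever reference sets you pick) would dump those three keys from a couple of the JSONs so they can be compared:

import ujson

# rough sketch: compare the raw time references of two reference sets
for path in ["early_reference.json", "late_reference.json"]:  # placeholder file names
    refs = ujson.load(open(path))["refs"]
    print(path)
    print("  .zarray:", refs["ocean_time/.zarray"])  # dtype, chunks, fill_value
    print("  .zattrs:", refs["ocean_time/.zattrs"])  # units, calendar
    print("  chunk 0:", refs["ocean_time/0"])        # inlined bytes or [url, offset, size]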

kyle-hinson commented 4 months ago

Thanks @kthyng and @martindurant for your helpful responses! I tried implementing some of the example code from @kthyng but ran into NoCredentialsError: Unable to locate credentials when running the code below, specifically at the line `with fsspec.open(u, **so) as inf:`

for u in base_urls:
    outf = pathlib.Path(u.split("/")[-2] + "_" + u.split("/")[-1].strip(".nc")).with_suffix(".json")
    if not outf.exists():
        with fsspec.open(u, **so) as inf:  # for s3:// URLs, `so` needs anon=True (or AWS credentials), hence the NoCredentialsError
            h5chunks = kerchunk.hdf.SingleHdf5ToZarr(inf, str(u), inline_threshold=200)
            with fs2.open(outf, 'wb') as f:
                f.write(ujson.dumps(h5chunks.translate()).encode());

However, I was able to make more headway by following some of the steps from the kerchunk medium tutorial. The code below was able to rapidly (within ~5 minutes) open the list of JSONs and then read them in as a combined dataset without any problems in concatenating along the time dimension.

fname_overall_kerchunk = "ciofs_kerchunk.json"
json_list = fs2.glob("LongRun*.json") # combine single json files

m_list = []
for j in tqdm(json_list):
    with open(j) as f:
        m_list.append(fsspec.get_mapper("reference://", 
                        fo=ujson.load(f),
                        remote_protocol='s3',
                        remote_options={'anon':True}))

ds = xr.open_mfdataset(m_list, engine='zarr', combine='nested', concat_dim='ocean_time', 
                        coords='minimal', data_vars='minimal', compat='override',
                        parallel=True)

I appreciate all your suggestions in helping me figure out this error, this solution is now working well for me!

martindurant commented 4 months ago

You may want to talk with @TomNicholas about why xarray can concat your data but MZZ apparently can't. Since I never got as far as your problem, I can't say more, but if having xarray do the concat for you on every load is good enough for you, that's fine.

TomNicholas commented 4 months ago

@kyle-hinson @kthyng you should now be able to do this with VirtualiZarr, since https://github.com/zarr-developers/VirtualiZarr/pull/122 was merged. If you do

import glob
import xarray as xr
from virtualizarr import open_virtual_dataset

vds_list = [
    open_virtual_dataset(file, loadable_variables=['ocean_time'], cftime_variables=['ocean_time'], indexes={})
    for file in glob.glob("LongRun*.json")
]

vds = xr.combine_nested(
    vds_list, 
    concat_dim='ocean_time',
    coords='minimal',
    data_vars='minimal',
    compat='override',
)

vds.virtualize.to_kerchunk(...)  # this creates a set of kerchunk references that point to all of your files at once, with the ocean_time variable "inlined" 

then you will literally be using xarray to handle the concatenation.

In fact, we have a roundtrip test that's extremely similar to your problem.