fsspec / kerchunk

Cloud-friendly access to archival data
https://fsspec.github.io/kerchunk/
MIT License

Tree reduction problem #299

Open pl-marasco opened 1 year ago

pl-marasco commented 1 year ago

Following this example I'm trying to replicate the tree reduction approach with the Copernicus Global Land TOC 300m products.
In this specific case I've reduced the test to a simple combination of the archive along the time dimension (which is encoded in the filename). To avoid other issues, I've added the time dimension to each file beforehand.

Using the tree reduction approach I get a wrongly structured Dataset with a single value along the concat_dim (in my case a time coordinate). With the simpler single-pass approach I have no problem, which leads me to think that the issue isn't coming from the data.

During the computation I get this warning:

UserWarning: Concatenated coordinate 'time' contains less than expectednumber of values across the datasets: [0]
  warnings.warn(

On top of this issue, if I try to retrieve the time value from the file name through coo_map, it fails once it tries to aggregate the data.
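For reference, this is roughly the coo_map usage I mean; a minimal sketch, assuming the acquisition date can be parsed out of the cgl_TOC_* filenames (the 8-digit pattern and the time_from_name helper are hypothetical):

import re
from datetime import datetime

import numpy as np
from kerchunk.combine import MultiZarrToZarr

# hypothetical helper: parse the acquisition date out of a filename;
# the 8-digit pattern is an assumption about the cgl_TOC_* naming scheme
def time_from_name(url):
    stamp = re.search(r"(\d{8})", url).group(1)
    return np.datetime64(datetime.strptime(stamp, "%Y%m%d"))

# coo_map also accepts a list of precomputed values, one per input;
# json_list is the sorted list of per-file reference JSONs built below
mzz = MultiZarrToZarr(
    json_list,
    remote_protocol="file",
    concat_dims=["time"],
    identical_dims=["lat", "lon"],
    coo_map={"time": [time_from_name(u) for u in json_list]},
)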

Here is the code I'm using; as it is super simple, I would say that nothing is wrong with it. A small sub-sample of the original data can be fetched here

import glob
import os

import dask
import fsspec
import kerchunk.hdf
import numpy as np
import ujson
import xarray as xr
from fsspec.implementations.local import LocalFileSystem
from kerchunk.combine import MultiZarrToZarr

local_fs = LocalFileSystem()
input_nc = r"tmp/S3/time/cgl_TOC_*_X2[1-2]Y0[5-6]_*A_*.nc"
input_json = r"tmp/S3/time/cgl_TOC_*_X2[1-2]Y0[5-6]_*A_*.json"
final_output = r"tmp/S3/ensamble.json"

def process(ith_path, outputfile, storage_options_in={}, storage_options_out={}):
    # scan one netCDF/HDF5 file and write its references out as JSON
    # (storage_options_in is currently unused)
    transformer = kerchunk.hdf.SingleHdf5ToZarr(ith_path, inline_threshold=300)
    refs = transformer.translate()
    with fsspec.open(outputfile, mode="wt", **storage_options_out) as f:
        ujson.dump(refs, f)

# find the input files and skip any that were already converted
scan = glob.iglob(input_nc, recursive=True)

infilenames = []
outfilenames = []

for ith_path in scan:
    if not os.path.isfile(ith_path.replace(".nc", ".json")):
        out_path = os.path.join(
            os.path.dirname(ith_path),
            os.path.basename(ith_path).replace(".nc", ".json"),
        )
        outfilenames.append(out_path)
        infilenames.append(ith_path)

tasks = [dask.delayed(process)(u, o) for u, o in zip(infilenames, outfilenames)]
dask.compute(tasks)

# gather the per-file reference JSONs and split them into two branches
json_list = sorted(glob.iglob(input_json, recursive=True))
json_split = np.array_split(np.array(json_list), 2)
json_split = [list(x) for x in json_split]

def multi_multizarr(json_list):
    # combine a list of reference sets (file paths or dicts) along time
    mzz = MultiZarrToZarr(
        json_list,
        remote_protocol="file",
        concat_dims=["time"],
        identical_dims=["lat", "lon"],
    )
    return mzz.translate()

jsons = []
for batch in json_split:
    x = dask.delayed(multi_multizarr)(batch)
    jsons.append(x)

# tree reduction: combine the branch results in a final trunk pass
tree = dask.delayed(multi_multizarr)(jsons).compute()

# simpler one-pass combination over the full list, for comparison
d = multi_multizarr(json_list)

with local_fs.open(final_output, "wb") as f:
    f.write(ujson.dumps(d, indent=4).encode())

# open the combined reference set with xarray
ds = xr.open_dataset(
    "reference://",
    engine="zarr",
    backend_kwargs={
        "storage_options": {
            "fo": final_output,
        },
        "consolidated": False,
    },
)
ds
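A quick sanity check on the combined store makes the problem visible (sketch): I expect one time step per input file, but the tree-reduced output shows a single value along time.

# expect one entry in 'time' per input file; the tree-reduced output
# instead collapses to a single value, matching the warning above
print(ds.dims)
print(ds.time.values)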
martindurant commented 1 year ago

Before reading the details here, have you seen auto_dask? It will do the tree reduction for you with whatever dask setup you have ready. The critical point is that the arguments to MultiZarrToZarr in the branches stage are not the same as in the final trunk stage - the function tries to get this right for you.
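Something like this sketch (arguments as in kerchunk.combine.auto_dask; the values here just mirror your snippet above):

import glob

import kerchunk.hdf
from kerchunk.combine import auto_dask

# auto_dask scans the single files, combines them in batches, and runs
# the final trunk combine, all within one dask graph
refs = auto_dask(
    urls=sorted(glob.glob(input_nc)),
    single_driver=kerchunk.hdf.SingleHdf5ToZarr,
    single_kwargs={"inline_threshold": 300},
    mzz_kwargs={"concat_dims": ["time"], "identical_dims": ["lat", "lon"]},
    n_batches=2,
    remote_protocol="file",
    filename=final_output,
)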

pl-marasco commented 1 year ago

I didn't, as the dataset covers a large area with a pretty big time dimension, and I thought that the classical approach (with intermediate single files) would save some time. Consider that, in the best scenario, the dataset has to be updated every day; in the worst scenario, every 10 days.

Anyhow, following your suggestion, I've tested auto_dask and it works only if I use coo_map to get the date from the filename. With a super small subset there is no problem, but I still have to test with the entire archive.
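Roughly what I mean, as a sketch; the 8-digit regex is a placeholder for the real cgl_TOC_* filename pattern:

import re

# passing the filename-derived time mapping through mzz_kwargs;
# a compiled regex in coo_map is applied to each input's URL
refs = auto_dask(
    urls=sorted(glob.glob(input_nc)),
    single_driver=kerchunk.hdf.SingleHdf5ToZarr,
    single_kwargs={"inline_threshold": 300},
    mzz_kwargs={
        "concat_dims": ["time"],
        "identical_dims": ["lat", "lon"],
        "coo_map": {"time": re.compile(r"(\d{8})")},
    },
    n_batches=2,
    remote_protocol="file",
)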

martindurant commented 1 year ago

it works only if

What problem do you face?