earth-mover / icechunk

Open-source, cloud-native transactional tensor storage engine
https://icechunk.io
Apache License 2.0

Zero values after saving to a local store with dask localcluster. #383

Closed sagunkayastha closed 1 week ago

sagunkayastha commented 1 week ago

I am trying to combine multiple CMAQ datasets, following the quickstart guide. When I save the dataset with a Dask LocalCluster and Client, the stored values are all zeros. Everything works as expected when not using Dask.

import xarray as xr
from dask.distributed import LocalCluster, Client
from icechunk import StorageConfig, IcechunkStore

cluster = LocalCluster(n_workers=32, threads_per_worker=2, memory_limit='16GB')
client = Client(cluster)

storage_config = StorageConfig.filesystem("./ice_cmaq")
store = IcechunkStore.create(storage_config)

def preprocess(ds):
    ds = ds.isel(TSTEP=slice(0, 24))  # Select the first 24 timesteps
    ds = ds.drop_vars('TFLAG')        # Drop the TFLAG data variable
    ds = ds.sel(LAY=0)                # Select LAY=0
    return ds

# cmaq_paths is a list of CMAQ NetCDF file paths (defined elsewhere)
cmaq_ds = xr.open_mfdataset(cmaq_paths, preprocess=preprocess, concat_dim='TSTEP', combine='nested')
cmaq_ds[['O3', 'NO2']].to_zarr(store, zarr_format=3, consolidated=False)
store.commit("added cmaq")

dcherian commented 1 week ago

Thanks for the nice bug report!

Yes, only the threaded scheduler is supported at the moment: https://github.com/earth-mover/icechunk/issues/185

This will work once https://github.com/earth-mover/icechunk/pull/357 is merged, which is waiting on an upstream dask release. I expect to get it in next week.
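In the meantime, one possible stopgap (just a sketch, assuming the write is correct when it runs on Dask's threaded scheduler; store and cmaq_ds are the objects from the snippet above) is to defer the write and compute it explicitly on threads, even while the distributed client stays alive for other work:

# Defer the zarr write, then force it onto the threaded scheduler.
delayed_write = cmaq_ds[['O3', 'NO2']].to_zarr(
    store, zarr_format=3, consolidated=False, compute=False
)
delayed_write.compute(scheduler='threads')  # run the write tasks in threads, not on the cluster
store.commit("added cmaq")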

sagunkayastha commented 1 week ago

Thank you for the quick response.

rabernat commented 1 week ago

We should find some way to warn if people try to use a standard distributed write. I can see this being a real footgun.
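One hypothetical shape for such a warning (not icechunk code, only a sketch): check whether a dask.distributed client is active before writing and warn the user.

import warnings

def warn_if_distributed_client_active():
    try:
        from distributed import get_client
        get_client()  # raises ValueError when no distributed client is active
    except (ImportError, ValueError):
        return  # no distributed client, nothing to warn about
    warnings.warn(
        "A dask.distributed client is active; icechunk currently supports "
        "only the threaded scheduler, so distributed writes may silently "
        "produce zeros (see #185)."
    )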

paraseba commented 1 week ago

@sagunkayastha until we have the nice dask array interface working, we are using a lower-level approach for distributed writes, unfortunately with an uglier interface. You can find an example here. Let us know if you want to know more about it.