pangeo-data / rechunker

Disk-to-disk chunk transformation for chunked arrays.
https://rechunker.readthedocs.io/
MIT License

Time Dimensions unified after rechunk #130

Closed · meteoDaniel closed this issue 1 year ago

meteoDaniel commented 1 year ago

Dear community,

I am facing a strange behaviour of rechunk.

<xarray.Dataset>
Dimensions:        (y: 900, x: 900, time: 720)
Coordinates:
    lat            (y, x) float64 dask.array<chunksize=(225, 225), meta=np.ndarray>
    lon            (y, x) float64 dask.array<chunksize=(225, 225), meta=np.ndarray>
  * time           (time) datetime64[ns] 2006-04-01T00:45:00 ... 2006-04-30T2...
Dimensions without coordinates: y, x
Data variables:
    precipitation  (time, y, x) float32 dask.array<chunksize=(45, 113, 113), meta=np.ndarray>

This is the repr of my Zarr archive, which is stored in an S3 bucket.

I tried two different target_chunks:

{'precipitation': {'y': 10, 'x': 10, 'time': 720},
 'lat': {'y': 10, 'x': 10},
 'lon': {'y': 10, 'x': 10},
 'time': 720}
{'precipitation': {'y': 10, 'x': 10, 'time': 720},
 'lat': {'y': 10, 'x': 10},
 'lon': {'y': 10, 'x': 10},
 'time': None}

Both of them cause a "unification" of the time axis:

<xarray.Dataset>
Dimensions:        (y: 900, x: 900, time: 720)
Coordinates:
    lat            (y, x) float64 dask.array<chunksize=(10, 10), meta=np.ndarray>
    lon            (y, x) float64 dask.array<chunksize=(10, 10), meta=np.ndarray>
  * time           (time) datetime64[ns] 2006-04-01T00:45:00 ... 2006-04-01T0...
Dimensions without coordinates: y, x
Data variables:
    precipitation  (time, y, x) float32 dask.array<chunksize=(720, 10, 10), meta=np.ndarray>

I was able to put together a minimal reproducible example for you:

import xarray as xr
import pandas as pd
import numpy as np
import zarr
import rechunker
time = pd.date_range("2000-01-01", freq="6H", periods=365 * 4)
ds = xr.Dataset({"foo": ("time", np.arange(365 * 4)), "time": time})
ds.to_zarr('test.zarr')

group = zarr.open_consolidated('test.zarr', mode="r")
_ = rechunker.rechunk(
    group, {'foo': {'time': 10}, 'time': None}, '1GB', 'rechunked.zarr', temp_store='tmp.zarr'
)
zarr.convenience.consolidate_metadata('rechunked.zarr')
ds = xr.open_dataset('rechunked.zarr', engine='zarr', consolidated=True)

ds will contain 365 * 4 copies of the same timestamp.

I hope there is a simple solution and I am just doing something wrong. Otherwise, I hope we can fix this ASAP.

Best regards Daniel

rabernat commented 1 year ago

Can you clarify what you mean by "unification of the time axis"?

meteoDaniel commented 1 year ago

Every timestamp is the same.

Before: * time (time) datetime64[ns] 2006-04-01T00:45:00 ... 2006-04-30T2...

After: * time (time) datetime64[ns] 2006-04-01T00:45:00 ... 2006-04-01T0...

meteoDaniel commented 1 year ago

Here is the whole trace of the example:

In [1]: import xarray as xr
   ...: import pandas as pd
   ...: import numpy as np
   ...: import zarr
   ...: import rechunker

In [2]: time = pd.date_range("2000-01-01", freq="6H", periods=365 * 4)
   ...: 

In [3]: ds = xr.Dataset({"foo": ("time", np.arange(365 * 4)), "time": time})
   ...: ds.to_zarr('test.zarr')
Out[3]: <xarray.backends.zarr.ZarrStore at 0x7f856c571f90>

In [4]: group = zarr.open_consolidated('test.zarr', mode="r")
   ...: _ = rechunker.rechunk(
   ...:     group, {'foo': {'time': 10}, 'time': None}, '1GB', 'rechunked.zarr', temp_store='tmp.zarr'
   ...: )

In [5]: zarr.convenience.consolidate_metadata('rechunked.zarr')
   ...: 
Out[5]: <zarr.hierarchy.Group '/'>

In [6]: _ds = xr.open_dataset('rechunked.zarr', engine='zarr', consolidated=True)
---------------------------------------------------------------------------
OverflowError                             Traceback (most recent call last)
/usr/local/lib/python3.9/site-packages/xarray/coding/times.py in decode_cf_datetime(num_dates, units, calendar, use_cftime)
    260         try:
--> 261             dates = _decode_datetime_with_pandas(flat_num_dates, units, calendar)
    262         except (KeyError, OutOfBoundsDatetime, OverflowError):

/usr/local/lib/python3.9/site-packages/xarray/coding/times.py in _decode_datetime_with_pandas(flat_num_dates, units, calendar)
    216         warnings.filterwarnings("ignore", "invalid value encountered", RuntimeWarning)
--> 217         pd.to_timedelta(flat_num_dates.min(), delta) + ref_date
    218         pd.to_timedelta(flat_num_dates.max(), delta) + ref_date

/usr/local/lib/python3.9/site-packages/pandas/core/tools/timedeltas.py in to_timedelta(arg, unit, errors)
    147     # ...so it must be a scalar value. Return scalar.
--> 148     return _coerce_scalar_to_timedelta_type(arg, unit=unit, errors=errors)
    149 

/usr/local/lib/python3.9/site-packages/pandas/core/tools/timedeltas.py in _coerce_scalar_to_timedelta_type(r, unit, errors)
    155     try:
--> 156         result = Timedelta(r, unit)
    157     except ValueError:

/usr/local/lib/python3.9/site-packages/pandas/_libs/tslibs/timedeltas.pyx in pandas._libs.tslibs.timedeltas.Timedelta.__new__()

/usr/local/lib/python3.9/site-packages/pandas/_libs/tslibs/timedeltas.pyx in pandas._libs.tslibs.timedeltas.convert_to_timedelta64()

/usr/local/lib/python3.9/site-packages/pandas/_libs/tslibs/conversion.pyx in pandas._libs.tslibs.conversion.cast_from_unit()

OverflowError: Python int too large to convert to C long

During handling of the above exception, another exception occurred:

ModuleNotFoundError                       Traceback (most recent call last)
<ipython-input-6-c9c4614803e5> in <module>
----> 1 _ds = xr.open_dataset('rechunked.zarr', engine='zarr', consolidated=True)

/usr/local/lib/python3.9/site-packages/xarray/backends/api.py in open_dataset(filename_or_obj, engine, chunks, cache, decode_cf, mask_and_scale, decode_times, decode_timedelta, use_cftime, concat_characters, decode_coords, drop_variables, backend_kwargs, *args, **kwargs)
    493 
    494     overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 495     backend_ds = backend.open_dataset(
    496         filename_or_obj,
    497         drop_variables=drop_variables,

/usr/local/lib/python3.9/site-packages/xarray/backends/zarr.py in open_dataset(self, filename_or_obj, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, use_cftime, decode_timedelta, group, mode, synchronizer, consolidated, chunk_store, storage_options, stacklevel)
    812         store_entrypoint = StoreBackendEntrypoint()
    813         with close_on_error(store):
--> 814             ds = store_entrypoint.open_dataset(
    815                 store,
    816                 mask_and_scale=mask_and_scale,

/usr/local/lib/python3.9/site-packages/xarray/backends/store.py in open_dataset(self, store, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, use_cftime, decode_timedelta)
     37         )
     38 
---> 39         ds = Dataset(vars, attrs=attrs)
     40         ds = ds.set_coords(coord_names.intersection(vars))
     41         ds.set_close(store.close)

/usr/local/lib/python3.9/site-packages/xarray/core/dataset.py in __init__(self, data_vars, coords, attrs)
    748             coords = coords.variables
    749 
--> 750         variables, coord_names, dims, indexes, _ = merge_data_and_coords(
    751             data_vars, coords, compat="broadcast_equals"
    752         )

/usr/local/lib/python3.9/site-packages/xarray/core/merge.py in merge_data_and_coords(data, coords, compat, join)
    481     explicit_coords = coords.keys()
    482     indexes = dict(_extract_indexes_from_coords(coords))
--> 483     return merge_core(
    484         objects, compat, join, explicit_coords=explicit_coords, indexes=indexes
    485     )

/usr/local/lib/python3.9/site-packages/xarray/core/merge.py in merge_core(objects, compat, join, combine_attrs, priority_arg, explicit_coords, indexes, fill_value)
    630         coerced, join=join, copy=False, indexes=indexes, fill_value=fill_value
    631     )
--> 632     collected = collect_variables_and_indexes(aligned)
    633 
    634     prioritized = _get_priority_vars_and_indexes(aligned, priority_arg, compat=compat)

/usr/local/lib/python3.9/site-packages/xarray/core/merge.py in collect_variables_and_indexes(list_of_mappings)
    289                 append_all(coords, indexes)
    290 
--> 291             variable = as_variable(variable, name=name)
    292 
    293             if variable.dims == (name,):

/usr/local/lib/python3.9/site-packages/xarray/core/variable.py in as_variable(obj, name)
    152                 "conflict with the coordinates used to label dimensions."
    153             )
--> 154         obj = obj.to_index_variable()
    155 
    156     return obj

/usr/local/lib/python3.9/site-packages/xarray/core/variable.py in to_index_variable(self)
    526     def to_index_variable(self):
    527         """Return this variable as an xarray.IndexVariable"""
--> 528         return IndexVariable(
    529             self.dims, self._data, self._attrs, encoding=self._encoding, fastpath=True
    530         )

/usr/local/lib/python3.9/site-packages/xarray/core/variable.py in __init__(self, dims, data, attrs, encoding, fastpath)
   2670         # Unlike in Variable, always eagerly load values into memory
   2671         if not isinstance(self._data, PandasIndexingAdapter):
-> 2672             self._data = PandasIndexingAdapter(self._data)
   2673 
   2674     def __dask_tokenize__(self):

/usr/local/lib/python3.9/site-packages/xarray/core/indexing.py in __init__(self, array, dtype)
   1270 
   1271     def __init__(self, array: pd.Index, dtype: DTypeLike = None):
-> 1272         self.array = utils.safe_cast_to_index(array)
   1273 
   1274         if dtype is None:

/usr/local/lib/python3.9/site-packages/xarray/core/utils.py in safe_cast_to_index(array)
    113         if hasattr(array, "dtype") and array.dtype.kind == "O":
    114             kwargs["dtype"] = object
--> 115         index = pd.Index(np.asarray(array), **kwargs)
    116     return _maybe_cast_to_cftimeindex(index)
    117 

/usr/local/lib/python3.9/site-packages/xarray/core/indexing.py in __array__(self, dtype)
    421     def __array__(self, dtype=None):
    422         array = as_indexable(self.array)
--> 423         return np.asarray(array[self.key], dtype=None)
    424 
    425     def transpose(self, order):

/usr/local/lib/python3.9/site-packages/xarray/coding/variables.py in __array__(self, dtype)
     68 
     69     def __array__(self, dtype=None):
---> 70         return self.func(self.array)
     71 
     72     def __repr__(self):

/usr/local/lib/python3.9/site-packages/xarray/coding/times.py in decode_cf_datetime(num_dates, units, calendar, use_cftime)
    261             dates = _decode_datetime_with_pandas(flat_num_dates, units, calendar)
    262         except (KeyError, OutOfBoundsDatetime, OverflowError):
--> 263             dates = _decode_datetime_with_cftime(
    264                 flat_num_dates.astype(float), units, calendar
    265             )

/usr/local/lib/python3.9/site-packages/xarray/coding/times.py in _decode_datetime_with_cftime(num_dates, units, calendar)
    191 def _decode_datetime_with_cftime(num_dates, units, calendar):
    192     if cftime is None:
--> 193         raise ModuleNotFoundError("No module named 'cftime'")
    194     return np.asarray(
    195         cftime.num2date(num_dates, units, calendar, only_use_cftime_datetimes=True)

ModuleNotFoundError: No module named 'cftime'

(Note the chain here: the raw time values overflow the pandas datetime decoder, so xarray falls back to cftime, which is not installed in this environment. The overflow is the real symptom; the garbage values behind it are diagnosed below.)

In [7]: _ds = xr.open_dataset('rechunked.zarr', engine='zarr', consolidated=True)

In [8]: _ds
Out[8]: 
<xarray.Dataset>
Dimensions:  (time: 1460)
Coordinates:
  * time     (time) datetime64[ns] 2000-01-01 2000-01-01 ... 2000-01-01
Data variables:
    foo      (time) int64 ...

In [9]: _ds.time
Out[9]: 
<xarray.DataArray 'time' (time: 1460)>
array(['2000-01-01T00:00:00.000000000', '2000-01-01T00:00:00.000000000',
       '2000-01-01T00:00:00.000000000', ..., '2000-01-01T00:00:00.000000000',
       '2000-01-01T00:00:00.000000000', '2000-01-01T00:00:00.000000000'],
      dtype='datetime64[ns]')
Coordinates:
  * time     (time) datetime64[ns] 2000-01-01 2000-01-01 ... 2000-01-01
rabernat commented 1 year ago

Thanks for this bug report. I agree it seems like a big problem!

Can you open the data with Zarr (bypassing xarray and its encoding stuff) and show the values and info of the time variable? Something like:

import zarr
group = zarr.open_group('rechunked.zarr')
print(group.time.info)
print(dict(group.time.attrs))
print(group.time[:])
meteoDaniel commented 1 year ago

This is the output of attributes for the test dataset.


In [13]: time = pd.date_range("2000-01-01", freq="6H", periods=365 * 4)
    ...: ds = xr.Dataset({"foo": ("time", np.arange(365 * 4)), "time": time})
    ...: ds.to_zarr('test.zarr')
    ...: 
    ...: group = zarr.open_consolidated('test.zarr', mode="r")
    ...: _ = rechunker.rechunk(
    ...:     group, {'foo': {'time': 10}, 'time': None}, '1GB', 'rechunked.zarr', temp_store='tmp.zarr'
    ...: )

In [14]: print(group.time.info)
    ...: print(dict(group.time.attrs))
    ...: print(group.time[:])
Name               : /time
Type               : zarr.core.Array
Data type          : int64
Shape              : (1460,)
Chunk shape        : (1460,)
Order              : C
Read-only          : True
Compressor         : Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0)
Store type         : zarr.storage.ConsolidatedMetadataStore
Chunk store type   : zarr.storage.DirectoryStore
No. bytes          : 11680 (11.4K)
Chunks initialized : 1/1

{'_ARRAY_DIMENSIONS': ['time'], 'calendar': 'proleptic_gregorian', 'units': 'hours since 2000-01-01 00:00:00'}
[   0    6   12 ... 8742 8748 8754]
rabernat commented 1 year ago

In your code above, it looks like you are showing test.zarr.

Now show the same thing for rechunked.zarr please.

meteoDaniel commented 1 year ago

Well, I mixed up some stuff. I am sorry.

After consolidating the metadata:

Name               : /time
Type               : zarr.core.Array
Data type          : int64
Shape              : (1460,)
Chunk shape        : (1460,)
Order              : C
Read-only          : False
Compressor         : Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0)
Store type         : zarr.storage.DirectoryStore
No. bytes          : 11680 (11.4K)
No. bytes stored   : 456
Storage ratio      : 25.6
Chunks initialized : 0/1

{'_ARRAY_DIMENSIONS': ['time'], 'calendar': 'proleptic_gregorian', 'units': 'hours since 2000-01-01 00:00:00'}
[     140212323873504      140212323873504       94537209791872 ...
 -8458139203682520985      140211011937920      140211011939776]

Before consolidating the metadata:

In [29]: print(group.time.info)
    ...: print(dict(group.time.attrs))
    ...: print(group.time[:])
Name               : /time
Type               : zarr.core.Array
Data type          : int64
Shape              : (1460,)
Chunk shape        : (1460,)
Order              : C
Read-only          : False
Compressor         : Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0)
Store type         : zarr.storage.DirectoryStore
No. bytes          : 11680 (11.4K)
No. bytes stored   : 456
Storage ratio      : 25.6
Chunks initialized : 0/1

{'_ARRAY_DIMENSIONS': ['time'], 'calendar': 'proleptic_gregorian', 'units': 'hours since 2000-01-01 00:00:00'}
[140212323873504  94537210008576  94537210008576 ... 140212269177152
               3 140212319897968]
rabernat commented 1 year ago

Ok, so this is telling you something very important. No data are being written for the time variable:

Chunks initialized : 0/1
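
You can also check this count programmatically — assuming zarr-python v2, every array exposes it as a property:

import zarr

group = zarr.open_group('rechunked.zarr')
# Stays at 0 until the chunk data are actually written;
# compare with group.time.nchunks (the total number of chunks).
print(group.time.nchunks_initialized)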

One wild guess... maybe your syntax is off? Try:

target_chunks = {
    'precipitation': {'y': 10, 'x': 10, 'time': 720},
    'lat': {'y': 10, 'x': 10},
    'lon': {'y': 10, 'x': 10},
    'time': {'time': 720},
}
meteoDaniel commented 1 year ago

I can't see that this changes anything in the example:

In [34]: time = pd.date_range("2000-01-01", freq="6H", periods=365 * 4)
    ...: ds = xr.Dataset({"foo": ("time", np.arange(365 * 4)), "time": time})
    ...: ds.to_zarr('test.zarr')
    ...: 
    ...: group = zarr.open_consolidated('test.zarr', mode="r")
    ...: _ = rechunker.rechunk(
    ...:     group, {'foo': {'time': 10}, 'time': {'time': 10}}, '1GB', 'rechunked.zarr', temp_store='tmp.zarr'
    ...: )

In [35]: zarr.convenience.consolidate_metadata('rechunked.zarr')
    ...: 
Out[35]: <zarr.hierarchy.Group '/'>

In [36]: group = zarr.open_group('rechunked.zarr')

In [37]: print(group.time.info)
    ...: print(dict(group.time.attrs))
    ...: print(group.time[:])
Name               : /time
Type               : zarr.core.Array
Data type          : int64
Shape              : (1460,)
Chunk shape        : (10,)
Order              : C
Read-only          : False
Compressor         : Blosc(cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=0)
Store type         : zarr.storage.DirectoryStore
No. bytes          : 11680 (11.4K)
No. bytes stored   : 454
Storage ratio      : 25.7
Chunks initialized : 0/146

{'_ARRAY_DIMENSIONS': ['time'], 'calendar': 'proleptic_gregorian', 'units': 'hours since 2000-01-01 00:00:00'}
[140212323873504 140212323873504  94537209888400 ... 140212269606336
              77 140212320088816]

These are the target chunks: {'foo': {'time': 10}, 'time': {'time': 10}}, which should be equivalent to your suggestion for the real-world example.

rabernat commented 1 year ago

Quite a puzzle. 🤔 Why is the time variable not being written? Do you get any errors or warnings during the rechunk? Which executor are you using?

rabernat commented 1 year ago

Ok I found your problem! No data are being written at all for any of your arrays. You are missing a very important step. Can you figure it out by reading the docs?

meteoDaniel commented 1 year ago

That's it.

Thanks for this fast debugging session. It was a bit misleading to see that the files exist before the .execute() step.
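
For anyone else who lands here, this is the corrected version of the example above — the missing step is calling .execute() on the plan object that rechunk returns:

import xarray as xr
import pandas as pd
import numpy as np
import zarr
import rechunker

time = pd.date_range("2000-01-01", freq="6H", periods=365 * 4)
ds = xr.Dataset({"foo": ("time", np.arange(365 * 4)), "time": time})
ds.to_zarr('test.zarr')

group = zarr.open_consolidated('test.zarr', mode="r")
plan = rechunker.rechunk(
    group, {'foo': {'time': 10}, 'time': None}, '1GB', 'rechunked.zarr', temp_store='tmp.zarr'
)
plan.execute()  # the missing step: this actually copies the data

zarr.convenience.consolidate_metadata('rechunked.zarr')
ds = xr.open_dataset('rechunked.zarr', engine='zarr', consolidated=True)
print(ds.time)  # now shows the full range of distinct timestamps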

rabernat commented 1 year ago

It was a bit misleading to see the files exist before the .execute() step.

Yeah, this is confusing. It has to do with how we use Xarray to initialize the target stores: rechunk() eagerly writes all of the target's metadata (which is why the files appear on disk immediately), but the actual chunk data are only written when you call .execute() on the returned plan. I agree it is not super intuitive.
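
A quick way to see this two-phase behaviour for yourself (a sketch assuming the toy example's paths and zarr's DirectoryStore layout):

import os

# After rechunk() but before plan.execute(): only the metadata files
# ('.zarray', '.zattrs') exist for each array — no numbered chunk files yet.
print(os.listdir('rechunked.zarr/time'))
# After plan.execute(), chunk files like '0', '1', ... appear alongside them.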