Closed — kbren closed this issue 6 months ago.
Might be related to this issue: https://github.com/pangeo-forge/pangeo-forge-recipes/issues/517
Thanks for raising this issue. I agree it may be related to #517, but it's difficult to say for sure without a minimal reproducible example.
@kbren, could you post a self-contained example of the recipe you're working on, which when executed raises this error?
Thanks!
@kbren, is this issue still present with `pangeo-forge-recipes==0.10.0`?
@cisaacstern thanks for the heads up! I'm just getting back from vacation. I'll check to see if this is resolved, but it might take me a few days to get to it.
Hello, I've been encountering this same issue. I've been working on this with @kbren and we are still unsure what specific circumstances cause this error to be raised. Here's the context of the setup:
We are working with files that are split by months and contain hourly data. For example the month of January looks like this:
```
<xarray.Dataset>
Dimensions:  (time: 744, height: 3, sn: 9, we: 7)
Coordinates:
  * height   (height) int64 0 1 2
  * sn       (sn) int64 0 1 2 3 4 5 6 7 8
  * time     (time) datetime64[ns] 2010-01-01 ... 2010-01-31T23:00:00
  * we       (we) int64 0 1 2 3 4 5 6
Data variables:
    fvar     (time, height, sn, we) int64 dask.array<chunksize=(744, 3, 9, 7), meta=np.ndarray>
```
We are concatenating on the 'time' dimension across multiple months. All of the other dimensions are consistent across files. Here's a sample of the pipeline we are running:
```python
with beam.Pipeline(options=phandle.options.pipeline_options) as p:
    (
        p
        | beam.Create(side_args["pattern"].items())
        | OpenWithXarray(file_type=side_args["pattern"].file_type)
        | Preprocess()
        | StoreToZarr(
            target_root=side_args["target_root"],
            store_name=side_args["store_name"],
            combine_dims=side_args["pattern"].combine_dim_keys,
            target_chunks=side_args["target_chunk"],
        )
    )
```
The issue depends on what we specify for `target_chunks` in `StoreToZarr`. For example, these target chunks work fine: `{"time": 1000, "sn": 4}`. But if we try to chunk across the `we` dimension with target chunks `{"time": 1000, "sn": 4, "we": 4}`, this error is raised:

```
  in combine_fragments
    raise ValueError("Cannot combine fragments because they do not form a regular hypercube.")
ValueError: Cannot combine fragments because they do not form a regular hypercube.
[while running 'StoreToZarr/Rechunk/MapTuple(combine_fragments)']
```
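Our reading of the "regular hypercube" condition (an assumption based on the error text, not the pangeo-forge source): the start positions of the grouped fragments must form a full Cartesian product of the per-axis start values. A toy model:

```python
from itertools import product

def is_regular_hypercube(starts):
    """Illustrative model only: check whether a set of N-D start tuples is
    exactly the Cartesian product of its per-axis start values."""
    axes = [sorted({s[i] for s in starts}) for i in range(len(starts[0]))]
    return set(starts) == set(product(*axes))

print(is_regular_hypercube([(0, 0), (0, 4), (3, 0), (3, 4)]))  # True
print(is_regular_hypercube([(0, 0), (0, 4), (3, 0)]))          # False: (3, 4) missing
```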
That error is raised based on the output of `_invert_meshgrid`: https://github.com/pangeo-forge/pangeo-forge-recipes/blob/f4ca53d3c61c1a321cdddcb69bd4635c971d2baa/pangeo_forge_recipes/rechunking.py#L217-L225

I've also saved the inputs passed to `_invert_meshgrid` for `target_chunks` that do work, and for attempts that didn't, which I can share if helpful.

Perhaps relevant: if we specify the time target chunk to be 24, chunking works with `{"time": 24, "sn": 4, "we": 4}`, as well as with whatever chunk sizes we want for the `sn` and `we` dimensions.
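One pattern worth spelling out (an assumption drawn from these cases, not documented behavior): a January file holds 744 hourly samples, and the time chunk that worked in the 3-D case, 24, divides 744 evenly, while the one that failed, 1000, does not:

```python
# Assumption drawn from this thread, not documented pangeo-forge behavior:
# in the 3-D chunking cases, the time chunk appears to need to divide the
# per-file time length evenly.
per_file_time = 744  # hourly samples in a January file

for chunk in (1000, 24):
    ok = per_file_time % chunk == 0
    print(f"time chunk {chunk}: divides {per_file_time} evenly? {ok}")
```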
TLDR: What are the constraints on the target chunks that can be used in the StoreToZarr transform?
Thank You!
@tom-the-hill thanks for the detailed report.
> TLDR: What are the constraints on the target chunks that can be used in the StoreToZarr transform?
I don't think this is fully defined at this point. In particular, I note that the place where we should be testing our expectations of `combine_fragments` errors does not actually test for this particular error. It would be a great contribution if you and/or @kbren would consider extending our testing in this area to capture it. I believe having a test that triggers this error would go a long way toward building our shared understanding of how it arises.
If you are interested in working on this, I'm happy to review/discuss approaches on GitHub and/or video. In brief, we'd need to:
Add a block to that test, something like:

```python
with pytest.raises(ValueError, match="do not form a regular hypercube"):
    _ = combine_fragments(group, [(index0, ds), (index1, ds)])
```

with `[(index0, ds), (index1, ds)]` inputs that trigger this error. Where it gets a bit tricky is that `combine_fragments` is not aware of `target_chunks` per se. That is handled by `split_fragment`, which generates the input fragments for `combine_fragments`. While admittedly a bit dense, the following block of code from one of the other rechunking tests demonstrates how to get from `target_chunks` through to calling `split_fragment` (using synthetic test data):
If you click through to the continuation of this ☝️ test, you can see how the outputs of `split_fragment` are passed on to `combine_fragments`.
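For intuition, the split → group-by-key → combine flow can be sketched in plain Python (illustrative stubs only — `split_fragment_stub` and `rechunk` are made-up names, not the pangeo-forge API):

```python
from collections import defaultdict

def split_fragment_stub(fragment, chunk):
    # A fragment is (global_start, values); yield (groupkey, value) pairs,
    # where the groupkey is the index of the target chunk each value lands in.
    start, values = fragment
    for i, v in enumerate(values):
        yield ((start + i) // chunk, v)

def rechunk(fragments, chunk):
    # Mimics `... | beam.GroupByKey() | beam.MapTuple(combine_fragments)`:
    # group split pieces by target-chunk key, then "combine" (here: sort).
    groups = defaultdict(list)
    for frag in fragments:
        for key, piece in split_fragment_stub(frag, chunk):
            groups[key].append(piece)
    return {k: sorted(v) for k, v in groups.items()}

# two "files" of 4 time steps each, target time chunk of 2
print(rechunk([(0, [0, 1, 2, 3]), (4, [4, 5, 6, 7])], 2))
```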
@cisaacstern Sorry for the delayed response. Thank you for pointing me to that test; it really helped me understand the process of indexing, splitting, and combining. With minimal changes to the existing test `test_split_and_combine_fragments_with_merge_dim`, I believe I can illustrate my question. The only changes were to pass `target_chunks` instead of `time_chunks` and to comment out the assertions checking `time_chunks`.
```python
def test_split_and_combine_fragments_with_merge_dim(nt_dayparam, target_chunks, do_print=False):
    """Test if sub-fragments split from datasets with merge dims can be combined with each other."""

    def dprint(s):
        if do_print:
            print(s)

    # target_chunks = {"time": time_chunks}
    nt, dayparam = nt_dayparam
    ds = make_ds(nt=nt)
    dprint(f"original ds {ds}")
    dsets, _, _ = split_up_files_by_variable_and_day(ds, dayparam)
    dprint(f"{dsets = }")

    # replicates indexes created by IndexItems transform.
    time_positions = {t: i for i, t in enumerate(ds.time.values)}
    merge_dim = Dimension("variable", CombineOp.MERGE)
    concat_dim = Dimension("time", CombineOp.CONCAT)
    indexes = [
        Index(
            {
                merge_dim: Position((0 if "bar" in ds.data_vars else 1)),
                concat_dim: IndexedPosition(time_positions[ds.time[0].values], dimsize=nt),
            }
        )
        for ds in dsets
    ]
    dprint(f"{indexes = }")

    # split the (mock indexed) datasets into sub-fragments.
    # the splits list are nested tuples which are a bit confusing for humans to think about.
    # create a namedtuple to help remember the structure of these tuples and cast the
    # elements of splits list to this more descriptive type.
    splits = [
        list(split_fragment((index, ds), target_chunks=target_chunks))
        for index, ds in zip(indexes, dsets)
    ]
    dprint(f"{splits = }")
    Subfragment = namedtuple("Subfragment", "groupkey, content")
    subfragments = list(itertools.chain(*[[Subfragment(*s) for s in split] for split in splits]))

    # combine subfragments, starting by grouping subfragments by groupkey.
    # replicates behavior of `... | beam.GroupByKey() | beam.MapTuple(combine_fragments)`
    # in the `Rechunk` transform.
    groupkeys = set([sf.groupkey for sf in subfragments])
    grouped_subfragments: dict[GroupKey, list[Subfragment]] = {g: [] for g in groupkeys}
    for sf in subfragments:
        grouped_subfragments[sf.groupkey].append(sf)
    dprint(f"{grouped_subfragments = }")

    for g in sorted(groupkeys):
        # just confirms that grouping logic within this test is correct
        assert all([sf.groupkey == g for sf in grouped_subfragments[g]])
        # for the merge dimension of each subfragment in the current group, assert that there
        # is only one positional value present. this verifies that `split_fragments` has not
        # grouped distinct merge dimension positional values together under the same groupkey.
        merge_position_vals = [sf.content[0][merge_dim].value for sf in grouped_subfragments[g]]
        assert all([v == merge_position_vals[0] for v in merge_position_vals])
        # now actually try to combine the fragments
        _, ds_combined = combine_fragments(
            g,
            [sf.content for sf in grouped_subfragments[g]],
        )
        dprint(f"{ds_combined = }")
        # ensure vars are *not* combined (we only want to concat, not merge)
        assert len([k for k in ds_combined.data_vars.keys()]) == 1
        # check that time chunking is correct
        # if nt % time_chunks == 0:
        #     assert len(ds_combined.time) == time_chunks
        # else:
        #     # if `nt` is not evenly divisible by `time_chunks`, all chunks will be of
        #     # `len(time_chunks)` except the last one, which will be the length of the remainder
        #     assert len(ds_combined.time) in [time_chunks, nt % time_chunks]
```
Running that test with the parameter `nt_dayparam=(8, "4D")`, to emulate having two files with 4 time samples each, I got these results for different target chunks.

For chunking on 1 or 2 dimensions, any chunk sizes seemed to work, e.g. `target_chunks={"time": 1}` and `target_chunks={"time": 3, "lat": 5}`.

However, when chunking by 3 dimensions, the test only passes when the chunk size for time evenly divides the number of time samples per file (for this example, "4D", i.e. 4 time samples). For example, these chunk sizes work: `target_chunks={"time": 1 or 2 or 4, "lat": 5, "lon": 7}`, but `target_chunks={"time": 3, "lat": 5, "lon": 7}` fails and raises `ValueError: Cannot combine fragments because they do not form a regular hypercube.`

So if chunking by 3 or more dimensions, it seems that the time chunk size must evenly divide the size of that dimension per file?
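This hypothesis can be modeled with a few lines of hand-rolled Python (NOT pangeo-forge code, just a sketch of our working model): splitting each file at the global target-chunk boundaries yields evenly spaced fragment starts when the time chunk divides the per-file length, and uneven spacing otherwise, which would explain a failed regular-grid check:

```python
# Hand-rolled model of the hypothesis above (not pangeo-forge code): with two
# files of 4 time steps each, find where fragments would start along time
# after splitting each file at the global target-chunk boundaries.
def fragment_starts(file_len, n_files, chunk):
    total = file_len * n_files
    global_bounds = set(range(0, total, chunk))
    starts = []
    for f in range(n_files):
        file_start = f * file_len
        # a file is split at every global chunk boundary that falls inside it
        cuts = {file_start} | {b for b in global_bounds
                               if file_start < b < file_start + file_len}
        starts.extend(sorted(cuts))
    return starts

print(fragment_starts(4, 2, 2))  # [0, 2, 4, 6]: evenly spaced starts
print(fragment_starts(4, 2, 3))  # [0, 3, 4, 6]: uneven spacing, no regular grid
```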
Incredible investigation, @tom-the-hill!
> So if chunking by 3 or more dimensions, it seems that the first chunk size must evenly divide into the size of that dimension per file?
I think the work you've done above makes you perhaps our resident expert on this subject! So I'd defer to you on that 😄 .
Could you take what you've illustrated above and turn it into a PR? Ideally we'd use pytest parametrization (other examples should be available in the same testing module, or elsewhere in the tests) to preserve the existing test coverage, while also adding the test cases you describe here.
Once we can see the test in a PR, I could envision perhaps adding a more descriptive error along the lines you've described, but would help to see the parametrized test first.
xref https://github.com/leap-stc/cmip6-leap-feedstock/issues/72#issuecomment-1829848197 which appears to be the same issue
Per a meeting I had this week with @kbren and @tom-the-hill, the next action point here is for Tom to open a PR with a failing test that minimally reproduces this error. Our working hypothesis is that this is a corner case bug which needs to be fixed (not user error), and the failing test will give us something concrete to work against.
Let me know if I can help in any way here. This has quite a high priority on my end.
I think I'm hitting this one also with my ssebop recipe, which fails if the time chunk is != 1.
This is becoming a major roadblock for both the Pangeo/ESGF CMIP6 Zarr 2.0 data and our data ingestion at LEAP and seems like a central issue for PGF. @cisaacstern is there any way we can carve out some time to tackle this?
Thanks for weighing in, all!
And apologies for the delay here.
I have blocked off tomorrow to work on this.
Thanks, @cisaacstern! Although in my case, ignorance was to blame. I did get this working once I set `target_chunk` equal to a factor of `len(time)`! 🤕
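For anyone on an older release hitting the same wall, this workaround amounts to choosing the time chunk from the divisors of the time length; a tiny (hypothetical) helper to list the candidates:

```python
def factors(n):
    """Return all chunk sizes that evenly divide a dimension of length n."""
    return sorted(d for d in range(1, n + 1) if n % d == 0)

# e.g. valid time chunks for a 744-step (hourly January) time dimension
print(factors(744))
```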
Thanks all for your patience. To the best of my understanding, this is now fixed by #708 and released in `0.10.6`. Please re-open this issue or open a new one if you find otherwise. Thanks for everyone's collaboration on this!
Charles this is huge! This bug has been bugging us for almost a year!!!
🚀 🚀 🚀
Credit to @jbusecke and @ta-hill for major contributions towards the fix!
I've been running the beam-refactor branch, following the example of `netcdf_zarr_sequential.ipynb`, with a dataset with dimensions (time, height, lat, lon), and am running into an error at the following assertion:
https://github.com/pangeo-forge/pangeo-forge-recipes/blob/97a7b3c211263ada4d0d73770027af782f58e8af/pangeo_forge_recipes/rechunking.py#L126-L128
I'm wondering if the `_invert_meshgrid()` function does not generalize to more than two dimensions? Specifically, it looks to me like `np.meshgrid()` only acts on the first two dimensions, so the dimensions of `actual` and `expected` do not match. When I comment out this assertion, things seem to run.
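For what it's worth, `np.meshgrid` itself does handle more than two 1-D inputs and returns N-dimensional grids; a quick standalone check (independent of how `_invert_meshgrid` uses it):

```python
import numpy as np

# np.meshgrid with three 1-D inputs returns three 3-D grids. With
# indexing="ij" the output shape follows the input order; the default
# "xy" swaps the first two axes, which can be surprising.
x, y, z = [0, 3], [0, 4, 8], [0, 5]
X, Y, Z = np.meshgrid(x, y, z, indexing="ij")
print(X.shape)  # (2, 3, 2)

# each 1-D input is recoverable by slicing along its own axis
assert (X[:, 0, 0] == x).all()
assert (Y[0, :, 0] == y).all()
assert (Z[0, 0, :] == z).all()
```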
See the full error message below:
Traceback
```
Traceback (most recent call last):
  File "/homedirs/katibren/develop/pangeo-forge-recipes/pangeo_forge_recipes/rechunking.py", line 197, in combine_fragments
    starts = _invert_meshgrid(*starts_cube[::-1])[::-1]
  File "/homedirs/katibren/develop/pangeo-forge-recipes/pangeo_forge_recipes/rechunking.py", line 128, in _invert_meshgrid
    assert all(
AssertionError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1418, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 625, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/homedirs/katibren/miniconda3/envs/pangeo-forge-beam/lib/python3.10/site-packages/apache_beam/transforms/core.py", line -1, in