google / xarray-beam

Distributed Xarray with Apache Beam
https://xarray-beam.readthedocs.io
Apache License 2.0

Better documentation of behaviour with irregular source chunks #33

Open mjwillson opened 2 years ago

mjwillson commented 2 years ago

Hiya!

In some source datasets the original chunks have irregular sizes. For example, one netCDF file per year, where leap years make some chunks longer than others.

Although the data model would seem to support this in principle, I couldn't get Rechunk or ConsolidateChunks to work without regular source chunk sizes.

Is this a fundamental limitation or something that could be in scope to address? And do you have any recommendations for getting data like this into regular chunks? In the leap year case, first splitting the source chunks into one chunk per day using SplitChunks could be an option. Although in general this would require splitting into chunks of gcd(*all_possible_source_chunk_sizes) which could be too small.
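For concreteness, this is roughly the kind of workaround I had in mind (just a sketch, with made-up chunk sizes and a hypothetical `foo` variable):

import apache_beam as beam
import numpy as np
import xarray
import xarray_beam as xbeam

# Hypothetical irregular source chunks along 'time' (sizes 365 and 366).
inputs = [
    (xbeam.Key({'time': 0}), xarray.Dataset({'foo': ('time', np.zeros(365))})),
    (xbeam.Key({'time': 365}), xarray.Dataset({'foo': ('time', np.zeros(366))})),
]

with beam.Pipeline() as p:
  (
      p
      | beam.Create(inputs)
      | xbeam.SplitChunks({'time': 1})          # split down to one chunk per day
      | xbeam.ConsolidateChunks({'time': 365})  # regroup into regular-size chunks
  )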

shoyer commented 2 years ago

This should totally work! This happens internally in rechunking all the time, whenever the source and destination chunk sizes are not multiples of one another.

If you can provide a reproducible example, that would be very helpful for narrowing this down.

And do you have any recommendations for getting data like this into regular chunks? In the leap year case, first splitting the source chunks into one chunk per day using SplitChunks could be an option. Although in general this would require splitting into chunks of gcd(*all_possible_source_chunk_sizes) which could be too small.

You can do this via SplitChunks followed by ConsolidateChunks with the same chunks, as described in this section of the docs: https://xarray-beam.readthedocs.io/en/latest/rechunking.html#low-level-rechunking

Xarray-Beam uses a smarter algorithm than splitting into the GCD of all source chunk sizes. Instead, it makes just the right irregular chunks (in SplitChunks) so that they can be joined together into the new regular chunks (in ConsolidateChunks).

shoyer commented 2 years ago

To give a concrete example, here's what it would look like to rechunk "per year" daily data into a chunk size of exactly 365 days:

import apache_beam as beam
import pandas as pd
import numpy as np
import xarray
import xarray_beam as xbeam

def make_chunks():
  # Yield one (key, chunk) pair per year, keyed by its offset along 'time'.
  offset = 0
  for year in range(2010, 2020):
    time = pd.date_range(f'{year}-01-01', f'{year}-12-31', freq='D')
    key = xbeam.Key({'time': offset})
    chunk = xarray.Dataset({'foo': ('time', np.zeros(time.size)), 'time': time})
    yield key, chunk
    offset += time.size  # leap years contribute 366 days instead of 365

print(
    make_chunks()
    | beam.MapTuple(lambda k, v: v.sizes['time'])
)
# [365, 365, 366, 365, 365, 365, 366, 365, 365, 365]

print(
    make_chunks()
    | xbeam.SplitChunks({'time': 365})
    | beam.MapTuple(lambda k, v: v.sizes['time'])
)
# [365, 365, 365, 1, 364, 1, 364, 1, 364, 1, 364, 2, 363, 2, 363, 2, 363, 2]

print(
    make_chunks()
    | xbeam.SplitChunks({'time': 365})
    | xbeam.ConsolidateChunks({'time': 365})
    | beam.MapTuple(lambda k, v: v.sizes['time'])
)
# [365, 365, 365, 365, 365, 365, 365, 365, 365, 365, 2]

mjwillson commented 2 years ago

Ah, that's great, thanks! This solves my problem, but to give a bit more detail on why I was confused:

With Rechunk: it explicitly asks for a source_chunks dict giving the sizes of the source chunks, so it's not at all clear what to pass when the source chunks are irregular. I tried passing None, {} and {"time": None}, none of which worked. I now see that if I pass source_chunks={"time": some_arbitrary_incorrect_number} it seems to work, although I'd be wary of relying on this?

With ConsolidateChunks: I now see that this does work provided that each consecutive grouping of chunks adds up to the exact requested target chunk size. I was trying to use it without first splitting the chunks into this form, and got a rather opaque AssertionError. A simplified example of what I was attempting:

import apache_beam as beam
import numpy as np
import xarray as xa
import xarray_beam as xb

inputs = [
  (xb.Key({"time": 0}, {"foo"}), xa.Dataset({"foo": (("time",), np.zeros(2))})),
  (xb.Key({"time": 2}, {"foo"}), xa.Dataset({"foo": (("time",), np.ones(7))})),
  (xb.Key({"time": 9}, {"foo"}), xa.Dataset({"foo": (("time",), np.ones(3))})),
]
with beam.Pipeline() as p:
    p | beam.Create(inputs) | xb.ConsolidateChunks(target_chunks={"time": 7})

=> AssertionError: (Key(offsets={'time': 7}, vars={'foo'}), Key(offsets={'time': 9}, vars={'foo'}))

My mistake, although a better error message would help here.

I also wasn't aware that SplitChunks would work the way it does with irregular chunks (with consecutive groupings in its output summing to the requested chunk size). I had assumed it would split each input chunk independently in a way that wouldn't guarantee this invariant, or that it would only split into an exact divisor of the source chunk size. Now I see the example under low-level rechunking, which clarifies things a bit.

It would help if these functions (SplitChunks, ConsolidateChunks, Rechunk) had some explanation in their docstrings about what invariants they require in their input and guarantee in their output. And for Rechunk, some guidance on whether irregular chunk sizes are supported and if so, what to pass for source_chunks.

Your example above would be great to include in the docs too, to demonstrate how to work with irregularly-chunked source data :)

Thanks again!

shoyer commented 2 years ago

With Rechunk: it explicitly asks for a source_chunks dict giving the sizes of the source chunks, so it's not at all clear what to pass when the source chunks are irregular. I tried passing None, {} and {"time": None}, none of which worked. I now see that if I pass source_chunks={"time": some_arbitrary_incorrect_number} it seems to work, although I'd be wary of relying on this?

Indeed, Rechunk can safely handle arbitrary source chunks. They are only used by the heuristic for determining intermediate chunks, so it's OK if they are not exact. The algorithm should handle small discrepancies (like 365 vs 366 elements) just fine.
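For example, reusing make_chunks() from the example above, something along these lines should work (a rough sketch: the total size of 3652 days, the target chunk size of 1000 and the itemsize of 8 bytes are just illustrative choices):

with beam.Pipeline() as p:
  (
      p
      | beam.Create(list(make_chunks()))
      | xbeam.Rechunk(
          dim_sizes={'time': 3652},      # total length of the time dimension
          source_chunks={'time': 365},   # approximate: leap years have 366 days
          target_chunks={'time': 1000},
          itemsize=8,                    # bytes per element (float64)
      )
      | beam.MapTuple(lambda k, v: print(k, v.sizes['time']))
  )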

I agree that this would all be well worth documenting!

shoyer commented 2 years ago

With ConsolidateChunks: I now see that this does work provided that each consecutive grouping of chunks adds up to the exact requested target chunk size. I was trying to use it without first splitting the chunks into this form, and got a rather opaque AssertionError. A simplified example of what I was attempting:

inputs = [
  (xb.Key({"time": 0}, {"foo"}), xa.Dataset({"foo": (("time",), np.zeros(2))})),
  (xb.Key({"time": 2}, {"foo"}), xa.Dataset({"foo": (("time",), np.ones(7))})),
  (xb.Key({"time": 9}, {"foo"}), xa.Dataset({"foo": (("time",), np.ones(3))})),
]
with beam.Pipeline() as p:
    p | beam.Create(inputs) | xb.ConsolidateChunks(target_chunks={"time": 7})

=> AssertionError: (Key(offsets={'time': 7}, vars={'foo'}), Key(offsets={'time': 9}, vars={'foo'}))

My mistake, although a better error message would help here.

Thinking about this case a little more, I think we could probably remove this assertion safely (after adding suitable documentation).

The behavior of ConsolidateChunks would then simply be to group together all chunks whose starting offsets fall within the same multiple of the target chunk size. If the existing chunks don't align exactly with multiples of the desired chunk size, you'll end up with irregular groups, but they'll be roughly the same size. In this example, you'd get one chunk of size 9 and another of size 3.
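
To sketch that grouping rule in plain Python (not the actual implementation, just the idea, using the (offset, size) pairs from the example above):

chunks = [(0, 2), (2, 7), (9, 3)]  # (offset, size) pairs along 'time'
target = 7

groups = {}
for offset, size in chunks:
  # Group chunks by which multiple of the target their starting offset falls in.
  groups.setdefault(offset // target, []).append(size)

print({index: sum(sizes) for index, sizes in groups.items()})
# {0: 9, 1: 3}  -> one consolidated chunk of size 9 and another of size 3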