DahnJ opened this issue 4 months ago
@DahnJ this is super interesting!
> When working with sparsely populated arrays, the natural question arises: which chunks are populated?
I actually recently had a separate use case where I built a really big Zarr store chunk-by-chunk using a big parallel dask operation, and at the end a couple of tasks had failed. The result had some uninitialized chunks, but finding out which ones are not initialized is trickier than it should be. It hadn't actually occurred to me that this problem is closely related to the manifests I built in VirtualiZarr, but you're completely right that it is!
It also reminds me of this little tool that CarbonPlan made for quickly checking how much of a Zarr store had actually been initialized (@andersy005 wrote it I think). I was already going to suggest upstreaming that.
> What is the potential relationship to VirtualiZarr? Could they share functionality, or could a single package cover both uses?
I would imagine a lot of this can be upstreamed from VirtualiZarr into zarr-python at least. Reminds me also of @d-v-b 's issue https://github.com/zarr-developers/VirtualiZarr/issues/71, which also asks whether all Zarr arrays should have an intermediate chunk manifest layer.
In the meantime we could mess around with adding methods to VirtualiZarr's `ManifestArray`, e.g. `.chunks_initialized(self) -> np.ndarray`, which would return your numpy array of 1s and 0s, with the same shape as the chunk grid.

FYI, non-initialized chunks in VirtualiZarr are currently represented by entries of empty strings in the numpy array containing the chunk manifest's paths.
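As a rough sketch of what such a method could compute (a hypothetical standalone function, not VirtualiZarr's actual API), assuming the manifest's paths are held in a numpy array with `""` marking uninitialized chunks:

```python
import numpy as np

def chunks_initialized(paths: np.ndarray) -> np.ndarray:
    """Return a 0/1 array over the chunk grid.

    `paths` is assumed to hold one chunk path per grid cell,
    with "" marking an uninitialized chunk (as described above).
    """
    return (paths != "").astype(np.uint8)

# Hypothetical 2x2 chunk grid with one missing chunk
paths = np.array([["s3://b/c/0.0", ""],
                  ["s3://b/c/1.0", "s3://b/c/1.1"]])
chunks_initialized(paths)  # → array([[1, 0], [1, 1]], dtype=uint8)
```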
> Perhaps there's a common functionality for going from the manifest into an xarray object?
Remember that you need more than just the Chunk Manifest itself to get to the xarray representation - you also need the array name, the dim names, and the dtype.
Finally, ZEP 5 (https://github.com/zarr-developers/zarr-specs/pull/205) plans to propose a way to store chunk-level statistics, one of the potential use-cases of MetaArrays.
@rabernat suggested something very similar on https://github.com/zarr-developers/VirtualiZarr/issues/33#issuecomment-2000617606
I certainly agree that there are a lot of use cases for efficiently querying which chunks have data, and possibly storing additional per-chunk statistics. For arrays with a very large number of chunks, you might even want to do this hierarchically.
A related idea would be a way to store a binary mask indicating which individual elements have been written, in case writes can be unaligned to chunk boundaries, together with some way to efficiently represent the common case where the mask value is the same throughout an entire chunk, similar to this MetaArray.
One issue that can come up with zarr arrays is the fact that there can be ambiguity between a chunk that was never written, and a chunk that was written but happens to be equal to the fill value at all positions. In some implementations, chunks that are equal to the fill value are not stored.
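This ambiguity can be illustrated with a toy key-value store (hypothetical helper names, not the zarr-python API): a chunk written entirely with the fill value, and skipped by the writer, leaves no object behind, so listing the store cannot distinguish it from a chunk that was never written at all.

```python
import numpy as np

# Toy key-value "store" simulating the behaviour described above.
store = {}
fill_value = 0.0

def write_chunk(store, key, data, fill_value):
    # Mimic a writer that skips storing all-fill-value chunks
    if np.all(data == fill_value):
        store.pop(key, None)
        return
    store[key] = data.tobytes()

write_chunk(store, "0.0", np.zeros((2, 2)), fill_value)  # logically written, not stored
write_chunk(store, "0.1", np.ones((2, 2)), fill_value)   # stored

# Listing cannot distinguish "0.0" (written as fill) from a never-written chunk:
sorted(store)  # → ['0.1']
```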
I'm not entirely clear how you are proposing to use the chunk manifest format. As currently proposed the chunk manifest would allow you to do list operations very cheaply, and therefore you could easily figure out which chunks are present, and modulo the issue I mentioned about chunks that are logically written but not stored because they are equal to the fill value, this would tell you which chunks have been written. The issue with the chunk manifest format is that, being a single json file either stored separately or embedded in the array metadata file, it is mostly a read-only format. If writes are supported at all, you are likely to run into contention if trying to concurrently write from multiple machines, unless you have some very specialized JSON storage system designed to support concurrent writes.
The chunk manifest, operating at the key-value store level, also doesn't very naturally offer a way to store any richer information than just whether a chunk is present or not.
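A minimal illustration of that contention, with a dict standing in for the storage holding a single JSON manifest (all names hypothetical): two writers doing read-modify-write concurrently can silently lose an update.

```python
import json

# A single JSON manifest held in a stand-in "storage".
manifest_file = {"data": json.dumps({})}

def add_entry(snapshot, chunk, key):
    entries = json.loads(snapshot)   # read a (possibly stale) state
    entries[chunk] = key             # modify
    return json.dumps(entries)       # serialize the whole manifest again

snap = manifest_file["data"]         # both writers read the same state
a = add_entry(snap, "0.0", "chunks/a")
b = add_entry(snap, "0.1", "chunks/b")
manifest_file["data"] = a            # writer A commits
manifest_file["data"] = b            # writer B overwrites, losing "0.0"
sorted(json.loads(manifest_file["data"]))  # → ['0.1']
```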
> I'm not entirely clear how you are proposing to use the chunk manifest format

> it is mostly a read-only format

> The chunk manifest, operating at the key-value store level, also doesn't very naturally offer a way to store any richer information than just whether a chunk is present or not.
My initial understanding of this issue (@DahnJ please correct me if I'm wrong), was that it's more concerned with giving users access to useful in-memory representations about the state of the chunk grid. It's less about actually changing the chunk manifest format on-disk. Perhaps it would have made more sense to raise it on the zarr-python repo instead?
But then this

> Their updating & consistency would then be completely taken care of by the storage transformer for the chunk manifest

makes it sound much more like the whole Arraylake database-of-chunks idea...
Thank you for the quick feedback @TomNicholas and @jbms. It's great to hear that this seems useful and that you've even already needed querying information about initialized chunks!
I think that the crux here is what the ambitions for the chunk manifest are.
I would therefore say that as long as the chunk manifest supports changes made to the Zarr array, it's a good candidate to use for MetaArrays. In that case, the issue becomes only about providing a useful in-memory representation, exactly as @TomNicholas notes.
> Remember that you need more than just the Chunk Manifest itself to get to the xarray representation - you also need the array name, the dim names, and the dtype.
Yes, you're right. For the MetaArrays, we also require the coordinate arrays. That way the user can query populated chunks in the label space – answering questions such as "do we have 2022 data for Ecuador".
> The chunk manifest, operating at the key-value store level, also doesn't very naturally offer a way to store any richer information than just whether a chunk is present or not.
Right, storing such information would be an extension of the manifest's planned capabilities. We can thus focus on the primary goal here – chunk initialization information, which the chunk manifest, as planned, does represent.
Thanks for the additional details @TomNicholas. The fact that uninitialized chunks are already represented is great, and hacking on a `chunks_initialized` method thus sounds like a great way to continue exploring a potential open-source variant of the in-memory representation of our MetaArrays! I will try to get a bit more familiar with VirtualiZarr myself.
> binary mask indicating which individual elements have been written, in case writes can be unaligned to chunk boundaries
This is a good point. We restrict our writes to entire chunks precisely for that reason, which allows us to store information only at the chunk level.
> One issue that can come up with zarr arrays is the fact that there can be ambiguity between a chunk that was never written, and a chunk that was written but happens to be equal to the fill value at all positions. In some implementations, chunks that are equal to the fill value are not stored.
Thanks for noting that; the behaviour around `write_empty_chunks` is definitely something to be aware of, and to have clarity around, when dealing with chunk-initialization information.
This is a very cool idea and it does address one of the challenges people inevitably encounter when they start scaling zarr arrays. But I also agree with two points @jbms made:
> The issue with the chunk manifest format is that, being a single json file either stored separately or embedded in the array metadata file, it is mostly a read-only format. If writes are supported at all, you are likely to run into contention if trying to concurrently write from multiple machines, unless you have some very specialized JSON storage system designed to support concurrent writes.
To elaborate: the basic premise of a conventionally chunked (i.e., unsharded) array is that concurrent chunk writes are independent. A chunk manifest / index adds a synchronization step, which makes concurrent write operations dependent on each other. This introduces new complexity (e.g., write contention) and new failure modes (e.g., writing a chunk succeeds but updating the manifest fails). New complexity is fine (and necessary in many cases), but it's important to map it out.
> The chunk manifest, operating at the key-value store level, also doesn't very naturally offer a way to store any richer information than just whether a chunk is present or not.
I would rephrase this as "why not use a database / tabular format here?" If we stick with vanilla Zarr there are only two options, JSON or an array, and the array option looks more attractive than JSON. But if you want a rich, dynamic representation of chunk mutations, then you probably want to store a record with multiple data types for each chunk (e.g., a creation timestamp, a modification timestamp, a size, some summary stats of the values in the chunk, etc.), and in that case a Zarr array is going to be a very poor fit IMO. So I would be curious to see an analysis of how adding a non-Zarr ingredient to the mix could solve this problem, weighed against the complexity that adds.
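For concreteness, the kind of per-chunk record described here is trivial in a tabular store. A hypothetical sketch with SQLite (the schema and column names are invented for illustration, not a proposal):

```python
import sqlite3

# One row of mixed types per chunk: something a Zarr array cannot
# naturally hold, but a table holds trivially.
con = sqlite3.connect(":memory:")
con.execute("""
    CREATE TABLE chunk_meta (
        chunk    TEXT PRIMARY KEY,  -- chunk key, e.g. "0.0"
        created  REAL,              -- creation timestamp
        modified REAL,              -- modification timestamp
        nbytes   INTEGER,           -- stored size
        vmin     REAL, vmax REAL    -- summary stats of the chunk values
    )
""")
con.execute(
    "INSERT INTO chunk_meta VALUES (?, ?, ?, ?, ?, ?)",
    ("0.0", 1718000000.0, 1718000100.0, 4096, -1.5, 3.2),
)
[r[0] for r in con.execute("SELECT chunk FROM chunk_meta")]  # → ['0.0']
```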
I will also throw out a random thought that occurred to me: zarr doesn't have a good way of structurally expressing a dependency relationship between arrays. But this could be solved if arrays could contain other arrays. If we support nested arrays, then it would be very natural to store a meta array as a sub-array of the array it describes. I'm not suggesting we do this, just pointing out that it solves a particular problem.
Thanks @d-v-b for providing helpful additional detail. You're right – this complexity absolutely needs to be mapped out. For the purposes of this issue, as long as the chunk manifest allows for updates, I think it's a sufficient solution to power the MetaArrays (e.g. as a feature in VirtualiZarr).
I will wait for the community to converge on a concrete proposal for the chunk manifest – and potentially join in the conversation. Hopefully this issue serves as additional motivation for chunk manifests to go beyond just a read-only JSON file.
Discussing the details of the chunk manifest is not in the scope of this issue. However, to help the manifest discussion with the additional context from this issue, here's a summary of what's been said so far and some of my thoughts.
As noted, updating both chunks and the manifest brings in a potential failure where chunks are successfully written, but the manifest update fails. This could cause an inconsistent state where the manifest contains the wrong data (e.g. wrong chunk length).
Possible solution: one method would be to provide a layer of indirection and store chunks under a different key than just their indexes (e.g. using a hash value, or appending a version number); chunks then only count as truly "written" once they've been recorded in the manifest. There could also be a "vacuum" feature to clean up chunks orphaned by failed writes. This is quite related to ideas that have been explored previously:
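A minimal sketch of this indirection idea, with a dict standing in for the object store and all names hypothetical: chunks are uploaded under content-addressed keys, become visible only once the manifest records them, and orphans from failed writes can be vacuumed later.

```python
import hashlib

store = {}      # stand-in for an object store
manifest = {}   # chunk index -> storage key

def write_chunk(index, payload: bytes):
    key = f"chunks/{hashlib.sha256(payload).hexdigest()}"
    store[key] = payload    # step 1: upload under a unique, content-addressed key
    manifest[index] = key   # step 2: "commit" by recording the key in the manifest

def vacuum():
    # Remove objects not referenced by the manifest (orphans of failed writes)
    live = set(manifest.values())
    for key in [k for k in store if k not in live]:
        del store[key]

write_chunk("0.0", b"data-v1")
store["chunks/orphan"] = b"partial"   # simulate step 1 succeeding, step 2 failing
vacuum()
sorted(store) == [manifest["0.0"]]    # → True: only the committed chunk remains
```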
Multiple processes could attempt to write into the chunk manifest at once.
Possible solution: the simplest approach here is an exclusive lock. More optimistic concurrency control may be possible – one potential direction is to make the manifest an append-only event log with an optional compaction step. I haven't thought this through properly, though.
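A toy sketch of that append-only direction (the event schema is invented for illustration, not a proposal): writers only ever append events, and a compaction step folds the log into the current manifest state, with later events winning.

```python
# Manifest as an append-only event log.
log = []

def record_write(chunk_index, key, length):
    log.append({"op": "set", "chunk": chunk_index, "key": key, "length": length})

def compact(log):
    # Fold the log into the current state; later events win.
    state = {}
    for event in log:
        if event["op"] == "set":
            state[event["chunk"]] = {"key": event["key"], "length": event["length"]}
        elif event["op"] == "delete":
            state.pop(event["chunk"], None)
    return state

record_write("0.0", "chunks/abc", 123)
record_write("0.0", "chunks/def", 456)  # a later overwrite of the same chunk
compact(log)["0.0"]["key"]  # → 'chunks/def'
```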
Some of the larger datasets out there have 10s of millions of chunks (example). Our datasets are frequently on the scale of billions of chunks (global datasets with multiple bands, spanning a large time range). However, typically only a small fraction of those is populated, also on the order of 10s of millions.
The manifest must store at least three pieces of information per chunk (path, offset, and length). Due to regularities in the data, and with the right encoding and compression, this might compress quite well – but it's still a significant amount of data.
The big advantage of the current storage of MetaArrays using Zarr is that it naturally allows for fetching only those regions of the MetaArray one is interested in, which keeps the amount of data transferred to a minimum. It would be good, if not necessary, for manifests to provide a similar subsetting feature.
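For illustration, mapping an index range of the data array to the corresponding region of the chunk grid is simple arithmetic, which is what makes chunk-aligned subsetting of a Zarr-backed MetaArray cheap (hypothetical helper, one dimension shown):

```python
import math

def chunk_region(start, stop, chunk_size):
    """Return the half-open range of chunk indexes covering elements [start, stop)."""
    return (start // chunk_size, math.ceil(stop / chunk_size))

chunk_region(100, 520, 256)  # → (0, 3): chunks 0..2 cover elements 100..519
```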
The data format should also support transactional operations (e.g. updating the key for a chunk). Alternatively, an append-only log might be considered (similar to event sourcing).
There's also some potential in local caching or keeping a local synchronized version of the chunk manifest. Potentially related Pangeo discussion: Conflict-free Replicated Zarr
I want to record a special case of populating arrays incrementally that is potentially relevant here. This was brought up by @dstansby in Zulip
In our general case, we're populating the arrays with data from non-Zarr sources, potentially produced through a complex processing pipeline. The MetaArrays themselves do not know how the data is populated, nor should they.
However, the user might need to simply fetch regions of existing zarr array incrementally. In that case, the MetaArrays could potentially provide a convenient API not only to discover unpopulated chunks, but to also aid in fetching them. This would provide functionality similar to what @dstansby described:
```python
for chunk in chunks:
    if exists_on_hard_drive(chunk):
        continue
    download(chunk)
```
Perhaps you can clarify what you are trying to accomplish exactly, independent of any particular solution?
In https://github.com/zarr-developers/zarr-specs/issues/300#issuecomment-2204735276 you mention lazily-writing an array as individual chunks are requested. This seems to be something that is easily supported by the existing zarr v2 and v3 formats without chunk manifests: you can simply query the underlying storage to determine if the key corresponding to the chunk is present. If it is, you can read it. Otherwise, you can compute and store it.
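The query-or-compute pattern described here can be sketched with a dict standing in for the underlying store (helper names are hypothetical):

```python
store = {}  # stand-in for the underlying key-value store

def compute_chunk(index):
    return b"payload"  # placeholder for the real computation

def get_chunk(index):
    key = f"{index[0]}.{index[1]}"
    if key not in store:                  # chunk key absent: never written
        store[key] = compute_chunk(index)  # compute and store it
    return store[key]                     # otherwise just read it

get_chunk((0, 0))
sorted(store)  # → ['0.0']
```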
If you want to construct an in-memory "MetaArray" indicating which chunks are present, this can be done by performing a list operation on the underlying kvstore in order to obtain the list of all chunks. List operations are very fast when using a chunk manifest since no additional I/O is required after loading the manifest, but I expect in most cases other underlying kvstores, like the local filesystem or even S3, would still provide adequate performance, as long as you can construct the in-memory MetaArray once and then do a bunch of queries.
If you want to construct a MetaArray corresponding to just a subregion of the full array, that can also be done by performing one or more "range-restricted" or "prefix-restricted" list operations on the underlying kvstore. However, if the query does not happen to align well with the "chunk_key_encoding" of the array (and the current chunk key encoding options in zarr v3 are generally problematic for this type of use), it could require a large number of queries. If you want to make this work better, you could propose some improved chunk_key_encodings for zarr v3. Note that this type of querying is used in tensorstore for https://google.github.io/tensorstore/python/api/tensorstore.TensorStore.storage_statistics.html#
In https://github.com/zarr-developers/zarr-specs/issues/300#issuecomment-2204734027 you mention wanting write support when using a chunk manifest, but it is not clear what form of writing you mean. Under the current chunk manifest proposal, zarr implementations would treat the manifest as read-only, because the chunk manifest does not indicate where new keys are written. For existing keys, in principle you could attempt to overwrite the same location in place, but that only works if the size of the chunk stays the same (effectively meaning no compression), and it is not necessarily the desired behavior anyway. You could also have some form of "writing by reference", where you "write" to the chunk manifest by linking in additional keys referring to existing data elsewhere. But that might more naturally be handled entirely separately from the zarr implementation, since it would require specialized APIs.
It isn't really clear to me what benefits you hope to gain from using a chunk manifest.
Thanks @jbms, these are great questions and points. This is precisely why I wrote the issue in the first place 🙂
> It isn't really clear to me what benefits you hope to gain from using a chunk manifest.
The primary motivation has been that this could entirely remove the need for code that updates the MetaArrays. Since the manifest and the MetaArrays store similar information, using the manifest would avoid maintaining two parallel implementations.

That is of course only true if the manifest supports updates. If it does not, it cannot be used to power MetaArrays.
> You could also have some form of "writing by reference", where you "write" to the chunk manifest by linking in additional keys referring to existing data elsewhere
I agree – this could solve a lot of the consistency/concurrency issues mentioned. I elaborated on this a bit in https://github.com/zarr-developers/zarr-specs/issues/300#issuecomment-2204734027. This again goes in the direction of https://github.com/zarr-developers/zarr-specs/issues/154.
You're asking a great question: why even store the chunk initialization information, when it's right there in the object store in the form of stored chunk objects?
This was actually our original method. We would initialize the `xr.Dataset` MetaArray on-the-fly from the output of `list_prefix`. However, at some point, with a growing number of populated chunks, the `list_prefix` method in Zarr became prohibitively slow. We saw two options at that point:

1. Improve the performance of listing the chunks in storage.
2. Store the MetaArray persistently, as a separate Zarr array of `bool` values.

We've explored these options for our storage (which is an on-prem object store, indexed by Arraylake) and, at that time, chose option 2 as the scalable solution.

However, this choice was made in the context of that particular storage and may not apply in general. This issue is about opening up the functionality to the wider Zarr community, and thus it's worth exploring option 1 again.
I think an important difficulty with listing the objects is that you can only list by prefix, not match a generic pattern. This means that one cannot easily slice in each dimension: if I want to fix only the trailing dimensions, e.g. select all chunks with indexes of the form `(x, 2, 2)`, no prefix matches that pattern, and I have to list the chunks across the other dimensions as well.
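To illustrate with the default `.`-separated chunk keys (the listing helper is a toy, not a zarr API): a prefix can only pin down leading dimensions, so a pattern like `(x, 2, 2)` forces a full listing plus a filter.

```python
# Chunk keys of the form "t.y.x", as produced by the default separator.
keys = ["0.2.2", "1.2.2", "0.0.1", "1.0.0"]

def list_prefix(keys, prefix):
    return sorted(k for k in keys if k.startswith(prefix))

list_prefix(keys, "0.")  # fixing the FIRST dimension is one cheap prefix query

# But selecting (x, 2, 2) for all x cannot be expressed as a prefix;
# it requires listing everything and filtering:
sorted(k for k in keys if k.endswith(".2.2"))  # → ['0.2.2', '1.2.2']
```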
It would be very interesting to see what the performance of listing on the order of millions of chunks is in e.g. S3 and, if necessary, whether improved `chunk_key_encodings` could provide satisfactory performance.
It's also worth considering how well this works with sharding, virtual zarr and other potential (planned) features.
> In #300 (comment) you mention lazily-writing an array as individual chunks are requested. This seems to be something that is easily supported by the existing zarr v2 and v3 formats without chunk manifests
If this is well-supported by Zarr already, then please ignore that comment. I only recorded it as a special case of incremental arrays that I saw in the Zulip discussion. It's not relevant to our use cases.
I believe there is a common usage pattern of Zarr that should be documented and more widely known – populating Zarr arrays incrementally and on-demand, chunk-by-chunk.
We have built internal tooling around this that resembles the chunk manifests (https://github.com/zarr-developers/zarr-specs/issues/154, https://github.com/zarr-developers/zarr-specs/issues/287, virtualizarr#chunk-manifests) and VirtualiZarr (https://github.com/TomNicholas/VirtualiZarr/issues/33, virtualizarr#manifestarray).
This issue explains the usage pattern and opens the question of how our tooling could be open-sourced.
## Incrementally populating Zarr arrays
A core Zarr use case for us is to initialize an empty Zarr array with a pre-determined grid and then fill it with data as needed.

For a great overview of this use case, see Earthmover's recent webinar and the relevant code repository. Note that in the webinar the entire grid is populated; in our case, parts of the grid are populated on-demand.
### Code example
This code defines an empty Zarr array given an example schema. For a fuller example, see [`create_dataset_schema` in Earthmover's repository](https://github.com/earth-mover/serverless-datacube-demo/blob/26765146567452d66d04a0246e9a077b75b9d7ee/src/lib.py#L73).

![image](https://github.com/zarr-developers/zarr-specs/assets/18722560/1bfba4af-2a70-4dea-884f-e5f9a27aa0dc)

```python
from odc.geo.geobox import GeoBox
from odc.geo.xr import xr_zeros
import pandas as pd

def define_schema(varname, geobox, time, band):
    schema = (
        # Chunks -1 to have a Dask-backed array,
        # but without creating a massive computational graph
        xr_zeros(geobox, chunks=-1)
        .expand_dims(
            time=time,
            band=band,
        )
        .to_dataset(name=varname)
    )
    schema['band'] = schema['band'].astype(str)

    encoding = {varname: {"chunks": (1, 1, 256, 256)}}
    schema.to_zarr(
        'schema.zarr',
        encoding=encoding,
        compute=False,  # Do not initialize any data: no chunks written
        zarr_version=3,
        mode='w',
    )

geobox = GeoBox.from_bbox(
    (-2_000_000, -5_000_000, 2_250_000, -1_000_000),
    "epsg:3577",
    resolution=1000,
)

define_schema(
    "data",
    geobox,
    time=pd.date_range("2020-01-01", periods=12, freq='MS'),
    band=["B1", "B2", "B3"],
)
```

This pattern is powerful, as it enables us to compute and ingest only what's needed, in a query-or-compute fashion. Without going too far from the scope of this issue, it enables us to cascade the computation/ingestion of data in a chunk-by-chunk manner, as illustrated in the following diagram.
## Chunk-level MetaArray
When working with sparsely populated arrays, the natural question arises: which chunks are populated?
To this end, we employ an object we call the chunk-level meta-array (MetaArray for short). This is a second array in which each pixel corresponds to a chunk in the original array. The value of the pixel is:

- `1` if the corresponding chunk is populated
- `0` if the corresponding chunk is empty

### Toy code example
Note: This is only a very rough sketch of the resulting object to illustrate the point.

```python
import numpy as np
import xarray as xr

def create_metaarray(schema):
    """Create a meta-array from a schema.

    A very rough sketch, not the actual implementation.
    """
    _, _, cy, cx = schema.data.encoding['chunks']

    # Create spatial pixels with coordinates equal to the centre of the chunk
    xx = schema['x'].values
    xdiff = xx[cx] - xx[0]
    x_meta = xx[::cx] + xdiff / 2

    yy = schema['y'].values
    ydiff = yy[cy] - yy[0]
    y_meta = yy[::cy] + ydiff / 2

    # Initialize an empty (all-zeros) meta-array over the chunk grid
    meta = xr.DataArray(
        data=np.zeros(
            (schema.sizes['time'], schema.sizes['band'], len(y_meta), len(x_meta)),
            dtype=np.uint8,
        ),
        dims=("time", "band", "y", "x"),
        coords=dict(time=schema.time, band=schema.band, x=x_meta, y=y_meta),
    )
    meta.to_zarr('schema-meta.zarr', zarr_version=3, mode='w')

schema = xr.open_zarr('schema.zarr', zarr_version=3)
create_metaarray(schema)

# When writing to the Zarr array, also populate the meta-array.
```

## Relationship to Chunk manifest & VirtualiZarr
Increasingly, the work in the Zarr world resembles the data structure we have built.
The similarities in these proposals and implementations make me think there is a path where the MetaArrays could become an open-source part of the Zarr ecosystem, potentially even enabling further use-cases such as chunk-level aggregate statistics outlined in ZEP 5.
The question is what the exact path forward would be.
## Potential path forward
Here is my best guess at what the path forward could be.
There are essentially two layers to MetaArrays:

1. **Information on the chunk level.** Currently, MetaArrays are stored as a separate Zarr array and updated using zarr. In the future, this could become part of Zarr itself: the chunk manifest would only need to represent whether a chunk is initialized in order to contain the full information needed for construction of the MetaArray.
2. **API to query this information.**
To be easily queryable, the chunk-level information must be exposed in the coordinate space of the original dataset, with an API for querying it.
Xarray is our choice for this API. For example, asking the question "do we have data for this geometry" simply becomes a rioxarray clip operation and xarray reduction.
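As a toy illustration of such a label-space query, with plain numpy masking standing in for the rioxarray clip and xarray reduction (all data and names hypothetical):

```python
import numpy as np

# MetaArray over a (time, chunk-x) grid, with labelled coordinates.
times = np.array([2021, 2022])
xs = np.array([0.5, 1.5])
meta = np.array([[1, 0],
                 [1, 1]], dtype=np.uint8)

def fully_populated(year, x_min, x_max):
    """Answer 'do we have all data for this year and x-range?'."""
    t = times == year
    x = (xs >= x_min) & (xs <= x_max)
    return bool(meta[np.ix_(t, x)].all())  # reduce the clipped region

fully_populated(2022, 0, 2)  # → True  (row [1, 1])
fully_populated(2021, 0, 2)  # → False (row [1, 0])
```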
A similar conclusion was reached by VirtualiZarr, where using xarray e.g. exposes a familiar API for combining multiple datasets.
MetaArrays could become another package building on top of Zarr, similar to VirtualiZarr. However, if the chunk manifest already contains information for the construction of MetaArrays and VirtualiZarr uses this information to construct Xarray(-like) datasets, a greater integration could be possible. For example in the form of common functionality to read the chunk manifest into xarray format.
## Questions
I would appreciate suggestions from the Zarr community on where to best focus the efforts.
Here is a set of concrete open questions and discussion points:
A secondary set of questions concerns the usefulness of this: would the community find the MetaArrays useful, be it for storing chunk-initialization information or aggregate statistics? Are people perhaps already using variants of this functionality? Creating this functionality was necessary for us – and thus likely for others, too!
Thank you for any suggestions and answers!