alimanfoo opened this issue 4 years ago
Just to say that the v3 core protocol currently includes the concepts of data type, array shape, chunk shape, chunk memory layout, array metadata. I imagine that protocol extensions can extend, modify or add to these core protocol features.
Exactly how this works has yet to be fully defined, but there is also some general discussion of protocol extensions in #49.
I'll add more later, but I should start with an updated link:
Serialization has been implemented in Awkward 0.x but not yet in Awkward 1.x. This is how it works in the old version:
Given an array with a complex data type,
>>> import awkward as ak0
>>> array = ak0.fromiter([1.1, 2.2, [3, 4, 5], [6, 7], None, {"x": 3.14, "y": 2.72}])
>>> array
<IndexedMaskedArray [1.1 2.2 [3 4 5] [6 7] None <Row 0>] at 0x7fc6f24aa310>
>>> array.tolist()
[1.1, 2.2, [3, 4, 5], [6, 7], None, {'x': 3.14, 'y': 2.72}]
It can be broken down into columns for each node in the data type, and each of these columns is a contiguous, one-dimensional array with a different length.
>>> storage = {}
>>> ak0.persist.serialize(array, storage, name="xyz")
>>> storage.keys()
dict_keys(['xyz-1', 'xyz-4', 'xyz-6', 'xyz-9', 'xyz-10', 'xyz-12', 'xyz-14', 'xyz'])
All of these objects can be stored in any backend that accepts named, binary blobs. One of these is a navigation object (JSON, most likely) and the rest are numerical arrays.
>>> for n, x in storage.items():
... print(repr(n), repr(x))
...
'xyz-1' array([ 0, 1, 2, 3, -1, 4])
'xyz-4' array([0, 0, 1, 1, 2], dtype=uint8)
'xyz-6' array([1.1, 2.2])
'xyz-9' array([3, 2])
'xyz-10' array([3, 4, 5, 6, 7])
'xyz-12' array([3.14])
'xyz-14' array([2.72])
'xyz' b'{"awkward": "0.12.20", "schema": {"call": ["awkward", "IndexedMaskedArray"], "args": [{"call": ["awkward", "numpy", "frombuffer"], "args": [{"read": "1"}, {"dtype": "int64"}, {"json": 6, "id": 2}], "id": 1}, {"call": ["awkward", "UnionArray", "fromtags"], "args": [{"call": ["awkward", "numpy", "frombuffer"], "args": [{"read": "4"}, {"dtype": "uint8"}, {"json": 5, "id": 5}], "id": 4}, {"list": [{"call": ["awkward", "numpy", "frombuffer"], "args": [{"read": "6"}, {"dtype": "float64"}, {"json": 2, "id": 7}], "id": 6}, {"call": ["awkward", "JaggedArray", "fromcounts"], "args": [{"call": ["awkward", "numpy", "frombuffer"], "args": [{"read": "9"}, {"dtype": "int64"}, {"ref": 7}], "id": 9}, {"call": ["awkward", "numpy", "frombuffer"], "args": [{"read": "10"}, {"dtype": "int64"}, {"ref": 5}], "id": 10}], "id": 8}, {"call": ["awkward", "Table", "frompairs"], "args": [{"pairs": [["x", {"call": ["awkward", "numpy", "frombuffer"], "args": [{"read": "12"}, {"dtype": "float64"}, {"json": 1, "id": 13}], "id": 12}], ["y", {"call": ["awkward", "numpy", "frombuffer"], "args": [{"read": "14"}, {"dtype": "float64"}, {"ref": 13}], "id": 14}]]}, {"json": 0}], "id": 11}]}], "id": 3}, {"json": -1}], "id": 0}, "prefix": "xyz-"}'
So, for example, these arrays can be stored in HDF5, but only if wrapped with our filter:
>>> import h5py, numpy as np
>>> h5file = h5py.File("some.hdf5", "a")
>>> ak_h5file = ak0.hdf5(h5file)
>>> ak_h5file
<awkward.hdf5 '/' (0 members)>
>>> ak_h5file["xyz"] = array
>>> ak_h5file["xyz"]
<IndexedMaskedArray [1.1 2.2 [3 4 5] [6 7] None <Row 0>] at 0x7fc6d6c12690>
Here, we interpret the HDF5 group as a single array. If you look at it without the filter, you see all the columns:
>>> h5file["xyz"]
<HDF5 group "/xyz" (8 members)>
>>> h5file["xyz"].keys()
<KeysViewHDF5 ['1', '10', '12', '14', '4', '6', '9', 'schema.json']>
>>> h5file["xyz"]["4"]
<HDF5 dataset "4": shape (5,), type "|u1">
>>> np.asarray(h5file["xyz"]["4"])
array([0, 0, 1, 1, 2], dtype=uint8)
I'm planning to do a similar thing in Awkward 1.x, but in C++ so that it can be used from both C++ and Python. The biggest wart in all of this is that somebody could receive an HDF5 file without being told they need the Awkward wrapper to interpret it, and then it's not easily interpretable.
Since you're developing the v3 extensions interface, I'd like to do this first as a Zarr extension and somehow bake into it the fact that you need the Awkward library to interpret it (i.e. no dependency, but some sort of warning or hint of what the user is supposed to do).
The datetime data types extension is similar in one aspect: the underlying data are in primitive types (64-bit integers) while the interpretation is something else (timestamps). However, this extension can be implemented without external libraries and one low-level array is mapped to exactly one high-level array.
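For example (a NumPy one-liner of my own, not part of the datetime extension itself), that "primitive storage, reinterpreted on read" idea looks like this:
>>> import numpy as np
>>> raw = np.array([0, 1609459200], dtype="int64")   # stored as plain 64-bit integers
>>> raw.view("datetime64[s]")                        # interpreted as timestamps
array(['1970-01-01T00:00:00', '2021-01-01T00:00:00'], dtype='datetime64[s]')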
For an Awkward extension, a group of different-length low-level arrays are mapped to one high-level array. Zarr has the same concept of a group as HDF5, so it's technically possible to do that mapping, but it would have to be expressed as part of the specification that extensions can map a group to an array.
Could the scope include that?
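For concreteness, here is a minimal sketch (mine, not from any spec) of how the Awkward 0.x storage dict above could land in a Zarr v2 group, with the JSON navigation object kept as a group attribute; the attribute name "awkward0_schema" is made up and the storage dict is abbreviated.
>>> import json
>>> import numpy as np
>>> import zarr
>>> storage = {
...     "xyz-6": np.array([1.1, 2.2]),                       # a couple of the columns above
...     "xyz-9": np.array([3, 2]),
...     "xyz": b'{"awkward": "0.12.20", "schema": "...truncated..."}',
... }
>>> root = zarr.group()                                      # in-memory; could be any store
>>> grp = root.create_group("xyz")
>>> for name, blob in storage.items():
...     if isinstance(blob, bytes):
...         grp.attrs["awkward0_schema"] = json.loads(blob)  # hypothetical attribute name
...     else:
...         grp.create_dataset(name, data=blob)
...
>>> sorted(grp.array_keys())
['xyz-6', 'xyz-9']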
Serialization has been implemented in Awkward 0.x but not yet in Awkward 1.x.
PR scikit-hep/awkward-1.0#348 added a similar type of serialization to Awkward 1. There are two new functions, ak.to_arrayset and ak.from_arrayset, which turn an Awkward Array into a schema and a collection of flat arrays (an "arrayset"):
>>> original = ak.Array([[1, 2, 3], [], [4, 5]])
>>> form, container, num_partitions = ak.to_arrayset(original)
>>> form
{
"class": "ListOffsetArray64",
"offsets": "i64",
"content": {
"class": "NumpyArray",
"itemsize": 8,
"format": "l",
"primitive": "int64",
"form_key": "node1"
},
"form_key": "node0"
}
>>> container
{'node0-offsets': array([0, 3, 3, 5], dtype=int64),
'node1': array([1, 2, 3, 4, 5])}
>>> print(num_partitions)
None
The flat arrays ('node0-offsets': array([0, 3, 3, 5]) and 'node1': array([1, 2, 3, 4, 5]) in the above) can be stored in any system that manages named arrays. The extra layer is to group those arrays and pass the metadata (form in the above) to the right functions to reconstruct them (ak.from_arrayset).
If the v3 protocol can make one-to-many associations like this (one user-level object to many physical objects in the "arrayset"), then we should be able to do this.
We were just talking about this on the Zarr meeting today (2020-12-16): I've made a minor change to the Awkward <--> group of named buffers transformation. (Motivation and details here: https://github.com/scikit-hep/awkward-1.0/pull/592#issuecomment-743430896.) To follow a deprecation procedure properly, the name of the function had to change.
To keep this thread updated, I'll translate the above example into the new interface (awkward>=1.0.1) below.
The following non-rectilinear array,
>>> import awkward as ak
>>> original = ak.Array([[1, 2, 3], [], [4, 5]])
>>> original
<Array [[1, 2, 3], [], [4, 5]] type='3 * var * int64'>
can be decomposed into metadata and flat buffers like this:
>>> mutablemapping = {}
>>> form, length, container = ak.to_buffers(original, container=mutablemapping)
>>> # The values from the original array are now flat buffers in this container, which could be a Zarr sink.
>>> container is mutablemapping
True
>>> container # dict of NumPy arrays; we only assume they're <memory>
{
'part0-node0-offsets': array([0, 3, 3, 5], dtype=int64),
'part0-node1-data': array([1, 2, 3, 4, 5])
}
>>> # The metadata describing how to put those buffers together again is expressed in JSON.
>>> # (It could be included in the container as a byte array.)
>>> form
{
"class": "ListOffsetArray64",
"offsets": "i64",
"content": {
"class": "NumpyArray",
"itemsize": 8,
"format": "l",
"primitive": "int64",
"form_key": "node1"
},
"form_key": "node0"
}
>>> # We also need to save the length somewhere.
>>> length
3
This goes in the other direction like so:
>>> # Needs the form, length, and container.
>>> ak.from_buffers(form, length, container)
<Array [[1, 2, 3], [], [4, 5]] type='3 * var * int64'>
This page has a full example of saving arbitrary structures to HDF5 using h5py. The problem with that example is that if someone is given an HDF5 file with exploded data in it, they don't know that they need to run ak.from_buffers on it. It will appear to be a bag of one-dimensional arrays, not the original array. What I'd want to do with a Zarr v3 extension is to add metadata that says, "I need to import awkward as ak and run ak.from_buffers to extract this group as an array!"
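As a sketch of that round trip plus the kind of marker I'm describing (my code, assuming awkward>=1.0.1, where Form.tojson() exists and ak.from_buffers accepts a JSON form string; the "interpret_with" attribute name is invented):
>>> import awkward as ak
>>> import h5py
>>> import numpy as np
>>> array = ak.Array([[1, 2, 3], [], [4, 5]])
>>> form, length, container = ak.to_buffers(array)
>>> with h5py.File("awkward-example.hdf5", "w") as f:
...     group = f.create_group("xyz")
...     for name, buf in container.items():
...         group.create_dataset(name, data=np.asarray(buf))
...     group.attrs["form"] = form.tojson()        # assumed Form API
...     group.attrs["length"] = length
...     group.attrs["interpret_with"] = "awkward"  # the hint this issue is asking for
...
>>> with h5py.File("awkward-example.hdf5", "r") as f:
...     group = f["xyz"]
...     reconstituted = ak.from_buffers(
...         group.attrs["form"],
...         int(group.attrs["length"]),
...         {name: np.asarray(group[name]) for name in group},
...     )
...
>>> reconstituted
<Array [[1, 2, 3], [], [4, 5]] type='3 * var * int64'>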
Also in the time since my last comment here, @martindurant has implemented this in Zarr v2: https://github.com/martindurant/awkward_extras/blob/main/awkward_zarr/core.py (though it's based on ak.from_arrayset, not ak.from_buffers, because I only updated that interface earlier this week).
Would the main spec be open to allowing arrays where the chunk shapes are not fixed, but the chunk lengths in each dimension are defined by an array (or are constant)?
A concrete example is the rechunking done in dask overlap calculations when the last chunk is smaller than the overlap. A small example is a size 22 1-d array with chunks of 10 -- chunks=(10,10,2). If an overlap of size 3 is needed, dask automatically rechunks to (10,9,3).
Would it be too much overhead to allow each dimension's chunks to be specified as an array? This seems to allow additional flexibility without much extra complexity/space, so could be part of the core spec (instead of as an extension).
I am also interested in defining chunks as an array along each dimension like dask.array does (I have an issue somewhere) and this would be pretty easy to implement given the existing code in dask that could be copied. It could be done for v2, and leave the existing constant-chunk case unaltered. That would be really useful for cases such as a time dimension with chunks of 365, 365, 366, 365 days - which you see all the time.
Maybe that's enough for awkward, but I am not sure. Each batch of top-level rows in an awkward array has a variable number of leaf elements within each of the rows. So we can know the total number of leaf values in a batch of rows and call this a "chunk" to be stored together. Maybe that's exactly what we need.
In our discussions about chunking Awkward Arrays, we've settled on considering them as logically 1-dimensional, so chunks/partitions would be defined by lengths, not shapes. It wouldn't be a problem to make the length of each chunk/partition constant or given by some list (e.g. 365, 365, 366, 365). I also don't see a problem with the lengths of chunks being 10, 9, 3 instead of 10, 10, 2.
The lengths of internal buffers that constitute those logical arrays are not predictable—or anyway, their lengths depend on or have to agree with values in other arrays. In the above example,
>>> container # dict of NumPy arrays; we only assume they're <memory>
{
'part0-node0-offsets': array([0, 3, 3, 5], dtype=int64),
'part0-node1-data': array([1, 2, 3, 4, 5])
}
the length of partition 0 being 3 means that the buffer at the root of the tree, 'part0-node0-offsets', must have length 4 (fencepost principle for ListOffsetArray), but there's no constraint on the length of 'part0-node1-data' other than that its length must be (at least) equal to part0-node0-offsets[-1], which is 5. With a deeper, more complex tree, there would be a lot of lengths depending on values in their parent nodes' buffers. For a given array, all of the lengths of all of the internal buffers could be specified somewhere, but they would change in ways that depend on values if you ever wanted to change the partitioning.
As an implementation of partition-changing, you could turn a set of buffers for the two partitions that need to be resized into two ak.Arrays, slice them both, and combine the part that needs to be combined with ak.concatenate (in other words, exactly as you would do with NumPy arrays). That would take care of all the internal lengths (ak.from_buffers checks lengths as it descends) and offsets (which have to count from the beginning of a partition, so all of their values increase or decrease by an offset). It necessarily involves accessing all of the data, but anyway, all non-leaf data strictly have to be accessed in order to compute what the new lengths should be. The only possible optimization would be to avoid accessing the buffers for the leaf nodes.
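A sketch of that recipe (a hypothetical helper of mine, assuming both partitions were written with the default to_buffers/from_buffers keys):
>>> import awkward as ak
>>> def repartition_pair(form, length_a, buffers_a, length_b, buffers_b, new_length_a):
...     # Reconstitute both partitions, concatenate, slice at the new boundary,
...     # and decompose again; ak.to_buffers recomputes every internal length
...     # and offset for the new partitions.
...     a = ak.from_buffers(form, length_a, buffers_a)
...     b = ak.from_buffers(form, length_b, buffers_b)
...     combined = ak.concatenate([a, b])
...     new_a = combined[:new_length_a]
...     new_b = combined[new_length_a:]
...     return ak.to_buffers(new_a), ak.to_buffers(new_b)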
My assumption at the current point in the sharding discussion leads me to believe that there should be an extension mechanism (a la codecs) at the array level, e.g. current simple chunks, sharded chunks, etc. Somewhat naively I wonder if this might not be a strategy for irregular chunking but then there's the natural question of how would one then combine sharding and irregular chunking. Thoughts welcome.
I am also interested in defining chunks as an array along each dimension like dask.array does (I have an issue somewhere)...
@martindurant, maybe issue ( https://github.com/zarr-developers/zarr-specs/issues/40 )?
Yes, totally. Given dask.array has all the logic for translating getitems into chunk selections, it ought to be simple to implement (as an alternative to regular chunking, which would presumably be slightly faster).
I mentioned this in the meeting but wanted to record this comment here: instead of just defining awkward arrays on top of regular zarr arrays, which imposes a lot of limitations, we may instead want to consider modifying the chunk encoding to allow variable length data, in order to define an awkward array mechanism more in the spirit of zarr.
Zarr v2 has ragged arrays, which (I've heard) are implemented in an interleaved way: size, item, item, item, size, item, size, ... Replacing this with at least two buffers—a cumulative sum of sizes ("offsets") in one and all the items in the other—would be an improvement because this would be random-access, can be appended to, and could in principle be edited in-place.
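As a tiny NumPy sketch of the columnar layout being suggested (made-up values):
>>> import numpy as np
>>> content = np.array([1, 2, 3, 4, 5, 6, 7])   # all items, concatenated
>>> offsets = np.array([0, 3, 3, 5, 7])         # cumulative sum of sizes; 4 rows
>>> content[offsets[1]:offsets[2]]              # row 1 is empty: random access works
array([], dtype=int64)
>>> content[offsets[2]:offsets[3]]              # row 2
array([4, 5])
>>> # appending a row only touches the ends of both buffers
>>> content = np.append(content, [8, 9])
>>> offsets = np.append(offsets, offsets[-1] + 2)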
I just want to clarify that the proposal of accessing Awkward Arrays targets a much larger set of data types than a single ragged array. Here is an example:
>>> import awkward as ak
>>> array = ak.Array([
... [{"x": 1.1, "y": [1]}, {"x": 2.2, "y": [1, 2]}, {"x": 3.3, "y": [1, 2, 3]}],
... [],
... None,
... [{"x": None, "y": [1, 2, 3, 4]}, {"x": 5.5, "y": [1, 2, 3, 4, 5]}]
... ])
It includes variable-length lists at multiple levels of hierarchy, record structures, missing numbers, and missing lists. It can be decomposed into "JSON, integer, and named 1D arrays" as I described in the meeting.
>>> # decompose
>>> form, length, buffers = ak.to_buffers(array)
>>> # reconstitute
>>> ak.from_buffers(form, length, buffers)
<Array [[{x: 1.1, y: [1]}, ... 2, 3, 4, 5]}]] type='4 * option[var * {"x": ?floa...'>
The 1D buffers can be stored in a group:
>>> for name, data in buffers.items():
... print(f"{name}:\t{data!r}")
...
part0-node0-index: array([ 0, 1, -1, 2], dtype=int64)
part0-node1-offsets: array([0, 3, 3, 5], dtype=int64)
part0-node3-index: array([ 0, 1, 2, -1, 3], dtype=int64)
part0-node4-data: array([1.1, 2.2, 3.3, 5.5])
part0-node5-offsets: array([ 0, 1, 3, 6, 10, 15], dtype=int64)
part0-node6-data: array([1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5])
The form (JSON) and length (integer) can be stored in metadata:
>>> length
4
>>> print(form)
{
"class": "IndexedOptionArray64",
"index": "i64",
"content": {
"class": "ListOffsetArray64",
"offsets": "i64",
"content": {
"class": "RecordArray",
"contents": {
"x": {
"class": "IndexedOptionArray64",
"index": "i64",
"content": {
"class": "NumpyArray",
"itemsize": 8,
"format": "d",
"primitive": "float64",
"form_key": "node4"
},
"form_key": "node3"
},
"y": {
"class": "ListOffsetArray64",
"offsets": "i64",
"content": {
"class": "NumpyArray",
"itemsize": 8,
"format": "l",
"primitive": "int64",
"form_key": "node6"
},
"form_key": "node5"
}
},
"form_key": "node2"
},
"form_key": "node1"
},
"form_key": "node0"
}
In the above proposal, I'm asking for a way to label a Zarr group as being "awkward", so that users without a library registered to this name are either prevented from poking around with "part0-node4-data" or are recommended not to do so.
Following up on Dennis's comments about an API for slicing: it's an interesting idea that I didn't consider, but it would be possible to extend ak.from_buffers to take a slice. That could be passed through a Zarr API for reading an array in library X within a given slice. (And if it crosses chunk boundaries, library X can also know how to concatenate data coming from different chunks. Awkward has ak.concatenate.)
Editing these arrays in-place would not be a goal: it's not even possible in Awkward Array itself, because we do structural sharing and therefore want to avoid any mutability (other than append-only).
Following up on @joshmoore's comment in the meeting about not all languages having a suitable library: one could say, "that's okay, the library-labels are an open-ended set, if a library is unavailable (being in the wrong language is one way to not be able to access a library), then it just can't be interpreted in the recommended way." Or, to get the full set of data types in a language-independent way, a different proposal would be to include Apache Arrow within Zarr. That wouldn't be opaque to Zarr, as the above would be, though you would have to take the Arrow specification as a part of Zarr.
You can break down Arrow arrays in the same way as Awkward arrays:
>>> arrow = ak.to_arrow(array)
>>> # Arrow type plays the role of Awkward form
>>> arrow.type
LargeListType(large_list<item: struct<x: double, y: large_list<item: int64 not null> not null> not null>)
>>> # Arrow buffers are a list whose order corresponds to a walk over the type
>>> for index, data in enumerate(arrow.buffers()):
... print(f"{index}\t{data if data is None else np.frombuffer(data, np.uint8).tobytes()}")
...
0 b'\x0b'
1 b'\x00\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00'
2 None
3 b'\xbf'
4 b'\x9a\x99\x99\x99\x99\x99\xf1?\x9a\x99\x99\x99\x99\x99\x01@ffffff\n@\x9a\x99\x99\x99\x99\x99\xf1?\x9a\x99\x99\x99\x99\x99\x01@ffffff\n@\x9a\x99\x99\x99\x99\x99\xf1?\x00\x00\x00\x00\x00\x00\x16@'
5 None
6 b'\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x00\x00\x07\x00\x00\x00\x00\x00\x00\x00\t\x00\x00\x00\x00\x00\x00\x00\x0c\x00\x00\x00\x00\x00\x00\x00\x10\x00\x00\x00\x00\x00\x00\x00\x15\x00\x00\x00\x00\x00\x00\x00'
7 None
8 b'\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00'
The value of the Arrow format is that there are interpreters in C, C++, C#, Go, Java, JavaScript, Julia, MATLAB, Python, R, Ruby, and Rust (from https://arrow.apache.org/). It's well-specified, has a large community, and if Zarr could deliver arrays of complex data in Arrow format, I'd be happy. (My users would call ak.from_arrow, which is zero-copy.)
So the two proposals,
1. labeling a Zarr group as belonging to an external library (such as Awkward) that is needed to interpret it, and
2. taking on Apache Arrow as part of the Zarr specification,
have different pros and cons. An advantage of the first proposal is that it should be easy, though it could be disappointing to users if they have a Zarr library in their language and some Zarr data, but the data isn't (or says it isn't) interpretable. An advantage of the second proposal is that it would be usable data in many languages, but it would be a larger project to take on, and maybe you wouldn't want Arrow to be a required dependency.
You raise some very interesting points regarding Apache Arrow:
Apache Arrow defines a complex data model with a lot of nice features, including nested structures and variable length lists, and it would be nice to have those features in zarr and integrate with Apache Arrow. It is also oriented around columnar storage, which is a good choice in terms of compression efficiency and partial I/O efficiency. However, Apache Arrow is oriented around sequential streaming I/O, rather than random access.
Zarr v2 is based on the numpy data model, which allows nested fixed-size structures and arrays to be defined, but it is not very flexible and is oriented around interleaved storage. While it makes some sense for numpy's purposes, I don't think it is a good fit for zarr. Zarr v2 does not support partial I/O or indexing within these nested fields at all --- from the perspective of zarr the entire dtype is an atomic unit.
HDF5 has its own elaborate data model, which I believe has roughly similar capabilities to Apache Arrow's data model. However, non-variable-length data is interleaved, and variable length data is encoded separately within the file and referenced by pointer. HDF5 does not support partial I/O or indexing within a data type --- for the purpose of reading and writing, the entire data type is an atomic unit.
I think it could be great to define the zarr data model in a way that is compatible with Apache Arrow and cover the use cases of zarr v2/HDF5 structured/variable length data types. However, the core feature of zarr is partial I/O (as far as I am concerned its main reason to exist at all) --- you can perform arbitrary indexing operations and only read and write the relevant portion of the data (modulo the chunk shape). I think it is critical that in integrating the Apache Arrow data model that we do it in a way that fully supports partial I/O and chunking, as otherwise we will have lost the key advantage of zarr over existing formats like hdf5.
While the zarr v2 tutorial mentions ragged arrays, I wouldn't actually consider that to really be zarr supporting ragged arrays --- it is just a regular array of variable-length values, which are encoded using the custom VLenArray codec. From the perspective of zarr you have a regular array and can only index the "non-ragged" dimensions; there is no chunking of the variable length arrays and you have to read them in their entirety. With VLenArray it is true that an "interleaved" encoding is used within each chunk, where you have length, sub_array_data, length, sub_array_data, ..., and I agree it would be better to use a columnar encoding instead, which would be a simple change.
But more generally, what I'd like to see as far as awkward/ragged array support in zarr is full support for chunking and indexing and partial I/O on variable length dimensions, to be in keeping with what I see as the principle of zarr. I think that can be done, but it would require careful consideration of the chunk format.
If Arrow and zarr could have alignment on data types, that would be a dream come true for me. I would love it if data types we use and support are just arrow data types, and that anyone that can work with arrow can work with our data. I think this could be a huge benefit for anndata as well as OME-Zarr.
That said, I think there would be quite a lot of work to do here (definitely outside the scope of a GSOC student, though perhaps this could be a first step). My understanding so far is that Arrow and zarr are focussing on fairly distinct kinds of data. Maybe this is a huge challenge to overcome, maybe it’s complementary.
AFAIK, development for arrow is targeted very specifically on tabular data, while zarr focuses on multidimensional arrays. There is a Tensor type in arrow, but it seems largely unsupported (e.g. jorgecarleitao/arrow2#8). Maybe this is a great opportunity for experts from zarr to come in and push development in this area of the library. Also maybe a ton of work.
On the technical side, I believe Arrow currently supports very flexible chunking via partitioning/ChunkedArrays. I don't know if it's able to take advantage of regularly sized chunks. I would strongly suspect these features haven't made it to Tensors.
For how to access zarr as arrow, it seems like there could be a ZarrDataset class (which I think would be implemented in C++ or Rust) which would provide an interface to zarr from every language that has an arrow interface. Plus then I could run DataFusion queries on the tables we have in a zarr AnnData 🤩.
@jpivarski, btw, your first proposal is essentially how we were thinking our first round of awkward array support in AnnData would go (theislab/anndata#647, here). But something that can definitely be read from other languages is preferable.
In proposal (1), providing a mechanism for labeling a group as belonging to library X, library X can be "awkward", "pyarrow", or something else. From the point of view of Zarr, it's less effort than proposal (2) because you're leaving its interpretation as a responsibility of library X. You could require library X to define functions X._from_zarr_group_ and X._to_zarr_group_ to do all of the work. Libraries can be unavailable either because they're not installed or they don't exist in the language of a particular Zarr client. Then it's either a warning or an error to access the datasets in the group naively.
Proposal (2), specifically taking on Arrow as a part of the Zarr specification, is a lot more work. You'd have to define a mapping from a group's datasets to Arrow buffers and back, interpret Arrow types and store them as metadata --- everything that I've done to implement ak.to_arrow and ak.from_arrow in Awkward Array. If you want to benefit from the fact that Arrow is implemented in all the languages, you'd have to do this for all of the Zarr clients in each language.
@martindurant and I were talking about this today, and there might not be enough people available to work on (2), and requiring Zarr clients to depend on Arrow libraries—what you'd need for the Arrow data to be truly transparent in the Zarr client—may be too big of a dependency. I may have been rash to have suggested it yesterday. At least, we should be aware of what it entails.
He also pointed out that both of these would require non-uniform chunking. If we have an array of variable-length lists (Awkward or Arrow) decomposed into an offsets buffer and a content buffer in a Zarr group, the offsets buffer may be uniformly chunked, but the placement of the divisions between chunks in the content depends on the values of offsets, so those chunks need to be irregular. Even if the Arrow array is not decomposed into its buffers as I had been assuming, if instead it's a single buffer per chunk in its IPC or Feather form, those buffers would need to have different lengths for each chunk. So non-uniform chunking is a prerequisite for this (and sounds to me like a good thing to have, in general).
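A small sketch of that dependence (made-up offsets; assumes the row count divides evenly into the row chunking): the content chunk boundaries come straight out of the offsets values, so they are generally irregular even when the row chunking is regular.
>>> import numpy as np
>>> offsets = np.array([0, 3, 3, 5, 9, 10, 14])   # 6 rows of a ragged array
>>> rows_per_chunk = 2                            # regular chunking of the rows
>>> content_boundaries = offsets[::rows_per_chunk]
>>> content_boundaries
array([ 0,  3,  9, 14])
>>> np.diff(content_boundaries)                   # irregular content chunk lengths
array([3, 6, 5])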
AFAIK, development for arrow is targeted very specifically on tabular data, while zarr focuses on multidimensional arrays.
That's part of it: pyarrow's Tensor is just an extension type, not part of the general data model, whereas multidimensional arrays are first-class to Zarr. But also, the record/struct types, variable-length lists of any type, missing data, heterogeneous unions, etc., are beyond the model of multidimensional arrays, and that's why I'd like to add this as an extension type for Zarr. Proposal (1) is to make this not first-class in Zarr, but an extension type with the right external libraries. Proposal (2) is to make this first-class in Zarr by swallowing Arrow. I'll let you decide if that's too much.
Sorry about the late response here! I'll try to make my point succinctly so I actually finish leaving the comment 😅:
I agree that having complete interoperability between zarr and arrow is quite a lot of work. I also agree that zarr depending on arrow would be a big ask.
But I think there is some middle ground here. I think zarr could strive to use buffers which are binary compatible with arrow, as well as similar types, and gain a lot from it. I wouldn’t target the entire Arrow library as what needs to be implemented (not sure those union types would be so useful here).
You wouldn’t have to rely on arrow, but if you did want to point arrow at the relevant buffers and give it some metadata that could be a pretty fast way to get an implementation running in a new language.
Coming back from the arrow tangent, I would just like to make sure we’re on the same page on this discussion.
The primary goal for a GSOC student or contributor would be to implement variable length chunk support (as mentioned previously: https://github.com/zarr-developers/zarr-specs/issues/62#issuecomment-993652321), right?
If so, I have a couple implementation thoughts/ questions:
Where are the chunk offsets stored? Would this be storing a long list of offsets in a json file and would that be bad?
In the JSON metadata, where currently you have:
chunks: array[int]
you can now have
chunks: array[int | array[int]]
or perhaps the simpler
chunks: array[int] | array[array[int]]
So yes, regular or irregular chunks on any index.
"it's okay if a chunk size is a list of integers"?
Exactly. Still, it's a departure. Implementation will need some work. I hope for zarr-python, we can copy most of what we need from dask.array.
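As a reader-side sketch (mine, not zarr-python or dask code): with per-dimension chunk lengths in the metadata, mapping a requested range onto chunks is just a cumulative sum plus a binary search.
>>> import numpy as np
>>> chunks = [10, 9, 3]                      # e.g. the dask overlap example above
>>> boundaries = np.cumsum([0] + chunks)     # array([ 0, 10, 19, 22])
>>> def chunks_for_slice(start, stop):
...     # indices of the chunks overlapping [start, stop) along this dimension
...     first = np.searchsorted(boundaries, start, side="right") - 1
...     last = np.searchsorted(boundaries, stop, side="left")
...     return list(range(first, last))
...
>>> chunks_for_slice(8, 12)                  # crosses the first chunk boundary
[0, 1]
>>> chunks_for_slice(19, 22)                 # exactly the last (size-3) chunk
[2]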
@martindurant What you are describing I would characterize as support for an "irregular chunk grid", in contrast to the "regular chunk grid" that zarr currently supports --- I agree it is potentially a useful feature, but I think it is unrelated to what is being discussed here --- awkward arrays, i.e. arrays where the extent along one dimension is dependent on the indices within other dimensions. For example, the simplest case of an awkward array would be a 2-d array where the first dimension "x" has a fixed size of 100 and the second dimension "y" has a variable length dependent on the position along "x". In Python you could represent this as a list of lists, where indices into the outer list correspond to "x" and indices into the inner list correspond to "y". You could have regular or irregular chunking of an awkward array, but I think that is orthogonal to the awkward array concept.
The way awkward arrays are stored is as a set of 1-d ordinary arrays. So for the 2-d case, you would have one integer offsets array (marking where each row starts and ends) and one flat data array (all of the rows' values concatenated).
Let's say that we chunk on 50 rows, the first 50 rows contain 100 values, and the second 50 rows 500 values. In order to be able to read "the first chunk" and "the second chunk" independently, the data array must have chunks of [100, 500].
When you have more deeply nested structure of the awkward array, you would in general need variable chunks on all of the data arrays except the very top level.
(I'm not sure how clear that was; @jpivarski could do better.)
@martindurant Thanks for the clarification --- that implementation strategy didn't occur to me.
I see that the strategy you propose of irregular chunking solves some of the problems with a naive mapping of the awkward python package to zarr, but it also introduces some other problems:
- the metadata file has a size of O(number of chunks)
- changing the number of elements in a chunk requires modifying both the chunk and the metadata file. Since most underlying stores won't allow that to be done atomically, if the process is interrupted the array becomes corrupted.
- there will be contention on updating the metadata file if there are concurrent writes from multiple machines
- if you have more than one variable-length dimension then the number of chunks also changes, which means that a lot of chunks will have to be renamed as well.
In general while the 1-d array representation used by the awkward package makes sense for a read-only in-memory representation I don't think it is the best choice of representation for on-disk read-write storage, even with the mitigation of irregular grid chunking.
the metadata file has a size of O(number of chunks)
This doesn't appear to be a problem. The size of this data will always be much much smaller than any chunk.
modifying both the chunk and the metadata file
Were this a zarr extension, I think it would state that awkward datasets are immutable once written. The metadata for the chunks should then be written exactly once. Non-parallel append-only would be OK; first you write data, then you update the metadata, so that a failure will not cause corruption.
if you have more than one variable-length dimension then the number of chunks also changes, which means that a lot of chunks will have to be renamed as well.
Sorry, I don't follow.
Note that sparse (or graph) storage might be somewhat similar, and zarr would be a great binary store for it.
the metadata file has a size of O(number of chunks)
This doesn't appear to be a problem. The size of this data will always be much much smaller than any chunk.
Only if you have less than ~10000 chunks.
modifying both the chunk and the metadata file
Were this a zarr extension, I think it would state that awkward datasets are immutable once written. The metadata for the chunks should then be written exactly once. Non-parallel append-only would be OK; first you write data, then you update the metadata, so that a failure will not cause corruption.
I can see how this could work for some use cases, but this design has a lot of fundamental limitations and loses a lot of the key benefits of zarr. I imagine that there will be a lot of use cases that require parallel writing for which an entirely different awkward array implementation would be required.
if you have more than one variable-length dimension then the number of chunks also changes, which means that a lot of chunks will have to be renamed as well.
Sorry, I don't follow.
My comment here was in regards to resizing one of the dimensions.
@martindurant summarized the requirements for Awkward Array as well as I could have.
The primary goal for a GSOC student or contributor would be to implement variable length chunk support
Originally, my interest in this had been to ensure that there's some sort of marker on a group of datasets to say, "These are collectively one Awkward Array. You [probably/definitely] need the library named 'awkward' to read this correctly" (maybe with a hook to invoke the interpretation function). It would just be a metadata marker and a policy.
But I had been assuming that Zarr datasets can be irregularly partitioned. That would be a technical prerequisite to be able to store Awkward Arrays in Zarr at all, regardless of whether the absence of a metadata marker would confuse users. Learning that, it's now a larger project than I had previously thought, but still it doesn't seem too large for a summer student.
As I understand it from @martindurant, there are other features that will require irregular partitions, so we could be having this discussion without reference to Awkward Arrays. Oh, sparse matrices is one such case: even if the partitions represent equal-sized regions of the logical array, the physical data associated with the partitions are usually different sizes.
Some of the issues:
- the metadata file has a size of O(number of chunks)
It does. Chunks are "as large as can be comfortably read into memory," say ~GB, and the number of chunks can be as large "as can comfortably be stored in metadata and JSON encoded/decoded." For numbers (sizes of each chunk, not cumulative offsets) on the order of 1 billion (the chunk size of 1 GB), encoding and decoding 1 million of them as JSON takes 0.2 seconds in Python's json module and the disk size of the encoded numbers is 10 MB. I would take that (or a factor of 10 higher) to be a limit of comfort, and that sets the maximum array size at 1 billion × 1 million = 1 quadrillion. The corresponding data behind this comfortable metadata is ~PB.
>>> import json, time, numpy as np
>>> stuff = np.random.poisson(1000000000, 1000000).tolist()
>>> starttime = time.time(); tmp = json.loads(json.dumps(stuff)); time.time() - starttime
0.2118852138519287
>>> len(json.dumps(stuff)) / 1024**2
10.966504096984863
To address datasets larger than the 1 PB scale, more than 10 MB of metadata will be required (or larger chunks than 1 GB): supercomputer scales require special handling. Most use-cases are considerably smaller than that, and what I've shown here is that a large enough range of data can be addressed by a reasonably small metadata file. (If it didn't, we'd have to consider "partitions of partitions" and start building the third rung of a tree.)
The conclusion is not changed much (40%) if cumulative offsets are used, rather than counts. Cumulative offsets are needed for random access, but they can be stored in JSON as counts and cumulatively summed when loading into memory, so it could be defined either way.
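A one-line sketch of that counts-versus-offsets point:
>>> import numpy as np
>>> counts = [365, 365, 366, 365]                       # chunk lengths stored in metadata
>>> offsets = np.concatenate([[0], np.cumsum(counts)])  # recovered at load time
>>> offsets
array([   0,  365,  730, 1096, 1461])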
Oh, in case there's any confusion about this, the partition sizes are not determined by the variable lengths of lists within an Awkward Array. List lengths, which are determined by the data they represent and might therefore be quite small, are encoded in one of the 1-dimensional datasets within the group: data, not metadata. The partition sizes are determined by physical memory capacity of the machines reading the data, and that's why I said ~GB.
- changing the number of elements in a chunk requires modifying both the chunk and the metadata file. Since most underlying stores won't allow that to be done atomically, if the process is interrupted the array becomes corrupted.
- there will be contention on updating the metadata file if there are concurrent writes from multiple machines
- if you have more than one variable-length dimension then the number of chunks also changes, which means that a lot of chunks will have to be renamed as well.
Completely setting aside Awkward Arrays for the moment (which are write-once, read-many immutable), wouldn't all of the above problems also hold for regular arrays? Resizing and/or repartitioning a regular array is a massive shuffle: that's no different here. Maybe you're thinking of adding data to a sparse array, which changes the sizes of partitions?
The last question makes me think that you're assuming the partition sizes would be tied to the lengths of lists in a ragged array, or that we would be changing the lengths of lists within a ragged array. That's not the Awkward Array use-case, in which all of these are immutable. I could see it being an issue with sparse arrays, which do grow as values are changed in-place, but I also think it could be managed if the sparse array is considered eventually consistent: changing a value that adds a new COO coordinate would first append to the physical data and then put a request to increment the metadata in a queue. If a reader looks at the sparse array after the coordinate is added but before the metadata is updated, they'd see an old version of the array. I think a mutable sparse array could be handled, but my use-case is immutable, anyway.
Note that sparse (or graph) storage might be somewhat similar, and zarr would be a great binary store for it.
Oh yeah: trees and graphs would be another use-case.
I can see how this could work for some use cases, but this design has a lot of fundamental limitations and loses a lot of the key benefits of zarr. I imagine that there will be a lot of use cases that require parallel writing for which an entirely different awkward array implementation would be required.
These use-cases we have are already useful. The Zarr implementation of Awkward Array storage doesn't have to go beyond the capabilities of in-memory Awkward Arrays, which are also immutable. (Or append-only, but that means adding extra partitions at the end.)
I might add - I don't think I did so far - that a set of chunk sizes also allows use cases with kerchunk (which are read-only by nature) where the original datasets do not have the same uniform chunking as one-another.
Available memory isn't the only practical limit on chunk size. The optimal chunk size depends on the access patterns as well --- for example I have a lot of use cases involving 3d (non awkward) arrays where the optimal chunk size is ~64^3 elements but the total array is ~1PB. Using 1GB chunk size would mean a massive read overhead. While these precise use cases do not involve awkward arrays, I can think of some related use cases with similar access patterns that would benefit from awkward arrays.
For the immutable-only non-concurrent write use case using the proposed encoding of awkward arrays as 1d zarr arrays, I think it is not strictly necessary even to support irregular grids --- instead you could use an arbitrary fixed chunk size of e.g. [1000000]. This would mean 1 zarr chunk does not correspond to any rectangular region of the awkward array, but that would just mean somewhat less efficient access (depending on access patterns); basically you could tune the chunk size so that the overall overhead is fairly modest. In any case, since it would require no changes to zarr at all, it would allow you to evaluate the existing awkward Python library using zarr as storage immediately.
It is true that changing the chunk size of a zarr array would require rewriting all of the data. However, increasing a dimension of the array just requires rewriting the .zarray metadata file --- none of the chunks need to be touched. Decreasing a dimension of the array may require deleting chunks (or you could skip that and just let them remain). And when overwriting a portion of an existing array, only the affected chunks are touched, and this can be done with arbitrarily high parallelism (as long as each machine touches disjoint chunks).
In my mind one of the most common use cases for an awkward array would be an array of variable-length strings. For example, we might have 4 dimensions, "x", "y", "z", "k", where 'x", "y", "z" are non-variable dimensions and "k" is a variable dimension that depends on "x", "y", and "z". Furthermore we might wish to use a chunk size of [100, 100, 100, 100]. As I see it, a zarr-like awkward array implementation would allow us to write to arbitrary strings in parallel from multiple machines, as long as they are in separate [x, y, z] chunks of size [100, 100, 100]. Furthermore, in this example the variable-length dimension is chunked as well, with a chunk size of 100, so that it is possible to efficiently read just a portion of the string if it is very long.
I understand that the awkward python library provides an immutable-only interface and I can see that such an interface makes a lot of sense for a lot of use cases. I just think that there are also a lot of use cases that require read-write support when dealing with persistent storage, and while read-write support adds implementation complexity, I don't think it necessarily adds any overhead, so if there will ultimately be a need for read-write support, it may be better to just standardize that rather than also support a separate immutable awkward array implementation. I think for very large arrays in particular, read-write support is more important than for small in-memory arrays where you can easily just re-copy/re-create the entire thing to modify it.
I think an important detail that has been missed in the awkward-specific part of this conversation, is that our arrays are necessarily one-dimensional in every case. This means that values of the chunk sizes will be much larger than what you might have experienced with >=3d, while keeping the byte-size of each chunk data file relatively small.
Furthermore, the access pattern is not as you suggest. Let's say, again, that we have a variable-length logical awkward array, which would be stored as one integer offsets array and one data array. If the chunking is per ten rows, and the first ten rows contain 100 data values, and the next ten rows contain 500, we want to be able to read "the first chunk" and "the second chunk" independently. Without variable chunking, the only way to know how many values are represented by the first 10 offsets is to read and interpret them and then, as a separate step, do [:] selection on the data (which will be wasteful). With variable chunking, the boundaries align, and you can efficiently distribute the reads.
Let me try to explain how I understand your example in the context of using the existing zarr format (without irregular grid support):
Suppose we pick a chunk size of 50 for the data array and 50 for the size/cumulative offset array.
To read an arbitrary consecutive 10 rows, which we will assume contain 100 data values in total, requires reading the relevant offsets and then elements [k, k+100) from the data array. Most likely k won't be a multiple of 50, which means we will have to read an extra chunk at the beginning and end. In total we will have to read 4 chunks of 50 elements each (200 elements total); in contrast, with irregular grid support we would only have to read 1 chunk of 100 elements (assuming the start row is a multiple of 10).
Then consider another arbitrary consecutive 10 rows, which we will assume contain 500 data values in total. This requires reading elements [k, k+500) from the data array. As before we can assume k is not a multiple of 50, so we will have to read 12 chunks in total; in contrast, with irregular grid support, we would only have to read 1 chunk of 500 elements (assuming the start row is a multiple of 10).
We can see that there is some overhead in both of these cases from not supporting an irregular chunk grid, but especially for the 500 values case the overhead was fairly small, and it certainly seems workable as a prototype. In general, if we choose the data chunk size to be e.g. 10% of the number of consecutive values that we expect to process at a time, then the overhead will be fairly small.
Looks like I spawned a lively conversation then immediately went on vacation 😄. I'll try to update with some perspective here.
About whether some metadata is needed or variable chunk sizes are needed, ultimately I don't think either is 100% necessary. We can absolutely round trip an awkward array through zarr right now. I do buy the argument that variable length chunking would add so much to this (at least ergonomically), that it could be considered required.
Without variable length chunks partial access seems possible through what @jbms is describing. That is actually how I've worked with CSC/ CSR sparse data stored in zarr/ hdf5. I'm only saying "seems" since I haven't actually tried with awkward, but think it would be fine.
We can see that there is some overhead in both of these cases from not supporting an irregular chunk grid, but especially for the 500 values case the overhead was fairly small, and it certainly seems workable as a prototype.
It's workable. But manually dealing with boundaries is a pain, especially when working with numba or jax code – where you have to do a lot of jumping back and forth between "operate on data" (in compiled code) and "grab chunk from data storage" (in python) (https://github.com/zarr-developers/zarr-python/issues/771). Asking users to then pick good chunk sizes here is kind of complex. Saying "chunk every ten rows" is easy for users (e.g. me). See also: https://github.com/scverse/anndata/issues/523#issuecomment-813894805.
I'm not sure I'm understanding the disagreement about writing. If anything, variably chunked storage would be great for parallel writing. With variable chunk sizes I don't need to know how the current logical-chunk aligns with previous logical-chunks physical-chunking. I just need to make sure my offsets are correct once I'm done. To me, this seems much more tractable than storing variable length data in fixed size chunks (where write locations for chunks are dependent on previous chunks).
Suppose we pick a chunk size of 50 for the data array and 50 for the size/cumulative offset array.
I don't think the offset array and data array need to have the same logical chunking (which I think you are suggesting here). This is pretty close in my mind to why it would probably be okay to write the irregular chunk sizes to the json metadata. In practice, ten entries of data is going to be much much larger than the ten relevant offsets. It will actually be a lot of overhead to have to hit the disk/ network for each chunk of offsets, especially when a far greater number (probably all for most use-cases) could be held in memory.
However (this may be a bit more out there), would it be necessary to actually save or know the offset metadata for the data array at all? I could figure it out from the offset array or the decoded buffer + type information in most (all?) cases. This could solve some of the metadata problem for parallel writes.
Thanks for the clarifying words here.
One nugget I would add: if awkward and sparse and kerchunk (and maybe other use cases) could benefit from this, and would otherwise need to solve the problem for each library, solving it once here would minimise duplicated work and errors.
I don't think the offset array and data array need to have the same logical chunking (which I think you are suggesting here).
They do need to align, the simplest pattern of which is to have them the same. I agree that's probably not optimal.
It's workable. But manually dealing with boundaries is a pain, especially when working with numba or jax code – where you have to do a lot of jumping back and forth between "operate on data" (in compiled code) and "grab chunk from data storage" (in python) (zarr-developers/zarr-python#771).
I don't think users would necessarily need to manually deal with boundaries --- the zarr-python library already handles chunking and indexing.
Asking users to then pick good chunk sizes here is kind complex. Saying "chunk every ten rows" is easy for users (e.g. me). See also: scverse/anndata#523 (comment).
Agreed. Although on the other hand if some rows may be much larger than others, it may be desirable to chunk based on elements rather than rows.
I'm not sure I'm understanding the disagreement about writing. If anything, variably chunked storage would be great for parallel writing. With variable chunk sizes I don't need to know how the current logical-chunk aligns with previous logical-chunks physical-chunking. I just need to make sure my offsets are correct once I'm done. To me, this seems much more tractable than storing variable length data in fixed size chunks (where write locations for chunks are dependent on previous chunks).
Agreed --- the irregular grid support does make parallel writing simpler, as long as variable-length dimensions cannot themselves be chunked. If variable-length dimensions can be chunked, then you would need to know all of the sizes ahead of time, which would be an unfortunate requirement. However, if variable-length dimensions cannot be chunked, then effectively every chunk is its own independent awkward array, and it would seem much more natural for each chunk to just use some custom binary encoding of the awkward array (e.g. similar to the way the "ragged arrays" tutorial example is stored https://zarr.readthedocs.io/en/stable/tutorial.html#ragged-arrays), rather than trying to represent awkward arrays as non-awkward zarr arrays. Then there would be no need for irregular grid support, or for a separate offsets array at all.
Suppose we pick a chunk size of 50 for the data array and 50 for the size/cumulative offset array.
I don't think the offset array and data array need to have the same logical chunking (which I think you are suggesting here). This is pretty close in my mind to why it would probably be okay to write the irregular chunk sizes to the json metadata. In practice, ten entries of data is going to be much much larger than the ten relevant offsets. It will actually be a lot of overhead to have to hit the disk/ network for each chunk of offsets, especially when a far greater number (probably all for most use-cases) could be held in memory.
Yes I understand that the offset array need not use the same chunk size as the data array --- I should have picked a different size for the example.
If a separate offsets array is used, probably it would be advantageous to cache the offsets in memory, which could be done automatically e.g. via least-recently-used policy. I'm not sure to what extent zarr-python already supports caching, though.
However (this may be a bit more out there), would it be necessary to actually save or know the offset metadata for the data array at all? I could figure it out from the offset array or the decoded buffer + type information in most (all?) cases. This could solve some of the metadata problem for parallel writes.
I'm not sure I understand what you are proposing here --- my understanding is that the original proposal was to store offsets in one 1-d zarr array, and the flattened list of elements in another zarr array (with no extra metadata). The offsets array is the only place that tells you where the boundaries are between "rows" in the flattened array. With the irregular grid support, if you have a chunk size of 10 rows, then there would be some redundancy, in that the grid boundaries would tell you the boundaries between every chunk of rows, but unless the chunk size is 1 row you would still need the separate offsets array to know the boundaries between every individual row.
it would seem much more natural for each chunk to just use some custom binary encoding of the awkward array (e.g. similar to the way the "ragged arrays" tutorial example is stored https://zarr.readthedocs.io/en/stable/tutorial.html#ragged-arrays), rather than trying to represent awkward arrays as non-awkward zarr arrays. Then there would be no need for irregular grid support, or for a separate offsets array at all.
We are not storing one ragged array, but a structured tree of these, and we want to be able to pick parts of this tree to load. Some will have common offsets arrays; some will have offsets of offsets. A custom encoding is no use at all for this. Zarr lets us access the different components independently and load exactly what we need, which is why we are talking about this.
Sorry for the confusion in bringing up the ragged arrays example from the tutorial. To be clear, by "custom encoding" I just mean something other than re-using the normal zarr on-disk representation. I don't mean that it should be implemented as a zarr codec (which I understand is how the ragged arrays example from the tutorial is implemented). However, I do imagine that if you don't support chunking of variable-length dimensions, a reasonable on-disk representation might be similar to that used by the ragged arrays tutorial example, except that it would be better to avoid interleaving the sizes and the data.
If you have multiple awkward arrays, even if they have some shared sizes, but do not allow chunking of variable-length dimensions, you could still just represent them independently. The shared size information would be duplicated but that would likely be rather small compared to the size of the data. Some amount of duplication would probably help make the format more robust if writing is supported and your underlying storage doesn't support atomic updates to more than one file/key.
This issue is a starting point for discussing possible protocol extensions to allow support for various types of "awkward" arrays that do not fit into the model of arrays with fixed size dimensions and/or with fixed size data types.
For example, this includes "ragged" or "jagged" arrays which include at least one dimension of variable size. The simplest example of this would be an array of variable sized arrays of some simple type like integers or floats.
It may be necessary to divide up this space and split out into separate issues if it would make sense to break the problem down. Suggestions for how to do this welcome.
Some related material: