Open d70-t opened 2 years ago
In principle, this would be fine to do, and it wouldn't be too complicated to implement. It might even open up more types of intra- and inter-file data organisation. It would need a change both in the kerchunk repo (where the contents of a references JSON are described) and in fsspec (for using these).
Before all that, though, I am surprised that you might need to do this. Can you describe further the structure of your netCDF file? Actually, it all sounds rather like zarr's sharding model.
However, it should be possible to disambiguate this, by defining that single-element raw data blocks must always be written without the list.
That is already the case
Before all that, though, I am surprised that you might need to do this. Can you describe further the structure of your netCDF file? Actually, it all sounds rather like zarr's sharding model.
I've got a series of netCDF files which I'd like to concatenate using Kerchunk. Unfortunately, while those files logically fit together, some of them have been created using different internal chunkings. While I could in principle find the smallest common chunk size and use that for all of the files (I have to admit I didn't think about this when writing the original issue), this would significantly increase the size of the resulting references file. If I could instead join some of the oddly shaped chunks into larger chunks, I could stick with the smaller references file (which still is about 400MB of JSON in this case).
Interesting. So this only works if the chunks are uncompressed, right? Or do you think the kerchunk layer should also be able to call numcodecs directly?
Yes, this only works with uncompressed stuff (which it luckily is in my case).
Calling numcodecs sounds like a bit of an overkill for kerchunk... But maybe it isn't? Probably calling an arbitrary transformation function on a block of data would be the next step of generalization after flexible byte rearrangements.
The other option (for my use case) would be to have more flexible / irregular chunking in zarr, but that probably comes at a performance penalty (e.g. O(1) chunk index calculations won't be possible anymore). As kerchunk already copes with relatively irregular data, kerchunk might indeed be a better place for such things.
OK, I am convinced that this is a reasonable thing to do, worth the extra flexibility at a small complexity cost.
I think it might be worthwhile to also define a couple of byte block combination schemes, with simple "concat" being the default. This would allow for a numcodecs filter to be applied to each chunk before concatenation. Also, we should think about whether we need to declare the (uncompressed?) sizes of each block, so that if someone wants to `cat` only part of a virtual file, we don't need to load all subchunks. That will be important when zarr is able to pass intra-chunk selection down to the storage layer.
Yes, we'll definitely want to have the sizes of the pieces to be concatenated (for partial loads). For uncompressed things, this would be the size of the blocks themselves. In that case something like:
[["path", offset, size], ["path", offset, size], ["path", offset, size], ...]
would do the trick, but `offset` and `size` would be mandatory.
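A minimal sketch of why mandatory sizes enable partial loads, assuming uncompressed pieces. The helper name `read_concat` and the in-memory `sources` dict (a stand-in for real file access) are hypothetical and not part of kerchunk or fsspec:

```python
def read_concat(pieces, sources, start=0, stop=None):
    """Read bytes [start, stop) of the virtual file built by concatenating pieces.

    pieces:  list of [path, offset, size] entries (offset and size mandatory)
    sources: dict mapping path -> bytes, standing in for real storage
    """
    total = sum(size for _, _, size in pieces)
    stop = total if stop is None else min(stop, total)
    out = []
    pos = 0  # where the current piece starts inside the virtual file
    for path, offset, size in pieces:
        lo = max(start, pos)
        hi = min(stop, pos + size)
        if lo < hi:  # only touch pieces that overlap the requested range
            begin = offset + (lo - pos)
            out.append(sources[path][begin:begin + (hi - lo)])
        pos += size
    return b"".join(out)


sources = {"foo": b"0123456789", "bar": b"abcdefghij"}
pieces = [["foo", 2, 4], ["bar", 5, 3]]
whole = read_concat(pieces, sources)        # every piece is read
part = read_concat(pieces, sources, 3, 6)   # one byte of "foo", two of "bar"
```

Because each piece declares its size, the position of every piece in the virtual file is known without opening any source, so pieces outside the requested range are never read.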
We could think of adding a fourth item to the inner lists, which could be a function (or maybe just an identifier for now) and then maybe a fifth which could be the resulting size after function application (e.g. after decompression). But I'm wondering if we'd want to continue by adding more and more items to the list (which would often use less space) or if we want to make it a dictionary? ... so far, `["path", offset, size, function, final_size]` seems to be a reasonable order (e.g. you'll never need the `final_size` without the `function`, but there may be cases in which the `function` doesn't change the size, so one could skip the last element).
We might be able to re-use the template-system to compress the things going into the `function` part 🤷 .
I'm wondering if we'd also want something which operates on all chunks at once? But probably we don't need that, because that could be handled by the users (e.g. zarr). Although I'm wondering if I captured what you've had in mind with other "block combination schemes"?
We might be able to re-use the template-system to compress the things going into the function part 🤷 .
Compression of JSON is pretty good, and decompression/parsing is often faster than applying templates in python (it's part of what compression does!). Zstd seems consistently the best of the common ones.
In the end, we want to store in something other than JSON to allow for lazy, chunk-wise loading of the references themselves for very large datasets. I am not certain how to achieve that. I wonder if the existing schema could be rationalised to parquet; but that's only worth doing if the many many strings can be stored without interleaved sizes or in "delta" mode.
Although I'm wondering if I captured what you've had in mind with other "block combination schemes"?
Yes, you understood me
I wonder if the existing schema could be rationalised to parquet
Putting the references into parquet is pretty awesome! I've tried to pack my 491MB JSON mentioned above to parquet and it turned out to be only 19MB :+1: and it doesn't need any templating. Just gzipping the JSON compressed to 40MB, which is also good, but parquet definitely looks better.
I'm wondering if we'd even have to care to translate the entire model (including templating and generators) to parquet, or we'd just use the builtin columnar compression, which seems to work out nicely.
We could probably replace the need for lists of references per key by repeated row-keys in the parquet table. E.g.:
{
"a/b": [["foo", 12, 4], ["bar", 122, 123]],
"a/c": ["baz", 5, 6]
}
could translate to:

| key | path | offset | size |
|---|---|---|---|
| a/b | foo | 12 | 4 |
| a/b | bar | 122 | 123 |
| a/c | baz | 5 | 6 |
We could append `function` and `final_size` as additional columns.
... rethinking this a bit:
Using repeated keys only would make the table sensitive to its order, which might not be what we want. We could lift this by adding another column which would define the order. This could either be just an ordering key, or it could be something like a `target_offset`, e.g. specifying the resulting byte-offset in the assembled (virtual) files.
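As a rough illustration of the flat layout with an ordering column, the following sketch flattens a references dict into rows and shows that the row order no longer matters once a target offset is stored. The function name `flatten` and the tuple layout are assumptions made for this example only:

```python
import random


def flatten(refs):
    """Turn {key: reference} into rows (key, target_offset, path, offset, size)."""
    rows = []
    for key, ref in refs.items():
        # A single reference starts with a path string;
        # a concatenation is a list of such references.
        pieces = [ref] if isinstance(ref[0], str) else ref
        target_offset = 0  # byte position inside the assembled virtual file
        for path, offset, size in pieces:
            rows.append((key, target_offset, path, offset, size))
            target_offset += size
    return rows


refs = {
    "a/b": [["foo", 12, 4], ["bar", 122, 123]],
    "a/c": ["baz", 5, 6],
}
rows = flatten(refs)

# The table can be stored in any order: sorting by (key, target_offset)
# recovers the concatenation order.
shuffled = rows[:]
random.shuffle(shuffled)
restored = sorted(shuffled, key=lambda r: (r[0], r[1]))
```

A stored target offset also tells a reader directly which rows overlap a requested byte range, without summing the sizes of the preceding rows.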
Using repeated keys only would make the table sensitive to its order, which might not be what we want. We could lift this by adding another column which would define the order. This could either be just an ordering key, or it could be something like a target_offset, e.g. specifying the resulting byte-offset in the assembled (virtual) files.
Actually, parquet covers this directly by having a nested schema structure. Union types are possible, but rare, so in practice it would mean that you always have a list-of-lists, even if the length of the list is 1. This still gives you exactly the same amount of data packing. Note that this packing is likely much worse for the case that the embedded URLs are of a more typical length like "http://server.com:port/path/to/data". That's why parquet's delta-length and delta-string encodings would be very useful. In arrow's list, DELTA_BINARY_PACKED is given as read-only. fastparquet doesn't have this at all, but could easily (r/w), if we can find some sample files for testing.
However, the actual problem is on the python side. Do we want to walk the data structure and build python lists at load time? Probably not. We could use awkward to keep the data in original vectorised arrays; or maybe pandas compound indexes can save us. Either way, it makes things much more complicated!
I've not yet used the nested schema structure of parquet, do you have any recommended reading on this?
But probably I'd rather go for a flat layout if possible, as I find flat layouts often play much nicer with array libraries. For the example above, I could imagine having something like:
    import pandas as pd

    def show_sources(df, key):
        # running byte position inside the assembled (virtual) file
        size = 0
        # df.loc[[key]] always yields a DataFrame, even for a single matching row
        for row in df.loc[[key]].itertuples():
            target_start = size
            target_end = size + row.size
            print(f"{row.path} {row.offset}...{row.offset + row.size} -> {target_start} ... {target_end}")
            size += row.size
        print(f"total size: {size}")

    df = pd.DataFrame([
        ["a/b", "foo", 12, 4],
        ["a/b", "bar", 122, 123],
        ["a/c", "baz", 5, 6]],
        columns=["key", "path", "offset", "size"]).set_index("key")
where

    show_sources(df, "a/b")

prints

    foo 12...16 -> 0 ... 4
    bar 122...245 -> 4 ... 127
    total size: 127

and

    show_sources(df, "a/c")

prints

    baz 5...11 -> 0 ... 6
    total size: 6

Of course, `show_sources` would be just a placeholder for something else which fetches the required source-references for the requested key. But this kind of layout would directly work on something which could be loaded as-is from parquet.
Note that this packing is likely much worse for the case that the embedded URLs are of a more typical length like "http://server.com:port/path/to/data".
I'm not sure I fully get this. I've had the impression that the example above already has quite a reasonable compression (491 MB -> 19 MB, which is 4%) and was referencing a bunch of netCDF files using absolute (local) filesystem paths of (I'd say) a typical length. However, as I just stored them using `pandas.DataFrame.to_parquet()` with default options, I don't really know how it was compressed. Unfortunately, I currently have no access to the machine I tried it on, so I can't provide further details on that test.
... of course, if `DELTA_BINARY_PACKED` would provide an even better compression ratio, I wouldn't say no :-)
I've had the impression that the example above has already quite a reasonable compression (491 MB -> 19 MB, which is 4%)
I mean that you will probably not do this well when the strings are a bigger fraction of the total data.
I wonder how we could combine parquet with lazy lookups and filtering on values and columns (i.e., only load what you need). Maybe I am thinking too far ahead.
How many keys in a 491MB JSON file?
How many keys in a 491MB JSON file?
Unfortunately, I currently can't look this up, maybe tomorrow.
But the typical `len(path)` of each reference was about 80-90 bytes and I'd estimate that I've had roughly 4e6 keys.
I wonder how we could combine parquet with lazy lookups and filtering on values and columns (i.e., only load what you need). Maybe I am thinking too far ahead.
This would be great. Probably I don't know enough about parquet to be of help here, so I'm just thinking out loud.
If this would be a useful line of thinking, then this looks like either a partitioned parquet (but I don't know if this is possible on ranges instead of unique values), or like a bunch of individual parquets and a meta-parquet referencing the ones below...
Another question would be: how much does this help in reality? For this to work, the fraction of requested chunks should be low, but that depends a lot on the ordering and the access pattern of the key-space... My gut feeling would say, we'd need more than 1e8 - 1e9 keys for this to start becoming useful. But maybe that's not an unreasonable number.
Yes, exactly that kind of thing. If the key column is ordered, parquet can keep hold of the max/min in each "row group" to allow only loading certain parts for a given request of keys. You can even partition by directory structure, so you could to some extent mirror the dataset directory layout. As you say, none of this is too important in the ~10**6 keys regime, but maybe it will be eventually. I wonder how well/fast pandas' indexing copes with possibly multiple rows matching a key label: maybe it's fast, and maybe it can take advantage of ordering.
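The min/max idea can be sketched in plain Python, without any parquet library. This only mimics the principle of row-group statistics on an ordered key column; `make_row_groups` and `groups_to_load` are hypothetical names and not part of pandas, pyarrow or fastparquet:

```python
from bisect import bisect_left, bisect_right


def make_row_groups(sorted_keys, group_size):
    """Split an ordered key column into row groups and record (min, max) per group."""
    groups = [sorted_keys[i:i + group_size]
              for i in range(0, len(sorted_keys), group_size)]
    stats = [(g[0], g[-1]) for g in groups]
    return groups, stats


def groups_to_load(stats, wanted):
    """Return indices of row groups whose [min, max] range may contain a wanted key."""
    mins = [lo for lo, _ in stats]
    maxs = [hi for _, hi in stats]
    needed = set()
    for key in wanted:
        first = bisect_left(maxs, key)   # first group with max >= key
        last = bisect_right(mins, key)   # one past the last group with min <= key
        needed.update(range(first, last))
    return sorted(needed)


keys = [f"a/{i:03d}" for i in range(10)]
groups, stats = make_row_groups(keys, group_size=4)
hit_one = groups_to_load(stats, ["a/005"])
hit_two = groups_to_load(stats, ["a/001", "a/009"])
miss = groups_to_load(stats, ["z"])
```

Since only the statistics are consulted, a request touching few keys needs few row groups, provided the requested keys sit close together in the sort order.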
This sounds very nice! I'm wondering if partitioning by directory structure would even be needed, if row groups keep min/max values (I currently think of row groups somewhat like partitions, don't know if that's right). If that's directly supported by pandas / dask as well, then lazy loading of those tables seems to be relatively straightforward.
The ordering thing still bothers me a little (and also goes back to what I've mentioned in this comment). We would like to have as many hits as possible within each loaded "row group", while loading as few as possible of them. To me, this looks quite similar to the sharding idea: we want a group of adjacent (in n dimensions) keys close together in "row groups". But as currently the lexicographic ordering of keys doesn't align with n-dimensional adjacency, this won't work out of the box. (But that would then be more like a zarr issue than a kerchunk issue).
How many keys in a 491MB JSON file?
I've had 4098642 keys in that JSON file of which I've written 4098603 into the parquet (the others were inline-keys and thus didn't fit the tabular layout).
Note that the non-key data and inlined keys would fit naturally into parquet's user key-value metadata store (so long as it's relatively small).
Note that the non-key data and inlined keys would fit naturally into parquet's user key-value metadata store (so long as it's relatively small).
Perfect! Yes, I'd suspect that for most use cases, the inlined keys should be a tiny fraction of the keys. Probably it will be best to just do a lookup for both: first into a small kv-store with inlined keys and then into the large table of references.
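A minimal sketch of that two-stage lookup, with plain dicts standing in for the parquet key-value metadata and the reference table. The function name `lookup` and the return convention are assumptions for illustration:

```python
def lookup(key, inlined, table):
    """Resolve a key: check the small inline store first, then the reference table."""
    if key in inlined:
        return ("inline", inlined[key])
    if key in table:
        return ("refs", table[key])
    raise KeyError(key)


# Stand-ins: inlined keys (small), and reference rows grouped per key (large).
inlined = {".zgroup": b'{"zarr_format": 2}'}
table = {
    "a/b": [("foo", 12, 4), ("bar", 122, 123)],
    "a/c": [("baz", 5, 6)],
}

kind_meta, data = lookup(".zgroup", inlined, table)
kind_chunk, pieces = lookup("a/b", inlined, table)
```

Checking the inline store first keeps metadata lookups cheap, since they never have to touch the large table.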
@martindurant I've experimented a bit using some ideas from above: https://github.com/d70-t/preffs It's a very rudimentary and likely broken implementation of a parquet-based reference filesystem (with eager parquet loading, though). It supports references, inline data and concatenation. I used it to bring a 6.8 GB reference JSON (about 60 million entries) down to a 360 MB parquet file. The loading time went down from over 20 minutes for JSON to less than 1 minute for parquet. (Those numbers are all with v0 references, as jinja2 slowed things down too much to wait for it.)
I'll likely not have the time to work on that project, but it seems to be a very useful direction to go for (especially if lazy loading is on the horizon), so I thought I'll share it. Maybe there's someone else to find some time?
I'm not sure if this should go here or to the main fsspec repo.
I'd like to use the reference filesystem to concatenate pieces. That is, for one reference, I'd like to specify a list of things to join together to make up a new file instead of just having a single pointer. This could roughly look like:
etc... Using that method, one could completely rearrange existing files. In my current application, I'd like to join existing chunks of uncompressed netCDF file into a single larger chunk to be used within zarr.
A potential issue might be that the following would become ambiguous:
This could refer to either just a single piece of raw data containing the text "https://test" or a reference to the entire object behind the link. However, it should be possible to disambiguate this, by defining that single-element raw data blocks must always be written without the list.
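One way to read that rule is sketched below. The accepted shapes are an assumption for illustration: a bare string is raw data, `["url"]` or `["url", offset, size]` is a reference, and any other non-empty list is a concatenation whose pieces are themselves raw strings or references. The name `classify` is hypothetical:

```python
def classify(ref):
    """Classify a reference entry under the 'single raw block has no list' rule."""
    if isinstance(ref, str):
        return "inline"  # raw data is always written without a list
    if isinstance(ref, list):
        if len(ref) == 1 and isinstance(ref[0], str):
            return "reference"  # whole object behind the URL, never raw data
        if (len(ref) == 3 and isinstance(ref[0], str)
                and isinstance(ref[1], int) and isinstance(ref[2], int)):
            return "reference"  # byte range: [url, offset, size]
        if ref:
            return "concat"  # list of pieces to join
    raise ValueError(f"unrecognised reference: {ref!r}")


as_raw = classify("https://test")
as_link = classify(["https://test"])
as_range = classify(["file.nc", 0, 10])
as_concat = classify([["file.nc", 0, 10], "raw bytes", ["other.nc"]])
```

With this reading, `["https://test"]` can only mean a reference to the whole object, because a single raw block would have been written as the bare string.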