fsspec / kerchunk

Cloud-friendly access to archival data
https://fsspec.github.io/kerchunk/

Support the Open Meteo custom data format #464

Open alxmrs opened 3 months ago

alxmrs commented 3 months ago

OM is a custom format created by and for open-meteo.com. It’s a Zarr-like format that I bet would be easy to Kerchunkify.

There seems to be a considerable catalog of weather data curated by Open Meteo on AWS: https://github.com/open-meteo/open-data

Here’s some more info about the format:

Some potential sharp edges:

alxmrs commented 3 months ago

xref: https://github.com/open-meteo/open-data/issues/11

martindurant commented 3 months ago

Sounds like a fun little project for someone with spare time or funding. Let's see if open-meteo is interested in your issue. Of course, I know nothing about the format, but "zarr-like" is good.

Each individual chunk varies in size

So maybe it needs ZEP0003, yet another reason why that should be adopted and released. Feel free to add another point in the zep discussion.

alxmrs commented 3 months ago

Thanks Martin! I added to the discussion.

patrick-zippenfenig commented 1 month ago

Over the past week I have been thinking about a portable implementation to access Open-Meteo files from different platforms. Currently, it is only available in Swift and tailored to a specific use-case. To make it more generic, a dedicated library in C/Swift/Rust could be implemented with bindings to different programming languages.

The most basic interface could look like:

let file = OmFile(path: "era5_temperature_2m_2020.om")
// One year of ERA5 data for a given location
let data = file.read(offset: [340, 450, 0], count: [1, 1, 8760])

Of course, this kind of interface is useless for cloud-native operations. The interface should support:

Typically, such an interface is realised by providing callbacks for IO and implementing threading inside the library. In combination with async, this makes it very hard to build a portable library.

The Open-Meteo file format is relatively simple, consisting only of (more details here):

A read operation needs 3 IO operations:

  1. Read metadata with dimension information
  2. Calculate which chunks are required and read data offsets in the lookup table
  3. Read compressed chunks, decompress them and return a data array to the user

Sans IO example using read/decompress functions

To eliminate any threading and IO, a library can provide functions that return the IO read offsets and separately accept the data in a second function call. In a very simplified way, this could look like the following dummy code (note: the open_meteo_* function calls are kept C-style to make them as portable as possible).

let file = await FileHandle(path: "era5_temperature_2m_2020.om")

let params = ReadOperation(
  // Attributes from the file meta header
  dimensions = [3600, 1800, 8760],
  chunk_dimensions = [3, 3, 20],
  compression = .pfor,
  scalefactor = 20,

  // What to read
  read_offset = [340, 450, 0],
  read_length = [1, 1, 8760]
)

var output = [Float]()

// Which indices need to be read from the lookup table? Attributes `offset` and `length`
let indexRead = open_meteo_read_index(params)
let indexData = await file.read(indexRead.offset, indexRead.length)

// Which chunks need to be read from the data block? Attributes `offset` and `length`
let chunkRead = open_meteo_read_data(params, indexData)
let chunkData = await file.read(chunkRead.offset, chunkRead.length)

// Once data has been read, decompress chunks and place data into the output array
open_meteo_decompress_chunk(params, chunkData, &output)

print(output)

Using functions that return IO operations with offset and length makes it trivial to implement any kind of IO. The await file.read(offset, length) can be replaced by an HTTP library to fetch data from S3. A cache layer could be added as well.
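
As a rough sketch of that idea on the Python side (not part of the proposal; the bucket path and the offset/length values are placeholders), such a read callback could be backed by fsspec byte-range requests:

import fsspec

# Minimal sketch: back a `read(offset, length)` callback with remote
# byte-range requests via fsspec. The S3 path is a placeholder, not a real key.
fs = fsspec.filesystem("s3", anon=True)
path = "some-bucket/era5_temperature_2m_2020.om"  # hypothetical object key

def read_range(offset: int, length: int) -> bytes:
    # Fetch bytes [offset, offset + length) from the remote object
    return fs.cat_file(path, start=offset, end=offset + length)

# e.g. feed the result into open_meteo_read_data / open_meteo_decompress_chunk
index_data = read_range(40, 8192)  # placeholder offset/length from an index read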

Example sequential IO operations, no concurrency

As multiple data blocks must be read from the file, the code needs a couple of loops.

var output = [Float]()
var buffer = [UInt8](size: open_meteo_buffer_size(params))

while (let indexRead = open_meteo_read_index(params)) {
  let indexData = await file.read(indexRead.offset, indexRead.length)

  while (let chunkRead = open_meteo_read_data(params, indexData)) {
    let chunkData = await file.read(chunkRead.offset, chunkRead.length)
    open_meteo_decompress_chunk(params, chunkData, &buffer, &output)
  }
}

print(output)

Example full concurrency

Alternatively, reads could be batched, read concurrently and processed concurrently, showcasing the full potential of this approach. (Note: pure dummy code. This is not intended to be valid code.)

// Automatically merge and break up IO to ideal sizes
// Merging is important to reduce the number of IO operations for the lookup table
// A maximum size will break up chunk reads. Otherwise a full file read could result in a single 20GB read.
let params = ReadOperation(
  [...]
  io_size_max = 65536
  io_size_merge = 512
)

// Submit all index reads concurrently (small number, 1-2 MB)
var indexDataFutures: [Future<T>]
while (let indexRead = open_meteo_read_index(params)) {
  let indexData = file.read(indexRead.offset, indexRead.length)
  indexDataFutures.append(indexData)
}
let indexDatas = await indexDataFutures

// Once we know the index, collect all data read concurrently (potentially big, up to 20GB)
var chunkDataFutures: [Future<T>]
for indexData in indexDatas {
  let chunkRead = open_meteo_read_data(params, indexData)
  let chunkData = file.read(chunkRead.offset, chunkRead.length)
  chunkDataFutures.append(chunkData)
}
let chunksDatas = await chunkDataFutures

// Once all data arrived, decompress it concurrently
// This could also be done while downloading data, to save memory
concurrent(cores=8) for chunkData in chunksDatas {
  // Each thread needs its own buffer
  var buffer = [UInt8](size: open_meteo_buffer_size(params))
  open_meteo_decompress_chunk(params, chunkData, &buffer, &output)
}

print(output)

The final interface could be as simple as providing the functions used above: open_meteo_read_index, open_meteo_read_data, open_meteo_decompress_chunk and open_meteo_buffer_size.

In terms of IO read dependencies, a single index read will enable multiple chunk reads. This flow can be used to further optimise concurrent operations, e.g. while part of an index is still being read, other data chunks can already be decompressed:

flowchart LR
    ReadMeta --> ReadIndex45
    ReadMeta --> ReadIndex67
    ReadIndex45 --> ReadChunk42
    ReadIndex45 --> ReadChunk342
    ReadIndex45 --> ReadChunk372
    ReadIndex67 --> ReadChunk62
    ReadIndex67 --> ReadChunk32
    ReadChunk42 --> Decompress42
    ReadChunk342 --> Decompress342
    ReadChunk372 --> Decompress372
    ReadChunk62 --> Decompress62
    ReadChunk32 --> Decompress32

Separating IO and concurrent processing from the library should make the library relatively straightforward. As long as all functions are strictly concurrency safe, the caller can choose freely how to do IO, async and concurrency.

The drawback is that the library will be hard to implement in this fashion. However, I am already using a similar approach to write files as streams.

Another advantage is that this approach is very flexible. E.g. additional compression methods or filters can be added easily as callbacks. The library is effectively reduced to handling how chunks are indexed, compressed and stored. It is therefore more an encoding than a file format and could be integrated as a binary blob into other systems.

Revise Format

The Open-Meteo format needs some smaller updates to be more generic:

I considered adding support for attributes or the ability to store multiple arrays in a single file, but this would increase complexity. If features like this are ever required, the format could be wrapped inside a container providing more complex features.

Next steps

  1. I am not familiar with kerchunk and how it handles IO / concurrency. It would be great to receive feedback on whether the presented approach would fit kerchunk.

  2. I (or a contributor) could implement a reference implementation. I would start in Swift, which could later be ported easily to C. The underlying compression is based on a modified version of TurboPFor in C with SIMD, so C is required in some form. I would also like to implement some data filters in C with better SIMD support. Ideally, I would also like to reimplement the entire compression layer.

  3. Bindings for Python can be created based on the underlying C implementation. The first version will only export the C-style functions as presented above. In a later version, a more user-friendly interface could be provided.

  4. Expose metadata for all models in https://github.com/open-meteo/open-data. To read all the data provided, you will need some metadata, like the length of each time-chunk. This can be added as a simple JSON file.
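
Purely as an illustration of point 4, such a JSON sidecar could be tiny; all keys and values here are hypothetical, not a proposed schema:

import json

# Hypothetical per-model metadata; the keys and values are illustrative only,
# not a proposed schema.
model_meta = {
    "model": "era5",
    "temporal_resolution_hours": 1,
    "time_chunk_length": 8760,  # e.g. one year of hourly steps per file
}
print(json.dumps(model_meta, indent=2))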

martindurant commented 1 month ago

I haven't yet read the details of what you have written, but I have a couple of thoughts from a quick scan.

First, the pattern where the metadata and chunk offsets for a given read operation are resolved, followed by reading a set of binary chunks for decompression: this is exactly the flow that kerchunk is made for. If those chunk offsets/sizes can be extracted, there is no need for compiled custom code to be called at runtime; it can all be handled by kerchunk, and comes with the side benefits of distributable parallelism and concurrency for the fetches. Kerchunk supports JSON and parquet storage of the chunk information, the latter being necessary where there are very large numbers of chunks. Indeed, this also gets around the "data is 2D" problem, since the reference sets can index as many files as you like, to make the overall dataset many-dimensional.
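
For illustration only, a kerchunk reference set (the version 1 JSON layout) for OM chunks might look roughly like the following; the array name, paths, offsets and zarr metadata are placeholders, and the custom compression would still need a codec registered on the Python side:

# Illustrative only: kerchunk "version 1" references mapping zarr chunk keys
# to (url, offset, length) byte ranges inside .om files. All values are made up.
references = {
    "version": 1,
    "refs": {
        "temperature_2m/.zarray": "{...}",  # zarr array metadata (shape, chunks, dtype, codec)
        "temperature_2m/0.0.0": ["s3://some-bucket/era5_temperature_2m_2020.om", 1024, 812],
        "temperature_2m/0.0.1": ["s3://some-bucket/era5_temperature_2m_2020.om", 1836, 790],
    },
}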

The core question to be asked for kerchunking will be: what's the fundamental chunk size in the data, where a "chunk" is an indivisible binary blob, usually a single compressed buffer. Of course, this is a factor of the data, but if the size is very small (~kbytes), then it becomes less practical to index potentially thousands of files, and perhaps it's better to regard each whole file as a chunk. Still may be worth trying, though!

Also, a quick glance at https://github.com/open-meteo/open-data?tab=readme-ov-file#file-format says that the data format is actually really simple. Do we need more than some numpy for finding all the offsets?

patrick-zippenfenig commented 1 month ago

Thanks for the quick reply!

Of course, this is a factor of the data, but if the size is very small (~kbytes), then it becomes less practical to index potentially thousands of files

Unfortunately, this is exactly the case. Chunks are very small (~ kbyte). Chunks are stored sequentially and can be indexed through a lookup table (an Int64 array). Only one Int64 address is stored per chunk; the length of each chunk can be calculated from the address of the next chunk.

The chunk index lookup table can contain millions of chunks and requires a couple of MB. Making the index table more efficient to require less than 64 bits per chunk is actually one of the improvements I would like to do for the next major version.
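
As a rough numpy sketch of that calculation (assuming each Int64 entry is an absolute start address; the exact on-disk layout may differ, and a tiny synthetic table stands in for real file bytes):

import numpy as np

# Rough sketch: derive per-chunk (offset, length) pairs from the Int64 lookup
# table, assuming each entry is an absolute start address.
index_bytes = np.array([1024, 1836, 2626, 3300], dtype="<i8").tobytes()

addresses = np.frombuffer(index_bytes, dtype="<i8")
offsets = addresses[:-1]
lengths = np.diff(addresses)  # length of chunk i = addresses[i+1] - addresses[i]
print(list(zip(offsets.tolist(), lengths.tolist())))  # [(1024, 812), (1836, 790), (2626, 674)]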

Do we need more than some numpy for finding all the offsets?

It would be possible to get all chunk offsets just by reading the index table as Int64, but I think it will be less practical to store all chunks again in Parquet. This is not only due to size; updates to the open-data repository also get more complicated. Most models update every 1-3 hours and rewrite a good amount of data, which would then need to be reindexed. You also would have to implement filtering and compression which are non-standard and not available as Python bindings.

What would require indexing is to combine multiple "om"-files into a longer time-series. Files are split either by:

Each file then contains one year or 7*24 hourly time-steps, which need to be reassembled into a larger array. This step would require some logic in kerchunk.
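
As a toy sketch of that reassembly logic (the file naming scheme and a fixed 8760-hour year are illustrative assumptions):

# Toy sketch: map a global hourly time index onto the year-file holding it and
# the local index inside that file.
HOURS_PER_YEAR = 8760  # ignores leap years for simplicity

def locate(time_index: int, first_year: int = 2020):
    year_offset, local_index = divmod(time_index, HOURS_PER_YEAR)
    return f"era5_temperature_2m_{first_year + year_offset}.om", local_index

print(locate(10000))  # ('era5_temperature_2m_2021.om', 1240)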

martindurant commented 1 month ago

The chunk index lookup table can contain millions of chunks and requires a couple of MB. Making the index table more efficient to require less than 64 bits per chunk is actually one of the improvements I would like to do for the next major version.

Although kerchunk stores both offset and size for each chunk, the arrays are compressed and themselves chunked (size not connected to the number of chunks per file), so it's not inconceivable that parquet does better than the flat index in each file. Avoiding having to preemptively touch every file we will need at read time could be a big bonus.

You also would have to implement filtering and compression which are non-standard and not available as Python bindings.

Absolutely for the compression - I understand this is a totally custom algorithm.

Each file then contains one year or 7*24 hourly time-steps, which need to be reassembled into a larger array. This step would require some logic in kerchunk.

kerchunk can probably already do this, although whether you can simply use MultiZarrToZarr, I'm not yet certain.

patrick-zippenfenig commented 1 month ago

Absolutely for the compression - I understand this is a totally custom algorithm.

I will do some tests starting in September to provide the necessary building blocks to interact with OM-files. I have received more and more interest from different parties in providing Python bindings, and I think with some work, the OM format could be an interesting option to store more environmental data and make it accessible as cloud-native formats.

martindurant commented 1 month ago

the OM format could be an interesting option to store more environmental data and make it accessible as cloud-native formats.

Sure, you are the expert! I'd also be happy to talk in general about the direction.