JackKelly / light-speed-io

Read & decompress many chunks of files at high speed
MIT License
57 stars 0 forks source link

Think about how to aggregate and/or scatter chunks when copying #24

Closed JackKelly closed 7 months ago

JackKelly commented 7 months ago

For example, for the ICON NWP data: each init time consists of 146,000 GRIB2 files! One file for each vertical level, variable, timestep.

Jacob has a script which converts ICON NWP GRIB2 to Zarr. This conversion takes 0.5 to 1 hour per init time, and the output compressed Zarr is about 50 GBytes per init time. Reading from a local SSD on one of Open Climate Fix's on-prem computers. Zarr is compressed using blosc2. No sub-selection.

(Did I get the details right, @jacobbieker?!)

How to merge multiple input files into a single output file in LSIO?

Maybe we should pass back to Python (async)? But that'll have a large performance impact.

Is there a way for users to tell LSIO to merge or split input files, in arbitrary, user-configurable ways?

TODO:

jacobbieker commented 7 months ago

Yes, that's right. The final ICON-EU size is actually 20ish GB, the ICON-Global is 50GB, processed examples are here: https://huggingface.co/datasets/openclimatefix/dwd-icon-eu/tree/main/data and the raw data is here: https://opendata.dwd.de/weather/nwp/icon-eu/grib/06/

JackKelly commented 7 months ago

Thinking this through:

We could think about this in terms of "scatter" and "gather" operations. Or maybe that's too restrictive, as illustrated by the example below.

Copying chunks of many GRIB2 files into a single Zarr chunk would be a "gather" op. To make life complicated, let's assume the GRIB2 files are compressed. And we want to put half each GRIB2 files into ZarrA, and the other half into ZarrB. It could would go something like this:

  1. Pre-allocate memory buffers for ZarrA and ZarrB.
  2. Create an AtomicUInt for ZarrA, and another AtomicUInt for ZarrB. These count the number of outstanding loading operations until the buffers for ZarrA or ZarrB are complete.
  3. For each GRIB2 file:
    1. Load all the GRIB2 file
    2. Decompress
    3. Split in half
    4. Put each half into the appropriate location in the ZarrA and zarrB buffers
    5. Decrement the AtomicUInts
    6. If the atomicUInts get to zero then compress & write the Zarr buffers to disk?

TODO: Think about this more! This is a very early draft!

JackKelly commented 7 months ago

We could think of this as:

  1. Read each individual source file
  2. Map each source file (decompress)
  3. Reduce (merge the first half of all GRIB files into ZarrA; and the second half into ZarrB)
  4. Map (compress each Zarr chunk)
  5. Write the compressed Zarrs to disk

So maybe there is a general-purpose data structure to specify this. Something like:

struct ReadMapReduceMapWrite { // Horrible name!
  source_file_chunks: Vec<(PathBuf, ByteRanges)>,

  // As soon as LSIO finishes reading source_file_chunk, LSIO
  // will start reading the source_file_chunks for the next ReadMapReduceMapWrite,
  // so LSIO can overlap reading from IO whilst running reduce_func and dst_ma_func.

  /// Applied to each byte range? Or applied to each (PathBuf, ByteRanges)?
  source_map_func: Fn([u8]) > [u8],

  /// Takes as input all the outputs of the source_map_func,
  /// along with their source location. And outputs 1 or more buffers, with their destination.
  reduce_func: Fn(ArrayQueue<([u8], PathBuf, ByteRange)>) -> Vec<([u8], PathBuf, ByteRange)>, 

  // Applied in parallel to each output of reduce_func
  // Data is written after reduce_func completes
  dst_map_func,
};
JackKelly commented 7 months ago

Or, put more responsibility on the user's code.

The reader and writers would run in their own threads, separate from the main thread. We'd receive groups of decompressed GRIBs from a channel. And then we'd send uncompressed output buffers to another channel.

Something like this:

let source_receiver = read_and_map([gribs_for_zarrA_and_zarrB, gribs_for_zarrC_and_zarrD], decompress_func);
for decompressed_gribs in source_receiver {
    let outputs_for_task = merge_into_zarrs(decompressed_gribs);
    for output_buf in outputs_for_task {
        writer_sender.send(MapAndWrite{map: compression, buf: output_buf, path: "foo", byte_ranges: vec![...]})
    }
}

I think this second approach might be more flexible. And, unlike the ReadMapReduceMapWrite, it's clear how this "channels" approach would allow us to have the source storage subsystem be different to the dst storage (e.g. reading from local SSD and writing to a cloud storage bucket.). So maybe I'm learning towards this second approach. Even though it involves yet more threads!

JackKelly commented 7 months ago

The thought process which started with this issue has ended in me totally changing the API design! So this specific issue is now out-of-date. For my latest plans, see https://github.com/JackKelly/light-speed-io/milestone/1