JackKelly / light-speed-io

Read & decompress many chunks of files at high speed
MIT License

What would users' code look like for simple use-cases? (After dropping `async`, and instead only using `channels` and `Rayon`) #104

Open JackKelly opened 6 months ago

JackKelly commented 6 months ago

For reference: Here's a quick code sketch I wrote, back when I was planning to use Tokio with Rayon.

Use cases

1) File conversion & re-chunking

  1. Send LSIO a list of millions of chunks to read.
  2. Have a Rayon threadpool which decompresses those chunks as soon as they arrive from disk. Tell the IO code to re-use IO buffers as soon as the data has been decompressed from an IO buffer.
  3. Have a thread which "reduces" and creates output chunks. Let's say that each output chunk contains 10 input chunks.
  4. Rayon threadpool compresses those chunks.
  5. And writes those compressed chunks back into the io_uring channel for writing to disk.
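The five steps above can be sketched using only the standard library. This is a minimal sketch under loud assumptions, not LSIO's API: `rechunk`, `fake_decompress` and `fake_compress` are hypothetical names, plain scoped threads stand in for the Rayon threadpool, and in-memory `Vec`s stand in for io_uring reads and writes.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-ins for a real codec: both simply copy the bytes.
fn fake_decompress(chunk: &[u8]) -> Vec<u8> { chunk.to_vec() }
fn fake_compress(chunk: &[u8]) -> Vec<u8> { chunk.to_vec() }

/// Decompress every input chunk on worker threads, then merge each run of
/// `group_size` consecutive input chunks into one "compressed" output chunk.
fn rechunk(input: &[Vec<u8>], group_size: usize, n_workers: usize) -> Vec<Vec<u8>> {
    assert!(group_size > 0 && n_workers > 0);
    let n_groups = (input.len() + group_size - 1) / group_size;
    thread::scope(|s| {
        let (tx, rx) = mpsc::channel::<(usize, Vec<u8>)>();

        // Steps 1 and 2: each worker decompresses a strided subset of the chunks
        // and sends the result, tagged with its index, to the reducer.
        for worker in 0..n_workers {
            let tx = tx.clone();
            s.spawn(move || {
                for i in (worker..input.len()).step_by(n_workers) {
                    tx.send((i, fake_decompress(&input[i]))).unwrap();
                }
            });
        }
        // Drop the original sender so `rx` ends once every worker has finished.
        drop(tx);

        // Step 3: reduce. Chunks arrive in any order, so park them until
        // every chunk of a group has arrived.
        let mut pending: BTreeMap<usize, Vec<u8>> = BTreeMap::new();
        let mut received_per_group = vec![0usize; n_groups];
        let mut outputs: Vec<Option<Vec<u8>>> = vec![None; n_groups];
        for (i, chunk) in rx {
            let group = i / group_size;
            pending.insert(i, chunk);
            received_per_group[group] += 1;
            let start = group * group_size;
            let end = (start + group_size).min(input.len());
            if received_per_group[group] == end - start {
                // Steps 4 and 5: merge in index order, compress, and "write".
                let mut merged = Vec::new();
                for j in start..end {
                    merged.extend(pending.remove(&j).unwrap());
                }
                outputs[group] = Some(fake_compress(&merged));
            }
        }
        outputs.into_iter().map(|o| o.unwrap()).collect()
    })
}

fn main() {
    // 25 one-byte input chunks, merged 10 at a time.
    let input: Vec<Vec<u8>> = (0..25u8).map(|i| vec![i]).collect();
    let output = rechunk(&input, 10, 4);
    println!("{:?}", output.iter().map(Vec::len).collect::<Vec<_>>());
}
```

In this sketch a group is emitted as soon as its last chunk arrives, so memory use is bounded by the number of incomplete groups rather than by the whole dataset. Buffer re-use (step 2) is not modelled here.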

2) ML training: Load random crops of Zarr data from disk.

3) Vincent's use-case: Loading timeseries (added 2024-04-04)

Imagine the chunks are arranged like this:

0.0   0.1   0.2   0.3
1.0   1.1   1.2   1.3
2.0   2.1   2.2   2.3
3.0   3.1   3.2   3.3

Each row is a timeseries. So, for example, chunks 0.0, 0.1, 0.2, 0.3 have to be given to the user in order.

But the decompression could happen out-of-order.

Perhaps the solution is for the user to group the chunks by row, decompress the chunks within each row in any order, and then re-order the decompressed chunks before delivering them to the user?
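One way the re-ordering step could look, as a minimal sketch using only the standard library. The names `load_row_in_order` and `fake_decompress` are hypothetical, and one thread per chunk stands in for a real threadpool. Each decompressed chunk is tagged with its column index, and chunks that arrive early are parked until every earlier chunk has been delivered.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for a real codec: it simply copies the bytes.
fn fake_decompress(chunk: &[u8]) -> Vec<u8> { chunk.to_vec() }

/// Decompress the chunks of one row on separate threads (so they may finish
/// in any order), but return them in their original column order.
fn load_row_in_order(row: &[Vec<u8>]) -> Vec<Vec<u8>> {
    thread::scope(|s| {
        let (tx, rx) = mpsc::channel::<(usize, Vec<u8>)>();
        for (col, chunk) in row.iter().enumerate() {
            let tx = tx.clone();
            s.spawn(move || tx.send((col, fake_decompress(chunk))).unwrap());
        }
        // Drop the original sender so `rx` ends once every thread has finished.
        drop(tx);

        let mut parked = BTreeMap::new(); // Chunks that arrived too early.
        let mut next_col = 0; // The next column the user is waiting for.
        let mut delivered = Vec::with_capacity(row.len());
        for (col, chunk) in rx {
            parked.insert(col, chunk);
            // Deliver every chunk that is now contiguous with the delivered prefix.
            while let Some(chunk) = parked.remove(&next_col) {
                delivered.push(chunk);
                next_col += 1;
            }
        }
        delivered
    })
}

fn main() {
    // Row 0 of the grid above: chunks 0.0, 0.1, 0.2, 0.3.
    let row = vec![vec![0u8, 0], vec![0, 1], vec![0, 2], vec![0, 3]];
    let in_order = load_row_in_order(&row);
    println!("{in_order:?}");
}
```

In a streaming version, `delivered.push(chunk)` would become a send on a channel to the user, so the user sees chunk 0.0 as soon as it is ready rather than after the whole row.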

JackKelly commented 6 months ago

UPDATE: This attempt is pretty dumb :slightly_smiling_face:. See the comment below for a new, improved approach!


Here's an attempt at making use-case 1 work:

Inputs:

const MAX_N_BUFFERS: usize = 1024;
let mut uring_local = IoUringLocal::new(MAX_N_BUFFERS);

// Start loading the files in a separate threadpool. Completed buffers will
// appear in `rx_of_filled_buffers`. We apply back-pressure by either letting
// `rx_of_filled_buffers` fill to `MAX_N_BUFFERS`, or by not telling `uring_local`
// that it can re-use buffers. This will not block. This will start loading immediately.
let rx_of_filled_buffers = uring_local.get(src_filenames);

// Decompress buffers:
// This will run `decompression_function` on each chunk using separate Rayon threadpool:
let decompressor = ChannelProcessor::new();
let rx_of_decompressed_buffers = decompressor.map(rx_of_filled_buffers, decompression_function);

// Tell io_uring to re-use IO buffers as soon as decompression has finished for each chunk:
// This function passes-through each completed chunk:
let rx_of_decompressed_buffers = uring_local.reuse_buffers(rx_of_decompressed_buffers);

// Reduce:
let rx_of_reduced_buffers = reduce(rx_of_decompressed_buffers, 10);

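// Compress:
// This will run `compression_function` on each reduced chunk:
let compressor = ChannelProcessor::new();
let rx_of_compressed_output_buffers = compressor.map(rx_of_reduced_buffers, compression_function);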

// Write. Behind the scenes, this will be added to the same uring threadpool's list of operations, I guess?
uring_local.put(rx_of_compressed_output_buffers);
JackKelly commented 6 months ago

Writing the code in the comment above gave me an idea that I've summarised in issue #106

JackKelly commented 6 months ago

UPDATE 2024-04-03: The code below is out-of-date! Please see this code sketch for the latest ideas: https://github.com/JackKelly/light-speed-io/blob/new-design-March-2024/src/new_design_march_2024.rs


Let me try again, now that I've figured out that we can just use iterators (see this code sketch)!

Structs

struct ByteRange<M> {
    byte_range: Range<isize>,

    // metadata is used to identify this byte range.
    // For example, in Zarr, this would be used to identify the
    // location at which this chunk appears in the merged array.
    metadata: M,
}

enum OperationKind {
    GetRanges,
    PutRanges,
}

struct Operation<M> {
    operation_kind: OperationKind,
    buffers: Option<Vec<AlignedBuffer>>, // For PutRanges
    byte_ranges: Option<Vec<ByteRange<M>>>,  // For GetRanges and PutRanges
    filename: CString,  // For GetRanges and PutRanges
}

impl<M> Operation<M> {
    /// If the user submits a GetRanges operation with an invalid filename then
    /// the user will receive a single Err(std::io::ErrorKind::NotFound) with context
    /// that describes the filename that failed.
    /// If a subset of the `byte_ranges` results in an error (e.g. reading beyond
    /// end of the file) then the user will receive a mixture of `Ok(Output::Buffer)`
    /// and `Err`, where the `Err` will include context such as the filename and byte_range.
    fn get_ranges(filename: CString, byte_ranges: Vec<ByteRange<M>>) -> Self {
        todo!()
    }

    fn put_ranges(
        filename: CString,
        byte_ranges: Vec<ByteRange<M>>,
        buffers: Vec<AlignedBuffer>,
    ) -> Self {
        // TODO: Maybe we also need a `slices: &[u8]` field, which gives one slice
        // per `byte_range`, whilst also having a `buffers` field to own the `AlignedBuffer`.
        todo!()
    }
}

struct OpGroup<M> {
    operations: Receiver<Operation<M>>,

    // Metadata for the whole group. Such as the filename of the merged output.
    metadata: M,
}

struct Output<M> {
    // Each `byte_range` within an `Operation::GetRanges` returns a `Buffer`.
    operation_kind: OperationKind,
    buffer: Option<AlignedBuffer>,
    byte_range: Option<ByteRange<M>>,
}

struct OutputGroup<GROUPMETA, OUTPUTMETA> {
    // We use a `Receiver` so we can process the next `Buffer` as soon as the producing
    // thread finishes each `Buffer`:
    outputs: Receiver<Result<Output<OUTPUTMETA>>>,

    // Metadata for the group (e.g. the output filename).
    metadata: GROUPMETA,
}

User code

const MAX_N_BUFFERS: usize = 1024;
let mut uring_local = IoUringLocal::new(MAX_N_BUFFERS);

let mut submission_queue: Sender<OpGroup> = uring_local.submission();

// Define operations to get a bunch of files:
let get_group_0 = OpGroup::new()
    .extend(vec![
        Operation::get_ranges("foo.0.0", 0..-1),
        Operation::get_ranges("foo.0.1", 0..-1),
    ])
    .metadata(OutputFilename("foo_0"));

// Define operations to get a second bunch of files:
let get_group_1 = OpGroup::new()
    .extend(vec![
        Operation::get_ranges("foo.1.0", 0..-1),
        Operation::get_ranges("foo.1.1", 0..-1),
    ])
    .metadata(OutputFilename("foo_1"));

// Start loading the files in a separate threadpool:
submission_queue.send(get_group_0).unwrap();
submission_queue.send(get_group_1).unwrap();

// uring_local will load all operations from `get_group_0`. And then from `get_group_1`.
// Now we can wait on the completed items.

let completion_queue: Receiver<OutputGroup> = uring_local.completion();

let mut buffer_recycling_queue = uring_local.buffer_recycling_queue();

completion_queue.into_iter().par_bridge().for_each(|output_group: OutputGroup| {
    let out = output_group.outputs.into_iter().par_bridge()
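        .map(|output| {
            let mut output = output.unwrap();
            assert!(matches!(output.operation_kind, OperationKind::GetRanges));
            // Take ownership of the buffer so it can be recycled after decompression:
            let buffer = output.buffer.take().unwrap();
            let decompressed = decompress(&buffer);
            buffer_recycling_queue.send(buffer).unwrap();
            decompressed
        })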
        .reduce(reduce_func);
    let out = compress(out);

    // Write `out` to disk:
    let put_op = Operation::put_ranges(output_group.metadata.output_filename, 0..-1, out);
    let op_group = OpGroup::new().append(put_op);
    submission_queue.send(op_group).unwrap();  // Does not block.
});
JackKelly commented 6 months ago

Actually, I don't think it's acceptable for the output channel to be a Receiver<Vec<Chunk>>. Because we want to start processing chunks as soon as they're ready. Maybe a channel of channels?! Or a channel of crossbeam queues?

But we probably do want the input channel to be a Sender<Vec<IoOperation>>. Or - perhaps better - a Sender<IoOpGroup> (so we can attach metadata for the whole group).

UPDATE: I've updated the code in the comment above
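A minimal sketch of the "channel of channels" shape for the output side, assuming a hypothetical `ChunkGroup` type (not part of LSIO) and using `std::sync::mpsc` in place of crossbeam. Each group carries its own metadata plus an inner channel, and the group is sent before its chunks so that the consumer can start on the first chunk straight away.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical group type: metadata for the whole group, plus a channel
/// that yields each chunk as soon as it is ready.
struct ChunkGroup<M> {
    metadata: M,
    chunks: Receiver<Vec<u8>>,
}

/// Produce one group per name on a background thread. Each group contains
/// `chunks_per_group` one-byte chunks (a stand-in for real IO).
fn spawn_producer(group_names: Vec<String>, chunks_per_group: u8) -> Receiver<ChunkGroup<String>> {
    let (outer_tx, outer_rx) = mpsc::channel();
    thread::spawn(move || {
        for name in group_names {
            let (inner_tx, inner_rx) = mpsc::channel();
            // Send the group before its chunks, so the consumer never has to
            // wait for the whole group to finish loading.
            outer_tx
                .send(ChunkGroup { metadata: name, chunks: inner_rx })
                .unwrap();
            for i in 0..chunks_per_group {
                inner_tx.send(vec![i]).unwrap();
            }
            // `inner_tx` is dropped here, which ends this group's iterator.
        }
        // `outer_tx` is dropped here, which ends the iterator over groups.
    });
    outer_rx
}

fn main() {
    let groups = spawn_producer(vec!["foo_0".into(), "foo_1".into()], 3);
    for group in groups {
        let n_bytes: usize = group.chunks.iter().map(|chunk| chunk.len()).sum();
        println!("{}: {} bytes", group.metadata, n_bytes);
    }
}
```

Dropping the inner sender is what marks the end of a group, so no explicit end-of-group item is needed in the stream.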

JackKelly commented 5 months ago

UPDATE 2024-04-03: The code above is out-of-date! Please see this code sketch for the latest ideas: https://github.com/JackKelly/light-speed-io/blob/new-design-March-2024/src/new_design_march_2024.rs

JackKelly commented 5 months ago

I need to think more about what the user code would look like (to satisfy the use-cases above) with the new code sketch. Some questions that spring to mind:

JackKelly commented 5 months ago

Can we use a flat structure?

We can't have a flat list, where list items define the end of groups, because Rayon processes things out-of-order, and `while_some` halts the whole parallel iterator as soon as any `None` is found. So the following code never seems to print 0, 1, 2. Instead it prints 0, 1. Or even just 0.

use rayon::prelude::*;

#[derive(Debug)]
enum Operation {
    Get(u8),
    EndOfGroup(u8),
}

fn main() {
    let ops = vec![
        Operation::Get(0),
        Operation::Get(1),
        Operation::Get(2),
        Operation::EndOfGroup(0),
        Operation::Get(10),
        Operation::Get(11),
        Operation::EndOfGroup(1),
    ];

    // Oops: Rayon may process these items out-of-order. So we might start loading group 1 before
    // we hit the EndOfGroup(0).
    ops.into_par_iter()
        .map(|op| {
            if matches!(op, Operation::Get { .. }) {
                Some(op)
            } else {
                None
            }
        })
        .while_some()
        .for_each(|op| println!("{:?}", op));
}

So I think we may have to use a channel of channels for the completion queue!

JackKelly commented 5 months ago

I should test the channel of channels idea in my Rust playground

JackKelly commented 5 months ago

Yup, using channels of channels works for both submission and completion!

use rayon::prelude::*;

#[derive(Debug)]
enum Operation {
    Get(u8),
}

fn main() {
    let (completion_tx, completion_rx) = crossbeam::channel::bounded(4);

    {
        let (submission_tx, submission_rx) = crossbeam::channel::bounded(4);

        // Send the first group of operations:
        let (inner_submission_tx_0, inner_submission_rx_0) = crossbeam::channel::bounded(4);
        vec![
            Operation::Get(1),
            Operation::Get(2),
            Operation::Get(3),
            Operation::Get(4),
        ]
        .into_iter()
        .for_each(|op| inner_submission_tx_0.send(op).unwrap());
        submission_tx.send(inner_submission_rx_0).unwrap();
        drop(inner_submission_tx_0);

        // Send the second group of operations:
        let (inner_submission_tx_1, inner_submission_rx_1) = crossbeam::channel::bounded(4);
        vec![
            Operation::Get(6),
            Operation::Get(7),
            Operation::Get(8),
            Operation::Get(9),
        ]
        .into_iter()
        .for_each(|op| inner_submission_tx_1.send(op).unwrap());
        submission_tx.send(inner_submission_rx_1).unwrap();
        drop(inner_submission_tx_1);

        // Hang up the submission_tx, otherwise we'll never finish!
        drop(submission_tx);

        // "Process" the submission queue, and send data to the completion queue:
        submission_rx.into_iter().par_bridge().for_each(|inner| {
            let (inner_completion_tx, inner_completion_rx) =
                crossbeam::channel::bounded::<Operation>(4);
            inner
                .into_iter()
                .par_bridge()
                .for_each(|Operation::Get(x)| {
                    inner_completion_tx.send(Operation::Get(x * 10)).unwrap()
                });
            completion_tx.send(inner_completion_rx).unwrap();
        });
    }

    drop(completion_tx);

    completion_rx.into_iter().for_each(|inner| {
        println!("GROUP:");
        inner.into_iter().for_each(|op| println!("{op:?}"));
    });
}

From: https://github.com/JackKelly/rust-playground/blob/493da54d3f5bd2373501374f7cc71d70c7c8de4a/groups-within-iterators/src/main.rs