JackKelly / light-speed-io

Read & decompress many chunks of files at high speed

Define custom `IoUringLocal::get*` methods which use `io_uring` under the hood. #30

Closed. JackKelly closed this issue 9 months ago.

JackKelly commented 10 months ago

Methods to implement:

* `get`
* `get_range`
* `get_ranges`

Maybe it'd work something like this:

```rust
struct WorkerThread {
    handle: std::thread::JoinHandle<()>, // Handle for the worker OS thread
    sender: mpsc::Sender<Operation>,     // Channel to send ops to the worker thread
}

struct IoUringLocal {
    local_file_system: LocalFileSystem, // So we can delegate methods to LFS
    worker_thread: WorkerThread,
}
```

TODO:

* [x]  Understand `ObjectStore::get*` return types

  * [x]  Read about `Pin` (in Rust paperback book)

* [x]  Do I need to define my own `Future`?

* [x]  Prepare the `squeue::Entry` in `prep_operation_for_io_uring`.

* [x]  [Keep track of the `OperationWithCallbacks` in flight in `thread.rs` #42](https://github.com/JackKelly/light-speed-io/issues/42)

* [x]  Call `execute_callback` when the data arrives from io_uring

* [x]  Test!

* [ ]  [Get filesize using io_uring #41](https://github.com/JackKelly/light-speed-io/issues/41)

* [ ]  [Chain `<open><read><close>` when we have many files to open #1](https://github.com/JackKelly/light-speed-io/issues/1)

JackKelly commented 10 months ago

Maybe the "IO backend" should always accept a channel of Operations and return a Stream of Outputs.

Then we can easily use that backend for BatchObjectStore. And we can probably also use that backend elegantly for ObjectStoreToThread, although I need to think more about that. And maybe rename ObjectStoreToThread to ObjectStoreToStream?

The Stream backend could keep track of a generic struct which, for `ObjectStoreToStream`, would hold the SharedState (slightly related to #26). Or maybe we don't even need a struct: what other fields would we need to track per instruction? Maybe we just need a generic type parameter (so we can keep that type in a vec or map).

Although I need to think about what happens if we optimise the reads. But maybe I'll ignore that for now!
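A rough sketch of that backend shape, assuming tokio channels and placeholder `Operation`/`Output` types (none of these names are from the LSIO code):

```rust
use futures::stream::Stream;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

// Placeholder types, not from the LSIO codebase:
enum Operation { Get { path: String } }
enum Output { Buffer(Vec<u8>) }

// The backend accepts a channel of Operations and returns a Stream of Outputs.
fn run_backend(mut ops: mpsc::Receiver<Operation>) -> impl Stream<Item = Output> {
    let (tx, rx) = mpsc::channel(64);
    tokio::spawn(async move {
        // Drain the operations channel, perform the IO, and emit Outputs.
        while let Some(Operation::Get { path }) = ops.recv().await {
            let buf = tokio::fs::read(&path).await.unwrap_or_default();
            let _ = tx.send(Output::Buffer(buf)).await;
        }
    });
    ReceiverStream::new(rx)
}
```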

JackKelly commented 10 months ago

Getting there!

Some next steps: UPDATE: I've moved the next steps into the comment at the top of this issue.

maximedion2 commented 9 months ago

Methods to implement:

* `get`

* `get_range`

* `get_ranges`

Maybe it'd work something like this:

```rust
struct WorkerThread {
    handle: std::thread::JoinHandle<()>, // Handle for the worker OS thread
    sender: mpsc::Sender<Operation>,     // Channel to send ops to the worker thread
}

struct IoUringLocal {
    local_file_system: LocalFileSystem, // So we can delegate methods to LFS
    worker_thread: WorkerThread,
}
```
* `IoUringLocal::new` would start a new operating system thread that owns an `io_uring` instance.

  * Perhaps the default for `io_uring` would be for the kernel to have a thread which checks the submission queue (`SQPOLL`), so we don't have to do a system call for every file. (See the `WorkerThread::new` sketch after this list.)

* Calling `io_uring_local.get()` (on the main thread) would:

  * create an instance of a custom `Future`, which returns `Pending` when polled (by default), and which contains the details of the IO operation.
  * Wrap this `Future` in a `Mutex` inside an `Arc`.
  * A clone of this `Arc<Mutex<Future>>` would be sent to the worker thread via a channel.
  * Another clone would be returned from `get()`. (See the custom `Future` example in the Rust paperback book.) (I think we need to wrap the `Future` in an `Arc` because my code and the executor will both try to read the `Future`.) (See the shared-state sketch after this list.)

* The io_uring `user_data` would be a raw pointer to the `Arc`. (I'm not sure if we should use [`Arc::into_raw`](https://doc.rust-lang.org/std/sync/struct.Arc.html#method.into_raw) and [`Arc::from_raw`](https://doc.rust-lang.org/std/sync/struct.Arc.html#method.from_raw); or [`Arc::as_ptr`](https://doc.rust-lang.org/std/sync/struct.Arc.html#method.as_ptr) (which gets a raw pointer to the data)).

  * Or, perhaps safer (but slower?) keep a map of Futures in flight, and delete those map entries when the CQE arrives. Send an ID through the user_data.
  * Or, perhaps better, have a `Vec<Option<&Future>>`. That `Vec` will be the length of the SQ. Reuse entries in the `Vec` when CQEs arrive.

    * How to know which `Vec` entries can be re-used (because CQEs might arrive in any order)? Have a simple ring buffer of the `Vec` indices that are available. I _think_ we can use [`VecDeque`](https://doc.rust-lang.org/std/collections/vec_deque/struct.VecDeque.html) as a ring buffer. (If not, we could just have a fixed-length `Vec` with `front: usize` and `back: usize` variables which index into the `Vec`. Or use [`ringbuf::LocalRb`](https://lib.rs/crates/ringbuf).) (See the slot-table sketch after this list.)

* The worker thread would work like [my `io_uring` code works right now](https://github.com/JackKelly/light-speed-io/milestone/1):

  * Keep the SQ topped up, but don't have more than, say, 64 SQEs in flight at any one time. (See the worker-loop sketch after this list.)
  * Just before submitting an SQE to io_uring, check what type of operation it is. For example, if this is a `read`, then we'd stat the file, create a buffer, then submit a chain of open-read-close ops to io_uring (see [Chain `<open><read><close>` when we have many files to open #1](https://github.com/JackKelly/light-speed-io/issues/1)) (so I guess we'd need to wait for no more than 61 SQEs to be in flight, so we can submit 3 SQEs?).

    * Need to think about how to elegantly represent all `ObjectStore` operations, perhaps re-using `object_store` structs like [`GetResult`](https://docs.rs/object_store/latest/object_store/struct.GetResult.html) and io_uring structs like [`Entry`](https://docs.rs/io-uring/latest/io_uring/squeue/struct.Entry.html).
    * In the MVP, let's `stat` using the "normal" blocking method. But, in the future, it'd be nice to use io_uring to `stat`, too. Not sure if we should use one `io_uring` for `stat`ing, and another for `read`ing? Or just use a single io_uring?
  * Block waiting for at least one CQE. (Blocking is fine because we're on a dedicated OS thread)
  * When a CQE arrives:

    * deref the `Future` (or find it in the map or Vec)
    * set the payload in the `Future`
    * tell the `Future` to return `Ready` when next polled
    * call `Future.wake()`
    * make sure the `Arc` reference count is correct
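A rough sketch of how `WorkerThread::new` might look, using the `io-uring` crate's SQPOLL option (the `Operation` enum is just a placeholder; this isn't the real LSIO code):

```rust
use std::sync::mpsc;
use std::thread;
use io_uring::IoUring;

enum Operation { /* Get, GetRange, ... (placeholder) */ }

struct WorkerThread {
    handle: thread::JoinHandle<()>,
    sender: mpsc::Sender<Operation>,
}

impl WorkerThread {
    fn new() -> Self {
        let (sender, receiver) = mpsc::channel::<Operation>();
        let handle = thread::spawn(move || {
            // The worker thread owns the io_uring instance. SQPOLL asks the
            // kernel to poll the submission queue itself, so we don't need a
            // syscall per submission (the argument is an idle timeout in ms).
            let ring = IoUring::builder()
                .setup_sqpoll(1000)
                .build(64)
                .expect("failed to create io_uring");
            for op in receiver {
                // Translate `op` into SQE(s) and submit; see the loop sketch below.
                let _ = (&ring, &op);
            }
        });
        Self { handle, sender }
    }
}
```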
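A rough sketch of the custom `Future`, following the shared-state pattern from the async Rust book. (One variant is to share a small state struct between the `Future` and the worker thread, rather than wrapping the whole `Future` in the `Arc`; all names here are placeholders.)

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

// Shared between the Future returned by `get()` and the io_uring worker thread.
struct SharedState {
    result: Option<Vec<u8>>, // Set by the worker thread when the CQE arrives
    waker: Option<Waker>,    // Stored on each Pending poll so the worker can wake us
}

struct IoFuture {
    shared: Arc<Mutex<SharedState>>,
}

impl Future for IoFuture {
    type Output = Vec<u8>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.shared.lock().unwrap();
        if let Some(buf) = state.result.take() {
            Poll::Ready(buf) // The worker already set the payload.
        } else {
            // Remember the waker so the worker thread can wake us later.
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
```

When the CQE arrives, the worker thread locks the state, sets `result`, and calls `Waker::wake()`: exactly the steps in the last bullet above.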
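A rough sketch of the slot table with a `VecDeque` ring buffer of free indices (placeholder names; the index returned by `insert` is what would travel through io_uring's `user_data`):

```rust
use std::collections::VecDeque;

// Fixed-capacity table of in-flight operations, sized to match the SQ.
struct InFlight<T> {
    slots: Vec<Option<T>>,
    free: VecDeque<usize>, // Ring buffer of slot indices available for re-use
}

impl<T> InFlight<T> {
    fn new(capacity: usize) -> Self {
        Self {
            slots: (0..capacity).map(|_| None).collect(),
            free: (0..capacity).collect(),
        }
    }

    // Claim a slot just before submitting an SQE; put the index in user_data.
    fn insert(&mut self, item: T) -> Option<usize> {
        let idx = self.free.pop_front()?;
        self.slots[idx] = Some(item);
        Some(idx)
    }

    // When the CQE arrives (in any order), take the item and recycle the slot.
    fn remove(&mut self, idx: usize) -> Option<T> {
        let item = self.slots[idx].take();
        self.free.push_back(idx);
        item
    }
}
```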
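And a rough sketch of the worker-loop shape that keeps the SQ topped up while capping in-flight SQEs (assuming the `io-uring` crate and a channel of already-prepared SQEs; the real code would also handle open-read-close chains and errors):

```rust
use io_uring::IoUring;

fn worker_loop(
    mut ring: IoUring,
    ops: std::sync::mpsc::Receiver<io_uring::squeue::Entry>,
) -> std::io::Result<()> {
    const MAX_IN_FLIGHT: usize = 64;
    let mut in_flight = 0;

    loop {
        // Top up the SQ, but never exceed the in-flight cap.
        while in_flight < MAX_IN_FLIGHT {
            match ops.try_recv() {
                Ok(sqe) => {
                    unsafe { ring.submission().push(&sqe).expect("SQ full") };
                    in_flight += 1;
                }
                Err(_) => break, // Channel empty or closed; stop topping up for now.
            }
        }
        if in_flight == 0 {
            break; // Nothing queued, nothing in flight (a real loop might block on the channel).
        }
        // Block until at least one CQE arrives. Blocking is fine here
        // because we're on a dedicated OS thread.
        ring.submit_and_wait(1)?;
        for cqe in ring.completion() {
            in_flight -= 1;
            let _slot_idx = cqe.user_data(); // Look up the Future here and wake it.
        }
    }
    Ok(())
}
```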

TODO:

* [x]  Understand `ObjectStore::get*` return types

  * [x]  Read about `Pin` (in Rust paperback book)

* [x]  Do I need to define my own `Future`?

* [x]  Prepare the `squeue::Entry` in `prep_operation_for_io_uring`.

* [x]  [Keep track of the `OperationWithCallbacks` in flight in `thread.rs` #42](https://github.com/JackKelly/light-speed-io/issues/42)

* [x]  Call `execute_callback` when the data arrives from io_uring

* [x]  Test!

* [ ]  [Get filesize using io_uring #41](https://github.com/JackKelly/light-speed-io/issues/41)

* [ ]  [Chain `<open><read><close>` when we have many files to open #1](https://github.com/JackKelly/light-speed-io/issues/1)

This is really interesting! I've been looking at the other comments you made related to this, as well as the implementation (which is still a work in progress, of course), and I had a couple of high-level questions, just to make sure I follow (again, at a high level; the finer details are quite tricky, I'm sure).

First, it sounds like, if this is the solution you go with, `ObjectStore::get()` (implemented for local files) would use `IoUringLocal::get()` under the hood. But I was wondering: where exactly would the, say, 64 in-flight SQEs limit be enforced? Would it be by the caller of `ObjectStore::get()`, somewhere in between the `ObjectStore` and the `IoUringLocal`, or within `IoUringLocal`? If it's not the first option, that is, if this limit is opaque to the caller of `ObjectStore::get()`, then somewhere inside the method calls something would keep track of what was submitted to the SQ, but also of what hasn't yet been submitted, and, as you said, would keep the SQ topped up by submitting to the SQ as soon as something is picked up from the CQ? And the Futures returned by `ObjectStore::get()` would not know anything about this, right? By which I mean that, regardless of whether an IO operation is waiting to be submitted to the SQ, or is in the SQ but has yet to finish, the Futures would poll `Pending` either way?

My other question is also somewhat general, but motivated by a specific use case. When submitting a lot of IO operations in a batch, and assuming a lot (64, 128, something like that) of operations are pushed to the SQ at once, can there ever be a way to preserve some sort of order in which operations will complete? The use case I have in mind is consuming data from a Zarr store with more than one array. Say I have a store with N "variables" (each stored in its own 2D array, with all N arrays being exactly the same size, since they represent different variables on the same spatial grid), and say I have a use case where I can't do anything with "data chunk x.y" until I have the data for chunk files x.y for all N variables (there are N such files). Could the approach described here be problematic? What I mean is: if I use `ObjectStore::get()` to effectively submit N batches of IO operations, one per variable, giving me N batches of Futures, and there is no guaranteed order, it could take a while before I have a single "full chunk" ready, especially if N is large (say 10, or 20), since the same x.y file would need to complete its IO operation across all N variables before I could do anything. Does my question make sense here? It could be that I misunderstood something and this is actually not a problem at all, or that at this early stage you're not really considering cases like this. I'm basically just trying to think about eventual use cases I'd like to deal with, and to see if it "plays well" with what you have in mind (admittedly not your concern at all, just trying to wrap my head around your proposal).

JackKelly commented 9 months ago

Hi @maximedion2! Thanks so much for your interest in this project! You're the first person (outside of my colleagues in Open Climate Fix) to comment on this repo :slightly_smiling_face:

The first thing to mention is that the API is still very much in flux. I'm working on this code quite actively at the moment, and almost every day of work on this project brings new insights into how this could work.

A quick bit of news: just today I benchmarked this (very early) code. As far as I can tell, LSIO's `get` is already 4x faster than `object_store::LocalFileSystem::get` :slightly_smiling_face: (on the very limited benchmark I ran today; more details here). And there are lots more ways to optimise.

OK, to answer your questions:

> where exactly would the, say, 64 in flight SQEs limit be enforced?

It's enforced within LSIO, in this (messy) code.

> And the Futures returned by ObjectStore::get() would not know anything about this right, by which I mean that regardless of if an IO operation is waiting to be submitted to the SQ, or if it's in the SQ but has yet to finish, the Futures would poll Pending either way?

Yes, exactly: the Future will return `Poll::Pending` until the operation emerges from io_uring's completion queue and is processed by LSIO.

You can also see how this works, from the user's perspective, in this simple benchmark code. We first get a vector of 1,000 filenames.

Then we fill a second vector with the 1,000 Futures returned by `get(filename)`. This loop takes about 1 ms to complete, and submits all 1,000 operations to a channel. The io_uring_local thread pops operations from this channel, and keeps io_uring's SQ topped up with 32 SQEs.
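The usage pattern is roughly this shape (a hypothetical sketch; `get` here stands in for `IoUringLocal::get`, which is assumed to queue the operation and immediately return a `Future` of the file's bytes):

```rust
use std::future::Future;
use futures::future::join_all;

// Hypothetical shape: `get` only queues each operation on the channel to the
// worker thread, so building the vector of Futures returns almost instantly.
async fn read_all<F, Fut>(get: F, filenames: &[String]) -> Vec<Vec<u8>>
where
    F: Fn(&str) -> Fut,
    Fut: Future<Output = Vec<u8>>,
{
    // ~1 ms for 1,000 files: no IO happens in this loop.
    let futures: Vec<_> = filenames.iter().map(|f| get(f)).collect();
    // Awaiting resolves each Future as its completion is processed.
    join_all(futures).await
}
```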

> can there ever be a way to preserve some sort of order in which operations will complete?

Great question! The short answer is that, if I extend the ObjectStore API, then I'd plan to enable users to submit operations in groups. LSIO would guarantee that all operations in group n would complete before starting group n+1. Would that work for your use-case? (I ramble a bit about the idea of groups in issue #25, although some of the content in that issue is now out-of-date!)

maximedion2 commented 9 months ago

Of course! This is a pretty interesting project, I'm trying to learn as much as I can from it and to see if anything here can apply to this (slightly pie-in-the-sky) project I'm working on (not nearly as much as I'd like)! Good to know early benchmarks are promising!

> Great question! The short answer is that, if I extend the ObjectStore API, then I'd plan to enable users to submit operations in groups. LSIO would guarantee that all operations in group n would complete before starting group n+1. Would that work for your use-case? (I ramble a bit about the idea of groups in issue https://github.com/JackKelly/light-speed-io/issues/25, although some of the content in that issue is now out-of-date!)

Hmm, yeah, that makes sense. Can I ask, though: under the hood, would groups be handled by only submitting one group of operations at a time to the SQ (assuming the group fits in the 32 operations; if not, it could be split)? If someone calls this method for grouped operations with, say, 100 groups with only 2 operations per group, would that basically kill the potential performance gains from using io_uring? That would be fine, to be clear; I'm mostly asking because I'm working on something where producing a stream of data directly based on chunks in a store seems like a logical thing to do, but if, for example, getting the most out of io_uring requires effectively grouping chunks together, I might refactor things now, very early in my project, while it's easy. Sharding is also something that could lead to chunk grouping anyway, and I'm looking into that right now.

BTW, sorry I have a quick, unrelated question, since I think you and your team use Zarr a lot. What Python package do you currently use for it? I'm looking into version 3, and I'm trying to produce some sample data to help with testing my project, but zarr-python seems to produce things that don't quite line up with the specs here, so I was just curious to see what you guys use (you mentioned using sharded Zarr v3 before I think).

JackKelly commented 9 months ago

> under the hood, would groups be handled by only submitting one group of operations at a time to the SQ... If someone calls this method for grouped operations with, say, 100 groups with only 2 operations per group, would that basically kill the potential performance gains from using io_uring?

Great question!

The short answer is: I don't yet know the performance implications of submitting many groups, where each group has a small number of operations. But I'm very keen to make it work well :slightly_smiling_face:. I know some people need every file to be delivered to their application in strict order, and so some people will have 1 operation per group!

I haven't yet figured out exactly how I'll implement this. But I've started a new issue (#68) to start fleshing out some ideas about implementing groups.
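One possible building block (an assumption on my part, not a design LSIO has committed to) is io_uring's `IOSQE_IO_DRAIN` flag: an SQE with this flag only starts once all earlier SQEs have completed, which would give a barrier between groups. With the `io-uring` crate:

```rust
use io_uring::{opcode, squeue::Flags};

// A no-op SQE acting as a group barrier: it completes only after all
// previously submitted SQEs have completed.
fn group_barrier(group_id: u64) -> io_uring::squeue::Entry {
    opcode::Nop::new()
        .build()
        .flags(Flags::IO_DRAIN)
        .user_data(group_id)
}
```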

> What Python package do you currently use for [Zarr]?

In Open Climate Fix, we mostly use Zarr-Python v2, and we use zarr arrays which conform to v2 of the Zarr file format specification.

It is a bit confusing, but it's important to distinguish between the version of the Zarr-Python package and the version of the Zarr file format spec.

My understanding is that Zarr-Python v2 doesn't fully conform to v3 of the Zarr file format spec. But folks are working hard on version 3.0 of the Zarr-Python package. And Zarr-Python v3.0 should fully support v3 of the Zarr file format spec!

If you want a Python package that outputs data that conforms to v3 of the Zarr file format spec, then try Zarrita.

maximedion2 commented 9 months ago

> I haven't yet figured out exactly how I'll implement this. But I've started a new issue (https://github.com/JackKelly/light-speed-io/issues/68) to start fleshing out some ideas about implementing groups.

Ah, I see, so there's an io_uring mechanism to effectively allow things to complete in batches; that makes sense. But yes, I assume it's impossible to get a clear performance picture without actually trying it. For what it's worth, in my view it's perfectly acceptable for users to have to consider performance trade-offs when they fetch batches, if, for example, it's really hard to get good performance out of single-operation groups. As I said, I'm mostly just trying to stay up to date with the latest developments, to make sure whatever work I do is compatible with the latest and greatest! This is all very interesting; I'm looking forward to learning more about it, and I'll keep an eye out for any new developments in this repo!

Oh, and thanks for the info on Zarr-Python. I briefly checked, and Zarrita does indeed conform to the specs; that's good to know.