lancedb / lance

Modern columnar data format for ML and LLMs implemented in Rust. Convert from Parquet in 2 lines of code for 100x faster random access, a vector index, and data versioning. Compatible with Pandas, DuckDB, Polars, Pyarrow, and PyTorch, with more integrations coming.
https://lancedb.github.io/lance/
Apache License 2.0

Move IO to separate scheduler #1712

Open wjones127 opened 11 months ago

wjones127 commented 11 months ago

Motivation

Right now, our IO parallelism is choppy and inconsistent. This is essentially due to three issues: ad-hoc parallelism settings, no queuing of IO tasks, and CPU tasks blocking IO tasks.

Right now, our IO parallelism is determined ad hoc by the .buffered() and .buffer_unordered() calls scattered throughout the codebase. This leads to a tree of parallelism: we read N fragments in parallel, N * M batches in parallel (each fragment reading M batches in parallel), and N * M * K columns in parallel. Each level of parallelism is usually set to num_cpus::get() or num_cpus::get() * 4, which is sometimes adequate on machines with many cores but often inadequate on smaller machines. This is a very indirect way to control the amount of allowed IO parallelism, and it is not consistent throughout all code paths.
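
For illustration, a minimal sketch of this kind of nesting; the helper names and counts are stand-ins, not Lance's actual reader code:

```rust
// Sketch only: illustrates the nested `.buffered()` pattern, not the real reader.
use futures::stream::{self, StreamExt};

async fn read_column(frag: usize, batch: usize, col: usize) -> Vec<u8> {
    // Stand-in for an actual IO request (e.g. a range read from object storage).
    vec![frag as u8, batch as u8, col as u8]
}

async fn read_all(num_fragments: usize, batches_per_fragment: usize, num_columns: usize) {
    // The same ad-hoc limit is repeated at every level of the tree.
    let io_parallelism = num_cpus::get() * 4;
    stream::iter(0..num_fragments)
        .map(move |frag| async move {
            stream::iter(0..batches_per_fragment)
                .map(move |batch| async move {
                    stream::iter(0..num_columns)
                        .map(move |col| read_column(frag, batch, col))
                        .buffered(io_parallelism) // up to N * M * K column reads
                        .collect::<Vec<_>>()
                        .await
                })
                .buffered(io_parallelism) // up to N * M batch reads
                .collect::<Vec<_>>()
                .await
        })
        .buffer_unordered(io_parallelism) // up to N fragments at once
        .collect::<Vec<_>>()
        .await;
}
```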

The .buffered() calls also cause the amount of IO parallelism to vary over time. After the first round of tasks, the system must wait for the first fragment's tasks to finish before a new fragment's worth of tasks can start. This causes a sort of sawtooth pattern in IO utilization:

[image: IO utilization over time, showing the sawtooth pattern]

In addition, we often interleave IO tasks and CPU-bound tasks. For example, while reading batches of data, we will apply filters or masks as we read. This often blocks new IO tasks from starting.
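
To make that concrete with a hedged sketch (again with stand-in helpers): while the consumer of a .buffered() stream is inside synchronous CPU work such as a filter, the stream is not being polled, so pending reads are not advanced and no new reads are started:

```rust
// Sketch only: `fetch_batch` and `apply_filter` are stand-ins.
use futures::stream::{self, StreamExt};

async fn fetch_batch(_req: u64) -> Vec<u8> {
    Vec::new() // stand-in for an IO request
}

fn apply_filter(batch: Vec<u8>) -> Vec<u8> {
    batch // stand-in for CPU-bound filtering / masking
}

async fn scan(requests: Vec<u64>) -> Vec<Vec<u8>> {
    stream::iter(requests)
        .map(|req| fetch_batch(req))
        .buffered(8)
        // While `apply_filter` runs, this task is not polling `buffered`,
        // so the pending reads behind it are not advanced and new reads
        // are not started in the meantime.
        .map(apply_filter)
        .collect()
        .await
}
```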

Solution

We can move IO into a separate scheduler:

                     SCHEDULER
                     ┌─────────────────────────────────────────┐
                     │                                         │
                     │    IO QUEUE                             │
                     │   ┌─────────┬─────┬─────────┐           │
                     │   │         │     │         │           │
     get_range ──────┼───► Request │ ... │ Request │           │
         ▲           │   │         │     │         │           │
         │           │   └─────────┴─────┴─────┬───┘           │
         │           │                         │               │
         │           │                         │               │
         │           │                         │ IO TASK POOL  │
         │           │                     ┌───▼────┐          │
         │           │                     │        │          │
         └───────────┼─────────────────────┤ Future │          │
                     │                     │        │          │
                     │                     ├────────┤          │
                     │                     │        │          │
                     │                     │ Future │          │
                     │                     │        │          │
                     │                     ├────────┤          │
                     │                     │        │          │
                     │                     │ Future │          │
                     │                     │        │          │
                     │                     └────────┘          │
                     │                                         │
                     └─────────────────────────────────────────┘
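
As a minimal sketch of this idea, assuming a tokio runtime and using a shared semaphore as the global IO limit in place of an explicit queue and task pool (the Storage trait and the max_parallel_io name are illustrative, not Lance's actual API):

```rust
// Minimal sketch, not Lance's actual API: a process-wide cap on in-flight IO,
// implemented with a semaphore rather than an explicit queue + task pool.
use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use tokio::sync::Semaphore;

/// Stand-in for the underlying storage (e.g. a wrapped object store).
#[async_trait::async_trait]
pub trait Storage: Send + Sync {
    async fn get_range(&self, path: &str, range: Range<usize>) -> std::io::Result<Bytes>;
}

pub struct IoScheduler {
    storage: Arc<dyn Storage>,
    /// Bounds the number of IO requests in flight at once.
    io_permits: Semaphore,
}

impl IoScheduler {
    pub fn new(storage: Arc<dyn Storage>, max_parallel_io: usize) -> Self {
        Self {
            storage,
            io_permits: Semaphore::new(max_parallel_io),
        }
    }

    /// Every `get_range` in the process funnels through here. Callers can
    /// submit as many requests as they like; the semaphore keeps the actual
    /// number of concurrent reads at `max_parallel_io`, so utilization stays
    /// flat instead of following the sawtooth above.
    pub async fn get_range(&self, path: &str, range: Range<usize>) -> std::io::Result<Bytes> {
        let _permit = self.io_permits.acquire().await.expect("semaphore closed");
        self.storage.get_range(path, range).await
    }
}
```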

The right user facing knobs

There are three resources of concern here:

  1. The amount of memory we can use to buffer batches
  2. The number of IO requests we can make concurrently.
  3. The amount of scheduling overhead.

Right now, {batch,fragment}_readahead controls all of (1), (2), and (3). With the scheduler, we can separate these: a more global setting, max_parallel_io, will directly control (2), while {batch,fragment}_readahead will control only (1) and (3).
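
One possible shape for that split, with illustrative names rather than a committed API:

```rust
// Illustrative only: how the knobs could be separated.

/// Global (per process or per dataset): directly caps concurrent IO requests,
/// i.e. resource (2).
pub struct IoConfig {
    pub max_parallel_io: usize,
}

/// Per scan: caps how far ahead batches/fragments are scheduled, bounding
/// buffer memory (1) and scheduling overhead (3) without changing how much
/// IO runs at once.
pub struct ScanConfig {
    pub fragment_readahead: usize,
    pub batch_readahead: usize,
}
```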

Potential extensions

By bringing IO calls to a more central location, we can also do more intelligent management of IO. This includes:

westonpace commented 11 months ago

+1 although I don't think we can replace buffered with try_join_all because we still need the streaming interface (e.g. try_join_all, on a stream of batches, would load all batches in memory). We can replace it with something that has the same semantics but adds all calls to the I/O scheduler though.
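
One hedged sketch of such a replacement, reusing the hypothetical IoScheduler above: it keeps buffered's ordered, bounded-readahead streaming semantics, but spawns each read so it keeps making progress even while the consumer is busy with CPU work.

```rust
// Sketch only, building on the hypothetical IoScheduler sketched earlier.
use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use futures::stream::{Stream, StreamExt};

fn scheduled_reads(
    scheduler: Arc<IoScheduler>,
    requests: Vec<(String, Range<usize>)>,
    readahead: usize,
) -> impl Stream<Item = std::io::Result<Bytes>> {
    futures::stream::iter(requests)
        .map(move |(path, range)| {
            let scheduler = scheduler.clone();
            // Spawn the read so it runs on the runtime/scheduler regardless of
            // whether the consumer of this stream is currently polling it.
            tokio::spawn(async move { scheduler.get_range(&path, range).await })
        })
        // Same streaming semantics as `buffered`: at most `readahead` results
        // are held in memory, and they are yielded in order.
        .buffered(readahead)
        .map(|task| task.expect("IO task panicked"))
}
```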

What's the scope of the scheduler? 1 per operation? 1 per dataset? 1 per process?

wjones127 commented 11 months ago

What's the scope of the scheduler? 1 per operation? 1 per dataset? 1 per process?

I think I lean towards either 1 per dataset or 1 per process. For ease of implementation, I think it makes most sense to wrap an object store.

WDYT?

westonpace commented 11 months ago

Both 1 per dataset and 1 per process sound good to me. If I had to pick I'd go with 1 per process since the I/O constraints (memory, parallel I/O) are probably going to be machine-wide anyways. I'm not quite sure what it would look like wrapped around an object_store. Would you just use buffered and give it a really large count (e.g. instead of num_cpus something like MAX_SCHEDULED_TASKS)?
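
For reference, a minimal sketch of the "1 per process" option, again assuming the hypothetical IoScheduler above (the env var and default below are placeholders, not an existing setting):

```rust
// Sketch only: one scheduler per process, sized once at first use.
use std::sync::{Arc, OnceLock};

static GLOBAL_SCHEDULER: OnceLock<Arc<IoScheduler>> = OnceLock::new();

fn global_scheduler(storage: Arc<dyn Storage>) -> Arc<IoScheduler> {
    GLOBAL_SCHEDULER
        .get_or_init(|| {
            // Placeholder env var and default; the real knob would be decided later.
            let max_parallel_io = std::env::var("LANCE_MAX_PARALLEL_IO")
                .ok()
                .and_then(|v| v.parse().ok())
                .unwrap_or(64);
            Arc::new(IoScheduler::new(storage, max_parallel_io))
        })
        .clone()
}
```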