jake-low / osmx-rs

Rust port of OSMExpress, a fast database file format for OpenStreetMap
Apache License 2.0

Add `par_iter()` method #1

Open jake-low opened 5 months ago

jake-low commented 5 months ago

Based on my tests, it should be possible to speed up full table scans in OSMX by about 3x using parallel iteration. But more work should be done to validate this result in different environments before committing to add the feature. Also, lmdb-rs's interface makes it challenging to implement a par_iter() method, and I'm not sure how to overcome that hurdle. Lots more details below.


Here is a program which reads all nodes from an OSMX database sequentially, and finds the one with the most tags on it.

/// Example program which finds the node with the most tags and prints its ID
/// and tag count.
///
/// Usage: most_tags_seq OSMX_FILE
use std::error::Error;
use std::path::PathBuf;

fn main() -> Result<(), Box<dyn Error>> {
    let args: Vec<String> = std::env::args().collect();
    let file_path = PathBuf::from(&args[1]);

    let db = osmx::Database::open(&file_path)?;
    let txn = osmx::Transaction::begin(&db)?;

    let nodes = txn.nodes()?;

    let mut best_count = 0;
    let mut best_id: Option<u64> = None;

    for (id, node) in nodes.iter() {
        let count = node.tags().count();
        if count > best_count {
            best_count = count;
            best_id = Some(id);
        }
    }

    println!("best node: {} with {} tags", best_id.unwrap(), best_count);
    Ok(())
}

This program scans the nodes table on a single thread. Under the hood, each time you call next() on the iterator returned by nodes.iter(), the underlying LMDB cursor is advanced to the next item in the table.

Here is a different program which does the same thing, but with optional parallelism. It bypasses the osmx-rs database and table abstractions and uses lmdb-rs directly; osmx-rs is only used to decode the node values.

/// Example program which finds the node with the most tags and prints its ID
/// and tag count. Like most_tags_seq, but uses multiple threads to read faster.
///
/// Usage: most_tags_par NUM_THREADS OSMX_FILE
use std::error::Error;
use std::path::PathBuf;
use std::thread;

use lmdb::{Cursor, Transaction};

fn main() -> Result<(), Box<dyn Error>> {
    let args: Vec<String> = std::env::args().collect();

    let num_threads = str::parse::<usize>(args[1].as_str())?;
    let file_path = PathBuf::from(&args[2]);

    let env = lmdb::Environment::new()
        .set_flags(
            lmdb::EnvironmentFlags::NO_SUB_DIR
                | lmdb::EnvironmentFlags::NO_READAHEAD
                | lmdb::EnvironmentFlags::NO_SYNC,
        )
        .set_max_dbs(10)
        .set_map_size(50 * 1024 * 1024 * 1024) // 50 GiB
        .open(file_path.as_ref())?;

    let nodes = env.open_db(Some("nodes"))?;

    let highest_node_id: u64 = get_last_key(&env, nodes);
    // Sub-ranges are half-open (start..end), so size them to cover 0..=highest_node_id.
    // Otherwise the node with the highest ID can fall just outside the last range.
    let nodes_per_thread = f64::ceil((highest_node_id + 1) as f64 / num_threads as f64) as u64;
    eprintln!("highest node id: {}", highest_node_id);

    thread::scope(|s| {
        let env = &env;
        let mut handles: Vec<thread::ScopedJoinHandle<(u64, usize)>> = vec![];

        for start in (0..=highest_node_id).step_by(nodes_per_thread as usize) {
            let end = start + nodes_per_thread;
            eprintln!("starting thread for {}..{}", start, end);
            let h = s.spawn(move || do_scan(env, nodes, start, end).unwrap());
            handles.push(h);
        }

        let (best_id, best_count) = handles
            .into_iter()
            .map(|h| h.join().unwrap())
            .reduce(|a, b| std::cmp::max_by(a, b, |a, b| a.1.cmp(&b.1).then(b.0.cmp(&a.0))))
            .unwrap();
        println!("best node: {} with {} tags", best_id, best_count);
    });

    Ok(())
}

fn do_scan(
    env: &lmdb::Environment,
    db: lmdb::Database,
    start: u64,
    end: u64,
) -> Result<(u64, usize), Box<dyn Error>> {
    let txn = env.begin_ro_txn()?;
    let mut cursor = txn.open_ro_cursor(db)?;

    let mut best_count = 0;
    let mut best_id: Option<u64> = None;

    // Decode keys with the same (native) byte order used to encode the seek key.
    for (raw_key, raw_val) in cursor.iter_from(start.to_ne_bytes()) {
        let id = u64::from_ne_bytes(raw_key.try_into().expect("key with incorrect length"));
        if id >= end {
            break;
        }

        let node = osmx::Node::try_from(raw_val).ok().unwrap();

        let count = node.tags().count();
        if count > best_count {
            best_count = count;
            best_id = Some(id);
        }
    }

    // A sub-range with no tagged nodes yields (0, 0) instead of panicking.
    Ok((best_id.unwrap_or(0), best_count))
}

pub fn get_last_key(env: &lmdb::Environment, db: lmdb::Database) -> u64 {
    let txn = env.begin_ro_txn().unwrap();
    let cursor = txn.open_ro_cursor(db).unwrap();
    let (raw_key, _) = cursor.get(None, None, lmdb_sys::MDB_LAST).unwrap();
    // Decode with native byte order, matching the to_ne_bytes() seek key in do_scan.
    u64::from_ne_bytes(raw_key.unwrap().try_into().unwrap())
}

The second program is a bit longer because it doesn't use all of the abstractions provided by osmx-rs, but it works in basically the same way; the only difference is the parallelism. Instead of scanning the nodes table on one thread, we get the last (highest) node ID in the table, then divide the range from 0 to that node ID into sub-ranges, and spawn a thread to scan each sub-range. This assumes that IDs in the table are distributed pretty uniformly between 0 and the max ID, which for OSM nodes is a reasonable assumption.
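To make the partitioning scheme concrete without needing an OSMX file, here is a minimal, self-contained sketch of the same idea. It is not the benchmark code: an in-memory `BTreeMap<u64, usize>` (ID to tag count) stands in for the LMDB nodes table, and the names `split_ranges`, `scan_range`, and `most_tags_par` are made up for this illustration.

```rust
use std::collections::BTreeMap;
use std::ops::Range;
use std::thread;

/// Split the ID space `0..=max_id` into at most `num_threads` half-open
/// ranges of equal width. The last range may extend past `max_id`.
/// Assumes `max_id` is far below `u64::MAX` (true for OSM IDs).
fn split_ranges(max_id: u64, num_threads: usize) -> Vec<Range<u64>> {
    let n = num_threads.max(1) as u64;
    // Equal to ceil((max_id + 1) / n), so `max_id` itself is always covered.
    let width = max_id / n + 1;
    (0..=max_id)
        .step_by(width as usize)
        .map(|start| start..start + width)
        .collect()
}

/// Scan one sub-range; return `(id, tag_count)` of the entry with the most
/// tags, or `None` if the sub-range contains no entries.
fn scan_range(table: &BTreeMap<u64, usize>, range: Range<u64>) -> Option<(u64, usize)> {
    let mut best: Option<(u64, usize)> = None;
    for (&id, &count) in table.range(range) {
        if best.map_or(true, |(_, c)| count > c) {
            best = Some((id, count));
        }
    }
    best
}

/// Range-partitioned parallel scan: one scoped thread per sub-range, then a
/// reduce over the per-thread results.
fn most_tags_par(table: &BTreeMap<u64, usize>, num_threads: usize) -> Option<(u64, usize)> {
    let max_id = *table.keys().next_back()?;
    thread::scope(|s| {
        // Collect the handles first so every thread is spawned before any join.
        let handles: Vec<_> = split_ranges(max_id, num_threads)
            .into_iter()
            .map(|r| s.spawn(move || scan_range(table, r)))
            .collect();
        handles
            .into_iter()
            .filter_map(|h| h.join().expect("worker thread panicked"))
            // Results arrive in ascending ID order, so replacing only on a
            // strictly larger count keeps the lowest ID among ties.
            .reduce(|a, b| if b.1 > a.1 { b } else { a })
    })
}
```

Returning `Option` from each worker means an empty sub-range is simply skipped in the reduce step. Calling `most_tags_par(&table, 4)` on a table whose highest ID is 100 would scan the ranges 0..26, 26..52, 52..78 and 78..104.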

On my laptop (2018 MacBook Pro), the parallel version is significantly faster. Here are the results I get using hyperfine:

$ hyperfine --warmup 3 -N "target/release/examples/most_tags_"{seq,"par 1","par 2","par 4","par 8","par 16","par 32"}" ../washington.osmx"
Benchmark 1: target/release/examples/most_tags_seq ../washington.osmx
  Time (mean ± σ):     605.8 ms ±  23.6 ms    [User: 547.1 ms, System: 54.9 ms]
  Range (min … max):   583.0 ms … 666.4 ms    10 runs

Benchmark 2: target/release/examples/most_tags_par 1 ../washington.osmx
  Time (mean ± σ):     583.3 ms ±  30.5 ms    [User: 521.4 ms, System: 58.3 ms]
  Range (min … max):   554.8 ms … 638.8 ms    10 runs

Benchmark 3: target/release/examples/most_tags_par 2 ../washington.osmx
  Time (mean ± σ):     355.6 ms ±   5.4 ms    [User: 508.2 ms, System: 63.7 ms]
  Range (min … max):   350.6 ms … 367.7 ms    10 runs

Benchmark 4: target/release/examples/most_tags_par 4 ../washington.osmx
  Time (mean ± σ):     265.4 ms ±   8.2 ms    [User: 525.6 ms, System: 68.7 ms]
  Range (min … max):   258.3 ms … 284.7 ms    10 runs

Benchmark 5: target/release/examples/most_tags_par 8 ../washington.osmx
  Time (mean ± σ):     192.1 ms ±  30.9 ms    [User: 734.4 ms, System: 90.8 ms]
  Range (min … max):   160.1 ms … 281.1 ms    18 runs

Benchmark 6: target/release/examples/most_tags_par 16 ../washington.osmx
  Time (mean ± σ):     176.2 ms ±   8.4 ms    [User: 822.2 ms, System: 96.1 ms]
  Range (min … max):   165.6 ms … 191.5 ms    17 runs

Benchmark 7: target/release/examples/most_tags_par 32 ../washington.osmx
  Time (mean ± σ):     172.6 ms ±   6.7 ms    [User: 891.3 ms, System: 105.3 ms]
  Range (min … max):   164.0 ms … 186.5 ms    17 runs

Summary
  target/release/examples/most_tags_par 32 ../washington.osmx ran
    1.02 ± 0.06 times faster than target/release/examples/most_tags_par 16 ../washington.osmx
    1.11 ± 0.18 times faster than target/release/examples/most_tags_par 8 ../washington.osmx
    1.54 ± 0.08 times faster than target/release/examples/most_tags_par 4 ../washington.osmx
    2.06 ± 0.09 times faster than target/release/examples/most_tags_par 2 ../washington.osmx
    3.38 ± 0.22 times faster than target/release/examples/most_tags_par 1 ../washington.osmx
    3.51 ± 0.19 times faster than target/release/examples/most_tags_seq ../washington.osmx

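Note that most_tags_par with just one thread is slightly faster on average than most_tags_seq (583.3 ms vs. 605.8 ms), although the gap is within the run-to-run variation. I believe the difference comes from the latter using osmx::Database, which opens all ten database tables, while the former only opens the nodes table.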

The more important takeaway is that using more than one thread significantly speeds up scanning the table, though the gains plateau pretty quickly (16 threads is only a hair faster than 8, and 32 is no faster than 16 once you account for the error bars).
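One way to quantify the plateau is to compute the speedup and the parallel efficiency (speedup divided by thread count) from the mean wall-clock times in the hyperfine output. The sketch below does just that arithmetic; the function names are made up for this illustration, and the numbers are copied from the results above.

```rust
/// Speedup of a run relative to a baseline: baseline time / run time.
fn speedup(baseline_ms: f64, run_ms: f64) -> f64 {
    baseline_ms / run_ms
}

/// Parallel efficiency: speedup per thread (1.0 would be perfect scaling).
fn efficiency(baseline_ms: f64, run_ms: f64, threads: u32) -> f64 {
    speedup(baseline_ms, run_ms) / f64::from(threads)
}

/// Print speedup and efficiency for each most_tags_par run, using the
/// sequential program's mean time as the baseline.
fn print_scaling_table() {
    let seq_ms = 605.8; // most_tags_seq mean
    // (thread count, mean wall-clock time in ms) for most_tags_par
    let par_runs = [
        (1u32, 583.3),
        (2, 355.6),
        (4, 265.4),
        (8, 192.1),
        (16, 176.2),
        (32, 172.6),
    ];
    for &(threads, ms) in par_runs.iter() {
        println!(
            "{:>2} threads: {:.2}x speedup, {:.0}% efficiency",
            threads,
            speedup(seq_ms, ms),
            100.0 * efficiency(seq_ms, ms, threads)
        );
    }
}
```

For example, the 32-thread run comes out at 605.8 / 172.6 ≈ 3.51x, matching the hyperfine summary, while the 2-thread run is about 1.70x, so efficiency is already below 100% with two threads and drops steadily from there.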

This might be surprising to you; it surprised me initially. LMDB is a memory-mapped database; scanning a table is equivalent to reading a contiguous range of the file from disk. By using multiple threads, we are (somewhat) randomizing the access pattern for blocks in the underlying file (since thread 3 might read a block halfway through the file right after thread 0 reads the first block in the file). Aren't random reads slower than sequential reads, even on an SSD?

It turns out (as best I understand it) that SSDs support some parallelism at the hardware level. The controller maintains a queue of pending I/O operations, and can make progress on reading several of the queued operations at once. So having several threads read in parallel means the OS can keep the SSD busier, and thereby increase total throughput.

This raises other questions for me that I don't have the answers to. Top among them is: why doesn't the OS queue multiple I/O requests for successive blocks when reading sequentially, if this would apparently increase throughput? The above tests were conducted using LMDB's NO_READAHEAD option, which the docs say this about:

MDB_NORDAHEAD: Turn off readahead. Most operating systems perform readahead on read requests by default. This option turns it off if the OS supports it. Turning it off may help random read performance when the DB is larger than RAM and system RAM is full.

So without this option, it sounds like the OS should be fetching several successive blocks of the file whenever there is a page fault. Wouldn't this have the same effect that our parallel threads do, i.e. utilizing the SSD's queue more and therefore increasing total throughput?

I tried removing the NO_READAHEAD option both in the example program and in the osmx-rs source. It had no measurable effect on single-threaded sequential scan performance. Maybe the results would be different on another operating system?


Another issue is that there is a possible bug in the parallel example program above. Each thread creates its own read transaction, which I believe means they aren't guaranteed to all see a consistent view of the data if some other process is writing at the same time. This is a common use case for OSMX databases, where you might have a cron job to periodically download changes from the OSM Replication API and use them to update your local dataset in place.

LMDB's Transactions use thread-local state by default, which means they cannot be moved to a different thread than they were created on.[^1] In Rust, this translates to Transactions being !Send and !Sync.

[^1]: It's possible to pass the MDB_NOTLS option when creating an environment, which makes it safe to pass read-only transactions between threads (though if two threads are using the same transaction in parallel, you still need to use a mutex to synchronize access to the transaction's methods). But the lmdb-rs crate doesn't provide types that can be used with this option; even if you open the Environment with the NOTLS option, transactions are still !Send, preventing them from being moved from one thread to another.
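For readers less familiar with these marker traits, here is a minimal sketch of what !Send means in practice. `ThreadBoundTxn` is a hypothetical stand-in, not an lmdb-rs type: a raw-pointer `PhantomData` marker is one common way to make a type !Send and !Sync, and I'm using it here only to reproduce the constraint.

```rust
use std::marker::PhantomData;
use std::thread;

/// Hypothetical stand-in for a transaction handle that is tied to the thread
/// that created it. Raw pointers are neither Send nor Sync, so the marker
/// field makes the whole struct !Send and !Sync.
struct ThreadBoundTxn {
    snapshot_id: u64,
    _not_send: PhantomData<*mut ()>,
}

impl ThreadBoundTxn {
    fn begin(snapshot_id: u64) -> Self {
        ThreadBoundTxn { snapshot_id, _not_send: PhantomData }
    }

    fn snapshot_id(&self) -> u64 {
        self.snapshot_id
    }
}

/// Because the handle cannot cross threads, every worker has to begin its own
/// transaction on its own thread, as the parallel example program does.
fn snapshot_ids_per_worker(num_threads: usize) -> Vec<u64> {
    thread::scope(|s| {
        let handles: Vec<_> = (0..num_threads as u64)
            .map(|i| {
                s.spawn(move || {
                    // Created and used on the worker thread only.
                    let txn = ThreadBoundTxn::begin(i);
                    txn.snapshot_id()
                })
            })
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    })
}

// The following would be rejected by the compiler, because moving `txn` into
// the spawned closure requires `ThreadBoundTxn: Send`:
//
//     let txn = ThreadBoundTxn::begin(0);
//     std::thread::spawn(move || txn.snapshot_id());
```

The compile-time rejection is the useful part: the type system rules out sharing one transaction across worker threads, which is why each worker ends up with its own independently created transaction.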

The LMDB C API supports nested transactions, which I think are intended (among other things) to solve this problem of wanting to have multiple parallel readers see a consistent snapshot of the data. I haven't tested this, but I believe to do this you open the environment and create a root transaction and then pass a pointer to both the environment and the root transaction to each worker thread. The worker thread isn't allowed to use that transaction directly, but it is allowed to create a new nested transaction by calling mdb_txn_begin and passing in the env pointer and the root txn pointer, obtaining a new transaction which is safe to use (since it was created on the current thread) and sees the same snapshot of the data that the parent transaction sees. The program must not use the parent transaction while it has open child transactions, and must not close the parent transaction until all nested transactions are finished.

However, the lmdb-rs crate does not seem to support this.

  1. It exposes a begin_nested_txn() method only on RwTransaction but not on RoTransaction.
  2. You can't call begin_nested_txn() on a thread other than the one that owns the transaction, since transactions are !Send. So it's impossible to obtain a nested transaction that's bound to a different thread than its parent transaction.

One solution I investigated was porting osmx-rs to lmdb-zero, which seems to offer a more complex but also more flexible API. But I'm not sure it would solve this particular issue, since lmdb-zero's transactions are still !Send. The port was also challenging because the lifetimes of various types are different. In particular, lmdb-zero uses an Accessor object to read and write values, and the buffers it returns are bound to the lifetime of the Accessor rather than the Transaction. I think this helps make read-write transactions safe, since you can't create a writer if there are outstanding readers, but it made porting osmx-rs's get() and iter() methods hard.

Another option I'm now considering is implementing osmx-rs directly on top of the lmdb-sys crate, which provides low-level, unsafe access to the LMDB C API. I think this would solve the problems I've run into with lifetimes and thread safety annotations, but it would make correctly implementing the library internals more challenging and somewhat more verbose.
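As a rough sketch of what owning the thread safety annotations could look like: a library built on raw bindings wraps the raw transaction pointer in its own type and decides for itself whether that type is Send. Everything below is hypothetical. `FakeTxn` stands in for the opaque C transaction struct so the example runs without LMDB, and the `unsafe impl Send` would only be justified under the conditions described in the footnote above (environment opened with MDB_NOTLS, read-only transaction).

```rust
use std::thread;

/// Stand-in for the opaque C transaction struct behind the raw pointer.
struct FakeTxn {
    snapshot_id: u64,
}

/// Hypothetical owning wrapper around a raw read-only transaction pointer.
struct ReadTxn {
    raw: *mut FakeTxn,
}

// SAFETY (assumption): with a real LMDB handle this would only be sound if the
// environment was opened with MDB_NOTLS and the transaction is read-only.
// `Sync` is deliberately not implemented, so the handle can be moved to
// another thread but never used from two threads at once.
unsafe impl Send for ReadTxn {}

impl ReadTxn {
    fn begin(snapshot_id: u64) -> Self {
        // A real implementation would obtain this pointer from the C API.
        ReadTxn { raw: Box::into_raw(Box::new(FakeTxn { snapshot_id })) }
    }

    fn snapshot_id(&self) -> u64 {
        // SAFETY: `raw` comes from Box::into_raw and is freed only in Drop.
        unsafe { (*self.raw).snapshot_id }
    }
}

impl Drop for ReadTxn {
    fn drop(&mut self) {
        // SAFETY: `raw` was created by Box::into_raw and is dropped exactly once.
        unsafe { drop(Box::from_raw(self.raw)) }
    }
}

/// Move a transaction created on the calling thread to a worker and use it there.
fn read_on_worker(txn: ReadTxn) -> u64 {
    thread::spawn(move || txn.snapshot_id())
        .join()
        .expect("worker thread panicked")
}
```

Note that this only allows moving a transaction between threads. Letting several workers read through the same transaction at the same time would still need external synchronization such as a mutex, so by itself it doesn't give a parallel scan over one snapshot.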


That's about as far as I've gone with this investigation for now. If you're interested in this use case for osmx-rs, or if you have tips on how to solve the problems above, feel free to post a comment.

jake-low commented 5 months ago

The benchmark programs above are available in the experiments/parallel-read branch, under the examples/ directory.