launchbadge / sqlx

🧰 The Rust SQL Toolkit. An async, pure Rust SQL crate featuring compile-time checked queries without a DSL. Supports PostgreSQL, MySQL, and SQLite.
Apache License 2.0

Consider making `PoolOptions::__fair` public/not-hidden #2166

Open birkmose opened 2 years ago

birkmose commented 2 years ago

**Is your feature request related to a problem? Please describe.** Currently, `PoolOptions::__fair` is hidden in the docs, and according to the comments in the code:

"Currently only exposed for benchmarking; fair = true seems to be the superior option in most cases."

However, when using sqlx on runtime configurations without work-stealing, the first-come-first-served acquire order can be detrimental to throughput. In my use case (sqlx on actix-web), I observed a large (3x) increase in throughput under heavy load with a large number of concurrent clients when `fair` is set to `false`.

**Describe the solution you'd like** For some workloads `fair = false` is actually better, because throughput matters more than fairness. It would be nice if the `fair` option could be made part of the public API.
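
For illustration, here is a minimal sketch of how this looks from the caller's side today versus what a public option might look like. The helper `build_pool_options` and the commented-out `fair` setter are hypothetical; only the hidden `__fair` method exists at the moment:

```rust
use sqlx::postgres::PgPoolOptions;

// A sketch only: `prefer_throughput` and this helper are made up for
// illustration, they are not part of sqlx.
fn build_pool_options(prefer_throughput: bool) -> PgPoolOptions {
    // Today the only way to opt out of fair acquire ordering is the
    // doc(hidden) `__fair` method, which carries no stability guarantees.
    PgPoolOptions::new()
        .max_connections(10)
        .__fair(!prefer_throughput)
    // Proposed: a documented public setter with the same semantics,
    // e.g. `.fair(!prefer_throughput)`, defaulting to fair = true.
}
```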

**Additional context** I tried to create an example that illustrates this issue. The example is a bit of an extreme/worst-case scenario, but I hope it illustrates my point. The idea is basically to have two single-threaded Tokio runtimes, each issuing a large number of concurrent queries. One runtime is extremely CPU-bound (to illustrate a non-uniform workload), and the other is purely I/O-bound.

With `fair = true` we can observe how the runtime with the extremely CPU-bound workload starves the purely I/O-bound runtime:

Number of symmetric queries: 102
Number of asymmetric queries: 100

With `fair = false` we observe a large increase in throughput (in real-life scenarios the difference is of course much smaller, but I did observe a 3x increase in throughput for my workload):

Number of symmetric queries: 38735
Number of asymmetric queries: 100

The example code:

```rust
use std::{
    sync::atomic::AtomicU32,
    thread::{self, JoinHandle},
    time::{Duration, Instant},
};

use futures::{stream, StreamExt};
use sqlx::{
    postgres::{PgConnectOptions, PgPoolOptions},
    Pool, Postgres,
};
use tokio::time::sleep;

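// Tag identifying which workload issued a query (used only for counting).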
enum Load {
    Symmetric,
    Asymmetric,
}

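// Global counters of completed queries per workload.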
static COUNT_SYMMETRIC: AtomicU32 = AtomicU32::new(0);
static COUNT_ASYMMETRIC: AtomicU32 = AtomicU32::new(0);

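// Acquire a connection from the shared pool, run one trivial query, and bump
// the counter for this workload.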
async fn query(pool: Pool<Postgres>, load_type: Load) {
    let mut conn = pool.acquire().await.unwrap();
    let _ = sqlx::query!("SELECT 1 as foo")
        .fetch_one(&mut conn)
        .await
        .unwrap();

    match load_type {
        Load::Symmetric => COUNT_SYMMETRIC.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
        Load::Asymmetric => COUNT_ASYMMETRIC.fetch_add(1, std::sync::atomic::Ordering::Relaxed),
    };
}

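// Spawn a dedicated OS thread running its own single-threaded Tokio runtime,
// so there is no work-stealing between the two workloads.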
fn spawn_runtime(pool: Pool<Postgres>, load_type: Load) -> JoinHandle<()> {
    thread::spawn(move || {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        match load_type {
            Load::Symmetric => runtime.block_on(symmetric_load(pool)),
            Load::Asymmetric => runtime.block_on(asymmetric_load(pool)),
        }
    })
}

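// Purely I/O-bound workload: up to 50 queries in flight, nothing else
// competing for the runtime.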
async fn symmetric_load(pool: Pool<Postgres>) {
    stream::iter(0..100_000)
        .for_each_concurrent(50, |_| async {
            query(pool.clone(), Load::Symmetric).await;
        })
        .await;
}

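// CPU-bound workload: each task busy-spins for ~100 ms after its query,
// hogging the single-threaded runtime.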
async fn asymmetric_load(pool: Pool<Postgres>) {
    stream::iter(0..100_000)
        .for_each_concurrent(50, |_| async {
            query(pool.clone(), Load::Asymmetric).await;
            // Busy spin the CPU for 100msec - don't do this at home!
            let timer = Instant::now();
            while timer.elapsed().as_millis() < 100 {}
        })
        .await;
}

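// Run both workloads for 10 seconds, then report how many queries each completed.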
#[tokio::main]
async fn main() {
    let connect_options = PgConnectOptions::new();
    let pool_options = PgPoolOptions::new().__fair(true).max_connections(10); // Set fair to true/false
    let pool = pool_options.connect_with(connect_options).await.unwrap();

    let _ = spawn_runtime(pool.clone(), Load::Symmetric);
    let _ = spawn_runtime(pool, Load::Asymmetric);

    sleep(Duration::from_secs(10)).await;

    println!(
        "Number of symmetric queries: {}",
        COUNT_SYMMETRIC.load(std::sync::atomic::Ordering::Relaxed)
    );
    println!(
        "Number of asymmetric queries: {}",
        COUNT_ASYMMETRIC.load(std::sync::atomic::Ordering::Relaxed)
    );
}
```

abonander commented 2 years ago

The pool used to be unfair, and that had problems too: https://github.com/smol-rs/async-channel/issues/6

birkmose commented 2 years ago

Unfairness definitely comes with its own problems. The point I was making was that for some setups/workloads, optimizing for throughput is preferable. I think it makes a lot of sense to keep fair as the default for the pool, but it would still make sense to give consumers of the pool the option to configure it for unfairness if that suits their needs (i.e. make the option public, documented, and not hidden).