diesel-rs / diesel

A safe, extensible ORM and Query Builder for Rust
https://diesel.rs
Apache License 2.0

R2d2 connection pool may hang on dropped packets #3016

Closed. weiznich closed this issue 2 years ago.

weiznich commented 2 years ago

Setup

Versions

Feature Flags

Problem Description

Cargo.toml

[package]
name = "crates-io-hang-reproduction"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
diesel = { version = "1.4.8", features = ["r2d2", "postgres"] }
dotenv = "0.15.0"
parking_lot = "0.11.2"
thiserror = "1.0.30"
url = "2.2.2"

main.rs:

use std::time::{Instant, Duration};

use diesel::RunQueryDsl;

#[macro_use]
extern crate diesel;

pub mod db {
    use diesel::prelude::*;
    use diesel::r2d2::{self, ConnectionManager, CustomizeConnection};
    use parking_lot::{ReentrantMutex, ReentrantMutexGuard};
    use std::sync::Arc;
    use std::{ops::Deref, time::Duration};
    use thiserror::Error;
    use url::Url;

    #[derive(Clone)]
    pub enum DieselPool {
        Pool {
            pool: r2d2::Pool<ConnectionManager<PgConnection>>,
        },
        Test(Arc<ReentrantMutex<PgConnection>>),
    }

    impl DieselPool {
        pub(crate) fn new(
            url: &str,
            config: r2d2::Builder<ConnectionManager<PgConnection>>,
        ) -> Result<DieselPool, PoolError> {
            let manager = ConnectionManager::new(connection_url(url));

            // For crates.io we want the behavior of creating a database pool to be slightly different
            // than the defaults of R2D2: the library's build() method assumes its consumers always
            // need a database connection to operate, so it blocks creating a pool until a minimum
            // number of connections is available.
            //
            // crates.io can actually operate in a limited capacity without a database connection,
            // especially by serving download requests to our users. Because of that we don't want to
            // block indefinitely waiting for a connection: we instead need to wait for a bit (to avoid
            // serving errors for the first connections until the pool is initialized) and if we can't
            // establish any connection continue booting up the application. The database pool will
            // automatically be marked as unhealthy and the rest of the application will adapt.
            let pool = DieselPool::Pool {
                pool: config.build_unchecked(manager),
            };
            match pool.wait_until_healthy(Duration::from_secs(5)) {
                Ok(()) => {}
                Err(PoolError::UnhealthyPool) => {}
                Err(err) => return Err(err),
            }

            Ok(pool)
        }

        pub(crate) fn new_test(url: &str) -> DieselPool {
            let conn = PgConnection::establish(&connection_url(url))
                .expect("failed to establish connection");
            conn.begin_test_transaction()
                .expect("failed to begin test transaction");
            DieselPool::Test(Arc::new(ReentrantMutex::new(conn)))
        }

        pub fn get(&self) -> Result<DieselPooledConn<'_>, PoolError> {
            match self {
                DieselPool::Pool { pool } => {
                    if let Some(conn) = pool.try_get() {
                        Ok(DieselPooledConn::Pool(conn))
                    } else if !self.is_healthy() {
                        Err(PoolError::UnhealthyPool)
                    } else {
                        Ok(DieselPooledConn::Pool(pool.get()?))
                    }
                }
                DieselPool::Test(conn) => Ok(DieselPooledConn::Test(conn.lock())),
            }
        }

        pub fn state(&self) -> PoolState {
            match self {
                DieselPool::Pool { pool, .. } => {
                    let state = pool.state();
                    PoolState {
                        connections: state.connections,
                        idle_connections: state.idle_connections,
                    }
                }
                DieselPool::Test(_) => PoolState {
                    connections: 0,
                    idle_connections: 0,
                },
            }
        }

        pub fn wait_until_healthy(&self, timeout: Duration) -> Result<(), PoolError> {
            match self {
                DieselPool::Pool { pool, .. } => match pool.get_timeout(timeout) {
                    Ok(_) => Ok(()),
                    Err(_) if !self.is_healthy() => Err(PoolError::UnhealthyPool),
                    Err(err) => Err(PoolError::R2D2(err)),
                },
                DieselPool::Test(_) => Ok(()),
            }
        }

        fn is_healthy(&self) -> bool {
            self.state().connections > 0
        }
    }

    #[derive(Debug, Copy, Clone)]
    pub struct PoolState {
        pub connections: u32,
        pub idle_connections: u32,
    }

    pub enum DieselPooledConn<'a> {
        Pool(r2d2::PooledConnection<ConnectionManager<PgConnection>>),
        Test(ReentrantMutexGuard<'a, PgConnection>),
    }

    impl Deref for DieselPooledConn<'_> {
        type Target = PgConnection;

        fn deref(&self) -> &Self::Target {
            match self {
                DieselPooledConn::Pool(conn) => conn.deref(),
                DieselPooledConn::Test(conn) => conn.deref(),
            }
        }
    }

    pub fn connect_now() -> ConnectionResult<PgConnection> {
        let url = connection_url(&std::env::var("DATABASE_URL").unwrap());
        PgConnection::establish(&url)
    }

    pub fn connection_url(url: &str) -> String {
        let mut url = Url::parse(url).expect("Invalid database URL");
        if dotenv::var("HEROKU").is_ok() && !url.query_pairs().any(|(k, _)| k == "sslmode") {
            url.query_pairs_mut().append_pair("sslmode", "require");
        }
        url.into()
    }

    #[derive(Debug, Clone, Copy)]
    pub struct ConnectionConfig {
        pub statement_timeout: u64,
        pub read_only: bool,
    }

    impl CustomizeConnection<PgConnection, r2d2::Error> for ConnectionConfig {
        fn on_acquire(&self, conn: &mut PgConnection) -> Result<(), r2d2::Error> {
            use diesel::sql_query;

            sql_query(format!(
                "SET statement_timeout = {}",
                self.statement_timeout * 1000
            ))
            .execute(conn)
            .map_err(r2d2::Error::QueryError)?;
            if self.read_only {
                sql_query("SET default_transaction_read_only = 't'")
                    .execute(conn)
                    .map_err(r2d2::Error::QueryError)?;
            }
            Ok(())
        }
    }

    #[cfg(test)]
    pub(crate) fn test_conn() -> PgConnection {
        let conn = PgConnection::establish(&crate::env("TEST_DATABASE_URL")).unwrap();
        conn.begin_test_transaction().unwrap();
        conn
    }

    #[derive(Debug, Error)]
    pub enum PoolError {
        #[error(transparent)]
        R2D2(#[from] r2d2::PoolError),
        #[error("unhealthy database pool")]
        UnhealthyPool,
    }
}

table! {
    users {
        id -> Integer,
        name -> Text,
    }
}

fn main() {
    let db_url = std::env::var("DATABASE_URL").unwrap();
    let pool = self::db::DieselPool::new(&db_url, Default::default()).unwrap();
    loop {
        println!("Start loop");
        let loop_start = Instant::now();
        let conn = pool.get().unwrap();
        println!("Took {} ms to get connection", loop_start.elapsed().as_millis());

        let _: (i32, String) = users::table.first(&*conn).unwrap();
        println!("Sleep 1s");
        std::thread::sleep(Duration::from_secs(1));
        println!("Finish sleep");
    }
}

Using the following reproduction steps produces an application freeze:

  1. DATABASE_URL=… cargo run
  2. sudo iptables -A OUTPUT -p tcp --dport 5432 -j DROP

What is the expected output?

I expect a panic from one of the unwrap calls in the main loop above.

What is the actual output?

The output stops + the application seems to hang

Steps to reproduce

Using the following reproduction steps produces an application freeze:

  1. DATABASE_URL=… cargo run
  2. sudo iptables -A OUTPUT -p tcp --dport 5432 -j DROP

Checklist

weiznich commented 2 years ago

This affected crates.io so I've marked this as high priority. cc @pietroalbini

There is relevant discussion at https://gitter.im/diesel-rs/diesel?at=61ddafba9b470f38975fb9a2

As far as I can tell, the issue seems to be that libpq never finishes our connection test query here: https://github.com/diesel-rs/diesel/blob/09f8bcd78e3fb634ed69c66f46695b821a5b3822/diesel/src/r2d2.rs#L90

I'm really unsure how and where we should fix this. Libpq does not seem to offer any useful option for timeouts here (at least not without using their async interface, which would require rewriting the whole postgres connection implementation). Possible ideas are:

  1. Enforce the timeout in r2d2. That would likely require a separate thread for the connection check, which would slow things down drastically. It would also raise the question of what should happen with a stuck thread, as there is no real way to cancel an OS thread.
  2. Find a better way to do the connection health check in diesel itself. Unsure what that would look like.
weiznich commented 2 years ago

Using the tcp_user_timeout libpq URL parameter seems to work for the reproduction given above:

Running DATABASE_URL="postgres://localhost/diesel_test?tcp_user_timeout=2000" cargo run and dropping the packets via iptables while the test application is running causes a panic as expected.

Edit: Turns out that was a false positive due to other changes. Unfortunately those changes make the pool not detect broken connections at all. The described behaviour happened on the first use :disappointed:

weiznich commented 2 years ago

Relevant r2d2 issue: https://github.com/sfackler/r2d2/issues/116

weiznich commented 2 years ago
> 1. Enforce the timeout in r2d2. That would likely require a separate thread for the connection check, which would slow things down drastically. It would also raise the question of what should happen with a stuck thread, as there is no real way to cancel an OS thread.

I've tried this and it does not seem to work, as we are not able to get the connection into the thread at all: is_valid gets a &mut PgConnection, but std::thread::spawn requires 'static values.

Seems like there is probably no easy way to solve this at all.
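
For illustration, here is a minimal, self-contained sketch of that idea (run_with_timeout is a hypothetical helper, not anything in diesel or r2d2): a helper thread plus a channel with recv_timeout can bound how long we wait, but only for owned, 'static values, which is exactly why it cannot wrap the &mut PgConnection that is_valid receives.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Run `f(value)` on a helper thread and give up waiting after `timeout`.
// Both `T` and `R` must be Send + 'static so they can be moved into the thread;
// a `&mut PgConnection` borrowed from the pool does not satisfy 'static.
fn run_with_timeout<T, R>(value: T, timeout: Duration, f: fn(T) -> R) -> Option<R>
where
    T: Send + 'static,
    R: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // If `f` blocks forever (e.g. on dropped packets), this thread leaks;
        // there is no way to cancel an OS thread from the outside.
        let _ = tx.send(f(value));
    });
    rx.recv_timeout(timeout).ok()
}

fn main() {
    // Works for owned values...
    assert_eq!(run_with_timeout(21u32, Duration::from_millis(100), |x| x * 2), Some(42));
    // ...but there is no way to hand a borrowed `&mut PgConnection` to it.
}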

pietroalbini commented 2 years ago

Thanks for looking into this! I will probably have more time to spend on investigating this either tomorrow or Friday.

weiznich commented 2 years ago

I've opened #3017 with a potential fix, but I'm not too happy about that change.

pietroalbini commented 2 years ago

Smaller program reproducing this bug:

use diesel::pg::PgConnection;
use diesel::prelude::*;

const CONNECTION_URL: &str = "postgres://pietro:pietro@localhost/cratesio";

fn main() {
    let _ = packet_loss(false);
    if std::env::args().any(|arg| arg == "reset") {
        return;
    }

    let conn = PgConnection::establish(CONNECTION_URL).unwrap();

    assert!(example_query(&conn));
    packet_loss(true).unwrap();
    assert!(!example_query(&conn));
    packet_loss(false).unwrap();
    assert!(example_query(&conn));
}

fn example_query(conn: &PgConnection) -> bool {
    println!("running example query...");
    diesel::sql_query("SELECT 1;").execute(conn).is_ok()
}

fn packet_loss(enable: bool) -> Result<(), ()> {
    std::process::Command::new("iptables")
        .arg(if enable { "-A" } else { "-D" })
        .args(&["OUTPUT", "-p", "tcp", "-m", "tcp", "--dport", "5432", "-j", "DROP"])
        .status()
        .map_err(|_| ())?;
    println!("packet loss: {enable:?}");
    Ok(())
}

You can run it on a Linux system with root privileges:

sudo -E ~/.cargo/bin/cargo run

Note that the program will modify the firewall rules automatically. They will reset once the computer reboots, but you can also reset them manually by running:

sudo -E ~/.cargo/bin/cargo run reset
weiznich commented 2 years ago

@pietroalbini Thanks for the minimized example. I think I've already found a potential solution with #3017. Can you provide some information about the needs of the crates.io team here:

pietroalbini commented 2 years ago

I'll check with the rest of the team and get back to you in the coming days!

cbeck88 commented 2 years ago

Just want to comment that we are also users of diesel + r2d2 + postgres, and we are also affected by this issue! Would love to have a fix upstream.

weiznich commented 2 years ago

@garbageslam Please don't comment on issues if you do not have anything to say other than that this affects you as well and that you would like to see someone else fix it for you.

weiznich commented 2 years ago

I had another look at this this week. Let me briefly summarize the results here:

Given those facts, I would prefer to just use the features libpq provides here, as that seems to work (a sketch follows below).

cc @pietroalbini
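
For illustration, here is a minimal sketch of passing the libpq tcp_user_timeout parameter through the connection URL, in the spirit of the connection_url helper from the reproduction above. The helper name connection_url_with_tcp_user_timeout and the 5000 ms value are made up for the example (tcp_user_timeout takes milliseconds and, as far as I know, requires a reasonably recent libpq/PostgreSQL); pick a value that fits your deployment.

use url::Url;

// Hypothetical helper: append `tcp_user_timeout` (in milliseconds) to a database
// URL unless the caller already set it, mirroring the `connection_url` function above.
fn connection_url_with_tcp_user_timeout(url: &str, timeout_ms: u64) -> String {
    let mut url = Url::parse(url).expect("Invalid database URL");
    if !url.query_pairs().any(|(k, _)| k == "tcp_user_timeout") {
        url.query_pairs_mut()
            .append_pair("tcp_user_timeout", &timeout_ms.to_string());
    }
    url.into()
}

fn main() {
    let url = connection_url_with_tcp_user_timeout("postgres://localhost/diesel_test", 5000);
    // Prints: postgres://localhost/diesel_test?tcp_user_timeout=5000
    println!("{url}");
}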

jgreat commented 2 years ago

Sorry to jump in, but I believe we have been experiencing this problem with the Azure PostgreSQL service. We see the PG server side drop TCP sessions and our system goes into the TCP retry loop, which on a fast network can take up to 15 minutes to complete with default Linux kernel settings. We were surprised that our apps were blocking on that timeout before diesel/r2d2 timeouts were taking effect.

Setting TCP_USER_TIMEOUT to 200ms seems really aggressive, but that might be an example rather than a recommendation. TCP_USER_TIMEOUT will take precedence over other TCP timeouts like keepalive and TCP retry settings. TCP retries start at just over 200ms on a low-latency network, so if you've just lost a packet or two the system wouldn't have time to recover. If you set this to be really aggressive and your db connection went idle, you would probably end up killing the connection before a reasonable keepalive period would pass. If you're pooling, I think that would cause the connections to flap and might exhaust TCP connection limits on the server side. At the very least you would end up paying the overhead of establishing the TCP/TLS session a lot more often.

If the workaround is to "fail fast" at the TCP layer, we are accomplishing that by reducing the number of TCP retries from the default of 15 to 3 or 4 by setting net.ipv4.tcp_retries2=4 at the system level. This isn't ideal since it's a system-wide setting that requires elevated privileges. It's more annoying if you're using containers, since it needs to be set in each container.

Here's a good article on Linux TCP retries and how the timeout works.

Our wish is that diesel/r2d2 wouldn't be blocked by the network IO and would move on to retry on a healthy connection after a timeout period, so we wouldn't have to make system-level changes to reduce the impact of a less-than-ideal network connection.

In any case, thanks for working on this! :+1:

weiznich commented 2 years ago

Thanks for your comment here.

> We were surprised that our apps were blocking on that timeout before diesel/r2d2 timeouts were taking effect.

Diesel and r2d2 are two different crates. r2d2 already has an issue for this: https://github.com/sfackler/r2d2/issues/116. The short answer is: they cannot do much there, as there is no way to stop in the middle of a blocking call.

> Setting TCP_USER_TIMEOUT to 200ms seems really aggressive, but that might be an example rather than a recommendation. TCP_USER_TIMEOUT will take precedence over other TCP timeouts like keepalive and TCP retry settings. TCP retries start at just over 200ms on a low-latency network, so if you've just lost a packet or two the system wouldn't have time to recover. If you set this to be really aggressive and your db connection went idle, you would probably end up killing the connection before a reasonable keepalive period would pass. If you're pooling, I think that would cause the connections to flap and might exhaust TCP connection limits on the server side. At the very least you would end up paying the overhead of establishing the TCP/TLS session a lot more often.
>
> If the workaround is to "fail fast" at the TCP layer, we are accomplishing that by reducing the number of TCP retries from the default of 15 to 3 or 4 by setting net.ipv4.tcp_retries2=4 at the system level. This isn't ideal since it's a system-wide setting that requires elevated privileges. It's more annoying if you're using containers, since it needs to be set in each container.
>
> Here's a good article on Linux TCP retries and how the timeout works.

To be clear: tcp_user_timeout=200 is just an example. In the end you need to adjust that value to your needs, not use it just because I wrote that value works in my tests. I chose 200 just because I did not want to wait that long 🤷. I've also verified now that this works just fine with larger values, even without changing anything at the operating system level.

> Our wish is that diesel/r2d2 wouldn't be blocked by the network IO and would move on to retry on a healthy connection after a timeout period, so we wouldn't have to make system-level changes to reduce the impact of a less-than-ideal network connection.

To be clear here: there is really not much we can do. You cannot just abort operations at the operating system level, so if something blocks, it blocks until it's done. That's how these things are designed. If you don't want that, you need to use an async approach. For diesel that would be diesel-async, which allows you to control such things in detail. And to be clear: there is exactly one reason I haven't closed this issue as "won't fix / works as designed" yet. That's because the crates.io folks require a solution with sync diesel here for technical reasons.
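
As a rough illustration of why the async approach gives the caller this kind of control, here is a sketch of wrapping a query in a timeout with diesel-async and tokio. This is not code from this issue; the exact trait and method names should be checked against the diesel-async documentation for the version you use, and a connection whose query timed out should probably be discarded rather than reused.

use std::time::Duration;

use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl};

#[tokio::main]
async fn main() {
    let url = std::env::var("DATABASE_URL").unwrap();
    let mut conn = AsyncPgConnection::establish(&url)
        .await
        .expect("failed to establish connection");

    // Because the query is a future, the caller can cancel it from the outside:
    // a hung TCP connection becomes a timeout error after two seconds instead of
    // blocking an OS thread indefinitely.
    let result = tokio::time::timeout(
        Duration::from_secs(2),
        diesel::sql_query("SELECT 1").execute(&mut conn),
    )
    .await;

    match result {
        Ok(Ok(_)) => println!("query succeeded"),
        Ok(Err(e)) => println!("query failed: {e}"),
        // The connection is in an unknown state after a timeout; drop it.
        Err(_) => println!("query timed out"),
    }
}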

pietroalbini commented 2 years ago

> So, long story short: setting DATABASE_URL="postgres://localhost/diesel_test?tcp_user_timeout=200" works for me, so not sure what went wrong with that earlier on.

I played around with tcp_user_timeout, and indeed it allows configuring the overall timeout to whatever value we find acceptable in production. That sounds great; if you want to close the issue, please do!

By the way, I also remember trying tcp_user_timeout back then in my investigation, and I apparently also did something wrong when I tested it. Somewhat reassuring that I wasn't alone in getting it wrong :sweat_smile: