launchbadge / sqlx

🧰 The Rust SQL Toolkit. An async, pure Rust SQL crate featuring compile-time checked queries without a DSL. Supports PostgreSQL, MySQL, and SQLite.
Apache License 2.0

DB ERROR: pool timed out while waiting for an open connection in thread #3241

Open benjamingb opened 4 months ago

benjamingb commented 4 months ago

Bug Description

When I try to run a query inside a thread, I get the following message: DB ERROR: pool timed out while waiting for an open connection. It doesn't matter whether I increase the acquire timeout or the pool size; the result is always the same.

Minimal Reproduction

pub fn process_tracker_entities(
    db_pool: DbPool,
    rx_db: std::sync::mpsc::Receiver<TestMsg>,
) -> UseCaseResult<()> {
    thread::Builder::new().spawn(move || -> UseCaseResult<()> {
        let rt = runtime::Builder::new_current_thread()
            .enable_all()
            .build()?;

        rt.block_on(async move {
            let od_tracker_db_model = OdTracksDbModel::init(&db_pool);
            // test the connection
            if let Err(err) = &od_tracker_db_model.find(1).await {
                eprintln!("Error in database: {}", err);
            }

            for fwo in rx_db.iter() {}

            Ok(())
        })
    })?;

    Ok(())
}
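
A note on the receive loop in the snippet above: `std::sync::mpsc::Receiver::iter` blocks the calling thread until every sender has been dropped, so while that loop waits inside `block_on` on a current-thread runtime, nothing else scheduled on that runtime can make progress. Whether this contributes to the timeout reported here is not established in this thread. The standard-library-only sketch below just shows the blocking behaviour; `drain_until_closed` is a hypothetical name and `u32` stands in for `TestMsg`.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Drains a receiver the same way `for fwo in rx_db.iter() {}` does and
/// returns how many messages arrived before the channel closed.
fn drain_until_closed() -> usize {
    // Bounded channel of capacity 1, as in `sync_channel::<TestMsg>(1)`.
    let (tx, rx) = sync_channel::<u32>(1);

    let producer = thread::spawn(move || {
        for i in 0..3 {
            tx.send(i).unwrap();
        }
        // `tx` is dropped here; only then does the loop below finish.
    });

    let mut seen = 0;
    // Blocks this thread between messages until all senders are gone.
    for _msg in rx.iter() {
        seen += 1;
    }

    producer.join().unwrap();
    seen
}

fn main() {
    println!("received {} messages", drain_until_closed());
}
```

If the sender were never dropped, the loop would never return and the thread would stay parked in it.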

This thread is started from another task that is spawned inside an actor:


spawn(async move {
    let mut actor_state = actor_state.lock().await;
    let result = run_video_processing_engine(&ctx_pool, msg.0).await;
    if let Err(err) = result {
        println!("Error engine: {}", err);
        actor_addr.do_send(StopProcessing);
    }
    actor_state.is_running = false;
});

That function in turn starts the other thread:

pub async fn run_video_processing_engine<'a>(
    db_pool: &'a DbPool,
    stream_entity_id: StreamEntityId,
) -> UseCaseResult<()> {
    let (tx_capture, rx_capture) = new_capture_channel();
    let (tx_db, rx_db) = sync_channel::<TestMsg>(1);

    // code
    process_tracker_entities(db_pool.clone(), rx_db)?;

    // code

    Ok(())
}

Here I create my connection

#[actix_web::main]
async fn main() -> std::io::Result<()> {

    let ctx_pool = DBConnection::create_pool()
        .await
        .expect("Failed to connect to Postgres.");

    let engine_actor = EngineActor::new(ctx_pool.clone()).start();

    HttpServer::new(move || {
       // .. code
    })
    .bind(("127.0.0.1", 3383))?
    .run()
    .await
}

This is how I define my connection

pub struct DBConnection;
impl DBConnection {
    pub async fn create_pool() -> InfraResult<Pool<Postgres>> {
        let config = DBConfig::new();
        let config_clone = config.clone();

        let max_connections = if cfg!(test) {
            5
        } else {
            config.pool_max_connections
        };

        PgPoolOptions::new()
            .after_connect(move |conn, _| {
                let query = format!("SET search_path = '{}';", &config.search_path.as_str());

                Box::pin(async move {
                    conn.execute(&*query).await?;
                    Ok(())
                })
            })
            .acquire_timeout(std::time::Duration::from_secs(config.timeout))
            .max_connections(max_connections)
            .connect_with(Self::db_options(&config_clone))
            .await
            .map_err(InfraError::from)
    }

    fn db_options(config: &DBConfig) -> PgConnectOptions {
        let ssl_mode = match config.require_ssl {
            true => PgSslMode::Require,
            _ => PgSslMode::Prefer,
        };

        PgConnectOptions::new()
            .host(&config.host)
            .username(&config.user)
            .password(&config.pass)
            .port(config.port)
            .ssl_mode(ssl_mode)
            .database(&config.db_name)
    }
}

Info

benjamingb commented 4 months ago

I solved it by following this guide. https://github.com/launchbadge/sqlx/discussions/3232

Is that the correct way to do it when working with threads? Why doesn't the PgPoolOptions configuration behave the way deadpool does?

maxcountryman commented 3 months ago

This seems like a bug. I recently started seeing this error after bumping sqlx and otherwise cannot identify any code changes that seem possibly related.

juliuskreutz commented 2 months ago

I can sometimes reproduce this issue by having two threads use the same connection at the same time. Even after one thread finishes, the other one times out after the configured 30 seconds, although the connection should be free again.
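
For reference, the behaviour expected here (a waiter should receive the connection as soon as it is released, rather than run into the timeout) can be modelled with a toy pool built only on the standard library. This is a sketch of the expected semantics under stated assumptions, not sqlx's implementation: `ToyPool`, `acquire`, `release` and `demo` are hypothetical names, and a "connection" is just a number.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::{Duration, Instant};

/// Toy stand-in for a connection pool; NOT how sqlx is implemented.
struct ToyPool {
    idle: Mutex<VecDeque<u32>>, // idle "connections", identified by number
    available: Condvar,         // signalled whenever a connection is released
}

impl ToyPool {
    fn new(size: u32) -> Arc<Self> {
        Arc::new(ToyPool {
            idle: Mutex::new((0..size).collect()),
            available: Condvar::new(),
        })
    }

    /// Waits up to `timeout` for an idle connection. `None` models
    /// "pool timed out while waiting for an open connection".
    fn acquire(&self, timeout: Duration) -> Option<u32> {
        let deadline = Instant::now() + timeout;
        let mut idle = self.idle.lock().unwrap();
        loop {
            if let Some(conn) = idle.pop_front() {
                return Some(conn);
            }
            let now = Instant::now();
            if now >= deadline {
                return None;
            }
            // Sleep until a release is signalled or the deadline passes.
            let (guard, _) = self.available.wait_timeout(idle, deadline - now).unwrap();
            idle = guard;
        }
    }

    /// Returns a connection to the pool and wakes one waiter.
    fn release(&self, conn: u32) {
        self.idle.lock().unwrap().push_back(conn);
        self.available.notify_one();
    }
}

/// Returns (acquire result while the only connection is held,
///          acquire result of a waiter once it is released).
fn demo() -> (Option<u32>, Option<u32>) {
    let pool = ToyPool::new(1);
    let held = pool.acquire(Duration::from_millis(50)).expect("one idle connection");

    // The only connection is checked out, so this acquire times out.
    let while_held = pool.acquire(Duration::from_millis(50));

    // A second thread waits with a generous timeout.
    let waiter = {
        let pool = Arc::clone(&pool);
        thread::spawn(move || pool.acquire(Duration::from_secs(5)))
    };
    thread::sleep(Duration::from_millis(50));
    pool.release(held); // the waiter should now get the connection
    let after_release = waiter.join().unwrap();

    (while_held, after_release)
}

fn main() {
    let (while_held, after_release) = demo();
    println!("while held: {:?}, after release: {:?}", while_held, after_release);
}
```

In this model the waiter only times out if the connection is never returned or the wake-up never reaches it. The report above describes a waiter timing out even though the connection was released.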

turingbuilder commented 2 months ago

We have also encountered this issue with DB ERROR: pool timed out while waiting for an open connection while using:

sqlx = { version = "=0.7.4", features = [ "runtime-tokio", "postgres", "chrono" ] }

Eventually we found the discussion at: https://github.com/launchbadge/sqlx/discussions/3232

And based on the comment from @maxcountryman, gave pinning to 0.7.3 a try. Our dependency now looks like:

sqlx = { version = "=0.7.3", features = [ "runtime-tokio", "postgres", "chrono" ] }

After some testing we have not been able to reproduce the issue with 0.7.3. Either the issue really is not present in 0.7.3, or 0.7.3 makes it occur much less frequently with our workload.

ssaavedra commented 1 month ago

This is also happening to me in 0.7.4 (and not on 0.7.3) when using sqlx with sqlite and accessing it from a Rocket API. My API receives 1 call per second, and after 1~2 minutes the database pool times out as well. Downgrading to =0.7.3 fixes this issue for me at the moment. Could not test 0.8.0 yet because a version of rocket_db_pools has not been released yet that supports it (without conflicting sqlite3 link versions).

efrain2007 commented 3 weeks ago

@benjamingb How did you solve it with actix web?