sfackler / r2d2

A generic connection pool for Rust
Apache License 2.0

Pool is never dropped, so all connections are left open #22

Closed (jonhoo closed this issue 8 years ago)

jonhoo commented 8 years ago

I'm having an issue in a long-running application: the connections opened by r2d2 are not closed after I drop the last reference to the pool. After digging through the code, I found that new_inner creates a second copy of the Arc, which is then moved into the closure that calls reap_connections and is scheduled on the thread pool via run_at_fixed_rate. As far as I can tell, that closure is never dropped, so the second reference to the Arc is never dropped either, which in turn prevents the SharedPool that the Arc wraps from being dropped. Since the SharedPool is never dropped, the workers are never stopped, and all the connections are left open indefinitely.

With some debug statements inserted into r2d2 (patch below), I get the following output for my application:

new_inner::step s: 1, w: 0
new_inner::step s: 51, w: 0
pool worker starting
pool worker starting
pool worker starting
pool worker got job
...
pool worker got job
pool worker waiting
pool worker waiting
pool worker waiting
new_inner::step s: 1, w: 0
new_inner::step s: 2, w: 0
new pool (starts at s: 2, w: 0)
pool worker waiting
cloning pool (now s: 3, w: 0)
pool worker waiting
pool worker waiting
dropping instance of pool (was s: 3, w: 0)
cloning pool (now s: 3, w: 0)
cloning pool (now s: 4, w: 0)
dropping instance of pool (was s: 4, w: 0)
cloning pool (now s: 4, w: 0)
cloning pool (now s: 5, w: 0)
dropping instance of pool (was s: 5, w: 0)
cloning pool (now s: 5, w: 0)
cloning pool (now s: 6, w: 0)
dropping instance of pool (was s: 6, w: 0)
cloning pool (now s: 6, w: 0)
dropping instance of pool (was s: 6, w: 0)
dropping instance of pool (was s: 5, w: 0)
dropping instance of pool (was s: 4, w: 0)
dropping instance of pool (was s: 3, w: 0)
dropping instance of pool (was s: 2, w: 0)
all done -- sleeping...

Notice in particular how, by the time Pool::new returns, the Arc already has two strong references, and how, when my application drops its last reference (right before the "all done" line), there is still a leftover strong reference.
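
The leftover strong reference can be reproduced outside r2d2. The following is a minimal sketch, not r2d2's code: `Shared`, `jobs`, and `leak_demo` are hypothetical names, and a plain `Vec` of boxed closures stands in for the thread pool's job queue. It only illustrates the shape of the problem, namely a struct that owns a closure which in turn owns a strong `Arc` to that same struct.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for r2d2's `SharedPool`: it owns the scheduled jobs.
struct Shared {
    jobs: Mutex<Vec<Box<dyn Fn() + Send>>>,
    dropped: Arc<AtomicBool>, // set by `Drop` so the caller can observe it
}

impl Drop for Shared {
    fn drop(&mut self) {
        self.dropped.store(true, Ordering::SeqCst);
    }
}

/// Schedules a job that captures a strong clone of the `Arc`, then drops the
/// caller's handle. Returns (strong count before the drop, whether `Drop` ran).
fn leak_demo() -> (usize, bool) {
    let dropped = Arc::new(AtomicBool::new(false));
    let shared = Arc::new(Shared {
        jobs: Mutex::new(Vec::new()),
        dropped: dropped.clone(),
    });

    // Same pattern as `let s = shared.clone()` followed by a `move` closure.
    let s = shared.clone();
    shared.jobs.lock().unwrap().push(Box::new(move || {
        let _ = Arc::strong_count(&s); // the job uses `s`, so it owns the clone
    }));

    // Two strong references: the caller's handle plus the clone inside the job.
    let count = Arc::strong_count(&shared);
    drop(shared); // the last user-visible handle goes away

    // The job still holds a strong reference, and the job lives inside
    // `Shared`, so the count never reaches zero and `Drop` never runs.
    (count, dropped.load(Ordering::SeqCst))
}
```

Under these assumptions `leak_demo()` returns `(2, false)`: the count is already 2 before the caller drops anything, and `Drop` does not run afterwards, which matches the pattern in the log above.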

Debug patch:

diff --git a/src/lib.rs b/src/lib.rs
index a620344..6df3284 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -40,6 +40,7 @@
 //! ```
 #![warn(missing_docs)]
 #![doc(html_root_url="https://sfackler.github.io/r2d2/doc/v0.6.3")]
+#![feature(arc_counts)]

 #[macro_use]
 extern crate log;
@@ -175,10 +177,13 @@ struct SharedPool<M>
     thread_pool: ScheduledThreadPool,
 }

 impl<M> Drop for SharedPool<M> where M: ManageConnection
 {
     fn drop(&mut self) {
+        println!("dropping shared pool");
         self.thread_pool.clear();
+        println!("dropped shared pool");
     }
 }

@@ -258,14 +263,33 @@ fn reap_connections<M>(shared: &Arc<SharedPool<M>>)
 pub struct Pool<M: ManageConnection>(Arc<SharedPool<M>>);

 /// Returns a new `Pool` referencing the same state as `self`.
 impl<M> Clone for Pool<M> where M: ManageConnection
 {
     fn clone(&self) -> Pool<M> {
-        Pool(self.0.clone())
+        use std::sync;
+        let p = Pool(self.0.clone());
+        println!("cloning pool (now s: {}, w: {})",
+                 sync::Arc::strong_count(&self.0),
+                 sync::Arc::weak_count(&self.0));
+        p
     }
 }

+impl<M> Drop for Pool<M>
+    where M: ManageConnection
+{
+    fn drop(&mut self) {
+        use std::sync;
+        println!("dropping instance of pool (was s: {}, w: {})",
+                 sync::Arc::strong_count(&self.0),
+                 sync::Arc::weak_count(&self.0));
+    }
+}
+
+
 impl<M> fmt::Debug for Pool<M> where M: ManageConnection + fmt::Debug
 {
     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
         let inner = self.0.internals.lock().unwrap();
@@ -289,7 +314,15 @@ impl<M> Pool<M> where M: ManageConnection
     pub fn new(config: Config<M::Connection, M::Error>,
                manager: M)
                -> Result<Pool<M>, InitializationError> {
-        Pool::new_inner(config, manager, 30)
+        use std::sync;
+        let p = Pool::new_inner(config, manager, 30);
+        if p.is_ok() {
+            let p = p.as_ref().unwrap();
+            println!("new pool (starts at s: {}, w: {})",
+                     sync::Arc::strong_count(&p.0),
+                     sync::Arc::weak_count(&p.0));
+        }
+        p
     }

     // for testing
@@ -297,6 +330,7 @@ impl<M> Pool<M> where M: ManageConnection
                  manager: M,
                  reaper_rate: i64)
                  -> Result<Pool<M>, InitializationError> {
+        use std::sync;
         let internals = PoolInternals {
             conns: VecDeque::with_capacity(config.pool_size() as usize),
             num_conns: 0,
@@ -311,6 +345,10 @@ impl<M> Pool<M> where M: ManageConnection
             cond: Condvar::new(),
         });

+        println!("new_inner::step s: {}, w: {}",
+                 sync::Arc::strong_count(&shared),
+                 sync::Arc::weak_count(&shared));
+
         let initial_size = shared.config.min_idle().unwrap_or(shared.config.pool_size());
         {
             let mut inner = shared.internals.lock().unwrap();
@@ -320,6 +358,10 @@ impl<M> Pool<M> where M: ManageConnection
             drop(inner);
         }

+        println!("new_inner::step s: {}, w: {}",
+                 sync::Arc::strong_count(&shared),
+                 sync::Arc::weak_count(&shared));
+
         if shared.config.initialization_fail_fast() {
             let end = SteadyTime::now() + cvt(shared.config.connection_timeout());
             let mut internals = shared.internals.lock().unwrap();
@@ -336,12 +378,20 @@ impl<M> Pool<M> where M: ManageConnection
             }
         }

+        println!("new_inner::step s: {}, w: {}",
+                 sync::Arc::strong_count(&shared),
+                 sync::Arc::weak_count(&shared));
+
         if shared.config.max_lifetime().is_some() || shared.config.idle_timeout().is_some() {
             let s = shared.clone();
             shared.thread_pool
                   .run_at_fixed_rate(Duration::seconds(reaper_rate), move || reap_connections(&s));
         }

+        println!("new_inner::step s: {}, w: {}",
+                 sync::Arc::strong_count(&shared),
+                 sync::Arc::weak_count(&shared));
+
         Ok(Pool(shared))
     }

diff --git a/src/task.rs b/src/task.rs
index 51c5841..b99e9e1 100644
--- a/src/task.rs
+++ b/src/task.rs
@@ -75,8 +75,11 @@ pub struct ScheduledThreadPool {

 impl Drop for ScheduledThreadPool {
     fn drop(&mut self) {
+        println!("pool dropping");
         self.shared.inner.lock().unwrap().shutdown = true;
+        println!("pool notifying");
         self.shared.cvar.notify_all();
+        println!("pool notified");
     }
 }

@@ -165,12 +168,14 @@ impl Worker {
     }

     fn run(&mut self) {
+        println!("pool worker starting");
         loop {
             match self.get_job() {
                 Some(job) => self.run_job(job),
                 None => break,
             }
         }
+        println!("pool worker stopping");
     }

     fn get_job(&self) -> Option<Job> {
@@ -190,6 +195,8 @@ impl Worker {
                 Some(e) => Need::WaitTimeout(e.time - now),
             };

+            println!("pool worker waiting");
+
             inner = match need {
                 Need::Wait => self.shared.cvar.wait(inner).unwrap(),
                 Need::WaitTimeout(t) => {
@@ -206,6 +213,7 @@ impl Worker {
     }

     fn run_job(&self, job: Job) {
+        println!("pool worker got job");
         match job.type_ {
             JobType::Once(f) => f.invoke(()),
             JobType::FixedRate { mut f, rate } => {

sfackler commented 8 years ago

Hmm, yeah I can see why that would happen - r2d2 was designed under the assumption that the pool is going to exist for the lifetime of the process. Should be fixable, maybe via weak references in the background worker tasks or an explicit shutdown call in Pool's destructor.
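
As an illustration of the weak-reference idea, here is a minimal sketch under the same simplifications as before; it is not necessarily what the actual fix looks like. `Shared`, `jobs`, and `weak_demo` are hypothetical names, and a `Vec` of boxed closures stands in for the background worker tasks. The job holds a `Weak` and upgrades it only while it runs.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, Weak};

/// Hypothetical stand-in for r2d2's `SharedPool`: it owns the scheduled jobs.
struct Shared {
    jobs: Mutex<Vec<Box<dyn Fn() + Send>>>,
    dropped: Arc<AtomicBool>, // set by `Drop` so the caller can observe it
}

impl Drop for Shared {
    fn drop(&mut self) {
        self.dropped.store(true, Ordering::SeqCst);
    }
}

/// Schedules a job that captures only a `Weak`, then drops the caller's
/// handle. Returns (strong count before the drop, whether `Drop` ran).
fn weak_demo() -> (usize, bool) {
    let dropped = Arc::new(AtomicBool::new(false));
    let shared = Arc::new(Shared {
        jobs: Mutex::new(Vec::new()),
        dropped: dropped.clone(),
    });

    // A weak reference does not keep the pool alive.
    let w: Weak<Shared> = Arc::downgrade(&shared);
    shared.jobs.lock().unwrap().push(Box::new(move || {
        // Upgrade only for the duration of one run; if the pool is already
        // gone, the job simply does nothing.
        if let Some(s) = w.upgrade() {
            let _ = Arc::strong_count(&s);
        }
    }));

    // Only the caller's handle counts as a strong reference.
    let count = Arc::strong_count(&shared);
    drop(shared); // strong count reaches zero, so `Drop` runs here

    (count, dropped.load(Ordering::SeqCst))
}
```

Under these assumptions `weak_demo()` returns `(1, true)`: the job no longer contributes a strong reference, so dropping the last handle runs the destructor.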

jonhoo commented 8 years ago

Yeah, we only started running into this when running benchmarks: each benchmark constructs one instance of our application, and each instance starts its own pool (naturally, because it doesn't know about the other pools). One way to fix this would be to have SharedPool's Drop implementation wait for all the workers to finish before returning. Then the reap_connections closure could hold a raw *const pointer instead of a second Arc (thus allowing the Arc to be dropped), while the workers keep using it right up until they return.
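
The "wait for the workers in Drop" half of that idea can be sketched with the standard library alone. This is an assumption-laden illustration, not r2d2's `ScheduledThreadPool`: `Workers`, `exited`, and `join_demo` are hypothetical names, and the raw-pointer part is deliberately left out. The destructor sets a shutdown flag, wakes every worker, and joins the threads before returning.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical worker pool whose `Drop` blocks until every worker has exited.
struct Workers {
    state: Arc<(Mutex<bool>, Condvar)>, // the bool means "shutdown requested"
    handles: Vec<JoinHandle<()>>,
}

impl Workers {
    fn new(n: usize, exited: Arc<AtomicUsize>) -> Workers {
        let state = Arc::new((Mutex::new(false), Condvar::new()));
        let handles = (0..n)
            .map(|_| {
                let state = state.clone();
                let exited = exited.clone();
                thread::spawn(move || {
                    let (lock, cvar) = &*state;
                    let mut shutdown = lock.lock().unwrap();
                    // Sleep until shutdown is requested (loop guards against
                    // spurious wakeups).
                    while !*shutdown {
                        shutdown = cvar.wait(shutdown).unwrap();
                    }
                    drop(shutdown);
                    exited.fetch_add(1, Ordering::SeqCst);
                })
            })
            .collect();
        Workers { state, handles }
    }
}

impl Drop for Workers {
    fn drop(&mut self) {
        // Request shutdown and wake every waiting worker.
        *self.state.0.lock().unwrap() = true;
        self.state.1.notify_all();
        // Block until each worker thread has actually returned.
        for handle in self.handles.drain(..) {
            let _ = handle.join();
        }
    }
}

/// Starts `n` workers, drops the pool, and reports how many workers had
/// exited by the time `drop` returned.
fn join_demo(n: usize) -> usize {
    let exited = Arc::new(AtomicUsize::new(0));
    let workers = Workers::new(n, exited.clone());
    drop(workers);
    exited.load(Ordering::SeqCst)
}
```

Because `drop` joins every thread, `join_demo(3)` returns `3`: no worker can outlive the pool, which is the guarantee a raw pointer held by a worker would depend on.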

I don't know if I feel sufficiently confident about the intricacies of the code to try and write a PR for this without some guidance. Any chance you'll be able to give it a whirl?

sfackler commented 8 years ago

Yep, I should be able to poke at it tonight.

jonhoo commented 8 years ago

@sfackler: great, thanks!

sfackler commented 8 years ago

Released v0.6.4