bikeshedder / deadpool

Dead simple pool implementation for rust with async-await
Apache License 2.0
1.08k stars 137 forks source link

Missing `Sync` somewhere along the chain of `Pool::get` #333

Closed samchouse closed 5 months ago

samchouse commented 5 months ago

db.get().await; causes problems with the future resulting in an error. I'm not sure exactly where the issue lies.

   Compiling deadpool v0.12.1 (/home/sam/deadpool)
   Compiling deadpool-postgres v0.14.0 (/home/sam/deadpool/postgres)
   Compiling adrastos_core v0.1.0 (/home/sam/Documents/projects/personal/adrastos/crates/core)
warning: unused import: `db::postgres::Database`
 --> crates/core/src/task_queue.rs:5:13
  |
5 | use crate::{db::postgres::Database, id::Id};
  |             ^^^^^^^^^^^^^^^^^^^^^^
  |
  = note: `#[warn(unused_imports)]` on by default

warning: this function depends on never type fallback being `()`
   --> crates/core/src/auth/oauth2/mod.rs:129:5
    |
129 | /     pub async fn initialize_login(
130 | |         &self,
131 | |         provider: OAuth2Provider,
132 | |         redis_pool: &RedisPool,
133 | |     ) -> Result<(Url, CsrfToken), String> {
    | |_________________________________________^
    |
    = warning: this was previously accepted by the compiler but is being phased out; it will become a hard error in a future release!
    = note: for more information, see issue #123748 <https://github.com/rust-lang/rust/issues/123748>
    = help: specify the types explicitly
    = note: `#[warn(dependency_on_unit_never_type_fallback)]` on by default

warning: unused variable: `id`
  --> crates/core/src/task_queue.rs:30:22
   |
30 |                 for (id, task) in queue.write().await.0.iter() {
   |                      ^^ help: if this is intentional, prefix it with an underscore: `_id`
   |
   = note: `#[warn(unused_variables)]` on by default

warning: unused variable: `task`
  --> crates/core/src/task_queue.rs:30:26
   |
30 |                 for (id, task) in queue.write().await.0.iter() {
   |                          ^^^^ help: if this is intentional, prefix it with an underscore: `_task`

error[E0277]: `dyn Future<Output = Result<(), deadpool::managed::hooks::HookError<tokio_postgres::Error>>> + Send` cannot be shared between threads safely
    --> crates/core/src/task_queue.rs:49:9
     |
49   | /         Box::pin(async move {
50   | |             db.get().await;
51   | |         })
     | |__________^ `dyn Future<Output = Result<(), deadpool::managed::hooks::HookError<tokio_postgres::Error>>> + Send` cannot be shared between threads safely
     |
     = help: the trait `std::marker::Sync` is not implemented for `dyn Future<Output = Result<(), deadpool::managed::hooks::HookError<tokio_postgres::Error>>> + Send`, which is required by `{async block@crates/core/src/task_queue.rs:49:18: 51:10}: std::marker::Sync`
     = note: required for `Unique<dyn Future<Output = Result<(), deadpool::managed::hooks::HookError<tokio_postgres::Error>>> + Send>` to implement `std::marker::Sync`
note: required because it appears within the type `Box<dyn Future<Output = Result<(), deadpool::managed::hooks::HookError<tokio_postgres::Error>>> + Send>`
    --> /home/sam/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/boxed.rs:236:12
     |
236  | pub struct Box<
     |            ^^^
note: required because it appears within the type `Pin<Box<dyn Future<Output = Result<(), deadpool::managed::hooks::HookError<tokio_postgres::Error>>> + Send>>`
    --> /home/sam/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/pin.rs:1090:12
     |
1090 | pub struct Pin<Ptr> {
     |            ^^^
note: required because it's used within this `async` fn body
    --> /home/sam/deadpool/src/managed/hooks.rs:124:42
     |
124  |       ) -> Result<(), HookError<M::Error>> {
     |  __________________________________________^
125  | |         for hook in &self.vec {
126  | |             match hook {
127  | |                 Hook::Fn(f) => f(&mut inner.obj, &inner.metrics)?,
...    |
131  | |         Ok(())
132  | |     }
     | |_____^
note: required because it's used within this `async` fn body
    --> /home/sam/deadpool/src/managed/mod.rs:383:62
     |
383  |       ) -> Result<Option<ObjectInner<M>>, PoolError<M::Error>> {
     |  ______________________________________________________________^
384  | |         let mut unready_obj = UnreadyObject {
385  | |             inner: Some(inner_obj),
386  | |             pool: &self.inner,
...    |
420  | |         Ok(Some(unready_obj.ready()))
421  | |     }
     | |_____^
note: required because it's used within this `async` fn body
    --> /home/sam/deadpool/src/managed/mod.rs:321:92
     |
321  |       pub async fn timeout_get(&self, timeouts: &Timeouts) -> Result<W, PoolError<M::Error>> {
     |  ____________________________________________________________________________________________^
322  | |         let _ = self.inner.users.fetch_add(1, Ordering::Relaxed);
323  | |         let users_guard = DropGuard(|| {
324  | |             let _ = self.inner.users.fetch_sub(1, Ordering::Relaxed);
...    |
375  | |         .into())
376  | |     }
     | |_____^
note: required because it's used within this `async` fn body
    --> /home/sam/deadpool/src/managed/mod.rs:311:63
     |
311  |       pub async fn get(&self) -> Result<W, PoolError<M::Error>> {
     |  _______________________________________________________________^
312  | |         self.timeout_get(&self.timeouts()).await
313  | |     }
     | |_____^
note: required because it's used within this `async` block
    --> crates/core/src/task_queue.rs:49:18
     |
49   |           Box::pin(async move {
     |  __________________^
50   | |             db.get().await;
51   | |         })
     | |_________^
     = note: required for the cast from `Pin<Box<{async block@crates/core/src/task_queue.rs:49:18: 51:10}>>` to `Pin<Box<dyn Future<Output = ()> + Send + std::marker::Sync>>`
     = note: the full name for the type has been written to '/home/sam/Documents/projects/personal/adrastos/target/debug/deps/adrastos_core-43f5e88ae6228d23.long-type-18325601849322697777.txt'
     = note: consider using `--verbose` to print the full type name to the console
     = note: the full name for the type has been written to '/home/sam/Documents/projects/personal/adrastos/target/debug/deps/adrastos_core-43f5e88ae6228d23.long-type-13642881050076820353.txt'
     = note: consider using `--verbose` to print the full type name to the console
     = note: the full name for the type has been written to '/home/sam/Documents/projects/personal/adrastos/target/debug/deps/adrastos_core-43f5e88ae6228d23.long-type-6699085670344616249.txt'
     = note: consider using `--verbose` to print the full type name to the console

error[E0277]: `dyn Future<Output = Result<(tokio_postgres::Client, tokio::task::JoinHandle<()>), tokio_postgres::Error>> + Send` cannot be shared between threads safely
    --> crates/core/src/task_queue.rs:49:9
     |
49   | /         Box::pin(async move {
50   | |             db.get().await;
51   | |         })
     | |__________^ `dyn Future<Output = Result<(tokio_postgres::Client, tokio::task::JoinHandle<()>), tokio_postgres::Error>> + Send` cannot be shared between threads safely
     |
     = help: the trait `std::marker::Sync` is not implemented for `dyn Future<Output = Result<(tokio_postgres::Client, tokio::task::JoinHandle<()>), tokio_postgres::Error>> + Send`, which is required by `{async block@crates/core/src/task_queue.rs:49:18: 51:10}: std::marker::Sync`
     = note: required for `Unique<dyn Future<Output = Result<(tokio_postgres::Client, tokio::task::JoinHandle<()>), tokio_postgres::Error>> + Send>` to implement `std::marker::Sync`
note: required because it appears within the type `Box<dyn Future<Output = Result<(tokio_postgres::Client, tokio::task::JoinHandle<()>), tokio_postgres::Error>> + Send>`
    --> /home/sam/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/boxed.rs:236:12
     |
236  | pub struct Box<
     |            ^^^
note: required because it appears within the type `Pin<Box<dyn Future<Output = Result<(tokio_postgres::Client, tokio::task::JoinHandle<()>), tokio_postgres::Error>> + Send>>`
    --> /home/sam/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/pin.rs:1090:12
     |
1090 | pub struct Pin<Ptr> {
     |            ^^^
note: required because it's used within this `async` fn body
    --> /home/sam/deadpool/postgres/src/lib.rs:149:60
     |
149  |       async fn create(&self) -> Result<ClientWrapper, Error> {
     |  ____________________________________________________________^
150  | |         let (client, conn_task) = self.connect.connect(&self.pg_config).await?;
151  | |         let client_wrapper = ClientWrapper::new(client, conn_task);
152  | |         self.statement_caches
153  | |             .attach(&client_wrapper.statement_cache);
154  | |         Ok(client_wrapper)
155  | |     }
     | |_____^
note: required because it's used within this `async` fn body
    --> /home/sam/deadpool/src/managed/mod.rs:653:30
     |
653  |   ) -> Result<O, PoolError<E>> {
     |  ______________________________^
654  | |     match (runtime, duration) {
655  | |         (_, None) => future.await.map_err(Into::into),
656  | |         (Some(runtime), Some(duration)) => runtime
...    |
662  | |     }
663  | | }
     | |_^
note: required because it's used within this `async` fn body
    --> /home/sam/deadpool/src/managed/mod.rs:427:62
     |
427  |       ) -> Result<Option<ObjectInner<M>>, PoolError<M::Error>> {
     |  ______________________________________________________________^
428  | |         let mut unready_obj = UnreadyObject {
429  | |             inner: Some(ObjectInner {
430  | |                 obj: apply_timeout(
...    |
455  | |         Ok(Some(unready_obj.ready()))
456  | |     }
     | |_____^
note: required because it's used within this `async` fn body
    --> /home/sam/deadpool/src/managed/mod.rs:321:92
     |
321  |       pub async fn timeout_get(&self, timeouts: &Timeouts) -> Result<W, PoolError<M::Error>> {
     |  ____________________________________________________________________________________________^
322  | |         let _ = self.inner.users.fetch_add(1, Ordering::Relaxed);
323  | |         let users_guard = DropGuard(|| {
324  | |             let _ = self.inner.users.fetch_sub(1, Ordering::Relaxed);
...    |
375  | |         .into())
376  | |     }
     | |_____^
note: required because it's used within this `async` fn body
    --> /home/sam/deadpool/src/managed/mod.rs:311:63
     |
311  |       pub async fn get(&self) -> Result<W, PoolError<M::Error>> {
     |  _______________________________________________________________^
312  | |         self.timeout_get(&self.timeouts()).await
313  | |     }
     | |_____^
note: required because it's used within this `async` block
    --> crates/core/src/task_queue.rs:49:18
     |
49   |           Box::pin(async move {
     |  __________________^
50   | |             db.get().await;
51   | |         })
     | |_________^
     = note: required for the cast from `Pin<Box<{async block@crates/core/src/task_queue.rs:49:18: 51:10}>>` to `Pin<Box<dyn Future<Output = ()> + Send + std::marker::Sync>>`
     = note: the full name for the type has been written to '/home/sam/Documents/projects/personal/adrastos/target/debug/deps/adrastos_core-43f5e88ae6228d23.long-type-18325601849322697777.txt'
     = note: consider using `--verbose` to print the full type name to the console
     = note: the full name for the type has been written to '/home/sam/Documents/projects/personal/adrastos/target/debug/deps/adrastos_core-43f5e88ae6228d23.long-type-13642881050076820353.txt'
     = note: consider using `--verbose` to print the full type name to the console
     = note: the full name for the type has been written to '/home/sam/Documents/projects/personal/adrastos/target/debug/deps/adrastos_core-43f5e88ae6228d23.long-type-11138257683375059716.txt'
     = note: consider using `--verbose` to print the full type name to the console
     = note: the full name for the type has been written to '/home/sam/Documents/projects/personal/adrastos/target/debug/deps/adrastos_core-43f5e88ae6228d23.long-type-18437699527042099929.txt'
     = note: consider using `--verbose` to print the full type name to the console

For more information about this error, try `rustc --explain E0277`.
warning: `adrastos_core` (lib) generated 4 warnings
error: could not compile `adrastos_core` (lib) due to 2 previous errors; 4 warnings emitted
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};

use tokio::{sync::RwLock, time};

use crate::{db::postgres::Database, id::Id};

type TaskFuture = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;

pub struct TaskQueue(HashMap<String, Box<dyn Fn(String) -> TaskFuture + Send + Sync>>);

impl TaskQueue {
    pub fn new() -> Self {
        Self(HashMap::new())
    }

    pub fn add_task<T: Fn(String) -> TaskFuture + Send + Sync + 'static>(&mut self, task: T) {
        self.0.insert(Id::new().to_string(), Box::new(task));
    }

    // pub fn clear_task(&mut self, id: Id) {
    //     self.0.remove(&id);
    // }

    pub fn run(queue: Arc<RwLock<Self>>) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut interval = time::interval(tokio::time::Duration::from_millis(500));

            loop {
                interval.tick().await;
                for (id, task) in queue.write().await.0.iter() {
                    // task(id.clone());
                }
            }
        })
    }
}

impl Default for TaskQueue {
    fn default() -> Self {
        Self::new()
    }
}

fn a(db: deadpool_postgres::Pool) {
    let mut a = TaskQueue::new();
    a.add_task(move |id| {
        println!("{id}");
        let db = db.clone();
        Box::pin(async move {
            db.get().await;
        })
    });
}
bikeshedder commented 5 months ago

Could you provide a minimal reproduction example. A PR for that in form of a unit test would be highly appreciated!

samchouse commented 5 months ago

Done #334

samchouse commented 5 months ago

Actually I realized that I had something like this setup in Actix and the only difference between our types is this:

- Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
+ Pin<Box<dyn Future<Output = ()>>>;

I guess Send + Sync is unnecessary in this case however I'm not sure if this is still an issue with the library or the bounds are unnecessary in every case.

bikeshedder commented 5 months ago

The future returned by Pool::get() is + Send but not + Sync.

I do wonder why you need it to be + Sync? Futures can't be awaited or polled from multiple threads so I wonder why you need it to be + Sync in the first place?

After removing the + Sync markers the code compiles fine:

-    type ClosureFuture = Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync>>;
-    async fn test_closure<T: Fn(String) -> ClosureFuture + Send + Sync + 'static>(_: T) {
+    type ClosureFuture = Pin<Box<dyn std::future::Future<Output = ()> + Send>>;
+    async fn test_closure<T: Fn(String) -> ClosureFuture + Send + 'static>(_: T) {
samchouse commented 5 months ago

I'm not very good with async in Rust so I was just slapping Send + Sync everywhere. Since this is the case, everything seems to be fine with the lib and I'll close this issue. Sorry for the clutter!

bikeshedder commented 5 months ago

Adding Sync to a Future doesn't add anything. Quoting @Darksonn from the #tokio-users channel:

Generally it's nice for libraries to provide Sync futures as it avoids useless errors, but it's also a mistake for any library to require Sync.

Source: https://discord.com/channels/500028886025895936/500336333500448798/1252281055881334784

If you end up with some code that requires the future to be Sync you can put it inside a sync wrapper: https://github.com/tokio-rs/tokio/blob/master/tokio/src/util/sync_wrapper.rs

I'd be willing to change the deadpool crate so it returns Sync futures to satisfy code that uses futures wrong. A change like that must not change the hook and manager API though as I don't want to require those futures to be Sync.

Unless this becomes a major show stopper somewhere I won't be looking into that though. PRs welcome!

samchouse commented 5 months ago

Thanks for all the reference it's super helpful! This issue is caused by me, not another library requiring the trait, so I was able to just remove the Sync trait from that future.