pacman82 / odbc-api

ODBC (Open Database Connectivity) bindings for Rust.

Async query execution in spawned task #276

Open lanklaas opened 1 year ago

lanklaas commented 1 year ago

Hello,

I am trying to use the execute_polling method on a connection, but the compiler keeps saying that the connection is not Send, even though I promoted it to Send.

This is the error:

error: future cannot be sent between threads safely
   --> src/main.rs:17:18
    |
17  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `odbc_api::Connection<'_>`, the trait `std::marker::Sync` is not implemented for `*mut odbc_api::odbc_sys::Dbc`
note: future is not `Send` as this value is used across an await
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/connection.rs:149:85
    |
142 |           &self,
    |           ----- has type `&odbc_api::Connection<'_>` which is not `Send`
...
146 |       ) -> Result<Option<CursorPolling<StatementImpl<'_>>>, Error> {
    |  __________________________________________________________________-
147 | |         let query = SqlText::new(query);
148 | |         let lazy_statement = move || self.allocate_statement();
149 | |         execute_with_parameters_polling(lazy_statement, Some(&query), params, sleep).await
    | |                                                                                     ^^^^^^ await occurs here, with `&self` maybe used later
150 | |     }
    | |_____- `&self` is later dropped here

Some testing code that produces it. I used the example for promote_to_send and made it async.

use lazy_static::lazy_static;
use odbc_api::Environment;
use std::{thread, time::Duration};
lazy_static! {
    static ref ENV: Environment = unsafe { Environment::new().unwrap() };
}

#[tokio::main]
async fn main() {
    const MSSQL: &str = "Driver={ODBC Driver 17 for SQL Server};\
    Server=localhost;\
    UID=SA;\
    PWD=My@Test@Password1;\
";
    let conn = ENV.connect_with_connection_string(MSSQL).unwrap();
    let conn = unsafe { conn.promote_to_send() };
    let handle = tokio::task::spawn(async move {
        let exec = conn.execute_polling("SELECT 1", (), || {
            tokio::time::sleep(Duration::from_secs(1))
        });
        if let Some(cursor) = exec.await.unwrap() {
            // dbg!(cursor);
        }
        // if let Some(cursor) = conn.execute("SELECT 1", ()).unwrap() {
        //     // dbg!(cursor);
        // }
    });
    handle.await;
}

const CREATE: &str = r#"SELECT 1"#;

Cargo.toml

[package]
name = "spark-odbc"
version = "0.1.0"
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
lazy_static = "1.4.0"
# odbc="*"
# odbc-safe="*"
# threadpool = "1.8.1"
odbc-api="*"
tokio = { version = "1.21.2", features = ["macros", "rt-multi-thread", "time"] }

Rust info:

rustc 1.64.0 (a55dd71d5 2022-09-19)
stable-x86_64-unknown-linux-gnu

Is there a way around this or should I go back to the sync API?

pacman82 commented 1 year ago

Hi @lanklaas, thanks for giving the async API a spin. It comes with a bunch of caveats, but in this case it is just something in the error message you seem to have missed.

help: within odbc_api::Connection<'_>, the trait std::marker::Sync

So, your async block is not Send because the Connection is not Sync. Connections can never be Sync unless they are wrapped in a Mutex. Why does it need to be Sync? Well:

future is not Send as this value is used across an await

This means that in a multithreaded async runtime another thread might pick up the task after the await. execute_polling borrows the connection through &self, and a shared reference is only Send if the type behind it is Sync. There is little I can think of right now to make this more convenient (at least within a zero-cost abstraction), because ODBC has this annoying property of maintaining mutable state for error messages.
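
To make the Send/Sync relationship concrete, here is a minimal sketch that uses only the standard library. FakeConnection and ForceSend are hypothetical stand-ins (not odbc-api types) for a connection that owns a raw handle and for the wrapper returned by promote_to_send. The Probe helper is a compile-time trick for asking "is this type Send?" and is likewise an assumption of this sketch.

```rust
use std::marker::PhantomData;
use std::sync::Mutex;

/// Hypothetical stand-in for an ODBC connection: it owns a raw handle,
/// so the compiler derives neither `Send` nor `Sync` for it.
#[allow(dead_code)]
struct FakeConnection {
    handle: *mut u8,
}

/// Hypothetical stand-in for a "promoted" connection: asserts `Send` only.
#[allow(dead_code)]
struct ForceSend<T>(T);
unsafe impl<T> Send for ForceSend<T> {}

// Compile-time probe: the inherent constant is picked when `T: Send`,
// otherwise lookup falls back to the blanket trait constant.
trait NotSend {
    const IS_SEND: bool = false;
}
impl<T: ?Sized> NotSend for T {}

#[allow(dead_code)]
struct Probe<T: ?Sized>(PhantomData<T>);
#[allow(dead_code)]
impl<T: ?Sized + Send> Probe<T> {
    const IS_SEND: bool = true;
}

/// Reports whether each type is `Send`, in the order listed below.
fn send_report() -> [bool; 4] {
    [
        // The raw connection: not `Send` because of the raw pointer.
        Probe::<FakeConnection>::IS_SEND,
        // The promoted connection, moved by value: `Send`.
        Probe::<ForceSend<FakeConnection>>::IS_SEND,
        // A shared reference to it: `Send` only if the target is `Sync`.
        Probe::<&'static ForceSend<FakeConnection>>::IS_SEND,
        // A shared reference to a Mutex around it: `Mutex<T>` is `Sync` if `T: Send`.
        Probe::<&'static Mutex<ForceSend<FakeConnection>>>::IS_SEND,
    ]
}

fn main() {
    let [raw, promoted, shared_ref, mutex_ref] = send_report();
    println!("raw: {raw}, promoted: {promoted}, &promoted: {shared_ref}, &Mutex: {mutex_ref}");
}
```

The third entry is the case from the error message: the promoted value itself may move to another thread, but a reference to it held across an await may not.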

However, I feel your code might benefit from allocating statement handles explicitly. Check out PreallocatedPolling. See: https://docs.rs/odbc-api/latest/odbc_api/struct.PreallocatedPolling.html#method.execute

This might even get rid of the unsafe code completely. I would need to try it later to know for sure, though.

Cheers, Markus

lanklaas commented 1 year ago

Thanks for the quick response. I tried to put it in a Mutex as well as an Arc, but I still got the issue. Should the Mutex work here, or will it never work across an await?

Updated code:

use lazy_static::lazy_static;
use odbc_api::Environment;
use std::{
    sync::{Arc, Mutex},
    thread,
    time::Duration,
};
lazy_static! {
    static ref ENV: Environment = unsafe { Environment::new().unwrap() };
}

#[tokio::main]
async fn main() {
    const MSSQL: &str = "Driver={ODBC Driver 17 for SQL Server};\
    Server=localhost;\
    UID=SA;\
    PWD=My@Test@Password1;\
";
    let conn = ENV.connect_with_connection_string(MSSQL).unwrap();
    let conn = Arc::new(Mutex::new(unsafe { conn.promote_to_send() }));
    let conn = Arc::clone(&conn);
    let handle = tokio::task::spawn(async move {
        let conn = Arc::clone(&conn);
        let lock = conn.lock().unwrap();
        let exec = lock.execute_polling("SELECT 1", (), || {
            tokio::time::sleep(Duration::from_secs(1))
        });
        if let Some(cursor) = exec.await.unwrap() {
            // dbg!(cursor);
        }
        // if let Some(cursor) = conn.execute("SELECT 1", ()).unwrap() {
        //     // dbg!(cursor);
        // }
    });
    handle.await;
}

The full error:

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl std::future::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, odbc_api::force_send_sync::Send<odbc_api::Connection<'_>>>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:28:35
    |
24  |         let lock = conn.lock().unwrap();
    |             ---- has type `std::sync::MutexGuard<'_, odbc_api::force_send_sync::Send<odbc_api::Connection<'_>>>` which is not `Send`
...
28  |         if let Some(cursor) = exec.await.unwrap() {
    |                                   ^^^^^^ await occurs here, with `lock` maybe used later
...
34  |     });
    |     - `lock` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `odbc_api::Connection<'_>`, the trait `std::marker::Sync` is not implemented for `*mut odbc_api::odbc_sys::Dbc`
note: future is not `Send` as this value is used across an await
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/connection.rs:149:85
    |
142 |           &self,
    |           ----- has type `&odbc_api::Connection<'_>` which is not `Send`
...
146 |       ) -> Result<Option<CursorPolling<StatementImpl<'_>>>, Error> {
    |  __________________________________________________________________-
147 | |         let query = SqlText::new(query);
148 | |         let lazy_statement = move || self.allocate_statement();
149 | |         execute_with_parameters_polling(lazy_statement, Some(&query), params, sleep).await
    | |                                                                                     ^^^^^^ await occurs here, with `&self` maybe used later
150 | |     }
    | |_____- `&self` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `odbc_api::Connection<'_>`, the trait `std::marker::Sync` is not implemented for `*mut odbc_api::odbc_sys::Env`
note: future is not `Send` as this value is used across an await
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/connection.rs:149:85
    |
142 |           &self,
    |           ----- has type `&odbc_api::Connection<'_>` which is not `Send`
...
146 |       ) -> Result<Option<CursorPolling<StatementImpl<'_>>>, Error> {
    |  __________________________________________________________________-
147 | |         let query = SqlText::new(query);
148 | |         let lazy_statement = move || self.allocate_statement();
149 | |         execute_with_parameters_polling(lazy_statement, Some(&query), params, sleep).await
    | |                                                                                     ^^^^^^ await occurs here, with `&self` maybe used later
150 | |     }
    | |_____- `&self` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl std::future::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `*mut odbc_api::odbc_sys::Stmt`
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl std::future::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `*mut std::ffi::c_void`
note: future is not `Send` as this value is used across an await
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/execute.rs:159:70
    |
153 |           while let Some(blob_ptr) = stmt.param_data().into_result(&stmt)? {
    |                          -------- has type `*mut std::ffi::c_void` which is not `Send`
...
159 |                   wait_for(|| stmt.put_binary_batch(batch), &mut sleep)
    |  ______________________________________________________________________^
160 | |                     .await
    | |__________________________^ await occurs here, with `blob_ptr` maybe used later
...
163 |           }
    |           - `blob_ptr` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl std::future::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `*mut &mut dyn odbc_api::parameter::Blob`
note: future is not `Send` as this value is used across an await
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/execute.rs:159:70
    |
155 |               let blob_ptr: *mut &mut dyn Blob = transmute(blob_ptr);
    |                   -------- has type `*mut &mut dyn odbc_api::parameter::Blob` which is not `Send`
...
159 |                   wait_for(|| stmt.put_binary_batch(batch), &mut sleep)
    |  ______________________________________________________________________^
160 | |                     .await
    | |__________________________^ await occurs here, with `blob_ptr` maybe used later
...
163 |           }
    |           - `blob_ptr` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn odbc_api::parameter::Blob`
note: future is not `Send` as this value is used across an await
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/execute.rs:159:70
    |
156 |               let blob_ref = &mut *blob_ptr;
    |                                   --------- has type `&mut dyn odbc_api::parameter::Blob` which is not `Send`
...
159 |                   wait_for(|| stmt.put_binary_batch(batch), &mut sleep)
    |  ______________________________________________________________________^
160 | |                     .await
    | |__________________________^ await occurs here, with `*blob_ptr` maybe used later
...
163 |           }
    |           - `*blob_ptr` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `odbc_api::handles::StatementRef<'_>`, the trait `std::marker::Sync` is not implemented for `*mut odbc_api::odbc_sys::Stmt`
note: future is not `Send` as this value is used across an await
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/execute.rs:159:70
    |
153 |            while let Some(blob_ptr) = stmt.param_data().into_result(&stmt)? {
    |            -                                                        ----- has type `&odbc_api::handles::StatementRef<'_>` which is not `Send`
    |   _________|
    |  |
154 |  |             // The safe interfaces currently exclusively bind pointers to `Blob` trait objects
155 |  |             let blob_ptr: *mut &mut dyn Blob = transmute(blob_ptr);
156 |  |             let blob_ref = &mut *blob_ptr;
...    |
159 |  |                 wait_for(|| stmt.put_binary_batch(batch), &mut sleep)
    |  |______________________________________________________________________^
160 | ||                     .await
    | ||__________________________^ await occurs here, with `&stmt` maybe used later
161 |  |                     .into_result(&stmt)?;
162 |  |             }
163 |  |         }
    |  |_________- `&stmt` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

pacman82 commented 1 year ago

May I ask why you spawn a task with tokio::task::spawn?

lanklaas commented 1 year ago

It is the closest I could get the example to the actual code. I have a lib crate that compiles fine with the async API, but when I use it in my bin crate, which spawns tasks for multiple Kafka listeners, I get this issue.

pacman82 commented 1 year ago

Sorry for the confusion with the Mutex. It won't help you in this situation, because it is not only the Connection itself which needs to be Sync. The futures emitted by this crate are not Send, which is what tokio::spawn requires, plain and simple, in its signature. Could they be Send? Honestly, I do not know yet; I need to think about this. At least I feel I won't have a quick fix for this. If you want to move forward with your crate, I would advise sticking with the synchronous API for now.
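
One possible way to use a synchronous API from several tasks is to give the connection its own thread and talk to it over channels, so the connection never crosses a thread boundary. The following is only a sketch under that assumption, using the standard library alone. FakeConnection, Request, spawn_db_worker and query_via_worker are hypothetical names, not part of odbc-api.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a blocking connection that is not `Send`.
struct FakeConnection {
    _handle: *mut u8,
}

impl FakeConnection {
    fn connect() -> Self {
        FakeConnection { _handle: std::ptr::null_mut() }
    }

    /// Pretends to run a query synchronously and returns a made-up result.
    fn execute(&self, query: &str) -> String {
        format!("result of `{query}`")
    }
}

/// A request carries the query text and a channel for sending the answer back.
struct Request {
    query: String,
    reply: mpsc::Sender<String>,
}

/// Spawns a thread that creates the connection itself and serves requests
/// until every sender has been dropped.
fn spawn_db_worker() -> mpsc::Sender<Request> {
    let (tx, rx) = mpsc::channel::<Request>();
    thread::spawn(move || {
        let conn = FakeConnection::connect();
        for req in rx {
            // Ignore the error if the requester has already gone away.
            let _ = req.reply.send(conn.execute(&req.query));
        }
    });
    tx
}

/// Sends one query to the worker and blocks until the answer arrives.
fn query_via_worker(worker: &mpsc::Sender<Request>, query: &str) -> String {
    let (reply, answer) = mpsc::channel();
    worker
        .send(Request { query: query.to_owned(), reply })
        .expect("worker thread is gone");
    answer.recv().expect("worker dropped the reply channel")
}

fn main() {
    let worker = spawn_db_worker();
    println!("{}", query_via_worker(&worker, "SELECT 1"));
}
```

Only the request and reply values cross threads here, and both are Send. Inside an async task the blocking recv would stall the executor thread, so an async-aware channel such as tokio::sync::oneshot would be the natural replacement for the reply channel.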

lanklaas commented 1 year ago

OK, thanks for the help! I will use the sync API for now. I will leave the ticket open, but you can close it if you like.

pacman82 commented 1 year ago

Thank you too. Especially for the minimal reproducing example. I'll leave it open until it's either working or I understand precisely why it cannot.

pacman82 commented 1 month ago

A little update. This works today using tokio::task::spawn_local instead of tokio::spawn. No mutex around the connection required.
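
The reason a thread-local spawn can accept such futures is that the task never leaves the thread it was created on, so no Send bound is needed. The sketch below illustrates that idea with a toy single-threaded executor built from the standard library only. It is not how tokio implements spawn_local, and block_on_local, YieldOnce and run_non_send_future are hypothetical names. An Rc stands in for a connection handle that is not Send.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for a busy-polling toy executor.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls the future on the current thread until it is ready.
/// Note that there is no `Send` bound on `F`.
fn block_on_local<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}

/// Future that returns `Pending` once, to force a real suspension point.
struct YieldOnce(bool);
impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

/// Holds a non-`Send` value across an await and still runs to completion.
fn run_non_send_future() -> u32 {
    let handle = Rc::new(41u32); // `Rc` is not `Send`
    block_on_local(async move {
        YieldOnce(false).await; // `handle` is alive across this await
        *handle + 1
    })
}

fn main() {
    println!("{}", run_non_send_future());
}
```

With tokio itself, spawn_local is typically called from within a tokio::task::LocalSet, which plays the role of the toy executor above.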

I am also currently evaluating the safety of making statements Send, which should make this work with just plain tokio::spawn.

Best, Markus