steffengy / tiberius

TDS 7.4 (mssql / Microsoft SQL Server) async driver for rust. Fork at: https://github.com/prisma/tiberius
Apache License 2.0
150 stars 2 forks source link

Unable to send BoxableIo safely between threads #103

Open nevi-me opened 5 years ago

nevi-me commented 5 years ago

I have an application that uses tiberius and tower-grpc (with tower-hyper). Before tower-grpc was updated to 0.1, I used to use wait() as a work-around on my connection like below:

/// Create connection result, waiting for the future to complete
fn connect(
) -> Result<SqlConnection<Box<dyn BoxableIo>>Error> {
    let conn_str = "server=tcp:localhost,1433;integratedSecurity=false;".to_owned();

    SqlConnection::connect(conn_str.as_str()).wait()
}

Ever since the upgrade, wait()ing blocks indefinitely, so I'm forced to remove it. However, my code now can't compile because *const tiberius::query::QueryStream<Box<dyn tiberius::BoxableIo>> cannot be sent safely between threads.

`*const tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>` cannot be sent between threads safely
within `futures::future::and_then::AndThen<futures::future::and_then::AndThen<std::boxed::Box<dyn futures::future::Future<Item = tiberius::SqlConnection<std::boxed::Box<dyn tiberius::BoxableIo>>, Error = tiberius::Error> + std::marker::Send>, futures::future::map::Map<futures_state_stream::Collect<futures_state_stream::Map<tiberius::stmt::QueryResult<tiberius::stmt::StmtStream<std::boxed::Box<dyn tiberius::BoxableIo>, tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>>>, [closure@src\database\mssql.rs:66:18: 66:83]>>, [closure@src\database\mssql.rs:68:18: 68:59]>, [closure@src\database\mssql.rs:61:19: 69:10 value:_]>, std::result::Result<std::option::Option<std::string::String>, tiberius::Error>, [closure@src\database\mssql.rs:71:19: 71:54]>`, the trait `std::marker::Send` is not implemented for `*const tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>`
required because it appears within the type `std::marker::PhantomData<*const tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>>`
required because it appears within the type `tiberius::stmt::StmtStream<std::boxed::Box<dyn tiberius::BoxableIo>, tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>>`
required because it appears within the type `tiberius::stmt::QueryResult<tiberius::stmt::StmtStream<std::boxed::Box<dyn tiberius::BoxableIo>, tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>>>`
required because it appears within the type `futures_state_stream::Map<tiberius::stmt::QueryResult<tiberius::stmt::StmtStream<std::boxed::Box<dyn tiberius::BoxableIo>, tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>>>, [closure@src\database\mssql.rs:66:18: 66:83]>`
required because it appears within the type `futures_state_stream::Collect<futures_state_stream::Map<tiberius::stmt::QueryResult<tiberius::stmt::StmtStream<std::boxed::Box<dyn tiberius::BoxableIo>, tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>>>, [closure@src\database\mssql.rs:66:18: 66:83]>>`
required because it appears within the type `futures::future::map::Map<futures_state_stream::Collect<futures_state_stream::Map<tiberius::stmt::QueryResult<tiberius::stmt::StmtStream<std::boxed::Box<dyn tiberius::BoxableIo>, tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>>>, [closure@src\database\mssql.rs:66:18: 66:83]>>, [closure@src\database\mssql.rs:68:18: 68:59]>`
required because it appears within the type `futures::future::chain::Chain<std::boxed::Box<dyn futures::future::Future<Item = tiberius::SqlConnection<std::boxed::Box<dyn tiberius::BoxableIo>>, Error = tiberius::Error> + std::marker::Send>, futures::future::map::Map<futures_state_stream::Collect<futures_state_stream::Map<tiberius::stmt::QueryResult<tiberius::stmt::StmtStream<std::boxed::Box<dyn tiberius::BoxableIo>, tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>>>, [closure@src\database\mssql.rs:66:18: 66:83]>>, [closure@src\database\mssql.rs:68:18: 68:59]>, [closure@src\database\mssql.rs:61:19: 69:10 value:_]>`
required because it appears within the type `futures::future::and_then::AndThen<std::boxed::Box<dyn futures::future::Future<Item = tiberius::SqlConnection<std::boxed::Box<dyn tiberius::BoxableIo>>, Error = tiberius::Error> + std::marker::Send>, futures::future::map::Map<futures_state_stream::Collect<futures_state_stream::Map<tiberius::stmt::QueryResult<tiberius::stmt::StmtStream<std::boxed::Box<dyn tiberius::BoxableIo>, tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>>>, [closure@src\database\mssql.rs:66:18: 66:83]>>, [closure@src\database\mssql.rs:68:18: 68:59]>, [closure@src\database\mssql.rs:61:19: 69:10 value:_]>`
required because it appears within the type `futures::future::chain::Chain<futures::future::and_then::AndThen<std::boxed::Box<dyn futures::future::Future<Item = tiberius::SqlConnection<std::boxed::Box<dyn tiberius::BoxableIo>>, Error = tiberius::Error> + std::marker::Send>, futures::future::map::Map<futures_state_stream::Collect<futures_state_stream::Map<tiberius::stmt::QueryResult<tiberius::stmt::StmtStream<std::boxed::Box<dyn tiberius::BoxableIo>, tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>>>, [closure@src\database\mssql.rs:66:18: 66:83]>>, [closure@src\database\mssql.rs:68:18: 68:59]>, [closure@src\database\mssql.rs:61:19: 69:10 value:_]>, futures::future::result_::FutureResult<std::option::Option<std::string::String>, tiberius::Error>, [closure@src\database\mssql.rs:71:19: 71:54]>`
required because it appears within the type `futures::future::and_then::AndThen<futures::future::and_then::AndThen<std::boxed::Box<dyn futures::future::Future<Item = tiberius::SqlConnection<std::boxed::Box<dyn tiberius::BoxableIo>>, Error = tiberius::Error> + std::marker::Send>, futures::future::map::Map<futures_state_stream::Collect<futures_state_stream::Map<tiberius::stmt::QueryResult<tiberius::stmt::StmtStream<std::boxed::Box<dyn tiberius::BoxableIo>, tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>>>, [closure@src\database\mssql.rs:66:18: 66:83]>>, [closure@src\database\mssql.rs:68:18: 68:59]>, [closure@src\database\mssql.rs:61:19: 69:10 value:_]>, std::result::Result<std::option::Option<std::string::String>, tiberius::Error>, [closure@src\database\mssql.rs:71:19: 71:54]>`
required for the cast to the object type `dyn futures::future::Future<Item = std::option::Option<std::string::String>, Error = tiberius::Error> + std::marker::Send`

I create my connection per the below, then any query that I try to create, results in this error.

/// Starts connecting to the SQL Server instance on localhost and returns the
/// pending connection future.
///
/// Nothing is waited on here: the caller is responsible for driving the
/// returned future to completion.
fn connect(
) -> Box<dyn Future<Item = SqlConnection<Box<dyn BoxableIo>>, Error = Error> + Send> {
    // The connection string is handed over as a `&str`, so the literal can be
    // passed directly without first allocating an owned `String`.
    let conn_str = "server=tcp:localhost,1433;integratedSecurity=false;";

    SqlConnection::connect(conn_str)
}

/// Inserts `value` and resolves to the `value` column of the first row the
/// statement returns, or `None` when it returns no rows.
///
/// NOTE: this is the reproduction case and does not compile; boxing the
/// future as `Send` is rejected (see the comment on `Box::new` below).
pub fn run_query(
    value: String,
) -> Box<dyn Future<Item = Option<String>, Error = Error> + Send + 'static> {
    let fut = connect()
        .and_then(|conn| {
            conn.query(
                "INSERT INTO table (value) OUTPUT INSERTED.* values (@P1)",
                &[&value.as_str()],
            )
            // Copy the `value` column out of each row as an owned `String`.
            .map(move |row: QueryRow| (row.get::<&str, &str>("value")).to_string())
            .collect()
            // Keep only the first collected value; the second tuple element
            // (the connection, per the original note) is discarded.
            .map(move |(a, _)| a.get(0).map(|r| r.clone())) // get value, lose connection
        })
        // Passes the result through unchanged.
        .and_then(|result: Option<String>| Ok(result));
    // The compiler rejects this cast to `dyn Future + Send`: the chain contains
    // a `StmtStream` whose `PhantomData<*const QueryStream<..>>` is not `Send`.
    Box::new(fut) // this fails to compile
}

Making it return impl Future<> instead of Box<dyn Future<> + Send> only pushes the error further down to the eventual consumer.

Do you know what I'm doing wrong? BoxableIo itself supports Send, so the issue seems to be with QueryStream. Thanks

agalakhov commented 4 years ago

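This error is caused by line 73 of stmt.rs, which declares a PhantomData holding a raw pointer. Raw pointers are not Send, so PhantomData<*const R> makes the containing StmtStream non-Send as well. Replacing PhantomData<*const R> with PhantomData<R> resolves the issue.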