msupply-foundation / open-msupply

Open mSupply represents our most recent advancement in the Logistics Management Information System (LMIS), expanding on more than two decades of development inherited from the well-established legacy of the original mSupply.
https://msupply.foundation/open-msupply/

Diesel async #78

Open andreievg opened 2 years ago

andreievg commented 2 years ago

Currently we use synchronous services and synchronous diesel operations. This becomes a problem once there are more concurrent requests than cores, because the extra requests will start blocking. Also, the tokio runtime is tailored for many 'tasks' sharing a thread rather than one operation occupying a thread. Diesel is working on an official async library (but I don't think connection pooling is implemented for it yet).

There is a way to run diesel operations somewhat asynchronously: spawn_blocking, which puts the task on a worker thread pool. An example prototype of this was made to show how we can use async now, with minimal changes needed when we later switch to the official diesel async library.


type DBConnection = PgConnection;
type ConnectionPool = Pool<ConnectionManager<DBConnection>>;
type PooledConnection = r2d2::PooledConnection<ConnectionManager<DBConnection>>;

pub struct StorageConnection {
    pub connection: Arc<Mutex<PooledConnection>>,
}

#[derive(Debug)]
pub enum TransactionError<E> {
    FailedToStart(Error),
    FailedToCommit(Error),
    FailedToRollback(Error),
    Inner(E),
}

impl StorageConnection {
    pub async fn con<T, E, F>(&self, f: F) -> Result<T, E>
    where
        F: FnOnce(&PooledConnection) -> Result<T, E> + Send + 'static,
        T: Send + 'static,
        E: Send + 'static,
    {
        let connection = self.connection.clone();
        tokio::task::spawn_blocking(move || {
            let connection = connection.lock().unwrap();
            f(&connection)
        })
        .await
        .unwrap()
    }

    pub async fn transaction<'a, T, E, F, Fut>(&self, f: F) -> Result<T, TransactionError<E>>
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        { // This scope makes sure the mutex guard is released before `f` runs
            let connection = self.connection.lock().unwrap();
            let transaction_manager = connection.transaction_manager();
            transaction_manager
                .begin_transaction(connection.deref())
                .map_err(|error| TransactionError::FailedToStart(error))?;
        }

        let f_result = f().await;

        { // This scope makes sure the mutex guard is released
            let connection: &PooledConnection = &self.connection.lock().unwrap();
            let transaction_manager = connection.transaction_manager();
            match f_result {
                Ok(value) => {
                    transaction_manager
                        .commit_transaction(connection)
                        .map_err(|error| TransactionError::FailedToCommit(error))?;
                    Ok(value)
                }
                Err(e) => {
                    transaction_manager
                        .rollback_transaction(connection)
                        .map_err(|error| TransactionError::FailedToRollback(error))?;
                    Err(TransactionError::Inner(e))
                }
            }
        }
    }
}
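
To make the shape of `con()` easier to follow without tokio or diesel, here is a minimal std-only sketch. It is not the prototype itself: `FakeConnection` is a hypothetical stand-in for the pooled connection, and `thread::spawn` plus `join` stands in for `spawn_blocking` plus `.await`. The point is only that the closure and its result must be `Send + 'static`, and that the mutex guard lives just for the duration of the closure.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a pooled database connection.
pub struct FakeConnection {
    pub queries_run: usize,
}

impl FakeConnection {
    /// Pretends to run a query and returns the number of affected rows.
    pub fn execute(&mut self, _sql: &str) -> Result<usize, String> {
        self.queries_run += 1;
        Ok(1)
    }
}

pub struct StorageConnection {
    pub connection: Arc<Mutex<FakeConnection>>,
}

impl StorageConnection {
    /// Runs `f` against the connection on another thread and waits for the result.
    /// `thread::spawn` + `join` is an assumption standing in for `spawn_blocking` + `.await`.
    pub fn con<T, E, F>(&self, f: F) -> Result<T, E>
    where
        F: FnOnce(&mut FakeConnection) -> Result<T, E> + Send + 'static,
        T: Send + 'static,
        E: Send + 'static,
    {
        let connection = Arc::clone(&self.connection);
        thread::spawn(move || {
            // The guard is held only while the closure runs.
            let mut guard = connection.lock().unwrap();
            f(&mut *guard)
        })
        .join()
        .unwrap()
    }
}

/// Runs one fake query through `con` and reports the result and the query count.
pub fn demo() -> (Result<usize, String>, usize) {
    let s_con = StorageConnection {
        connection: Arc::new(Mutex::new(FakeConnection { queries_run: 0 })),
    };
    let result = s_con.con(|connection| connection.execute("select 1"));
    let queries_run = s_con.connection.lock().unwrap().queries_run;
    (result, queries_run)
}
```

Anything the closure captures has to be owned, which is why references such as `store_id: &str` need to be converted to `String` before being moved in.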

Use it this way:


pub struct ServiceInput {
    a: String,
    b: bool,
}

#[async_trait::async_trait]
pub trait Service1Trait: Sync + Send {
    async fn do_service1(
        &self,
        connection: &StorageConnection,
        store_id: &str,
        input: ServiceInput,
    );
}

struct Service1;
#[async_trait::async_trait]
impl Service1Trait for Service1 {
    async fn do_service1(&self, s_con: &StorageConnection, store_id: &str, input: ServiceInput) {
        s_con
            .transaction(|| async {
                Repo::do_repo_things(s_con, store_id, input.a).await.unwrap();

                // Inner transaction
                s_con
                    .transaction(|| async {
                        s_con
                            .con(|connection| diesel::sql_query("select 1").execute(connection))
                            .await
                            .unwrap();

                        Ok(()) as Result<(), TransactionError<()>>
                    })
                    .await
                    .unwrap();

                Ok(()) as Result<(), TransactionError<()>>
            })
            .await
            .unwrap();
    }
}

pub struct Repo;

impl Repo {
    pub async fn do_repo_things(s_con: &StorageConnection, store_id: &str, b: String) -> QueryResult<usize> {
        s_con
            .con(|connection| diesel::sql_query("select 1").execute(connection))
            .await
    }
}

There is quite a lot of work to refactor existing code to be async.

It's probably 1 or 2 full days of refactoring work, but I think it needs to be done sooner rather than later.

@clemens-msupply do you agree with the solution ?

clemens-msupply commented 2 years ago

Just an idea: there is a logging crate https://docs.rs/diesel_logger/latest/src/diesel_logger/lib.rs.html#25-29 which creates a custom connection. If we could use a similar custom connection to execute the query on the thread pool... Think we still need to wrap the underlying connection into a Mutex though (?).

Still, if we want to use async diesel in the future, we should migrate the repos to make them async.

andreievg commented 2 years ago

> Just an idea: there is a logging crate https://docs.rs/diesel_logger/latest/src/diesel_logger/lib.rs.html#25-29 which creates a custom connection. If we could use a similar custom connection to execute the query on the thread pool... Think we still need to wrap the underlying connection into a Mutex though (?).

Have you tried this? Can it be sent between threads?

I tried quite a lot of things, I can't remember them all, but this seemed like the only way to do it.

clemens-msupply commented 2 years ago

Relatively sure I wrapped the sqlite connection into a Mutex when playing with async repository methods to make them Send. I forgot the reason why this was needed though (Loaders?)...

andreievg commented 2 years ago

As per verbal discussion:

PooledConnection or the underlying connection isn't `Send` as far as I know, so we can't really pass them between threads.

> If we could use a similar custom connection to execute the query on the thread pool

Spawning (executing on thread pool) would need to happen within async method
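
For reference, here is a small std-only sketch of how the `Send` and `Sync` bounds interact with `Mutex`. `FakeConnection` is hypothetical; it is built to be `Send` but not `Sync`, which is an assumption for illustration and has not been checked against the diesel or r2d2 types. The commented-out lines are the ones the compiler would reject.

```rust
use std::cell::Cell;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical connection type that is `Send` but not `Sync`
/// (the `Cell` field removes `Sync`).
pub struct FakeConnection {
    pub counter: Cell<u32>,
}

/// Compile-time checks: a call only compiles if the bound holds.
pub fn assert_send<T: Send>() {}
pub fn assert_sync<T: Sync>() {}

pub fn checks() {
    assert_send::<FakeConnection>();
    // assert_sync::<FakeConnection>();   // would not compile
    // assert_send::<&FakeConnection>();  // would not compile: `&T` is `Send` only if `T` is `Sync`
    assert_sync::<Mutex<FakeConnection>>(); // `Mutex<T>` is `Sync` when `T` is `Send`
    assert_send::<Arc<Mutex<FakeConnection>>>();
}

/// Shares the wrapped connection with another thread and mutates it there.
pub fn bump_on_other_thread() -> u32 {
    let shared = Arc::new(Mutex::new(FakeConnection { counter: Cell::new(0) }));
    let for_thread = Arc::clone(&shared);
    thread::spawn(move || {
        let guard = for_thread.lock().unwrap();
        guard.counter.set(guard.counter.get() + 1);
    })
    .join()
    .unwrap();
    let value = shared.lock().unwrap().counter.get();
    value
}
```

If the real connection behaves like this, it would explain why holding a plain reference to it in an async method is rejected while the `Arc<Mutex<..>>` wrapper is accepted.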

Other things discussed:

I am pretty sure that in order to allow for transactions in async fashion (being able to start a transaction and then in a nested async method start inner transactions etc..), interface for StorageConnection would need to be as per above example
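
A synchronous, std-only sketch of that interface, to show why the guard has to be released between begin and commit. Everything here is hypothetical: `FakeConnection` only records the commands it would send, and the savepoint names are made up for the example. The real version would take an async closure as in the first comment.

```rust
use std::sync::Mutex;

/// Hypothetical connection that records transaction commands instead of running them.
#[derive(Default)]
pub struct FakeConnection {
    pub depth: usize,
    pub log: Vec<String>,
}

#[derive(Debug, PartialEq)]
pub enum TransactionError<E> {
    Inner(E),
}

pub struct StorageConnection {
    pub connection: Mutex<FakeConnection>,
}

impl StorageConnection {
    pub fn transaction<T, E, F>(&self, f: F) -> Result<T, TransactionError<E>>
    where
        F: FnOnce() -> Result<T, E>,
    {
        {
            // Lock only long enough to begin. The guard is dropped before `f` runs,
            // so `f` can start an inner transaction on the same connection.
            let mut con = self.connection.lock().unwrap();
            con.depth += 1;
            let depth = con.depth;
            let command = if depth == 1 { "BEGIN".to_string() } else { format!("SAVEPOINT sp{}", depth) };
            con.log.push(command);
        }

        let result = f();

        let mut con = self.connection.lock().unwrap();
        let depth = con.depth;
        con.depth -= 1;
        match result {
            Ok(value) => {
                let command = if depth == 1 { "COMMIT".to_string() } else { format!("RELEASE SAVEPOINT sp{}", depth) };
                con.log.push(command);
                Ok(value)
            }
            Err(e) => {
                let command = if depth == 1 { "ROLLBACK".to_string() } else { format!("ROLLBACK TO SAVEPOINT sp{}", depth) };
                con.log.push(command);
                Err(TransactionError::Inner(e))
            }
        }
    }
}

/// Outer transaction succeeds while its inner transaction fails and is rolled back.
pub fn demo() -> Vec<String> {
    let s_con = StorageConnection { connection: Mutex::new(FakeConnection::default()) };
    let outer: Result<(), TransactionError<String>> = s_con.transaction(|| {
        let inner: Result<(), TransactionError<String>> =
            s_con.transaction(|| Err("inner failed".to_string()));
        assert!(inner.is_err());
        Ok(())
    });
    assert!(outer.is_ok());
    let log = s_con.connection.lock().unwrap().log.clone();
    log
}
```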

clemens-msupply commented 1 year ago

Two ideas:

#[on_io_thread_pool]
async fn repo_method(con: &mut StorageConnection) -> Result<> {
   diesel::sql_query("select 1").execute(con.connection)
}

andreievg commented 1 year ago

> Since diesel's connections become mutable anyway, we can use Rust's Exclusive for wrapping the connection to avoid the Arc<Mutex<>>

Can you do an example with Exclusive please? (Same requirements: async repository methods and async service, with the service taking a reference to the connection, and of course transactions + inner transactions with an async closure.)

Also, is there a reason why Exclusive is still considered unstable? (Are they just doing due diligence before merging, and if so, do you know what stage they are at?) And are there any other dangers of using nightly?

clemens-msupply commented 1 year ago

I have a small working example here: https://github.com/openmsupply/open-msupply/compare/develop...2023-06-26-async-exclusive

Good news: Exclusive works fine. Bad news: there are some other issues with lifetimes for the mut connection. When the transaction takes an async closure callback, Rust fails to see that the mut reference is only borrowed for the duration of the async callback call, and thus the connection can't be used anymore to finish the transaction.

As a workaround the callback returns the connection along with the result back to the caller. (comment out line 99 to see the original issue) However, this is not really nice. Arc might be slower and unneeded, but it would be nicer to use...
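
Roughly, the workaround has this shape. This is a simplified synchronous sketch, not the code from the branch: `FakeConnection` and the `transaction` signature are hypothetical. The callback takes the connection by value and hands it back next to its result, so no `&mut` borrow has to outlive the callback.

```rust
/// Hypothetical connection that records the statements it is given.
#[derive(Default)]
pub struct FakeConnection {
    pub log: Vec<&'static str>,
}

impl FakeConnection {
    pub fn execute(&mut self, sql: &'static str) -> Result<usize, String> {
        self.log.push(sql);
        Ok(1)
    }
}

/// Moves the connection into the callback and gets it back together with
/// the callback's result, then commits or rolls back.
pub fn transaction<T, E, F>(mut con: FakeConnection, f: F) -> (FakeConnection, Result<T, E>)
where
    F: FnOnce(FakeConnection) -> (FakeConnection, Result<T, E>),
{
    con.log.push("BEGIN");
    let (mut con, result) = f(con);
    con.log.push(if result.is_ok() { "COMMIT" } else { "ROLLBACK" });
    (con, result)
}

/// Runs one statement inside a transaction and returns the recorded log.
pub fn demo() -> (Vec<&'static str>, Result<usize, String>) {
    let (con, result) = transaction(FakeConnection::default(), |mut con| {
        let result = con.execute("select 1");
        // The connection has to be returned explicitly, which is the "not really nice" part.
        (con, result)
    });
    (con.log, result)
}
```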

To avoid nightly features I just copied the code for Exclusive... couldn't find much of the status, there only seems to be some naming discussions going on.

clemens-msupply commented 2 months ago

* I dislike the extra closure for `StorageConnection::con()` a little bit. For this reason I was wondering if we could use a macro to annotate the repository methods instead, i.e. put the content of `con()` into the macro.

This could be done like here: https://github.com/insou22/measure-time-demo/blob/main/src/lib.rs

For example,

#[on_io_thread_pool]
async fn repo_method(con: &Arc<StorageConnection>) -> Result<..> {
   diesel::sql_query("select 1").execute(con.connection)
} 

would expand to:

async fn repo_method(con: &Arc<StorageConnection>) -> Result<..> {
    let con = con.clone();
    // `move` gives the blocking task its own handle to the connection
    tokio::task::spawn_blocking(move || {
        diesel::sql_query("select 1").execute(con.lock().connection())
    })
    .await
}

andreievg commented 2 months ago

@clemens-msupply, would that not still allow you to deadlock ?
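
To illustrate the concern with std types only (a sketch, not tied to the macro itself): `std::sync::Mutex` is not reentrant, so if a guard is still alive when the generated code tries to lock again, the second `lock()` never succeeds. `try_lock` is used here so the example reports the situation instead of hanging.

```rust
use std::sync::Mutex;

/// Returns true if a second lock attempt fails while the first guard is alive.
pub fn second_lock_would_block() -> bool {
    let connection = Mutex::new(0u32);
    let _outer_guard = connection.lock().unwrap();
    // A plain `lock()` here would deadlock or panic; `try_lock` reports it instead.
    let blocked = connection.try_lock().is_err();
    blocked
}

/// Returns true if the lock can be taken again once the first guard is dropped.
pub fn lock_after_release() -> bool {
    let connection = Mutex::new(0u32);
    {
        let _outer_guard = connection.lock().unwrap();
    } // guard dropped here
    let acquired = connection.try_lock().is_ok();
    acquired
}
```

This is why the prototype in the first comment scopes its guards so that none is held while the callback runs.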

I am a bit concerned about using macros together with diesel, was wondering what an error would look like (would it be obfuscated even more because it's within generated code ?).

The same argument could be applied to the macros we created for diesel filters, but I guess they are just using templating rather than manipulating a TokenStream.

If the closure is really cumbersome to look at, we can add helper methods with types similar to execute, first etc. Here is a base crate for the code in the first comment: checkingdieselasync.zip

And here is the diff implementing execute_sql and first_sql, also available as git patch: closure_helper.git.zip

diff --git a/src/main.rs b/src/main.rs
index 919ca09..5d99860 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -6,6 +6,8 @@ use std::{

 use diesel::{
     connection::{AnsiTransactionManager, TransactionManager},
+    dsl::Limit,
+    query_dsl::methods::{self, LoadQuery},
     r2d2::{self, ConnectionManager},
     result::Error,
     PgConnection, QueryResult, RunQueryDsl,
@@ -148,18 +150,38 @@ impl Repo {
         s_con: &StorageConnection,
         store_id: &str,
         b: String,
-    ) -> QueryResult<usize> {
-        // Cant move reference to async closure
-        let store_id = store_id.to_string();
-        s_con
-            .con(move |connection| {
-                let _ = format!("{} {}", store_id, b);
-                diesel::sql_query("select 1").execute(connection)
-            })
-            .await
+    ) -> QueryResult<Option<Check>> {
+        let query = diesel::sql_query("select 1");
+        let _ = execute_sql(s_con, query).await;
+
+        // Still have to translate to
+        let query = check::dsl::check.filter(check::dsl::id.eq(store_id.to_string()));
+        first_sql(s_con, query).await.optional()
     }
 }

+// Types copied from: https://github.com/diesel-rs/diesel/blob/f2eb9b242d66813a197b29ac058eb86f5c5ec9ba/diesel/src/query_dsl/mod.rs#L1776-L1782
+async fn first_sql<'query, I, U>(s_con: &StorageConnection, i: I) -> QueryResult<U>
+where
+    I: methods::LimitDsl + RunQueryDsl<PgConnection> + std::marker::Send + 'static,
+    U: std::marker::Send + 'static,
+    Limit<I>: LoadQuery<'query, PgConnection, U>,
+{
+    s_con
+        .con(move |connection| methods::LimitDsl::limit(i, 1).get_result(connection))
+        .await
+}
+
+// Types copied from: https://github.com/diesel-rs/diesel/blob/f2eb9b242d66813a197b29ac058eb86f5c5ec9ba/diesel/src/query_dsl/mod.rs#L1428-L1433
+async fn execute_sql<I>(s_con: &StorageConnection, i: I) -> QueryResult<usize>
+where
+    I: methods::ExecuteDsl<PgConnection> + RunQueryDsl<PgConnection> + std::marker::Send + 'static,
+{
+    s_con
+        .con(move |connection| RunQueryDsl::execute(i, connection))
+        .await
+}
+
 fn main() {
     println!("Hello, world!");
 }

It should be possible to make our own trait from the above and implement it for T, so that we can do


diesel::sql_query("select 1").execute_sql(s_con).await;
check::dsl::check.filter(check::dsl::id.eq(store_id.to_string())).first_sql(s_con).await.optional()

clemens-msupply commented 2 months ago

Sorry, meant to clarify yesterday: my comment is misleading. I was just following up on the idea to annotate a method to "asyncify" it. However, I think accessing the connection in a closure is better.

Great idea to define an extension trait like that! This should help porting the code over.
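
For reference, the extension trait pattern on its own could look something like this. It is a std-only sketch with hypothetical `FakeConnection`, `Query` and `SqlQuery` types standing in for the diesel ones; the real trait would need the bounds from the diff above and would call `s_con.con(..)` in an async method.

```rust
/// Hypothetical connection that records what was executed.
#[derive(Default)]
pub struct FakeConnection {
    pub executed: Vec<String>,
}

/// Hypothetical stand-in for diesel's query types.
pub trait Query {
    fn to_sql(&self) -> String;
}

pub struct SqlQuery(pub &'static str);

impl Query for SqlQuery {
    fn to_sql(&self) -> String {
        self.0.to_string()
    }
}

/// Extension trait: adds `execute_sql` to every type that implements `Query`.
pub trait ExecuteSql: Query + Sized {
    fn execute_sql(self, con: &mut FakeConnection) -> Result<usize, String> {
        con.executed.push(self.to_sql());
        Ok(1)
    }
}

// Blanket impl, so callers only need to import the trait.
impl<T: Query> ExecuteSql for T {}

/// Calls the extension method directly on a query value.
pub fn demo() -> (Result<usize, String>, Vec<String>) {
    let mut con = FakeConnection::default();
    let result = SqlQuery("select 1").execute_sql(&mut con);
    (result, con.executed)
}
```

With the blanket impl, existing repository code would mostly need the method name changed and an `.await` added, which keeps the port mechanical.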