oxidecomputer / async-bb8-diesel

Safe asynchronous access to Diesel and the bb8 connection manager
MIT License

Support for async transaction functions #32

Closed by jmpesp 2 years ago

jmpesp commented 2 years ago

All transactions in omicron are currently required to be synchronous. It'd be nice to have an async version of diesel::connection::TransactionManager:

/// An async variant of [`diesel::connection::TransactionManager`].
#[async_trait]
pub trait TransactionManager<Conn>
where
    Conn: 'static + DieselConnection
{
    type TransactionStateData;

    async fn begin_transaction(conn: &mut Conn) -> QueryResult<()>;
    async fn rollback_transaction(conn: &mut Conn) -> QueryResult<()>;
    async fn commit_transaction(conn: &mut Conn) -> QueryResult<()>;
    async fn transaction_manager_status_mut(
        conn: &mut Conn
    ) -> &mut TransactionManagerStatus;

    async fn transaction<T, F, R, E>(conn: &mut Conn, callback: F) -> Result<R, E>
    where
        T: Future<Output = Result<R, E>> + Send + Sync,
        F: FnOnce(&mut Conn) -> T + Send + Sync,
        E: From<DieselError> + Send + Sync,
        R: Send + Sync
    {
        Self::begin_transaction(conn).await?;
        match callback(&mut *conn).await {
            Ok(value) => {
                Self::commit_transaction(conn).await?;
                Ok(value)
            }

            Err(user_error) => match Self::rollback_transaction(conn).await {
                Ok(()) => Err(user_error),
                Err(DieselError::BrokenTransactionManager) => {
                    // In this case we are probably more interested by the
                    // original error, which likely caused this
                    Err(user_error)
                }
                Err(rollback_error) => Err(rollback_error.into()),
            }
        }
    }
}

or an additional function async_transaction in the AsyncConnection trait, or something else entirely, in order to support moving more code into the transaction itself:

self.pool_authorized(opctx)
    .await?  
    .async_transaction(async move |conn| {
        opctx.authorize(...).await?;
        delete_old_query.execute_async(conn).await?;
        Ok(insert_new_query.get_results_async(conn).await?)
    })
    .await   
    .map_err(|e| public_error_from_diesel_pool(e, ErrorHandler::Server))
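The commit-or-rollback control flow proposed above can be exercised without a database. The following is a minimal sketch, not the crate's API: `MockConn`, `transaction`, `run`, and `block_on` are hypothetical names, the connection only records statement strings, and the callback receives a cloneable handle rather than `&mut Conn`, which sidesteps the lifetime questions a future borrowing its argument would raise. The tiny `block_on` executor is only there so the sketch needs nothing beyond the standard library.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a connection: it only records the statements
/// it is asked to run. Clones share the same log.
#[derive(Clone, Default)]
pub struct MockConn {
    log: Arc<Mutex<Vec<String>>>,
}

impl MockConn {
    pub async fn execute(&self, sql: &str) -> Result<(), String> {
        self.log.lock().unwrap().push(sql.to_string());
        Ok(())
    }

    pub fn statements(&self) -> Vec<String> {
        self.log.lock().unwrap().clone()
    }
}

/// Commit when the async callback succeeds, roll back when it fails.
pub async fn transaction<F, Fut, R>(conn: &MockConn, callback: F) -> Result<R, String>
where
    F: FnOnce(MockConn) -> Fut,
    Fut: Future<Output = Result<R, String>>,
{
    conn.execute("BEGIN").await?;
    match callback(conn.clone()).await {
        Ok(value) => {
            conn.execute("COMMIT").await?;
            Ok(value)
        }
        Err(user_error) => {
            // A rollback failure would be returned here; the mock never fails.
            conn.execute("ROLLBACK").await?;
            Err(user_error)
        }
    }
}

/// Waker that does nothing; enough for futures that never return `Pending`.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor: polls the future in a loop until it is ready.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Runs one transaction and returns its result plus the recorded statements.
pub fn run(fail: bool) -> (Result<u32, String>, Vec<String>) {
    let conn = MockConn::default();
    let result = block_on(transaction(&conn, |c| async move {
        c.execute("INSERT").await?;
        if fail {
            return Err("callback failed".to_string());
        }
        Ok::<u32, String>(1)
    }));
    (result, conn.statements())
}
```

Calling `run(false)` records `BEGIN`, `INSERT`, `COMMIT`; calling `run(true)` records `BEGIN`, `INSERT`, `ROLLBACK` and returns the callback's error.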

One of the motivators for this was opctx.authorize moving into the transaction function, either explicitly, or to be passed into a query fragment builder function like

    pub async fn silo_create_do_query(
        opctx: &OpContext,
        silo: Silo,
    ) -> impl RunnableQuery<Silo> {
        opctx.authorize(authz::Action::CreateChild, &authz::FLEET).await?;

        use db::schema::silo::dsl;
        diesel::insert_into(dsl::silo)
            .values(silo)
            .returning(Silo::as_returning())
    }

so that each caller wouldn't have to remember to do this authz check.

davepacheco commented 2 years ago

I think this fits into the general problem of structuring code so that we can generate queries that can be run by themselves or inserted into a transaction, while also doing things like authz checks when we run these queries. There's a risk here, though. Queries that you run inside a transaction can acquire exclusive locks on parts of the database. This can be a source of tail latency at any kind of scale. This gets worse when there's a partition or even just a node reboot that takes a minute to detect. Those locks can wind up held until the problem is detected and the transaction aborted. All that's to say that making these async may make it easy to do a lot more work inside a transaction than we actually want.

See also oxidecomputer/omicron#973 -- I think the long-term hope here is to use some kind of batched transaction instead.

smklein commented 2 years ago

I have a prototype implementation of this feature in https://github.com/oxidecomputer/async-bb8-diesel/pull/33, and a demonstration of it being used in omicron here: https://github.com/oxidecomputer/omicron/pull/1621. Ultimately I think these PRs are "worth it" - not dealing with sync interop is a major plus - but there are some risks to be aware of in this PR, especially with the example you mentioned, @jmpesp.

Notably, the call to do authorization:

        opctx.authorize(authz::Action::CreateChild, &authz::FLEET).await?;

could now be done in an asynchronous context with the addition of transaction_async, but it might not do what you think.

Note that the authz check does not take a pool, datastore, or connection as an argument. The opctx structure has a connection to the pool, so if it's using the DB, it's using its own connection.

This might be somewhat of a problem - when we set up code like this:

    pool.transaction_async(|conn| async move {
        // Run your code here!
    })
    .await
    .unwrap();

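We provide a single connection for your code to use, but nothing stops the closure from ignoring conn and running arbitrary asynchronous code. That code could get a different connection from the pool, and queries issued on that connection would not be part of the transaction.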
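To make the pitfall concrete, here is a minimal sketch built entirely on mocks. `MockPool`, `MockConn`, the `transaction_async` method on the mock, and `rows_after_failed_transaction` are all hypothetical stand-ins, not the async-bb8-diesel API, and the assumed model is that a connection buffers writes while in a transaction and applies them immediately otherwise. The closure writes once through `conn` and once through a second connection taken from the pool, then fails.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical shared "database": the list of committed rows.
type Db = Arc<Mutex<Vec<String>>>;

/// Hypothetical connection. Inside a transaction, writes are buffered until
/// commit; outside one, they are applied immediately.
#[derive(Clone)]
pub struct MockConn {
    db: Db,
    pending: Arc<Mutex<Option<Vec<String>>>>,
}

impl MockConn {
    pub async fn insert(&self, row: &str) {
        let mut pending = self.pending.lock().unwrap();
        match &mut *pending {
            Some(buffer) => buffer.push(row.to_string()),
            None => self.db.lock().unwrap().push(row.to_string()),
        }
    }
    fn begin(&self) {
        *self.pending.lock().unwrap() = Some(Vec::new());
    }
    fn commit(&self) {
        let buffered = self.pending.lock().unwrap().take();
        self.db.lock().unwrap().extend(buffered.unwrap_or_default());
    }
    fn rollback(&self) {
        *self.pending.lock().unwrap() = None;
    }
}

#[derive(Clone)]
pub struct MockPool {
    db: Db,
}

impl MockPool {
    pub fn new() -> Self {
        MockPool { db: Db::default() }
    }
    /// Every call hands out a fresh, independent connection.
    pub async fn get(&self) -> MockConn {
        MockConn { db: self.db.clone(), pending: Arc::default() }
    }
    pub fn rows(&self) -> Vec<String> {
        self.db.lock().unwrap().clone()
    }
    /// Runs `f` with one connection; commits on `Ok`, rolls back on `Err`.
    pub async fn transaction_async<F, Fut, R>(&self, f: F) -> Result<R, String>
    where
        F: FnOnce(MockConn) -> Fut,
        Fut: Future<Output = Result<R, String>>,
    {
        let conn = self.get().await;
        conn.begin();
        match f(conn.clone()).await {
            Ok(value) => { conn.commit(); Ok(value) }
            Err(error) => { conn.rollback(); Err(error) }
        }
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor: polls in a loop; the mock futures never return `Pending`.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

pub fn rows_after_failed_transaction() -> Vec<String> {
    let pool = MockPool::new();
    let outer = pool.clone();
    let result = block_on(pool.transaction_async(|conn| async move {
        conn.insert("via conn").await;
        // Ignoring `conn`: this second connection is not in the transaction.
        let other = outer.get().await;
        other.insert("via other connection").await;
        Err::<(), String>("abort".to_string())
    }));
    assert!(result.is_err());
    pool.rows()
}
```

In this model the rollback discards only the write made through `conn`; the row written through the second connection is already committed and survives the failed transaction. An authz check that reaches the database through its own connection would behave like `other` here.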