quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0
13.64k stars 2.64k forks source link

Difficult to use transactions with blocking code and reactive SQL drivers #34106

Closed kenkania closed 1 month ago

kenkania commented 1 year ago

Description

We are using the reactive drivers but in a blocking fashion, ie pgPool.query("SELECT * FROM mytable").executeAndAwait(). IIUC, this should be very performant when using virtual threads (since only the JDBC/non-reactive drivers pin the carrier thread).

With non-reactive drivers, you can easily add a JTA annotation and begin/end a transaction in a call stack safely. With reactive drivers, it appears there are at least two common transaction options (below), but neither supports using blocking code within the transaction. Can this be improved, or perhaps am I missing some other easy alternative?

1) PgPool#withTransaction

(https://quarkus.io/guides/reactive-sql-clients#transactions)

The main problem here is that: a) you have to pass the allocated SqlConnection throughout the entire stack b) the passed-in function has to return a Uni (or future) and cannot block

vertx-sql-client has recently added ability to specify TransactionPropagation.CONTEXT to this method, which will store/lookup a SqlConnection in the Vertx context. This somewhat improves a), although it is difficult (or at least non-obvious) to test code that uses this in quarkus unit tests (complains with java.lang.NullPointerException: Cannot invoke "io.vertx.core.impl.ContextInternal.getLocal(Object)" because "context" is null). Regardless, even if that is resolved, b) is still an issue.

2) @Transactional on Uni-returning methods

(https://quarkus.io/guides/transaction#reactive-extensions) (https://quarkus.io/guides/context-propagation#usage-example-with-mutiny)

Implementation ideas

No response

quarkus-bot[bot] commented 1 year ago

/cc @tsegismont (reactive-sql-clients), @vietj (reactive-sql-clients)

kenkania commented 1 year ago

Perhaps related to https://github.com/quarkusio/quarkus/issues/34101

geoand commented 1 year ago

this should be very performant when using virtual threads (since only the JDBC/non-reactive drivers pin the carrier thread)

cc @cescoffier

RobertoUa commented 3 months ago

any news?

tsegismont commented 1 month ago

@kenkania @RobertoUa as an alternative, you could get a connection from the pool and manage the transaction manually:

    @Inject
    PgPool client;

    @RunOnVirtualThread
    void doSomethingInTransaction() {
        SqlConnection conn = null;
        try {
            conn = client.getConnectionAndAwait();
            Transaction transaction = conn.beginAndAwait();

            doStuffTheBlockingWayOnVirtualThread(conn);

            // Everything went well
            transaction.commitAndAwait();

        } catch (Throwable t) {
            // Something went wrong
            if (conn != null) {
                conn.closeAndForget();
            }
        }
    }
RobertoUa commented 1 month ago

I solved this issue in kotlin using propagated connections

@OptIn(ExperimentalCoroutinesApi::class)
suspend fun <T> PgPool.transaction(block: suspend SqlConnection.() -> T): T = coroutineScope {
    if (io.vertx.core.Vertx.currentContext() == null) { // usually there's no context when called directly from tests
        Log.info("Transaction called not from a vertx thread. Creating new context...")
        val vertx = VertxCoreRecorder.getVertx().get()
        val context = VertxContext.getOrCreateDuplicatedContext(vertx)
        context.dispatcher().invoke {
            this@transaction.withTransaction(TransactionPropagation.CONTEXT) { con ->
                async(context.dispatcher()) { con.block() }.asUni()
            }.awaitSuspending()
        }
    } else {
        this@transaction.withTransaction(TransactionPropagation.CONTEXT) { connection ->
            async { connection.block() }.asUni()
        }.awaitSuspending()
    }
}

fun getPropagatedConnection(): SqlConnection? =
    io.vertx.core.Vertx.currentContext()
        ?.getLocal<io.vertx.sqlclient.SqlConnection>(PoolImpl.PROPAGATABLE_CONNECTION)
        ?.let { SqlConnection(it) }

and then to use it:

class MyPgPool(pool: io.vertx.pgclient.PgPool) : PgPool(pool) {
    override fun preparedQuery(@Language("sql") sql: String?): PreparedQuery<RowSet<Row>> {
        return getPropagatedConnection()?.preparedQuery(sql) ?: super.preparedQuery(sql)
    }
}

Now I don't even need to pass a connection around and I can use suspend functions inside. I can just do

pgPool.transactional {
 query1() //this is kotlin suspend function which calls 'pgPool.preparedQuery()...awaitSuspending()'
 query2()
}
cescoffier commented 1 month ago

See @tsegismont example.

tsegismont commented 1 month ago

@RobertoUa thanks for sharing!