quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0

transaction timeout on Reactive flavor + Narayana #30754

Open pjgg opened 1 year ago

pjgg commented 1 year ago

Describe the bug

After several transactions with Narayana and the reactive API, the DB connection blocks and the following exception is thrown:

ERROR [org.hib.rea.errors] (vert.x-eventloop-thread-1) HR000057: Failed to execute statement [select accountent0_.id as id1_0_, accountent0_.accountNumber as accountn2_0_, accountent0_.amount as amount3_0_, accountent0_.createdAt as createda4_0_, accountent0_.lastName as lastname5_0_, accountent0_.name as name6_0_, accountent0_.updatedAt as updateda7_0_ from account accountent0_ where accountent0_.accountNumber=$1]: could not execute query: java.util.concurrent.CompletionException: io.vertx.core.impl.NoStackTraceThrowable: Timeout
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:636)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
    at io.vertx.core.Future.lambda$toCompletionStage$3(Future.java:386)
    at io.vertx.core.impl.future.FutureImpl$3.onFailure(FutureImpl.java:153)
    at io.vertx.core.impl.future.FutureBase.emitFailure(FutureBase.java:75)
    at io.vertx.core.impl.future.FutureImpl.tryFail(FutureImpl.java:230)
    at io.vertx.core.impl.future.Mapping.onFailure(Mapping.java:45)
    at io.vertx.core.impl.future.FutureImpl$ListenerArray.onFailure(FutureImpl.java:268)
    at io.vertx.core.impl.future.FutureBase.emitFailure(FutureBase.java:75)
    at io.vertx.core.impl.future.FutureImpl.tryFail(FutureImpl.java:230)
    at io.vertx.core.impl.future.PromiseImpl.tryFail(PromiseImpl.java:23)
    at io.vertx.core.impl.future.PromiseImpl.onFailure(PromiseImpl.java:54)
    at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:43)
    at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:23)
    at io.vertx.sqlclient.impl.pool.SqlConnectionPool$1PoolRequest.lambda$null$1(SqlConnectionPool.java:218)
    at io.vertx.core.net.impl.pool.SimpleConnectionPool$Cancel.run(SimpleConnectionPool.java:676)
    at io.vertx.core.net.impl.pool.CombinerExecutor.submit(CombinerExecutor.java:50)
    at io.vertx.core.net.impl.pool.SimpleConnectionPool.execute(SimpleConnectionPool.java:245)
    at io.vertx.core.net.impl.pool.SimpleConnectionPool.cancel(SimpleConnectionPool.java:638)
    at io.vertx.sqlclient.impl.pool.SqlConnectionPool$1PoolRequest.lambda$onEnqueue$2(SqlConnectionPool.java:215)
    at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:932)
    at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:903)
    at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:55)
    at io.vertx.core.impl.DuplicatedContext.emit(DuplicatedContext.java:158)
    at io.vertx.core.impl.ContextInternal.emit(ContextInternal.java:194)
    at io.vertx.core.impl.VertxImpl$InternalTimerHandler.run(VertxImpl.java:921)
    at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:153)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.vertx.core.impl.NoStackTraceThrowable: Timeout

If I remove the @Transactional annotation, everything works as expected.

These are the extensions I am using:

quarkus-reactive-pg-client
quarkus-resteasy-reactive-jackson
quarkus-hibernate-reactive-panache
quarkus-narayana-jta

Based on the official documentation, it seems that the only thing I should need to do is add the @Transactional annotation. Do you know if something else needs to be configured?

Expected behavior

No response

Actual behavior

No response

How to Reproduce?

git clone --branch QUARKUS-2492-reactive_02 git@github.com:pjgg/quarkus-test-suite.git
cd quarkus-test-suite/sql-db/reactive-narayana-transactions
mvn quarkus:dev

Then send the following request around 5 times, until the thread blocks:

curl -X POST -H "Content-type:application/json" -d '{"accountTo": "SK0389852379529966291984", "accountFrom": "FR9317569000409377431694J37", "amount": "10"}' http://localhost:8080/transfer/transaction

yuhaibohotmail commented 1 year ago

As far as I know, reactive-pg-client doesn't support JTA.

quarkus-bot[bot] commented 1 year ago

/cc @tsegismont (reactive-sql-clients), @vietj (reactive-sql-clients)

geoand commented 1 year ago

cc @Sanne @DavideD

yrodiere commented 3 months ago

As far as I know, reactive-pg-client doesn't support JTA.

Indeed, Reactive SQL clients don't support JTA.

That should probably be documented here though, WDYT @tsegismont ?

yrodiere commented 3 months ago

Related: #30753

tsegismont commented 3 months ago

@yrodiere indeed, we could improve this section https://quarkus.io/guides/reactive-sql-clients#transactions, which shows how to create transaction boundaries using the client#withTransaction() method.

Before doing this, I think we should clarify whether transaction demarcation with annotations works or not, because another part of the docs says it does: https://quarkus.io/guides/transaction#reactive-extensions

I'm not familiar with this part of the code, do you know who is?

tsegismont commented 3 months ago

Hey @FroMage , while we're at this, the HR with Panache doc has a section about transactions: https://quarkus.io/guides/hibernate-reactive-panache#transactions

It promotes the usage of io.quarkus.hibernate.reactive.panache.common.WithTransaction, which is now deprecated in favor of org.hibernate.reactive.mutiny.Mutiny.SessionFactory#withTransaction(java.util.function.Function<org.hibernate.reactive.mutiny.Mutiny.Session,io.smallrye.mutiny.Uni<T>>). And this HR helper is not mentioned in the doc.

FroMage commented 3 months ago

It promotes the usage of io.quarkus.hibernate.reactive.panache.common.WithTransaction, which is now deprecated

I don't see where it's deprecated :-/

tsegismont commented 3 months ago

It promotes the usage of io.quarkus.hibernate.reactive.panache.common.WithTransaction, which is now deprecated

I don't see where it's deprecated :-/

My bad, I confused it with another (unrelated) piece of code.

FroMage commented 3 months ago

You had me scared :)

micheljung commented 3 months ago

As far as I know, reactive-pg-client doesn't support JTA.

WHAT! I just spent DAYS refactoring away from JetBrains Exposed because it doesn't support JTA and thought "oh, let's make everything reactive while I'm at it", and now that I'm almost done, I read that JTA isn't supported? 🥲🥲🥲🥲

Sure, I could simply use client.withTransaction{ } but the application, following hexagonal architecture, is set up such that entrypoints (e.g. HTTP, which open the transaction) don't know anything about the database layer. Therefore, they don't know PgPool. Opening the Transaction in the DB layer is too late.

@Transactional would perfectly allow the entrypoints to create a transaction without needing to know anything about the underlying SQL library.

Is there another way this could be achieved? Or can I donate to get JTA support? :)

vietj commented 3 months ago

Interacting with PostgreSQL and coordinating with JTA could be achieved manually, I think.

The JTA transaction ID (xid) is sent to PostgreSQL when preparing a transaction, e.g. PREPARE TRANSACTION $s, where s is a string created from the xid. In the PostgreSQL JDBC driver this is done by the function xidToString, and PGXAConnection can be examined to understand how it actually works.
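As a rough illustration of the xid-to-string step described above, here is a minimal Java sketch that uses only the JDK. It assumes the string has the layout formatId_base64(gtrid)_base64(bqual); that layout is an assumption about what the JDBC driver does and should be checked against the driver source. SimpleXid, XidEncodingSketch and prepareTransactionSql are hypothetical names introduced for this example, not part of any library.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import javax.transaction.xa.Xid;

final class XidEncodingSketch {

    /** Minimal Xid value holder, for illustration only (hypothetical). */
    static final class SimpleXid implements Xid {
        private final int formatId;
        private final byte[] gtrid;
        private final byte[] bqual;

        SimpleXid(int formatId, byte[] gtrid, byte[] bqual) {
            this.formatId = formatId;
            this.gtrid = gtrid.clone();
            this.bqual = bqual.clone();
        }

        @Override public int getFormatId() { return formatId; }
        @Override public byte[] getGlobalTransactionId() { return gtrid.clone(); }
        @Override public byte[] getBranchQualifier() { return bqual.clone(); }
    }

    /** Encodes an Xid as formatId_base64(gtrid)_base64(bqual); the layout is an assumption. */
    static String xidToString(Xid xid) {
        Base64.Encoder b64 = Base64.getEncoder();
        return xid.getFormatId() + "_"
            + b64.encodeToString(xid.getGlobalTransactionId()) + "_"
            + b64.encodeToString(xid.getBranchQualifier());
    }

    /** Builds the statement text; the id is embedded as a quoted string literal. */
    static String prepareTransactionSql(Xid xid) {
        // Standard Base64 output contains no single quotes; doubling them is defensive.
        return "PREPARE TRANSACTION '" + xidToString(xid).replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        Xid xid = new SimpleXid(
            1,
            "gtx-42".getBytes(StandardCharsets.UTF_8),
            "branch-1".getBytes(StandardCharsets.UTF_8));
        System.out.println(prepareTransactionSql(xid));
    }
}
```

The standard Base64 alphabet does not contain an underscore, so the three parts can be split apart again when a prepared transaction has to be matched back to its xid during recovery.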

We could improve the Vert.x PG client by allowing such a transaction ID to be specified when creating a transaction. Most of the difficult work would then be to integrate the PG connection with the JTA manager, which might require thread-local associations to work. That is maybe something that could be achieved in Quarkus or in Hibernate Reactive.

Sanne commented 3 months ago

Opening the Transaction in the DB layer is too late.

Could you elaborate on this? Why would it be too late? If you only interact with the DB in the DB layer, it's not particularly useful to start it earlier.

(e.g. HTTP, which open the transaction) don't know anything about the database layer.

When people use such a pattern, it gets quite tricky for the application to react gracefully to an error triggered at transaction commit: you might want to render a different page, redirect, or perhaps retry the operation - which is often needed if one wants to use optimistic locking. You can't retry the operation effectively if you're not in control of the transaction boundaries.
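To make the retry point concrete, here is a minimal JDK-only Java sketch of code that owns the transaction boundary and can therefore re-run the whole unit of work when the commit fails. It is not Quarkus or Hibernate API: OptimisticLockFailure, withRetry and RetryingTransactionSketch are hypothetical names, and the unit of work is assumed to begin, execute and commit its own transaction on each attempt.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class RetryingTransactionSketch {

    /** Hypothetical marker for a commit rejected by an optimistic version check. */
    static final class OptimisticLockFailure extends RuntimeException {
        OptimisticLockFailure(String message) { super(message); }
    }

    /**
     * Runs one whole unit of work (begin, statements, commit) per attempt and
     * starts it again from scratch when it fails with a version conflict.
     */
    static <T> CompletionStage<T> withRetry(Supplier<CompletionStage<T>> unitOfWork, int maxAttempts) {
        return unitOfWork.get().<CompletionStage<T>>handle((value, failure) -> {
            if (failure == null) {
                return CompletableFuture.completedFuture(value);
            }
            // Dependent stages wrap the original exception in a CompletionException.
            Throwable cause = (failure instanceof CompletionException && failure.getCause() != null)
                ? failure.getCause()
                : failure;
            if (cause instanceof OptimisticLockFailure && maxAttempts > 1) {
                return withRetry(unitOfWork, maxAttempts - 1);
            }
            return CompletableFuture.<T>failedFuture(cause);
        }).thenCompose(stage -> stage);
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Simulated unit of work: the first two commits hit a version conflict.
        Supplier<CompletionStage<String>> transfer = () -> {
            if (attempts.incrementAndGet() < 3) {
                return CompletableFuture.<String>failedFuture(new OptimisticLockFailure("stale version"));
            }
            return CompletableFuture.completedFuture("committed");
        };
        String outcome = withRetry(transfer, 3).toCompletableFuture().join();
        System.out.println(outcome + " after " + attempts.get() + " attempts");
    }
}
```

The retry only works because the supplier covers the complete transaction. If the boundary were opened by a caller further up the stack, the code that sees the commit failure would have nothing it could safely re-run.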

micheljung commented 3 months ago

Thanks for your responses.

Could you elaborate on this? Why would it be too late? If you only interact with the DB in the DB layer, it's not particularly useful to start it earlier.

You may want to call two services in one transaction.

Interface Layer

@Path("")
class SomeEndpoint(
  private val serviceA: ServiceA,
  private val serviceB: ServiceB,
) {

  /** One method calling two services in one transaction. */
  @POST
  @Transactional
  fun someAction(): Uni<Unit> {
      return serviceA.act()
        .flatMap { serviceB.act() }
  }
}

Business Layer

@ApplicationScoped
class ServiceA (
  private val repositoryA: RepositoryA,
) {
  fun act() = repositoryA.act()
}

interface RepositoryA {
 fun act(): Uni<Unit>
}

@ApplicationScoped
class ServiceB (
  private val repositoryB: RepositoryB,
) {
  fun act() = repositoryB.act()
}

interface RepositoryB {
 fun act(): Uni<Unit>
}

Infrastructure Layer

@ApplicationScoped
class ReactiveSqlRepositoryA (
  private val client: PgPool,
) : RepositoryA {
  override fun act() = client.query(...).execute()
}

@ApplicationScoped
class ReactiveSqlRepositoryB (
  private val client: PgPool,
) : RepositoryB {
  override fun act() = client.query(...).execute()
}

If something goes wrong, you can still react to it, redirect, or retry.

If instead, you open your transaction in ReactiveSqlRepositoryA or ReactiveSqlRepositoryB, you have two separate transactions. If one fails but the other doesn't, you're in a messed up state.

Sanne commented 3 months ago

Ok, thanks for clarifying. So what you want is not necessarily JTA, but to coordinate two different resources via one transaction. Yes this is a reasonable request and we should be able to implement it, but it's not a supported feature today.

micheljung commented 3 months ago

To be precise: Sure, what I want is to coordinate two operations in one transaction. I could already do that by using client.withTransaction() and then passing down the connection from top to bottom, but that's what I want to avoid.

Instead, I would like to use transactions in a resource-agnostic way; to my understanding, that's exactly what JTA is for. I don't need XA if that's what you meant.

micheljung commented 3 months ago

@FroMage I saw you dug into JTA & Reactive in https://github.com/hibernate/hibernate-reactive/issues/25 and wrote a JTA-aware PgPool in Panache-RX. Could you give us some input on what's missing for Reactive SQL Clients to support JTA transactions (namely @Transactional)?

gavinking commented 3 months ago

I thought the problem here was that Narayana itself doesn't work in a reactive way. Has that changed since I last asked >5 years ago?

micheljung commented 3 months ago

I thought the problem here was that Narayana itself doesn't work in a reactive way. Has that changed since I last asked >5 years ago?

Maybe that's why Hibernate Reactive uses @WithTransaction instead of @Transactional. That would also be okay for me. I don't need JTA specifically, just some agnostic transaction handling.

However, Quarkus' transaction documentation implies that @Transactional works with reactive. It sounds like one needs to use context-propagation for this to work, which makes sense, but it's not clear to me how to set things up. It does say something about ThreadContext.TRANSACTION - The JTA Transaction context so it sounds like it's already implemented somehow. It really comes down to #30753 I guess :)

Update: So if I understand it correctly, the Narayana transaction works fine with reactive (when using smallrye-context-propagation, see https://github.com/smallrye/smallrye-context-propagation/blob/main/jta/src/main/java/io/smallrye/context/jta/context/propagation/JtaContextProvider.java) but vertx-sql-client doesn't use the JTA API.
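For readers unfamiliar with the idea, the following JDK-only Java sketch shows conceptually what context propagation has to do for a thread-bound transaction: capture the association where the work is scheduled and restore it where the callback runs. It is not the SmallRye implementation; CURRENT_TX, withCapturedTx and ContextPropagationSketch are hypothetical stand-ins, and a plain string takes the place of the JTA transaction.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class ContextPropagationSketch {

    /** Stand-in for the thread-bound transaction association (hypothetical). */
    static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    /** Captures the caller's transaction now and restores it around the task later. */
    static <T> Supplier<T> withCapturedTx(Supplier<T> task) {
        String captured = CURRENT_TX.get();
        return () -> {
            String previous = CURRENT_TX.get();
            CURRENT_TX.set(captured);
            try {
                return task.get();
            } finally {
                // Leave the worker thread as it was found.
                if (previous == null) {
                    CURRENT_TX.remove();
                } else {
                    CURRENT_TX.set(previous);
                }
            }
        };
    }

    /** Returns what a task on another thread sees without capture [0] and with capture [1]. */
    static String[] demo() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Supplier<String> readTx = CURRENT_TX::get;
        CURRENT_TX.set("tx-1");
        try {
            String plain = CompletableFuture.supplyAsync(readTx, worker).join();
            String propagated = CompletableFuture.supplyAsync(withCapturedTx(readTx), worker).join();
            return new String[] { plain, propagated };
        } finally {
            CURRENT_TX.remove();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = demo();
        System.out.println("without capture: " + seen[0]);
        System.out.println("with capture: " + seen[1]);
    }
}
```

Without the wrapper the task on the worker thread sees no transaction at all, while the wrapped task sees the caller's. Propagating the association this way does not by itself make a SQL client take part in the transaction; the client still has to consult it, which is the gap described above.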

yrodiere commented 3 months ago

Before doing this, I think we should clarify whether transaction demarcation with annotations works or not, because another part of the docs says it does: https://quarkus.io/guides/transaction#reactive-extensions

I'm not familiar with this part of the code, do you know who is?

I think @FroMage worked on the relevant feature (context propagation). I remember @Sanne talking to me about it, and from what I can see in the history, @geoand and @cescoffier were involved as well.

But last I heard about context propagation, there were some caveats... so I'm not sure what the status is exactly. That was a few years ago though -- I'm not following reactive dev work as closely as I should :/

Update: So if I understand it correctly, the Narayana transaction works fine with reactive (when using smallrye-context-propagation, see https://github.com/smallrye/smallrye-context-propagation/blob/main/jta/src/main/java/io/smallrye/context/jta/context/propagation/JtaContextProvider.java) but vertx-sql-client doesn't use the JTA API.

Thanks for sharing your findings @micheljung.

@vietj does it seem feasible to make transaction support in vertx pluggable, and have Quarkus plug in a behavior that relies on JTA through the transaction obtained from context propagation?

Or does my suggestion make no sense at all? :-)

FroMage commented 3 months ago

I thought the problem here was that Narayana itself doesn't work in a reactive way. Has that changed since I last asked >5 years ago?

That was indeed the problem, and it's still the case.

the Narayana transaction works fine with reactive but vertx-sql-client doesn't use the JTA API

Indeed, correct.

@micheljung In https://github.com/quarkusio/quarkus/issues/30754#issuecomment-2307475459 I fail to see why you need @Transactional and not @WithTransaction? This would give you a common transaction between both service calls, exactly as it would with @Transactional.

micheljung commented 3 months ago

@FroMage I use Reactive SQL Client, not Hibernate/Panache, and it doesn't come with @WithTransaction. However, I wrote my own @WithTransaction solution to solve this. I will share it once I'm back home :)

micheljung commented 3 months ago

The interceptor and annotation are like the ones from hibernate-reactive-panache-common. This lives in the module common.


import io.smallrye.mutiny.Uni
import jakarta.annotation.Priority
import jakarta.interceptor.AroundInvoke
import jakarta.interceptor.Interceptor
import jakarta.interceptor.InterceptorBinding
import jakarta.interceptor.InvocationContext
import java.lang.annotation.Inherited

// TODO because Quarkus Reactive SQL clients don't support JTA yet, this interface serves as an
//  abstraction for the transaction API.
//  See https://github.com/quarkusio/quarkus/issues/30754#issuecomment-2307475459
interface TransactionOperations {

  fun <RESULT> withTransaction(block: () -> Uni<RESULT>): Uni<RESULT>

}

/**
 * When a [Uni]-returning method is annotated with this annotation, it will be executed within a
 * transaction. If used on a class, only methods returning a [Uni] are affected.
 *
 * [Multi]-returning methods are not supported because this would keep the transaction open until
 * the stream is processed, which is generally not desired.
 */
@InterceptorBinding
@Retention(AnnotationRetention.RUNTIME)
@Target(
  AnnotationTarget.CLASS,
  AnnotationTarget.FUNCTION,
)
@Inherited
annotation class WithTransaction

@WithTransaction
@Interceptor
@Priority(Interceptor.Priority.PLATFORM_BEFORE + 200)
internal class WithTransactionInterceptor(
  private val transactionOperations: TransactionOperations,
) {

  @AroundInvoke
  fun intercept(context: InvocationContext): Any = when (context.method.returnType) {
    Uni::class.java -> transactionOperations.withTransaction<Any> { proceedUni(context) }
    else -> context.proceed()
  }

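  // Calls the intercepted method; an exception thrown synchronously is turned
  // into a failed Uni so the caller observes it through the reactive pipeline.
  @Suppress("UNCHECKED_CAST")
  private fun <T> proceedUni(context: InvocationContext): Uni<T> = try {
    context.proceed() as Uni<T>
  } catch (e: Exception) {
    Uni.createFrom().failure(e)
  }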
}

In the infrastructure layer, I implement TransactionOperations to start the transaction and store it in the Vert.x context. The custom TransactionSqlClient then delegates to this transaction.

import com.example.backend.reactiveSql.TransactionOperations
import io.smallrye.mutiny.Uni
import io.vertx.mutiny.core.Vertx
import io.vertx.mutiny.pgclient.PgPool
import io.vertx.mutiny.sqlclient.*
import io.vertx.sqlclient.PrepareOptions
import jakarta.enterprise.context.ApplicationScoped

@ApplicationScoped
internal class PgPoolTransactionOperations(
  private val client: PgPool,
) : TransactionOperations {

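  // The connection bound to the current transaction, as stored by withTransaction below
  // (it is put with putLocal, so it must be read with getLocal, and it is an SqlConnection).
  val connection: SqlConnection
    get() = context().getLocal(CONNECTION_KEY) as SqlConnection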

  override fun <RESULT> withTransaction(block: () -> Uni<RESULT>): Uni<RESULT> =
    client.withTransaction { connection ->
      val context = context()
      context.putLocal(CONNECTION_KEY, connection)
      block()
    }

  companion object {
    private const val CONNECTION_KEY = "pgpool.connection"

    val pgPoolConnection: SqlClient
      get() = ((context().getLocal(CONNECTION_KEY) as SqlClient?)
        ?: error("No transaction is available. Make sure @WithTransaction is used."))

    private fun context() = Vertx.currentContext() ?: error("No Vertx context found")
  }
}

/** An [SqlClient] that runs within an existing transaction. */
@ApplicationScoped
@Suppress("SqlSourceToSinkFlow")
internal class TransactionSqlClient : SqlClient(null) {

  override fun getDelegate(): io.vertx.sqlclient.SqlClient =
    PgPoolTransactionOperations.pgPoolConnection.delegate

  override fun query(sql: String): Query<RowSet<Row>> =
    PgPoolTransactionOperations.pgPoolConnection.query(sql)

  override fun preparedQuery(sql: String): PreparedQuery<RowSet<Row>> =
    PgPoolTransactionOperations.pgPoolConnection.preparedQuery(sql)

  override fun preparedQuery(sql: String, options: PrepareOptions): PreparedQuery<RowSet<Row>> =
    PgPoolTransactionOperations.pgPoolConnection.preparedQuery(sql, options)

  override fun close(): Uni<Void> = PgPoolTransactionOperations.pgPoolConnection.close()

  override fun closeAndAwait(): Void = PgPoolTransactionOperations.pgPoolConnection.closeAndAwait()

  override fun closeAndForget() {
    PgPoolTransactionOperations.pgPoolConnection.closeAndForget()
  }
}

Now, the repositories can inject the SqlClient as they normally would:

@ApplicationScoped
class ReactiveSqlRepositoryB (
  private val client: SqlClient,
) : RepositoryB {
  override fun act() = client.query(...).execute()
}

and the endpoint is in control of the transaction boundary:

@Path("")
@WithTransaction
class SomeEndpoint(
  private val serviceA: ServiceA,
  private val serviceB: ServiceB,
) {

  @POST
  fun someAction(): Uni<Unit> {
      return serviceA.act()
        .flatMap { serviceB.act() }
  }
}

and the business logic layer is free from Reactive SQL Client code :)

cescoffier commented 1 day ago

@FroMage what do you think about that solution?

FroMage commented 1 day ago

Well, of course we can move a @WithTransaction lower down the stack from HR/Panache to the reactive-sql module.

How this integrates with the HR/Panache version, though, I'm not sure. Here the transaction is scoped to the SqlClient, apparently, and I'm not entirely sure where it's closed, but I suppose it's request-scoped. In the case of HR/Panache it's scoped to the HR Session.

Also, there's the question of whether such a high-level feature (annotations and interceptors) belongs in the lower-level module.

If we had such an annotation in the lower module, I'm not sure how we could make sure that it would behave differently depending on whether HR/Panache was present.

If it had a different name, it could make sense to have both. How would we name it, though? @WithSqlTransaction?