jklingsporn / vertx-jooq

A jOOQ-CodeGenerator to create vertx-ified DAOs and POJOs.
MIT License
385 stars 53 forks source link

Reactive QueryExecutor transaction rollback fails to close connection #197

Closed aleksandar78 closed 2 years ago

aleksandar78 commented 2 years ago

Hi,

we encounter very strange behavior when pg driver exception occurs inside transaction closure in ReactiveClassicGenericQueryExecutor.

Problem

We have some concurrent operations that try to insert a new row on the specific table. The constraints on that table will reject insert on some condition. When the system was under pressure due to the high number of operations after a very small amount of time (10 minutes) application went off. The first thing that we did is to limit the number of operations and we found that time required before the application was blocked changed to 12-24 hours. The problem was that we couldn't see any low-level error in log except "Duplicate keys etc" statement inside the Future.recover { } method.

Our code was something like this:

executor.transcation { tx ->
  tx.execute {  dsl ->
     dsl.insertInto() /* new row */
  }.compose {
     tx.execute { dsl -> 
        dsl.update() /* update if insert ends correctly */
    }
  }
}.recover { err -> 
  logger.error { err }
  Future.failedFuture(CustomException(err))
}

We use this pattern in every place where the transaction is required.

We add some monitoring on Postgres. When the application was blocked we had a certain number of connections with the following state: idle in transaction (aborted). This is a well-known error in Postgres which is a signal that something went wrong on the application side during the transaction error handling.

We tried to change code and use BEGIN/COMMIT/ROLLBACK in the explicit way by changing code pattern in something similar to following code:

executor.beginTransaction { tx ->
  tx.execute {  dsl ->
     dsl.insertInto() /* new row */
  }.compose {
     tx.execute { dsl -> 
        dsl.update() /* update if insert ends correctly */
    }
  }.compose {
    // commit 
    tx.commit()
  }.recover {
    // rollback
    tx.rollback()
  }
}.recover { err -> 
  logger.error { err }
  Future.failedFuture(CustomException(err))
}

Initially, we thought that the problem was disappeared as the Postgres monitor didn't show idle in transaction (aborted) anymore. However, the application keeps blocking after the same amount of time!

We tried to rewrite the method only with pg-reactive without jooq-vertx by using the new method PgPool.withTransaction { } that handles in the user transparently manner all transaction commit/rollback logic. The test we made was with thousands of concurrent operations every minute. The application never blocks and the Postgres monitor shows only active/idle connections which is correct.

Going by the exclusion we made some reverse engineering inside ReactiveClassicGenericQueryExecutor.transaction { } and PgPool.withTransaction { } methods as shown in the vertx source code:

default <T> Future<@Nullable T> withTransaction(Function<SqlConnection, Future<@Nullable T>> function) {
  return getConnection()
    .flatMap(conn -> conn
      .begin()
      .flatMap(tx -> function
        .apply(conn)
        .compose(
          res -> tx
            .commit()
            .flatMap(v -> Future.succeededFuture(res)),
          err -> {
            if (err instanceof TransactionRollbackException) {
              return Future.failedFuture(err);
            } else {
              return tx
                .rollback()
                .compose(v -> Future.failedFuture(err), failure -> Future.failedFuture(err));
            }
          }))
      .onComplete(ar -> conn.close()));
}

The main difference was that the last one, which was added in vertx 4, closes the connection when the current transaction operation is completed, regardless of commit/rollback. In ReactiveClassicGenericQueryExecutor.transaction { } we didn't find any rollback handling at all.

Solution

We created our implementation of ReactiveClassicGenericQueryExecutor:

/**
 * This specialization has been made due to: transaction rollback problem and JOOQ parser exception
 */
class CustomReactiveClassicGenericQueryExecutor(private val configuration: Configuration, delegate: SqlClient) :
  ReactiveClassicGenericQueryExecutor(configuration, delegate) {

  /**
   * super.execute(queryFunction) can throw an Exception when JOOQ fails to parse query inside createQuery(queryFunction).
   * As vertx Future is Monad in reactive world only Future should be returned. 
   */
  override fun execute(queryFunction: Function<DSLContext, out Query>): Future<Int> {
    return try {
      super.execute(queryFunction)
    } catch (e: Throwable) {
      Future.failedFuture(e)
    }
  }

  /**
   * super.query(queryFunction) can throw an Exception when JOOQ fails to parse query inside createQuery(queryFunction).
   * As vertx Future is Monad in reactive world only Future should be returned.
   */
  override fun <R : Record?> query(queryFunction: Function<DSLContext, out ResultQuery<R>>): Future<QueryResult> {
    return try {
      super.query(queryFunction)
    } catch (e: Throwable) {
      Future.failedFuture(e)
    }
  }

  /**
   * This method has been overridden with new PgPool.withTransaction method that
   * internally handles transaction logic and keep connection pool free of invalid connections.
   */
  override fun <U : Any?> transaction(transaction: Function<ReactiveClassicGenericQueryExecutor, Future<U>>): Future<U> {
    if (delegate !is Pool) {
      return Future.failedFuture(IllegalStateException("pool not given"))
    }

    return delegate.withTransaction { conn ->
      val txExecutor = ReactiveClassicGenericQueryExecutor(configuration, conn)
      transaction.apply(txExecutor)
    }
  }
}

As you can see we override the transaction method and the implementation is free of Transaction object handling. This code runs in our production environment without problem. The other 2 methods (execute and query), as you can see in the comments, are have been overridden due to JOOQ parser exception that throws an error, and Future never ends. I think that is very important in vertx to not use try/catch outside Future flow.

I'm quite sure that it could be improved but it could be a good starting point.

Context

Vertx: 4.2.3 Jooq: 3.15.5 Vertx-jooq: 6.4.0 JDK: 11

I couldn't try with the older version of vertx (ver. 3) to see if something changed on the PgPool level, as the problem could be related to its implementation that has been enhanced/improved a lot in vertx 4.

jklingsporn commented 2 years ago

Thank you very much for your detailed report. Looks like quite a serious bug. Let me see if I can create a reproducer in the tests somehow so we can verify the fix.

aleksandar78 commented 2 years ago

@jklingsporn it's enough to set low pool connection (ex. 5) and make 10 requests of "insert into" with the same key or something similar.

jklingsporn commented 2 years ago

This has been fixed in the branch https://github.com/jklingsporn/vertx-jooq/tree/release6.4.1 It is not yet released but I'd be happy if you can try it out and confirm the issue is fixed for you.

aleksandar78 commented 2 years ago

I'll give it a try in the next few days. Thanks