davidmoten / rxjava-jdbc

Efficient execution and functional composition of database calls using jdbc and RxJava Observables
Apache License 2.0

Example to handle failures in Transaction: Getting connection-timeout for subsequent requests #72

Open ssumit opened 7 years ago

ssumit commented 7 years ago

Transaction 'firstTransaction' has two insert queries, Q1 and Q2. In the following example, Q2 fails due to a primary key constraint violation.

    @Test
    public void test() {
        // Q1: first insert, runs once the transaction has begun
        Observable<Integer> firstTransactionFirstPart = database.update(queryThatInsertsinTableWithPrimaryKeyA)
                .dependsOn(database.beginTransaction())
                .parameters(new Object[0])
                .count();
        // Q2: runs after Q1 and fails with the primary key violation
        Observable<Integer> firstTransactionSecondPart = database.update(queryThatInsertsinTableWithPrimaryKeyA)
                .dependsOn(firstTransactionFirstPart)
                .parameters(new Object[0])
                .count();
        Observable<Boolean> firstTransaction = database.commit(firstTransactionSecondPart);
        firstTransaction.subscribe(...); // logging
    }

If we then attempt to run another transaction, we get an exception:

    com.github.davidmoten.rx.jdbc.exceptions.SQLRuntimeException: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30000ms.
        at com.github.davidmoten.rx.jdbc.ConnectionProviderPooled.get(ConnectionProviderPooled.java:92)
        at com.github.davidmoten.rx.jdbc.ConnectionProviderSingletonManualCommit.get(ConnectionProviderSingletonManualCommit.java:43)
        at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.getConnection(QueryUpdateOnSubscribe.java:127)
        at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.call(QueryUpdateOnSubscribe.java:80)
        at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.call(QueryUpdateOnSubscribe.java:22)
        at rx.Observable.unsafeSubscribe(Observable.java:8460)

Max pool size is 1 (it is reproducible even with other values).

Full method impl

Versions: rxjava-jdbc 0.7.2 and 0.7.3, Hikari 2.5.1, postgresql 9.4.1212.

Most likely I'm doing something wrong, but I'm not able to figure it out :(

ssumit commented 7 years ago

If I set up the Hikari data source explicitly with a leak detection threshold of 2000 ms, I get the following exception, and then SQLTransientConnectionException for the next transaction as before:

[jdbc:postgresql://localhost:5432/kronos?user=postgres housekeeper] WARN  [c.z.h.p.ProxyLeakTask] {{appResourceId,sumit@dev}} - Connection leak detection triggered for org.postgresql.jdbc.PgConnection@7bc9e6ab, stack trace follows
java.lang.Exception: Apparent connection leak detected
    at com.github.davidmoten.rx.jdbc.ConnectionProviderFromDataSource.get(ConnectionProviderFromDataSource.java:30)
    at com.github.davidmoten.rx.jdbc.ConnectionProviderSingletonManualCommit.get(ConnectionProviderSingletonManualCommit.java:43)
    at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.getConnection(QueryUpdateOnSubscribe.java:127)
    at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.call(QueryUpdateOnSubscribe.java:80)
    at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.call(QueryUpdateOnSubscribe.java:22)
    at rx.Observable.unsafeSubscribe(Observable.java:8460)
    at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94)
    at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue(TrampolineScheduler.java:76)
    at rx.internal.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule(TrampolineScheduler.java:55)
    at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:45)
    at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30)

Also, how can I recover from this situation, since that connection cannot be used anymore?

ssumit commented 7 years ago

Reopening, as I can't find a community to help me debug this. I have tried two more implementations:

        Observable<Boolean> firstTransaction =
                Observable
                        .just(1)
                        .compose(database.beginTransactionOnNext_())
                        .compose(database.update(query).dependsOnTransformer())
                        .compose(database.update(query).dependsOnTransformer())
                        .compose(database.commitOnNext_());

and

        Observable<Boolean> firstTransaction =
                Observable
                        .just(1)
                        .compose(database.beginTransactionOnNext_())
                        .map(toEmpty())
                        .compose(database.update(query).parameterListTransformer())
                        .compose(RxUtil.<Integer> flatten())
                        .map(toEmpty())
                        .compose(database.update(query).parameterListTransformer())
                        .compose(RxUtil.<Integer> flatten())
                        .compose(database.commitOnNext_());

Both of these styles give me the same result.

ssumit commented 7 years ago

As expected, the issue is that Util::closeQuietly(connection) is called directly only when the SQL query in the QueryUpdateOnSubscribe object is a commit or a rollback. In the failure case neither is called, because I am not able to handle the failure (I tried onErrorResumeNext()/onExceptionResumeNext()).

Internal details:

Issue: ProxyConnection::close is not getting called.

Debugger-based observation for two update queries in a transaction where the second update fails. Current flow (important steps):

1. QueryUpdateTransformerFromObservable::call
2. QueryUpdateOnSubscribe::call
3. QueryUpdateOnSubscribe::performUpdate
4. QueryUpdateTransformerFromObservable::call
5. QueryUpdateOnSubscribe::performUpdate, which results in an SQL exception that it catches, and we go back to QueryUpdateOnSubscribe::call.
6. In the catch block we end up calling Util::closeQuietly(PreparedStatement) and then, "if not commit and if not rollback", closeQuietlyIfAutoCommit (which is called, but the connection is not auto-commit in this case).

Hence Util::closeQuietly(connection) is not getting called.
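
To make the consequence of that flow concrete, here is a minimal toy model in plain Java. It is not rxjava-jdbc or HikariCP code: the class name `PoolLeakSketch`, its methods, and the use of a `Semaphore` permit to stand in for the single pooled connection are all assumptions made for illustration, and the 30-second pool timeout is collapsed into an immediate failure.

```java
import java.util.concurrent.Semaphore;

// Toy model only: one Semaphore permit stands in for the single pooled connection.
// Nothing here is rxjava-jdbc or HikariCP API.
class PoolLeakSketch {

    private final Semaphore pool = new Semaphore(1);
    private final boolean closeOnFailure;

    PoolLeakSketch(boolean closeOnFailure) {
        this.closeOnFailure = closeOnFailure;
    }

    /**
     * Borrows the connection and "runs" a transaction.
     * Returns true on commit, false if the second statement failed.
     * Throws if the pool has no connection to hand out.
     */
    boolean runTransaction(boolean failSecondStatement) {
        if (!pool.tryAcquire()) {
            throw new IllegalStateException("Connection is not available");
        }
        if (failSecondStatement) {
            // Failure path: the connection goes back to the pool only if we close it.
            if (closeOnFailure) {
                pool.release();
            }
            return false;
        }
        pool.release(); // commit path: connection is returned to the pool
        return true;
    }

    int idleConnections() {
        return pool.availablePermits();
    }

    public static void main(String[] args) {
        PoolLeakSketch leaky = new PoolLeakSketch(false);
        leaky.runTransaction(true); // second statement fails, connection is never returned
        try {
            leaky.runTransaction(false);
        } catch (IllegalStateException e) {
            System.out.println("next transaction: " + e.getMessage());
        }

        PoolLeakSketch closing = new PoolLeakSketch(true);
        closing.runTransaction(true);
        System.out.println("next transaction committed: " + closing.runTransaction(false));
    }
}
```

In this model the only difference between the two runs is whether the failure path returns the connection, which is the step the debugger trace shows being skipped when the connection is not in auto-commit mode.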

davidmoten commented 7 years ago

Thanks for the report. I'm a bit limited in time at the moment but I'd like to have a look at it soon.

ssumit commented 7 years ago

I have applied a patch locally that works for my use case now. Basically, on an exception, Database::endTransactionObserve and Database::endTransactionSubscribe are invoked and the current connection provider is lost. So any subsequent rollback is treated differently and will block on getting a connection (assuming there is just one connection). I therefore changed QueryUpdateOnSubscribe and QuerySelectOnSubscribe so that they close the connection if auto-commit is false. (Additionally, Util.rollback is called, but the HikariCP ProxyConnection seems to handle these cases anyway.) As a result, I do not have to call rollback explicitly on error.

zsiegel commented 7 years ago

I think I am running into the same issue as well. I have the following...

        Observable<Boolean> firstTransaction =
            Observable
                .just(1)
                .compose(db.beginTransactionOnNext_())
                .compose(db.update("insert into table1").dependsOnTransformer())
                .compose(db.update("insert into table2 query exception").dependsOnTransformer())
                .compose(db.commitOnNext_());

        firstTransaction.count().subscribe(next -> {}, err -> {}, () -> {});

        System.out.println("DONE");
        db.update("delete from table1").execute();
        db.update("delete from table2").execute();
        System.out.println("CLEARED");

When I run this, query 1 completes, but I deliberately cause a query exception on the table2 operation. When I do this, the thread hangs at the first delete statement and never completes.

Looking into the MySQL transactions table I see an outstanding tx (which I believe is the operation that did not close/rollback properly) and I see a transaction for the "delete from table" that is waiting.

I used SELECT * FROM information_schema.INNODB_TRX; to look into the current transactions although that may not be 100% correct.

zsiegel commented 7 years ago

Upon further debugging, it seems that for me this is related to autocommit.

Since the first query completes successfully and autocommit is on by default, the connection and transaction are closed, and then a new connection is made but no transaction :(. I'm not sure how to go about dealing with this. It feels like the nature of compose does not play well with autocommit.

ssumit commented 7 years ago

Your observations are in sync with mine. As per my understanding, auto-commit is off for transactions and on for normal queries. We only need to handle the case where auto-commit is off. If an exception occurs inside a transaction, the catch block in QueryUpdateOnSubscribe::call catches it. We need to make sure that the db connection is then closed. Once the connection is closed, subsequent queries won't wait or time out.
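
Independent of the library internals, the principle in the paragraph above (on failure in manual-commit mode, roll back and then close) can be sketched in plain JDBC. This is only an illustration: the class `RollbackAndCloseSketch`, the `Work` callback, and the recording proxy that stands in for a real connection are hypothetical names invented here, not part of rxjava-jdbc.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Illustration only; none of these names come from rxjava-jdbc.
class RollbackAndCloseSketch {

    /** Hypothetical callback holding the statements of one transaction. */
    interface Work {
        void run(Connection con) throws SQLException;
    }

    /** Runs work in manual-commit mode, rolls back on failure, and always closes. */
    static void inTransaction(Connection con, Work work) throws SQLException {
        try (Connection c = con) { // close() runs on every path; a pool takes the connection back
            c.setAutoCommit(false);
            try {
                work.run(c);
                c.commit();
            } catch (SQLException | RuntimeException e) {
                try {
                    c.rollback();
                } catch (SQLException rollbackFailure) {
                    e.addSuppressed(rollbackFailure);
                }
                throw e;
            }
        }
    }

    /** Stand-in connection that only records which methods were called on it. */
    static Connection recordingConnection(List<String> calls) {
        return (Connection) Proxy.newProxyInstance(
                RollbackAndCloseSketch.class.getClassLoader(),
                new Class<?>[] { Connection.class },
                (proxy, method, methodArgs) -> {
                    calls.add(method.getName());
                    return null; // sufficient for the void methods used above
                });
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        try {
            inTransaction(recordingConnection(calls), con -> {
                throw new SQLException("simulated primary key violation");
            });
        } catch (SQLException expected) {
            System.out.println("transaction failed: " + expected.getMessage());
        }
        System.out.println(calls); // prints [setAutoCommit, rollback, close]
    }
}
```

The try-with-resources block is a design choice for the sketch: it guarantees the close on both the commit and the failure path, so the connection cannot stay checked out after an exception.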

You can try this: edit the call method in QueryUpdateOnSubscribe.java. Inside the catch block, add the following before the existing code:

                if (!state.con.getAutoCommit()) {
                    performRollbackForcefully(state);
                }

and then define a private method like the following:

    private void performRollbackForcefully(State state) {
        debug("rolling back forcefully");
        query.context().endTransactionObserve();
        Conditions.checkTrue(!Util.isAutoCommit(state.con));
        Util.rollback(state.con);
        if (state.closed.compareAndSet(false, true)) {
            Util.closeQuietly(state.ps);
            Util.closeQuietly(state.con);
        }
        debug("rolled back");
    }

jacinpoz commented 7 years ago

I think I am running into the same issue. Does the catch block look like this after your changes?

        catch (Throwable e) {
            try {
                if (!state.con.getAutoCommit()) {
                    performRollbackForcefully(state);
                }
            } catch (SQLException e1) {
                e1.printStackTrace();
            }
            query.context().endTransactionObserve();
            query.context().endTransactionSubscribe();
            try {
                close(state);
            } finally {
                handleException(e, subscriber);
            }
        }

My tests seem to behave much better after this change.