oracle / oracle-r2dbc

R2DBC Driver for Oracle Database
https://oracle.com

NullPointer when using R2DBC Pool 0.9.0.RELEASE with version 0.4.0 #64

Closed sgtcortez closed 2 years ago

sgtcortez commented 2 years ago

Hi, I am facing a weird problem using:

Java Version: Oracle JDK 17

implementation 'io.r2dbc:r2dbc-pool:0.9.0.RELEASE'
implementation 'io.r2dbc:r2dbc-spi:0.9.0.RELEASE'
runtimeOnly 'com.oracle.database.r2dbc:oracle-r2dbc:0.4.0'

At first, I was using version 0.1.0 with R2DBC Pool. It works when executing a single statement, but with parallel calls I get the error: Multiple subscribers ...

So I came here and read that this is a known problem with version 0.1.0. I then upgraded to version 0.4.0, which is the latest.
But when I execute the same query (which works with version 0.1.0), I receive a NullPointerException inside the Oracle R2DBC classes.

StackTrace:

java.lang.NullPointerException: Cannot invoke "java.util.ArrayDeque.size()" because "this.implicitResultSetStatements" is null
    at oracle.jdbc.driver.OracleStatement.getMoreResults(OracleStatement.java:5851)
    at oracle.jdbc.driver.OracleStatementWrapper.getMoreResults(OracleStatementWrapper.java:298)
    at oracle.r2dbc.impl.OracleStatementImpl$JdbcStatement.lambda$getResults$4(OracleStatementImpl.java:1053)
    at oracle.r2dbc.impl.AsyncLock.lambda$get$2(AsyncLock.java:161)
    at oracle.r2dbc.impl.AsyncLock.unlock(AsyncLock.java:122)
    at oracle.r2dbc.impl.AsyncLock$UsingConnectionSubscriber.terminate(AsyncLock.java:510)
    at oracle.r2dbc.impl.AsyncLock$UsingConnectionSubscriber.onComplete(AsyncLock.java:496)
    at reactor.core.publisher.StrictSubscriber.onComplete(StrictSubscriber.java:123)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058)
    at org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber.onComplete(FlowAdapters.java:228)
    at oracle.jdbc.internal.CompletionStageUtil$IteratorSubscription.emitComplete(CompletionStageUtil.java:681)
    at oracle.jdbc.internal.CompletionStageUtil$IteratorSubscription.emitItems(CompletionStageUtil.java:628)
    at oracle.jdbc.driver.PhysicalConnection.lambda$createUserCodeExecutor$10(PhysicalConnection.java:11713)
    at java.base/java.security.AccessController.doPrivileged(AccessController.java:399)
    at oracle.jdbc.driver.PhysicalConnection.lambda$createUserCodeExecutor$11(PhysicalConnection.java:11711)
    at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1395)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)

How I execute the query:

return Mono.usingWhen(
        connectionPool.create(),
        connection -> Mono.from(connection.createStatement(QUERY)
                .bind(0, destinationState)
                .bind(1, subsidiaryId)
                .bind(2, itemId)
                .execute()
        ),
        Connection::close,
        ((connection, throwable) -> connection.close()),
        Connection::close
)
        .flatMapMany(it -> it.map(mapper))
        .next();

How I create the connection pool:

public ConnectionPool connectionFactory() {
    return new ConnectionPool(ConnectionPoolConfiguration
            .builder()
            .connectionFactory(ConnectionFactories.get(
                    ConnectionFactoryOptions
                            .builder()
                            .from(ConnectionFactoryOptions.parse(url))
                            .option(ConnectionFactoryOptions.USER, user)
                            .option(ConnectionFactoryOptions.PASSWORD, password)
                            .option(ConnectionFactoryOptions.DRIVER, DRIVER)
                            .option(Option.valueOf("applicationName"), "catalog-service-app")
                            .build()
                    )
            )
            .initialSize(INITIAL_CONNECTIONS)
            .maxSize(maxConnections)
            .maxIdleTime(Duration.ofSeconds(maxIdleTime))
            .validationQuery(VALIDATION_QUERY)
            .build()
    );
}

I thought it would be a problem with dependency versions, but I checked the dependencies and I am using the correct ones.
Please tell me where I am making a mistake.


sgtcortez commented 2 years ago

By the way, even this simple statement throws the same NullPointerException:

    Mono.from(connectionPool.create())
            .flatMapMany(connection ->
                    Flux.from(connection.createStatement("SELECT 1 FROM dual").execute())
                            .flatMap(result ->
                                    result.map((row, metadata) -> row.get(0, Integer.class)))
            ).subscribe(s -> System.out.println("Value: " + s ));
    return Mono.empty();

sgtcortez commented 2 years ago

I will run a test with Java 11. I am not sure whether Oracle JDBC already supports Java 17.

Michael-A-McMahon commented 2 years ago

Thanks for all these details, @sgtcortez. This NPE looks like the same issue we saw in #63. Make sure you have the 21.3.0.0 version of Oracle JDBC on the classpath. The 21.1 version has a bug in Statement.getMoreResults() that will trigger an NPE.
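
One way to confirm which JDBC driver version actually ends up on the classpath is to ask java.sql.DriverManager at startup. The sketch below uses only the JDK. The class name JdbcDriverVersions and its methods are hypothetical, and it assumes that the Oracle JDBC jar registers its driver through the standard service-loader mechanism and reports its release as the major and minor version numbers; neither assumption has been verified against this issue.

```java
import java.sql.Driver;
import java.sql.DriverManager;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Oracle R2DBC or Oracle JDBC.
class JdbcDriverVersions {

    // Formats a driver class name with its version, e.g. "x.Driver 21.3".
    static String format(String className, int major, int minor) {
        return className + " " + major + "." + minor;
    }

    static String describe(Driver driver) {
        return format(
                driver.getClass().getName(),
                driver.getMajorVersion(),
                driver.getMinorVersion());
    }

    // Returns one entry per JDBC driver that DriverManager has loaded.
    static List<String> list() {
        return DriverManager.drivers()
                .map(JdbcDriverVersions::describe)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> drivers = list();
        if (drivers.isEmpty()) {
            System.out.println("No JDBC drivers found on the classpath");
        }
        drivers.forEach(System.out::println);
    }
}
```

If an entry for an Oracle driver shows a version older than 21.3, a transitive dependency is probably pulling in the older jar.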

sgtcortez commented 2 years ago

Thank you, @Michael-A-McMahon. I will run this test and come back here with the result.

sgtcortez commented 2 years ago

Hi @Michael-A-McMahon, I tested it, and now I am facing another problem ...

oracle.r2dbc.impl.OracleR2dbcExceptions$OracleR2dbcException: Closed Statement: getStatement
    at oracle.r2dbc.impl.OracleR2dbcExceptions.toR2dbcException(OracleR2dbcExceptions.java:217)
    at oracle.r2dbc.impl.OracleR2dbcExceptions.fromJdbc(OracleR2dbcExceptions.java:282)
    at oracle.r2dbc.impl.OracleReactiveJdbcAdapter.publishRows(OracleReactiveJdbcAdapter.java:747)
    at oracle.r2dbc.impl.OracleResultImpl$ResultSetResult.publishSegments(OracleResultImpl.java:479)
    at oracle.r2dbc.impl.OracleResultImpl.publishSegments(OracleResultImpl.java:150)
    at oracle.r2dbc.impl.OracleResultImpl.map(OracleResultImpl.java:219)
    at br.com.dimed.catalogservice.repository.TaxRuleRepository.lambda$fetchTaxRule$6(TaxRuleRepository.java:116)
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:163)
    at reactor.core.publisher.MonoUsingWhen$MonoUsingWhenSubscriber.deferredComplete(MonoUsingWhen.java:278)
    at reactor.core.publisher.FluxUsingWhen$CommitInner.onComplete(FluxUsingWhen.java:540)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209)
    at reactor.pool.SimpleDequePool.lambda$maybeRecycleAndDrain$19(SimpleDequePool.java:513)
    at reactor.core.publisher.LambdaMonoSubscriber.onComplete(LambdaMonoSubscriber.java:135)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:145)
    at reactor.core.publisher.MonoNext$NextSubscriber.onComplete(MonoNext.java:102)
    at reactor.core.publisher.MonoNext$NextSubscriber.onComplete(MonoNext.java:102)
    at oracle.r2dbc.impl.AsyncLock$UsingConnectionSubscriber.onComplete(AsyncLock.java:497)

    // This one works
    Mono.from(connectionPool.create())
            .flatMapMany(connection ->
                    Flux.from(connection.createStatement("SELECT 1 FROM dual").execute())
                            .flatMap(result ->
                                    result.map((row, metadata) -> row.get(0, Integer.class)))
            ).subscribe(s -> System.out.println("Value: " + s ));

    // This one does not work
    return Mono.usingWhen(
            connectionPool.create(),
            connection -> Mono.from(connection.createStatement(QUERY)
                    .bind(0, destinationState)
                    .bind(1, subsidiaryId)
                    .bind(2, itemId)
                    .bind(3, destinationState)
                    .bind(4, itemId)
                    .bind(5, destinationState)
                    .bind(6, itemId)
                    .execute()
            ),
            Connection::close,
            ((connection, throwable) -> connection.close()),
            Connection::close
    )
            .flatMapMany(it -> it.map(mapper))
            .next();

I am not sure, but it might be calling close before create.

Dependencies:

implementation 'io.r2dbc:r2dbc-pool:0.9.0.RELEASE'
implementation 'io.r2dbc:r2dbc-spi:0.9.0.RELEASE'
implementation 'com.oracle.database.jdbc:ojdbc11:21.3.0.0'
runtimeOnly 'com.oracle.database.r2dbc:oracle-r2dbc:0.4.0'

Michael-A-McMahon commented 2 years ago

We have:

            .flatMapMany(it -> it.map(mapper))

This operator is downstream of the usingWhen publisher, so Connection::close happens before map is invoked on the Result. For Oracle R2DBC, a Result is no longer valid after the connection is closed.

You could try moving the flatMapMany operator into the connection using function, like this:

...
.execute()
.flatMapMany(it -> it.map(mapper))

The main thing to understand is that anything inside of the connection -> using lambda will happen before the Connection::close. It is within this scope that you want to get all your database calls done. If you know JDBC, then you can think of this scope as the equivalent of:

try (Connection connection = dataSource.getConnection()) {
    // The Connection, and any Statements or ResultSets it creates, are only valid inside of this try block
}
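
The scoping rule above can be modelled with plain JDK classes. This is only a sketch: FakeConnection and ScopeDemo are hypothetical stand-ins, not R2DBC or Oracle JDBC types, and the Supplier plays the role of a lazily consumed Result under the assumption that nothing is read until it is asked for a value.

```java
import java.util.function.Supplier;

// A JDK-only model of the scoping rule; these classes are hypothetical
// stand-ins, not the R2DBC or Oracle JDBC types.
class ScopeDemo {

    static class FakeConnection implements AutoCloseable {
        private boolean closed = false;

        // Returns a lazy "result": nothing is read until get() is called,
        // similar to how a Result is only consumed when map() is called.
        Supplier<Integer> execute() {
            return () -> {
                if (closed) {
                    throw new IllegalStateException("Closed Statement");
                }
                return 1;
            };
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Broken: the lazy result escapes the scope and is consumed after close().
    static String consumeOutside() {
        Supplier<Integer> result;
        try (FakeConnection connection = new FakeConnection()) {
            result = connection.execute();
        }
        try {
            return "Value: " + result.get();
        } catch (IllegalStateException e) {
            return "Failed: " + e.getMessage();
        }
    }

    // Working: the result is consumed while the connection is still open.
    static String consumeInside() {
        try (FakeConnection connection = new FakeConnection()) {
            return "Value: " + connection.execute().get();
        }
    }

    public static void main(String[] args) {
        System.out.println(consumeOutside()); // Failed: Closed Statement
        System.out.println(consumeInside());  // Value: 1
    }
}
```

consumeOutside corresponds to calling map downstream of usingWhen, and consumeInside corresponds to calling it inside the connection -> using lambda.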

sgtcortez commented 2 years ago

@Michael-A-McMahon thank you for your help.
This was my first time using R2DBC with Oracle. I had only used Postgres, and the block of code above always worked for me there ...

With your help, I changed the code, and now it works! My new code:

public Mono<TaxRuleDto> fetchTaxRule(
        final long itemId,
        final long subsidiaryId,
        final String destinationState) {

    return Flux.usingWhen(
            connectionPool.create(),
            connection -> Mono.from(connection.createStatement(QUERY)
                    .bind(0, destinationState)
                    .bind(1, subsidiaryId)
                    .bind(2, itemId)
                    .bind(3, destinationState)
                    .bind(4, itemId)
                    .bind(5, destinationState)
                    .bind(6, itemId)
                    .execute()
            ).flatMapMany(it -> it.map(mapper)),
            Connection::close,
            ((connection, throwable) -> connection.close()),
            Connection::close
    ).next();
}

This query can return only one row, but I could make it work with Mono, so for now it is fine!

Again, thanks for your time & help.

vedhavi commented 2 years ago

Hi all, the solution doesn't seem to work when using Spring Data R2DBC. Using the 21.3.0.0 driver, the connection doesn't close. Please do not say that it's an issue with Spring Data. I started from an issue where the connection pool doesn't work for Oracle R2DBC and ended up here. Could someone tell me what the solution is if we use Spring reactive repositories + Transaction + Pooling + Oracle? I have gone through all the proposed solutions and finally stumbled upon this page. Any help would be highly appreciated.

Michael-A-McMahon commented 2 years ago

Hi @vedhavi. Thanks for bringing this to my attention. Can you give me a bit more detail about this? In which case do you see a connection not being closed? If you can share a bit of code that reproduces the issue, that would be excellent.