r2dbc / r2dbc-mssql

R2DBC Driver for Microsoft SQL Server using TDS (Tabular Data Stream) Protocol
Apache License 2.0

Using publishOn/subscribeOn hangs up select queries #244

Closed: prishedko closed this issue 2 years ago

prishedko commented 2 years ago

Bug Report

Versions

Current Behavior

I've modified the test example from ticket 216 to build a somewhat more complex reactive flow:

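    // Imports this excerpt relies on (Java 11, Reactor 3.x, R2DBC SPI, JUnit 4):
    import java.time.Duration;
    import io.r2dbc.spi.Connection;
    import io.r2dbc.spi.ConnectionFactory;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;
    import static org.junit.Assert.assertTrue;

    // TIMEOUT_MS, MSG_COUNT and messageHandlerScheduler are fields of the test class
    // (AbstractTest in the attached project); their values are not shown here.

    private void runTest(ConnectionFactory connectionFactory) {
        var results = Mono.from(connectionFactory.create())
                .flatMapMany(c -> readMessages()
                        // Both flatMap stages issue query(c) on the same connection c,
                        // so several queries run against it concurrently.
                        .flatMap(v -> query(c))
                        .flatMap(v -> query(c))
                        .collectList()
                        .flatMap(v -> Mono.from(c.close()).then(Mono.just(v)))
                        .subscribeOn(Schedulers.single()))
                .collectList()
                .timeout(Duration.ofMillis(TIMEOUT_MS))
                .blockOptional();
        assertTrue(results.isPresent() && !results.get().isEmpty());
    }

    private Flux<Integer> readMessages() {
        return Flux.range(0, MSG_COUNT).subscribeOn(messageHandlerScheduler);
    }

    private Flux<Object> query(Connection c) {
        return Flux.from(c.createStatement("select * from test.Test").execute())
                .flatMap(r -> r.map((row, m) -> row.get(0)))
                // Schedulers.elastic() is deprecated in newer Reactor releases
                // in favour of Schedulers.boundedElastic().
                .subscribeOn(Schedulers.elastic())
                .publishOn(messageHandlerScheduler);
    }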

Executing this flow hangs in most runs. In the attached tests (see below), the failure looks like this:

Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 40.128 sec <<< FAILURE!
shouldNotHangUpWithSmallPool(io.r2dbc.mssql.testcase.MsSqlTest)  Time elapsed: 30.045 sec  <<< ERROR!
reactor.core.Exceptions$ReactiveException: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 30000ms in 'collectList' (and no fallback has been configured)
    at reactor.core.Exceptions.propagate(Exceptions.java:392)
    at reactor.core.publisher.BlockingOptionalMonoSubscriber.blockingGet(BlockingOptionalMonoSubscriber.java:121)
    at reactor.core.publisher.Mono.blockOptional(Mono.java:1752)
    at io.r2dbc.mssql.testcase.AbstractTest.runTest(AbstractTest.java:70)
    at io.r2dbc.mssql.testcase.AbstractTest.shouldNotHangUpWithSmallPool(AbstractTest.java:52)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:242)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:137)
    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
    at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
    at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingOptionalMonoSubscriber.blockingGet(BlockingOptionalMonoSubscriber.java:123)
        ... 39 more

Occasionally, though quite rarely, it fails with:

19:12:03.948 [reactor-tcp-epoll-3] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
io.netty.util.IllegalReferenceCountException: refCnt: 0
    at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454)
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1440)
    at io.netty.buffer.AbstractByteBuf.readByte(AbstractByteBuf.java:730)
    at io.netty.buffer.AbstractByteBuf.readUnsignedByte(AbstractByteBuf.java:744)
    at io.r2dbc.mssql.message.tds.Decode.uByte(Decode.java:49)
    at io.r2dbc.mssql.message.type.Length.decode(Length.java:137)
    at io.r2dbc.mssql.codec.AbstractCodec.decode(AbstractCodec.java:99)
    at io.r2dbc.mssql.codec.DefaultCodecs.doDecode(DefaultCodecs.java:196)
    at io.r2dbc.mssql.codec.DefaultCodecs.decode(DefaultCodecs.java:191)

Here is the full log for the IllegalReferenceCountException case.

I also tested it with r2dbc-postgresql and with r2dbc-pool: the bug does not occur with PostgreSQL, and using the pool only reduces the probability of the bug.

Steps to reproduce

To reproduce the issue, please run mssql_tests.sh or mssql_snapshot_isolation_tests.sh from the attached Maven project.

mp911de commented 2 years ago
    .flatMap(v -> query(c))
    .flatMap(v -> query(c))

introduces concurrency on a single connection. Please make sure not to run queries concurrently, and avoid nested queries on the same connection: response processing is a stream, so if your application tries to consume the next result while the previous result has not been fully consumed, the whole process can lock up.
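The rule above can be illustrated with a small plain-Java model. `SerializedQueriesDemo`, `SingleStreamConnection`, `Result`, `runSequentially` and `nestedQueryRejected` are hypothetical names invented for this sketch; they are not part of r2dbc-mssql or the R2DBC SPI. The model assumes one ordered response stream per connection and simply rejects a new query while the previous result is still open, whereas the real driver may hang instead, as the report shows. In a Reactor pipeline the rough equivalent is to make sure each query's result on a connection has been fully consumed (for example with `collectList()`) before the next query on that connection is subscribed, rather than fanning queries out with nested `flatMap` stages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

/** Hypothetical model of one connection whose responses share a single ordered stream. */
final class SerializedQueriesDemo {

    /** Hypothetical connection: a new query is rejected while a result is still open. */
    static final class SingleStreamConnection {
        private final IntFunction<List<Integer>> server; // hypothetical: query id -> rows
        private boolean resultOpen = false;

        SingleStreamConnection(IntFunction<List<Integer>> server) {
            this.server = server;
        }

        Result execute(int queryId) {
            if (resultOpen) {
                // This model fails fast; the real driver can hang in this situation.
                throw new IllegalStateException("previous result not fully consumed");
            }
            List<Integer> rows = server.apply(queryId);
            resultOpen = !rows.isEmpty();
            return new Result(rows);
        }

        /** Rows of one query; the stream is released once every row has been read. */
        final class Result {
            private final List<Integer> rows;
            private int next = 0;

            private Result(List<Integer> rows) {
                this.rows = rows;
            }

            int nextRow() {
                int row = rows.get(next++);
                if (next == rows.size()) {
                    resultOpen = false; // fully consumed: the connection is free again
                }
                return row;
            }

            List<Integer> drain() {
                List<Integer> remaining = new ArrayList<>(rows.subList(next, rows.size()));
                next = rows.size();
                resultOpen = false; // fully consumed: the connection is free again
                return remaining;
            }
        }
    }

    /** Safe pattern: each result is drained before the next query is issued. */
    static List<Integer> runSequentially(SingleStreamConnection c, int queryCount) {
        List<Integer> all = new ArrayList<>();
        for (int q = 0; q < queryCount; q++) {
            all.addAll(c.execute(q).drain());
        }
        return all;
    }

    /** Unsafe pattern: a second query is issued while the first result is half-read. */
    static boolean nestedQueryRejected(SingleStreamConnection c) {
        SingleStreamConnection.Result first = c.execute(0);
        first.nextRow(); // read only part of the first result
        try {
            c.execute(1);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }
}
```

The model deliberately fails fast with an exception so that the violated rule is visible; it says nothing about how the actual driver detects or handles overlapping results.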

prishedko commented 2 years ago

Looks like it's the case. Thank you for the fast reply!