micronaut-projects / micronaut-data

Ahead of Time Data Repositories
Apache License 2.0
462 stars 195 forks source link

[Data R2dbc]: Connection has been closed BEFORE send operation #2373

Closed hantsy closed 11 months ago

hantsy commented 1 year ago

Expected Behavior

I used R2dbcOperations.withConnection to build a Repository like this.

@Singleton
@RequiredArgsConstructor
public class CustomerDaoWithR2dbcOperations implements CustomerDao {
    public static final BiFunction<Row, RowMetadata, Customer> MAPPING_FUNCTION = (row, rowMetadata) -> {
        var id = row.get("id", UUID.class);
        var name = row.get("name", String.class);
        var age = row.get("age", Integer.class);
        var street = row.get("street", String.class);
        var city = row.get("city", String.class);
        var zip = row.get("zip", String.class);
        var version = row.get("age", Long.class);
        return new Customer(id, name, age, new Address(street, city, zip), version);
    };
    private final R2dbcOperations r2dbcOperations;

    @Override
    public Flux<Customer> findAll() {
        var sql = "SELECT *  FROM  customers ";
        return Flux.from(
                r2dbcOperations.withConnection(connection -> Flux
                        .from(connection.createStatement(sql).execute())
                        .flatMap(r -> r.map(MAPPING_FUNCTION))
                )
        );
    }

    @Override
    public Mono<Customer> findById(UUID id) {
        var sql = "SELECT *  FROM  customers WHERE id=$1 ";
        return Mono.from(
                r2dbcOperations.withConnection(connection -> Mono
                        .from(connection.createStatement(sql)
                                .bind(0, id)
                                .execute()
                        )
                        .flatMap(r -> Mono.from(r.map(MAPPING_FUNCTION)))
                        .switchIfEmpty(Mono.empty())
                )
        );
    }

}

The complete CustomerDaoWithR2dbcOperations is here.

And the CustomerDaoWithR2dbcOperationsTest failed now.

Actual Behaviour

11:37:00.910 [reactor-tcp-nio-3] WARN  i.r.p.client.ReactorNettyClient - Connection Error
reactor.netty.channel.AbortedException: Connection has been closed BEFORE send operation
    at reactor.netty.channel.AbortedException.beforeSend(AbortedException.java:59)
    at reactor.netty.channel.ChannelOperations.send(ChannelOperations.java:290)
    at reactor.netty.NettyOutbound.send(NettyOutbound.java:86)
    at io.r2dbc.postgresql.client.ReactorNettyClient.lambda$new$3(ReactorNettyClient.java:174)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386)
    at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.innerNext(FluxConcatMapNoPrefetch.java:258)
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:863)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2545)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2341)
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2215)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:178)
    at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4495)
    at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onNext(FluxConcatMapNoPrefetch.java:206)
    at reactor.core.publisher.SinkManyUnicast.drainRegular(SinkManyUnicast.java:282)
    at reactor.core.publisher.SinkManyUnicast.drain(SinkManyUnicast.java:364)
    at reactor.core.publisher.SinkManyUnicast.request(SinkManyUnicast.java:430)
    at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.innerComplete(FluxConcatMapNoPrefetch.java:274)
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onComplete(FluxConcatMap.java:887)
    at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:230)
    at reactor.core.publisher.SinkManyUnicast.checkTerminated(SinkManyUnicast.java:389)
    at reactor.core.publisher.SinkManyUnicast.drainRegular(SinkManyUnicast.java:274)
    at reactor.core.publisher.SinkManyUnicast.drain(SinkManyUnicast.java:364)
    at reactor.core.publisher.SinkManyUnicast.tryEmitComplete(SinkManyUnicast.java:203)
    at reactor.core.publisher.SinkManySerialized.tryEmitComplete(SinkManySerialized.java:64)
    at reactor.core.publisher.InternalManySink.emitComplete(InternalManySink.java:68)
    at io.r2dbc.postgresql.ExtendedFlowDelegate$ExtendedFlowOperator.close(ExtendedFlowDelegate.java:337)
    at io.r2dbc.postgresql.ExtendedFlowDelegate.lambda$fetchAll$3(ExtendedFlowDelegate.java:143)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:146)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onError(FluxDoFinally.java:122)
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:213)
    at reactor.core.publisher.FluxCreate$BaseSink.error(FluxCreate.java:474)
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:802)
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.error(FluxCreate.java:747)
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:237)
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:176)
    at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.emit(ReactorNettyClient.java:687)
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:939)
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:813)
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:719)
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129)
    at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:292)
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:401)
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:411)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:113)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:454)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)
11:37:00.921 [io-executor-thread-1] DEBUG c.e.CustomerDaoWithR2dbcOperationsTest - generated id: 53c2dc87-bd7c-4c7c-9cfc-578fa3910cde
11:37:00.923 [io-executor-thread-1] DEBUG i.m.d.r.o.DefaultR2dbcRepositoryOperations - Creating a new Connection for DataSource: default
11:37:01.060 [reactor-tcp-nio-4] DEBUG i.m.d.r.o.DefaultR2dbcRepositoryOperations - Closing Connection for DataSource: default
11:37:01.068 [io-executor-thread-1] WARN  i.m.t.s.AbstractReactorTransactionOperations - Rolling back transaction on error: java.lang.AssertionError: expectation "consumeNextWith" failed (expected: onNext(); actual: onComplete())
io.micronaut.transaction.test.DefaultTestTransactionExecutionListener$UncheckedException: java.lang.AssertionError: expectation "consumeNextWith" failed (expected: onNext(); actual: onComplete())
    at io.micronaut.transaction.test.DefaultTestTransactionExecutionListener.lambda$interceptTest$0(DefaultTestTransactionExecutionListener.java:95)
    at io.micronaut.transaction.TransactionCallback.apply(TransactionCallback.java:37)
    at io.micronaut.transaction.sync.SynchronousTransactionOperationsFromReactiveTransactionOperations.lambda$execute$0(SynchronousTransactionOperationsFromReactiveTransactionOperations.java:75)
    at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:47)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4495)
    at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:126)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.AssertionError: expectation "consumeNextWith" failed (expected: onNext(); actual: onComplete())

Steps To Reproduce

  1. clone https://github.com/hantsy/micronaut-sandbox/tree/master/data-r2dbc
  2. Run CustomerDaoWithR2dbcOperationsTest

Environment Information

Windows 10 pro Java 17

Example Application

https://github.com/hantsy/micronaut-sandbox/tree/master/data-r2dbc

Version

4.0.0

dstepanov commented 11 months ago

I don't think anything can be fixed here, the main problem is that you are using Mono.from which is canceling the upstream and that is causing all kind of problems. The solution is to use Mono.fromDirect or use Reactor alternatives of connection/transaction managers.

I did rewrite your code to:

  private final R2dbcOperations r2dbcOperations;
    private final ReactorConnectionOperations<Connection> connectionOperations;
    private final ReactorReactiveTransactionOperations<Connection> transactionOperations;

    @Override
    public Flux<Customer> findAll() {
        var sql = "SELECT *  FROM  customers ";
        return connectionOperations.withConnectionFlux(status -> Flux
                .from(status.getConnection().createStatement(sql).execute())
                .flatMap(r -> r.map(MAPPING_FUNCTION))
        );
    }

    @Override
    public Mono<Customer> findById(UUID id) {
        var sql = "SELECT *  FROM  customers WHERE id=$1 ";
        return connectionOperations.withConnectionMono(status -> Mono
                .from(status.getConnection().createStatement(sql)
                        .bind(0, id)
                        .execute()
                )
                .flatMap(r -> Mono.from(r.map(MAPPING_FUNCTION)))
                .switchIfEmpty(Mono.empty())
        );
    }

    @Override
    public Mono<UUID> save(Customer data) {
        //var sql = "INSERT INTO customers(name, age, street, city, zip) VALUES (?, ?, ?, ?, ?) RETURNING id ";
        var sql = "INSERT INTO customers (name, age, street, city, zip) VALUES ($1, $2, $3, $4, $5)";
        return transactionOperations.withTransactionMono(status ->
                Mono.just(status.getConnection())
                        .flatMap(connection -> Mono
                                .from(connection.createStatement(sql)
                                        .bind(0, data.name())
                                        .bind(1, data.age())
                                        .bind(2, data.address().street())
                                        .bind(3, data.address().street())
                                        .bind(4, data.address().zip())
                                        .returnGeneratedValues("id")
                                        .execute()
                                )
                                .flatMap(r -> Mono.from(r.map((row, rowMetadata) -> row.get("id", UUID.class))))
                                .switchIfEmpty(Mono.empty())
                        )

        );
    }

Which makes the test pass even with some remaining Mono.from. I think we had some hack in v3 and it might have worked.

dstepanov commented 11 months ago

I think this problem disappears if r2dbc pool is added.

hantsy commented 11 months ago

I have used reactive transaction operations in other repository case. Of course it is still working well.

In this example, I only want to use R2dbcOperations, I have to declare there the origin codes is broken as upgrading the new version, in the initial version it is working well.

dstepanov commented 11 months ago

Not sure if this is something that should be fixed as it's the correct behaviour of R2DBC https://github.com/pgjdbc/r2dbc-postgresql/issues/428#issuecomment-883388654

hantsy commented 11 months ago

I don't think it is caused by Mono.from, in another example to demo the ConnectionFactory, I also used Mono.from, it worked, https://github.com/hantsy/micronaut-sandbox/blob/master/data-r2dbc/src/main/java/com/example/CustomerDaoWithConnectionFactory.java

dstepanov commented 11 months ago

I will reopen the issue and take another look. The main problem is when the TX block receives the cancel signal, it automatically sends it to the connection block which cancels the connection, so the onCancel action cannot be executed.