micronaut-projects / micronaut-data

Ahead of Time Data Repositories
Apache License 2.0

R2dbc transactions not working properly when using coroutines #1889

Open filipenfst opened 1 year ago

filipenfst commented 1 year ago

Steps to Reproduce

When using @Transactional on a method that returns a Kotlin Flow, I get a NoTransactionException.

@Transactional(Transactional.TxType.MANDATORY)
@R2dbcRepository(dialect = Dialect.POSTGRES)
interface RecordTransactionalCoroutineRepository : CoroutineCrudRepository<Record, UUID>

// In RecordTransactionalService (class name taken from the stack trace below):
@Transactional
open fun saveAllUsingCoroutines(records: Iterable<Record>): Flow<Record> =
    coroutineRepository.saveAll(records)

The same error also happens when using ReactiveStreamsCrudRepository or when using declarative transactions.

Expected Behaviour

The transaction should be propagated through the context, and it should be committed only after the last item in the flow has been collected.
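To make the expected lifecycle concrete, here is a minimal sketch that uses only kotlinx.coroutines and no Micronaut Data or R2DBC code. `RecordingTx`, `transactional`, and `demo` are hypothetical names invented for this illustration; `RecordingTx` merely records events and stands in for a real transaction. It is not how Micronaut Data implements transactions, only a model of the ordering I would expect.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a transaction: it only records lifecycle events.
class RecordingTx(private val log: MutableList<String>) {
    fun begin() { log += "begin" }
    fun commit() { log += "commit" }
    fun rollback() { log += "rollback" }
}

// Wraps `upstream` so that the transaction spans the whole collection:
// it begins when collection starts, commits after the last item has been
// emitted, and rolls back if the upstream or the collector fails.
fun <T> transactional(tx: RecordingTx, upstream: Flow<T>): Flow<T> = flow {
    tx.begin()
    try {
        emitAll(upstream)
    } catch (e: Throwable) {
        tx.rollback()
        throw e // rethrow so the collector still sees the failure
    }
    tx.commit()
}

// Simulates "saving" two records inside the transaction and returns the event log.
fun demo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val saves = listOf("a", "b").asFlow().map { log += "save $it"; it }
    val saved = transactional(RecordingTx(log), saves)
    check(log.isEmpty()) // the flow is cold: nothing runs until it is collected
    saved.collect { log += "collected $it" }
    log
}

fun main() {
    println(demo())
}
```

Under these assumptions the log reads begin, save a, collected a, save b, collected b, commit: the commit happens only after the collector has seen the last item, which is the ordering described above.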

Actual Behaviour

The call fails with the following stack trace:

14:37:54.217 [reactor-tcp-epoll-2] WARN  i.m.d.r.o.DefaultR2dbcRepositoryOperations - Rolling back transaction: RecordTransactionalService.saveAllUsingCoroutines on error: Expected an existing transaction, but none was found in the Reactive context. for dataSource default
io.micronaut.transaction.exceptions.NoTransactionException: Expected an existing transaction, but none was found in the Reactive context.
    at io.micronaut.data.r2dbc.operations.DefaultR2dbcRepositoryOperations.lambda$withTransaction$15(DefaultR2dbcRepositoryOperations.java:410)
    at reactor.core.publisher.FluxDeferContextual.subscribe(FluxDeferContextual.java:49)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8660)
    at kotlinx.coroutines.reactive.PublisherAsFlow.collectImpl(ReactiveFlow.kt:94)
    at kotlinx.coroutines.reactive.PublisherAsFlow.collect(ReactiveFlow.kt:79)
    at kotlinx.coroutines.reactive.FlowSubscription.consumeFlow(ReactiveFlow.kt:275)
    at kotlinx.coroutines.reactive.FlowSubscription.flowProcessing(ReactiveFlow.kt:209)
    at kotlinx.coroutines.reactive.FlowSubscription.access$flowProcessing(ReactiveFlow.kt:187)
    at kotlinx.coroutines.reactive.FlowSubscription$createInitialContinuation$1$1.invoke(ReactiveFlow.kt:204)
    at kotlinx.coroutines.reactive.FlowSubscription$createInitialContinuation$1$1.invoke(ReactiveFlow.kt:204)
    at kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt$createCoroutineUnintercepted$$inlined$createCoroutineFromSuspendFunction$IntrinsicsKt__IntrinsicsJvmKt$2.invokeSuspend(IntrinsicsJvm.kt:205)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.internal.DispatchedContinuationKt.resumeCancellableWith(DispatchedContinuation.kt:367)
    at kotlinx.coroutines.internal.DispatchedContinuationKt.resumeCancellableWith$default(DispatchedContinuation.kt:278)
    at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:18)
    at kotlinx.coroutines.reactive.FlowSubscription$createInitialContinuation$$inlined$Continuation$1.resumeWith(Continuation.kt:162)
    at kotlinx.coroutines.reactive.FlowSubscription.request(ReactiveFlow.kt:267)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
    at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319)
    at reactor.core.publisher.Operators$DeferredSubscription.set(Operators.java:1717)
    at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onSubscribe(FluxUsingWhen.java:409)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)
    at kotlinx.coroutines.reactive.FlowAsPublisher.subscribe(ReactiveFlow.kt:182)
    at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:67)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8660)
    at reactor.core.publisher.FluxUsingWhen$ResourceSubscriber.onNext(FluxUsingWhen.java:195)
    at reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2034)
    at reactor.core.publisher.MonoHasElements$HasElementsSubscriber.onComplete(MonoHasElements.java:93)
    at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
    at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:104)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:222)
    at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:104)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
    at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:460)
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:805)
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:753)
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:247)
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:213)
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:204)
    at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:671)
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:937)
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:813)
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:719)
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:128)
    at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:292)
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:401)
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:411)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:113)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:336)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)

Environment Information

Example Application

Here you can find some tests that reproduce the error.

dstepanov commented 1 year ago

I have fixed a few cases, but we cannot propagate the context when a Kotlin Flow is used: https://github.com/micronaut-projects/micronaut-data/pull/1921. Maybe it will be possible in Micronaut 4; I'm planning a new propagation API. Thanks for the report and the example!