spring-projects / spring-data-r2dbc

Provide support to increase developer productivity in Java when using Reactive Relational Database Connectivity. Uses familiar Spring concepts such as a DatabaseClient for core API usage and lightweight repository style data access.
Apache License 2.0
708 stars 132 forks source link

`findAllByxxx` incorrectly returns a single object instead of an Iterable. #856

Open EmptyDreams opened 5 months ago

EmptyDreams commented 5 months ago

I declare interfaces like the following:

interface PostsRepository : CoroutineCrudRepository<MysqlPostsMeta, Int> {

    suspend fun findAllByPidIn(pidArray: List<Int>): Iterable<MysqlPostsMeta>

}

When I call findAllByPidIn(list), if only one piece of data ends up being matched, then R2DBC returns the MysqlPostsMeta object directly instead of the Iterable object.

When a function should return more than one result, an error is reported directly:

java.lang.IndexOutOfBoundsException: Source emitted more than one item
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:134) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onNext(FluxConcatMapNoPrefetch.java:197) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:348) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:547) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:988) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:194) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxHandleFuseable$HandleFuseableConditionalSubscriber.onNext(FluxHandleFuseable.java:505) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.6.3.jar:3.6.3]
    at io.asyncer.r2dbc.mysql.internal.util.DiscardOnCancelSubscriber.onNext(DiscardOnCancelSubscriber.java:66) ~[r2dbc-mysql-1.1.2.jar:1.1.2]
    at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drainRegular(FluxWindowPredicate.java:670) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drain(FluxWindowPredicate.java:748) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxWindowPredicate$WindowFlux.onNext(FluxWindowPredicate.java:790) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.onNext(FluxWindowPredicate.java:268) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:251) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.6.3.jar:3.6.3]
    at io.asyncer.r2dbc.mysql.internal.util.DiscardOnCancelSubscriber.onNext(DiscardOnCancelSubscriber.java:66) ~[r2dbc-mysql-1.1.2.jar:1.1.2]
    at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.onNext(FluxHandle.java:343) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.SinkManyEmitterProcessor.drain(SinkManyEmitterProcessor.java:476) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.SinkManyEmitterProcessor.tryEmitNext(SinkManyEmitterProcessor.java:273) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27) ~[reactor-core-3.6.3.jar:3.6.3]
    at io.asyncer.r2dbc.mysql.client.ReactorNettyClient$ResponseSink.next(ReactorNettyClient.java:389) ~[r2dbc-mysql-1.1.2.jar:1.1.2]
    at io.asyncer.r2dbc.mysql.client.ReactorNettyClient.lambda$new$0(ReactorNettyClient.java:119) ~[r2dbc-mysql-1.1.2.jar:1.1.2]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294) ~[reactor-netty-core-1.1.16.jar:1.1.16]
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403) ~[reactor-netty-core-1.1.16.jar:1.1.16]
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426) ~[reactor-netty-core-1.1.16.jar:1.1.16]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114) ~[reactor-netty-core-1.1.16.jar:1.1.16]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:454) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1338) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1387) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
    at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

I tried replacing the return value with a Stream, Streamable, etc., but it still does.

I just upgraded to 3.2.4 and the problem persists.

EmptyDreams commented 5 months ago

I tried again and I found that the code works fine if I return Flow<T>.

I see that there are some clarifications in the docs (https://docs.spring.io/spring-data/r2dbc/docs/3.1.1/reference/html/#repositories.query-async), but for kotlin, I've already modified the function with suspend, so shouldn't I be allowed to return a synchronised flow object like Iterable?