micronaut-projects / micronaut-data

Ahead of Time Data Repositories
Apache License 2.0
462 stars 195 forks source link

Micronaut data fails when trying to convert results to Flow #2849

Open zsiegel opened 5 months ago

zsiegel commented 5 months ago

Expected Behavior

When using a return type of Flow and when the given query only returns one result I expect the conversion to work to return a Flow that emits a single item.

Actual Behaviour

When the query returns a single item. Micronaut fails to convert that entity to a Flow.

Steps To Reproduce

  1. Create a CoroutineCrudRepository<MyEntity, Long> repository and entity MyEntity.
  2. Run the .findAll() query after inserting a single row.
  3. The query will fail with an error message below.
io.micronaut.core.convert.exceptions.ConversionErrorException: Cannot convert type [class com.example.MyEntity] to target type: interface kotlinx.coroutines.flow.Flow. Considering defining a TypeConverter bean to handle this case.
    at app//io.micronaut.core.convert.ConversionService.lambda$newConversionError$2(ConversionService.java:182)
    at java.base@17.0.1/java.util.Optional.orElseGet(Optional.java:364)
    at app//io.micronaut.core.convert.ConversionService.newConversionError(ConversionService.java:182)
    at app//io.micronaut.core.convert.ConversionService.lambda$convertRequired$0(ConversionService.java:177)
    at java.base@17.0.1/java.util.Optional.orElseThrow(Optional.java:403)
    at app//io.micronaut.core.convert.ConversionService.convertRequired(ConversionService.java:177)
    at app//io.micronaut.core.convert.ConversionService.convertRequired(ConversionService.java:159)
    at app//io.micronaut.data.runtime.intercept.AbstractQueryInterceptor.convertOne(AbstractQueryInterceptor.java:213)
    at app//io.micronaut.data.runtime.intercept.AbstractQueryInterceptor.convertOne(AbstractQueryInterceptor.java:202)
    at app//io.micronaut.data.runtime.intercept.async.AbstractConvertCompletionStageInterceptor.lambda$intercept$1(AbstractConvertCompletionStageInterceptor.java:61)
    at java.base@17.0.1/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
    at java.base@17.0.1/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base@17.0.1/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
    at app//reactor.core.publisher.MonoToCompletableFuture.onNext(MonoToCompletableFuture.java:64)
    at app//reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    at app//reactor.core.publisher.MonoUsingWhen$MonoUsingWhenSubscriber.deferredComplete(MonoUsingWhen.java:269)
    at app//reactor.core.publisher.FluxUsingWhen$CommitInner.onComplete(FluxUsingWhen.java:528)
    at app//reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:250)
    at app//reactor.core.publisher.MonoFlatMap$FlatMapInner.onComplete(MonoFlatMap.java:324)
    at app//reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:250)
    at app//reactor.core.publisher.MonoFlatMap$FlatMapInner.onComplete(MonoFlatMap.java:324)
    at app//reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:155)
    at app//reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
    at app//reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)
    at app//reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2367)
    at app//reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2241)
    at app//reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
    at app//reactor.core.publisher.Mono.subscribe(Mono.java:4512)
    at app//reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82)
    at app//reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231)
    at app//reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
    at app//reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    at app//reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231)
    at app//reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:210)
    at app//reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:210)
    at app//reactor.pool.SimpleDequePool.maybeRecycleAndDrain(SimpleDequePool.java:537)
    at app//reactor.pool.SimpleDequePool$QueuePoolRecyclerInner.onComplete(SimpleDequePool.java:767)
    at app//reactor.core.publisher.Operators.complete(Operators.java:137)
    at app//reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
    at app//reactor.core.publisher.Mono.subscribe(Mono.java:4512)
    at app//reactor.pool.SimpleDequePool$QueuePoolRecyclerMono.subscribe(SimpleDequePool.java:879)
    at app//reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
    at app//reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:241)
    at app//reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204)
    at app//reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    at app//reactor.core.publisher.Operators.complete(Operators.java:137)
    at app//reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
    at app//reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
    at app//reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:264)
    at app//reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
    at app//reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
    at app//reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:241)
    at app//reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204)
    at app//reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
    at app//reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onComplete(FluxHandleFuseable.java:239)
    at app//reactor.core.publisher.MonoSupplier$MonoSupplierSubscription.request(MonoSupplier.java:148)
    at app//reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:260)
    at app//reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onSubscribe(MonoIgnoreElements.java:72)
    at app//reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onSubscribe(FluxHandleFuseable.java:164)
    at app//reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:48)
    at app//reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
    at app//reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:264)
    at app//reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
    at app//reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
    at app//reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
    at app//reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
    at app//reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)
    at app//reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
    at app//reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)
    at app//reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2367)
    at app//reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2241)
    at app//reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
    at app//reactor.core.publisher.Mono.subscribe(Mono.java:4512)
    at app//reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82)
    at app//reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231)
    at app//reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
    at app//reactor.core.publisher.Operators.complete(Operators.java:137)
    at app//reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
    at app//reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
    at app//reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)
    at app//reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
    at app//reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)
    at app//reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2367)
    at app//reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2241)
    at app//reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
    at app//reactor.core.publisher.Mono.subscribe(Mono.java:4512)
    at app//reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82)
    at app//reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231)
    at app//reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
    at app//reactor.core.publisher.Operators.complete(Operators.java:137)
    at app//reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
    at app//reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
    at app//reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onComplete(FluxUsingWhen.java:385)
    at app//reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
    at app//reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:850)
    at app//reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:612)
    at app//reactor.core.publisher.FluxFlatMap$FlatMapMain.innerComplete(FluxFlatMap.java:898)
    at app//reactor.core.publisher.FluxFlatMap$FlatMapInner.onComplete(FluxFlatMap.java:1001)
    at app//reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:223)
    at app//reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:260)
    at app//reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onComplete(FluxHandleFuseable.java:239)
    at app//reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onComplete(FluxFilterFuseable.java:391)
    at app//reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
    at app//reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940)
    at app//reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940)
    at app//io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:104)
    at app//reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128)
    at app//reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:223)
    at app//reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:465)
    at app//reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:871)
    at app//reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:819)
    at app//reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:249)
    at app//reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:215)
    at app//reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:206)
    at app//io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:668)
    at app//io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:934)
    at app//io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:810)
    at app//io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:716)
    at app//reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129)
    at app//reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
    at app//reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
    at app//reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
    at app//reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294)
    at app//reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403)
    at app//reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426)
    at app//reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114)
    at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at app//io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at app//io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
    at app//io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
    at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at app//io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at app//io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    at app//io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at app//io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at app//io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at app//io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
    at app//io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
    at app//io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
    at app//io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at app//io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at app//io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at app//io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base@17.0.1/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalArgumentException: Cannot convert type [class com.example.MyEntity] to target type: interface kotlinx.coroutines.flow.Flow. Considering defining a TypeConverter bean to handle this case.
    ... 147 more

Environment Information

MacOS 14.4

openjdk 17.0.1 2021-10-19 LTS OpenJDK Runtime Environment Zulu17.30+15-CA (build 17.0.1+12-LTS) OpenJDK 64-Bit Server VM Zulu17.30+15-CA (build 17.0.1+12-LTS, mixed mode, sharing)

Example Application

No response

Version

4.3.7

zsiegel commented 5 months ago

Note that I am currently working around this right now by using a TypeConverter. But I have a number of entities that may return a single item from time to time and this is a bit cumbersome.

I was looking around to see if I could submit a PR myself but I am a little lost figuring out where the root of the issue might be.

@Factory
class TypeConverters {
    @Singleton
    fun myEntityToFlow(): TypeConverter<MyEntity, Flow<MyEntity>> {
        return TypeConverter<MyEntity, Flow<MyEntity>> { `object`, targetType, context ->
            Optional.of(flowOf(`object`))
        }
    }
}
zsiegel commented 5 months ago

I am also seeing some interesting and unexpected behavior when the return is empty. I would expect to get back an empty Flow but instead I get a null object even though the return type is Flow.

dstepanov commented 5 months ago

Do you have micronaut-kotlin dependency included?

zsiegel commented 5 months ago

I have the following dependencies related to kotlin specified.

implementation("io.micronaut.kotlin:micronaut-kotlin-extension-functions")
implementation("io.micronaut.kotlin:micronaut-kotlin-runtime")
dstepanov commented 5 months ago

Try also to add: org.jetbrains.kotlinx:kotlinx-coroutines-reactive or maybe org.jetbrains.kotlinx:kotlinx-coroutines-reactor

zsiegel commented 5 months ago

Yes I have the first one. I tried adding org.jetbrains.kotlinx:kotlinx-coroutines-reactor and have the same issue.

zsiegel commented 5 months ago

Testing this out a bit more I think I was able to determine that this only happens with interface methods that use the @Query annotation. The findAll() and other built-in methods seem to return reactor types, but the custom @Query annotation does not?

I have solved this problem with the following factory that handles the conversion for any single object and any list of objects.

@Factory
class TypeConverters {
    @Singleton
    fun listToFlow(): TypeConverter<List<Any>, Flow<Any>> {
        return TypeConverter<List<Any>, Flow<Any>> { `object`, targetType, context ->
            Optional.of(flowOf(`object`))
        }
    }

    @Singleton
    fun anyToFlow(): TypeConverter<Any, Flow<Any>> {
        return TypeConverter<Any, Flow<Any>> { `object`, targetType, context ->
            Optional.of(flowOf(`object`))
        }
    }
}

I am not sure if this makes sense to contribute or if I have properly diagnosed things. I am not sure what the different is between the built-in default methods and using @Query

zsiegel commented 4 months ago

My implementation above was in correct. I am now trying.

@Factory
class TypeConverters {
    @Singleton
    fun listToFlow(): TypeConverter<List<Any>, Flow<Any>> {
        return TypeConverter<List<Any>, Flow<Any>> { `object`, targetType, context ->
            Optional.of(`object`.asFlow())
        }
    }

    @Singleton
    fun anyToFlow(): TypeConverter<Any, Flow<Any>> {
        return TypeConverter<Any, Flow<Any>> { `object`, targetType, context ->
            Optional.of(flowOf(`object`))
        }
    }
}

In addition I am not able to get more than 1 result from the flow. This might be user error but I am struggling a bit to see what might be going wrong here.