r2dbc / r2dbc-mssql

R2DBC Driver for Microsoft SQL Server using TDS (Tabular Data Stream) Protocol
Apache License 2.0

Parameterized Statements throws exception while using Kotlin Flow #256

Closed xnart closed 1 year ago

xnart commented 2 years ago

Bug Report

Versions

Current Behavior

While collecting the following flow, the driver randomly closes the connection and cancels the flow. This happens when I fetch a large number of rows (34k in my case). Occasionally, all rows are collected without error.

fun findAllByIdBetween(fromId: Long, toId: Long): Flow<LegacyCustomer>
Stack trace ```java org.springframework.dao.DataAccessResourceFailureException: executeMany; SQL [SELECT Customers.Id, Customers.MobilePhone, Customers.FirstName, Customers.LastName, Customers.DMSId, Customers.BirthDate, Customers.Email, Customers.Gender, Customers.OccupationId, Customers.SegmentId, Customers.LFCustomerId, Customers.CreateDate, Customers.UpdateDate FROM Customers WHERE Customers.Id BETWEEN @P0_Id AND @P1_Id]; null; nested exception is io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionException at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:226) ~[spring-r2dbc-5.3.19.jar:5.3.19] at org.springframework.r2dbc.core.DefaultDatabaseClient.lambda$inConnectionMany$8(DefaultDatabaseClient.java:147) ~[spring-r2dbc-5.3.19.jar:5.3.19] at reactor.core.publisher.Flux.lambda$onErrorMap$29(Flux.java:6946) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.Flux.lambda$onErrorResume$30(Flux.java:6999) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredError(FluxUsingWhen.java:398) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxUsingWhen$RollbackInner.onComplete(FluxUsingWhen.java:475) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:145) ~[reactor-core-3.4.17.jar:3.4.17] at 
reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:145) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.Operators.complete(Operators.java:137) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.Mono.subscribe(Mono.java:4400) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:198) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:62) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.Mono.subscribe(Mono.java:4400) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.17.jar:3.4.17] at 
reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.Mono.subscribe(Mono.java:4400) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:132) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:157) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFilter$FilterConditionalSubscriber.onError(FluxFilter.java:291) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onError(FluxMap.java:259) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.Operators.error(Operators.java:198) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.MonoError.subscribe(MonoError.java:53) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.Mono.subscribe(Mono.java:4400) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onError(FluxUsingWhen.java:364) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:842) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:608) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588) 
~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.Operators$DeferredSubscription.request(Operators.java:1680) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:790) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.4.17.jar:3.4.17] at org.springframework.cloud.sleuth.instrument.reactor.SleuthContextOperator.request(ReactorSleuth.java:658) ~[spring-cloud-sleuth-instrumentation-3.1.1.jar:3.1.1] at reactor.core.publisher.StrictSubscriber.request(StrictSubscriber.java:138) ~[reactor-core-3.4.17.jar:3.4.17] at kotlinx.coroutines.reactive.ReactiveSubscriber.makeRequest(ReactiveFlow.kt:153) ~[kotlinx-coroutines-reactive-1.5.2.jar:na] at kotlinx.coroutines.reactive.PublisherAsFlow.collectImpl(ReactiveFlow.kt:103) ~[kotlinx-coroutines-reactive-1.5.2.jar:na] at kotlinx.coroutines.reactive.PublisherAsFlow.access$collectImpl(ReactiveFlow.kt:48) ~[kotlinx-coroutines-reactive-1.5.2.jar:na] at kotlinx.coroutines.reactive.PublisherAsFlow$collectImpl$1.invokeSuspend(ReactiveFlow.kt) ~[kotlinx-coroutines-reactive-1.5.2.jar:na] at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) ~[kotlin-stdlib-1.6.21.jar:1.6.21-release-334(1.6.21)] at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106) 
~[kotlinx-coroutines-core-jvm-1.5.2.jar:na] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na] at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na] Caused by: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionException: null at io.r2dbc.mssql.client.ReactorNettyClient.lambda$handleConnectionError$17(ReactorNettyClient.java:619) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE] Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s): *__checkpoint ⇢ SQL "SELECT Customers.Id, Customers.MobilePhone, Customers.FirstName, Customers.LastName, Customers.DMSId, Customers.BirthDate, Customers.Email, Customers.Gender, Customers.OccupationId, Customers.SegmentId, Customers.LFCustomerId, Customers.CreateDate, Customers.UpdateDate FROM Customers WHERE Customers.Id BETWEEN @P0_Id AND @P1_Id" [DatabaseClient] Original Stack Trace: at io.r2dbc.mssql.client.ReactorNettyClient.lambda$handleConnectionError$17(ReactorNettyClient.java:619) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE] at io.r2dbc.mssql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:629) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE] at io.r2dbc.mssql.client.ReactorNettyClient.handleConnectionError(ReactorNettyClient.java:619) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE] at io.r2dbc.mssql.client.ReactorNettyClient.resumeError(ReactorNettyClient.java:317) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE] at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:321) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686) 
~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.request(FluxFlattenIterable.java:274) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:503) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.EmitterProcessor$EmitterInner.drainParent(EmitterProcessor.java:640) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxPublish$PubSubInner.request(FluxPublish.java:585) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17] at org.springframework.cloud.sleuth.instrument.reactor.SleuthContextOperator.request(ReactorSleuth.java:658) ~[spring-cloud-sleuth-instrumentation-3.1.1.jar:3.1.1] at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.request(MonoFlatMapMany.java:112) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxHandle$HandleSubscriber.request(FluxHandle.java:269) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:503) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.EmitterProcessor$EmitterInner.drainParent(EmitterProcessor.java:640) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxPublish$PubSubInner.request(FluxPublish.java:585) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.request(FluxHandle.java:475) ~[reactor-core-3.4.17.jar:3.4.17] at 
reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:186) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17] at io.r2dbc.mssql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.request(FluxDiscardOnCancel.java:110) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE] at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drainRegular(FluxWindowPredicate.java:682) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drain(FluxWindowPredicate.java:746) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxWindowPredicate$WindowFlux.request(FluxWindowPredicate.java:833) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:257) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlatMap$FlatMapInner.request(FluxFlatMap.java:1008) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:729) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.Operators$DeferredSubscription.request(Operators.java:1680) ~[reactor-core-3.4.17.jar:3.4.17] 
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:790) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.4.17.jar:3.4.17] at org.springframework.cloud.sleuth.instrument.reactor.SleuthContextOperator.request(ReactorSleuth.java:658) ~[spring-cloud-sleuth-instrumentation-3.1.1.jar:3.1.1] at reactor.core.publisher.StrictSubscriber.request(StrictSubscriber.java:138) ~[reactor-core-3.4.17.jar:3.4.17] at kotlinx.coroutines.reactive.ReactiveSubscriber.makeRequest(ReactiveFlow.kt:153) ~[kotlinx-coroutines-reactive-1.5.2.jar:na] at kotlinx.coroutines.reactive.PublisherAsFlow.collectImpl(ReactiveFlow.kt:103) ~[kotlinx-coroutines-reactive-1.5.2.jar:na] at kotlinx.coroutines.reactive.PublisherAsFlow.access$collectImpl(ReactiveFlow.kt:48) ~[kotlinx-coroutines-reactive-1.5.2.jar:na] at kotlinx.coroutines.reactive.PublisherAsFlow$collectImpl$1.invokeSuspend(ReactiveFlow.kt) ~[kotlinx-coroutines-reactive-1.5.2.jar:na] at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) ~[kotlin-stdlib-1.6.21.jar:1.6.21-release-334(1.6.21)] at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106) ~[kotlinx-coroutines-core-jvm-1.5.2.jar:na] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na] at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na] Caused by: java.lang.RuntimeException: java.lang.IllegalArgumentException: Invalid header 
type: 0x0 at io.r2dbc.mssql.client.StreamDecoder$ListSink.error(StreamDecoder.java:350) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE] at io.r2dbc.mssql.client.StreamDecoder.withState(StreamDecoder.java:135) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE] at io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:88) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE] at io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:64) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE] at io.r2dbc.mssql.client.ReactorNettyClient.lambda$new$6(ReactorNettyClient.java:255) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE] at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:351) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.request(FluxFlattenIterable.java:274) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:503) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.EmitterProcessor$EmitterInner.drainParent(EmitterProcessor.java:640) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxPublish$PubSubInner.request(FluxPublish.java:585) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17] at org.springframework.cloud.sleuth.instrument.reactor.SleuthContextOperator.request(ReactorSleuth.java:658) ~[spring-cloud-sleuth-instrumentation-3.1.1.jar:3.1.1] at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.request(MonoFlatMapMany.java:112) ~[reactor-core-3.4.17.jar:3.4.17] at 
reactor.core.publisher.FluxHandle$HandleSubscriber.request(FluxHandle.java:269) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:503) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.EmitterProcessor$EmitterInner.drainParent(EmitterProcessor.java:640) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxPublish$PubSubInner.request(FluxPublish.java:585) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.request(FluxHandle.java:475) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:186) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.4.17.jar:3.4.17] at io.r2dbc.mssql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.request(FluxDiscardOnCancel.java:110) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE] at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drainRegular(FluxWindowPredicate.java:682) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drain(FluxWindowPredicate.java:746) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxWindowPredicate$WindowFlux.request(FluxWindowPredicate.java:833) ~[reactor-core-3.4.17.jar:3.4.17] at 
reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:257) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlatMap$FlatMapInner.request(FluxFlatMap.java:1008) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:729) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.Operators$DeferredSubscription.request(Operators.java:1680) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:790) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347) ~[reactor-core-3.4.17.jar:3.4.17] at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.4.17.jar:3.4.17] at org.springframework.cloud.sleuth.instrument.reactor.SleuthContextOperator.request(ReactorSleuth.java:658) ~[spring-cloud-sleuth-instrumentation-3.1.1.jar:3.1.1] at reactor.core.publisher.StrictSubscriber.request(StrictSubscriber.java:138) ~[reactor-core-3.4.17.jar:3.4.17] at kotlinx.coroutines.reactive.ReactiveSubscriber.makeRequest(ReactiveFlow.kt:153) ~[kotlinx-coroutines-reactive-1.5.2.jar:na] at 
kotlinx.coroutines.reactive.PublisherAsFlow.collectImpl(ReactiveFlow.kt:103) ~[kotlinx-coroutines-reactive-1.5.2.jar:na] at kotlinx.coroutines.reactive.PublisherAsFlow.access$collectImpl(ReactiveFlow.kt:48) ~[kotlinx-coroutines-reactive-1.5.2.jar:na] at kotlinx.coroutines.reactive.PublisherAsFlow$collectImpl$1.invokeSuspend(ReactiveFlow.kt) ~[kotlinx-coroutines-reactive-1.5.2.jar:na] at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) ~[kotlin-stdlib-1.6.21.jar:1.6.21-release-334(1.6.21)] at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106) ~[kotlinx-coroutines-core-jvm-1.5.2.jar:na] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na] at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na] Caused by: java.lang.IllegalArgumentException: Invalid header type: 0x0 at io.r2dbc.mssql.message.header.Type.valueOf(Type.java:68) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE] at io.r2dbc.mssql.message.header.Header.decode(Header.java:215) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE] at io.r2dbc.mssql.client.StreamDecoder$DecoderState.readChunk(StreamDecoder.java:289) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE] at io.r2dbc.mssql.client.StreamDecoder.withState(StreamDecoder.java:112) ~[r2dbc-mssql-0.8.8.RELEASE.jar:0.8.8.RELEASE] ... 53 common frames omitted ```

Table schema

```kotlin
@Table("Customers")
data class LegacyCustomer(
    @Id @Column("Id") val id: Long? = null,
    @Column("MobilePhone") val mobilePhone: String?,
    @Column("FirstName") var firstName: String? = null,
    @Column("LastName") var lastName: String? = null,
    @Column("DMSId") var dmsId: String? = null,
    @Column("BirthDate") var birthDate: LocalDate? = null,
    @Column("Email") var email: String? = null,
    @Column("Gender") var gender: Int? = null,
    @Column("OccupationId") var occupationId: Long? = null,
    @Column("SegmentId") var segmentId: Long? = null,
    @Column("LFCustomerId") var customerId: String? = null,
    @CreatedDate @Column("CreateDate") var createDate: Instant? = null,
    @LastModifiedDate @Column("UpdateDate") var updateDate: Instant? = null
)
```

Steps to reproduce

```kotlin
interface LegacyCustomerRepository : CoroutineCrudRepository<LegacyCustomer, Long> {
    fun findAllByIdBetween(fromId: Long, toId: Long): Flow<LegacyCustomer>
}

@Service
class LegacyCustomerService(private val legacyCustomerRepository: LegacyCustomerRepository) {

    suspend fun createCustomerMigration(customerMigrationRequest: CustomerMigrationRequest) {
        with(customerMigrationRequest) {
            logger.info("Legacy customer migration started from id: $startFromId to id: $endToId.")
            legacyCustomerRepository.findAllByIdBetween(startFromId, endToId)
                // `it` is null here only when the upstream flow completed normally.
                .onCompletion {
                    if (it == null) logger.info("Legacy customer migration finished. from id: $startFromId to id: $endToId.", it)
                }
                // Handles upstream failures, including the connection error shown above.
                .catch {
                    logger.error("Legacy customer migration finished with error. from id: $startFromId to id: $endToId.", it)
                }
                .onEach {
                    try {
                        logger.info("Migrating customer with id: ${it.id}")
                        // ... another suspend method call
                    } catch (exception: Throwable) {
                        logger.error("Migrating failed for customer with id: ${it.id}", exception)
                    }
                }
                // Collection runs in a separate scope on the IO dispatcher.
                .launchIn(CoroutineScope(Dispatchers.IO))
        }
    }
}
```

Expected behavior/code

The Flow should be collected without errors, as it is when using a Reactor Flux. I do not get the error if I use Flux and subscribe to it:

fun findAllByIdBetween(fromId: Long, toId: Long): Flux<LegacyCustomer>

legacyCustomerRepo.findAllByIdBetween(startFromId, endToId)
    .doAfterTerminate {
        logger.info("Legacy customer migration finished. from id: $startFromId to id: $endToId.")
    }
    .doOnError {
        logger.error("Legacy customer migration finished with error. from id: $startFromId to id: $endToId.", it)
    }
    .subscribe {
        try {
            logger.info("Migrating customer with id: ${it.id}")
            runBlocking {
                // ... another suspend method call
            }
        } catch (exception: Throwable) {
            logger.error("Migrating failed for customer with id: ${it.id}", exception)
        }
    }

Possible Solution

When I used a cursored exchange with a fetch size of 128 instead of a direct exchange, it worked as expected. However, I then got the following prepared statement cache error on the second call: Could not find prepared statement with handle 1073741825

Additional context

mp911de commented 2 years ago

This looks like a protocol offset error. There are a lot of moving parts involved. Can you provide a minimal reproducer using R2DBC MSSQL code only (without the use of Spring)? Otherwise, it will be next to impossible to diagnose the issue.
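To make "protocol offset error" concrete, here is a minimal sketch of how a reader that loses track of its position in a packet stream ends up with `Invalid header type: 0x0`. It is a toy framer written for illustration only, not the driver's `StreamDecoder`. It assumes the standard 8-byte TDS packet header (type, status, big-endian length, SPID, packet id, window); the names `decodeHeader`, `packet`, `readAll`, and the `skew` parameter are hypothetical.

```kotlin
// Illustrative only: a toy TDS-style framer, not the r2dbc-mssql implementation.

/** Packet types this sketch accepts: SQL batch, tabular result, pre-login. */
val KNOWN_TYPES = setOf(0x01, 0x04, 0x12)

const val HEADER_LENGTH = 8

data class PacketHeader(val type: Int, val status: Int, val length: Int)

/** Decodes the 8-byte header at [offset]; the length is big-endian and includes the header. */
fun decodeHeader(stream: ByteArray, offset: Int): PacketHeader {
    require(offset + HEADER_LENGTH <= stream.size) { "Truncated header at offset $offset" }
    val type = stream[offset].toInt() and 0xFF
    require(type in KNOWN_TYPES) { "Invalid header type: 0x${type.toString(16)}" }
    val status = stream[offset + 1].toInt() and 0xFF
    val length = ((stream[offset + 2].toInt() and 0xFF) shl 8) or
        (stream[offset + 3].toInt() and 0xFF)
    return PacketHeader(type, status, length)
}

/** Builds one packet: an 8-byte header followed by [payload]. */
fun packet(type: Int, status: Int, payload: ByteArray): ByteArray {
    val length = HEADER_LENGTH + payload.size
    val header = byteArrayOf(
        type.toByte(), status.toByte(),
        (length shr 8).toByte(), (length and 0xFF).toByte(),
        0, 0, // SPID
        1,    // packet id
        0     // window
    )
    return header + payload
}

/**
 * Walks the stream packet by packet. [skew] simulates a bookkeeping bug (hypothetical):
 * after each packet the reader advances by `length + skew` instead of `length`.
 */
fun readAll(stream: ByteArray, skew: Int = 0): List<PacketHeader> {
    val headers = mutableListOf<PacketHeader>()
    var offset = 0
    while (offset < stream.size) {
        val header = decodeHeader(stream, offset)
        headers += header
        offset += header.length + skew
    }
    return headers
}
```

With two 12-byte packets back to back, `readAll(stream)` decodes both headers, while `readAll(stream, skew = 2)` lands on the high byte of the second packet's length field (a zero) and fails the type check. Whether the actual failure in this issue comes from a miscounted offset, a concurrent read, or something else is exactly what a Spring-free reproducer would help establish.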

mp911de commented 1 year ago

Closing due to lack of requested feedback. If you would like us to look at this issue, please provide the requested information and we will re-open the issue.

SimenRokaas commented 1 year ago

Hi @mp911de, this error (Could not find prepared statement with handle 1073741825) now pops up again in version 1.0.1 when using prepared statements (bind method). The first SQL statement after startup runs fine, but subsequent statements fail.

The error is not present in version 1.0.0, and the only code change I can see which has anything to do with prepared statements seems to be the removal of the no-param constructor in ConnectionOptions which was part of the commit which resolved issue 267.

We use bind a lot, so if it is possible to resolve this in the 1.x branch it would be much appreciated as we need to use Spring Boot 3 which is wired to the 1.x branch.