oracle / oracle-r2dbc

R2DBC Driver for Oracle Database

"ORA-01002: fetch out of sequence" error #150

Closed: akaretnikov90 closed this issue 1 month ago

akaretnikov90 commented 4 months ago

I have a @Scheduler process that selects rows from an Oracle database with the FOR UPDATE SKIP LOCKED option every 30 seconds. When the table is empty, I periodically get the exception "ORA-01002: fetch out of sequence".

Repository:

public interface PfrStatusRepository extends R2dbcRepository<PfrStatus, Long> {
    @Query("""
            SELECT ID,
                   OBJECT_ID,
                   RUN_ID,
                   APP_ID,
                   CLIENT_ID,
                   PFR_REQUEST_ID,
                   ERROR_MSG,
                   MAIN_STATUS,
                   EXTERNAL_STATUS,
                   EXTERNAL_STATUS_NAME,
                   IDBANK_DOC_ID,
                   t.FILE_XML.getclobval(),
                   FILE_PDF,
                   CREATE_DT,
                   UPDATE_DT,
                   OPERATION_TYPE,
                   PRODUCT_NAME FROM pfr_status t
            WHERE CASE WHEN main_status = 1 THEN main_status END = 1 FOR UPDATE SKIP LOCKED
            """)
    Flux<PfrStatus> getInProgressPfrRecords();
}

Dao method:

public Flux<PfrStatus> getInProgressPfrRecords() {
        return pfrStatusRepository.getInProgressPfrRecords()
                .switchIfEmpty(Flux.defer(() -> {
                    // English: "The PFR_STATUS table contains no unprocessed PFR applications"
                    log.info("В таблице PFR_STATUS отсутствуют необработанные заявления ПФР");
                    return Flux.empty();
                }))
                .onErrorResume(
                        thw -> Mono.error(new DbException(
                                // English: "Error while fetching unprocessed PFR requests from the PFR_STATUS table: "
                                "Ошибка при получении необработанных запросов ПФР из таблицы PFR_STATUS: " +
                                        thw)));
    }

Stacktrace:

17.07.2024 17:11:37.233 ERROR [ForkJoinPool.commonPool-worker-5] SchedulerService.java:171   - GET_OBJECT_STATUS error->
ru.camunda.connector.pfradapter.exceptions.DbException: Ошибка при получении необработанных запросов ПФР из таблицы PFR_STATUS: org.springframework.r2dbc.UncategorizedR2dbcException: executeMany; SQL [SELECT ID,
       OBJECT_ID,
       RUN_ID,
       APP_ID,
       CLIENT_ID,
       PFR_REQUEST_ID,
       ERROR_MSG,
       MAIN_STATUS,
       EXTERNAL_STATUS,
       EXTERNAL_STATUS_NAME,
       IDBANK_DOC_ID,
       t.FILE_XML.getclobval(),
       FILE_PDF,
       CREATE_DT,
       UPDATE_DT,
       OPERATION_TYPE,
       PRODUCT_NAME FROM pfr_status t
WHERE CASE WHEN main_status = 1 THEN main_status END = 1 FOR UPDATE SKIP LOCKED
];  ORA-01002: fetch out of sequence

    at ru.camunda.connector.pfradapter.data.dao.PfrStatusDao.lambda$getInProgressPfrRecords$1(PfrStatusDao.java:31) ~[classes/:?]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2236) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredError(FluxUsingWhen.java:403) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxUsingWhen$RollbackInner.onComplete(FluxUsingWhen.java:480) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.Operators.complete(Operators.java:137) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4568) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onError(FluxUsingWhen.java:368) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onError(MonoFlatMapMany.java:256) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.maybeOnError(FluxConcatMapNoPrefetch.java:327) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onError(FluxConcatMapNoPrefetch.java:221) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.Operators.error(Operators.java:198) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoError.subscribe(MonoError.java:53) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4568) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredError(FluxUsingWhen.java:403) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxUsingWhen$RollbackInner.onComplete(FluxUsingWhen.java:480) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:210) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:210) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.pool.SimpleDequePool.maybeRecycleAndDrain(SimpleDequePool.java:540) ~[reactor-pool-1.0.5.jar:1.0.5]
    at reactor.pool.SimpleDequePool$QueuePoolRecyclerInner.onComplete(SimpleDequePool.java:770) ~[reactor-pool-1.0.5.jar:1.0.5]
    at reactor.core.publisher.Operators.complete(Operators.java:137) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4568) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.pool.SimpleDequePool$QueuePoolRecyclerMono.subscribe(SimpleDequePool.java:882) ~[reactor-pool-1.0.5.jar:1.0.5]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:241) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoSupplier$MonoSupplierSubscription.request(MonoSupplier.java:148) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onSubscribe(MonoIgnoreThen.java:135) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:171) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:48) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4568) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:241) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:223) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1866) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.signalCached(MonoCacheTime.java:337) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onNext(MonoCacheTime.java:354) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onSubscribe(MonoCacheTime.java:293) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:143) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4552) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4568) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:157) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxFilter$FilterConditionalSubscriber.onError(FluxFilter.java:291) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onError(FluxMap.java:265) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.Operators.error(Operators.java:198) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoError.subscribe(MonoError.java:53) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4568) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onError(FluxUsingWhen.java:368) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:846) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:612) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:592) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.innerError(FluxFlatMap.java:867) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxFlatMap$FlatMapInner.onError(FluxFlatMap.java:994) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.Operators.error(Operators.java:198) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxError.subscribe(FluxError.java:43) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.Flux.subscribe(Flux.java:8840) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:430) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:251) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:348) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2367) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onSubscribe(FluxOnErrorResume.java:74) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4568) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onError(FluxConcatArray.java:186) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:205) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93) ~[reactor-core-3.6.6.jar:3.6.6]
    at oracle.r2dbc.impl.AsyncLock$UsingConnectionSubscriber.onError(AsyncLock.java:517) ~[oracle-r2dbc-1.2.0.jar:1.2.0]
    at reactor.core.publisher.StrictSubscriber.onError(StrictSubscriber.java:106) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.Operators.error(Operators.java:198) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.MonoError.subscribe(MonoError.java:53) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4568) ~[reactor-core-3.6.6.jar:3.6.6]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.6.6.jar:3.6.6]
    at org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber.onError(FlowAdapters.java:216) ~[reactive-streams-1.0.4.jar:?]
    at oracle.jdbc.internal.CompletionStageUtil$BatchItemPublisher.subscribeToFailedBatch(CompletionStageUtil.java:636) ~[ojdbc11-21.11.0.0.jar:21.11.0.0.0]
    at oracle.jdbc.internal.CompletionStageUtil$BatchItemPublisher.lambda$subscribe$0(CompletionStageUtil.java:591) ~[ojdbc11-21.11.0.0.jar:21.11.0.0.0]
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) ~[?:?]
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194) ~[?:?]
    at oracle.jdbc.driver.OraclePreparedStatement.lambda$executeAsyncOracle$4(OraclePreparedStatement.java:4171) ~[ojdbc11-21.11.0.0.jar:21.11.0.0.0]
    at oracle.jdbc.driver.OraclePreparedStatement.lambda$executeInternalAsync$0(OraclePreparedStatement.java:3814) ~[ojdbc11-21.11.0.0.jar:21.11.0.0.0]
    at oracle.jdbc.driver.OracleStatement.lambda$doExecuteWithTimeoutAsync$2(OracleStatement.java:1556) ~[ojdbc11-21.11.0.0.jar:21.11.0.0.0]
    at oracle.jdbc.driver.OracleStatement.lambda$executeSQLSelectAsync$3(OracleStatement.java:1727) ~[ojdbc11-21.11.0.0.jar:21.11.0.0.0]
    at oracle.jdbc.driver.OracleStatement.lambda$executeMaybeDescribeAsync$0(OracleStatement.java:1211) ~[ojdbc11-21.11.0.0.jar:21.11.0.0.0]
    at oracle.jdbc.driver.T4CPreparedStatement.lambda$executeForRowsAsync$5(T4CPreparedStatement.java:1311) ~[ojdbc11-21.11.0.0.jar:21.11.0.0.0]
    at oracle.jdbc.driver.T4CPreparedStatement.lambda$doOall8Async$0(T4CPreparedStatement.java:236) ~[ojdbc11-21.11.0.0.jar:21.11.0.0.0]
    at oracle.jdbc.driver.T4C8Oall.lambda$doOALLAsync$0(T4C8Oall.java:583) ~[ojdbc11-21.11.0.0.jar:21.11.0.0.0]
    at oracle.jdbc.driver.T4CTTIfun.lambda$receiveRPCAsync$1(T4CTTIfun.java:474) ~[ojdbc11-21.11.0.0.jar:21.11.0.0.0]
    at oracle.jdbc.driver.RestrictedLock.lambda$runUnrestricted$1(RestrictedLock.java:444) ~[ojdbc11-21.11.0.0.jar:21.11.0.0.0]
    at oracle.jdbc.driver.RestrictedLock.callUnrestricted(RestrictedLock.java:475) ~[ojdbc11-21.11.0.0.jar:21.11.0.0.0]
    at oracle.jdbc.driver.RestrictedLock.runUnrestricted(RestrictedLock.java:443) ~[ojdbc11-21.11.0.0.jar:21.11.0.0.0]
    at oracle.jdbc.driver.PhysicalConnection.lambda$initializeAsyncExecutor$4(PhysicalConnection.java:1284) ~[ojdbc11-21.11.0.0.jar:21.11.0.0.0]
    at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.compute(ForkJoinTask.java:1726) [?:?]
    at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.compute(ForkJoinTask.java:1717) [?:?]
    at java.base/java.util.concurrent.ForkJoinTask$InterruptibleTask.exec(ForkJoinTask.java:1641) [?:?]
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:507) [?:?]
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1491) [?:?]
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:2073) [?:?]
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:2035) [?:?]
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:187) [?:?]

What could be the problem?

Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production Version 19.23.0.0.0

Michael-A-McMahon commented 4 months ago

With a FOR UPDATE clause, we can get ORA-01002 if we do this:

  1. Begin a transaction
  2. Execute the query with FOR UPDATE
  3. Commit or rollback the transaction
  4. Fetch more rows from the query.

Here's JDBC code to illustrate:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;

public class ForUpdate {

  public static void main(String[] args) throws Exception {
    try (
      Connection connection =
        DriverManager.getConnection("jdbc:oracle:thin:@test");
      Statement statement = connection.createStatement();
      AutoCloseable dropTable = () -> statement.execute("DROP TABLE test");
      PreparedStatement query = connection.prepareStatement(
        "SELECT id FROM test FOR UPDATE SKIP LOCKED");
      PreparedStatement insert = connection.prepareStatement(
        "INSERT INTO test VALUES (?)"
      )) {

      statement.execute("CREATE TABLE test (id NUMBER)");

      for (int i = 0; i < 100; i++) {
        insert.setInt(1, i);
        insert.addBatch();
      }
      insert.executeBatch();

      // Transaction begins before executing the SELECT ... FOR UPDATE query
      connection.setAutoCommit(false);

      // Fetch 10 rows at a time
      query.setFetchSize(10);

      // The SELECT ... FOR UPDATE query is executed
      try (ResultSet resultSet = query.executeQuery()) {

        // Transaction ends before all rows have been fetched
        connection.setAutoCommit(true);

        // ORA-01002 when JDBC (or R2DBC) goes to fetch the next 10 rows:
        while (resultSet.next())
          System.out.println(resultSet.getInt(1));
      }

    }
  }
}

If you know the number of rows this query returns, and you know that all row data can fit in memory, then you could pass the row count to Statement.fetchSize(int). This avoids R2DBC having to send multiple fetch requests to the database (so it's faster) and it might solve the issue of the transaction ending before all rows have been fetched.
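To make the fetch-size point concrete, here is a minimal toy model in plain Java. It is only a sketch: ToyForUpdateCursor and drainAfterCommit are hypothetical names, nothing here calls Oracle JDBC or Oracle R2DBC, and it assumes the simplification that the client knows the total row count up front. It shows that when the first round trip already carries every row, a later commit never triggers another fetch, while a smaller fetch size does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a FOR UPDATE cursor. Hypothetical; not a driver API. */
class ToyForUpdateCursor {
  private final int totalRows;
  private final int fetchSize;
  private final Deque<Integer> buffer = new ArrayDeque<>();
  private int nextRowOnServer = 0;
  private boolean transactionOpen = true;

  ToyForUpdateCursor(int totalRows, int fetchSize) {
    this.totalRows = totalRows;
    this.fetchSize = fetchSize;
    fetchBatch(); // executing the query returns the first batch of rows
  }

  /** Ends the transaction; later fetches from this cursor will fail. */
  void commit() {
    transactionOpen = false;
  }

  /** Simulates one round trip that brings back up to fetchSize rows. */
  private void fetchBatch() {
    if (!transactionOpen)
      throw new IllegalStateException(
        "ORA-01002: fetch out of sequence (simulated)");
    for (int i = 0; i < fetchSize && nextRowOnServer < totalRows; i++)
      buffer.add(nextRowOnServer++);
  }

  /** Returns the next row, or null once the result is exhausted. */
  Integer next() {
    if (buffer.isEmpty() && nextRowOnServer < totalRows)
      fetchBatch();
    return buffer.poll();
  }

  /** Commits right after execution, then tries to read every row. */
  static List<Integer> drainAfterCommit(int totalRows, int fetchSize) {
    ToyForUpdateCursor cursor = new ToyForUpdateCursor(totalRows, fetchSize);
    cursor.commit();
    List<Integer> rows = new ArrayList<>();
    for (Integer row = cursor.next(); row != null; row = cursor.next())
      rows.add(row);
    return rows;
  }

  public static void main(String[] args) {
    // Fetch size covers all rows: no second round trip, so no error.
    System.out.println(drainAfterCommit(100, 100).size()); // prints 100

    // Fetch size of 10: the 11th row needs a fetch after the commit.
    try {
      drainAfterCommit(100, 10);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

A real driver may behave differently at the boundaries (for example, when the row count exactly equals the fetch size), so treat this as an illustration of the mechanism rather than a guarantee.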

But if the number of rows is unknown, or there are too many rows to fit in memory, then you'll want to make sure a commit/rollback doesn't happen until the Result publisher has emitted its last row. The Flux.usingWhen operator could be useful here: you can end the transaction in the asyncCleanup Function: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#usingWhen-org.reactivestreams.Publisher-java.util.function.Function-java.util.function.Function-

Something like:

Flux.usingWhen(
  Mono.from(connection.beginTransaction()).thenReturn(0), // <-- need to emit a value to trigger query publisher
  ignored ->
    // execute() returns the Publisher of Result objects
    Flux.from(connection.createStatement(FOR_UPDATE_QUERY).execute())
      .flatMap(result ->
        result.map(row -> createPfrStatus(row))), // <-- Your method for mapping row data into PfrStatus
  ignored -> connection.commitTransaction())

Code above isn't tested, but I hope the idea comes across: We don't commit/rollback until all rows have been received.

Michael-A-McMahon commented 1 month ago

Hope this was resolved. Otherwise, let me know if I can help.