apache / incubator-kie-kogito-apps

Kogito Apps - Kogito is a cloud-native business automation technology for building cloud-ready business applications.
http://kogito.kie.org
Apache License 2.0

Event Processing without Error Handling in Data Index and Sequential Processing between runtime and DataIndex MongoDB #1900

Open debu999 opened 10 months ago

debu999 commented 10 months ago

Describe the bug

There is no explicit error handling during event processing in Data Index. For example, in ReactiveMessagingEventConsumer/BlockingMessagingEventConsumer:

    @Incoming(KOGITO_PROCESSINSTANCES_EVENTS)
    public Uni<Void> onProcessInstanceEvent(ProcessInstanceDataEvent<?> event) {
        LOGGER.debug("Process instance consumer received ProcessInstanceDataEvent: \n{}", event);
        return Uni.createFrom().item(event)
                .invoke(indexingService::indexProcessInstanceEvent)
                .invoke(eventPublisher::fire)
                .onFailure()
                .invoke(t -> LOGGER.error("Error processing process instance ProcessInstanceDataEvent: {}", t.getMessage(), t))
                .onItem().ignore().andContinueWithNull();
    }

    @Incoming(KOGITO_USERTASKINSTANCES_EVENTS)
    public Uni<Void> onUserTaskInstanceEvent(UserTaskInstanceDataEvent<?> event) {
        LOGGER.debug("Task instance received UserTaskInstanceDataEvent \n{}", event);
        return Uni.createFrom().item(event)
                .invoke(indexingService::indexUserTaskInstanceEvent)
                .invoke(eventPublisher::fire)
                .onFailure()
                .invoke(t -> LOGGER.error("Error processing task instance UserTaskInstanceDataEvent: {}", t.getMessage(), t))
                .onItem().ignore().andContinueWithNull();
    }

If any error occurs, it is only logged and then silently ignored. Could the failed message be stored somewhere so it is not lost?
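For illustration only, here is a minimal sketch of how a failed event could be kept instead of dropped. FailedEventDao is a hypothetical bean (not an existing Kogito API) that would persist the raw event, for example into a dedicated MongoDB "dead letter" collection, for later inspection or replay; the channel name and the injected indexing service mirror the existing consumer and are assumptions about the wiring:

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.kie.kogito.event.process.ProcessInstanceDataEvent; // Kogito event type (package may vary by version)
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.smallrye.mutiny.Uni;
import javax.enterprise.context.ApplicationScoped; // javax namespace for this Kogito/Quarkus version; newer releases use jakarta.*
import javax.inject.Inject;

@ApplicationScoped
public class DeadLetterAwareEventConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(DeadLetterAwareEventConsumer.class);

    @Inject
    IndexingService indexingService; // existing Data Index indexing service (import omitted)

    @Inject
    FailedEventDao failedEventDao; // hypothetical: stores failed events for debugging/replay

    @Incoming("kogito-processinstances-events") // assumed to match KOGITO_PROCESSINSTANCES_EVENTS
    public Uni<Void> onProcessInstanceEvent(ProcessInstanceDataEvent<?> event) {
        return Uni.createFrom().item(event)
                .invoke(indexingService::indexProcessInstanceEvent)
                .onFailure().invoke(t -> {
                    // keep the failing message instead of only logging it
                    LOGGER.error("Error processing ProcessInstanceDataEvent: {}", t.getMessage(), t);
                    failedEventDao.save(event, t);
                })
                .onItem().ignore().andContinueWithNull();
    }
}

An alternative would be to let the failure propagate (or nack the incoming message) so that the broker's own retry/dead-letter mechanism takes over, rather than the application swallowing the error.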

There is one more issue with this use case.

AbstractTransactionManager.java

    @Override
    public void onBeforeStartEvent(UnitOfWorkStartEvent event) {
        if (!enabled()) {
            return;
        }

        ClientSession clientSession = mongoClient.startSession();
        this.clientSessionLocal.set(clientSession);
        TransactionOptions txnOptions = TransactionOptions.builder()
                .readPreference(ReadPreference.primary())
                .readConcern(ReadConcern.MAJORITY)
                .writeConcern(WriteConcern.MAJORITY)
                .build();
        clientSession.startTransaction(txnOptions);
    }

    @Override
    public void onAfterEndEvent(UnitOfWorkEndEvent event) {
        if (!enabled()) {
            return;
        }

        try (ClientSession clientSession = this.getClientSession()) {
            clientSession.commitTransaction();
        } finally {
            clientSessionLocal.remove();
        }
    }

How do we ensure consistency between the runtime and Data Index? In rare cases we see the following happening. Assume we have 3 servers in a replica set, S1/S2/S3, and the publisher app writes to S1/S3 to satisfy the majority write concern:

  1. The data is not yet acknowledged by the majority.
  2. The CloudEvent is fired, and Data Index, the consumer of the CloudEvent, processes it.
  3. Data Index reads data from MongoDB and may therefore read old data before applying its update.
  4. If it connects to S2 it will read old data, since the default read concern is local.
  5. How can we avoid this scenario? Please advise.
  6. What should the MongoDB read concern be for Data Index in replica-set mode: local, available, majority, linearizable, or snapshot? (One possible client-side configuration is sketched after this list.)
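For reference only (not a recommendation of which level is right for Data Index), the sketch below shows how majority read concern and primary read preference can be set at the client level with the MongoDB Java driver; the hosts, replica set name, and factory class are placeholders:

import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;

public class MajorityReadClientFactory {

    // Hypothetical factory: builds a client that reads from the primary and only
    // returns majority-committed data. Hosts and replica set name are placeholders.
    public static MongoClient create() {
        MongoClientSettings settings = MongoClientSettings.builder()
                .applyConnectionString(new ConnectionString("mongodb://S1,S2,S3/?replicaSet=rs0"))
                .readPreference(ReadPreference.primary())
                .readConcern(ReadConcern.MAJORITY)
                .writeConcern(WriteConcern.MAJORITY)
                .build();
        return MongoClients.create(settings);
    }
}

The same settings can also be applied per collection via MongoCollection.withReadConcern(...)/withReadPreference(...). Note that majority read concern only guarantees that reads return majority-committed data; whether that alone closes the race also depends on ordering, i.e. whether the event can be consumed before the runtime write is majority-acknowledged, which is why the sequencing question above matters as well.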

Expected behavior

Ideally, completion of runtime persistence should be what triggers the events, so that Data Index always gets the latest data from the system.

Moreover, any error in event processing should be stored rather than silently ignored. Please advise on a solution.

Actual behavior

We see Data Index process data after receiving events, but in certain scenarios the data read from MongoDB is stale, so we end up with incorrect updates in the database.

Data Index must not read anything at all from the actual runtime collection.

How to Reproduce?

This is tricky, as it is not reliably reproducible. These are edge cases: when event processing is fast, the data read is sometimes stale. We need help with the right configuration to make Data Index read the latest data, and any error in Data Index must be logged or stored rather than silently ignored.

Output of uname -a or ver

Darwin Mac-mini.local 22.2.0 Darwin Kernel Version 22.2.0: Fri Nov 11 02:04:44 PST 2022; root:xnu-8792.61.2~4/RELEASE_ARM64_T8103 arm64

Output of java -version

openjdk version "20.0.1" 2023-04-18 OpenJDK Runtime Environment GraalVM CE 20.0.1-dev+9.1 (build 20.0.1+9-jvmci-23.1-b02) OpenJDK 64-Bit Server VM GraalVM CE 20.0.1-dev+9.1 (build 20.0.1+9-jvmci-23.1-b02, mixed mode, sharing)

GraalVM version (if different from Java)

20.0.1

Kogito version or git rev (or at least Quarkus version if you are using Kogito via Quarkus platform BOM)

1.44.1

Build tool (ie. output of mvnw --version or gradlew --version)

Apache Maven 3.9.2 (c9616018c7a021c1c39be70fb2843d6f5f9b8a1c) Maven home: /opt/apache-maven-3.9.2 Java version: 20.0.1, vendor: GraalVM Community, runtime: /Library/Java/JavaVirtualMachines/graalvm-community-openjdk-20.0.1+9.1/Contents/Home Default locale: en_IN, platform encoding: UTF-8 OS name: "mac os x", version: "13.1", arch: "aarch64", family: "mac"

Additional information

To summarize, there are 2 issues:

  1. Error handling support in Data Index, with the failing events logged in some collection for debugging.
  2. Data Index processing based on the latest data from the runtime.
debu999 commented 10 months ago

@pefernan @ricardozanini @nmirasch Please advise