apache / incubator-kie-issues

Apache License 2.0

No session found errors when sending kafka messages with correlations in bulk #1016

Open martinweiler opened 8 months ago

martinweiler commented 8 months ago

With the correlation feature from JBPM-9735, sending messages one at a time works without problems, e.g.:

$ kafka-console-producer --broker-list localhost:9092 --topic collaboration 
>{"data":{"type":"collaboration","id":"M007"}}
>{"data":{"type":"collaboration","id":"M002"}}
...

The messages are processed as expected and the correct process instance is signalled.

However, with the same process, sending the messages in bulk:

$ kafka-console-producer --broker-list localhost:9092 --topic collaboration < /tmp/signal-correlations-no-tokens.txt

results in errors such as:

14:02:37,738 ERROR [org.kie.server.services.jbpm.kafka.KafkaServerConsumer] (pool-18-thread-3) Exception deserializing event: org.kie.internal.runtime.manager.SessionNotFoundException: No session found for context 52
    at org.jbpm.runtime.manager.impl.PerProcessInstanceRuntimeManager$PerProcessInstanceInitializer.initKieSession(PerProcessInstanceRuntimeManager.java:579)
    at org.jbpm.runtime.manager.impl.RuntimeEngineImpl.internalGetKieSession(RuntimeEngineImpl.java:169)
    at org.jbpm.runtime.manager.impl.RuntimeEngineImpl.getKieSession(RuntimeEngineImpl.java:76)
    at org.jbpm.runtime.manager.impl.PerProcessInstanceRuntimeManager.signalEvent(PerProcessInstanceRuntimeManager.java:185)
    at org.jbpm.kie.services.impl.ProcessServiceImpl.signalEvent(ProcessServiceImpl.java:383)
    at org.kie.server.services.jbpm.kafka.KafkaServerConsumer.signalEvent(KafkaServerConsumer.java:215)
    at org.kie.server.services.jbpm.kafka.KafkaServerConsumer.lambda$new$2(KafkaServerConsumer.java:212)
    at org.kie.server.services.jbpm.kafka.KafkaServerConsumer.processEvent(KafkaServerConsumer.java:251)
    at org.kie.server.services.jbpm.kafka.KafkaServerConsumer.processMessage(KafkaServerConsumer.java:232)
    at org.kie.server.services.jbpm.kafka.KafkaServerRegistration.forEach(KafkaServerRegistration.java:118)
    at org.kie.server.services.jbpm.kafka.KafkaServerRegistration.forEachMessage(KafkaServerRegistration.java:101)
    at org.kie.server.services.jbpm.kafka.KafkaServerConsumer.processEvent(KafkaServerConsumer.java:220)
    at org.kie.server.services.jbpm.kafka.KafkaServerConsumer.lambda$processEvents$1(KafkaServerConsumer.java:191)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
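The contents of the bulk input file are not shown in the issue. Assuming it holds one JSON message per line, in the same shape as the messages typed by hand above, a comparable file can be generated with a small Java program. This is a sketch under that assumption: the class name BulkSignalFile is hypothetical, only the IDs M007 and M002 come from the issue, and the real test file may contain more or different messages.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of the attached project) that writes a bulk
// input file for kafka-console-producer: one JSON message per line.
public class BulkSignalFile {

    // Builds one message per correlation id, e.g. {"data":{"type":"collaboration","id":"M007"}}
    public static List<String> messages(String type, List<String> ids) {
        List<String> lines = new ArrayList<>();
        for (String id : ids) {
            lines.add("{\"data\":{\"type\":\"" + type + "\",\"id\":\"" + id + "\"}}");
        }
        return lines;
    }

    // Writes each message followed by a line separator; the console producer
    // sends every line of its stdin as a separate record.
    public static void write(Path file, List<String> lines) throws IOException {
        Files.write(file, lines, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the path used in the command above; pass another path as the first argument.
        Path out = Path.of(args.length > 0 ? args[0] : "/tmp/signal-correlations-no-tokens.txt");
        write(out, messages("collaboration", List.of("M007", "M002")));
        System.out.println("Wrote " + out);
    }
}
```

Extending the ID list (assumed to match instances that are actually waiting on those correlation keys) produces a larger batch for the bulk command.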

Attached is kafka-signal-correlations-test.zip, an example project that can be used to reproduce the issue.

elguardian commented 8 months ago

This looks like a race condition to me (see https://github.com/kiegroup/jbpm/pull/2403).

The lock strategy is applied when the runtime engine is obtained, not when reading which process instances are waiting for the signal.
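The comment above describes a check-then-act race: the consumer decides which process instance is waiting for a signal before the per-process-instance lock is taken, so another message processed concurrently can, for example, complete that instance in between, and the later session lookup fails. The following Java sketch models that interleaving deterministically with plain maps and ReentrantLock. It is not jBPM code and not the change in the linked pull request; SignalRaceSketch, its methods, and its maps are hypothetical stand-ins for the runtime manager, and the "interleaving" callback simulates the competing thread.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of the race; NOT jBPM code. "sessions" stands in for the
// per-process-instance sessions, "waiting" for the lookup of which instance
// waits for a given correlation key.
public class SignalRaceSketch {

    public static class SessionNotFound extends RuntimeException {
        public SessionNotFound(long id) {
            super("No session found for context " + id);
        }
    }

    private final Map<Long, String> sessions = new ConcurrentHashMap<>();
    private final Map<String, Long> waiting = new ConcurrentHashMap<>();
    private final Map<Long, ReentrantLock> locks = new ConcurrentHashMap<>();

    private ReentrantLock lockFor(long id) {
        return locks.computeIfAbsent(id, k -> new ReentrantLock());
    }

    // Starts an instance that waits for a signal with the given correlation key.
    public void start(long id, String correlationKey) {
        sessions.put(id, "session-" + id);
        waiting.put(correlationKey, id);
    }

    // Completes an instance under its lock: its session and waiting entry disappear.
    public void complete(long id) {
        ReentrantLock lock = lockFor(id);
        lock.lock();
        try {
            sessions.remove(id);
            waiting.values().removeIf(v -> v == id);
        } finally {
            lock.unlock();
        }
    }

    // Racy: the waiting instance is read BEFORE its lock is taken, so the
    // interleaving callback can complete the instance in between.
    public String signalRacy(String key, Runnable interleaving) {
        Long id = waiting.get(key);
        if (id == null) {
            return null;
        }
        interleaving.run();
        ReentrantLock lock = lockFor(id);
        lock.lock();
        try {
            String session = sessions.get(id);
            if (session == null) {
                throw new SessionNotFound(id);
            }
            return session;
        } finally {
            lock.unlock();
        }
    }

    // Checked: once the lock is held, re-read the waiting entry and skip the
    // instance if it is no longer waiting for this key.
    public Optional<String> signalChecked(String key, Runnable interleaving) {
        Long id = waiting.get(key);
        if (id == null) {
            return Optional.empty();
        }
        interleaving.run();
        ReentrantLock lock = lockFor(id);
        lock.lock();
        try {
            if (!id.equals(waiting.get(key))) {
                return Optional.empty();
            }
            return Optional.ofNullable(sessions.get(id));
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        SignalRaceSketch racy = new SignalRaceSketch();
        racy.start(52L, "M007");
        try {
            racy.signalRacy("M007", () -> racy.complete(52L));
        } catch (SessionNotFound e) {
            System.out.println("racy: " + e.getMessage());
        }
        SignalRaceSketch checked = new SignalRaceSketch();
        checked.start(52L, "M007");
        System.out.println("checked: " + checked.signalChecked("M007", () -> checked.complete(52L)));
    }
}
```

Re-reading the waiting entry after the lock is held turns the failure into a skip for an instance that has already completed. Whether the actual fix skips, retries, or moves lock acquisition earlier is decided by the linked pull request, not by this sketch.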