apache / incubator-kie-kogito-runtimes

Kogito Runtimes - Kogito is a cloud-native business automation technology for building cloud-ready business applications.
http://kogito.kie.org
Apache License 2.0
540 stars, 210 forks

Merging output from parallel blocks on parallel gateway sometimes doesn't work #3167

Open DotNetPart opened 1 year ago

DotNetPart commented 1 year ago

Describe the bug

I have a fairly simple workflow that splits into three parallel requests and then merges their output at a parallel gateway. Sometimes, though, the flow hangs and the merge never happens, even though all three parallel branches have finished. I can see this when browsing the Kogito process instance events.

I looked through the Kogito code and found a suspicious spot. Doesn't this code have a race condition in how it updates the count?

https://github.com/kiegroup/kogito-runtimes/blob/1b2146d259b9119e5fb4e51c422c010199689789/jbpm/jbpm-flow/src/main/java/org/jbpm/workflow/instance/node/JoinInstance.java#L71
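To illustrate the kind of race being asked about, here is a minimal sketch, assuming the count is updated by reading a value from a shared map and writing it back incremented (an assumption; the linked source is not reproduced here). The class and method names are hypothetical and are not part of Kogito. It contrasts that read-then-write pattern with `ConcurrentHashMap.merge`, which performs the whole update atomically.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration only; not Kogito code.
final class TriggerCountSketch {

    // Read-then-write: two threads can read the same old value,
    // and one of the two increments is then lost.
    static void unsafeIncrement(Map<Long, Integer> triggers, long nodeId) {
        Integer count = triggers.get(nodeId);
        triggers.put(nodeId, count == null ? 1 : count + 1);
    }

    // merge() runs the read-modify-write as one atomic step per key.
    static void atomicIncrement(ConcurrentHashMap<Long, Integer> triggers, long nodeId) {
        triggers.merge(nodeId, 1, Integer::sum);
    }

    // Runs `threads` workers that each increment the same key `perThread` times
    // and returns the final count for that key.
    static int run(int threads, int perThread, boolean atomic) {
        ConcurrentHashMap<Long, Integer> triggers = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    if (atomic) {
                        atomicIncrement(triggers, 1L);
                    } else {
                        unsafeIncrement(triggers, 1L);
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return triggers.getOrDefault(1L, 0);
    }

    public static void main(String[] args) {
        // The atomic variant always reaches threads * perThread;
        // the unsafe variant may come up short on some runs.
        System.out.println("atomic: " + run(8, 10_000, true));
        System.out.println("unsafe: " + run(8, 10_000, false));
    }
}
```

A lost increment of this kind would be consistent with a join that never sees all of its incoming branches as activated, although whether that is what happens here depends on how the engine invokes the join.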

radtriste commented 1 year ago

@ricardozanini @fjtirado any idea?

dfiai commented 9 months ago

Hello, I have a similar issue. Additionally, when I have several identical BPMN process pods, I encounter a problem where processes with parallel actions freeze.

dfiai commented 9 months ago

@ricardozanini, @fjtirado, what are your thoughts on this (it still won't work with several pods, though)?

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

private ConcurrentHashMap<Long, AtomicInteger> triggers = new ConcurrentHashMap<>();
...
case Join.TYPE_AND:
    // Use computeIfAbsent to handle the initialization and increment atomically
    triggers.computeIfAbsent(from.getNodeId(), k -> new AtomicInteger(0)).incrementAndGet();

    if (checkAllActivated()) {
        decreaseAllTriggers();
        triggerCompleted();
    }
    break;

fjtirado commented 9 months ago

@dfiai Please go ahead and open a PR with that fix. It would certainly still fail with multiple pods, but at least we avoid the race condition on a single pod. Good catch!
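One point worth noting about the single-pod case: making each increment atomic does not by itself make the following check and decrement atomic as a group. The sketch below, which assumes a simplified AND join and uses hypothetical names (`AndJoinSketch`, `trigger`, `fireCount`) that are not Kogito's API, guards the whole increment, check, and consume sequence with one lock so that exactly one arrival completes the join.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified AND join; not Kogito code.
final class AndJoinSketch {
    private final Set<Long> incomingNodeIds;
    private final Map<Long, Integer> triggers = new HashMap<>();
    private final Object lock = new Object();

    AndJoinSketch(Set<Long> incomingNodeIds) {
        this.incomingNodeIds = new HashSet<>(incomingNodeIds);
    }

    // Records one arrival and returns true if this call completed the join.
    boolean trigger(long fromNodeId) {
        synchronized (lock) {
            triggers.merge(fromNodeId, 1, Integer::sum);
            // Check that every incoming branch has at least one pending trigger.
            for (Long id : incomingNodeIds) {
                if (triggers.getOrDefault(id, 0) == 0) {
                    return false;
                }
            }
            // Consume one trigger per branch while still holding the lock.
            for (Long id : incomingNodeIds) {
                triggers.merge(id, -1, Integer::sum);
            }
            return true;
        }
    }

    // Starts one thread per branch and returns how many times the join fired.
    static int fireCount(int branches) {
        Set<Long> ids = new HashSet<>();
        for (int i = 0; i < branches; i++) {
            ids.add((long) i);
        }
        AndJoinSketch join = new AndJoinSketch(ids);
        AtomicInteger fired = new AtomicInteger();
        Thread[] workers = new Thread[branches];
        for (int i = 0; i < branches; i++) {
            final long nodeId = i;
            workers[i] = new Thread(() -> {
                if (join.trigger(nodeId)) {
                    fired.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return fired.get();
    }

    public static void main(String[] args) {
        System.out.println("join fired " + fireCount(3) + " time(s)");
    }
}
```

A lock like this only coordinates threads inside one JVM, so it does nothing for the multi-pod case mentioned above.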

fjtirado commented 9 months ago

@dfiai If you have issues opening the PR, let me know and I will do it myself, but I believe you deserve the credit for pursuing this together with @DotNetPart.

dfiai commented 9 months ago

@fjtirado, sure, I'll try it myself first.