camunda-community-hub / camunda-platform-7-camel

Community Extension to add Apache Camel support for Camunda Platform 7
Apache License 2.0
82 stars 57 forks source link

"Couldn't find process instance" while using async communication #51

Open ashulinskiy opened 5 years ago

ashulinskiy commented 5 years ago

Hi folks,

Another issue I could use some assistance with please.

I have a BPMN which uses the Camunda-Camel library to communicate to a third party service through Kafka, i.e.:

1) A BPMN Send Task of a BPMN process sends a message to a Camel route. That task is the fist one in the BPMN process which may be important. 2) The Camel route forwards the message to the "FromCamunda"Kafka topic. 3) A third party service reads from the "FromCamunda" Kafka topic. 4) The third party service does some processing and sends the reply to the "ToCamunda" Kafka topic. 5) Another camel route configured on the same Camunda server as the one in steps 1-2 reads the message from the "ToCamunda" Kafka topic. 6) The route forwards the message to the same Camunda BPMN process which sent the outgoing message in steps 1-2. The BPMN Process Instance ID is used for the correlation.

The routing sometime breaks with the following error:

2019-04-25 11:56:33,696 ERROR o.a.c.p.DefaultErrorHandler [-1) thread #1 - KafkaConsumer[toCamunda]] -> Failed delivery for (MessageId: ID-LAPTOP-9OG51EMC-1556205751527-0-4 on ExchangeId: ID-LAPTOP-9OG51EMC-1556205751527-0-3). Exhausted after delivery attempt: 1 caught: java.lang.RuntimeException: Couldn't find waiting process instance with id 'b2c37f57-6772-11e9-9616-e86a64530ab3' for message 'camel.answer'

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[asyncResponse     ] [asyncResponse     ] [kafka://localhost:9092?autoOffsetReset=earliest&brokers=localhost%3A9092&consu] [       100]
[asyncResponse     ] [unmarshal1        ] [unmarshal[org.apache.camel.model.dataformat.JsonDataFormat@473c9784]          ] [        22]
[asyncResponse     ] [process2          ] [Processor@0x3a92a57c                                                          ] [         0]
[asyncResponse     ] [to2               ] [camunda-bpm:message?messageName=camel.answer&copyBodyAsVariable=aggregateData ] [        75]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.RuntimeException: Couldn't find waiting process instance with id 'b2c37f57-6772-11e9-9616-e86a64530ab3' for message 'camel.answer'
    at org.camunda.bpm.camel.component.producer.MessageProducer.process(MessageProducer.java:116)
    at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
    at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:326)
    at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:215)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
2019-04-25 11:56:33,698 WARN  o.a.c.c.k.KafkaConsumer [-1) thread #1 - KafkaConsumer[toCamunda]] -> Error during processing. Exchange[ID-LAPTOP-9OG51EMC-1556205751527-0-3]. Caused by: [java.lang.RuntimeException - Couldn't find waiting process instance with id 'b2c37f57-6772-11e9-9616-e86a64530ab3' for message 'camel.answer']
java.lang.RuntimeException: Couldn't find waiting process instance with id 'b2c37f57-6772-11e9-9616-e86a64530ab3' for message 'camel.answer'
    at org.camunda.bpm.camel.component.producer.MessageProducer.process(MessageProducer.java:116)
    at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
    at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:326)
    at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:215)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

The BPMN:

`<?xml version="1.0" encoding="UTF-8"?>

SequenceFlow_0aodgcz SequenceFlow_1yw1ho6 SequenceFlow_1i1ujc8 SequenceFlow_1i1ujc8 SequenceFlow_0aodgcz SequenceFlow_1yw1ho6 ` **The Camel routes:** ``` from("direct://asyncService") .routeId("asyncService") .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { Map inMap = (Map)exchange.getIn().getBody(); Map kafkaOutParams = new HashMap<>(); kafkaOutParams.put("aggregateSchemaId", inMap.get("aggregateSchemaId?").toString()); kafkaOutParams.put("aggregateInstanceId", inMap.get("aggregateInstanceId?").toString()); exchange.getOut().setBody(kafkaOutParams, Map.class); exchange.getOut().setHeader(KafkaConstants.KEY, exchange.getProperties().get(CamundaBpmConstants.EXCHANGE_HEADER_PROCESS_INSTANCE_ID)); } }) .marshal().json(JsonLibrary.Jackson, Map.class) .convertBodyTo(String.class) .to("kafka:localhost:9092?topic=fromCamunda&brokers=localhost:9092"); from("kafka:localhost:9092?topic=toCamunda&groupId=group_id&autoOffsetReset=earliest&consumersCount=1&brokers=localhost:9092") .routeId("asyncResponse") .unmarshal().json(JsonLibrary.Jackson, Map.class) .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { exchange.setProperty(CamundaBpmConstants.EXCHANGE_HEADER_PROCESS_INSTANCE_ID, exchange.getIn().getHeader(KafkaConstants.KEY)); Map result = new HashMap<>(); result.put("aggregateData", exchange.getIn().getBody()); exchange.getIn().setBody(result, Map.class); } }) .to("camunda-bpm:message?messageName=camel.answer©BodyAsVariable=aggregateData"); ``` As far as I understand there's a race condition between the thread in the Camunda process that sends the outgoing Kafka message and the thread that reads the Kafka message and uses the Process Instance ID to find the process to forward the message to. I.e. when the correlation is done in the second thread the process hasn'd been committed to the DB in the first thread. - Does this theory sound right? - If it doesn't what else could be the problem - Any advice how to fix the issue? Any help is greatly appreciated! Thanks, Andrey.
umamaheswaran3003 commented 4 years ago

May I know the versions of camel, camunda, spring you are using. I am using current versions of all the dependencies, facing issue while server starts up. Please help!!!

ashulinsky-korio commented 4 years ago

It's been a while but I believe I have found the branch where that investigation happened.

Here's the POM with the versions we used back then, has the versions.

We ended up not using this approach so no further insight sorry.

`<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0
<groupId>DSABackend</groupId>
<artifactId>DSABackend</artifactId>
<version>1.0</version>

<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>2.1.1.RELEASE</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.camunda.bpm.springboot</groupId>
        <artifactId>camunda-bpm-spring-boot-starter-webapp</artifactId>
        <version>3.2.0</version>
    </dependency>

    <!-- TODO The OTB Camunda REST API is left in the project for convenience. MUST be removed before we go production!!! -->
    <dependency>
        <groupId>org.camunda.bpm.springboot</groupId>
        <artifactId>camunda-bpm-spring-boot-starter-rest</artifactId>
        <version>3.2.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.springframework.data/spring-data-rest-webmvc -->
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-rest-webmvc</artifactId>
        <version>3.1.4.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
    </dependency>

    <!-- https://stackoverflow.com/questions/43574426/how-to-resolve-java-lang-noclassdeffounderror-javax-xml-bind-jaxbexception-in-j -->
    <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>2.2.11</version>
    </dependency>
    <dependency>
        <groupId>com.sun.xml.bind</groupId>
        <artifactId>jaxb-core</artifactId>
        <version>2.2.11</version>
    </dependency>
    <dependency>
        <groupId>com.sun.xml.bind</groupId>
        <artifactId>jaxb-impl</artifactId>
        <version>2.2.11</version>
    </dependency>
    <dependency>
        <groupId>javax.activation</groupId>
        <artifactId>activation</artifactId>
        <version>1.1.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.8.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-collections4 -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-collections4</artifactId>
        <version>4.2</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/ch.qos.logback/ -->
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.3</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>1.2.3</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/ma.glasnost.orika/orika-core -->
    <dependency>
        <groupId>ma.glasnost.orika</groupId>
        <artifactId>orika-core</artifactId>
        <version>1.5.2</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.arangodb/arangodb-java-driver -->
    <dependency>
        <groupId>com.arangodb</groupId>
        <artifactId>arangodb-java-driver</artifactId>
        <version>5.0.4</version>
    </dependency>
    <dependency>
        <groupId>com.arangodb</groupId>
        <artifactId>arangodb-spring-data</artifactId>
        <version>3.2.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.8.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.avro/avro-tools -->
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-tools</artifactId>
        <version>1.8.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.2.5.RELEASE</version>
    </dependency>

    <!-- camunda BPM Apache Camel Integration -->
    <dependency>
        <groupId>org.camunda.bpm.extension.camel</groupId>
        <artifactId>camunda-bpm-camel-cdi</artifactId>
        <version>0.6</version>
        <exclusions>
            <exclusion>
                <artifactId>jaxb-impl</artifactId>
                <groupId>com.sun.xml.bind</groupId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.camunda.bpm/camunda-engine -->
    <dependency>
        <groupId>org.camunda.bpm</groupId>
        <artifactId>camunda-engine</artifactId>
        <version>7.10.0</version>
    </dependency>

    <!-- Camel Components -->
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-core</artifactId>
        <version>2.23.2</version>
    </dependency>
    <!--<dependency>-->
    <!--<groupId>org.apache.camel</groupId>-->
    <!--<artifactId>camel-spring-boot</artifactId>-->
    <!--<version>2.23.2</version>-->
    <!--</dependency>-->
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-spring-boot-starter</artifactId>
        <version>2.23.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-cdi</artifactId>
        <version>2.23.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-groovy</artifactId>
        <version>2.23.2</version>
        <exclusions>
            <exclusion>
                <artifactId>jaxb-impl</artifactId>
                <groupId>com.sun.xml.bind</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-http</artifactId>
        <version>2.23.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.camel/camel-jackson -->
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-jackson</artifactId>
        <version>2.23.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.camel/camel-xstream -->
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-xstream</artifactId>
        <version>2.23.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-kafka</artifactId>
        <version>2.23.2</version>
        <!-- use the same version as your Camel core version -->
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-ognl</artifactId>
        <version>2.23.2</version>
    </dependency>

    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>42.2.5</version>
    </dependency>

    <!-- TEST DEPENDENCIES -->

    <!-- https://mvnrepository.com/artifact/org.mockito/mockito-core -->
    <dependency>
        <groupId>org.mockito</groupId>
        <artifactId>mockito-core</artifactId>
        <version>2.23.4</version>
        <scope>test</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.powermock/powermock-core -->
    <dependency>
        <groupId>org.powermock</groupId>
        <artifactId>powermock-core</artifactId>
        <version>2.0.0</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.powermock/powermock-module-junit4 -->
    <dependency>
        <groupId>org.powermock</groupId>
        <artifactId>powermock-module-junit4</artifactId>
        <version>2.0.0</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.powermock/powermock-api-mockito2 -->
    <dependency>
        <groupId>org.powermock</groupId>
        <artifactId>powermock-api-mockito2</artifactId>
        <version>2.0.0</version>
        <scope>test</scope>
    </dependency>

</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <configuration>
                <layout>ZIP</layout>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>repackage</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

    </plugins>
</build>

`