camunda / connectors

Camunda Connectors
https://docs.camunda.io/docs/components/integration-framework/connectors/out-of-the-box-connectors/available-connectors-overview/
Apache License 2.0

RabbitMQ inbound connector publishes message with wrong name #1323

Open bulivlad opened 9 months ago

bulivlad commented 9 months ago

Describe the Bug

For some process instances, the RabbitMQ inbound connector (intermediate catch event) publishes a message with the wrong name. The correlation key seems to be correct, but the wrong message name prevents the message from being correlated.

Steps to Reproduce

  1. Create a process with more than one RabbitMQ inbound connector (intermediate catch event).
  2. Start the BPMN process, wait for the token to reach the inbound connector, and push the message to the queue.

Expected Behavior

Message should be correlated based on message name and correlation key

Environment

Test process

<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_0o5p122" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.13.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.2.0">
  <bpmn:process id="Process_onboarding" isExecutable="true">
    <bpmn:extensionElements />
    <bpmn:startEvent id="StartEvent_1">
      <bpmn:outgoing>Flow_0m0flnh</bpmn:outgoing>
    </bpmn:startEvent>
    <bpmn:sequenceFlow id="Flow_0m0flnh" sourceRef="StartEvent_1" targetRef="Gateway_0533885" />
    <bpmn:endEvent id="Event_05x97di">
      <bpmn:incoming>Flow_1ilduf1</bpmn:incoming>
    </bpmn:endEvent>
    <bpmn:sequenceFlow id="Flow_14b6eot" sourceRef="Event_0k5c5rz" targetRef="Activity_1q6qehk" />
    <bpmn:intermediateCatchEvent id="Event_0k5c5rz" name="Save success" zeebe:modelerTemplate="io.camunda.connectors.inbound.RabbitMQ.Intermediate.v1" zeebe:modelerTemplateVersion="3" zeebe:modelerTemplateIcon="data:image/svg+xml;utf8,%3Csvg xmlns=&#39;http://www.w3.org/2000/svg&#39; width=&#39;18&#39; height=&#39;18&#39; viewBox=&#39;-7.5 0 271 271&#39; preserveAspectRatio=&#39;xMidYMid&#39;%3E%3Cpath d=&#39;M245.44 108.308h-85.09a7.738 7.738 0 0 1-7.735-7.734v-88.68C152.615 5.327 147.29 0 140.726 0h-30.375c-6.568 0-11.89 5.327-11.89 11.894v88.143c0 4.573-3.697 8.29-8.27 8.31l-27.885.133c-4.612.025-8.359-3.717-8.35-8.325l.173-88.241C54.144 5.337 48.817 0 42.24 0H11.89C5.321 0 0 5.327 0 11.894V260.21c0 5.834 4.726 10.56 10.555 10.56H245.44c5.834 0 10.56-4.726 10.56-10.56V118.868c0-5.834-4.726-10.56-10.56-10.56zm-39.902 93.233c0 7.645-6.198 13.844-13.843 13.844H167.69c-7.646 0-13.844-6.199-13.844-13.844v-24.005c0-7.646 6.198-13.844 13.844-13.844h24.005c7.645 0 13.843 6.198 13.843 13.844v24.005z&#39; fill=&#39;%23F60&#39;/%3E%3C/svg%3E">
      <bpmn:extensionElements>
        <zeebe:properties>
          <zeebe:property name="inbound.type" value="io.camunda:connector-rabbitmq-inbound:1" />
          <zeebe:property name="authentication.authType" value="uri" />
          <zeebe:property name="authentication.uri" value="amqp://admin:admin@rabbitmq:5672/devcon" />
          <zeebe:property name="queueName" value="ack" />
          <zeebe:property name="consumerTag" value="" />
          <zeebe:property name="arguments" value="" />
          <zeebe:property name="exclusive" value="false" />
          <zeebe:property name="correlationKeyExpression" value="=string(message.body.ssn) + &#34;-&#34; + string(message.body.status)" />
        </zeebe:properties>
      </bpmn:extensionElements>
      <bpmn:incoming>Flow_1n9ceqm</bpmn:incoming>
      <bpmn:outgoing>Flow_14b6eot</bpmn:outgoing>
      <bpmn:messageEventDefinition id="MessageEventDefinition_1ha0hu7" messageRef="Message_1ydgiu3" />
    </bpmn:intermediateCatchEvent>
    <bpmn:sequenceFlow id="Flow_1n9ceqm" sourceRef="Gateway_0533885" targetRef="Event_0k5c5rz" />
    <bpmn:eventBasedGateway id="Gateway_0533885">
      <bpmn:incoming>Flow_0m0flnh</bpmn:incoming>
      <bpmn:outgoing>Flow_1n9ceqm</bpmn:outgoing>
      <bpmn:outgoing>Flow_1hv7n7i</bpmn:outgoing>
      <bpmn:outgoing>Flow_12z54lh</bpmn:outgoing>
    </bpmn:eventBasedGateway>
    <bpmn:sequenceFlow id="Flow_1hv7n7i" sourceRef="Gateway_0533885" targetRef="Event_1jzlt5e" />
    <bpmn:intermediateCatchEvent id="Event_1jzlt5e" name="Save failed" zeebe:modelerTemplate="io.camunda.connectors.inbound.RabbitMQ.Intermediate.v1" zeebe:modelerTemplateVersion="3" zeebe:modelerTemplateIcon="data:image/svg+xml;utf8,%3Csvg xmlns=&#39;http://www.w3.org/2000/svg&#39; width=&#39;18&#39; height=&#39;18&#39; viewBox=&#39;-7.5 0 271 271&#39; preserveAspectRatio=&#39;xMidYMid&#39;%3E%3Cpath d=&#39;M245.44 108.308h-85.09a7.738 7.738 0 0 1-7.735-7.734v-88.68C152.615 5.327 147.29 0 140.726 0h-30.375c-6.568 0-11.89 5.327-11.89 11.894v88.143c0 4.573-3.697 8.29-8.27 8.31l-27.885.133c-4.612.025-8.359-3.717-8.35-8.325l.173-88.241C54.144 5.337 48.817 0 42.24 0H11.89C5.321 0 0 5.327 0 11.894V260.21c0 5.834 4.726 10.56 10.555 10.56H245.44c5.834 0 10.56-4.726 10.56-10.56V118.868c0-5.834-4.726-10.56-10.56-10.56zm-39.902 93.233c0 7.645-6.198 13.844-13.843 13.844H167.69c-7.646 0-13.844-6.199-13.844-13.844v-24.005c0-7.646 6.198-13.844 13.844-13.844h24.005c7.645 0 13.843 6.198 13.843 13.844v24.005z&#39; fill=&#39;%23F60&#39;/%3E%3C/svg%3E">
      <bpmn:extensionElements>
        <zeebe:properties>
          <zeebe:property name="inbound.type" value="io.camunda:connector-rabbitmq-inbound:1" />
          <zeebe:property name="authentication.authType" value="uri" />
          <zeebe:property name="authentication.uri" value="amqp://admin:admin@rabbitmq:5672/devcon" />
          <zeebe:property name="queueName" value="ack" />
          <zeebe:property name="consumerTag" value="" />
          <zeebe:property name="arguments" value="" />
          <zeebe:property name="exclusive" value="false" />
          <zeebe:property name="correlationKeyExpression" value="=string(message.body.ssn) + &#34;-&#34; + string(message.body.status)" />
        </zeebe:properties>
      </bpmn:extensionElements>
      <bpmn:incoming>Flow_1hv7n7i</bpmn:incoming>
      <bpmn:outgoing>Flow_0fz3r38</bpmn:outgoing>
      <bpmn:messageEventDefinition id="MessageEventDefinition_01dgsqw" messageRef="Message_0fuk94b" />
    </bpmn:intermediateCatchEvent>
    <bpmn:endEvent id="Event_1jdze4c">
      <bpmn:incoming>Flow_0ptaz4j</bpmn:incoming>
    </bpmn:endEvent>
    <bpmn:sequenceFlow id="Flow_0fz3r38" sourceRef="Event_1jzlt5e" targetRef="Activity_0o7tai8" />
    <bpmn:sequenceFlow id="Flow_0ptaz4j" sourceRef="Activity_0o7tai8" targetRef="Event_1jdze4c" />
    <bpmn:sequenceFlow id="Flow_1ilduf1" sourceRef="Activity_1q6qehk" targetRef="Event_05x97di" />
    <bpmn:sendTask id="Activity_1q6qehk" name="Start backoffice process">
      <bpmn:extensionElements>
        <zeebe:taskDefinition type="start-backoffice-process" />
      </bpmn:extensionElements>
      <bpmn:incoming>Flow_14b6eot</bpmn:incoming>
      <bpmn:outgoing>Flow_1ilduf1</bpmn:outgoing>
    </bpmn:sendTask>
    <bpmn:sequenceFlow id="Flow_12z54lh" sourceRef="Gateway_0533885" targetRef="Event_0d5gade" />
    <bpmn:intermediateCatchEvent id="Event_0d5gade" name="Already registered" zeebe:modelerTemplate="io.camunda.connectors.inbound.RabbitMQ.Intermediate.v1" zeebe:modelerTemplateVersion="3" zeebe:modelerTemplateIcon="data:image/svg+xml;utf8,%3Csvg xmlns=&#39;http://www.w3.org/2000/svg&#39; width=&#39;18&#39; height=&#39;18&#39; viewBox=&#39;-7.5 0 271 271&#39; preserveAspectRatio=&#39;xMidYMid&#39;%3E%3Cpath d=&#39;M245.44 108.308h-85.09a7.738 7.738 0 0 1-7.735-7.734v-88.68C152.615 5.327 147.29 0 140.726 0h-30.375c-6.568 0-11.89 5.327-11.89 11.894v88.143c0 4.573-3.697 8.29-8.27 8.31l-27.885.133c-4.612.025-8.359-3.717-8.35-8.325l.173-88.241C54.144 5.337 48.817 0 42.24 0H11.89C5.321 0 0 5.327 0 11.894V260.21c0 5.834 4.726 10.56 10.555 10.56H245.44c5.834 0 10.56-4.726 10.56-10.56V118.868c0-5.834-4.726-10.56-10.56-10.56zm-39.902 93.233c0 7.645-6.198 13.844-13.843 13.844H167.69c-7.646 0-13.844-6.199-13.844-13.844v-24.005c0-7.646 6.198-13.844 13.844-13.844h24.005c7.645 0 13.843 6.198 13.843 13.844v24.005z&#39; fill=&#39;%23F60&#39;/%3E%3C/svg%3E">
      <bpmn:extensionElements>
        <zeebe:properties>
          <zeebe:property name="inbound.type" value="io.camunda:connector-rabbitmq-inbound:1" />
          <zeebe:property name="authentication.authType" value="uri" />
          <zeebe:property name="authentication.uri" value="amqp://admin:admin@rabbitmq:5672/devcon" />
          <zeebe:property name="queueName" value="ack" />
          <zeebe:property name="consumerTag" value="" />
          <zeebe:property name="arguments" value="" />
          <zeebe:property name="exclusive" value="false" />
          <zeebe:property name="correlationKeyExpression" value="=string(message.body.ssn) + &#34;-&#34; + string(message.body.status)" />
        </zeebe:properties>
      </bpmn:extensionElements>
      <bpmn:incoming>Flow_12z54lh</bpmn:incoming>
      <bpmn:outgoing>Flow_0bmmnn0</bpmn:outgoing>
      <bpmn:messageEventDefinition id="MessageEventDefinition_16dzqa3" messageRef="Message_0juxewj" />
    </bpmn:intermediateCatchEvent>
    <bpmn:endEvent id="Event_0si2p4a">
      <bpmn:incoming>Flow_0bmmnn0</bpmn:incoming>
    </bpmn:endEvent>
    <bpmn:sequenceFlow id="Flow_0bmmnn0" sourceRef="Event_0d5gade" targetRef="Event_0si2p4a" />
    <bpmn:serviceTask id="Activity_0o7tai8" name="Send email to CSR" zeebe:modelerTemplate="io.camunda.connectors.RabbitMQ.v1" zeebe:modelerTemplateVersion="1" zeebe:modelerTemplateIcon="data:image/svg+xml;utf8,%3Csvg xmlns=&#39;http://www.w3.org/2000/svg&#39; width=&#39;18&#39; height=&#39;18&#39; viewBox=&#39;-7.5 0 271 271&#39; preserveAspectRatio=&#39;xMidYMid&#39;%3E%3Cpath d=&#39;M245.44 108.308h-85.09a7.738 7.738 0 0 1-7.735-7.734v-88.68C152.615 5.327 147.29 0 140.726 0h-30.375c-6.568 0-11.89 5.327-11.89 11.894v88.143c0 4.573-3.697 8.29-8.27 8.31l-27.885.133c-4.612.025-8.359-3.717-8.35-8.325l.173-88.241C54.144 5.337 48.817 0 42.24 0H11.89C5.321 0 0 5.327 0 11.894V260.21c0 5.834 4.726 10.56 10.555 10.56H245.44c5.834 0 10.56-4.726 10.56-10.56V118.868c0-5.834-4.726-10.56-10.56-10.56zm-39.902 93.233c0 7.645-6.198 13.844-13.843 13.844H167.69c-7.646 0-13.844-6.199-13.844-13.844v-24.005c0-7.646 6.198-13.844 13.844-13.844h24.005c7.645 0 13.843 6.198 13.843 13.844v24.005z&#39; fill=&#39;%23F60&#39;/%3E%3C/svg%3E">
      <bpmn:extensionElements>
        <zeebe:taskDefinition type="io.camunda:connector-rabbitmq:1" />
        <zeebe:ioMapping>
          <zeebe:input source="uri" target="authentication.authType" />
          <zeebe:input source="amqp://admin:admin@rabbitmq:5672/devcon" target="authentication.uri" />
          <zeebe:input source="customer" target="routing.exchange" />
          <zeebe:input source="mail" target="routing.routingKey" />
          <zeebe:input source="={&#10;  &#34;message&#34;: &#34;saved failed for customer &#34; + customer.ssn,&#10;  &#34;to&#34;: &#34;email@domain.com&#34;&#10;}" target="message.body" />
          <zeebe:input source="{}" target="message.properties" />
        </zeebe:ioMapping>
        <zeebe:taskHeaders />
      </bpmn:extensionElements>
      <bpmn:incoming>Flow_0fz3r38</bpmn:incoming>
      <bpmn:outgoing>Flow_0ptaz4j</bpmn:outgoing>
    </bpmn:serviceTask>
  </bpmn:process>
  <bpmn:message id="Message_1ydgiu3" name="39fa5a4a-e86c-45cf-b406-b2b8d19fa2d3" zeebe:modelerTemplate="io.camunda.connectors.inbound.RabbitMQ.Intermediate.v1">
    <bpmn:extensionElements>
      <zeebe:subscription correlationKey="=string(customer.ssn) + &#34;-success&#34;" />
    </bpmn:extensionElements>
  </bpmn:message>
  <bpmn:message id="Message_0fuk94b" name="77cd64b1-d37f-4583-89ae-6a14c712d907" zeebe:modelerTemplate="io.camunda.connectors.inbound.RabbitMQ.Intermediate.v1">
    <bpmn:extensionElements>
      <zeebe:subscription correlationKey="=string(customer.ssn) + &#34;-failure&#34;" />
    </bpmn:extensionElements>
  </bpmn:message>
  <bpmn:message id="Message_0juxewj" name="292bcfdb-4f85-46f7-955e-d04fe87438de" zeebe:modelerTemplate="io.camunda.connectors.inbound.RabbitMQ.Intermediate.v1">
    <bpmn:extensionElements>
      <zeebe:subscription correlationKey="=string(customer.ssn) + &#34;-existent&#34;" />
    </bpmn:extensionElements>
  </bpmn:message>
  <bpmndi:BPMNDiagram id="BPMNDiagram_1">
    <bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Process_onboarding">
      <bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
        <dc:Bounds x="152" y="99" width="36" height="36" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="Event_05x97di_di" bpmnElement="Event_05x97di">
        <dc:Bounds x="1642" y="99" width="36" height="36" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="Event_01wkmyk_di" bpmnElement="Event_0k5c5rz">
        <dc:Bounds x="1312" y="99" width="36" height="36" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="1297" y="142" width="68" height="14" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="Gateway_1pazu5r_di" bpmnElement="Gateway_0533885">
        <dc:Bounds x="1205" y="92" width="50" height="50" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="Event_134roqx_di" bpmnElement="Event_1jzlt5e">
        <dc:Bounds x="1312" y="202" width="36" height="36" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="1303" y="245" width="55" height="14" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="Event_1jdze4c_di" bpmnElement="Event_1jdze4c">
        <dc:Bounds x="1642" y="202" width="36" height="36" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="Activity_1gvilfb_di" bpmnElement="Activity_1q6qehk">
        <dc:Bounds x="1440" y="77" width="100" height="80" />
        <bpmndi:BPMNLabel />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="Event_049f47u_di" bpmnElement="Event_0d5gade">
        <dc:Bounds x="1312" y="312" width="36" height="36" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="1287" y="355" width="90" height="14" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="Event_0si2p4a_di" bpmnElement="Event_0si2p4a">
        <dc:Bounds x="1642" y="312" width="36" height="36" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="Activity_1jkwpvu_di" bpmnElement="Activity_0o7tai8">
        <dc:Bounds x="1440" y="180" width="100" height="80" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNEdge id="Flow_0m0flnh_di" bpmnElement="Flow_0m0flnh">
        <di:waypoint x="188" y="117" />
        <di:waypoint x="1205" y="117" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="Flow_14b6eot_di" bpmnElement="Flow_14b6eot">
        <di:waypoint x="1348" y="117" />
        <di:waypoint x="1440" y="117" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="Flow_1n9ceqm_di" bpmnElement="Flow_1n9ceqm">
        <di:waypoint x="1255" y="117" />
        <di:waypoint x="1312" y="117" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="Flow_1hv7n7i_di" bpmnElement="Flow_1hv7n7i">
        <di:waypoint x="1230" y="142" />
        <di:waypoint x="1230" y="220" />
        <di:waypoint x="1312" y="220" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="Flow_0fz3r38_di" bpmnElement="Flow_0fz3r38">
        <di:waypoint x="1348" y="220" />
        <di:waypoint x="1440" y="220" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="Flow_0ptaz4j_di" bpmnElement="Flow_0ptaz4j">
        <di:waypoint x="1540" y="220" />
        <di:waypoint x="1642" y="220" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="Flow_1ilduf1_di" bpmnElement="Flow_1ilduf1">
        <di:waypoint x="1540" y="117" />
        <di:waypoint x="1642" y="117" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="Flow_12z54lh_di" bpmnElement="Flow_12z54lh">
        <di:waypoint x="1230" y="142" />
        <di:waypoint x="1230" y="330" />
        <di:waypoint x="1312" y="330" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="Flow_0bmmnn0_di" bpmnElement="Flow_0bmmnn0">
        <di:waypoint x="1348" y="330" />
        <di:waypoint x="1642" y="330" />
      </bpmndi:BPMNEdge>
    </bpmndi:BPMNPlane>
  </bpmndi:BPMNDiagram>
</bpmn:definitions>

ES message - working case

{
    "_index": "zeebe-record_message_8.2.5_2023-10-24",
    "_type": "_doc",
    "_id": "1-714",
    "_version": 1,
    "_seq_no": 0,
    "_primary_term": 1,
    "_routing": "1",
    "found": true,
    "_source": {
        "partitionId": 1,
        "value": {
            "name": "39fa5a4a-e86c-45cf-b406-b2b8d19fa2d3",
            "timeToLive": 3600000,
            "messageId": "",
            "deadline": 1698152680347,
            "variables": {},
            "correlationKey": "1-success"
        },
        "key": 2251799813685601,
        "timestamp": 1698149080349,
        "valueType": "MESSAGE",
        "brokerVersion": "8.2.5",
        "recordType": "EVENT",
        "sourceRecordPosition": 713,
        "intent": "PUBLISHED",
        "rejectionType": "NULL_VAL",
        "rejectionReason": "",
        "position": 714,
        "sequence": 2251799813685249
    }
}

Wrong message name - the message name belongs to a different catch event

{
    "_index": "zeebe-record_message_8.2.5_2023-10-24",
    "_type": "_doc",
    "_id": "1-992",
    "_version": 1,
    "_seq_no": 2,
    "_primary_term": 1,
    "_routing": "1",
    "found": true,
    "_source": {
        "partitionId": 1,
        "value": {
            "name": "77cd64b1-d37f-4583-89ae-6a14c712d907",
            "timeToLive": 3600000,
            "messageId": "",
            "deadline": 1698152716088,
            "variables": {},
            "correlationKey": "2-success"
        },
        "key": 2251799813685719,
        "timestamp": 1698149116088,
        "valueType": "MESSAGE",
        "brokerVersion": "8.2.5",
        "recordType": "EVENT",
        "sourceRecordPosition": 991,
        "intent": "PUBLISHED",
        "rejectionType": "NULL_VAL",
        "rejectionReason": "",
        "position": 992,
        "sequence": 2251799813685251
    }
}
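
The mismatch between the two records above can be checked mechanically. The following is a minimal sketch, not part of the connector or Zeebe: the `EXPECTED_SUFFIX` table and the `explain` helper are hypothetical names, the message names and suffixes are copied from the `bpmn:message` definitions in the test process, and the suffix comparison is a simplification of how the subscription's `correlationKey` expression is actually evaluated against process variables.

```python
# Message name -> suffix of the subscription correlation key,
# copied from the bpmn:message elements in the test process.
EXPECTED_SUFFIX = {
    "39fa5a4a-e86c-45cf-b406-b2b8d19fa2d3": "-success",   # "Save success"
    "77cd64b1-d37f-4583-89ae-6a14c712d907": "-failure",   # "Save failed"
    "292bcfdb-4f85-46f7-955e-d04fe87438de": "-existent",  # "Already registered"
}


def explain(name, correlation_key):
    """Return None if the pair could correlate, otherwise a short reason.

    Hypothetical helper: it only compares the key suffix, which is a
    simplification of the real subscription matching.
    """
    suffix = EXPECTED_SUFFIX.get(name)
    if suffix is None:
        return f"unknown message name {name}"
    if correlation_key.endswith(suffix):
        return None
    return (
        f"message {name} is subscribed with a key ending in "
        f"'{suffix}', but the published key is '{correlation_key}'"
    )


if __name__ == "__main__":
    # Values taken from the two published MESSAGE records.
    print(explain("39fa5a4a-e86c-45cf-b406-b2b8d19fa2d3", "1-success"))
    print(explain("77cd64b1-d37f-4583-89ae-6a14c712d907", "2-success"))
```

For the first record the helper returns `None`. For the second it reports that the name belongs to the "Save failed" event, whose subscription key ends in `-failure`, while the published key is `2-success`.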

sbuettner commented 8 months ago

Hey @bulivlad, thanks for your request and the provided examples. We see that you are trying to correlate messages from the same queue in different connectors. Please be aware that each connector publishes messages back to Zeebe under the message name it is configured with in the XML. Because two (or more) connectors read from the same queue, you might see exactly what you describe: correlation attempts using a different message name.

We recommend using an activation condition so that a connector only correlates messages that match its own catch event. Please be aware that even with an activation condition, a connector might still acknowledge messages that don't match the condition.

Therefore we recommend using different queues for different purposes here.

@bulivlad Does that work for you?
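
The behavior described in this comment can be modeled in a few lines. This is a sketch under stated assumptions, not the connector runtime's code: the function names are hypothetical, the three consumers stand in for the three catch events, strict round-robin dispatch is a simplification of how RabbitMQ distributes one queue across several consumers, and the activation condition is assumed to compare the message status with the catch event's purpose.

```python
from itertools import cycle

# Purpose of each catch event -> message name from the BPMN in the report.
CONSUMERS = {
    "success": "39fa5a4a-e86c-45cf-b406-b2b8d19fa2d3",
    "failure": "77cd64b1-d37f-4583-89ae-6a14c712d907",
    "existent": "292bcfdb-4f85-46f7-955e-d04fe87438de",
}


def correlation_key(body):
    # Mirrors correlationKeyExpression: string(ssn) + "-" + string(status)
    return f"{body['ssn']}-{body['status']}"


def shared_queue(messages, use_activation_condition=False):
    """All three connectors consume the same queue (assumed round-robin)."""
    published, dropped = [], []
    turn = cycle(CONSUMERS.items())
    for body in messages:
        purpose, message_name = next(turn)
        if use_activation_condition and body["status"] != purpose:
            # Assumption: the message is acknowledged but never published.
            dropped.append(body)
            continue
        # The connector always publishes under its own message name.
        published.append((message_name, correlation_key(body)))
    return published, dropped


def dedicated_queues(messages):
    """One queue per connector; routing by status is assumed to happen upstream."""
    return [(CONSUMERS[b["status"]], correlation_key(b)) for b in messages]


if __name__ == "__main__":
    msgs = [{"ssn": 1, "status": "success"}, {"ssn": 2, "status": "success"}]
    print(shared_queue(msgs))
    print(shared_queue(msgs, use_activation_condition=True))
    print(dedicated_queues(msgs))
```

With two "success" messages on a shared queue, the model publishes the second one under the "Save failed" message name with key `2-success`, which is the combination seen in the report. With the activation condition enabled, that second message is consumed and dropped instead. Only the dedicated-queue variant publishes both under the "Save success" name.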

bastiankoerber commented 7 months ago

@bulivlad does that solve your issue? Can we close the ticket?

bastiankoerber commented 7 months ago

The behavior will be improved by deduplication for clients (internal note: https://camunda.productboard.com/roadmap/7537339-connectors-engineering-team-roadmap/features/25202643/detail).

michaelarnauts commented 6 months ago

Hmm, so basically, the RabbitMQ Inbound Connector only works reliably when every Connector has its own queue?

This doesn't seem efficient. Is there a better way for this? We have many processes with multiple Inbound Connectors.

My actual use case is sending RPC messages, so we need to wait for the reply by using an Inbound Connector. See https://forum.camunda.io/t/using-the-rabbitmq-connectors-to-make-rpc-calls/49799