citrusframework / citrus

Framework for automated integration tests with focus on messaging integration
https://citrusframework.org
Apache License 2.0
460 stars 137 forks source link

Kafka. timeout #1250

Open akuz0 opened 2 weeks ago

akuz0 commented 2 weeks ago

Citrus Version 4.3.3

Question test case

  1. SUT sends an event in the topic
  2. Validate the event in the topic

Why can't I subtract events? And why poll it is called once? Maybe I'm doing something wrong.

What I've tried so far If you run a group of tests, all the tests after the first one fall image I think, problem Group coordinator MY_KAFKA_IP (id: 2147483646 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response. isDisconnected: false. Rediscovery will be attempted.

log 2024-11-02 20:49:31 DEBUG AbstractFetch:194 - [Consumer clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, groupId=citrus_kafka_group] Fetch read_uncommitted at offset 114 for partition ePC-0 returned fetch data PartitionData(partitionIndex=0, errorCode=0, highWatermark=116, lastStableOffset=116, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, records=MemoryRecords(size=661, buffer=java.nio.HeapByteBuffer[pos=0 lim=661 cap=664])) 2024-11-02 20:49:31 DEBUG AbstractFetch:278 - [Consumer clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, groupId=citrus_kafka_group] Removing pending request for fetch session: 613051 for node: IP:9092 (id: 1 rack: null) 2024-11-02 20:49:31 DEBUG NetworkClient:954 - [Consumer clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, groupId=citrus_kafka_group] Received OFFSET_COMMIT response from node 2147483646 for request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, correlationId=16, headerVersion=2): OffsetCommitResponseData(throttleTimeMs=0, topics=[OffsetCommitResponseTopic(name='ePC', partitions=[OffsetCommitResponsePartition(partitionIndex=0, errorCode=0)])]) 2024-11-02 20:51:28 DEBUG ConsumerCoordinator:1326 - [Consumer clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, groupId=citrus_kafka_group] Committed offset 114 for partition -0 2024-11-02 20:51:28 INFO ConsumerCoordinator:999 - [Consumer clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, groupId=citrus_kafka_group] Group coordinator MY_KAFKA_IP (id: 2147483646 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response. isDisconnected: false. Rediscovery will be attempted. 2024-11-02 20:51:28 INFO ConsumerCoordinator:1012 - [Consumer clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, groupId=citrus_kafka_group] Requesting disconnect from last known coordinator IP:9092 (id: 2147483646 rack: null) 2024-11-02 20:51:28 INFO NetworkClient:343 - [Consumer clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, groupId=citrus_kafka_group] Client requested disconnect from node 2147483646

Additional information If in debug mode, when citrus send throw, i use poll and see my event. Why poll it is called once?

for example while

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        executorService.submit(() -> {
            processRecord(record); // Process the record asynchronously
        });
    }
}

https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

image

I can use while in code, but i have timeout exception instante

while (true) {
 testActionRunner.run(
                        receive()
                                .selector("citrus_kafka_messageKey ="+Id)
                                .endpoint(topic)
);
}
akuz0 commented 2 weeks ago

I`m solved it, but i think it not correctly image

   HashMap<String, Object> properties = new HashMap<>();
        properties.put("max.poll.records", 2);
bbortt commented 2 weeks ago

is it possible that both your tests use the same Id? I have a gut feeling that this is related to the fact that a citrus endpoint receive message can only return exactly one Message. if multiple match the same Id that would explain the problem. your selector must be unique!

akuz0 commented 2 weeks ago

@bbortt maybe, how set selector by message body for Kafka? jsonPath works?

.selector(Collections.singletonMap("jsonPath:$.data.commandId", id))
bbortt commented 2 weeks ago

@akuz0 no, I've focused on header-based filtering for now. I can take this as a feature request, if you need it? I think JSON-path-based value (message body) matching sounds reasonable.