mostafa / xk6-kafka

k6 extension to load test Apache Kafka with support for various serialization formats, SASL, TLS, compression, Schema Registry client and beyond
Apache License 2.0
147 stars 61 forks source link

Endless waiting for the consumer when trying to read all sent messages in the topic. #207

Closed bloodgang94 closed 1 year ago

bloodgang94 commented 1 year ago

Hello! Thanks for your work. I have a question. I am using your example https://github.com/mostafa/xk6-kafka/blob/main/scripts/test_consumer_group.js and ran into a problem: if you try to read all 300 sent messages, the consumer waits indefinitely. This happens in roughly 3 out of 5 attempts. Full code:

/*
This is a k6 test script that imports the xk6-kafka and
tests Kafka with a 200 JSON messages per iteration.
*/

import { check } from "k6";
// import * as kafka from "k6/x/kafka";
import {
  Writer,
  Reader,
  Connection,
  SchemaRegistry,
  CODEC_SNAPPY,
  SCHEMA_TYPE_JSON,
} from "k6/x/kafka"; // import kafka extension

// Prints module-level constants
// console.log(kafka);

// Shared connection details for the producer, consumer, and admin clients.
const brokers = ["localhost:9092"];
const topic = "test";
const groupID = "my-group";

// Producer writing snappy-compressed batches to the test topic.
const writer = new Writer({
  brokers,
  topic,
  compression: CODEC_SNAPPY,
});

// Consumer-group reader subscribed to the same topic.
const reader = new Reader({
  brokers,
  groupID,
  groupTopics: [topic],
});

// Admin connection (topic create/delete) against the first broker.
const connection = new Connection({ address: brokers[0] });

// Serializer/deserializer helper (no external registry configured).
const schemaRegistry = new SchemaRegistry();

// Create the test topic from init code. k6 sets __VU to 0 while the init
// context runs, so this guard keeps topic creation out of VU iterations.
// NOTE(review): createTopic is presumably a no-op if the topic already
// exists — confirm against the xk6-kafka docs for your version.
if (__VU === 0) {
  connection.createTopic({
    topic: topic,
    numPartitions: 3,
    replicationFactor: 1,
    configEntries: [
      {
        // Match the broker-side compression to the writer's snappy codec.
        configName: "compression.type",
        configValue: CODEC_SNAPPY,
      },
    ],
  });
}

// A single producer or consumer error fails the whole run.
const zeroErrors = () => ["count == 0"];

export const options = {
  thresholds: {
    // Base thresholds: verify the writer and the reader both work cleanly.
    kafka_writer_error_count: zeroErrors(),
    kafka_reader_error_count: zeroErrors(),
  },
};

export default function () {
  // Build 300 JSON-serialized messages: 100 rounds across 3 partitions.
  let messages = [];
  for (let i = 0; i < 100; i++) {
    for (let partition = 0; partition < 3; partition++) {
      messages.push({
        // The data type of the key is JSON
        key: schemaRegistry.serialize({
          data: {
            key: "value",
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
        // The data type of the value is JSON
        value: schemaRegistry.serialize({
          data: {
            key: "value",
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
        // BUG FIX: this field was misspelled "parition", so the explicit
        // per-message partition assignment was silently ignored and the
        // writer fell back to its default partitioning.
        partition: partition,
      });
    }
  }

  writer.produce({ messages: messages });

  // Read back all 300 messages produced by this iteration.
  // (The original comment claimed "one message only", which was misleading.)
  messages = reader.consume({ limit: 300 });

  check(messages, {
    // Label fixed: the predicate checks for 300 messages, not 10.
    "300 messages are received": (messages) => messages.length === 300,
  });

  check(messages[0], {
    // Label fixed: this script's topic is "test", not the one from the
    // original consumer-group example.
    "Topic equals to test": (msg) => msg["topic"] === topic,
    "Key contains key/value and is JSON": (msg) =>
      schemaRegistry.deserialize({
        data: msg.key,
        schemaType: SCHEMA_TYPE_JSON,
      }).key === "value",
    "Value contains key/value and is JSON": (msg) => {
      // Deserialize once instead of twice for the two sub-conditions.
      const value = schemaRegistry.deserialize({
        data: msg.value,
        schemaType: SCHEMA_TYPE_JSON,
      });
      return typeof value === "object" && value.key === "value";
    },
  });
}

export function teardown(data) {
  // k6 also runs teardown with __VU set to 0, so the topic is deleted
  // exactly once rather than once per VU.
  if (__VU === 0) {
    // Delete the topic
    connection.deleteTopic(topic);
  }
  // Release all Kafka clients so k6 can exit cleanly.
  writer.close();
  reader.close();
  connection.close();
}

logs

time="2023-03-13T09:02:59+03:00" level=info msg="entering loop for consumer group, my-group\n"
time="2023-03-13T09:02:59+03:00" level=info msg="entering loop for consumer group, my-group\n"
time="2023-03-13T09:03:04+03:00" level=info msg="joined group my-group as member k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-de501498-6407-4c63-8e28-15f52cd51a41 in generation 10"
time="2023-03-13T09:03:04+03:00" level=info msg="selected as leader for group, my-group\n"
time="2023-03-13T09:03:04+03:00" level=info msg="joined group my-group as member k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-87f19579-d19e-4e5c-aed6-b3b2bc286b5f in generation 10"
time="2023-03-13T09:03:04+03:00" level=info msg="joinGroup succeeded for response, my-group.  generationID=10, memberID=k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-87f19579-d19e-4e5c-aed6-b3b2bc286b5f"
time="2023-03-13T09:03:04+03:00" level=info msg="Joined group my-group as member k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-87f19579-d19e-4e5c-aed6-b3b2bc286b5f in generation 10"
time="2023-03-13T09:03:04+03:00" level=info msg="using 'range' balancer to assign group, my-group"
time="2023-03-13T09:03:04+03:00" level=info msg="found member: k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-de501498-6407-4c63-8e28-15f52cd51a41/[]byte(nil)"
time="2023-03-13T09:03:04+03:00" level=info msg="found member: k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-87f19579-d19e-4e5c-aed6-b3b2bc286b5f/[]byte(nil)"
time="2023-03-13T09:03:04+03:00" level=info msg="found topic/partition: test/0"
time="2023-03-13T09:03:04+03:00" level=info msg="assigned member/topic/partitions k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-de501498-6407-4c63-8e28-15f52cd51a41/test/[0]"
time="2023-03-13T09:03:04+03:00" level=info msg="joinGroup succeeded for response, my-group.  generationID=10, memberID=k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-de501498-6407-4c63-8e28-15f52cd51a41"
time="2023-03-13T09:03:04+03:00" level=info msg="Joined group my-group as member k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-de501498-6407-4c63-8e28-15f52cd51a41 in generation 10"
time="2023-03-13T09:03:04+03:00" level=info msg="Syncing 2 assignments for generation 10 as member k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-de501498-6407-4c63-8e28-15f52cd51a41"
time="2023-03-13T09:03:04+03:00" level=info msg="received empty assignments for group, my-group as member k6@QPRM-FVSC1F3 (github.com/segmentio/kafka-go)-87f19579-d19e-4e5c-aed6-b3b2bc286b5f for generation 10"
time="2023-03-13T09:03:04+03:00" level=info msg="sync group finished for group, my-group"
time="2023-03-13T09:03:04+03:00" level=info msg="sync group finished for group, my-group"

running (00m05.0s), 1/1 VUs, 0 complete and 0 interrupted iterations
default   [--------------------------------------time="2023-03-13T09:03:04+03:00" level=info msg="started heartbeat for group, my-group [3s]"
time="2023-03-13T09:03:04+03:00" level=info msg="subscribed to topics and partitions: map[]"
] 1 VUs  00m05.0s/10m0s  0/1 iters, 1 per VUtime="2023-03-13T09:03:04+03:00" level=info msg="started heartbeat for group, my-group [3s]"

time="2023-03-13T09:03:04+03:00" level=info msg="initializing kafka reader for partition 0 of test starting at offset first offset"
time="2023-03-13T09:03:04+03:00" level=info msg="subscribed to topics and partitions: map[{topic:test partition:0}:-2]"

(See attached image.) I have tried the rebalanceTimeout and sessionTimeout settings, but without success.

Could you suggest what I am doing wrong?

mostafa commented 1 year ago

@bloodgang94

I ran your exact script (with no modifications) for a minute with 50 VUs and this is the result:

View results ```bash ./k6 run --vus 50 --duration 60s test-kafka.js /\ |‾‾| /‾‾/ /‾‾/ /\ / \ | |/ / / / / \/ \ | ( / ‾‾\ / \ | |\ \ | (‾) | / __________ \ |__| \__\ \_____/ .io execution: local script: /mnt/c/Users/Mostafa Moradian/Desktop/test-kafka.js output: engine scenarios: (100.00%) 1 scenario, 50 max VUs, 1m30s max duration (incl. graceful stop): * default: 50 looping VUs for 1m0s (gracefulStop: 30s) ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." 
ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." 
✓ 10 messages is received ✓ Topic equals to xk6_kafka_consumer_group_topic ✓ Key contains key/value and is JSON ✓ Value contains key/value and is JSON █ teardown checks.........................: 100.00% ✓ 1764 ✗ 0 data_received..................: 0 B 0 B/s data_sent......................: 0 B 0 B/s iteration_duration.............: avg=137.67ms min=122.3ms med=131.32ms max=941ms p(90)=147.73ms p(95)=155.48ms iterations.....................: 441 4.857311/s kafka_reader_dial_count........: 1 0.011014/s kafka_reader_dial_seconds......: avg=4.13µs min=0s med=0s max=1.82ms p(90)=0s p(95)=0s ✓ kafka_reader_error_count.......: 0 0/s kafka_reader_fetch_bytes.......: 7.5 MB 83 kB/s kafka_reader_fetch_bytes_max...: 1000000 min=1000000 max=1000000 kafka_reader_fetch_bytes_min...: 1 min=1 max=1 kafka_reader_fetch_size........: 124995 1376.733793/s kafka_reader_fetch_wait_max....: 10s min=10s max=10s kafka_reader_fetches_count.....: 16 0.176229/s kafka_reader_lag...............: 7400 min=6500 max=14600 kafka_reader_message_bytes.....: 4.0 MB 44 kB/s kafka_reader_message_count.....: 132401 1458.30578/s kafka_reader_offset............: 132400 min=400 max=132400 kafka_reader_queue_capacity....: 100 min=100 max=100 kafka_reader_queue_length......: 100 min=99 max=100 kafka_reader_read_seconds......: avg=127.24ms min=0s med=0s max=4.01s p(90)=0s p(95)=0s kafka_reader_rebalance_count...: 1 0.011014/s kafka_reader_timeouts_count....: 0 0/s kafka_reader_wait_seconds......: avg=32.96µs min=0s med=0s max=1.54ms p(90)=0s p(95)=0s kafka_writer_acks_required.....: 0 min=0 max=0 kafka_writer_async.............: 0.00% ✓ 0 ✗ 490 kafka_writer_attempts_max......: 0 min=0 max=0 kafka_writer_batch_bytes.......: 26 kB 281 B/s kafka_writer_batch_max.........: 1 min=1 max=1 kafka_writer_batch_size........: 490 5.397012/s kafka_writer_batch_timeout.....: 0s min=0s max=0s ✓ kafka_writer_error_count.......: 0 0/s kafka_writer_message_bytes.....: 7.6 MB 84 kB/s kafka_writer_message_count.....: 147000 
1619.103705/s kafka_writer_read_timeout......: 0s min=0s max=0s kafka_writer_retries_count.....: 0 0/s kafka_writer_wait_seconds......: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s kafka_writer_write_count.......: 147000 1619.103705/s kafka_writer_write_seconds.....: avg=30.37µs min=12.5µs med=23.17µs max=163.86µs p(90)=50.43µs p(95)=78.4µs kafka_writer_write_timeout.....: 0s min=0s max=0s vus............................: 0 min=0 max=50 vus_max........................: 50 min=50 max=50 running (1m30.8s), 00/50 VUs, 441 complete and 49 interrupted iterations default ✓ [======================================] 50 VUs 1m0s ```

I am aware of the reader errors, as you see in the logs. The writer generated 147k messages and the reader consumed 132k. The number of errors and your limit of 300 correlates to the difference between the messages produced vs. consumed. I tweaked the reader config:

import {
  Writer,
  Reader,
  Connection,
  SchemaRegistry,
  CODEC_SNAPPY,
  SCHEMA_TYPE_JSON,
  SECOND,
} from "k6/x/kafka"

// Reader tuned for higher consumer throughput; values chosen empirically
// for this 50-VU test. (brokers/groupID/topic come from the script above.)
const reader = new Reader({
  brokers: brokers,
  groupID: groupID,
  groupTopics: [topic],
  queueCapacity: 300, // buffer a full iteration's worth (300 messages) internally
  readBatchTimeout: 20 * SECOND, // allow longer waits for a batch before giving up
  maxBytes: 10000000, // 10 MB
  readLagInterval: 10 * SECOND, // NOTE(review): presumably how often reader lag is sampled — confirm in xk6-kafka docs
})

And this is the best result I can get:

View results ```bash ./k6 run --vus 50 --duration 60s test-kafka.js /\ |‾‾| /‾‾/ /‾‾/ /\ / \ | |/ / / / / \/ \ | ( / ‾‾\ / \ | |\ \ | (‾) | / __________ \ |__| \__\ \_____/ .io execution: local script: /mnt/c/Users/Mostafa Moradian/Desktop/test-kafka.js output: engine scenarios: (100.00%) 1 scenario, 50 max VUs, 1m30s max duration (incl. graceful stop): * default: 50 looping VUs for 1m0s (gracefulStop: 30s) ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." 
ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." ERRO[0090] Unable to read messages. error="Unable to read messages." 
✓ 10 messages is received ✓ Topic equals to xk6_kafka_consumer_group_topic ✓ Key contains key/value and is JSON ✓ Value contains key/value and is JSON █ teardown checks.........................: 100.00% ✓ 1808 ✗ 0 data_received..................: 0 B 0 B/s data_sent......................: 0 B 0 B/s iteration_duration.............: avg=134.43ms min=122.3ms med=128.82ms max=998.19ms p(90)=138.76ms p(95)=145.67ms iterations.....................: 452 4.971897/s kafka_reader_dial_count........: 1 0.011/s kafka_reader_dial_seconds......: avg=2.98µs min=0s med=0s max=1.35ms p(90)=0s p(95)=0s ✓ kafka_reader_error_count.......: 0 0/s kafka_reader_fetch_bytes.......: 7.8 MB 86 kB/s kafka_reader_fetch_bytes_max...: 10000000 min=10000000 max=10000000 kafka_reader_fetch_bytes_min...: 1 min=1 max=1 kafka_reader_fetch_size........: 130200 1432.170354/s kafka_reader_fetch_wait_max....: 10s min=10s max=10s kafka_reader_fetches_count.....: 10 0.109998/s kafka_reader_lag...............: 8700 min=0 max=14400 kafka_reader_message_bytes.....: 4.1 MB 45 kB/s kafka_reader_message_count.....: 135901 1494.880056/s kafka_reader_offset............: 135900 min=600 max=135900 kafka_reader_queue_capacity....: 300 min=300 max=300 kafka_reader_queue_length......: 300 min=300 max=300 kafka_reader_read_seconds......: avg=125.21ms min=0s med=0s max=6.52s p(90)=0s p(95)=0s kafka_reader_rebalance_count...: 1 0.011/s kafka_reader_timeouts_count....: 0 0/s kafka_reader_wait_seconds......: avg=17.22µs min=0s med=0s max=1.14ms p(90)=0s p(95)=0s kafka_writer_acks_required.....: 0 min=0 max=0 kafka_writer_async.............: 0.00% ✓ 0 ✗ 501 kafka_writer_attempts_max......: 0 min=0 max=0 kafka_writer_batch_bytes.......: 26 kB 287 B/s kafka_writer_batch_max.........: 1 min=1 max=1 kafka_writer_batch_size........: 501 5.510886/s kafka_writer_batch_timeout.....: 0s min=0s max=0s ✓ kafka_writer_error_count.......: 0 0/s kafka_writer_message_bytes.....: 7.8 MB 86 kB/s kafka_writer_message_count.....: 150300 
1653.265778/s kafka_writer_read_timeout......: 0s min=0s max=0s kafka_writer_retries_count.....: 0 0/s kafka_writer_wait_seconds......: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s kafka_writer_write_count.......: 150300 1653.265778/s kafka_writer_write_seconds.....: avg=34.71µs min=11.38µs med=21.61µs max=406.52µs p(90)=45.03µs p(95)=130.23µs kafka_writer_write_timeout.....: 0s min=0s max=0s vus............................: 0 min=0 max=50 vus_max........................: 50 min=50 max=50 running (1m30.9s), 00/50 VUs, 452 complete and 49 interrupted iterations default ✓ [======================================] 50 VUs 1m0s ```
mostafa commented 1 year ago

I'll close this due to inactivity. Feel free to re-open it if you still have the issue.

SanthoS-web commented 8 months ago

@mostafa After making these changes I am still getting the "Unable to read messages" error. What does it mean, and how can I avoid it?

SanthoS-web commented 8 months ago

Even in your results, the writer message count and the reader message count are different.