mostafa / xk6-kafka

k6 extension to load test Apache Kafka with support for various serialization formats, SASL, TLS, compression, Schema Registry client and beyond
Apache License 2.0
146 stars 62 forks

Kafka connection failure in K6 tests #253

Closed PrabhanjanDesai closed 9 months ago

PrabhanjanDesai commented 11 months ago

Hi Team, I am facing this error during execution of my test case related to Strimzi Kafka. Basically the test case tries to produce the data on a topic.

2023-09-25 06:56:30,593 | ERROR | time="2023-09-25T06:56:30Z" level=error msg="Failed to get controller., OriginalError: %!w(*fmt.wrapError=&{failed to dial: failed to open connection to srini-project-kf-op-sz-kafka-2.srini-project-kf-op-sz-kafka-brokers.srini-deploy-b.svc:9092: dial tcp 192.168.67.221:9092: i/o timeout 0xc011f7e5c0})" error="Failed to get controller., OriginalError: %!w(*fmt.wrapError=&{failed to dial: failed to open connection to srini-project-kf-op-sz-kafka-2.srini-project-kf-op-sz-kafka-brokers.srini-deploy-b.svc:9092: dial tcp 192.168.67.221:9092: i/o timeout 0xc011f7e5c0})"
2023-09-25 06:56:30,594 | ERROR | time="2023-09-25T06:56:30Z" level=error msg="Failed to get controller., OriginalError: %!w(*fmt.wrapError=&{failed to dial: failed to open connection to srini-project-kf-op-sz-kafka-2.srini-project-kf-op-sz-kafka-brokers.srini-deploy-b.svc:9092: dial tcp 192.168.67.221:9092: operation was canceled 0xc011f7e9c0})" error="Failed to get controller., OriginalError: %!w(*fmt.wrapError=&{failed to dial: failed to open connection to srini-project-kf-op-sz-kafka-2.srini-project-kf-op-sz-kafka-brokers.srini-deploy-b.svc:9092: dial tcp 192.168.67.221:9092: operation was canceled 0xc011f7e9c0})"

The above logs are observed in my producer test cases. I don't see any errors in the Strimzi Kafka pods during this time interval, and the failures are intermittent. Any ideas? Since the test connects on port 9092 (the non-TLS internal port) and fails intermittently within the same cluster, I need some inputs on debugging this.

Kafka broker service -

Name:              srini-project-kf-op-sz-kafka-brokers
Namespace:         srini-deploy-b
Labels:            app.kubernetes.io/instance=srini-project-kf-op-sz
                   app.kubernetes.io/managed-by=strimzi-cluster-operator
                   app.kubernetes.io/name=kafka
                   app.kubernetes.io/part-of=strimzi-srini-project-kf-op-sz
                   chart=srini-project-kf-op-sz-1.1.0-42
                   helm.sh/chart=srini-project-kf-op-sz-1.1.0-42
                   strimzi.io/cluster=srini-project-kf-op-sz
                   strimzi.io/kind=Kafka
                   strimzi.io/name=srini-project-kf-op-sz-kafka
Annotations:       service.alpha.kubernetes.io/tolerate-unready-endpoints: true
Selector:          strimzi.io/cluster=srini-project-kf-op-sz,strimzi.io/kind=Kafka,strimzi.io/name=srini-project-kf-op-sz-kafka
Type:              ClusterIP
IP Family Policy:  SingleStack
IP Families:       IPv4
IP:                None
IPs:               None
Port:              tcp-ctrlplane  9090/TCP
TargetPort:        9090/TCP
Endpoints:         192.168.227.22:9090,192.168.233.232:9090,192.168.67.221:9090
Port:              tcp-replication  9091/TCP
TargetPort:        9091/TCP
Endpoints:         192.168.227.22:9091,192.168.233.232:9091,192.168.67.221:9091
Port:              tcp-clients  9092/TCP
TargetPort:        9092/TCP
Endpoints:         192.168.227.22:9092,192.168.233.232:9092,192.168.67.221:9092
Session Affinity:  None

Kafka custom resource -

Kafka:
   Authorization:
     Super Users:
       ANONYMOUS
     Type: simple
   Config:
     default.replication.factor:               3
     inter.broker.protocol.version:            3.3
     min.insync.replicas:                      2
     offsets.topic.replication.factor:         3
     Retries:                                  5
     retry.backoff.ms:                         1000
     transaction.state.log.min.isr:            2
     transaction.state.log.replication.factor: 3
   Image:                                      ***********
   Listeners:
     Name: plain
     Port: 9092
     Tls:  false
     Type: internal

I am using following in K6 tests

[screenshot]

mostafa commented 11 months ago

@PrabhanjanDesai

Both the Writer and Reader objects have a connectLogger boolean property you can set to see the internal logs of kafka-go, the Kafka library used in this extension. Consider enabling that property and observing what happens. Also, if the connection timeouts happen intermittently rather than constantly, your Kafka instance may not have enough resources to handle that many connections.
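A minimal sketch of enabling that property, assuming the current class-based xk6-kafka API; the broker address and topic name here are placeholders, not values from this issue:

```javascript
import { Writer, Reader } from "k6/x/kafka";

// Placeholder broker and topic for illustration.
const brokers = ["localhost:9092"];
const topic = "xk6_kafka_json_topic";

// connectLogger: true surfaces kafka-go's internal connection logs
// in the k6 output, which helps trace dial timeouts like the one above.
const writer = new Writer({
    brokers: brokers,
    topic: topic,
    connectLogger: true,
});

const reader = new Reader({
    brokers: brokers,
    topic: topic,
    connectLogger: true,
});
```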

PrabhanjanDesai commented 11 months ago

Thanks @mostafa. The issue doesn't seem to be with resources here; it mainly occurs while establishing the connection, not while initializing the reader/writer.

[screenshot]

mostafa commented 11 months ago

@PrabhanjanDesai

I faced this issue as well, but since it's intermittent, it is very hard to reproduce and debug. I'd be happy to know how I can reproduce this.

PrabhanjanDesai commented 11 months ago

@mostafa Unfortunately this issue occurs in only a few clusters, so I can't provide anything that would help you reproduce it. I would like to know two things:

  1. Is this an issue with the xk6-kafka module?
  2. Currently I have added connectLogger to the reader and writer only. Can we add something similar to connectLogger while initializing the connection as well, which may help us find the root cause?
PrabhanjanDesai commented 11 months ago

Any update @mostafa on my latest questions ?

PrabhanjanDesai commented 11 months ago

Hi @mostafa - Could you please provide your inputs?

mostafa commented 11 months ago

@PrabhanjanDesai

  1. It probably is. I am using the same exact dialer for the connection class, and there is always an extra step to connect to the cluster controller, which sometimes fails. 🤷
  2. Unlike the reader and writer of the kafka-go API, the connection is a low-level object that doesn't have a logger property, so I can't enable it via a boolean such as connectLogger in the xk6-kafka code. The only possible way is to add heavy logging/printing to the code I referenced in item 1 and then build a custom version of k6 with xk6-kafka using xk6.

As I also mentioned above, this is hard to reproduce.
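For reference, building such a custom k6 binary with xk6 looks roughly like this (assuming a Go toolchain is installed; the local checkout path is a placeholder):

```shell
# Install the xk6 build tool.
go install go.k6.io/xk6/cmd/xk6@latest

# Build a k6 binary that bundles xk6-kafka.
xk6 build --with github.com/mostafa/xk6-kafka@latest

# To build against a locally modified checkout (e.g. with extra
# logging added), point the module at the local path instead:
# xk6 build --with github.com/mostafa/xk6-kafka=./xk6-kafka
```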

mostafa commented 9 months ago

Hey,

Since I suppose your question has been answered, I'll close this issue. If you have further questions related to the same issue, please re-open it; otherwise, open a new one.

Madhankumar11 commented 5 months ago

Hi @mostafa, I get the following error after running the code below:

ERRO[0000] TypeError: Value is not an object: undefined at file:///C:/kafka/k6-kafka/kafka.js:8:24(39) hint="script exception"

import { check } from 'k6';
import { writer, produce, reader, createTopic } from 'k6/x/kafka';

const bootstrapServers = ['localhost:9092'];
const kafkaTopic = 'xk6_kafka_json_topic';

// Initialize Kafka producer and consumer
const producer = writer(bootstrapServers, kafkaTopic);
const consumer = reader(bootstrapServers, kafkaTopic);

// Create Kafka topic
createTopic(bootstrapServers[0], kafkaTopic);

// Function to generate random integer
function getRandomInt(max = 1000) {
    return Math.floor(Math.random() * max + 1);
}

// Main function for load testing
export default function () {
    // Generate messages
    const messages = [
        {
            key: JSON.stringify({
                correlationId: 'test-id-sql-' + getRandomInt(),
            }),
            value: JSON.stringify({
                title: 'Load Testing SQL Databases with k6',
                url: 'https://k6.io/blog/load-testing-sql-databases-with-k6/',
                locale: 'en',
            }),
        },
        {
            key: JSON.stringify({
                correlationId: 'test-id-redis-' + getRandomInt(),
            }),
            value: JSON.stringify({
                title: 'Benchmarking Redis with k6',
                url: 'https://k6.io/blog/benchmarking-redis-with-k6/',
                locale: 'en',
            }),
        },
    ];

    // Produce messages
    const error = produce(producer, messages);
    check(error, {
        'messages sent successfully': (err) => err === null,
    });
}

// Teardown function to close producer and consumer connections
export function teardown(data) {
    producer.close();
    consumer.close();
}

mostafa commented 5 months ago

Hey @Madhankumar11,

You are using a very old syntax to create topics. Consider updating your code according to this example.
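For comparison, the current class-based API looks roughly like this, loosely following the JSON example in the repository; the broker address and topic name are placeholders:

```javascript
import { check } from "k6";
import { Writer, Reader, Connection } from "k6/x/kafka";

// Placeholder broker and topic for illustration.
const brokers = ["localhost:9092"];
const topic = "xk6_kafka_json_topic";

// Writer, Reader, and Connection are classes in the current API,
// not the old writer()/reader()/createTopic() free functions.
const writer = new Writer({ brokers: brokers, topic: topic });
const reader = new Reader({ brokers: brokers, topic: topic });
const connection = new Connection({ address: brokers[0] });

// Create the topic once, from the first VU only.
if (__VU == 0) {
    connection.createTopic({ topic: topic });
}

export default function () {
    // produce() throws on failure in the current API instead of
    // returning an error value, so reaching the check means success.
    writer.produce({
        messages: [
            {
                key: JSON.stringify({
                    correlationId: "test-id-" + Math.floor(Math.random() * 1000 + 1),
                }),
                value: JSON.stringify({
                    title: "Load Testing SQL Databases with k6",
                    url: "https://k6.io/blog/load-testing-sql-databases-with-k6/",
                    locale: "en",
                }),
            },
        ],
    });
    check(true, { "message sent": (ok) => ok });
}

export function teardown(data) {
    writer.close();
    reader.close();
    connection.close();
}
```

Note that messages are passed inside a configuration object (`{ messages: [...] }`) rather than as a bare array, and the connection object used for topic management is separate from the writer and reader.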