mostafa / xk6-kafka

k6 extension to load test Apache Kafka with support for various serialization formats, SASL, TLS, compression, Schema Registry client and beyond
Apache License 2.0
150 stars 68 forks source link

Error writing messages: broker appears to be expecting TLS #265

Open dnaby opened 10 months ago

dnaby commented 10 months ago
import { check } from "k6";
import { Writer, Reader, SchemaRegistry, SCHEMA_TYPE_JSON, TLS_1_3 } from "k6/x/kafka";

const tlsConfig = {
  enableTls: true,
  insecureSkipTlsVerify: false,
  minVersion: TLS_1_3,

  clientCertPem: "/Users/mac/go/bin/script_kafka/super-user/user.crt",
  clientKeyPem: "/Users/mac/go/bin/script_kafka/super-user/user.key",
  serverCaPem: "/Users/mac/go/bin/script_kafka/super-user/ca.crt"
};

const writer = new Writer({
  brokers: ["x.x.x.x:9094", "x.x.x.x:9094", "x.x.x.x:9094"],
  topic: "xk6-kafka-test-topic",
  tlsConfig: tlsConfig,
});

const reader = new Reader({
  brokers: ["x.x.x.x:9094", "x.x.x.x:9094", "x.x.x.x:9094"],
  topic: "xk6-kafka-test-topic",
  tlsConfig: tlsConfig,
});

const schemaRegistry = new SchemaRegistry();

export const options = {
  thresholds: {
    kafka_writer_error_count: ["count == 0"],
    kafka_reader_error_count: ["count == 0"],
  },
};

export default function () {
  for (let index = 0; index < 100; index++) {
    let messages = [
      {
        // The data type of the key is JSON
        key: schemaRegistry.serialize({
          data: {
            correlationId: "test-id-abc-" + index,
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
        // The data type of the value is JSON
        value: schemaRegistry.serialize({
          data: {
            name: "xk6-kafka",
            version: "0.9.0",
            author: "Mouhamadou Naby DIA",
            description:
              "k6 extension to load test Apache Kafka",
            index: index,
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
        headers: {
          key: "value",
        },
        offset: index,
        partition: 0,
        time: new Date(), // Will be converted to timestamp automatically
      },
      {
        key: schemaRegistry.serialize({
          data: {
            correlationId: "test-id-def-" + index,
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
        value: schemaRegistry.serialize({
          data: {
            name: "xk6-kafka",
            version: "0.9.0",
            author: "Mouhamadou Naby DIA",
            description:
              "k6 extension to load test Apache Kafka",
            index: index,
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
        headers: {
          key: "value",
        },
      },
    ];

    writer.produce({ messages: messages });
  }

  // Read 10 messages only
  let messages = reader.consume({ limit: 10 });

  check(messages, {
    "10 messages are received": (messages) => messages.length == 10,
  });

  check(messages[0], {
    "Topic equals to xk6_kafka_json_topic": (msg) => msg["topic"] == "xk6-kafka-test-topic",
    "Key contains key/value and is JSON": (msg) =>
      schemaRegistry
        .deserialize({ data: msg.key, schemaType: SCHEMA_TYPE_JSON })
        .correlationId.startsWith("test-id-"),
    "Value contains key/value and is JSON": (msg) =>
      typeof schemaRegistry.deserialize({
        data: msg.value,
        schemaType: SCHEMA_TYPE_JSON,
      }) == "object" &&
      schemaRegistry.deserialize({
        data: msg.value,
        schemaType: SCHEMA_TYPE_JSON,
      }).name == "xk6-kafka",
    "Header equals {'key': 'value'}": (msg) =>
      "key" in msg.headers &&
      String.fromCharCode(...msg.headers["key"]) == "value",
    "Time is past": (msg) => new Date(msg["time"]) < new Date(),
    "Partition is zero": (msg) => msg["partition"] == 0,
    "Offset is gte zero": (msg) => msg["offset"] >= 0,
    "High watermark is gte zero": (msg) => msg["highWaterMark"] >= 0,
  });
}

export function teardown(data) {
  writer.close();
  reader.close();
}

When trying to run the below xk6-kafka script, I got the following error:

❯ ./k6 run script_kafka/topic.js                                                                                                                                                           ─╯

          /\      |‾‾| /‾‾/   /‾‾/   
     /\  /  \     |  |/  /   /  /    
    /  \/    \    |     (   /   ‾‾\  
   /          \   |  |\  \ |  (‾)  | 
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: script_kafka/topic.js
     output: -

  scenarios: (100.00%) 1 scenario, 1 max VUs, 10m30s max duration (incl. graceful stop):
           * default: 1 iterations for each of 1 VUs (maxDuration: 10m0s, gracefulStop: 30s)

ERRO[0003] Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS  error="Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS"m03.0s), 1/1 VUs, 0 complete and 0 interrupted iterations
ERRO[0003] GoError: Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS
        at github.com/mostafa/xk6-kafka.(*Kafka).writerClass.func1 (native)
        at file:///Users/mac/go/bin/script_kafka/topic.js:89:19(104)  executor=per-vu-iterations scenario=default source=stacktrace

     █ teardown

     data_received......................: 0 B   0 B/s
     data_sent..........................: 0 B   0 B/s
     iteration_duration.................: avg=1.5s min=3.58µs med=1.5s max=3.01s p(90)=2.71s p(95)=2.86s
     iterations.........................: 1     0.331717/s
   ✓ kafka_reader_error_count...........: 0     0/s
     kafka_writer_acks_required.........: 0     min=0      max=0 
     kafka_writer_async.................: 0.00% ✓ 0        ✗ 1   
     kafka_writer_attempts_max..........: 0     min=0      max=0 
     kafka_writer_batch_bytes...........: 0 B   0 B/s
     kafka_writer_batch_max.............: 1     min=1      max=1 
     kafka_writer_batch_queue_seconds...: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_batch_seconds.........: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_batch_size............: 0     0/s
     kafka_writer_batch_timeout.........: 0s    min=0s     max=0s
   ✓ kafka_writer_error_count...........: 0     0/s
     kafka_writer_message_bytes.........: 0 B   0 B/s
     kafka_writer_message_count.........: 0     0/s
     kafka_writer_read_timeout..........: 0s    min=0s     max=0s
     kafka_writer_retries_count.........: 0     0/s
     kafka_writer_wait_seconds..........: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_write_count...........: 0     0/s
     kafka_writer_write_seconds.........: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_write_timeout.........: 0s    min=0s     max=0s
     vus................................: 1     min=1      max=1 
     vus_max............................: 1     min=1      max=1 

running (00m03.0s), 0/1 VUs, 1 complete and 0 interrupted iterations
default ✓ [======================================] 1 VUs  00m03.0s/10m0s  1/1 iters, 1 per VU

I setup my Kafka cluster on a kubernetes cluster using strimzi operator and I have enabled mTLS(mutual TLS).

mostafa commented 10 months ago

Hey @dnaby,

Are your certificates self-signed? Have you tried setting insecureSkipTlsVerify to true to see if it works? Or even downgrading the TLS version?

Update: I released a new version (v0.22.0) just now. Try that as well to see if it is fixed.

dnaby commented 10 months ago

Hey @mostafa,

I am using certificates coming from strimzi. They offer an automated certificate management system.

I have tried all these but it's not working. Also the cluster on which I installed kafka is on premise. And lately there has been added a security that block remote ssh connection to the nodes(if the access has not been granted). So I am waiting to get the authorization in order to see if the problem is not coming from there.

mostafa commented 10 months ago

@dnaby Can you connect the logger to both instances of Writer (in WriterConfig.connectLogger) and Reader (in ReaderConfig.connectLogger) to see what's happening behind the scenes?

dnaby commented 10 months ago
❯ ./k6 run script_kafka/kafka.js                                                                                                                                                           ─╯

          /\      |‾‾| /‾‾/   /‾‾/   
     /\  /  \     |  |/  /   /  /    
    /  \/    \    |     (   /   ‾‾\  
   /          \   |  |\  \ |  (‾)  | 
  / __________ \  |__| \__\ \_____/ .io

  execution: local
     script: script_kafka/kafka.js
     output: -

  scenarios: (100.00%) 1 scenario, 1 max VUs, 10m30s max duration (incl. graceful stop):
           * default: 1 iterations for each of 1 VUs (maxDuration: 10m0s, gracefulStop: 30s)

INFO[0000] {"This":{"addr":[{},{},{}],"topic":"xk6_kafka_json_topic","balancer":{},"max_attempts":0,"write_backoff_min":0,"write_backoff_max":0,"batch_size":1,"batch_bytes":0,"batch_timeout":0,"read_timeout":0,"write_timeout":0,"required_acks":0,"async":false,"compression":0,"logger":{"out":{},"hooks":{},"formatter":{"force_colors":false,"disable_colors":false,"force_quote":false,"disable_quote":false,"environment_override_colors":false,"disable_timestamp":false,"full_timestamp":false,"timestamp_format":"","disable_sorting":false,"disable_level_truncation":false,"pad_level_text":false,"quote_empty_fields":false,"field_map":{}},"report_caller":false,"level":4,"buffer_pool":null},"error_logger":null,"transport":{"dial_timeout":0,"idle_timeout":0,"metadata_ttl":0,"metadata_topics":[],"client_id":"","tls":null,"s_a_s_l":null,"resolver":null,"context":null},"allow_auto_topic_creation":false}}  source=console
INFO[0000] {"This":{}}                                   source=console
ERRO[0003] Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS  error="Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS"m03.0s), 1/1 VUs, 0 complete and 0 interrupted iterations
ERRO[0003] GoError: Error writing messages., OriginalError: unexpected EOF: broker appears to be expecting TLS
        at github.com/mostafa/xk6-kafka.(*Kafka).writerClass.func1 (native)
        at file:///Users/mac/go/bin/script_kafka/kafka.js:92:19(114)  executor=per-vu-iterations scenario=default source=stacktrace

     █ teardown

     data_received......................: 0 B         0 B/s
     data_sent..........................: 0 B         0 B/s
     iteration_duration.................: avg=1.5s min=15.2µs med=1.5s max=3.01s p(90)=2.71s p(95)=2.86s
     iterations.........................: 1           0.33155/s
   ✓ kafka_reader_error_count...........: 0           0/s
     kafka_writer_acks_required.........: 0           min=0           max=0          
     kafka_writer_async.................: 0.00%       ✓ 0             ✗ 1            
     kafka_writer_attempts_max..........: 10          min=10          max=10         
     kafka_writer_batch_bytes...........: 0 B         0 B/s
     kafka_writer_batch_max.............: 1           min=1           max=1          
     kafka_writer_batch_queue_seconds...: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_batch_seconds.........: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_batch_size............: 0           0/s
     kafka_writer_batch_timeout.........: 277h46m40s  min=277h46m40s  max=277h46m40s 
   ✓ kafka_writer_error_count...........: 0           0/s
     kafka_writer_message_bytes.........: 0 B         0 B/s
     kafka_writer_message_count.........: 0           0/s
     kafka_writer_read_timeout..........: 2777h46m40s min=2777h46m40s max=2777h46m40s
     kafka_writer_retries_count.........: 0           0/s
     kafka_writer_wait_seconds..........: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_write_count...........: 0           0/s
     kafka_writer_write_seconds.........: avg=0s   min=0s     med=0s   max=0s    p(90)=0s    p(95)=0s   
     kafka_writer_write_timeout.........: 2777h46m40s min=2777h46m40s max=2777h46m40s
     vus................................: 1           min=1           max=1          
     vus_max............................: 1           min=1           max=1          

running (00m03.0s), 0/1 VUs, 1 complete and 0 interrupted iterations
default ✓ [======================================] 1 VUs  00m03.0s/10m0s  1/1 iters, 1 per VU

Here is the output when I set the connectLogger to true and I log the writer and reader.