confluentinc / confluent-kafka-javascript

Confluent's Apache Kafka JavaScript client
https://www.npmjs.com/package/@confluentinc/kafka-javascript
MIT License

Confluent connections interfere with KafkaJS connections when SSL is involved #55

Open apeloquin-agilysys opened 2 weeks ago

apeloquin-agilysys commented 2 weeks ago

This is an odd one.

We're attempting to phase in use of the Confluent library in our app for new functionality. The app has existing functionality built on KafkaJS, and that functionality is in production. Given the "early access" nature of the Confluent library, it makes sense to adopt it for the new functionality but retain KafkaJS for the existing functionality for now.

The new functionality was added, integration tests showed everything working, but as soon as we deployed to an environment using external Kafka cluster(s) the KafkaJS connections immediately started failing with:

Connection error: Client network socket disconnected before secure TLS connection was established {"broker":"XXXX.westus2.azure.confluent.cloud:9092","clientId":"kafkajs"}: Error: Client network socket disconnected before secure TLS connection was established

I narrowed it down to starting multiple consumers for both KafkaJS and Confluent concurrently... with SSL connections.

I created a test that replicates the scenario by creating consumers for 5 topics each with KafkaJS and Confluent concurrently on a Confluent Cloud cluster. There are also tests that run just 5 KafkaJS consumers or just 5 Confluent consumers to prove there is no issue when run independently.

Notes:

import {KafkaJS as Confluent} from "@confluentinc/kafka-javascript";
import {Logger} from "@confluentinc/kafka-javascript/types/kafkajs.js";
import {fail} from "assert";
import {Consumer, ConsumerGroupJoinEvent, Kafka, logLevel} from "kafkajs";

const KAFKA_JS_TOPICS: string[] = [
  "test-kafkajs-topic",
  "test-kafkajs-topic-2",
  "test-kafkajs-topic-3",
  "test-kafkajs-topic-4",
  "test-kafkajs-topic-5"
];

const CONFLUENT_TOPICS: string[] = [
  "test-confluent-topic",
  "test-confluent-topic-2",
  "test-confluent-topic-3",
  "test-confluent-topic-4",
  "test-confluent-topic-5"
];

describe("Supports KafkaJS and Confluent consumers", async () => {
  let confluentConsumers: Confluent.Consumer[] = [];
  let kafkaJSConsumers: Consumer[] = [];

  afterEach(async () => {
    const promises: Promise<void>[] = [];
    for (const consumer of kafkaJSConsumers) {
      promises.push(consumer.disconnect());
    }
    for (const consumer of confluentConsumers) {
      promises.push(consumer.disconnect());
    }
    await Promise.all(promises);
    confluentConsumers = [];
    kafkaJSConsumers = [];
  });

  it("Handles concurrent startup of multiple KafkaJS consumers", async () => {
    await doTest(KAFKA_JS_TOPICS, []);
  });

  it("Handles concurrent startup of multiple Confluent consumers", async () => {
    await doTest([], CONFLUENT_TOPICS);
  });

  it("Handles concurrent startup of multiple KafkaJS and Confluent consumers", async () => {
    await doTest(KAFKA_JS_TOPICS, CONFLUENT_TOPICS);
  });

  async function doTest(kafkaJSTopics: string[], confluentTopics: string[]) {
    const kafkaJSKafka = new Kafka({
      brokers: ["XXXX.westus2.azure.confluent.cloud:9092"],
      ssl: true,
      sasl: {
        mechanism: "plain",
        username: "XXXX",
        password: "XXXX"
      },
      logLevel: logLevel.INFO,
      logCreator: kafkaLevel => {
        return entry => {
          const {timestamp, logger, message, stack, ...others} = entry.log;
          console.log(`[KafkaJS:${entry.namespace}] ${message} ${JSON.stringify(others)}${stack ? `: ${stack}` : ""}`);
        };
      }
    });

    const confluentKafka = new Confluent.Kafka({
      kafkaJS: {
        brokers: ["XXXX.westus2.azure.confluent.cloud:9092"],
        ssl: true,
        sasl: {
          mechanism: "plain",
          username: "XXXX",
          password: "XXXX"
        },
        logLevel: Confluent.logLevel.INFO,
        logger: new ConfluentLogger()
      }
    });

    kafkaJSConsumers = [];
    let kafkaJSConnected: number = 0;
    setImmediate(async () => {
      for (const topic of kafkaJSTopics) {
        const kafkaJSConsumer = kafkaJSKafka.consumer({groupId: `${topic}-group`});
        kafkaJSConsumer.on(kafkaJSConsumer.events.GROUP_JOIN, (event: ConsumerGroupJoinEvent) => {
          kafkaJSConnected++;
        });
        await kafkaJSConsumer.connect();
        await kafkaJSConsumer.subscribe({topic});
        await kafkaJSConsumer.run({
          eachMessage: async ({message}) => {}
        });
        kafkaJSConsumers.push(kafkaJSConsumer);
      }
    });

    confluentConsumers = [];
    let confluentConnected: number = 0;
    setImmediate(async () => {
      for (const topic of confluentTopics) {
        const confluentConsumer = confluentKafka.consumer({kafkaJS: {groupId: `${topic}-group`}});
        await confluentConsumer.connect();
        confluentConnected++;
        await confluentConsumer.subscribe({topic});
        await confluentConsumer.run({
          eachMessage: async ({message}) => {}
        });
        confluentConsumers.push(confluentConsumer);
      }
    });

    await until(async () => confluentTopics.length == confluentConnected);
    for (const consumer of confluentConsumers) {
      await until(async () => consumer.assignment().length > 0);
    }
    await until(async () => kafkaJSTopics.length == kafkaJSConnected);
  }
});

class ConfluentLogger implements Logger {
  private logLevel: Confluent.logLevel;

  constructor() {
    this.logLevel = Confluent.logLevel.INFO;
  }

  setLogLevel(logLevel: Confluent.logLevel) {
    this.logLevel = logLevel;
  }

  info = (message: string, extra?: object) => this.doLog(Confluent.logLevel.INFO, message, extra);
  error = (message: string, extra?: object) => this.doLog(Confluent.logLevel.ERROR, message, extra);
  warn = (message: string, extra?: object) => this.doLog(Confluent.logLevel.WARN, message, extra);
  debug = (message: string, extra?: object) => this.doLog(Confluent.logLevel.DEBUG, message, extra);

  namespace() {
    return this;
  }

  private doLog(level: Confluent.logLevel, message: string, extra?: object) {
    if (this.logLevel >= level) {
      console.log(`[ConfluentKafka] ${message}${extra ? ` ${JSON.stringify(extra)}` : ""}`);
    }
  }
}

async function until(condition: () => Promise<boolean>) {
  const timeout = 30000;
  const finish = Date.now() + timeout;
  while (Date.now() <= finish) {
    const result = await condition();
    if (result) return;
    await new Promise(resolve => setTimeout(resolve, 500));
  }
  fail(`Failed within ${timeout}ms`);
}

The combined test ultimately fails to connect all the consumers, and on the KafkaJS side it produces many occurrences of this error (which is not present when running KafkaJS only):

[KafkaJS:Connection] Connection error: Client network socket disconnected before secure TLS connection was established {"broker":"XXXX.westus2.azure.confluent.cloud:9092","clientId":"kafkajs"}: Error: Client network socket disconnected before secure TLS connection was established
    at connResetException (node:internal/errors:787:14)
    at TLSSocket.onConnectEnd (node:_tls_wrap:1727:19)
    at TLSSocket.emit (node:events:530:35)
    at endReadableNT (node:internal/streams/readable:1696:12)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21)

It would be helpful to understand what is conflicting here and if it can be prevented on the Confluent side or if there is a way to work around it.

I have confirmed that if I start the KafkaJS consumers before the Confluent consumers, the KafkaJS connections succeed. This is not viable in a real-world scenario, however, because if the connection is dropped later on and the consumer tries to reconnect, it will encounter this same issue.

milindl commented 1 week ago

Hi @apeloquin-agilysys, thanks for the detailed steps as always.

I could reproduce this with Confluent Cloud and the given test, using node's built-in test-runner.

I dug in and ran only "Handles concurrent startup of multiple KafkaJS consumers". It intermittently hit the same error described in the issue, even though no Confluent Kafka JavaScript consumers are running in that test at all, only KafkaJS consumers.

I increased the number of topics (and thus consumers) and made the connection of the clients concurrent for KafkaJS (rather than one after the other) until it started failing consistently with the same issue. Based on a few other bugs filed, like this one:

What's happening is that we enforce the timeout after 1 second, which closes the socket from the client end. That in turn causes the socket to emit an error because, well... The client network socket disconnected before secure TLS connection was established, just like the error says.

As far as I understand, establishing the TLS connection can take longer than 1s, and when it does, KafkaJS hits this timeout and spawns another connection. Confluent Cloud also has some throttling built in to prevent these connection storms, and that just makes things worse.
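The interaction described above can be illustrated with a toy model. This is not KafkaJS code; the function name and the handshake durations are hypothetical, and the only assumption taken from the thread is that an attempt is abandoned when the TLS handshake outlasts the connection timeout.

```typescript
// Toy model only: counts how many connection attempts would be abandoned
// before one completes its TLS handshake within the timeout.
function countTimedOutAttempts(
  handshakeDurationsMs: number[],
  connectionTimeoutMs: number
): number {
  let timedOut = 0;
  for (const durationMs of handshakeDurationsMs) {
    if (durationMs <= connectionTimeoutMs) {
      break; // handshake finished in time, so no further retries are needed
    }
    // The client closes the socket here, which surfaces as
    // "Client network socket disconnected before secure TLS connection was established".
    timedOut++;
  }
  return timedOut;
}

// Hypothetical handshake durations (ms) that grow as the broker throttles retries.
const handshakeDurationsMs = [1400, 1800, 2500];

const failuresWithShortTimeout = countTimedOutAttempts(handshakeDurationsMs, 1000);
const failuresWithLongTimeout = countTimedOutAttempts(handshakeDurationsMs, 30000);
console.log({ failuresWithShortTimeout, failuresWithLongTimeout });
```

With a 1s timeout every attempt in this made-up series is abandoned, while a longer timeout lets the first attempt complete.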

To fix this and make the test run, I added connectionTimeout: 30000 to the KafkaJS configuration. The exact value might be different for your case; I think starting from 10s and going upwards would make sense. Let me know if that helps you, too.
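A minimal sketch of that configuration change follows. The `ClientOptions` type is a local stand-in for the subset of KafkaJS options used in the test above, and `withConnectionTimeout` is a hypothetical helper, not part of either library; only the `connectionTimeout` option itself (in milliseconds) comes from KafkaJS.

```typescript
// Local stand-in for the KafkaJS client options used in this thread
// (declared here so the sketch does not depend on the kafkajs package).
interface ClientOptions {
  brokers: string[];
  ssl: boolean;
  sasl: { mechanism: "plain"; username: string; password: string };
  connectionTimeout?: number; // milliseconds
}

// Hypothetical helper: returns a copy of the options with a longer connection timeout.
function withConnectionTimeout(options: ClientOptions, timeoutMs: number): ClientOptions {
  if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) {
    throw new RangeError(`connectionTimeout must be a positive number of ms, got ${timeoutMs}`);
  }
  return { ...options, connectionTimeout: timeoutMs };
}

const baseOptions: ClientOptions = {
  brokers: ["XXXX.westus2.azure.confluent.cloud:9092"],
  ssl: true,
  sasl: { mechanism: "plain", username: "XXXX", password: "XXXX" }
};

// 30000 is the value that worked in the reproduction; 10000 may be a reasonable starting point.
const kafkaJSOptions = withConnectionTimeout(baseOptions, 30000);
console.log(kafkaJSOptions.connectionTimeout);
```

The resulting object has the same shape as the one passed to `new Kafka({...})` in the test, so the only change to the original test would be the added `connectionTimeout` field.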

With that change the test started passing for me, and the TLS errors no longer appear.

I did, however, need to increase the timeout for until() and --test-timeout when I increased the number of topics to 10.
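One way to make that adjustment easier is to pass the deadline into until() instead of hard-coding it. The sketch below is an assumed variant of the helper from the test above; the `UntilOptions` type and its field names are illustrative, and it throws a plain `Error` rather than calling `fail` so that it has no imports.

```typescript
interface UntilOptions {
  timeoutMs?: number;  // overall deadline for the condition to become true
  intervalMs?: number; // delay between polls
}

// Polls `condition` until it resolves to true or the deadline passes.
async function until(
  condition: () => Promise<boolean>,
  { timeoutMs = 30000, intervalMs = 500 }: UntilOptions = {}
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() <= deadline) {
    if (await condition()) return;
    await new Promise<void>(resolve => setTimeout(resolve, intervalMs));
  }
  throw new Error(`Condition not met within ${timeoutMs}ms`);
}

// Example call with a short deadline; the condition is already true, so this resolves immediately.
const ready = until(async () => true, { timeoutMs: 50, intervalMs: 1 });
ready.then(() => console.log("condition met"));
```

In the test, a call such as `await until(async () => kafkaJSTopics.length == kafkaJSConnected, {timeoutMs: 120000})` would then give the larger consumer count more time; the 120000 figure is only an example.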

We have actually opened a PR against KafkaJS to change this default, but it has not been accepted yet.

As for why running it alongside Confluent consumers makes a difference compared to your normal case, I think it's possible that the additional connections from the Confluent consumers make the KafkaJS connections take a little more time, which causes the reconnects and the throttling. But that's a guess at the moment.