tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

Incorrect API Versions Used on Clusters with Mixed Kafka Versions #1656

Open gtowne opened 10 months ago

gtowne commented 10 months ago

Describe the bug

While upgrading Kafka on our multi-node cluster, we noticed the following errors in the logs of the brokers that had not yet been upgraded:

[2024-01-03 12:58:30,560] ERROR Closing socket for <REDACTED>:9092-<REDACTED>:24721-271136307 because of error (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error getting request for apiKey: PRODUCE, apiVersion: 7, connectionId:  <REDACTED>:9092-<REDACTED>:24721-271136307, listenerName: ListenerName(<REDACTED>), principal: User:<REDACTED>
Caused by: java.lang.IllegalArgumentException: Invalid version for API key PRODUCE: 7
...

Correspondingly, our kafkajs application logged an error showing that a produce request had failed after 4 retries:

{
    level:"ERROR",
    logger:"kafkajs",
    message:"[Producer] Failed to send messages: Closed connection",
    retryCount:4,
    retryTime:3860,
    timestamp:"2024-01-03T12:58:26.692Z"
}

Based on these logs, kafkajs appears to be sending produce requests with API versions that only the upgraded brokers support to brokers that have not yet been upgraded.

The cause seems to be that the API versions for every broker connection are taken from the seed broker, which is picked at random from the configured list of brokers. In other words, every broker in the cluster is presumed to support the same API versions as the seed broker, even though this is not true whenever brokers run different Kafka versions. As a result, kafkajs sends requests that some brokers reject when the cluster runs mixed versions, for example during a rolling upgrade of a multi-node cluster.
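
A minimal simulation of the behavior described above may help. It is not kafkajs code: the broker names, the version numbers, and the helpers sharedVersionsFromSeed, produceVersion and isRejectedBy are hypothetical and chosen only for illustration. The one protocol fact used is that Produce is API key 0.

```javascript
// Illustrative only: hypothetical broker names and version numbers,
// not taken from any real Kafka release.
const PRODUCE = 0; // Kafka protocol API key for Produce

// Highest Produce version each broker accepts, as it would advertise it
// in an ApiVersions response.
const brokerMaxVersions = {
  'broker-upgraded:9092': { [PRODUCE]: 7 },
  'broker-old:9092': { [PRODUCE]: 6 },
};

// Highest Produce version the client implements (assumed).
const CLIENT_MAX_PRODUCE = 7;

// Behavior described in this issue: versions are read once from the seed
// broker and then reused for every broker connection.
function sharedVersionsFromSeed(seedHost) {
  return brokerMaxVersions[seedHost];
}

// Produce version the client would pick from a set of advertised versions.
function produceVersion(versions) {
  return Math.min(CLIENT_MAX_PRODUCE, versions[PRODUCE]);
}

// A broker rejects a request whose version exceeds what it supports.
function isRejectedBy(targetHost, version) {
  return version > brokerMaxVersions[targetHost][PRODUCE];
}

// If the random seed happens to be the upgraded broker, v7 is used everywhere.
const shared = sharedVersionsFromSeed('broker-upgraded:9092');
for (const host of Object.keys(brokerMaxVersions)) {
  const v = produceVersion(shared);
  console.log(`${host}: Produce v${v} ${isRejectedBy(host, v) ? 'rejected' : 'accepted'}`);
}
```

With the upgraded broker as the seed, the loop reports the upgraded broker as accepting Produce v7 and the old broker as rejecting it, which mirrors the "Invalid version for API key PRODUCE: 7" error above.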

The fix may be as simple as not passing the seed broker's versions when creating the other broker connections, so that each connection discovers the versions supported by its own broker through that broker's ApiVersions API. However, we have limited experience with the library's internals, so there might be a better way of doing this.
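
A rough sketch of what per-broker discovery could look like, assuming versions are fetched once per broker and cached by host. BrokerPool, fetchApiVersions and forget are hypothetical names, and fetchApiVersions only stands in for a real ApiVersions request (Kafka API key 18) over that broker's own connection; this is not how kafkajs is structured internally.

```javascript
// Hypothetical stand-in for an ApiVersions request (Kafka API key 18) sent
// over the connection to one specific broker. Hosts and version ranges are
// made up for illustration.
async function fetchApiVersions(host) {
  const advertised = {
    'broker-upgraded:9092': { 0: { minVersion: 0, maxVersion: 7 } },
    'broker-old:9092': { 0: { minVersion: 0, maxVersion: 6 } },
  };
  return advertised[host];
}

// Hypothetical pool that discovers versions per broker instead of copying
// the seed broker's versions onto every connection.
class BrokerPool {
  constructor() {
    this.versionsByHost = new Map(); // host -> advertised versions
  }

  async versionsFor(host) {
    if (!this.versionsByHost.has(host)) {
      this.versionsByHost.set(host, await fetchApiVersions(host));
    }
    return this.versionsByHost.get(host);
  }

  // Drop the cached entry so that a broker restarted with a new Kafka
  // version is queried again on the next connection.
  forget(host) {
    this.versionsByHost.delete(host);
  }
}

async function main() {
  const pool = new BrokerPool();
  for (const host of ['broker-upgraded:9092', 'broker-old:9092']) {
    const produce = (await pool.versionsFor(host))[0]; // API key 0 = Produce
    console.log(`${host}: Produce up to v${produce.maxVersion}`);
  }
}

main();
```

Caching keeps the extra cost to one ApiVersions round trip per broker. Since brokers restart during a rolling upgrade, an entry cached before a broker was upgraded would keep the older versions until it is cleared, so clearing it on disconnect (the forget method here) is probably worth doing as well.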

To Reproduce


  1. Run a Kafka cluster whose brokers run different versions of Kafka
  2. Send messages from a producer to the cluster with send()
  3. Observe that some of the messages fail with the Closed connection error shown above

Expected behavior

When sending messages to a Kafka cluster whose brokers run different versions (e.g. in the middle of an upgrade), kafkajs should use the protocol versions supported by the broker the message is being sent to.
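
To make the expected behavior concrete, here is a minimal sketch of per-broker version selection, assuming the common rule of using the highest version that both the client and the target broker support. negotiateVersion and the version ranges are hypothetical and not taken from kafkajs.

```javascript
// Pick the version to use for one API key on one broker: the highest
// version inside both the client's range and the broker's advertised range.
function negotiateVersion(client, broker) {
  const low = Math.max(client.minVersion, broker.minVersion);
  const high = Math.min(client.maxVersion, broker.maxVersion);
  if (low > high) {
    throw new Error(
      `No common version: client ${client.minVersion}-${client.maxVersion}, ` +
        `broker ${broker.minVersion}-${broker.maxVersion}`
    );
  }
  return high;
}

const clientProduce = { minVersion: 0, maxVersion: 7 }; // assumed client range
console.log(negotiateVersion(clientProduce, { minVersion: 0, maxVersion: 7 })); // prints 7 (upgraded broker)
console.log(negotiateVersion(clientProduce, { minVersion: 0, maxVersion: 6 })); // prints 6 (old broker)
```

Because the broker's range comes from the broker that receives the request, the old broker gets v6 even while the upgraded one gets v7.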

Observed behavior

The message is sent using the protocol versions reported by a randomly chosen seed broker, which may differ from the versions supported by the broker that actually receives the message.

Environment: