tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

KafkaJSLockTimeout when producing large amount of messages at once with SASL/OAUTHBEARER #1039

Open j-a-h-i-r opened 3 years ago

j-a-h-i-r commented 3 years ago

This issue is very similar to https://github.com/tulios/kafkajs/issues/554. The difference is that I am using SASL/OAUTHBEARER for authentication.

Some background: I am watching a directory using chokidar. When a new file arrives in the directory, it is immediately sent to Kafka. I am currently stress testing my Kafka setup by copying about 1000 files into the directory with a bash script. Unfortunately, this causes errors, and only a fraction of the files (200-400) end up in Kafka.

Errors faced: Below is the list of errors thrown by KafkaJS. I have included as many as possible and formatted them for brevity.

{"level":"ERROR",
"message": "[BrokerPool] KafkaJSLockTimeout: Timeout while acquiring lock (55 waiting locks): \"connect to broker localhost:9093\"",
"retryCount": 0, "retryTime": 311,
"stack": "KafkaJSLockTimeout: Timeout while acquiring lock (55 waiting locks): \"connect to broker localhost:9093\"
at Timeout._onTimeout (node_modules\\kafkajs\\src\\utils\\lock.js:48:23)"}
{"level": "ERROR", "message": "[Connection] Connection timeout", "broker": "localhost:9093", "clientId":"test-producer"}
{"level":"ERROR",
"message":"[SASLOAuthBearerAuthenticator] SASL OAUTHBEARER authentication failed: Not connected", "broker": "localhost:9093"}

{"level":"ERROR",
"message":"[BrokerPool] KafkaJSSASLAuthenticationError: SASL OAUTHBEARER authentication failed: Not connected",
"retryCount":0, "retryTime":306,
"stack": "KafkaJSSASLAuthenticationError: SASL OAUTHBEARER authentication failed: Not connected
at OAuthBearerAuthenticator.authenticate (node_modules\\kafkajs\\src\\broker\\saslAuthenticator\\oauthBearer.js:49:21)
---- snipped -----
at async sendBatch (node_modules\\kafkajs\\src\\producer\\messageProducer.js:95:12)"}
error:  KafkaJSNonRetriableError
  Caused by: KafkaJSLockTimeout: Timeout while acquiring 
lock (107 waiting locks): "connect to broker localhost:9093"
at Timeout._onTimeout (node_modules\kafkajs\src\utils\lock.js:48:23)
at listOnTimeout (internal/timers.js:554:17)
at processTimers (internal/timers.js:497:7) {
  name: 'KafkaJSNumberOfRetriesExceeded',
  retriable: false,
  helpUrl: undefined,
  originalError: KafkaJSLockTimeout: Timeout while acquiring lock (107 waiting locks): "connect to broker localhost:9093"
      at Timeout._onTimeout (node_modules\kafkajs\src\utils\lock.js:48:23)
      at listOnTimeout (internal/timers.js:554:17)
      at processTimers (internal/timers.js:497:7) {
    retriable: false,
    helpUrl: undefined
  },
  retryCount: 0,
  retryTime: 291
}
{"level":"ERROR", "message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout", "retryCount":0,"retryTime":287}
{"level":"ERROR",
"message":"[Connection] Connection error: write after end",
"broker": "localhost:9093","clientId": "test-producer",
"stack":"Error [ERR_STREAM_WRITE_AFTER_END]: write after end
at Socket.Writable.write (_stream_writable.js:292:11)
at Object.sendRequest (node_modules\\kafkajs\\src\\network\\connection.js:312:27)
--- snipped ---
at sendRequest (node_modules\\kafkajs\\src\\network\\connection.js:302:14)
at async Connection.send (node_modules\\kafkajs\\src\\network\\connection.js:321:53)"}
error:  KafkaJSNonRetriableError
  Caused by: KafkaJSProtocolError: Request is not valid given the current SASL state
    at createErrorFromCode (node_modules\kafkajs\src\protocol\error.js:581:10)
    at Object.parse (node_modules\kafkajs\src\protocol\requests\metadata\v0\response.js:56:11)
    at Connection.send (node_modules\kafkajs\src\network\connection.js:336:35)
    --- snipped ---
    at async Cluster.refreshMetadata (node_modules\kafkajs\src\cluster\index.js:134:5)
    at async Cluster.findBroker (node_modules\kafkajs\src\cluster\index.js:221:9) {
  name: 'KafkaJSNumberOfRetriesExceeded', retriable: false, helpUrl: undefined,
  originalError: KafkaJSProtocolError: Request is not valid given the current SASL state
      at createErrorFromCode (node_modules\kafkajs\src\protocol\error.js:581:10)
      --- snipped ---
      at async Cluster.findBroker (node_modules\kafkajs\src\cluster\index.js:221:9) {
    retriable: false,
    helpUrl: undefined,
    type: 'ILLEGAL_SASL_STATE',
    code: 34
  },
  retryCount: 0,
  retryTime: 244
}

I logged every call to oauthBearerProvider to refresh the token. Ideally, it should run every 50 seconds, but the log shows that it was sometimes called several times simultaneously:

[Thu Mar 04 2021 23:47:46 GMT+0600] - Fetching token from http://keycloak:8080
[Thu Mar 04 2021 23:47:46 GMT+0600] - Fetching token from http://keycloak:8080
[Thu Mar 04 2021 23:47:46 GMT+0600] - Fetching token from http://keycloak:8080
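One possible mitigation for the simultaneous calls, sketched here under assumptions, is to wrap the token fetch so that concurrent oauthBearerProvider calls share a single in-flight request and reuse a cached token until shortly before it expires. `fetchToken` stands in for the omitted Keycloak code and is hypothetical, as are its `expiresInMs` field and the 10-second refresh margin. This only removes redundant token requests; it is not a confirmed fix for the broker-side errors.

```javascript
// Minimal sketch: share one in-flight token request between concurrent callers
// and cache the token until shortly before it expires.
// `fetchToken` is a hypothetical async function resolving to
// { value: string, expiresInMs: number } (stand-in for the omitted Keycloak code).
function createCachedTokenProvider(fetchToken, { refreshMarginMs = 10000, now = Date.now } = {}) {
  let cached = null;   // { value, expiresAt } of the last fetched token
  let inFlight = null; // pending fetch shared by concurrent callers

  return async function oauthBearerProvider() {
    // Serve the cached token while it is not within the refresh margin.
    if (cached && now() < cached.expiresAt - refreshMarginMs) {
      return { value: cached.value };
    }
    // Start at most one fetch; every concurrent caller awaits the same promise.
    if (!inFlight) {
      inFlight = Promise.resolve()
        .then(() => fetchToken())
        .then(({ value, expiresInMs }) => {
          cached = { value, expiresAt: now() + expiresInMs };
          return cached;
        })
        .finally(() => {
          inFlight = null; // allow a new fetch once this one has settled
        });
    }
    const { value } = await inFlight;
    return { value };
  };
}
```

With the producer code in this issue, it would be wired in as `oauthBearerProvider: createCachedTokenProvider(fetchTokenFromKeycloak)`, where `fetchTokenFromKeycloak` is a hypothetical name for the omitted fetch.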

I also noticed the following errors in the Kafka broker log:

broker             | java.lang.IllegalStateException: Unexpected ApiVersions request received during SASL authentication state HANDSHAKE_REQUEST
broker             |   at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleApiVersionsRequest(SaslServerAuthenticator.java:562)
broker             |   at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleKafkaRequest(SaslServerAuthenticator.java:499)
broker             |   at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:259)
broker             |   at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:177)
broker             |   at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547)
broker             |   at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
broker             |   at kafka.network.Processor.poll(SocketServer.scala:893)
broker             |   at kafka.network.Processor.run(SocketServer.scala:792)
broker             |   at java.lang.Thread.run(Thread.java:748)
broker             | [2021-02-25 10:40:11,239] WARN [SocketServer brokerId=1] Unexpected error from /172.18.0.1; closing connection (org.apache.kafka.common.network.Selector)
INFO [SocketServer brokerId=1] Failed re-authentication with /172.18.0.1 (Unexpected Kafka request of type SASL_HANDSHAKE during SASL authentication.) (org.apache.kafka.common.network.Selector)
[2021-03-03 18:14:44,782] INFO [SocketServer brokerId=1] Failed re-authentication with /172.18.0.1 (Unexpected Kafka request of type PRODUCE during SASL authentication.) (org.apache.kafka.common.network.Selector)

I think the error happens because KafkaJS tries to send requests (for example, PRODUCE) while re-authentication with the OAuth token is still in progress. I have no idea how to fix this.
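The broker log suggests the broker rejects requests, such as PRODUCE, that arrive on a connection while it is re-authenticating. The ordering rule that would avoid this can be sketched as a small gate. This is a hypothetical, library-agnostic illustration, not KafkaJS code; `ReauthGate`, `run` and `reauthenticate` are illustrative names. Ordinary requests may run concurrently, a re-authentication waits for in-flight requests to drain, and requests arriving during re-authentication wait until it finishes.

```javascript
// Hypothetical sketch, not KafkaJS internals: keep ordinary requests and
// SASL re-authentication from overlapping on the same connection.
class ReauthGate {
  constructor() {
    this.inFlight = 0;         // ordinary requests currently running
    this.reauthPromise = null; // set while a re-authentication is pending or running
    this.drainWaiters = [];    // resolvers waiting for inFlight to drop to 0
  }

  // Run an ordinary request (e.g. Produce or Metadata) through the gate.
  async run(request) {
    // Wait out any re-authentication; loop in case another one starts meanwhile.
    while (this.reauthPromise) {
      await this.reauthPromise.catch(() => {});
    }
    this.inFlight += 1;
    try {
      return await request();
    } finally {
      this.inFlight -= 1;
      if (this.inFlight === 0) {
        // Wake a re-authentication that is waiting for the connection to go idle.
        this.drainWaiters.splice(0).forEach((resolve) => resolve());
      }
    }
  }

  // Re-authenticate once in-flight requests have drained; new requests queue meanwhile.
  reauthenticate(authenticate) {
    if (this.reauthPromise) return this.reauthPromise; // join the ongoing attempt
    this.reauthPromise = Promise.resolve()
      .then(() => {
        if (this.inFlight > 0) {
          return new Promise((resolve) => this.drainWaiters.push(resolve));
        }
        return undefined;
      })
      .then(() => authenticate())
      .finally(() => {
        this.reauthPromise = null;
      });
    return this.reauthPromise;
  }
}
```

The design mirrors a readers-writer lock: ordinary requests are the shared side and re-authentication is the exclusive side.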

For comparison, I tried the same thing with the Java client, and it sends thousands of messages to the same Kafka broker without any issues.

For example, here is the debug log printed by the Java producer during re-authentication:


2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:348 - Set SASL client state to SEND_APIVERSIONS_REQUEST
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:178 - Creating SaslClient: client=null;service=kafka;serviceHostname=localhost;mechs=[OAUTHBEARER]
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG OAuthBearerSaslClient:155 - Setting SASL/OAUTHBEARER client state to SEND_CLIENT_FIRST_MESSAGE
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:348 - Set SASL client state to REAUTH_PROCESS_ORIG_APIVERSIONS_RESPONSE
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:348 - Set SASL client state to REAUTH_SEND_HANDSHAKE_REQUEST
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:348 - Set SASL client state to REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:348 - Set SASL client state to REAUTH_INITIAL
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG OAuthBearerSaslClient:155 - Setting SASL/OAUTHBEARER client state to RECEIVE_SERVER_FIRST_MESSAGE
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:348 - Set SASL client state to INTERMEDIATE
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG OAuthBearerSaslClient:109 - Successfully authenticated as test-producer
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG OAuthBearerSaslClient:155 - Setting SASL/OAUTHBEARER client state to COMPLETE
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:348 - Set SASL client state to COMPLETE
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:620 - Finished re-authentication with session expiration in 55385 ms and session re-authentication on or after 49950 ms
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG Selector:553 - [Producer clientId=test-producer] Successfully re-authenticated with localhost/127.0.0.1

Notice the last message. A similar flow happens with KafkaJS, but the broker somehow throws the previously mentioned error:

java.lang.IllegalStateException: Unexpected ApiVersions request received during SASL authentication state HANDSHAKE_REQUEST

Kafka Setup

I am running Kafka in Docker using the confluentinc/cp-kafka:5.5.0 image. My sample broker setup is below:

broker:
    image: confluentinc/cp-kafka:5.5.0
    ports:
      - 29092:29092
      - 9093:9093
    environment:
      # --- common properties --- #
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CLIENT:SASL_PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://broker:29092,CLIENT://broker:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,CLIENT://localhost:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      ## OAuth Bearer SASL
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: OAUTHBEARER
      KAFKA_SASL_ENABLED_MECHANISMS: OAUTHBEARER
      KAFKA_CONNECTIONS_MAX_REAUTH_MS: 60000

Producer code

const { Kafka } = require('kafkajs');

const BROKER = 'localhost:9093';

const kafka = new Kafka({
    clientId: "<client ID>",
    brokers: [ BROKER ],
    ssl: false,
    sasl: {
        mechanism: 'oauthbearer',
        oauthBearerProvider: async () => {
            // fetch token from keycloak (code omitted); `token` holds the access token string
            return {
                value: token
            }
        }
    },
})

const producer = kafka.producer();

producer.connect()
.then((value) => console.log("Producer connected"))
.catch((err) => console.error("Failed to connect", err));

const sendDataToTopic = async (data, topic) => {
    return producer.send({
        topic: topic,
        messages: data
    })
    .then((resp) => {
        console.log('producerData: ', resp);
    })
    .catch((err) => {
        console.error('error: ', err);
    })
}

module.exports = {
    sendDataToTopic,
}

To Reproduce: call the sendDataToTopic method in a loop (~1000 iterations).

Expected behavior: all the messages are sent to Kafka, and connection re-authentication happens automatically.
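Firing all ~1000 sendDataToTopic calls at once queues every send inside the client at the same moment, which is consistent with the large "waiting locks" counts in the errors above. As a possible mitigation (an assumption, not a confirmed fix for the SASL errors), the calls can be run with bounded concurrency. `mapWithConcurrency` is a hypothetical helper, and any specific limit is an arbitrary choice.

```javascript
// Hypothetical helper: run `worker` over `items` with at most `limit` calls in flight.
// Results are returned in input order.
async function mapWithConcurrency(items, limit, worker) {
  const results = new Array(items.length);
  let next = 0;

  // Each "lane" repeatedly claims the next unprocessed item until none are left.
  async function lane() {
    while (next < items.length) {
      const index = next++; // no race: check and increment run without an await between them
      results[index] = await worker(items[index], index);
    }
  }

  const laneCount = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: laneCount }, () => lane()));
  return results;
}
```

For example, `await mapWithConcurrency(files, 10, (file) => sendDataToTopic([{ value: file }], 'file-topic'))`, where `files` and `'file-topic'` are placeholders.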

Environment:

sp-suresh commented 3 years ago

I was facing similar issues while load testing. Did you try producing in batches? That helped me avoid calling produce many times.
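Producing in batches here means grouping many messages into one producer.send call instead of one call per message. A minimal sketch, assuming an already connected KafkaJS producer; `sendInChunks` and the default chunk size of 100 are illustrative choices, not KafkaJS APIs.

```javascript
// Illustrative helper: send `messages` to `topic` in sequential chunks,
// making one producer.send() call per chunk instead of one per message.
async function sendInChunks(producer, topic, messages, chunkSize = 100) {
  const responses = [];
  for (let i = 0; i < messages.length; i += chunkSize) {
    const chunk = messages.slice(i, i + chunkSize);
    // Awaiting each call keeps only one send in flight at a time.
    responses.push(await producer.send({ topic, messages: chunk }));
  }
  return responses;
}
```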

j-a-h-i-r commented 3 years ago

@sp-suresh yes, I tried producing in batch. Still faced the same issue :( Are you using OAUTHBEARER too?

sp-suresh commented 3 years ago

No, I am not using OAUTHBEARER. It is also worth mentioning that setting connectionTimeout: 4000 and authenticationTimeout: 4000 has been a relief too: https://github.com/tulios/kafkajs/issues/554#issuecomment-568168042
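Both are client-level options passed to the Kafka constructor. A sketch of the reporter's configuration with those two values added; `buildKafkaConfig` is a hypothetical helper, `tokenProvider` stands for the oauthBearerProvider from the producer code in the issue, and whether these values help with OAUTHBEARER re-authentication is untested.

```javascript
// Hypothetical helper building the KafkaJS client config for the producer,
// with the connectionTimeout / authenticationTimeout values suggested above (ms).
function buildKafkaConfig(tokenProvider) {
  return {
    clientId: 'test-producer',
    brokers: ['localhost:9093'],
    ssl: false,
    connectionTimeout: 4000,     // time to wait for a successful connection
    authenticationTimeout: 4000, // timeout for SASL authentication requests
    sasl: {
      mechanism: 'oauthbearer',
      oauthBearerProvider: tokenProvider,
    },
  };
}
```

Usage: `const kafka = new Kafka(buildKafkaConfig(oauthBearerProvider))`.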

j-a-h-i-r commented 3 years ago

Thanks for the info. I went through that issue multiple times and tried different configs without any success. I'll try again with your suggestions to see if the situation improves.

kzay commented 3 years ago

I have exactly the same issue when pushing data coming from a WebSocket to a Kafka cluster.

Timeout while acquiring lock (712 waiting locks): "updating target topics" {"name":"KafkaJSNumberOfRetriesExceeded","retriable":false,"originalError":{"name":"KafkaJSLockTimeout","retriable":false},"retryCount":0,"retryTime":281,"stack":"KafkaJSNonRetriableError\n Caused by: KafkaJSLockTimeout: Timeout while acquiring lock (712 waiting locks): \"updating target topics\"\n at Timeout._onTimeout (F:\Environement\git-repo\crypto-feeds-api\node_modules\kafkajs\src\utils\lock.js:48:23)\n at listOnTimeout (internal/timers.js:549:17)\n at processTimers (internal/timers.js:492:7)"}

safonovklim commented 3 years ago

@j-a-h-i-r do you have updates for this issue?

j-a-h-i-r commented 3 years ago

@safonovklim Unfortunately no. I moved on to other tasks and haven't revisited this one. Are you also using SASL/OAUTHBEARER?

assaf-xm commented 3 years ago

Regarding the timeout @kzay reported above, such as KafkaJSLockTimeout: Timeout while acquiring lock (2162 waiting locks): "updating target topics"

It looks like this specific lock timeout comes from the 'mutatingTargetTopics' lock in the 'cluster' module. Every publish acquires and releases this lock before the actual send, so with high publish parallelism, the large number of pending waiters (a few thousand) makes the lock slow. Increasing requestTimeout can avoid these errors, but the lock is not really needed for at least 99% of the calls. Here is a PR that avoids the lock when possible: https://github.com/tulios/kafkajs/pull/1185
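The mechanism can be illustrated with a simplified model. This is a hypothetical sketch in the spirit of the linked PR, not KafkaJS's actual Lock or Cluster code; `TimeoutLock`, `TopicTracker` and `addTargetTopic` are illustrative names, and the error text only imitates the KafkaJS message. Every waiter on the lock has its own timeout, so a long queue risks timeouts, while a fast path that skips the lock for an already-tracked topic means most sends never queue at all.

```javascript
// Simplified FIFO lock whose waiters give up after `timeoutMs`.
class TimeoutLock {
  constructor(timeoutMs) {
    this.timeoutMs = timeoutMs;
    this.locked = false;
    this.waiters = []; // queued { grant, timer } entries
  }

  acquire() {
    if (!this.locked) {
      this.locked = true;
      return Promise.resolve();
    }
    return new Promise((resolve, reject) => {
      const waiter = { grant: resolve };
      waiter.timer = setTimeout(() => {
        this.waiters.splice(this.waiters.indexOf(waiter), 1);
        reject(new Error(`Timeout while acquiring lock (${this.waiters.length} waiting locks)`));
      }, this.timeoutMs);
      this.waiters.push(waiter);
    });
  }

  release() {
    const next = this.waiters.shift();
    if (next) {
      clearTimeout(next.timer);
      next.grant(); // hand the lock directly to the next waiter
    } else {
      this.locked = false;
    }
  }
}

// Illustrative model of "add target topic" with a lock-free fast path.
class TopicTracker {
  constructor(lockTimeoutMs, refreshMetadata) {
    this.targetTopics = new Set();
    this.lock = new TimeoutLock(lockTimeoutMs);
    this.refreshMetadata = refreshMetadata; // async stand-in for a metadata request
    this.lockAcquisitions = 0;              // instrumentation for the example
  }

  async addTargetTopic(topic) {
    // Fast path: the topic is already tracked, so skip the lock entirely.
    if (this.targetTopics.has(topic)) return;

    await this.lock.acquire();
    this.lockAcquisitions += 1;
    try {
      // Re-check: another caller may have added the topic while we waited.
      if (!this.targetTopics.has(topic)) {
        await this.refreshMetadata();
        this.targetTopics.add(topic);
      }
    } finally {
      this.lock.release();
    }
  }
}
```

Without the fast path, every one of thousands of concurrent sends would pass through `acquire()`; with it, only callers racing on a not-yet-tracked topic do.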

tayclark commented 2 years ago

Any update on this? We're also running into the same java.lang.IllegalStateException: Unexpected ApiVersions request received during SASL authentication state HANDSHAKE_REQUEST every 60 seconds on all brokers.

liuchengts commented 1 month ago

Has this issue been resolved? I tried setting the timeout option in producer.send, but it didn't solve the problem.