SOHU-Co / kafka-node

Node.js client for Apache Kafka 0.8 and later.
MIT License

kafka client is sending requests to a broker which went down #1107

Open shashanksah opened 6 years ago

shashanksah commented 6 years ago

Bug Report

When a broker host goes down or restarts, the kafka client keeps sending new requests to that same host. As a result, the requests fail with this error: Request timed out after 30000ms

Sample code to reproduce the behavior


"clusterConfig" : {
    "kafkaHost": "localhost:9092,localhost:9093,localhost:9094",
    "autoConnect": true
  }
...
let kafkaClient = new kafka.KafkaClient(clusterConfig);
producer = new kafka.HighLevelProducer(kafkaClient, cluster.producerConfig);
Promise.promisifyAll(producer);
...
producer.sendAsync([eventPayload])
  .then(function (data) {
    let topicName = eventPayload.topic;
    let payLoadSize = (eventPayload || '').length;
    logger.eventInfo(topicName, payLoadSize, source);
  })
  .catch(function (e) {
    logger.produceFailedEvent(eventPayload, source);
    throw Error.getErrorObject(errorType, e, topic, source);
  });

Output with debug turned on

kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9093 +1s
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9093 +1s
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9093 +1s
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9093 +1s
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9093 +1s
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9093 +1s
kafka-node:KafkaClient compressing messages if needed +787ms
kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +1ms
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9093 +217ms
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9093 +1s
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9093 +1s
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9093 +1s
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9093 +1s
kafka-node:KafkaClient compressing messages if needed +528ms
kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +1ms


shashanksah commented 6 years ago

On further debugging, I found that this code path is not reached once the host goes down:

  if (!broker || !broker.isConnected()) {
    this.refreshBrokerMetadata();
    callback(new errors.BrokerNotAvailableError('Broker not available (sendRequest)'));
    return;
  }

Link: https://github.com/SOHU-Co/kafka-node/blob/master/lib/kafkaClient.js#L979

shashanksah commented 6 years ago

Below are the logs showing which hosts the requests tried to connect to. The first call was successful, the second failed, and the third was successful. In this run localhost:9092 was down.

kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1s
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1s
kafka-node:KafkaClient compressing messages if needed +76ms
shashanksah actual broker!!!!![BrokerWrapper localhost:9094 (connected: true) (ready: true) (idle: false) (needAuthentication: false) (authenticated: false)]
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +928ms
kafka-node:KafkaClient compressing messages if needed +422ms
shashanksah actual broker!!!!![BrokerWrapper localhost:9092 (connected: false) (ready: false) (idle: false) (needAuthentication: false) (authenticated: false)]
kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +1ms
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +583ms
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1s
kafka-node:KafkaClient compressing messages if needed +280ms
shashanksah actual broker!!!!![BrokerWrapper localhost:9093 (connected: true) (ready: true) (idle: false) (needAuthentication: false) (authenticated: false)]
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +723ms
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1s
kafka-node:KafkaClient kafka-node-client reconnecting to localhost:9092 +1s

The issue is that the client picks the host that is down and then waits (for 30 seconds) for that host to become ready before failing the request. If we change the ensureBrokerReady function in kafka-node's kafkaClient.js to something like the code below, the issue no longer occurs:

  const ensureBrokerReady = async.ensureAsync((leader, callback) => {
    let broker = this.brokerForLeader(leader, longpolling);
    console.log("shashanksah actual broker!!!!!" + broker); // debug output that produced the lines above
    if (!broker.isReady()) {
      // proposed change: refresh metadata and re-resolve the broker instead of
      // waiting on a broker that is known to be down
      this.refreshBrokerMetadata();
      broker = this.brokerForLeader(leader, longpolling);
    }
    if (!broker.isReady()) {
      logger.debug('missing apiSupport waiting until broker is ready...');
      this.waitUntilReady(broker, callback);
    } else {
      callback(null);
    }
  });

Please check and let me know whether this root cause analysis is correct.

Thanks, Shashank

Celthi commented 5 years ago

I have another question: when the broker host the client is connected to goes down, will kafka-node be able to find a working replica and retry sending the message on behalf of the application? Or should the application code do the retry manually?

Thanks.

hxinhan commented 5 years ago

I've run into the same case. It's easier to reproduce if you assign leadership of all partitions to one of the three brokers and then shut down the broker that holds all the leaderships. Producers get Request timed out after 30000ms errors when this happens and do not seem to recover afterwards. My feeling is that the client does not have an up-to-date view of the brokers at that point.

A workaround for me is to call the refreshMetadata method of KafkaClient and retry the send (roughly as sketched below). But it would be nice if this could be fixed at the kafka-node library level.
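
That workaround looks roughly like the following minimal sketch (the topic name, payload, and sendWithRetry helper are illustrative placeholders, not from the actual code in this thread):

  const kafka = require('kafka-node');

  const client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092,localhost:9093,localhost:9094' });
  const producer = new kafka.HighLevelProducer(client);

  // Send once; if it fails, refresh metadata for the topic so the client picks up
  // the new partition leaders, then retry the same payload one more time.
  function sendWithRetry (payloads, callback) {
    producer.send(payloads, function (err, result) {
      if (!err) return callback(null, result);
      client.refreshMetadata(payloads.map(p => p.topic), function (refreshErr) {
        if (refreshErr) return callback(refreshErr);
        producer.send(payloads, callback);
      });
    });
  }

  producer.on('ready', function () {
    sendWithRetry([{ topic: 'my-topic', messages: ['hello'] }], function (err, result) {
      if (err) console.error('send failed even after metadata refresh', err);
      else console.log('sent', result);
    });
  });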

crobinson42 commented 5 years ago

+1 @hyperlink just looking for some input on this bug - I'm happy to help with a PR

hyperlink commented 5 years ago

@crobinson42 today I pushed out 4.0.3, which should improve this. When the socket is closed and a BrokerNotAvailable error is passed to all pending requests, kafka-node should now schedule a metadata refresh. Try it out and let me know.

crobinson42 commented 5 years ago

@hyperlink tried v4.0.3 and the OP's issue still stands.

I think what we're expecting is for the send to resolve to the next available broker connection instead of timing out on the broker that is currently down. Is that a reasonable expectation, out of curiosity?

hyperlink commented 5 years ago

I think what we're expecting is for the send to resolve to the next available broker connection instead of timing out on the broker that is currently down. Is that a reasonable expectation, out of curiosity?

@crobinson42 a produce request should only be sent to the leader of the partition; otherwise the request will yield a NotLeaderForPartition error. Either way, the request will fail until kafka-node gets the latest metadata from Kafka.
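
In the meantime, one possible application-level pattern is sketched below. It builds on the promisified producer from the original report; the error-text check is an assumption about how these failures surface, not something confirmed by the library:

  // Sketch: retry wrapper around the promisified producer.sendAsync from the sample code above.
  // Assumes `kafkaClient`, `producer`, and `Promise.promisifyAll(producer)` are set up as in that snippet.
  function sendWithMetadataRefresh (eventPayload) {
    return producer.sendAsync([eventPayload])
      .catch(function (err) {
        // Assumption: leadership/availability failures mention these names in the error text.
        const retriable = /NotLeaderForPartition|BrokerNotAvailable|Request timed out/.test(String(err));
        if (!retriable) throw err;
        // Refresh metadata so the client learns the new leader, then retry once.
        return new Promise(function (resolve, reject) {
          kafkaClient.refreshMetadata([eventPayload.topic], function (refreshErr) {
            return refreshErr ? reject(refreshErr) : resolve();
          });
        }).then(function () {
          return producer.sendAsync([eventPayload]);
        });
      });
  }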

anandchangediya commented 5 years ago

Hey, any update on this issue? I am facing the same thing.

@hyperlink Could you assist me with this?

Thanks in advance

crobinson42 commented 5 years ago

@anandchangediya I switched to kafkajs and I don't experience this issue anymore.

anandchangediya commented 5 years ago

@crobinson42 I switched to the kafkajs module, but the issue still persists. Below is the exception:

Caused by: KafkaJSConnectionError: Connection timeout
    at Timeout.onTimeout [as _onTimeout] (node_modules\kafkajs\src\network\connection.js:149:23)
    at ontimeout (timers.js:424:11)
    at tryOnTimeout (timers.js:288:5)
    at listOnTimeout (timers.js:251:5)
    at Timer.processTimers (timers.js:211:10)
  name: 'KafkaJSNumberOfRetriesExceeded'

The module retries the connection 5 times and then gets stuck with the above exception.

Is there any setting that I am missing? Below is my Kafka client configuration:

kafka = new Kafka({ brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'] })
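
Not a confirmed fix, but kafkajs does expose connectionTimeout and retry options on the client, so a configuration along these lines (values are illustrative) gives it more headroom before KafkaJSNumberOfRetriesExceeded is thrown:

  const { Kafka } = require('kafkajs');

  const kafka = new Kafka({
    clientId: 'my-app', // illustrative client id
    brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
    connectionTimeout: 3000, // wait longer for each broker connection attempt (default is 1000 ms)
    retry: {
      initialRetryTime: 300, // starting backoff in ms
      retries: 8             // raise from the default of 5 seen in the exception above
    }
  });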

peturh commented 4 years ago

Got this error now as well.

Error sending telementry payload Error: NotLeaderForPartition
2020-05-27T06:13:03.691219260Z     at Object.<anonymous> (/dist//server/main.js:1:2169270)
2020-05-27T06:13:03.691225160Z     at Object.self.tap (/dist//server/main.js:1:598580)
2020-05-27T06:13:03.691229660Z     at Object.decodePartitions (/dist//server/main.js:1:2169229)
2020-05-27T06:13:03.691250761Z     at Object.self.loop (/dist//server/main.js:1:598836)
2020-05-27T06:13:03.691255561Z     at Object.<anonymous> (/dist//server/main.js:1:2141520)
2020-05-27T06:13:03.691259861Z     at Object.self.loop (/dist//server/main.js:1:598836)
2020-05-27T06:13:03.691264161Z     at decodeProduceV2Response (/dist//server/main.js:1:2169012)
2020-05-27T06:13:03.691268461Z     at KafkaClient.Client.invokeResponseCallback (/dist//server/main.js:1:1868290)
2020-05-27T06:13:03.691272761Z     at KafkaClient.Client.handleReceivedData (/dist//server/main.js:1:1867946)
2020-05-27T06:13:03.691277061Z     at KafkaClient.Atu0.KafkaClient.handleReceivedData (/dist//server/main.js:1:664490)

and

2020-05-27T06:28:58.629865294Z TelemetryProducer error BrokerNotAvailableError: Broker not available (loadMetadataForTopics)
2020-05-27T06:28:58.629886994Z     at new BrokerNotAvailableError (/dist/server/main.js:1:2774240)
2020-05-27T06:28:58.629892094Z     at KafkaClient.Atu0.KafkaClient.loadMetadataForTopics (/dist/server/main.js:1:654676)
2020-05-27T06:28:58.629896594Z     at RetryOperation._fn (/dist/server/main.js:1:1863405)
2020-05-27T06:28:58.629909094Z     at Timeout.<anonymous> (/dist/server/main.js:1:1846915)
2020-05-27T06:28:58.629914094Z     at ZoneDelegate.invokeTask (/dist/server/main.js:1:1799492)
2020-05-27T06:28:58.629918594Z     at Zone.runTask (/dist/server/main.js:1:1792057)
2020-05-27T06:28:58.629922794Z     at ZoneTask.invokeTask (/dist/server/main.js:1:1801077)
2020-05-27T06:28:58.629926994Z     at Timeout.ZoneTask.options.useG.invoke (/dist/server/main.js:1:1800906)
2020-05-27T06:28:58.629931494Z     at Timeout.timer [as _onTimeout] (/dist/server/main.js:1:1837218)
2020-05-27T06:28:58.629935694Z     at listOnTimeout (internal/timers.js:549:17)
2020-05-27T06:28:58.630895900Z TelemetryProducer error BrokerNotAvailableError: Broker not available (loadMetadataForTopics)
2020-05-27T06:28:58.630907500Z     at new BrokerNotAvailableError (/dist/server/main.js:1:2774240)
2020-05-27T06:28:58.630912200Z     at KafkaClient.Atu0.KafkaClient.loadMetadataForTopics (/dist/server/main.js:1:654676)
2020-05-27T06:28:58.630916500Z     at RetryOperation._fn (/dist/server/main.js:1:1863405)
2020-05-27T06:28:58.630920700Z     at Timeout.<anonymous> (/dist/server/main.js:1:1846915)
2020-05-27T06:28:58.630925400Z     at ZoneDelegate.invokeTask (/dist/server/main.js:1:1799492)
2020-05-27T06:28:58.630929700Z     at Zone.runTask (/dist/server/main.js:1:1792057)
2020-05-27T06:28:58.630934000Z     at ZoneTask.invokeTask (/dist/server/main.js:1:1801077)
2020-05-27T06:28:58.630938200Z     at Timeout.ZoneTask.options.useG.invoke (/dist/server/main.js:1:1800906)
2020-05-27T06:28:58.630942600Z     at Timeout.timer [as _onTimeout] (/dist/server/main.js:1:1837218)
2020-05-27T06:28:58.630953700Z     at listOnTimeout (internal/timers.js:549:17)