SOHU-Co / kafka-node

Node.js client for Apache Kafka 0.8 and later.
MIT License
2.66k stars 627 forks source link

NestedError: refreshBrokerMetadata failed, Caused By: Error: Unable to find available brokers to try #1378

Open yingzi20 opened 4 years ago

yingzi20 commented 4 years ago

Questions?

NestedError: refreshBrokerMetadata failed, Caused By: Error: Unable to find available brokers to try

Bug Report

Environment

For specific cases also provide

new kafka.HighLevelProducer(new kafka.KafkaClient(), {
    // Configuration for when to consider a message as acknowledged, default 1
    requireAcks: 1,
    // The amount of time in milliseconds to wait for all acks before considered, default 100ms
    ackTimeoutMs: 100,
    // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
    // partitionerType: 2
  })

This error often occurs, what is the cause?

rorysavage77 commented 4 years ago

I'm seeing it too.

Errors { NestedError: refreshBrokerMetadata failed at async.waterfall (/node_modules/kafka-node/lib/kafkaClient.js:378:35) at /node_modules/async/dist/async.js:473:16 at /node_modules/async/dist/async.js:5329:29) at /node_modules/async/dist/async.js:969:16

rorysavage77 commented 4 years ago

I think I found a fix for this. It's not pretty, but it seems to work.

flaviosv commented 4 years ago

@rorysavage77 What is the fix?

bmakuh commented 4 years ago

@rorysavage77 what is your fix?

krunkosaurus commented 4 years ago

@rorysavage77 What is your fix?

zquintana commented 4 years ago

@rorysavage77 that fix?

binjiezhao commented 4 years ago

I wonder why this issue is closed? I'm experiencing it and there doesn't seem to be any good fix or work around?

rorysavage77 commented 4 years ago

Sorry for the late response, here is my patch for kafkaClient.js

--- kafkaClient.js  2020-07-28 13:05:28.000000000 -0400
+++ kafkaClient.js-new  2020-08-03 12:12:17.000000000 -0400
@@ -776,12 +776,12 @@
           error = new errors.SaslAuthenticationError(null, message);
         } else {
           error = new errors.BrokerNotAvailableError('Broker not available (socket closed)');
-          //if (!self.connecting && !brokerWrapper.isIdle()) {
-            //logger.debug(`${self.clientId} schedule refreshBrokerMetadata()`);
-            //setImmediate(function () {
-              //self.refreshBrokerMetadata();
-            //});
-          //}
+          if (!self.connecting && !brokerWrapper.isIdle()) {
+            logger.debug(`${self.clientId} schedule refreshBrokerMetadata()`);
+            setImmediate(function () {
+              self.refreshBrokerMetadata();
+            });
+          }
         }
       }
       self.clearCallbackQueue(this, error);
@@ -893,7 +893,7 @@

   const ensureBrokerReady = (broker, cb) => {
     if (!broker.isReady()) {
-      logger.debug('missing apiSupport waiting until broker is ready...');
+      logger.debug('missing apiSupport waiting until broker is ready...(loadMetadataForTopics)');
       this.waitUntilReady(broker, cb);
     } else {
       cb(null);
@@ -906,20 +906,13 @@
         ensureBrokerReady(broker, cb);
       },
       cb => {
-        /* Additional patching suggested by: https://github.com/SOHU-Co/kafka-node/issues/995 */
-        try {
-          const broker = this.brokerForLeader();
-          const correlationId = this.nextId();
-          const supportedCoders = getSupportedForRequestType(broker, 'metadata');
-          const request = supportedCoders.encoder(this.clientId, correlationId, topics);
-
-          this.queueCallback(broker.socket, correlationId, [supportedCoders.decoder, cb]);
-          broker.write(request);
-        } catch (err) {
-          callback(err);
-        }
-
+        const broker = this.brokerForLeader();
+        const correlationId = this.nextId();
+        const supportedCoders = getSupportedForRequestType(broker, 'metadata');
+        const request = supportedCoders.encoder(this.clientId, correlationId, topics);

+        this.queueCallback(broker.socket, correlationId, [supportedCoders.decoder, cb]);
+        broker.write(request);
       }
     ],
     (err, result) => {
@@ -1000,19 +993,25 @@

   const onReady = () => {
     logger.debug('broker is now ready');
-    this._clearTimeout(timeoutId);
-    timeoutId = null;
+
+    if (timeoutId !== null) {
+      this._clearTimeout(timeoutId);
+      timeoutId = null;
+    }
+
     callback(null);
   };

   const timeout = this.options.requestTimeout;
   const readyEventName = broker.getReadyEventName();

-  timeoutId = this._createTimeout(() => {
-    this.removeListener(readyEventName, onReady);
-    this._timeouts.delete(timeoutId);
-    callback(new TimeoutError(`Request timed out after ${timeout}ms`));
-  }, timeout);
+  if (timeout !== false) {
+    timeoutId = this._createTimeout(() => {
+      this.removeListener(readyEventName, onReady);
+      this._timeouts.delete(timeoutId);
+      callback(new TimeoutError(`Request timed out after ${timeout}ms`));
+    }, timeout);
+  }

   this.once(readyEventName, onReady);
 };
@@ -1081,8 +1080,13 @@

   const ensureBrokerReady = async.ensureAsync((leader, callback) => {
     const broker = this.brokerForLeader(leader, longpolling);
+    if (!broker.isConnected()) {
+      this.refreshBrokerMetadata();
+      callback(new errors.BrokerNotAvailableError('Broker not available (sendRequest -> ensureBrokerReady)'));
+      return;
+    }
     if (!broker.isReady()) {
-      logger.debug('missing apiSupport waiting until broker is ready...');
+      logger.debug(`missing apiSupport waiting until broker is ready... (sendRequest ${request.type})`);
       this.waitUntilReady(broker, callback);
     } else {
       callback(null);
@@ -1098,14 +1102,7 @@
             ensureBrokerReady(leader, callback);
           },
           function (callback) {
-            /* Updated sendToBroker call per:
-               Suggestion from: https://github.com/SOHU-Co/kafka-node/issues/995
-             */
-            try {
-              sendToBroker(payload, leader, callback);
-            } catch (ex) {
-              callback(ex);
-            }
+            sendToBroker(payload, leader, callback);
           }
         ],
         function (error, results) {
@@ -1387,6 +1384,12 @@
     return callback(new Error('Client is not ready (describeConfigs)'));
   }
   let err;
+
+  // Broker resource requests must go to the specific node
+  // other requests can go to any node
+  const brokerResourceRequests = [];
+  const nonBrokerResourceRequests = [];
+
   _.forEach(payload.resources, function (resource) {
     if (resourceTypeMap[resource.resourceType] === undefined) {
       err = new Error(`Unexpected resource type ${resource.resourceType} for resource ${resource.resourceName}`);
@@ -1394,39 +1397,47 @@
     } else {
       resource.resourceType = resourceTypeMap[resource.resourceType];
     }
+
+    if (resource.resourceType === resourceTypeMap['broker']) {
+      brokerResourceRequests.push(resource);
+    } else {
+      nonBrokerResourceRequests.push(resource);
+    }
   });
+
   if (err) {
     return callback(err);
   }
-  const brokers = this.brokerMetadata;
-  async.mapValuesLimit(
-    brokers,
-    this.options.maxAsyncRequests,
-    (brokerMetadata, brokerId, cb) => {
-      const broker = this.brokerForLeader(brokerId);
-      if (!broker || !broker.isConnected()) {
-        return cb(new errors.BrokerNotAvailableError('Broker not available (describeConfigs)'));
-      }
-
-      const correlationId = this.nextId();

-      let apiVersion = 0;
-      if (broker.apiSupport && broker.apiSupport.describeConfigs) {
-        apiVersion = broker.apiSupport.describeConfigs.max;
-      }
-      apiVersion = Math.min(apiVersion, 2);
-      const request = protocol.encodeDescribeConfigsRequest(this.clientId, correlationId, payload, apiVersion);
-      this.sendWhenReady(broker, correlationId, request, protocol.decodeDescribeConfigsResponse(apiVersion), cb);
-    },
-    (err, results) => {
-      if (err) {
-        callback(err);
-        return;
+  async.parallelLimit([
+    (cb) => {
+      if (nonBrokerResourceRequests.length > 0) {
+        this.sendRequestToAnyBroker('describeConfigs', [{ resources: nonBrokerResourceRequests, includeSynonyms: payload.includeSynonyms }], cb);
+      } else {
+        cb(null, []);
       }
-      results = _.values(results);
-      callback(null, _.merge.apply({}, results));
+    },
+    ...brokerResourceRequests.map(r => {
+      return (cb) => {
+        this.sendRequestToBroker(r.resourceName, 'describeConfigs', [{ resources: [r], includeSynonyms: payload.includeSynonyms }], cb);
+      };
+    })
+  ], this.options.maxAsyncRequests, (err, result) => {
+    if (err) {
+      return callback(err);
     }
-  );
+
+    callback(null, _.flatten(result));
+  });
+};
+
+/**
+ * Sends a request to any broker in the cluster
+ */
+KafkaClient.prototype.sendRequestToAnyBroker = function (requestType, args, callback) {
+  // For now just select the first broker
+  const brokerId = Object.keys(this.brokerMetadata)[0];
+  this.sendRequestToBroker(brokerId, requestType, args, callback);
 };

 module.exports = KafkaClient;
sagitsofan commented 4 years ago

@rorysavage77 Can you please send your full kafkaClient.js file?

adziri commented 4 years ago

Any idea why this start happening even with older versions of Kafka-node ?

jopais commented 3 years ago

any update on this?

raghav2496 commented 2 years ago

In my case, the above error was seen because of not having a listener and advertised listener. by default, it will be commented on the server.properties file.

listeners = PLAINTEXT://your.host.name:9092
advertised.listeners = PLAINTEXT://your.host.name:9092

additional config added in server.properties in config folder.

host.name=your.host.name
advertised.host.name=your.host.name