tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License
3.71k stars 525 forks source link

Issue in connecting consumer #1557

Open ankit-rawani opened 1 year ago

ankit-rawani commented 1 year ago

Describe the bug I'm trying to run a script (provided below) which basically should connect to kafka instance and subscribe to a few topics. But it shows few errors about not being able to connect to the broker and then it shows issues connecting the consumer and then connection timeout error.

const { Kafka } = require('kafkajs');
const dotenv = require('dotenv');

dotenv.config();

const connect = async () => {
  const kafka = new Kafka({
    brokers: [process.env.KAFKA_BROKER],
    sasl: {
      mechanism: 'scram-sha-256',
      username: process.env.KAFKA_USER_NAME,
      password: process.env.KAFKA_PASSWORD,
    },
    ssl: true,
  });

  const producer = kafka.producer();
  await producer.connect();

  const consumer = kafka.consumer({ groupId: 'health-consumer' });
  await consumer.connect();

  consumer.on(consumer.events.DISCONNECT, async () => {
    console.log('Consumer disconnected, reconnecting...');
    await consumer.connect();
  });

  return {
    producer, consumer,
  };
};

const run = async () => {
  const { consumer } = await connect();
  await consumer.subscribe({
    topics: [
      process.env.KAFKA_CONCOX_HEALTH_TOPIC,
      process.env.KAFKA_CONCOX_LOCATION_TOPIC,
      process.env.KAFKA_CONCOX_LOCATION_TOPIC_NO_ODOMETER,
    ],
  });

  await consumer.run({
    eachMessage: async ({ topic, message }) => {
      console.log(topic, ': ', message);
    },
  });
};

run().catch((e) => console.log(e));

Expected behavior The consumer should connect and then subscribe to the topics.

Observed behavior I'm getting this error.

{"level":"WARN","timestamp":"2023-03-21T10:17:40.239Z","logger":"kafkajs","message":"KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option \"createPartitioner: Partitioners.LegacyPartitioner\". See the migration guide at https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner for details. Silence this warning by setting the environment variable \"KAFKAJS_NO_PARTITIONER_WARNING=1\""}
{"level":"ERROR","timestamp":"2023-03-21T10:17:41.260Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:41.263Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":314}

{"level":"ERROR","timestamp":"2023-03-21T10:17:45.650Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:45.652Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":271}
{"level":"INFO","timestamp":"2023-03-21T10:17:50.657Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"health-consumer"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:51.667Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-1-kafka.upstash.io:9093","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:51.669Z","logger":"kafkajs","message":"[Consumer] Crash: KafkaJSConnectionError: Connection timeout","groupId":"health-consumer","stack":"KafkaJSConnectionError: Connection timeout\n    at Timeout.onTimeout [as _onTimeout] (/home/ankit/concox-trans-gs05/node_modules/kafkajs/src/network/connection.js:223:23)\n    at listOnTimeout (node:internal/timers:559:17)\n    at processTimers (node:internal/timers:502:7)"}
{"level":"INFO","timestamp":"2023-03-21T10:17:51.670Z","logger":"kafkajs","message":"[Consumer] Stopped","groupId":"health-consumer"}
Consumer disconnected, reconnecting...
{"level":"ERROR","timestamp":"2023-03-21T10:17:51.672Z","logger":"kafkajs","message":"[Consumer] Restarting the consumer in 300ms","retryTime":300,"groupId":"health-consumer"}
{"level":"INFO","timestamp":"2023-03-21T10:17:51.974Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"health-consumer"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:52.674Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:52.677Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":309}
{"level":"ERROR","timestamp":"2023-03-21T10:17:53.988Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:53.989Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":1,"retryTime":648}
{"level":"ERROR","timestamp":"2023-03-21T10:17:55.639Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:55.641Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":2,"retryTime":1162}
{"level":"ERROR","timestamp":"2023-03-21T10:17:57.804Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:17:57.806Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":3,"retryTime":2018}
{"level":"ERROR","timestamp":"2023-03-21T10:18:00.825Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:00.827Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":4,"retryTime":4136}
{"level":"ERROR","timestamp":"2023-03-21T10:18:05.963Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:05.965Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":5,"retryTime":8316}
{"level":"ERROR","timestamp":"2023-03-21T10:18:05.966Z","logger":"kafkajs","message":"[Consumer] Crash: KafkaJSNumberOfRetriesExceeded: Connection timeout","groupId":"health-consumer","retryCount":5,"stack":"KafkaJSNonRetriableError\n  Caused by: KafkaJSConnectionError: Connection timeout\n    at Timeout.onTimeout [as _onTimeout] (/home/ankit/concox-trans-gs05/node_modules/kafkajs/src/network/connection.js:223:23)\n    at listOnTimeout (node:internal/timers:559:17)\n    at processTimers (node:internal/timers:502:7)"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:05.966Z","logger":"kafkajs","message":"[Consumer] Failed to execute listener: Connection timeout","eventName":"consumer.disconnect","stack":"KafkaJSNonRetriableError\n  Caused by: KafkaJSConnectionError: Connection timeout\n    at Timeout.onTimeout [as _onTimeout] (/home/ankit/concox-trans-gs05/node_modules/kafkajs/src/network/connection.js:223:23)\n    at listOnTimeout (node:internal/timers:559:17)\n    at processTimers (node:internal/timers:502:7)"}
{"level":"INFO","timestamp":"2023-03-21T10:18:05.967Z","logger":"kafkajs","message":"[Consumer] Stopped","groupId":"health-consumer"}
Consumer disconnected, reconnecting...
{"level":"ERROR","timestamp":"2023-03-21T10:18:05.967Z","logger":"kafkajs","message":"[Consumer] Restarting the consumer in 8316ms","retryCount":5,"retryTime":8316,"groupId":"health-consumer"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:06.967Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:06.968Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":349}
{"level":"ERROR","timestamp":"2023-03-21T10:18:08.319Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:08.319Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":1,"retryTime":568}
{"level":"ERROR","timestamp":"2023-03-21T10:18:09.888Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:09.890Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":2,"retryTime":1242}
{"level":"ERROR","timestamp":"2023-03-21T10:18:12.133Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:12.134Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":3,"retryTime":2188}
{"level":"INFO","timestamp":"2023-03-21T10:18:14.284Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"health-consumer"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:15.323Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"ample-earwig-9318-us1-kafka.upstash.io:9092","clientId":"kafkajs"}
{"level":"ERROR","timestamp":"2023-03-21T10:18:15.325Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":4,"retryTime":4980}

To Reproduce I'm running this script that is just trying to connect the consumer, subscribe to a few channels and console log the topic and messages.

Environment:

sancar commented 1 year ago

Hi @ankit-rawani, your kafka instance seems to be running fine. Can you make sure that your environment can connect to brokers ? I can see that I have connection to there with following commands.

 ❯ nc -vz ample-earwig-9318-us1-kafka.upstash.io 9092
Connection to ample-earwig-9318-us1-kafka.upstash.io port 9092 [tcp/XmlIpcRegSvc] succeeded!
~ on ☁️  (us-west-1)
❯ nc -vz ample-earwig-9318-us1-kafka.upstash.io 9093
Connection to ample-earwig-9318-us1-kafka.upstash.io port 9093 [tcp/*] succeeded!
~ on ☁️  (us-west-1)
❯ nc -vz ample-earwig-9318-us1-kafka.upstash.io 9094
Connection to ample-earwig-9318-us1-kafka.upstash.io port 9094 [tcp/*] succeeded!
❯ nc -vz ample-earwig-9318-us1-kafka.upstash.io 9095
Connection to ample-earwig-9318-us1-kafka.upstash.io port 9095 [tcp/*] succeeded!
sancar commented 1 year ago

Hi @ankit-rawani , are you still experiencing the problem ? Were you able to check if your environment can reach to kafka as I suggested ?

rishibakshidev commented 1 year ago

Hi @sancar I am having same issue and I tried your command to check if I can connect to my borkers and its successfully getting connect with all the brokers. Let me know what I should do further.

sancar commented 1 year ago

@rishibakshidev Can you share the kafka-cluster id and/or the endpoint from the console so that I can check ? Also can you try creating a second cluster and see it works ? And please also share the logs that you see as sometimes the problem is hidden in small differences ?

sancar commented 1 year ago

@rishibakshidev if it is Upstash related, you can ask your questions https://upstash.com/discord or you can use our chat support in https://upstash.com/ As you would guess we are not checking the issues in other repos regularly.

@tulios We can close this issue if you prefer.

izakdvlpr commented 1 year ago

I have the same problem when I connect to a cluster on upstash. :/

rishibakshidev commented 12 months ago

Add connectionTimeout:3000 while connecting and see if you are able to resolve the issue.

const kafkaConnect = new Kafka({ ...props, connectionTimeout: 3000, });

kushagra-nt commented 4 months ago

@rishibakshidev Thanks, I was having same issue but adding connectionTimeout somehow fixed it, Thanks dude!