tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License
3.75k stars 527 forks source link

消费者端无法连接到kafka,连接地址被固定 #1689

Open Dr-SummerFlower opened 5 months ago

Dr-SummerFlower commented 5 months ago

我在使用kafkajs写一个demo时发现了无法连接的情况,根据我对日志的排查,发现broker固定连接一个名为“kafka:9092”的地址,而不是我在实例化kafkajs时设定的主机地址,这就导致我无法连接到正确的服务器,我通过在计算机的hosts文件中添加192.168.21.11 kafka解决了这个问题,但是我还是希望你们能修复这个问题。

我的项目依赖:

{
  "dependencies": {
    "express": "^4.19.2",
    "kafkajs": "^2.2.4"
  }
}

出现问题时的日志:

{"level":"ERROR","timestamp":"2024-05-26T09:41:29.695Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"kafka:9092","clientId":"test"}

我的代码

import * as express from 'express';
import { Express, Request, Response } from 'express';
import { Kafka, Partitioners, Producer } from 'kafkajs';

const app: Express = express();

const kafka: Kafka = new Kafka({
    clientId: 'test',
    brokers: ['kafka:9092'],
});
const producer: Producer = kafka.producer({
    createPartitioner: Partitioners.LegacyPartitioner,
    allowAutoTopicCreation: true,
});

app.use(express.json());

app.post('/data', async (req: Request, res: Response): Promise<void> => {
    await producer.connect();
    await producer.send({
        topic: 'test-topic',
        messages: [
            {
                value: JSON.stringify(req.body),
            },
        ],
    });
    await producer.disconnect();
    res.status(200).send('请求成功');
});

app.listen(25551, (): void => {
    console.log('服务启动在:localhost:25551');
});

/**
 * @File: client.ts
 * @author: 夏花
 * @time: 2024-05-26
 */

import { Consumer, Kafka } from 'kafkajs';

const kafka: Kafka = new Kafka({
    clientId: 'test',
    brokers: ['192.168.21.11:9092'],
});
const consumer: Consumer = kafka.consumer({
    groupId: 'test-group',
});

(async (): Promise<void> => {
    await consumer.connect();
    await consumer.subscribe({
        topic: 'test-topic',
        fromBeginning: true,
    });
    await consumer.run({
        eachMessage: async ({ topic, partition, message }): Promise<void> => {
            console.log({
                topic,
                partition,
                offset: message.offset,
                value: message.value?.toString(),
            });
        },
    });
})();
iejixudong commented 3 months ago

const kafka: Kafka = new Kafka({ clientId: 'test', brokers: ['kafka:9092'], }); 这个是你自己配置的

Dr-SummerFlower commented 3 months ago

const kafka: Kafka = new Kafka({ clientId: 'test', Brokers: ['kafka:9092'], });这是你自己配置的

问题在于我使用ip也是这样,返回的日志任然是"kafka:9092",而不是我配置的地址

JavenLaw commented 3 months ago

我也遇到同样的问题

新配置的地址启动时候能生效 但是当重连的时候,就会使用旧的地址

JavenLaw commented 3 months ago

遇到同样的问题 开始使用的测试连接地址是A 后面开始使用正式的地址是B

在启动的时候,实例化的地址确实为B 但是当这期间网络断开,kafkajs重新连接时,就会一直重连地址A 并报TIMEOUT错误

但是搜索整个项目和配置,都已经不存在地址A 就像此问题说的:连接地址被固定了

Dr-SummerFlower commented 3 months ago

遇到同样的问题 开始使用的测试连接地址是A 后面开始使用正式的地址是B

在启动的时候,实例化的地址确实为B 但是当这期间网络断开,kafkajs重新连接时,就会一直重连地址A 并报TIMEOUT错误

但是搜索整个项目和配置,都已经不存在地址A 就像此问题说的:连接地址被固定了

尝试在hosts中添加“<kafka的主机地址> kafka”暂时解决问题吧,然后等待更新

JavenLaw commented 3 months ago

遇到同样的问题 开始使用的测试连接地址是A 后面开始使用正式的地址是B 在启动的时候,实例化的地址确实为B 但是当这期间网络断开,kafkajs重新连接时,就会一直重连地址A 并报TIMEOUT错误 但是搜索整个项目和配置,都已经不存在地址A 就像此问题说的:连接地址被固定了

尝试在hosts中添加“<kafka的主机地址> kafka”暂时解决问题吧,然后等待更新

不过我的配置直接是:brokers: [“127.0.0.1:9001”]的格式也可以吗?在hosts如果想把127.0.0.1改为192.168.1.1应该如何改呢

Dr-SummerFlower commented 3 months ago

遇到同样的问题开始使用的测试连接地址是A后面开始使用的正式地址是B 在启动的时候,实例化的地址确实为B但是当这期间网络断开,kafkajs重新连接时,就会一直重连接地址A并报超时错误 但是搜索整个项目和配置,都已经不存在地址A就像此问题所说的:连接地址已固定了

尝试在hosts中添加“<kafka的主机地址> kafka”暂时解决问题吧,等待然后更新

我的配置直接是:brokers: [“127.0.0.1:9001”]的格式也可以吗?在hosts的话如果想把127.0.0.1改为192.168.1.1应该怎么改呢

我没有尝试过修改127.0.0.1,但是我想它是可以的 192.168.1.1应该是你的路由器向你的电脑分配的地址,同样也可以添加到hosts文件中 你可以在hosts文件中添加

127.0.0.1 kafka
或者
192.168.1.1 kafka

两个只需要选择其中一个添加就可以,他们的效果应该是一样的

JavenLaw commented 3 months ago

遇到同样的问题开始使用的测试连接地址是A后面开始使用的正式地址是B 在启动的时候,实例化的地址确实为B但是当这期间网络断开,kafkajs重新连接时,就会一直重连接地址A并报超时错误 但是搜索整个项目和配置,都已经不存在地址A就像此问题所说的:连接地址已固定了

尝试在hosts中添加“<kafka的主机地址> kafka”暂时解决问题吧,等待然后更新

我的配置直接是:brokers: [“127.0.0.1:9001”]的格式也可以吗?在hosts的话如果想把127.0.0.1改为192.168.1.1应该怎么改呢

我没有尝试过修改127.0.0.1,但是我想它是可以的 192.168.1.1应该是你的路由器向你的电脑分配的地址,同样也可以添加到hosts文件中 你可以在hosts文件中添加

127.0.0.1 kafka
或者
192.168.1.1 kafka

两个只需要选择其中一个添加就可以,他们的效果应该是一样的

我的意思是:我没有使用brokers: ['kafka:9092'],而是直接brokers: ['127.0.0.1:9092'] 此时如何在hosts中配置127.0.0.1 kafka?

JavenLaw commented 3 months ago

实验了一下,即使hosts改了,在重新连接时还是会去连接旧地址

Dr-SummerFlower commented 3 months ago

这就是我遇到的问题了,它似乎是将brokers: ['kafka:9092']直接写入到npm包中了,导致我们写的代码中的地址无法被npm包接受作为链接地址

mxdmly commented 3 weeks ago

建议用英文描述这个问题