Open braj1999 opened 4 years ago
I have faced a similar issue. I don't have a proper solution, but the code usually works if the producer's constructor is called in the same scope as the producer.on('ready', CB) call. I don't know why this is the case. In your case, I believe that if you replace the call to _getProducerInstance() with the code it contains, it will probably work.
I am also facing the same issue. Is there any solution? The workaround above is not working for me either.
I actually found the proper solution to this a few days ago: change the producer.on('ready', CB) call to if (producer.ready) { /* do some work */ }
Apparently, every time the producer's constructor is called, the producer emits a 'ready' event, which is what producer.on('ready') listens for. However, this event is emitted only once, so a producer.on('ready') listener registered in a different scope from the constructor can apparently be attached too late and never fire. The fix is quite simple: when the event is first emitted, the producer also sets its ready property to true. Use that property as the condition for checking that the Kafka connection is live, and then call producer.send(payloads, cb) or whatever else you need to do.
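To illustrate the point above, here is a minimal sketch that combines both approaches: it checks the ready property first and only falls back to listening for the event. The names whenReady and ReadyEmitter are hypothetical and not part of kafka-node; the sketch assumes only that the producer exposes a boolean ready property and an EventEmitter-style once() method, as described in this thread.

```typescript
// Structural subset of a producer that this sketch relies on (an assumption,
// not the kafka-node typings).
interface ReadyEmitter {
  ready?: boolean;
  once(event: string, listener: (...args: any[]) => void): unknown;
}

// Hypothetical helper: resolves as soon as the producer is usable.
function whenReady(producer: ReadyEmitter): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    // The 'ready' event may already have fired before this code runs,
    // so check the flag before attaching any listener.
    if (producer.ready) {
      resolve();
      return;
    }
    // Otherwise wait for whichever event comes first.
    producer.once('ready', () => resolve());
    producer.once('error', (err: Error) => reject(err));
  });
}
```

With this, a caller in any scope could write whenReady(producer).then(() => producer.send(payloads, cb)) without caring whether the event has already been emitted.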
I am still not able to publish data. My code structure is below. I am getting a TimeoutError in the send method's callback. Earlier, without SASL configured on Kafka, it worked fine; after adding SASL, I can create a KafkaClient, but producers do not work.
// kafka Service
import { KafkaClient, HighLevelProducer } from 'kafka-node';

export class Service {
  private static instance: Service;
  private kafkaClient: any;
  private producer: any;

  private constructor() {}

  static getInstance() {
    if (!Service.instance) {
      Service.instance = new Service();
      Service.instance.kafkaClient = new KafkaClient({
        kafkaHost: SOME_KAFKA_HOST,
        sasl: { mechanism: 'plain', username: 'some_user', password: 'some_password' },
        sslOptions: {
          rejectUnauthorized: false
        }
      });
      Service.instance.producer = new HighLevelProducer(Service.instance.kafkaClient);
    }
    return Service.instance;
  }

  get _kafkaClient(): KafkaClient {
    return this.kafkaClient;
  }

  get _producer(): HighLevelProducer {
    return this.producer;
  }

  initProducer = () => {
    this._producer.on('ready', () => {
      console.log('Producer is ready');
    });
  }
}
// publish method:
const ServiceInstance = Service.getInstance();
const kafkaClient = ServiceInstance._kafkaClient;
const producer: any = ServiceInstance._producer;

const publish = async (req: any) => {
  const { topic, message = '' } = req;
  const payloads = [
    { topic: topic, messages: message }
  ];
  if (producer['ready']) {
    producer.send(payloads, (err: any, data: any) => {
      console.log('If any error: ', err);
      console.log('Published data: ', data);
    });
  }
};
I am getting this Error in send method:
If any error: TimeoutError: Request timed out after 30000ms
    at new TimeoutError (......./node_modules/kafka-node/lib/errors/TimeoutError.js:6:9)
    at Timeout._onTimeout (......./node_modules/kafka-node/lib/kafkaClient.js:1012:16)
    at listOnTimeout (internal/timers.js:549:17)
    at processTimers (internal/timers.js:492:7) {
  message: 'Request timed out after 30000ms'
}
Please suggest what could be the solution here.
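One thing that can make this kind of failure easier to see: the publish method above does nothing at all when the producer is not ready, and it only logs the send result. The sketch below is one possible way to wrap send in a promise so that both cases surface as rejections. sendAsync, Payload and SendCapable are hypothetical names introduced for illustration; the only assumptions about the producer are the ready property and the send(payloads, callback) call already used in the code above. It does not fix the timeout itself.

```typescript
// One entry of the payload array, shaped like the payloads in the code above.
interface Payload {
  topic: string;
  messages: string | string[];
}

// Structural subset of a producer used by this sketch (an assumption,
// not the kafka-node typings).
interface SendCapable {
  ready?: boolean;
  send(payloads: Payload[], cb: (err: Error | null, data: unknown) => void): void;
}

// Hypothetical helper: promisified send that never skips silently.
function sendAsync(producer: SendCapable, payloads: Payload[]): Promise<unknown> {
  return new Promise<unknown>((resolve, reject) => {
    if (!producer.ready) {
      // Report the skipped send instead of dropping the message quietly.
      reject(new Error('Producer is not ready; message was not sent'));
      return;
    }
    producer.send(payloads, (err, data) => {
      if (err) {
        reject(err);
      } else {
        resolve(data);
      }
    });
  });
}
```

A caller can then await sendAsync(producer, payloads) inside a try/catch and see the TimeoutError as a normal exception.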
Commenting further on this issue: I work with @amit-kumar27, and we've troubleshot the problem.
We had used IPs instead of hostnames in our connection string, which resulted in Kafka never completing the handshake. Our configuration is SASL+SSL, in which the hostname matters to Kafka. Interestingly, the server does log an SSL connection failure when the hostname is wrong, but it never says why the SSL connection failed. It seems Kafka leaves the connections hanging instead of closing them with a message or dropping them.
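For anyone who wants to catch this before connecting, here is a small illustrative pre-flight check that flags IP literals in a comma-separated host:port string such as the one passed as kafkaHost. findIpLiterals is a hypothetical helper, not part of kafka-node, and the host names in the usage note are made up. Whether an IP actually breaks the handshake depends on the setup (for example, on whether the broker certificate also lists those IPs), so treat a hit as a hint rather than a verdict.

```typescript
// Hypothetical pre-flight check: returns the entries of a comma-separated
// "host:port" list whose host part is an IP literal rather than a hostname.
function findIpLiterals(kafkaHost: string): string[] {
  const ipv4 = /^(\d{1,3}\.){3}\d{1,3}$/;
  return kafkaHost
    .split(',')
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0)
    .filter((entry) => {
      // Bracketed form such as "[::1]:9093" is an IPv6 literal.
      if (entry.charAt(0) === '[') {
        return true;
      }
      const colon = entry.lastIndexOf(':');
      const host = colon === -1 ? entry : entry.slice(0, colon);
      // A colon left in the host part means an unbracketed IPv6 literal.
      return ipv4.test(host) || host.indexOf(':') !== -1;
    });
}
```

For example, findIpLiterals('10.0.0.5:9093,broker1.example.com:9093') returns ['10.0.0.5:9093'], which could be logged as a warning before the client is created.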
To explore this further, we also tried the wrong port. In that case there was, obviously, no message in the server logs, but the behaviour of the application was identical.
I hope this helps someone who checks this out later. Thanks for helping out guys! 👍
@prafiles I am facing a similar issue. When the SSL handshake fails, it emits neither an error event nor a ready event. I hope someone takes a look at this.
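Until that is addressed, one application-side mitigation is a watchdog: if neither event arrives, code that waits on them hangs forever, so it can help to put an upper bound on the wait and fail loudly. The sketch below is a generic promise timeout; withTimeout is a hypothetical helper and uses nothing from kafka-node.

```typescript
// Hypothetical helper: rejects if `work` has not settled within `timeoutMs`.
function withTimeout<T>(work: Promise<T>, timeoutMs: number, label: string): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => {
      reject(new Error(`${label} timed out after ${timeoutMs}ms`));
    }, timeoutMs);
    work.then(
      (value) => {
        clearTimeout(timer); // settled in time, cancel the watchdog
        resolve(value);
      },
      (err) => {
        clearTimeout(timer);
        reject(err);
      }
    );
  });
}
```

Wrapping whatever promise represents "producer became ready" in withTimeout(..., 10000, 'Kafka producer startup') turns a silent hang into an explicit error that can be logged or retried.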
We are trying to create a kafka-node Producer and send a message with it. However, the Producer never emits the 'ready' event, so we cannot proceed to the code that sends the message.
Environment Information:
OS: Windows 10
Node Version: 10.16.3
NPM Version: 6.9.0
kafka-node version: 5.0.0
Steps to Reproduce: The following is the POC that we are trying.