SOHU-Co / kafka-node

Node.js client for Apache Kafka 0.8 and later.
MIT License
2.66k stars 627 forks source link

Cannot write data in the kafka by the kafka-node #268

Open bboalimoe opened 8 years ago

bboalimoe commented 8 years ago

i add a after save hook in strongloop models. and in the hook function ,i call the send msg method.. it starts right , but later, no response in the kafka broker that is cannot write data in the kafka.. thanks a lot!

var send_msg = function(msg, topic){

     //start new client
    p = a = 0;
    kafka = require('kafka-node');
    var Producer = kafka.Producer;
//var KeyedMessage = kafka.KeyedMessage;
    var Client = kafka.Client;
    var KeyedMessage = kafka.KeyedMessage;
    var client = new Client('ip:2181/kafka/q-63lnrnga');
    var topic = topic || 'motherfucker';
    var producer = new Producer(client, {requireAcks: 1});
    var keyedMessage = new KeyedMessage(topic, msg);
    producer.on('ready', function () {
        producer.send([
            {topic: topic, partition: p, messages: [keyedMessage], attributes: a}
        ], function (err, result) {
            if(!err){
                console.log("successfully send one msg")
            }else{
                console.log("error is " + err );
            }
            //process.exit(); //
        });

    });

    producer.on('error', function (err) {
        console.log('error', err);
    });

};

exports.kafka_msg_send = send_msg;
bboalimoe commented 8 years ago

@coldkite hello~ thanks~

estliberitas commented 8 years ago

@bboalimoe Is producer ready event emitted? You can have the situation you just don't connect to ZK and node-zookeeper-client has this issue - it endlessly tries to connect.

haio commented 8 years ago

As @estliberitas said, the message can only be sent when producer is ready. Btw, maybe you should create a producer instance outside send_msg function rather than create new producer every time.

bboalimoe commented 8 years ago

error message is

error is LeaderNotAvailable

and i change the code as below and it didn't even trigger the send method of producer once .. The above code can at least hold 200 msg sent , i know maybe this is because of too many connections...but how could i put producer.on outside the send_msg ? thanks a lot!

kafka = require('kafka-node');
p = a = 0;
var Producer = kafka.Producer;
//var KeyedMessage = kafka.KeyedMessage;
var Client = kafka.Client;
var client = new Client('ip:2181/kafka/q-63lnrnga'); //todo debug mode is public ip
                                                                // todo  production mode is private ip
var producer = new Producer(client, {requireAcks: 1});

var send_msg = function(msg, topic){

     //start new client

    var KeyedMessage = kafka.KeyedMessage;

    var topic = topic || 'motherfucker';
    console.log("fuck 1")
    var keyedMessage = new KeyedMessage(topic, msg);
    producer.on('ready', function () {
        console.log("fuck 2")
        producer.send([
            {topic: topic, partition: p, messages: [keyedMessage], attributes: a}
        ], function (err, result) {
            if(!err){
                console.log("successfully send one msg")
            }else{
                console.log("error is " + err );
            }
            //process.exit(); //不能有,会将该进程杀死
        });

    });

    producer.on('error', function (err) {
        console.log('error', err);
    });

};

exports.kafka_msg_send = send_msg;

@haio @estliberitas

bboalimoe commented 8 years ago

it is said i need the generic-pool to hold the kafka connections~ 。。。。