SOHU-Co / kafka-node

Node.js client for Apache Kafka 0.8 and later.
MIT License
2.66k stars 628 forks source link

High memory usage 1gb+ for 100k+ messages to kafka-node #179

Closed mwaarna closed 9 years ago

mwaarna commented 9 years ago

What kind of memory usage is everyone seeing with Kafka node?

I am sending 4kb messages in bulk(100k+) to kafka node and I am seeing ram usage climb rapidly to 1gb+.

Testing on Ubuntu 12.04 and Windows 7 64 bit. Both running Node V 0.12.0

Kafka version 8.1.1

Sample Node.js code to simulate:

// --- Module dependencies ------------------------------------------------
var express = require('express');       // call express
var kafka = require('kafka-node');
var os = require('os');
var bodyParser = require('body-parser');
var log4js = require('log4js');

// --- Application / counter state (reported by the /heartbeat route) -----
var app = express();                // define our app using express
var iTotalPosts = 0;                // total POSTs received on /kafka
var iTotalPostsSuccess = 0;         // POSTs acknowledged by Kafka
var iTotalPostsError = 0;           // POSTs that failed anywhere in the pipeline
var bProducerReady = false;         // flipped true once the producer connects
var StartUpTime = new Date();
var LastSuccessFulConnection = null;    // timestamp of the last Kafka ack
var dLastErrorEmailSent = null;         // used to rate-limit alert emails

var hostName = os.hostname();

console.log('process.cwd(): ' + process.cwd());
console.log('os.hostname(): ' + hostName);

// Configuration is resolved from NODE_CONFIG_DIR by the `config` package.
var config = require('config');

console.log('NODE_CONFIG_DIR: ' + config.util.getEnv('NODE_CONFIG_DIR'));

// NOTE: order matters — log4js must be configured before getLogger().
log4js.configure(config.get('logger.configPath'), {});

var logger_log4js = log4js.getLogger(config.get('logger.category'));
logger_log4js.setLevel('DEBUG');
logger_log4js.debug('Kafka Rest Services Starting!');

var nodemailer = require('nodemailer');

// SMTP transport used for error-alert emails (pooled, TLS disabled).
var transporter = nodemailer.createTransport({
    host: config.get('mail.options.host'),
    port: config.get('mail.options.port'),
    ignoreTLS: true,
    maxConnections: 5,
    maxMessages: 10
});

// Base mail options shared by every alert; handleError() fills in the body.
var mailOptions = {
    from: config.get('config.fromAddress'), // sender address
    to: config.get('config.toAddress'), // list of receivers
    subject: "Kafka Post"
};

//https://github.com/alexguan/node-zookeeper-client
var zkOptionsSessionTimeOut = 30000; // Session timeout in milliseconds, defaults to 30 seconds.
var zkOptionsSpinDelay = 30; // The delay (in milliseconds) between each connection attempt.
var zkOptionsRetries = 0; // The number of retry attempts for connection loss exception.
// BUG FIX: node-zookeeper-client expects the option key "sessionTimeout"
// (lower-case "out"); the original "sessionTimeOut" key was silently ignored
// and the library default was used instead (coincidentally also 30000 ms).
var zkOptions = {"sessionTimeout": zkOptionsSessionTimeOut, "spinDelay": zkOptionsSpinDelay, "retries": zkOptionsRetries };
var zkConnectionStr = config.get('config.zkConnectionStr');
var HighLevelProducer = kafka.HighLevelProducer;
var client = new kafka.Client(zkConnectionStr, 'NodeJsProducer', zkOptions),
    producer = new HighLevelProducer(client);

// Once the producer has connected, make sure the test topic exists and
// flip the readiness flag so incoming HTTP requests start being forwarded.
producer.on('ready', function () {
    console.log("Producer to Kafka ready!");
    producer.createTopics(['TestTopic'], false, function (err, data) {
        if (data) {
            console.log('kafka topic created Successfully ' + data);
        } else if (err) {
            console.log('kafka topic Creation Error= ' + err);
        }
        // Mark the producer usable even if topic creation reported an error
        // (the topic may already exist) — same behaviour as before.
        bProducerReady = true;
        LastSuccessFulConnection = new Date();
    });
});

console.log('zkConnectionStr: ' + zkConnectionStr);

// Parse both form-encoded and JSON request bodies.
app.use(bodyParser.urlencoded({ extended: true }));
app.use(bodyParser.json());

var port = 8888;        // set our port

// ROUTES FOR OUR API
// =============================================================================
var router = express.Router();              // get an instance of the express Router

// Health-check endpoint: returns uptime info and the running post counters.
router.post('/heartbeat/', function (req, res) {
    var stats = {
        StartUpTime: StartUpTime,
        iTotalPosts: iTotalPosts,
        iTotalPostsSuccess: iTotalPostsSuccess,
        iTotalPostsError: iTotalPostsError,
        bProducerReady: bProducerReady,
        zkConnectionStr: zkConnectionStr,
        LastSuccessFulConnection: LastSuccessFulConnection
    };
    res.json(stats);
});

//  Kafka Receiver
// Register the producer's error handler ONCE at module scope.
// BUG FIX: the original code called producer.on('error', ...) inside the
// request handler, attaching a brand-new listener (closing over req/res)
// on EVERY POST. Those listeners are never removed, so with 100k+ requests
// they — and the request/response objects they capture — accumulate without
// bound. This is the likely cause of the 1gb+ memory growth reported here.
producer.on('error', function (err) {
    var msg = "producer.on 'error' " + (err != null ? err.toString() : "");
    console.log(msg);
    logger_log4js.error(msg);
});

//  Kafka Receiver: POST /api/kafka/:async
//  :async == "true"  -> respond immediately, fire-and-forget the Kafka send
//  otherwise         -> respond only after Kafka acks (or errors)
router.post('/kafka/:async', function (req, res) {

    var bAsync = (req.params.async.toUpperCase() == "TRUE");
    var kafkaTopic = "TestTopic";
    iTotalPosts++;
    try {
        if (bProducerReady) {
            producer.send([{ topic: kafkaTopic, messages: JSON.stringify(req.body) }], function (err, data) {
                if (data) {
                    LastSuccessFulConnection = new Date();
                    iTotalPostsSuccess++;
                    console.log(data);

                    // In sync mode the response is deferred until Kafka acks.
                    if (!bAsync)
                        res.json({KafkaResponse: data});
                }
                if (err) {
                    handleError("ERROR producer.send " + (err != null ? err.toString() : ""), res, req, bAsync);
                }
            });
        }
    }
    catch (e) {
        handleError("try catch " + e.toString(), res, req, bAsync);
    }

    // Async callers are answered immediately; a not-ready producer is an error.
    if (bAsync && bProducerReady)
        res.json({KafkaResponse: "Async Kafka Request received!"});
    else if (!bProducerReady)
        handleError("Producer Is Not Ready", res, req, bAsync);
});

/**
 * Central error path: counts the error, logs it, rate-limits an alert email
 * to one per 10 minutes, and (for synchronous requests) returns HTTP 500.
 *
 * BUG FIX: the original mutated the shared module-level `mailOptions` object,
 * prepending "<host> Error! " to mailOptions.subject on EVERY alert, so the
 * subject line grew without bound and concurrent requests overwrote each
 * other's html body. A per-call options object is built instead.
 *
 * @param {string|Error} err   description of the failure
 * @param {Object} res         Express response (used only when !bAsync)
 * @param {Object} req         Express request (body is logged/emailed)
 * @param {boolean} bAsync     true when the client was already answered
 */
function handleError(err, res, req, bAsync) {
    try {
        iTotalPostsError++;
        console.log(err);
        logger_log4js.error("ERROR Kafka for " + JSON.stringify(req.body));
        logger_log4js.error(err);

        // Only send out 1 error email every 10 minutes
        if (dLastErrorEmailSent == null || (new Date().valueOf() - dLastErrorEmailSent.valueOf()) > 600000) {
            dLastErrorEmailSent = new Date();
            var errorMailOptions = {
                from: mailOptions.from,
                to: mailOptions.to,
                subject: hostName + " Error! " + mailOptions.subject,
                html: JSON.stringify(req.body) + " Error " + err
            };
            transporter.sendMail(errorMailOptions); // Only Send Error Emails
        }

        // Sync callers get the failure; async callers were already answered.
        if (!bAsync)
            res.status(500).send(err);
    }
    catch (e) {
        // Never let the error path itself crash the request handler.
        console.log(e);
    }
}

// REGISTER OUR ROUTES -------------------------------
// all of our routes will be prefixed with /api
app.use('/api', router);
// Catch-all for anything outside /api: plain 404.
app.use(function (req, res, next) {
    res.status(404).send('That is not the route you are looking for!');
});

// START THE SERVER
// =============================================================================
app.listen(port, config.get('config.listeningIP'));
console.log('Listening on IP ' + config.get('config.listeningIP'));
console.log('listening on port:' + port)

Simple Classic ASP to hammer the Node app:

// Classic ASP (JScript) load driver: synchronously POSTs sData to the Node
// endpoint iPostCount times via WinHttpRequest.
// Returns true only when every request came back HTTP 200.
function ProduceToKafka(sData, iPostCount, bAsyncRequest)
{
    var bSuccess = true;
    var objHTTP = null;
    var szPostURL = "http://localhost:8888/api/kafka/" + bAsyncRequest;
    try
    {
       objHTTP = Server.CreateObject("WinHttp.WinHttpRequest.5.1");
       objHTTP.SetTimeouts(0,      // ResolveTimeout (0 = no limit)
                            5000,   //ConnectTimeout
                            5000,   //SendTimeout
                            5000);  //ReceiveTimeout

        // BUG FIX: the loop counter was an undeclared implicit global
        // ("x" leaked into the page's global scope); declare it locally.
        for (var x = 0; x < iPostCount; x++)
        {
            objHTTP.open("POST", szPostURL, false);
            objHTTP.setRequestHeader("Content-Type", "application/json; charset=utf-8");
            objHTTP.send(sData);

            if ( objHTTP.status != 200 )
            {
                bSuccess = false;
            }
        }

    }
    catch(e)
    {
        bSuccess = false;
        Response.Write("<br>Error Posting to Kafka = " + e.description );
    }

    // Drop the COM object reference so ASP can release it.
    if (objHTTP != null)
        objHTTP = null;

    return bSuccess;
}
kadishmal commented 9 years ago

It may just be the bottleneck of the single connection per broker used by kafka-node, which cannot keep up with the number of incoming requests. You can start up several server processes load-balanced behind a proxy, then keep adding more processes if you need more TPS.

@haio can probably tell the exact reason why kafka-node would consume so much memory.

haio commented 9 years ago

Just had a look at your code; that alone can't explain all of the 1gb+ memory being used by kafka-node. I made a sample test that writes data to an 8-partition topic using HighLevelProducer, and the memory used is always less than 90M. The code:

// Minimal reproduction harness: streams the same ~4k payload to a topic
// every 5 ms through a HighLevelProducer to observe memory behaviour.
var kafka = require('kafka-node');
var HighLevelProducer = kafka.HighLevelProducer;
var Client = kafka.Client;
var client = new Client();
var topic = 'test-topic-2';
// count = total messages to send (100 * 1024); rets = acks received so far.
var count = 100*1024, rets = 0;
var producer = new HighLevelProducer(client);
var data = require('fs').readFileSync('4k.txt').toString()

producer.on('ready', function () {
    setInterval(function () {
      send(data);
    }, 5)
});

// Publish one message through the HighLevelProducer, log progress every
// 1000 acks, and exit the process once `count` acknowledgements arrive.
function send(message) {
    var payload = [{ topic: topic, messages: [message] }];
    producer.send(payload, function (err, data) {
        if (err) {
            console.log(err);
        }
        if (rets % 1000 === 0) {
            console.log('sent %d', rets);
        }
        rets += 1;
        if (rets === count) {
            process.exit();
        }
    });
}
mwaarna commented 9 years ago

The issue was not in Kafka.

borushka commented 8 years ago

Having same problem. Can you explain what was the issue?

michallevin commented 6 years ago

@mwaarna Where was the issue?

alessioerosferri commented 5 years ago

Same here, @mwaarna what was the issue?