The kafka-native client provides consume and produce functionality for Kafka, using the librdkafka native library for performance. Periodic stats on local and Kafka queue lengths are provided for monitoring and analysis. Message and offset progress is atomically recorded to local storage. Multi-core consumer-side processing is possible via node cluster.
var kafka_native = require('kafka-native');
var broker = 'localhost:9092';
var topic = 'example';

var producer = new kafka_native.Producer({
    broker: broker
});

var consumer = new kafka_native.Consumer({
    broker: broker,
    topic: topic,
    offset_directory: './kafka-offsets',
    // Called with each batch of messages pulled from Kafka.
    receive_callback: function(data) {
        data.messages.forEach(function(m) {
            console.log('message: ', m.topic, m.partition, m.offset, m.payload);
        });
        return Promise.resolve();
    }
});

producer.partition_count(topic)
.then(function(npartitions) {
    var partition = 0;
    // Send one message per second, round-robin across the topic's partitions.
    setInterval(function() {
        producer.send(topic, partition++ % npartitions, ['hello']);
    }, 1000);
    return consumer.start();
});
See the examples directory for more.
Install via npm, which will install dependencies and compile the node addon:
$ npm install kafka-native
If needed, the script scripts/test-server.sh can be used to create a test/dev Kafka instance. This uses docker-compose to launch Kafka and Zookeeper containers. If you're using Mac/Windows, make sure you've run the appropriate eval $(docker-machine env ...) command before launching the test-server control script.
$ # Mac/Windows: eval $(docker-machine env <your-default-machine-name>)
$ eval $(scripts/test-server.sh start)
$ npm test
$ scripts/test-server.sh stop
If you already have a running Kafka instance, you can export its broker list to NODE_KAFKA_NATIVE_BROKER before running the tests or examples. Note that the mocha tests will fail if automatic topic creation is not allowed, or if topics are created with just one partition.
A code coverage report is available by running:
$ npm test --coverage
You can then browse locally to $REPO/coverage/lcov-report/index.html to see Istanbul's report.
Creating a new Producer requires only a broker, after which you can push messages into Kafka via the producer.send(topic, partition, messages) call.
To find the number of partitions for a topic, you can use the producer.partition_count(topic) call. It's up to the user of Producer to decide how to distribute messages across a topic's partitions. The examples in the examples directory all round-robin messages.
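As an illustration of the round-robin approach mentioned above, here is a minimal sketch of a partition selector. The helper name make_round_robin is hypothetical and not part of kafka-native; only partition_count and send come from the library as described in this README.

```javascript
// Hypothetical helper (not part of kafka-native): returns a function that
// yields partition numbers 0..npartitions-1 in turn, then wraps around.
function make_round_robin(npartitions) {
    if (!Number.isInteger(npartitions) || npartitions < 1) {
        throw new Error('npartitions must be a positive integer');
    }
    var next = 0;
    return function() {
        var partition = next;
        next = (next + 1) % npartitions;
        return partition;
    };
}

// Possible usage with a Producer (not executed here):
//   producer.partition_count(topic).then(function(npartitions) {
//       var next_partition = make_round_robin(npartitions);
//       producer.send(topic, next_partition(), ['hello']);
//   });
```

Other strategies, such as hashing a message key to a partition, would fit behind the same kind of function.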
The periodic stats from the Producer report the queue length of messages awaiting transmission via send_queue_length. This is also reported from each send call. You can cap the maximum number of locally enqueued messages via the queue_buffering_max_messages option; attempts to send messages when over this limit will cause the messages to be dropped.
The full librdkafka JSON statistics object is returned as rdkafka_stats, in case you want to parse out further stats.
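Because messages sent over the queue limit are dropped, a caller may want to hold back when the queue is getting long. The following is a rough sketch of one way to do that. It assumes the stats_callback info object exposes send_queue_length as a plain numeric property; the make_send_gate helper and its high-water mark are hypothetical and not part of kafka-native.

```javascript
// Hypothetical helper: remembers the most recently reported send_queue_length
// and reports whether it is below a caller-chosen high-water mark.
function make_send_gate(high_water_mark) {
    var last_queue_length = 0;
    return {
        // Intended to be passed as the Producer's stats_callback option.
        on_stats: function(info) {
            last_queue_length = info.send_queue_length;
        },
        // True while the last reported queue length is below the mark.
        can_send: function() {
            return last_queue_length < high_water_mark;
        }
    };
}

// Possible usage (not executed here):
//   var gate = make_send_gate(50000);
//   var producer = new kafka_native.Producer({
//       broker: broker,
//       stats_callback: gate.on_stats
//   });
//   if (gate.can_send()) { producer.send(topic, partition, ['hello']); }
```

Since stats arrive periodically, the gate only sees a delayed view of the queue, so the mark should sit well below queue_buffering_max_messages.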
The Consumer requires a local directory to record processed message offsets, as well as the broker and topic to use. Once a Consumer has been created, its start() method will initiate pulling messages from Kafka, and feeding them back to the configured receive_callback. The Consumer will keep trying to pull messages from Kafka in the background, up to the number of kilobytes of memory specified by the queued_max_messages_kbytes option, so that it can feed the receive_callback as soon as possible.
When your receive_callback returns (or, if it returns a Promise, when that Promise is fulfilled or rejected), the message offsets are recorded locally so that future Consumer instances don't ask for those offsets again. This information is recorded as a set of files in the offset_directory, one per topic-partition.
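To show what atomically recording an offset in a per-topic-partition file can look like, here is a minimal sketch using the common write-then-rename technique. This is not kafka-native's actual implementation or file format: the file naming scheme and the function names are assumptions made for illustration.

```javascript
var fs = require('fs');
var path = require('path');

// Assumed naming scheme (hypothetical): one file per topic-partition.
function offset_file(dir, topic, partition) {
    return path.join(dir, topic + '-' + partition + '.offset');
}

// Write the offset to a temporary file, then rename it over the target.
// A rename within one filesystem is atomic on POSIX systems, so a reader
// sees either the old offset or the new one, never a partial write.
function write_offset(dir, topic, partition, offset) {
    var target = offset_file(dir, topic, partition);
    var tmp = target + '.tmp';
    fs.writeFileSync(tmp, String(offset));
    fs.renameSync(tmp, target);
}

// Returns the stored offset, or null if none has been recorded yet.
function read_offset(dir, topic, partition) {
    try {
        return parseInt(fs.readFileSync(offset_file(dir, topic, partition), 'utf8'), 10);
    } catch (err) {
        if (err.code === 'ENOENT') {
            return null;
        }
        throw err;
    }
}
```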
Flow control is possible via the pause() and resume() methods. With these methods, you can easily match the rate of pulling new messages from Kafka with your processing rate:
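var consumer = new Consumer({
    // broker, topic and offset_directory options omitted for brevity
    receive_callback: function(data) {
        // Stop further callbacks while this batch is being processed.
        consumer.pause();
        // process_messages is your own function and must return a Promise.
        return process_messages(data.messages)
        .finally(function() {
            consumer.resume();
        });
    }
});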
For Kafka topics with multiple partitions, you may want multiple consumers processing the topic. At Consumer initialization time, via the num_workers and worker_slot options, you may configure the Consumer to work with only a subset of the available partitions. See examples/node-cluster.js for a working example that uses the node cluster module to automatically create several consumer processes.
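To make the idea of splitting partitions between workers concrete, here is a small sketch of one possible scheme, a modulo split. This README does not state how kafka-native actually maps num_workers and worker_slot to partitions, so the modulo rule and the partitions_for_worker helper are assumptions for illustration only.

```javascript
// Hypothetical helper: lists the partitions one worker would own under an
// assumed modulo split, where worker_slot is in the range 0..num_workers-1.
function partitions_for_worker(npartitions, num_workers, worker_slot) {
    var mine = [];
    for (var p = 0; p < npartitions; p++) {
        if (p % num_workers === worker_slot) {
            mine.push(p);
        }
    }
    return mine;
}
```

Under this scheme every partition belongs to exactly one worker, so a topic with fewer partitions than workers would leave some workers idle.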
The Consumer will report periodic stats if the stats_callback option is supplied:
waiting_kafka: The number of messages that are sitting at the Kafka broker waiting to be pulled.
waiting_local: The number of messages that have been pulled from Kafka, and are sitting in local memory, but have not yet been handed to the caller's receive_callback.
kafka_log_size: The number of messages sitting in Kafka's log storage for the Consumer's partitions. This number does include messages that have already been pulled and processed, so it's mostly useful to ensure your broker message expiration parameters are working as you expect.
Note that the Consumer stats report only on their slice of the topic, as configured via the num_workers and worker_slot options. That is, if you configure multiple Consumers to split the work on a topic, you'll want to sum/aggregate their statistics in your analytics backend to get a full picture of topic processing.
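The aggregation described above can be sketched as a simple sum over the three documented counters. The aggregate_stats helper is hypothetical, and it assumes each stats object exposes waiting_kafka, waiting_local and kafka_log_size as numeric properties.

```javascript
// Hypothetical helper: sums the documented counters across the stats objects
// reported by several Consumers that share one topic.
function aggregate_stats(stats_list) {
    var totals = { waiting_kafka: 0, waiting_local: 0, kafka_log_size: 0 };
    stats_list.forEach(function(info) {
        Object.keys(totals).forEach(function(key) {
            // Treat a missing counter as zero rather than producing NaN.
            totals[key] += info[key] || 0;
        });
    });
    return totals;
}
```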
The full librdkafka JSON statistics object is returned as rdkafka_stats, in case you want to parse out further stats.
Although Kafka can support binary keys and payloads, the consumer will try to stringify any payload received, which won't be useful if you're pushing binary data into a topic. There's currently no way to produce key/payload pairs; the Producer's send call only takes an array of payloads at this time.
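One possible workaround for the binary payload limitation is to send binary data as base64 text and decode it on the consuming side. The sketch below is not a kafka-native feature: encode_payload and decode_payload are hypothetical helpers, and the approach assumes that a base64 (plain ASCII) string passes through the consumer's stringification unchanged.

```javascript
// Hypothetical helpers: carry binary data through a string-only pipeline.
function encode_payload(buffer) {
    return buffer.toString('base64');
}

function decode_payload(text) {
    return Buffer.from(text, 'base64');
}

// Possible usage (not executed here):
//   producer.send(topic, partition, [encode_payload(my_buffer)]);
//   ...
//   receive_callback: function(data) {
//       data.messages.forEach(function(m) {
//           var bytes = decode_payload(m.payload);
//       });
//   }
```

Base64 increases payload size by roughly a third, which is the cost of this approach.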
The librdkafka version in use doesn't currently support any interaction with Zookeeper, nor is its broker offset commit functionality useful. This is why the Consumer manages its offsets locally.
var producer = new Producer(options);
options:
stats_callback(info)
info:
delivery_report_callback(info)
info:
producer.send(topic, partitions, payloads)
producer.partition_count(topic)
producer.stop(options)
var consumer = new Consumer(options);
options:
console
receive_callback(info)
info:
stats_callback(info)
info:
waiting_kafka: The number of messages that are sitting at the Kafka broker waiting to be pulled.
waiting_local: The number of messages that have been pulled from Kafka, and are sitting in local memory, but have not yet been handed to the caller's receive_callback.
kafka_log_size: The number of messages sitting in Kafka's log storage for the Consumer's partitions. This number does include messages that have already been pulled and processed, so it's mostly useful to ensure your broker message expiration parameters are working as you expect.
consumer.start()
consumer.pause()
Prevents further receive_callback invocations until a consumer.resume() call.
consumer.resume()
Re-allows receive_callback invocations after using consumer.pause().
consumer.stop()
The wrapper library and addon are licensed for distribution by the MIT License.
The repository also includes a copy of the librdkafka library (from https://github.com/edenhill/librdkafka) which is licensed as specified in deps/librdkafka/LICENSE.