GlenTiki closed 8 years ago
Added a commit which fixes #11. You can now leave the partition empty when calling .poll(),
and the callback will be invoked once for EACH partition, polling data for that partition.
Example:
var kafkaesque = require('kafkaesque');

var consumer = kafkaesque({
  brokers: [
    {host: 'localhost', port: 9093}
  ],
  clientId: 'fish',
  group: 'multiconsume',
  maxBytes: 2000000
});

consumer.tearUp(function() {
  // Counts poll callbacks so each one can be labelled in the log output.
  var i = 0;

  // No partition is given, so poll is called once per partition.
  consumer.poll({topic: 'my-replicated-partitioned-topic'}, poll);

  function poll(err, kafka) {
    // Named index rather than consumer to avoid shadowing the outer consumer.
    var index = i++;
    console.log('*** in here ', index);
    if (err) {
      // Stop here: the kafka emitter may not be usable after an error.
      console.log('error', err);
      return;
    }
    kafka.on('message', function(offset, message, commit) {
      console.log('received msg for consumer of partition: ' + index + '.', 'offset: ' + offset + '.', message.value);
      commit();
    });
    kafka.on('debug', console.log.bind(null, 'debug ' + index));
    kafka.on('error', function(error) {
      console.log('error', JSON.stringify(error));
    });
  }
});
Say 'my-replicated-partitioned-topic' has 3 partitions. The poll
function/callback will then be called 3 times, each time with a kafka
EventEmitter object for ONE partition. This allows you to poll all partitions in a topic.
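To make the fan-out concrete, here is a minimal sketch that runs without a broker. It is not kafkaesque's implementation: pollAllPartitions is a hypothetical stand-in, and the third callback argument (the partition id) is an assumption added only for illustration. It shows one callback invocation and one EventEmitter per partition, so a message emitted on one partition reaches only that partition's handler.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical stand-in for the behaviour described above: call the callback
// once per partition, each time with its own EventEmitter.
function pollAllPartitions(partitionIds, callback) {
  return partitionIds.map(function (id) {
    var emitter = new EventEmitter();
    // The partition id argument is an assumption made for this sketch.
    callback(null, emitter, id);
    return emitter;
  });
}

var received = [];
var emitters = pollAllPartitions([0, 1, 2], function (err, kafka, partition) {
  if (err) return console.log('error', err);
  kafka.on('message', function (offset, message, commit) {
    received.push({partition: partition, offset: offset, value: message.value});
    commit();
  });
});

// Simulate a single message arriving on partition 1 only.
var commits = 0;
emitters[1].emit('message', 42, {value: 'hello'}, function () { commits++; });
console.log(received);
```

Because each handler closes over its own emitter, per-partition state such as offsets can live in the callback's scope without any shared bookkeeping.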
@pelger this needs test automation and some more tests I think? I have no access to pull this in.
@thekemkid What's done LGTM, but as above I feel a few more tests, and at least reporting to Travis, are needed.
Ah, this is an outdated PR. Closing and opening a new one, sorry.
👍
Fixes #10. Fixes #11.
This commit is the base of my work so far. It includes some refactoring and the base file in which I will write tests for specific scenarios.
This commit fixes a bug where a single consumer couldn't consume several messages that had arrived in its buffer at once. I now loop over the buffer and clear it out before moving on. This fixes #10.
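The buffer-draining idea can be sketched roughly as follows. This is an illustration, not the code in the commit: drainBuffer and frame are hypothetical helpers, and the framing (a 4-byte big-endian length followed by the payload) is an assumption that only loosely resembles the real Kafka wire format. The point is the loop: keep extracting complete messages until only a partial one, or nothing, is left.

```javascript
// Hypothetical framing: a 4-byte big-endian length, then the payload.
function drainBuffer(buffer, onMessage) {
  var offset = 0;
  while (buffer.length - offset >= 4) {
    var size = buffer.readUInt32BE(offset);
    // A partial message stays in the buffer until more data arrives.
    if (buffer.length - offset - 4 < size) break;
    onMessage(buffer.subarray(offset + 4, offset + 4 + size));
    offset += 4 + size;
  }
  // Return the unconsumed remainder for the next read.
  return buffer.subarray(offset);
}

// Helper that builds one framed message for the demonstration.
function frame(text) {
  var payload = Buffer.from(text);
  var header = Buffer.alloc(4);
  header.writeUInt32BE(payload.length, 0);
  return Buffer.concat([header, payload]);
}

// Two complete messages followed by a truncated third one.
var messages = [];
var incoming = Buffer.concat([frame('a'), frame('bc'), frame('def').subarray(0, 5)]);
var rest = drainBuffer(incoming, function (m) { messages.push(m.toString()); });
console.log(messages, rest.length);
```

Handling only the first message per read would leave the second one stranded until the next network event, which matches the symptom described in #10.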