Open destumme opened 2 months ago
If you're using 'enable.auto.commit': true (the default), then you can just call disconnect right away and that'll take care of committing too.
However, in the consumer-flow.md example, it's set to false. This is more complex, and there seem to be issues in the library code that make it a little more convoluted than it needs to be.
Ideally, we would handle the final commit in the rebalance callback, but due to what seems to be a bug, committing from the rebalance callback while the consumer is terminating doesn't work as it should.
So I recommend something like this for now, where myAssignment is the topic/partition assignment owned by you.
process.once('SIGINT', () => {
  if (myAssignment.length > 0) {
    consumer.pause(myAssignment);
    try {
      consumer.commitSync();
    } catch (e) {
      // The "no offsets stored" error is only informative; rethrow anything else.
      if (e.code !== Kafka.CODES.ERRORS.ERR__NO_OFFSET) {
        throw e;
      }
    }
  }
  consumer.disconnect();
});
However, this isn't sufficient if you're committing manually. If a rebalance happens for any reason, you also need to commit in the rebalance callback whenever partitions are revoked.
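Since the same "commit, but ignore the no-offsets-stored error" logic is needed in both the signal handler and the rebalance callback, it could be factored into a small helper. This is only a sketch: commitIgnoringNoOffset is a hypothetical name, not something the library provides, and it assumes commitSync() throws an error whose code can be compared against Kafka.CODES.ERRORS.ERR__NO_OFFSET, as in the snippet above.

```javascript
// Hypothetical helper (not part of the library): commit all stored offsets
// and swallow only the informative "no offset stored" error.
// `noOffsetCode` is expected to be Kafka.CODES.ERRORS.ERR__NO_OFFSET.
function commitIgnoringNoOffset(consumer, noOffsetCode) {
  try {
    consumer.commitSync(); // no arguments: commit all stored offsets
    return true;           // something was committed
  } catch (e) {
    if (e.code !== noOffsetCode) {
      throw e;             // a real failure: let the caller decide
    }
    return false;          // nothing was stored, so nothing to commit
  }
}
```

It would be called as commitIgnoringNoOffset(consumer, Kafka.CODES.ERRORS.ERR__NO_OFFSET) wherever a bare consumer.commitSync() appears.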
I've provided a complete example below with some comments:
let myAssignment = [];
let endingConsumeLoop = false;
var consumer = new Kafka.KafkaConsumer({
  // ...
  'enable.auto.commit': false,
  rebalance_cb: function(err, assignment) {
    console.log("Rebalance callback: ");
    if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
      console.log("Assigned partitions: ", assignment);
      myAssignment = assignment;
      consumer.assign(assignment);
    } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
      console.log("Revoked partitions: ", assignment);
      // We want to commit in case we're revoking partitions except in 2 cases:
      // 1. either we're terminating the consumer, in which case commit doesn't work, or,
      // 2. the assignment is lost rather than a proper revoke (this indicates some error)
      if (!endingConsumeLoop && !consumer.assignmentLost()) {
        // Commit without arguments to commit all stored offsets - should add error handling here as below
        consumer.commitSync();
      }
      consumer.unassign();
      myAssignment = [];
    }
  }
});
var topicName = 'test-topic';
// logging debug messages, if debug is enabled
consumer.on('event.log', function(log) { /* ... */ });
// logging all errors
consumer.on('event.error', function(err) { /* ... */ });
// counter to commit offsets every numMessages are received
var counter = 0;
var numMessages = 5;
// Start the consume loop
consumer.on('ready', function(arg) { /* ... */ });
consumer.on('data', function(m) { /* ... */ });
consumer.on('disconnected', function(arg) {
  console.log('consumer disconnected. ' + JSON.stringify(arg));
});
process.once('SIGINT', () => {
  endingConsumeLoop = true;
  consumer.pause(myAssignment);
  try {
    consumer.commitSync();
  } catch (e) {
    // The error about no stored offsets is just informative; rethrow anything else.
    if (e.code !== Kafka.CODES.ERRORS.ERR__NO_OFFSET) {
      throw e;
    }
  }
  consumer.disconnect();
});
consumer.connect();
This will need to be changed to use incremental assign/unassign if the cooperative-sticky assignor is used.
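As a rough illustration of that change, the rebalance logic could be pulled into a function that switches between the two styles. This is a sketch under assumptions, not a tested recipe: handleRebalance and the state object are hypothetical names, state.cooperative is a flag you would set yourself based on the configured assignor, and it assumes the consumer exposes incrementalAssign() and incrementalUnassign() in the node-rdkafka style, which should be checked against the version in use.

```javascript
// Hypothetical sketch: rebalance handling for both eager and cooperative assignors.
// `codes` is expected to be Kafka.CODES.ERRORS.
// `state` is { cooperative: boolean, ending: boolean, assignment: Array }.
function handleRebalance(consumer, codes, state, err, assignment) {
  if (err.code === codes.ERR__ASSIGN_PARTITIONS) {
    if (state.cooperative) {
      // Cooperative: only the newly added partitions are passed in.
      consumer.incrementalAssign(assignment);
      state.assignment = state.assignment.concat(assignment);
    } else {
      // Eager: the full new assignment is passed in.
      consumer.assign(assignment);
      state.assignment = assignment;
    }
  } else if (err.code === codes.ERR__REVOKE_PARTITIONS) {
    // Same rule as the eager example: skip the commit when shutting down
    // or when the assignment was lost rather than properly revoked.
    if (!state.ending && !consumer.assignmentLost()) {
      consumer.commitSync();
    }
    if (state.cooperative) {
      // Cooperative: only the revoked partitions are passed in.
      consumer.incrementalUnassign(assignment);
      state.assignment = state.assignment.filter(
        (tp) => !assignment.some(
          (r) => r.topic === tp.topic && r.partition === tp.partition));
    } else {
      consumer.unassign();
      state.assignment = [];
    }
  }
}
```

It could be wired in with rebalance_cb: (err, assignment) => handleRebalance(consumer, Kafka.CODES.ERRORS, state, err, assignment), with state.assignment taking the place of myAssignment and state.ending the place of endingConsumeLoop.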
I will be filing another issue to allow calling commitSync from the final rebalance callback so there will be no need for pausing, and we can just use the rebalance callback to commit the final set of offsets.
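One more note on the "finish messages already being processed" part of the question: the SIGINT handler above commits right away, so if your message processing is asynchronous you may want to wait for in-flight work before the final commit. A minimal sketch of one way to do that follows; InFlightTracker, track and drain are hypothetical names, not library features, and it assumes your per-message handler returns a promise.

```javascript
// Hypothetical helper (not part of the library): remembers promises for
// messages that are still being processed so shutdown can wait for them.
class InFlightTracker {
  constructor() {
    this.pending = new Set();
  }

  // Register the promise returned by the per-message handler.
  // The original promise is returned so the caller still handles failures.
  track(promise) {
    const settled = Promise.resolve(promise)
      .then(() => {}, () => {}) // settle either way; errors stay with the caller
      .then(() => { this.pending.delete(settled); });
    this.pending.add(settled);
    return promise;
  }

  // Resolves once every tracked promise has settled.
  async drain() {
    while (this.pending.size > 0) {
      await Promise.all([...this.pending]);
    }
  }
}
```

In the 'data' listener you would wrap the handler call with tracker.track(...), and in the SIGINT handler you would pause, await tracker.drain(), and only then do the final commitSync and disconnect.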
Very much appreciate the in-depth reply; this looks similar to the direction I was heading. Appreciate the guidance.
This is more a question than a bug report or issue.
Using the node-rdkafka methods forked into this repo, what is the proper way to stop a consumer that has started flowing via registering an on('data') callback and beginning the consume loop with the consume() method (no params)? I am trying to achieve a graceful shutdown, so the consumer still needs to process and commit any messages still in memory (processing one at a time, calling commitMessage at the end of processing).
Between pause(), unassign(), and unsubscribe(), I'm not really sure what the recommendation is here. Really I just need to stop the consume loop itself, not actually unassign or unsubscribe the consumer, as I still need to be able to commit the messages being worked on before I call disconnect on the consumer.
Put more plainly, given the consumer-flow.md example, what would be the recommended approach to implementing a graceful shutdown on the consumer (finishing all messages that have started processing) if that example program were to register an event listener for SIGINT?