Open koresar opened 8 years ago
This error was not triggered by the consumer that logged it; it happened in your handler. It is unclear what your handler does (is it a producer, or something else?) and which call triggered the error.
One thing is clear though: you don't catch errors in your handler, so they flow into the consumer, which has no idea what to do with your application errors.
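To illustrate the point about catching errors inside the handler, here is a sketch (the `commit` parameter stands in for `consumer.commitOffset`, which can reject with a NoKafkaConnectionError during broker restarts; `makeHandler` is a hypothetical helper, not part of no-kafka):

```javascript
// Build a dataHandler that catches its own errors, so rejections from
// commitOffset do not propagate into the consumer internals.
function makeHandler(commit) {
  return function dataHandler(messageSet, topic, partition) {
    var promises = messageSet.map(function (message) {
      return commit({ topic: topic, partition: partition, offset: message.offset })
        .catch(function (err) {
          // Log the application-level failure here; the handler still resolves.
          console.error('commit failed for offset', message.offset, err.message);
          return null;
        });
    });
    return Promise.all(promises);
  };
}
```

With this shape, a connection error during a commit is reported by the application itself rather than surfacing as an opaque failure inside the library.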
Pitching in on this since I can generate the same error during a rolling upgrade of the kafka brokers. Even if it is a problem in the consumer, I'm keen on helping to improve the docs so it can be avoided in the future.
The error
2016-11-22T12:02:39.155Z WARN test-consumer Handler for test-topic:40 failed with { [NoKafkaConnectionError: Error: read ECONNRESET]
  name: 'NoKafkaConnectionError',
  server: '10.42.78.217:9092',
  message: 'Error: read ECONNRESET' }
Consumer code
'use strict';

var kafka = require('no-kafka');

var Consumer = function() {
    this.consumer = new kafka.GroupConsumer({
        groupId: 'alpine-consumer',
        clientId: 'test-consumer',
        connectionString: process.env.KAFKA_BROKERS
    });

    var self = this;

    var dataHandler = function(messageSet, topic, partition) {
        var messagePromises = messageSet.map(function(message) {
            var data = message.message.value.toString('utf8');
            console.log(topic, partition, message.offset, data);
            return self.consumer.commitOffset({
                topic: topic,
                partition: partition,
                offset: message.offset
            });
        });
        return Promise.all(messagePromises);
    };

    this.strategies = [
        {
            strategy: kafka.RoundRobinAssignment,
            subscriptions: ['test-topic'],
            handler: dataHandler
        }
    ];
};

Consumer.prototype.run = function() {
    return this.consumer.init(this.strategies);
};

Consumer.prototype.close = function() {
    return this.consumer.end();
};

module.exports = Consumer;
We're using Kafka 0.10, but I don't know enough about its details to say if that makes a difference. A restart of the consumer picks up from the right offset again. It appears that messages still arrive at the consumer but that the commitOffset call is failing (i.e., console.log is still executed in the dataHandler).
Also experiencing failures during commitOffset when there are connection issues. The cause error is RebalanceInProgress; however, a few moments before that I do see connectivity issues as well:
2018-04-01T11:32:01.979Z ERROR no-kafka-client Full rejoin attempt failed: NoKafkaConnectionError [msc-kfk3a.42.xxxx:6667]: Error: connect ECONNREFUSED 1.1.1.1:6667
2018-04-01T11:32:01.975Z ERROR no-kafka-client Sending heartbeat failed: NoKafkaConnectionError [msc-kfk3a.42.xxxx:6667]: Error: read ECONNRESET
{ [NoKafkaConnectionError: Error: read ECONNRESET]
name: 'NoKafkaConnectionError',
server: 'msc-kfk3a.42.xxxx:6667',
message: 'Error: read ECONNRESET' }
[at apply (/app/node_modules/no-kafka/node_modules/lodash/lodash.js:482:27)]
In my case the commitOffset result promise is returned from the handler function as the API states, but somehow I get an unhandled rejection event.
@oleksiyk any thoughts?
Thanks!
@hugebdu what exactly is unhandled rejection? ECONNRESET?
@oleksiyk Since you have disabled stack traces, I had to wrap the original error with my custom exception just to understand what's going on there.
This is the nested error from no-kafka:
Caused By: KafkaError: RebalanceInProgress: Returned in heartbeat requests when the coordinator has begun rebalancing the group. This indicates to the client that it should rejoin the group.
Are you sure the commitOffset call is not rejected:
https://github.com/oleksiyk/kafka/blob/master/lib/group_consumer.js#L277
I'm pretty sure it is rejected :)
I just don't understand whether there's anything special I need to do about it (since my handler returns that promise, I'd expect no-kafka to handle rejections), and why I then see unhandled rejections.
no-kafka does not handle rejections returned by the subscription handler.
But I think it should probably retry commitOffset internally a few times.
I think I saw three retries somewhere.
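A retry of the kind described could be sketched like this (`commitWithRetry` is a hypothetical wrapper written for illustration, not a no-kafka API; the idea is to ride out a short RebalanceInProgress window before giving up):

```javascript
// Retry a rejected commit a few times, waiting delayMs between attempts,
// before rethrowing the last error to the caller.
function commitWithRetry(commitFn, args, retries, delayMs) {
  return commitFn(args).catch(function (err) {
    if (retries <= 0) throw err; // out of attempts: surface the error
    return new Promise(function (resolve) { setTimeout(resolve, delayMs); })
      .then(function () { return commitWithRetry(commitFn, args, retries - 1, delayMs); });
  });
}
```

Whether this belongs in application code or inside the library is exactly the question being debated in this thread.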
But then how should I handle those? According to the API my handler has to return the commitOffsets result.
These were the retries: no-kafka was waiting for Kafka to rebalance and re-register in the group. Your commitOffset call came in right during the rebalance, and it was rejected.
According to API my handler has to return commitOffsets result.
No, it doesn't. Your handler has to return a Promise, any promise.
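A minimal handler satisfying that contract might look like this (a sketch; the message shape is assumed from the consumer snippet earlier in the thread):

```javascript
// The handler only has to return *a* promise; it need not be the
// commitOffset result.
function dataHandler(messageSet, topic, partition) {
  messageSet.forEach(function (message) {
    console.log(topic, partition, message.offset); // process each message here
  });
  return Promise.resolve(); // any promise satisfies the contract
}
```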
But then how should I handle those?
As I said above, no-kafka will probably retry commitOffset internally.
@oleksiyk then how come I get unhandled rejections?
I guess there's some kind of state machine in your lib, following the Kafka protocol. Might there be a chance that in some state (caused by either rebalancing or connection issues) rejected promises returned by the handler are not handled with catch?
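One way to confirm where those rejections come from, given that stack traces are disabled in the library, is a process-level listener (a debugging sketch; in this thread the reason object would be the wrapped RebalanceInProgress error):

```javascript
// Node emits 'unhandledRejection' for any promise rejected with no catch
// handler attached; logging the reason shows which rejection escaped.
process.on('unhandledRejection', function (reason) {
  console.error('unhandled rejection:', reason && reason.message);
});
```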
There are 3 Kafka nodes we are running in production currently. We applied rolling updates to the Kafka systems, and kafka-broker-3 was temporarily made the leader for some topics (during the reboot of another instance). However, once the partitions were rebalanced back onto the other brokers, all our services were still trying to work with partitions that were no longer led by the kafka-broker-3 system.
The first two server reboots on Kafka worked fine, as our services were using kafka-broker-3. When kafka-broker-3 was rebooted, that's when the problems started occurring.
We think this might be a no-kafka bug.
The current topology for "TOPIC-1" is:
This partition is on kafka-broker-3 and it has issues...
This partition is NOT on kafka-broker-3 and it has issues
=====
To fix the issue we had to go and restart all our services. This is quite painful to do for microservice architectures.
It appears the no-kafka library should ideally retry reconnecting for a little while, but if that fails, go back and perform a full connection setup again. In our case we had topology changes, due to maintenance on the Kafka side, that changed which partitions were hosted where.
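Until such behavior exists in the library, an application-level workaround might look like the following sketch (`runWithRestart` and the consumer factory are hypothetical, built around the Consumer class from the snippet above with its run()/close() methods): tear the consumer down and rebuild it from scratch when initialization fails, instead of restarting the whole service.

```javascript
// Recreate the consumer after a failure, performing a full connection
// setup again, with a small delay between attempts.
function runWithRestart(makeConsumer, delayMs) {
  var consumer = makeConsumer();
  return consumer.run().catch(function (err) {
    console.error('consumer failed, recreating:', err.message);
    return consumer.close().then(function () {
      return new Promise(function (resolve) { setTimeout(resolve, delayMs); })
        .then(function () { return runWithRestart(makeConsumer, delayMs); });
    });
  });
}
```

This only covers failures reported by init; it does not detect a consumer that is silently stuck after a topology change, which is the harder part of the problem described above.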