SOHU-Co / kafka-node

Node.js client for Apache Kafka 0.8 and later.
MIT License

HighLevelConsumer Introduces Lag #86

Closed: brenley closed this issue 9 years ago

brenley commented 10 years ago

I'm using an offset checker to monitor the performance of Kafka and of the consumer process (which saves each message to a DB). The tool I'm using is http://quantifind.com/KafkaOffsetMonitor/, but the same issue shows up with the command-line tool packaged with Kafka: ./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker.

The issue is that the tool always shows a non-zero "lag" value, and I think it comes down to a difference in how this package interprets the stored offset compared to the Java High Level Consumer API. This package looks like it does the following (a rough sketch of this flow follows the list):

  1. fetchOffset => topics (with starting offsets, initially at -1)
  2. updateOffsets(topics) => adds 1 to the value found for each topic/partition
  3. fetch => sends the topicPayloads modified above as the request
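
A minimal sketch of what steps 1-3 amount to, using illustrative variable names (fetched, topicPayloads) and made-up offsets rather than the package's actual internals:

// step 1: offsets as fetched from ZooKeeper, -1 meaning "never committed"
// (illustrative data, not real tool output)
var fetched = { myTopic: { 0: -1, 1: 2 } };

// the payloads the consumer will fetch with, one per topic/partition
var topicPayloads = [
  { topic: 'myTopic', partition: 0, offset: 0 },
  { topic: 'myTopic', partition: 1, offset: 0 }
];

// step 2: an updateOffsets-style pass: advance each stored offset by 1
topicPayloads.forEach(function (p) {
  var stored = fetched[p.topic][p.partition];
  p.offset = stored + 1; // -1 becomes 0, 2 becomes 3, ...
});

// step 3: the fetch request is then built from these bumped offsets, so the
// consumer starts reading one past whatever ZooKeeper currently holds
console.log(topicPayloads);
// [ { topic: 'myTopic', partition: 0, offset: 0 },
//   { topic: 'myTopic', partition: 1, offset: 3 } ]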

When a message comes through, the consumer then sets its stored offset to a value one beyond the current message.

e.g.

// the message
{
   ...
   offset: 2
}

// okay great now set the offset ahead by 1
consumer.setOffset(message.topic, message.partition, message.offset + 1);

When commit gets called, this line sets the offset back: https://github.com/SOHU-Co/kafka-node/blob/master/lib/highLevelConsumer.js#L412
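
For context, here is a hedged paraphrase of what that commit path appears to do, again with illustrative data; the authoritative code, including the metadata field assumed below, is at the linked line and may differ:

// illustrative payload state after the setOffset call above has bumped the
// stored offset to 3 (one past the last handled message at offset 2)
var topicPayloads = [{ topic: 'myTopic', partition: 0, offset: 3 }];

// the stored (already-bumped) offset is decremented before being written
// back, so ZooKeeper ends up holding the offset of the last message handled
// rather than the offset of the next message to read
var commits = topicPayloads
  .filter(function (p) { return p.offset !== 0; })
  .map(function (p) {
    return {
      topic: p.topic,
      partition: p.partition,
      offset: p.offset - 1, // the "sets the offset back" step
      metadata: 'm'         // placeholder metadata, an assumption here
    };
  });

console.log(commits);
// [ { topic: 'myTopic', partition: 0, offset: 2, metadata: 'm' } ]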

This package appears to work quite well (despite not being production ready) at fetching messages and keeping track of the previous offset across rebalances. However, the lag value will always show up as at least 1: after the first message the logSize is already 1 (offsets count from 0), but the package always commits one behind, because it expects to add 1 to the offsets retrieved from ZooKeeper before reading.

The Java-based High Level Consumer API doesn't exhibit this lag, though I'm not all that familiar with Java, so unfortunately I can't say whether its code reads starting from the offset currently stored in ZooKeeper (that would be my assumption).

jezzalaycock commented 10 years ago

I too use the KafkaOffsetMonitor and have noted the lag. Functionally, if you produce 10 messages, 10 are consumed, so the lag is simply a difference in base (0 or 1). Changing to be 1-based rather than 0-based would also involve updates to the fetchOffset request. Note that the same code is used in both consumer types; a worked example of the base difference follows.
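
To make the base difference concrete, a back-of-the-envelope calculation with assumed numbers (10 messages on a single partition); the checker's lag column is simply logSize minus the committed offset:

// 10 messages produced on one partition => offsets 0..9, logSize = 10
var logSize = 10;

// kafka-node commits the offset of the last message it handled
var committedByKafkaNode = 9;
// the Java high level consumer commits the offset of the next message to read
var committedByJavaClient = 10;

console.log(logSize - committedByKafkaNode);  // 1  -> the permanent "lag"
console.log(logSize - committedByJavaClient); // 0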

dmitrig01 commented 9 years ago

Just ran into this as well; it'd be great to at least document it. Changing it would be difficult/impossible because of the upgrade path. I figured it was probably an off-by-one thing, but it would be great to have it written down somewhere :-)

haio commented 9 years ago

The lag should be removed in the latest version.