SOHU-Co / kafka-node

Node.js client for Apache Kafka 0.8 and later.
MIT License

ConsumerGroup commits wrong offset to partition #1421

Open StanislavMikhailenko opened 4 years ago

StanislavMikhailenko commented 4 years ago

Bug Report

There are 4 messages in total across 2 partitions, and a single ConsumerGroup client that processes them. After the first message is processed, the committed offsets are incorrect: [image]

const topicNameTasks = 'smm_cloud_editor_tasks'
const groupId = 'smm_edit_tasks_consumer'
const createConsumer = () => {
  let messageInProcess
  const options = {
    kafkaHost: config.kafkaBroker,
    groupId,
    autoCommit: false,
    fromOffset: 'earliest',
    commitOffsetsOnFirstJoin:false,
  }
  const consumer = new kafka.ConsumerGroup(options, [topicNameTasks])
  consumer.on('message', async message => {
    try {
      if (messageInProcess) {
        return
      }
      messageInProcess = message
      consumer.pause()
      // wait until all other fetched messages have been skipped
      await new Promise(resolve => {
        consumer.once('done', resolve)
      })
      const { topic, partition, value, offset } = message   
      consumer.setOffset(topic, partition, offset+1)
      await new Promise((resolve, reject) => {
        consumer.commit(true, err => {
          if (err) {
            reject(new Error(`commit offset error: ${err}`))
          } else {
            resolve()
          }
        })
      })
      messageInProcess = undefined
      consumer.resume()     
    } catch (e) {
      messageInProcess = undefined
      console.error(e)
    }
  })
}


StanislavMikhailenko commented 4 years ago

Is this a bug or wrong usage? I expect the committed offset to be 1 on the first partition and 0 on the second.
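
To make that expectation concrete, here is a minimal sketch of the bookkeeping it implies: only messages that were actually processed advance the offset to commit, and untouched partitions are left alone. It does not depend on kafka-node. `OffsetTracker`, `markProcessed`, and `pendingCommits` are hypothetical names used for illustration, and the sketch assumes the first processed message was offset 0 on partition 0.

```javascript
// Hypothetical helper, not part of kafka-node: remembers, per topic-partition,
// the next offset to commit, based only on messages that finished processing.
class OffsetTracker {
  constructor () {
    this.nextOffsets = new Map() // key "topic:partition" -> next offset to read
  }

  // Call after a message has been fully handled.
  markProcessed ({ topic, partition, offset }) {
    const key = `${topic}:${partition}`
    const next = offset + 1 // the committed offset is the NEXT message to read
    const current = this.nextOffsets.get(key)
    if (current === undefined || next > current) {
      this.nextOffsets.set(key, next)
    }
  }

  // Offsets to commit; partitions with no processed message are absent.
  pendingCommits () {
    return Array.from(this.nextOffsets, ([key, offset]) => {
      const idx = key.lastIndexOf(':')
      return {
        topic: key.slice(0, idx),
        partition: Number(key.slice(idx + 1)),
        offset
      }
    })
  }
}

// Scenario from the report (assumed): two partitions, and only the first
// message (partition 0, offset 0) has been processed so far.
const tracker = new OffsetTracker()
tracker.markProcessed({ topic: 'smm_cloud_editor_tasks', partition: 0, offset: 0 })
console.log(tracker.pendingCommits())
```

With this bookkeeping, partition 0 would be committed at offset 1 and nothing would be committed for partition 1, which matches the expectation above. Whether the ConsumerGroup in the reproduction tracks offsets this way is exactly the open question in this issue.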