oleksiyk / kafka

Apache Kafka 0.9 client for Node
MIT License

Use of Promise.each() inside GroupConsumer dataHandler #258

Closed johncmunson closed 4 years ago

johncmunson commented 4 years ago

A little background... I'm going through a codebase on a project I joined and cleaning up some tech debt.

In this codebase, a GroupConsumer is being used, but I noticed the setup isn't at all like what's shown in your example.

Here's your example...

var dataHandler = function (messageSet, topic, partition) {
    return Promise.each(messageSet, function (m){
        console.log(topic, partition, m.offset, m.message.value.toString('utf8'));
        // commit offset
        return consumer.commitOffset({topic: topic, partition: partition, offset: m.offset, metadata: 'optional'});
    });
};

and here is the pattern we are using...

var dataHandler = function (messageSet, topic, partition) {
  messageSet.forEach(m => {
    somePromiseFn(m)
      .then(() => {
        consumer.commitOffset({topic: topic, partition: partition, offset: m.offset, metadata: 'optional'})
      })
      .catch(handleErr)
  })
}

The questions I have...

Any advice would be greatly appreciated. Great library btw, thank you for all the hard work on it.

oleksiyk commented 4 years ago

> Is there a concrete reason we should be using Promise.each(), which waits for each promise to resolve before starting the next, as shown in your example?

The reason is commitOffset. If you start processing messages in parallel, you can end up committing, say, offset 11 while the message at offset 7 is still being processed. Then, after a fatal error or an improper shutdown, you won't be able to restart at the proper offset.
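
To make the ordering problem concrete, here is a minimal sketch that does not touch Kafka at all. `processMessage` and the `committed` arrays are hypothetical stand-ins (each push marks the point where consumer.commitOffset would be called), and a plain for...of loop with await stands in for Promise.each, on the assumption that both process one item at a time.

```javascript
// Hypothetical stand-ins for illustration only; none of this is no-kafka API.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Simulated work: the message at offset 7 is slow, the others are fast.
async function processMessage(m) {
  await sleep(m.offset === 7 ? 50 : 5);
}

// Parallel: every message starts at once, so commits land in completion order.
async function runParallel(messageSet) {
  const committed = [];
  await Promise.all(messageSet.map(async (m) => {
    await processMessage(m);
    committed.push(m.offset); // stands in for consumer.commitOffset(...)
  }));
  return committed;
}

// Sequential: each message is processed and committed before the next starts.
async function runSequential(messageSet) {
  const committed = [];
  for (const m of messageSet) {
    await processMessage(m);
    committed.push(m.offset); // stands in for consumer.commitOffset(...)
  }
  return committed;
}

const messageSet = [7, 8, 9, 10, 11].map((offset) => ({ offset }));

runParallel(messageSet).then((order) => console.log('parallel commit order:', order));
runSequential(messageSet).then((order) => console.log('sequential commit order:', order));
```

In the parallel run, offset 11 is committed while offset 7 is still in flight, so a crash at that moment would resume past a message that never finished. The sequential run commits strictly in offset order.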

> So, should the error be handled locally, or should it be allowed to bubble up?

Your application should fully handle its own errors. no-kafka doesn't know what to do with application errors.

> Our dataHandler has no return value. Do you see any downside to this?

No problem.

johncmunson commented 4 years ago

Thanks so much for the input. Regarding the error handling, we would of course handle the error eventually. I'm just wondering whether it should be handled inside of Promise.each(), or whether it should bubble up and therefore cause Promise.each() to abort. Below is an example of what I'm talking about...

// Option 1: Handle errors locally, allowing Promise.each() to fully iterate through messageSet
var dataHandler = function (messageSet, topic, partition) {
  return Promise.each(messageSet, async function (m) {
    try {
      // await some async work, which might throw an error
      return consumer.commitOffset({topic: topic, partition: partition, offset: m.offset, metadata: 'optional'})
    } catch (error) {
      handleError(error)
    }
  })
}

In Option 1, should consumer.commitOffset be moved to just after the try/catch, so that it is called regardless of an error? In the example above, Promise.each will fully iterate through messageSet, but commitOffset isn't guaranteed to be called every time.
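
One detail worth checking in Option 1, independent of Kafka: inside an async function, `return somePromise` within a try block does not route a rejection to the catch block, whereas `return await somePromise` does. The sketch below illustrates this with a hypothetical `failingCommit` that always rejects; it is not the behavior of any particular no-kafka call.

```javascript
// Fake commit that always rejects, for illustration only.
const failingCommit = () => Promise.reject(new Error('commit failed'));

async function withoutAwait() {
  try {
    return failingCommit(); // the rejection is not caught by this try/catch
  } catch (error) {
    return 'handled locally';
  }
}

async function withAwait() {
  try {
    return await failingCommit(); // the rejection is thrown inside the try block
  } catch (error) {
    return 'handled locally';
  }
}

const results = Promise.all([
  withoutAwait().catch((error) => 'escaped: ' + error.message),
  withAwait(),
]);

// prints "escaped: commit failed | handled locally"
results.then(([a, b]) => console.log(a, '|', b));
```

So if the intent of Option 1 is to handle commit failures locally as well, the commit call would need to be awaited inside the try block.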

// Option 2: Handle errors outside of Promise.each, causing Promise.each to break its iteration
var dataHandler = async function (messageSet, topic, partition) {
  try {
    await Promise.each(messageSet, async function (m) {
      // await some async work, which might throw an error
      return consumer.commitOffset({topic: topic, partition: partition, offset: m.offset, metadata: 'optional'})
    })
  } catch (error) {
    handleError(error)
  }
}

My apologies, I realize these are more general Kafka topics I'm trying to understand and not necessarily specific to your library.

oleksiyk commented 4 years ago

I prefer to commit the offset only if the message was processed successfully. I also prefer that the application be designed so that, if an error bubbles up to the no-kafka message handler, it reports the problem to monitoring and maybe even exits, because something very serious has definitely happened.
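
A minimal sketch of that preference, under stated assumptions: `handleMessage`, `reportToMonitoring`, and the fake `consumer` are hypothetical placeholders, and the fake commitOffset only mimics the call shape used earlier in this thread. The commit runs only after processing succeeds, and the first failure stops the loop and bubbles up to a single top-level handler.

```javascript
// Hypothetical placeholders for illustration; not part of no-kafka.
const committed = [];
const consumer = {
  // Fake with the same call shape as the commitOffset calls in this thread.
  commitOffset: async ({ offset }) => { committed.push(offset); },
};

// Simulated application work that fails on one message.
async function handleMessage(m) {
  if (m.offset === 2) throw new Error('cannot process offset 2');
}

function reportToMonitoring(err) {
  console.error('fatal handler error:', err.message);
}

async function dataHandler(messageSet, topic, partition) {
  for (const m of messageSet) {
    await handleMessage(m); // a throw here skips the commit and stops the loop
    await consumer.commitOffset({ topic, partition, offset: m.offset });
  }
}

const messages = [0, 1, 2, 3].map((offset) => ({ offset }));

const run = dataHandler(messages, 'my-topic', 0).catch((err) => {
  reportToMonitoring(err);
  // A real service might stop here, for example by exiting the process.
  return err;
});

run.then(() => console.log('committed offsets:', committed));
```

Here offsets 0 and 1 are committed, the failure at offset 2 is reported once, and offsets 2 and 3 stay uncommitted so they can be picked up again after a restart.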