When the retries are exhausted, KafkaJS simply re-throws the error, which means it will bubble up to your own eachMessage handler. This means you can add your own error handling there: process your message in a try-catch, and in the catch, send the message to your DLQ without re-throwing the error. Just make sure that if you fail to send to the DLQ, you do re-throw, so that KafkaJS doesn't commit the offset for that message and instead re-processes it after restarting.
For example:
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    try {
      await processMessage(message)
    } catch (e) {
      try {
        console.warn('Failed to process message, sending to DLQ', { topic, partition, offset: message.offset, error: e })
        await producer.send({
          topic: DLQ_TOPIC,
          messages: [message]
        })
      } catch (dlqError) {
        console.error('Failed to send message to dead letter queue', { error: dlqError })
        // When failing to send the message to the DLQ, we re-throw so that we don't
        // commit the offset, and instead re-consume the message
        throw dlqError
      }
    }
  },
})
Thanks for the reply. :)
I'm a little confused. Is it possible to reuse the retries logic that kafkajs provides before sending the message to the DLQ?
I want the following behaviour for a failing message: processMessage fails, it is retried the configured number of retries times, and only then is the message sent to the DLQ. I think kafkajs's retries logic can't be used in your solution.
Please let me know if I have a misunderstanding.
I understand where the confusion is coming from. KafkaJS does not retry any error that is thrown from within your message processing - only errors that happen internally to KafkaJS. If your eachMessage throws because you have some issue inside it, that does not get retried by KafkaJS.
You can try it yourself quite easily:
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    throw new Error('💣')
  },
})
If you run that, you'll see an error message saying "Error when calling eachMessage", along with the topic, partition and offset, but it won't do any retries. It will just crash and restart the consumer.
If you want to retry your own message processing before giving up and sending the message to your DLQ, you'll have to build that yourself within your eachMessage handler. We don't export our retry functionality, so you'll have to handle it yourself, though an existing retry module seems like a reasonable approach if you don't want to roll your own.
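For illustration, a rough sketch of what that could look like, assuming the async-retry package from npm and reusing the processMessage, producer and DLQ_TOPIC names from the example above:

const retry = require('async-retry')

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    try {
      // Retry our own processing a few times before giving up
      await retry(() => processMessage(message), { retries: 5 })
    } catch (e) {
      // All attempts failed: forward the message to the DLQ instead of crashing.
      // As in the example above, you would still want to re-throw if this send
      // itself fails, so that the offset is not committed.
      await producer.send({ topic: DLQ_TOPIC, messages: [message] })
    }
  },
})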
Okay, I understand now. Thanks for your support. I'll try the async-retry module.
By the way, when I tried your example above, which just throws an Error in eachMessage, the consumer did retry and then crashed:
await this.consumer.run({
  eachMessage: async event => {
    throw new Error('hello');
  }
})
{"level":"ERROR","timestamp":"2019-03-13T16:03:31.819Z","logger":"kafkajs","message":"[Runner] Error when calling eachMessage","topic":"auth-login","partition":0,"offset":"720","stack":"Error: hello\n at Runner.eachMessage (/[REDACTED]/application.js:113:15)\n at Runner.processEachMessage (//[REDACTED]//node_modules/kafkajs/src/consumer/runner.js:141:20)
\n at Runner.fetch (//[REDACTED]//node_modules/kafkajs/src/consumer/runner.js:236:20)\n at process._tickCallback (internal/process/next_tick.js:68:7)"}
{"level":"ERROR","timestamp":"2019-03-13T16:03:32.138Z","logger":"kafkajs","message":"[Runner] Error when calling eachMessage","topic":"auth-login","partition":0,"offset":"720","stack":"Error: hello\n at Runner.eachMessage (//[REDACTED]//lib/application.js:113:15)\n at Runner.processEachMessage (/[REDACTED]//node_modules/kafkajs/src/consumer/runner.js:141:20)
\n at Runner.fetch (/[REDACTED]/node_modules/kafkajs/src/consumer/runner.js:236:20)\n at process._tickCallback (internal/process/next_tick.js:68:7)"}
{"level":"ERROR","timestamp":"2019-03-13T16:03:32.757Z","logger":"kafkajs","message":"[Runner] Error when calling eachMessage","topic":"auth-login","partition":0,"offset":"720","stack":"Error: hello\n at Runner.eachMessage (/[REDACTED]/kafkoa/lib/application.js:113:15)\n at Runner.processEachMessage (/[REDACTED]/node_modules/kafkajs/src/consumer/runner.js:141:20)
\n at Runner.fetch (/[REDACTED]/node_modules/kafkajs/src/consumer/runner.js:236:20)\n at process._tickCallback (internal/process/next_tick.js:68:7)"}
{"level":"ERROR","timestamp":"2019-03-13T16:03:34.110Z","logger":"kafkajs","message":"[Runner] Error when calling eachMessage","topic":"auth-login","partition":0,"offset":"720","stack":"Error: hello\n at Runner.eachMessage (/[REDACTED]/lib/application.js:113:15)\n at Runner.processEachMessage (/[REDACTED]/node_modules/kafkajs/src/consumer/runner.js:141:20)
\n at Runner.fetch (/[REDACTED]/node_modules/kafkajs/src/consumer/runner.js:236:20)\n at process._tickCallback (internal/process/next_tick.js:68:7)"}
{"level":"ERROR","timestamp":"2019-03-13T16:03:37.001Z","logger":"kafkajs","message":"[Runner] Error when calling eachMessage","topic":"auth-login","partition":0,"offset":"720","stack":"Error: hello\n at Runner.eachMessage (/[REDACTED]/lib/application.js:113:15
That's a common request; it's similar to #69, but like @Nevon said, you can achieve this on the eachMessage or eachBatch level, where you wrap your message processing with the DLQ logic.
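As a rough sketch of the eachBatch variant (reusing the same hypothetical processMessage, producer and DLQ_TOPIC names as in the earlier example):

await consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
    for (const message of batch.messages) {
      try {
        await processMessage(message)
      } catch (e) {
        // Processing failed: route the message to the DLQ instead of crashing the consumer
        await producer.send({ topic: DLQ_TOPIC, messages: [message] })
      }
      // Mark the message as processed and keep the group session alive
      resolveOffset(message.offset)
      await heartbeat()
    }
  },
})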
@posquit0 you can take a look at our retrier (https://github.com/tulios/kafkajs/tree/master/src/retry)
I'm a little surprised by this behaviour. As @posquit0 points out, if eachMessage throws, then it is indeed subject to the retry mechanism as set on the main client object. Is this the intended behaviour? I actually want different retry logic for my eachMessage code.
UPDATE: looking at the code, it seems you can pass retry settings to the consumer that override the global settings, so all good.
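For reference, a consumer-level retry override looks roughly like this (the client id, broker address, group id and retry values are placeholders):

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
  retry: { retries: 5 } // client-wide default
})

// The consumer's retry option overrides the client-level settings for this consumer
const consumer = kafka.consumer({
  groupId: 'my-group',
  retry: { retries: 10, initialRetryTime: 300 }
})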
@Nevon when eachMessage is called with a message, can we assume there are no network errors or partial data up until that point? Meaning, are the message body and headers definitely the same as the data in the broker?
when eachMessage is called with a message, can we assume there are no network errors or partial data up until that point?
You cannot get partial data in your callbacks. However, your question is unrelated to this thread. GitHub Issues is not the place for support; try StackOverflow or the Slack community in the future.
I want to implement my own DLQ (dead letter queue). With retries=5, the consumer retries the eachMessage function 5 times and then restarts if the message handler throws an error. I want to implement my own handler by catching the NumberOfRetriesExceed event. My handler would produce the message to a specific topic and commit the offset instead of restarting the Kafka client.
How can I catch that event?