tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org

Throwing error out of eachBatch/eachMessage causes all concurrent partitions to restart processing their offset #945

Open hughlivingstone opened 4 years ago

hughlivingstone commented 4 years ago

Describe the bug
We are using partitionsConsumedConcurrently: 3 so that messages from multiple partitions are consumed concurrently. However, we are finding that when we throw an error out of our eachBatch/eachMessage handler, all partitions start processing their messages again, even though the partitions that did not fail are still being processed.

This leads to us processing the same message multiple times in parallel. The log output from a test we ran, shown below, illustrates the problem. We are also seeing the same behaviour in our production environment.
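For reference, here is a minimal sketch of the kind of consumer setup we are describing (this is not the attached test file; the broker address, client id, and group id are placeholders, and the topic name matches the log below):

```ts
import { Kafka } from 'kafkajs'

// Placeholder broker address and client id; adjust to your environment.
const kafka = new Kafka({ clientId: 'concurrency-test', brokers: ['localhost:9092'] })
const consumer = kafka.consumer({ groupId: 'concurrency-test-group' })

async function run() {
  await consumer.connect()
  await consumer.subscribe({ topic: 'hugh', fromBeginning: true })

  await consumer.run({
    // Process up to 3 partitions in parallel
    partitionsConsumedConcurrently: 3,
    eachMessage: async ({ partition, message }) => {
      const value = message.value?.toString()
      console.info(`BEGIN processing ${value} on partition ${partition}`)
      await new Promise(resolve => setTimeout(resolve, 3000)) // simulate slow work
      if (partition === 0) {
        // Throwing here is expected to retry only partition 0,
        // but we observe all partitions restarting.
        throw new Error('TEST SHOULD RETRY')
      }
      console.info(`END processing ${value} on partition ${partition}`)
    },
  })
}

run().catch(console.error)
```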

2020-11-06 13:32:23 info BEGIN processing message0 on partition 0
2020-11-06 13:32:23 info BEGIN processing message1 on partition 1
2020-11-06 13:32:23 info BEGIN processing message2 on partition 2
2020-11-06 13:32:26 info END Sleeping message0 on partition 0
  console.error node_modules/kafkajs/src/loggers/console.js:15
    {"level":"ERROR","timestamp":"2020-11-06T13:32:26.802Z","logger":"kafkajs","message":"[Runner] Error when calling eachMessage","topic":"hugh","partition":0,"offset":"0","stack":"Error: TEST SHOULD RETRY\n    at Runner.eachMessage (/Users/hughlivingstone/Dev/code/packages/testing-tools/integration-tests/kafka.integration.test.ts:82:17)\n    at Runner.processEachMessage (/Users/hughlivingstone/Dev/code/packages/testing-tools/node_modules/kafkajs/src/consumer/runner.js:185:9)\n    at onBatch (/Users/hughlivingstone/Dev/code/packages/testing-tools/node_modules/kafkajs/src/consumer/runner.js:321:9)\n    at /Users/hughlivingstone/Dev/code/packages/testing-tools/node_modules/kafkajs/src/consumer/runner.js:373:15","error":{}}

2020-11-06 13:32:26 info BEGIN processing message2 on partition 2
2020-11-06 13:32:26 info BEGIN processing message1 on partition 1
2020-11-06 13:32:28 info END processing message1 on partition 1
2020-11-06 13:32:29 info END processing message2 on partition 2
2020-11-06 13:32:31 info END processing message1 on partition 1
2020-11-06 13:32:32 info END processing message2 on partition 2

I have attached a file below that we used to produce a similar output to above. kafka.concurrency.test.ts.zip

It is TypeScript; you will need to be running a Kafka container and configure the host/port appropriately.

I have also attached a log with debug enabled. test-output.log

To Reproduce

  1. Run a producer that produces to 3 different partitions on a topic (a hedged producer sketch follows this list)
  2. Run a consumer that subscribes to that topic
  3. Throw an error in the eachBatch handler for one of the messages
  4. You will see the consumer start processing all 3 messages from the beginning immediately after the error is thrown, even though the 2 that did not fail are still being processed
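
For steps 1–3, a hedged producer sketch (the broker address and client id are assumptions, and it assumes the topic already exists with 3 partitions):

```ts
import { Kafka } from 'kafkajs'

// Assumed broker address and client id; the topic must already have 3 partitions.
const kafka = new Kafka({ clientId: 'repro-producer', brokers: ['localhost:9092'] })
const producer = kafka.producer()

async function produce() {
  await producer.connect()
  // One message per partition, mirroring the message0/1/2 lines in the log above
  await producer.send({
    topic: 'hugh',
    messages: [
      { value: 'message0', partition: 0 },
      { value: 'message1', partition: 1 },
      { value: 'message2', partition: 2 },
    ],
  })
  await producer.disconnect()
}

produce().catch(console.error)
```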

Expected behavior
Message processing on the partition where the error was thrown should retry; the other 2 partitions should not start being processed again.

Observed behavior
The consumer starts processing all 3 messages from the beginning immediately after the error is thrown, even though the 2 partitions that did not fail are still being processed.

Environment:


t-d-d commented 3 years ago

@Nevon @ankon Looking at the code I believe I can see what the issue is here. I can propose a fix, but before I do I want to confirm what the desired behaviour actually is. When an error is thrown from eachBatch (or from a broker fetch request), is it correct that we need to:

  1. wait for any remaining fetch requests to complete (or at least dump any returned batches)
  2. wait for any currently executing eachBatch calls to complete
  3. abandon processing of any more batches
  4. re-throw the error?

Currently the code doesn't seem to do 1-3; the barrier just resolves on the first error.
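
For illustration only, here is a rough TypeScript sketch of that drain-then-rethrow shape (not the actual Runner code, just the behaviour points 1-4 describe):

```ts
// Sketch only: a simplified stand-in for the Runner's concurrency barrier,
// showing how an error could stop new work while letting in-flight work drain.
type Batch = { partition: number; process: () => Promise<void> }

async function runBatches(batches: Batch[], concurrency: number): Promise<void> {
  let firstError: unknown
  let aborted = false
  const inFlight = new Set<Promise<void>>()

  for (const batch of batches) {
    if (aborted) break // (3) abandon processing of any more batches

    const task = batch.process().catch(error => {
      aborted = true                    // stop scheduling new batches
      firstError = firstError ?? error  // remember the first error, but do not reject yet
    })

    inFlight.add(task)
    void task.finally(() => inFlight.delete(task))

    if (inFlight.size >= concurrency) {
      await Promise.race(inFlight) // respect the concurrency limit
    }
  }

  // (1)/(2) wait for any remaining fetches / currently executing handlers to finish
  await Promise.allSettled(inFlight)

  if (aborted) {
    throw firstError // (4) re-throw once everything has drained
  }
}
```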

arrudacaio commented 2 years ago

help us!!!