tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License
3.75k stars, 527 forks

Pause and resume while throwing error in eachMessage callback #1592

Closed: unematiii closed this issue 8 months ago

unematiii commented 1 year ago

According to the documentation, we can pause and resume consumption per topic and/or partition (to relieve pressure on external services). I'm following this example:

await consumer.run({ eachMessage: async ({ topic, message }) => {
    try {
        await sendToDependency(message)
    } catch (e) {
        if (e instanceof TooManyRequestsError) {
            consumer.pause([{ topic }])
            setTimeout(() => consumer.resume([{ topic }]), e.retryAfter * 1000)
        }

        throw e
    }
}})

The pause and resume part works fine. However, the thrown errors accumulate toward the retry count over the lifetime of the consumer, so the consumer crashes once the number of retries set in the client's retry options has been reached. I don't want to increase retries, because that would affect ALL errors. As far as I understand, I still need to re-throw the error in the eachMessage callback to prevent the last offset from being committed, etc.

Does anyone have an idea how to keep the consumer running no matter how many times I throw a specific error (e.g. TooManyRequestsError) in the eachMessage callback?
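One possible direction, sketched here as an assumption and not as confirmed KafkaJS guidance: instead of re-throwing the rate-limit error, pause the affected partition, call consumer.seek() with the failed message's offset so it is fetched again after the resume, and return normally so the error never counts as a retry. The names makeEachMessage and schedule are hypothetical helpers, TooManyRequestsError is a stand-in for the class in the example above, and only pause, resume and seek are actual consumer methods. This has not been verified against a real broker.

```javascript
// Hypothetical stand-in for the error type used in the example above.
class TooManyRequestsError extends Error {
  constructor(retryAfter) {
    super('Too many requests')
    this.retryAfter = retryAfter // seconds to wait before retrying
  }
}

// Builds an eachMessage handler (hypothetical helper, not part of KafkaJS).
// `consumer` must expose pause/resume/seek; `schedule` is injectable so the
// delay can be controlled in tests and defaults to setTimeout.
function makeEachMessage(consumer, sendToDependency, schedule = setTimeout) {
  return async ({ topic, partition, message }) => {
    try {
      await sendToDependency(message)
    } catch (e) {
      // Any other error is re-thrown and follows the usual retry path.
      if (!(e instanceof TooManyRequestsError)) throw e

      const target = [{ topic, partitions: [partition] }]
      consumer.pause(target)
      // Assumption: seeking back to this offset makes the consumer fetch the
      // failed message again, so it is not skipped even though nothing is thrown.
      consumer.seek({ topic, partition, offset: message.offset })
      schedule(() => consumer.resume(target), e.retryAfter * 1000)
    }
  }
}
```

It would be wired in as `await consumer.run({ eachMessage: makeEachMessage(consumer, sendToDependency) })`. Pausing a single partition instead of the whole topic keeps the other partitions flowing, which may or may not be what you want when the dependency is rate limiting everything.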

unematiii commented 1 year ago

Probably related: https://github.com/tulios/kafkajs/issues/1415

unematiii commented 1 year ago

Today I learned that this behavior actually affects all errors, and the counter never resets. Here's a sample log of 5 errors over roughly 1.5 hours (4 of them with the same cause) that caused the consumer to crash:

[
  {
    "log": "INFO [2023-06-27 13:30:21]: [ConsumerGroup] Consumer has joined the group\n    logger: \"kafkajs\"\n    groupId: \"example-group-id\"\n    memberId: \"example-project-2bb16b4d-42d8-4622-85dd-f7a60d730b62\"\n    leaderId: \"example-project-2bb16b4d-42d8-4622-85dd-f7a60d730b62\"\n    isLeader: true\n    memberAssignment: {\n      \"topic.name\": [\n        0,\n        1,\n        10,\n        100,\n        101,\n        102,\n        103,\n        104,\n        105,\n        106,\n        107,\n        108,\n        109,\n        11,\n        110,\n        111,\n        112,\n        113,\n        114,\n        115,\n        116,\n        117,\n        118,\n        119,\n        12,\n        120,\n        121,\n        122,\n        123,\n        124,\n        125,\n        126,\n        127,\n        128,\n        129,\n        13,\n        130,\n        131,\n        132,\n        133,\n        134,\n        135,\n        136,\n        137,\n        138,\n        139,\n        14,\n        140,\n        141,\n        142,\n        143,\n        144,\n        145,\n        146,\n        147,\n        148,\n        149,\n        15,\n        150,\n        151,\n        152,\n        153,\n        154,\n        155,\n        156,\n        157,\n        158,\n        159,\n        16,\n        160,\n        161,\n        162,\n        163,\n        164,\n        165,\n        166,\n        167,\n        168,\n        169,\n        17,\n        170,\n        171,\n        172,\n        173,\n        174,\n        175,\n        176,\n        177,\n        178,\n        179,\n        18,\n        180,\n        181,\n        182,\n        183,\n        184,\n        185,\n        186,\n        187,\n        188,\n        189,\n        19,\n        190,\n        191,\n        192,\n        193,\n        194,\n        195,\n        196,\n        197,\n        198,\n        199,\n        2,\n        20,\n        21,\n        22,\n        23,\n        24,\n        
25,\n        26,\n        27,\n        28,\n        29,\n        3,\n        30,\n        31,\n        32,\n        33,\n        34,\n        35,\n        36,\n        37,\n        38,\n        39,\n        4,\n        40,\n        41,\n        42,\n        43,\n        44,\n        45,\n        46,\n        47,\n        48,\n        49,\n        5,\n        50,\n        51,\n        52,\n        53,\n        54,\n        55,\n        56,\n        57,\n        58,\n        59,\n        6,\n        60,\n        61,\n        62,\n        63,\n        64,\n        65,\n        66,\n        67,\n        68,\n        69,\n        7,\n        70,\n        71,\n        72,\n        73,\n        74,\n        75,\n        76,\n        77,\n        78,\n        79,\n        8,\n        80,\n        81,\n        82,\n        83,\n        84,\n        85,\n        86,\n        87,\n        88,\n        89,\n        9,\n        90,\n        91,\n        92,\n        93,\n        94,\n        95,\n        96,\n        97,\n        98,\n        99\n      ]\n    }\n    groupProtocol: \"RoundRobinAssigner\"\n    duration: 3039\n"
  },
  {
    "log": "ERROR [2023-06-27 13:30:26]: [Connection] Connection timeout\n    logger: \"kafkajs\"\n    broker: \"b-1.example.kafka.eu-west-1.amazonaws.com:9096\"\n    clientId: \"example-project\"\n"
  },
  {
    "log": "ERROR [2023-06-27 13:41:49]: [Connection] Response OffsetCommit(key: 8, version: 5)\n    logger: \"kafkajs\"\n    broker: \"b-2.example.kafka.eu-west-1.amazonaws.com:9096\"\n    clientId: \"example-project\"\n    correlationId: 41885\n    size: 81\n    error: \"The request timed out\"\n"
  },
  {
    "log": "ERROR [2023-06-27 14:03:24]: [Connection] Response OffsetCommit(key: 8, version: 5)\n    logger: \"kafkajs\"\n    broker: \"b-2.example.kafka.eu-west-1.amazonaws.com:9096\"\n    clientId: \"example-project\"\n    correlationId: 118226\n    size: 81\n    error: \"The request timed out\"\n"
  },
  {
    "log": "ERROR [2023-06-27 14:22:27]: [Connection] Response OffsetCommit(key: 8, version: 5)\n    logger: \"kafkajs\"\n    broker: \"b-2.example.kafka.eu-west-1.amazonaws.com:9096\"\n    clientId: \"example-project\"\n    correlationId: 184000\n    size: 81\n    error: \"The request timed out\"\n"
  },
  {
    "log": "ERROR [2023-06-27 14:50:55]: [Connection] Response OffsetCommit(key: 8, version: 5)\n    logger: \"kafkajs\"\n    broker: \"b-2.example.kafka.eu-west-1.amazonaws.com:9096\"\n    clientId: \"example-project\"\n    correlationId: 311554\n    size: 81\n    error: \"The request timed out\"\n"
  },
  {
    "log": "ERROR [2023-06-27 14:52:47]: [Consumer] Crash: KafkaJSNumberOfRetriesExceeded: The request timed out\n    logger: \"kafkajs\"\n    groupId: \"example-group-id\"\n    retryCount: 5\n    stack: \"KafkaJSNonRetriableError\\n  Caused by: KafkaJSProtocolError: The request timed out\\n    at createErrorFromCode (/app/node_modules/kafkajs/src/protocol/error.js:581:10)\\n    at Object.parse (/app/node_modules/kafkajs/src/protocol/requests/offsetCommit/v0/response.js:36:11)\\n    at Connection.send (/app/node_modules/kafkajs/src/network/connection.js:433:35)\\n    at runMicrotasks (<anonymous>)\\n    at processTicksAndRejections (node:internal/process/task_queues:96:5)\\n    at async Broker.[private:Broker:sendRequest] (/app/node_modules/kafkajs/src/broker/index.js:904:14)\\n    at async Broker.offsetCommit (/app/node_modules/kafkajs/src/broker/index.js:494:12)\\n    at async OffsetManager.commitOffsets (/app/node_modules/kafkajs/src/consumer/offsetManager/index.js:264:7)\\n    at async ConsumerGroup.commitOffsets (/app/node_modules/kafkajs/src/consumer/consumerGroup.js:435:5)\\n    at async onBatch (/app/node_modules/kafkajs/src/consumer/runner.js:457:7)\"\n"
  }
]

So I suppose I should set the retries option to some very large number to keep the consumer running?
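For reference, a sketch of what that could look like if the retry settings are scoped to the consumer instead of the whole client. It assumes that kafka.consumer() accepts its own retry option and that retry.restartOnFailure is consulted when the consumer crashes, as I read the retry section of the documentation. The value 10 and the name consumerOptions are illustrative only.

```javascript
// Consumer-scoped options (illustrative values). Assumption: passing `retry`
// here leaves the client-wide retry settings untouched.
const consumerOptions = {
  groupId: 'example-group-id',
  retry: {
    retries: 10, // illustrative number, not a recommendation
    // Assumption: returning true asks KafkaJS to restart the consumer
    // after a crash instead of leaving it stopped.
    restartOnFailure: async (error) => {
      console.error('Consumer crashed, requesting restart:', error.message)
      return true
    },
  },
}

// Usage, given an existing Kafka client instance named `kafka`:
// const consumer = kafka.consumer(consumerOptions)
```

This only raises the ceiling or restarts after a crash. It does not answer whether the retry counter is supposed to reset between unrelated errors.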