tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

Consumer leaves group prematurely on disconnect #751

Open andrewreineke opened 4 years ago

andrewreineke commented 4 years ago

Describe the bug

DISCLAIMER: This could 100% be abuse/misuse of the library, and I'd be happy if this was a case of RTFM, so hopefully someone can point out something I'm doing obviously wrong 😜

I'm attempting to use a combination of eachBatch and commitOffsetsIfNecessary in order to commit resolved offsets in the face of a rebalance (detected by try-catching heartbeat() and matching err.type === "REBALANCE_IN_PROGRESS").

This works well until consumer.disconnect() is called, at which point the OffsetCommit request fails because the consumer has already issued LeaveGroup. (The coordinator is not aware of this member)

This results in frequent, unnecessary redeliveries on rebalances.

I believe this is due to a race condition on the runner.consuming bit, but I'm not positive.

Below is a repo with more detailed code/repro steps, but the pared down version of my eachBatch function is:

handleBatch: async (args) => {
  const { batch } = args;
  const { topic, partition } = batch;

  // Commit the offsets that have been resolved so far.
  const tryCommit = () =>
    args.commitOffsetsIfNecessary(args.uncommittedOffsets());

  for (const message of batch.messages) {
    // Stop early once the consumer is no longer running.
    if (!args.isRunning()) {
      break;
    }

    try {
      await handleMessage(message);
      args.resolveOffset(message.offset);
    } catch (err) {
      // Keep the progress made before the failing message.
      await tryCommit();
      throw err;
    }

    try {
      await args.heartbeat();
    } catch (err) {
      // On a rebalance, commit resolved offsets before rethrowing.
      if (err.type === "REBALANCE_IN_PROGRESS") {
        await tryCommit();
      }
      throw err;
    }
  }

  await tryCommit();
}

To Reproduce

https://github.com/andrewreineke/kjs-rebalance-test (Setup/repro steps in README)

Expected behavior

When gracefully disconnecting such a consumer, the resolved offsets should be committed successfully on teardown.

Observed behavior

When consumer.disconnect() is called, the OffsetCommit request fails because the consumer has already issued LeaveGroup. (The coordinator is not aware of this member)

{"level":"ERROR","timestamp":"2020-06-09T02:41:23.648Z","logger":"kafkajs","message":"[Connection] Response OffsetCommit(key: 8, version: 3)","broker":"kafka.cluster.local:31090","clientId":"test","error":"The coordinator is not aware of this member","correlationId":27,"size":41}

Environment:

Additional context

My understanding of what's happening is:

Why is this.consuming === false? I manually instrumented runner.js with a getter/setter with logging to understand what's going on. I found that after a rebalance occurs and new partition assignments are received, the flag is being set to true and then immediately set false. (Log excerpt below).

I think what's happening is that the this.consuming = false assignment in the finally block of runner.scheduleFetch() runs as a promise microtask, and that microtask is scheduled after this.consuming = true has already been set at the top of the next scheduleFetch(), because that next scheduleFetch() is invoked fire-and-forget from the catch block.

Processing:  { partition: 4, offset: '16' }
Processing:  { partition: 4, offset: '17' }
Processing:  { partition: 4, offset: '18' }
Processing:  { partition: 4, offset: '19' }
{"level":"DEBUG","timestamp":"2020-06-09T02:40:56.575Z","logger":"kafkajs","message":"[Connection] Request Heartbeat(key: 12, version: 1)","broker":"kafka.cluster.local:31090","clientId":"test","correlationId":9,"expectResponse":true,"size":75}
{"level":"ERROR","timestamp":"2020-06-09T02:40:56.578Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 1)","broker":"kafka.cluster.local:31090","clientId":"test","error":"The group is rebalancing, so a rejoin is needed","correlationId":9,"size":10}
{"level":"DEBUG","timestamp":"2020-06-09T02:40:56.578Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 1)","broker":"kafka.cluster.local:31090","clientId":"test","error":"The group is rebalancing, so a rejoin is needed","correlationId":9,"payload":{"type":"Buffer","data":"[filtered]"}}
HEARTBEAT ERROR - Partition 4 - Offset: 19 KafkaJSProtocolError: The group is rebalancing, so a rejoin is needed
    at createErrorFromCode (/Users/andrew/src/rebalance-test/node_modules/kafkajs/src/protocol/error.js:537:10)
    at Object.parse (/Users/andrew/src/rebalance-test/node_modules/kafkajs/src/protocol/requests/heartbeat/v0/response.js:20:11)
    at Connection.send (/Users/andrew/src/rebalance-test/node_modules/kafkajs/src/network/connection.js:311:35)
    at processTicksAndRejections (internal/process/task_queues.js:97:5)
    at async Broker.heartbeat (/Users/andrew/src/rebalance-test/node_modules/kafkajs/src/broker/index.js:314:12)
    at async ConsumerGroup.heartbeat (/Users/andrew/src/rebalance-test/node_modules/kafkajs/src/consumer/consumerGroup.js:310:7)
    at async Object.heartbeat (/Users/andrew/src/rebalance-test/node_modules/kafkajs/src/consumer/runner.js:223:11)
    at async Runner.handleBatch [as eachBatch] (/Users/andrew/src/rebalance-test/handle-batch.js:54:9)
    at async Runner.processEachBatch (/Users/andrew/src/rebalance-test/node_modules/kafkajs/src/consumer/runner.js:199:7)
    at async onBatch (/Users/andrew/src/rebalance-test/node_modules/kafkajs/src/consumer/runner.js:300:9) {
  retriable: false,
  helpUrl: 'https://kafka.js.org/docs/faq#what-does-it-mean-to-get-rebalance-in-progress-errors',
  type: 'REBALANCE_IN_PROGRESS',
  code: 27
}
{"level":"DEBUG","timestamp":"2020-06-09T02:40:56.583Z","logger":"kafkajs","message":"[Connection] Request OffsetCommit(key: 8, version: 3)","broker":"kafka.cluster.local:31090","clientId":"test","correlationId":10,"expectResponse":true,"size":124}
{"level":"DEBUG","timestamp":"2020-06-09T02:40:56.586Z","logger":"kafkajs","message":"[Connection] Response OffsetCommit(key: 8, version: 3)","broker":"kafka.cluster.local:31090","clientId":"test","correlationId":10,"size":41,"data":{"throttleTime":0,"responses":[{"topic":"rebalance.test.30","partitions":[{"partition":4,"errorCode":0}]}]}}
{"level":"ERROR","timestamp":"2020-06-09T02:40:56.586Z","logger":"kafkajs","message":"[Runner] The group is rebalancing, re-joining","groupId":"group.30","memberId":"test-8a384dfd-4d38-4ea6-9222-a62ca0a050c5","error":"The group is rebalancing, so a rejoin is needed","retryCount":0,"retryTime":354}
{"level":"DEBUG","timestamp":"2020-06-09T02:40:56.586Z","logger":"kafkajs","message":"[Connection] Request GroupCoordinator(key: 10, version: 1)","broker":"kafka.cluster.local:31090","clientId":"test","correlationId":11,"expectResponse":true,"size":29}
{"level":"DEBUG","timestamp":"2020-06-09T02:40:56.588Z","logger":"kafkajs","message":"[Connection] Response GroupCoordinator(key: 10, version: 1)","broker":"kafka.cluster.local:31090","clientId":"test","correlationId":11,"size":45,"data":{"throttleTime":0,"errorCode":0,"errorMessage":"NONE","coordinator":{"nodeId":0,"host":"kafka.cluster.local","port":31090}}}
{"level":"DEBUG","timestamp":"2020-06-09T02:40:56.596Z","logger":"kafkajs","message":"[Cluster] Found group coordinator","nodeId":0}
{"level":"DEBUG","timestamp":"2020-06-09T02:40:56.596Z","logger":"kafkajs","message":"[Connection] Request JoinGroup(key: 11, version: 2)","broker":"kafka.cluster.local:31090","clientId":"test","correlationId":12,"expectResponse":true,"size":146}
{"level":"DEBUG","timestamp":"2020-06-09T02:40:57.821Z","logger":"kafkajs","message":"[Connection] Response JoinGroup(key: 11, version: 2)","broker":"kafka.cluster.local:31090","clientId":"test","correlationId":12,"size":124,"data":{"throttleTime":0,"errorCode":0,"generationId":18,"groupProtocol":"RoundRobinAssigner","leaderId":"test-b1988d0e-39ec-47ac-bd8a-198c925c07a4","memberId":"test-8a384dfd-4d38-4ea6-9222-a62ca0a050c5","members":[]}}
{"level":"DEBUG","timestamp":"2020-06-09T02:40:57.821Z","logger":"kafkajs","message":"[Connection] Request SyncGroup(key: 14, version: 1)","broker":"kafka.cluster.local:31090","clientId":"test","correlationId":13,"expectResponse":true,"size":79}
{"level":"DEBUG","timestamp":"2020-06-09T02:40:57.832Z","logger":"kafkajs","message":"[Connection] Response SyncGroup(key: 14, version: 1)","broker":"kafka.cluster.local:31090","clientId":"test","correlationId":13,"size":55,"data":{"throttleTime":0,"errorCode":0,"memberAssignment":{"type":"Buffer","data":[0,1,0,0,0,1,0,17,114,101,98,97,108,97,110,99,101,46,116,101,115,116,46,51,48,0,0,0,2,0,0,0,1,0,0,0,4,0,0,0,0]}}}
{"level":"DEBUG","timestamp":"2020-06-09T02:40:57.832Z","logger":"kafkajs","message":"[ConsumerGroup] Received assignment","groupId":"group.30","generationId":18,"memberId":"test-8a384dfd-4d38-4ea6-9222-a62ca0a050c5","memberAssignment":{"rebalance.test.30":[1,4]}}
{"level":"INFO","timestamp":"2020-06-09T02:40:57.832Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"group.30","memberId":"test-8a384dfd-4d38-4ea6-9222-a62ca0a050c5","leaderId":"test-b1988d0e-39ec-47ac-bd8a-198c925c07a4","isLeader":false,"memberAssignment":{"rebalance.test.30":[1,4]},"groupProtocol":"RoundRobinAssigner","duration":1246}
{"level":"DEBUG","timestamp":"2020-06-09T02:40:57.833Z","logger":"kafkajs","message":"[Runner] SETTING CONSUMING=true","stack":"Error\n    at Runner.set consuming [as consuming] (/Users/andrew/src/rebalance-test/node_modules/kafkajs/src/consumer/runner.js:53:58)\n    at /Users/andrew/src/rebalance-test/node_modules/kafkajs/src/consumer/runner.js:341:24\n    at retry (/Users/andrew/src/rebalance-test/node_modules/kafkajs/src/retry/index.js:43:5)\n    at /Users/andrew/src/rebalance-test/node_modules/kafkajs/src/retry/index.js:61:5\n    at new Promise ()\n    at Runner.retrier (/Users/andrew/src/rebalance-test/node_modules/kafkajs/src/retry/index.js:58:10)\n    at Runner.scheduleFetch (/Users/andrew/src/rebalance-test/node_modules/kafkajs/src/consumer/runner.js:339:17)\n    at /Users/andrew/src/rebalance-test/node_modules/kafkajs/src/consumer/runner.js:365:16\n    at processTicksAndRejections (internal/process/task_queues.js:97:5)"}
{"level":"DEBUG","timestamp":"2020-06-09T02:40:57.833Z","logger":"kafkajs","message":"[Runner] SETTING CONSUMING=false","stack":"Error\n    at Runner.set consuming [as consuming] (/Users/andrew/src/rebalance-test/node_modules/kafkajs/src/consumer/runner.js:53:58)\n    at /Users/andrew/src/rebalance-test/node_modules/kafkajs/src/consumer/runner.js:404:24\n    at processTicksAndRejections (internal/process/task_queues.js:97:5)"}
{"level":"DEBUG","timestamp":"2020-06-09T02:40:57.833Z","logger":"kafkajs","message":"[Connection] Request OffsetFetch(key: 9, version: 3)","broker":"kafka.cluster.local:31090","clientId":"test","correlationId":14,"expectResponse":true,"size":63}
{"level":"DEBUG","timestamp":"2020-06-09T02:40:57.836Z","logger":"kafkajs","message":"[Connection] Response OffsetFetch(key: 9, version: 3)","broker":"kafka.cluster.local:31090","clientId":"test","correlationId":14,"size":69,"data":{"throttleTime":0,"responses":[{"topic":"rebalance.test.30","partitions":[{"partition":1,"offset":"193","metadata":"","errorCode":0},{"partition":4,"offset":"20","metadata":"","errorCode":0}]}],"errorCode":0}}
{"level":"DEBUG","timestamp":"2020-06-09T02:40:57.836Z","logger":"kafkajs","message":"[ConsumerGroup] Fetching from 2 partitions for 1 out of 1 topics","topics":["rebalance.test.30"],"activeTopicPartitions":[{"topic":"rebalance.test.30","partitions":[1,4]}],"pausedTopicPartitions":[]}
{"level":"DEBUG","timestamp":"2020-06-09T02:40:57.837Z","logger":"kafkajs","message":"[Connection] Request Fetch(key: 1, version: 7)","broker":"kafka.cluster.local:31090","clientId":"test","correlationId":15,"expectResponse":true,"size":122}
{"level":"DEBUG","timestamp":"2020-06-09T02:40:57.844Z","logger":"kafkajs","message":"[Connection] Response Fetch(key: 1, version: 7)","broker":"kafka.cluster.local:31090","clientId":"test","correlationId":15,"size":5437,"data":"[filtered]"}
Processing:  { partition: 4, offset: '20' }
{"level":"DEBUG","timestamp":"2020-06-09T02:40:58.105Z","logger":"kafkajs","message":"[Connection] Request Heartbeat(key: 12, version: 1)","broker":"kafka.cluster.local:31090","clientId":"test","correlationId":16,"expectResponse":true,"size":75}
{"level":"DEBUG","timestamp":"2020-06-09T02:40:58.106Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 1)","broker":"kafka.cluster.local:31090","clientId":"test","correlationId":16,"size":10,"data":{"throttleTime":0,"errorCode":0}}
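The SETTING CONSUMING entries in the log above come from instrumenting the flag by hand. A minimal sketch of that kind of instrumentation follows. It is not the code used in the repro: traceWrites and the stand-in runner object are hypothetical names, and it patches a plain object rather than the Runner class in runner.js.

```javascript
// Hypothetical helper: replaces obj[key] with an accessor that reports each write.
function traceWrites(obj, key, log) {
  let value = obj[key];
  Object.defineProperty(obj, key, {
    configurable: true,
    enumerable: true,
    get() {
      return value;
    },
    set(next) {
      // The captured stack shows which code path performed the write.
      log({
        message: `SETTING ${key.toUpperCase()}=${next}`,
        stack: new Error().stack,
      });
      value = next;
    },
  });
  return obj;
}

// Stand-in object; in the issue the flag lives on the KafkaJS Runner instance.
const runner = { consuming: false };
const entries = [];
traceWrites(runner, "consuming", (entry) => entries.push(entry));

runner.consuming = true;
runner.consuming = false;
```

Each write then produces one entry with a message and a stack trace, which is enough to see the order in which the flag is set and cleared.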

Possible Solution

I have verified that the OffsetCommit request succeeds when all recursive calls to scheduleFetch() are wrapped in setImmediate(() => this.scheduleFetch()), which seems to ensure that the finally block completes before this.consuming is touched on the next iteration. I'm not sure this is the optimal solution, though. It may also be that runner.consuming is working as intended and I'm misinterpreting its purpose.
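The ordering problem can be reproduced outside KafkaJS. The following is a minimal sketch under stated assumptions, not the library's code: FakeRunner, its fetch(), the deferNext option, and consumingDuringSecondFetch are hypothetical stand-ins that only mimic the try/catch/finally shape described above. The first fetch fails like a rebalance, and the catch block starts the next scheduleFetch() either directly or through setImmediate.

```javascript
// Hypothetical stand-in for the runner; only the control flow matters here.
class FakeRunner {
  constructor({ deferNext }) {
    this.deferNext = deferNext;
    this.consuming = false;
    this.attempt = 0;
  }

  async fetch() {
    this.attempt += 1;
    if (this.attempt === 1) {
      // The first attempt fails, like a heartbeat during a rebalance.
      throw new Error("REBALANCE_IN_PROGRESS");
    }
    // Later attempts stay "in flight" for a while.
    await new Promise((resolve) => setTimeout(resolve, 50));
  }

  async scheduleFetch() {
    try {
      this.consuming = true;
      await this.fetch();
    } catch (err) {
      if (this.deferNext) {
        // Deferred: the next call starts after this call's finally block.
        setImmediate(() => this.scheduleFetch());
      } else {
        // Fire-and-forget: the next call sets consuming = true right here,
        // before this call's finally block has run.
        this.scheduleFetch();
      }
    } finally {
      this.consuming = false;
    }
  }
}

// Returns the value of the flag while the second fetch is still in flight.
async function consumingDuringSecondFetch(deferNext) {
  const runner = new FakeRunner({ deferNext });
  await runner.scheduleFetch();
  // Give a deferred scheduleFetch() the chance to start.
  await new Promise((resolve) => setImmediate(resolve));
  return runner.consuming;
}
```

Under these assumptions, consumingDuringSecondFetch(false) resolves to false even though a fetch is in flight, because the first call's finally block overwrites the flag that the nested call just set. consumingDuringSecondFetch(true) resolves to true.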

Nevon commented 4 years ago

This is an interesting observation, which caused me to have to re-think some of my assumptions regarding how the join/sync barrier works, so thanks for that.

To set the scene, the way that we've thought about rebalances and the offset commit mechanism is that as soon as a rebalance is necessary, we drop all ongoing work and re-join and sync as soon as possible. Essentially we regarded rebalances as a hard barrier where the best thing was to get the group into sync as soon as possible. As you noted, this can lead to unnecessary reprocessing of messages, and while we knew there were things we could do to improve this, we didn't think the additional complexity was worth it. What we were primarily thinking about was ways to continue from the same offset where you left off, assuming that you were still assigned the same partitions after a rebalance, but without committing.

The tricky thing about committing after a rebalance is detected is that you don't know if you are still assigned the same partitions. By the time you realize that the group is rebalancing, the group might have already kicked you out, at which point any commit you make would fail. Now, we could of course handle this, but we didn't design any of the interfaces to accommodate that, so I don't know off the top of my head if we could do it in a way that makes sense. The Java client seems to do it like this, which I find a bit strange, but then I'm not an expert on the Java client design.
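One way to tolerate such a failed commit in user-land is sketched below. This is an assumption about how it could be handled, not something KafkaJS does: commitIfStillMember and STALE_MEMBERSHIP_TYPES are hypothetical names, and treating these two error types as ignorable is a design choice that may not suit every consumer.

```javascript
// Error types reported when the coordinator no longer recognizes this member
// or its generation. Ignoring them is an assumption made for this sketch.
const STALE_MEMBERSHIP_TYPES = new Set([
  "UNKNOWN_MEMBER_ID",
  "ILLEGAL_GENERATION",
]);

// Hypothetical helper: runs `commit` and reports whether the offsets were
// committed. Any other error is rethrown unchanged.
async function commitIfStillMember(commit) {
  try {
    await commit();
    return true;
  } catch (err) {
    if (STALE_MEMBERSHIP_TYPES.has(err.type)) {
      // The group has moved on; the messages will be redelivered.
      return false;
    }
    throw err;
  }
}
```

Inside an eachBatch handler it could wrap the existing call, for example commitIfStillMember(() => args.commitOffsetsIfNecessary(args.uncommittedOffsets())), so that a commit rejected after the member was removed does not mask the original rebalance error.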

BUT, this is just the first part of the issue, and one that you seem to have handled in user-land (even though I think maybe we should try to handle it inside KafkaJS instead, now that I know it's a possibility).

I think your diagnosis is correct. The finally block will indeed be executed as a microtask, which would get executed only after the next scheduleFetch is synchronously executed. Wrapping all the calls to scheduleFetch in a setImmediate would solve this, as it would then be scheduled after the finally block. It's a little brittle, in my opinion, but I also don't see an immediately (heh) better way of handling it.

Blutude commented 2 years ago

We are having the same issue: we detect a rebalance in the middle of processing a batch, so we commit the last handled offset and exit the batch. We observed that the commit did not take effect, yet none of the commit function calls we made threw an error. Maybe a useful change would be to make that function throw if it is not able to commit because of a rebalance.