tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

Consumer Disconnect during Rebalance Race Condition and Potential Memory Leak #1074

Open AustinHunt opened 3 years ago

AustinHunt commented 3 years ago

Describe the bug

Consumer does not disconnect during a rebalance, or throws an error.

To Reproduce

https://github.com/hailtrace/kafkajs-consumer-race-condition-example

Expected behavior

When multiple services using the same consumer groupId attempt to disconnect in parallel (within +/- 10s of each other), they should all disconnect without throwing any errors.

Observed behavior

  1. Many consumers will disconnect without any issues.
  2. Some (in the test linked above) will throw an error 'ERR_STREAM_WRITE_AFTER_END'.
  3. In our more complex production implementation, some will not actually disconnect (they retain a connection).

The third case is harder to replicate in a test project. The difference from the test project linked above is that we spin up a "contact consumer" inside a "command consumer" in order to stream an entire data set for every "command" that the "command consumer" receives. In this setup, the "contact consumer" fails to disconnect about 80% of the time.

  1. The contactConsumer.disconnect() is called in parallel (32 instances in our case) within +/- 10s.
  2. The contactConsumer.disconnect() call resolves and the code continues execution.
  3. The contactConsumer emits rebalancing logs after the contactConsumer.disconnect() call has resolved.
  4. When you look at the consumer group in Conduktor (or log it out in the console) you can see that about 20% of the consumers do not disconnect.

This more complex memory leak issue may be different than the one above, but I tend to think they are the same issue.

In both cases all the consumers and producers in the service are using the same Kafka() instance.
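
The parallel-disconnect scenario described above can be sketched as follows. This is a minimal illustration, not the code from the linked example repository: `disconnectAll` is a hypothetical helper, and each consumer is only assumed to expose an async `disconnect()` method, as KafkaJS consumers do.

```javascript
// Hypothetical helper: call disconnect() on every consumer at once and
// report which calls rejected, instead of failing on the first error.
async function disconnectAll(consumers) {
  const results = await Promise.allSettled(
    consumers.map((consumer) => consumer.disconnect())
  );

  const failures = [];
  results.forEach((result, index) => {
    if (result.status === 'rejected') {
      // Keep the index so a failure can be traced back to its consumer.
      failures.push({ index, error: result.reason });
    }
  });
  return failures;
}
```

Collecting the rejections this way makes it possible to count how many of the parallel disconnects fail (for example with 'ERR_STREAM_WRITE_AFTER_END'). It would not detect the third case, where disconnect() resolves but the consumer stays in the group.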

Environment: See example project. We are running in Node Alpine.

Additional context

We've had a conversation about the issue with @Nevon and committed to get him an example.

Nevon commented 3 years ago

Thanks for this. This is very interesting, and I think ERR_STREAM_WRITE_AFTER_END is an important clue. This tells us that the stream has been closed, but something is still trying to write to it. It's great that your example doesn't use authentication, because it rules out what my initial hunch would have been, which is that the two-step SASL authentication handshake doesn't take disconnects in between the phases into account properly.
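
For readers unfamiliar with the error, here is a minimal sketch of how Node.js raises ERR_STREAM_WRITE_AFTER_END, using a plain `Writable` from the standard library. It does not reproduce the KafkaJS bug or touch any KafkaJS code; `writeAfterEnd` is a hypothetical name, and the sink stands in for a broker socket only by analogy.

```javascript
const { Writable } = require('node:stream');

// Resolves with the error code Node.js reports when a stream is written to
// after end() has been called on it.
function writeAfterEnd() {
  return new Promise((resolve) => {
    // A sink that accepts and discards every chunk.
    const sink = new Writable({
      write(chunk, encoding, callback) {
        callback();
      },
    });

    sink.on('error', (err) => resolve(err.code));

    sink.end();                 // the "disconnect"
    sink.write('late request'); // something still tries to write afterwards
  });
}
```

Here the promise resolves with the string 'ERR_STREAM_WRITE_AFTER_END', which is the same code reported in the issue.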

There are a few other issues that I think are the same as this. I'm gonna go ahead and link them all up to this one, so we can try to keep a single issue open for this.

iAlex97 commented 3 years ago

Hello, I tried migrating one of our services from SQS to Kafka, and upon testing it with a larger number of messages I began observing a similar pattern. The service is a Node cluster app with worker processes that consume messages from Kafka, deployed on a pretty standard Kubernetes cluster.

Due to Kubernetes deployment and autoscaling, consumers are constantly being added and removed. In the logs of a running app, I saw worker processes sporadically exiting from SIGKILL:

{"message":"worker 12568 exit normally from SIGKILL","level":"info","workerId":0,"env":"prod"}

followed by a SIGABRT from exhausting the heap memory:

{"level":"ERROR","timestamp":"2021-09-14T21:10:04.516Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 3)","broker":"kafka04.soleadify.com:9093","clientId":"soleadify-web-crawler","error":"The group is rebalancing, so a rejoin is needed","correlationId":48,"size":10}

<--- Last few GCs --->

[13108:0x42a6e00]   613833 ms: Scavenge 1961.1 (1968.2) -> 1960.3 (1968.5) MB, 23.1 / 0.0 ms  (average mu = 0.379, current mu = 0.475) allocation failure 
[13108:0x42a6e00]   616435 ms: Mark-sweep 1961.2 (1968.5) -> 1960.2 (1968.2) MB, 2598.7 / 0.1 ms  (average mu = 0.326, current mu = 0.281) allocation failure scavenge might not succeed
[13108:0x42a6e00]   616492 ms: Scavenge 1961.2 (1968.2) -> 1960.4 (1968.5) MB, 54.3 / 0.0 ms  (average mu = 0.326, current mu = 0.281) allocation failure 

<--- JS stacktrace --->

==== JS stack trace =========================================

    0: ExitFrame [pc: 0x140a8f9]
    1: StubFrame [pc: 0x1391661]
Security context: 0x0202ac0808d1 <JSObject>
    2: decodeMessages [0x107a115242e9] [/app/node_modules/kafkajs/src/protocol/requests/fetch/v4/decodeMessages.js:~11] [pc=0xe0cf2098f2f](this=0x3b97e2402301 <JSGlobal Object>,0x1889f2fbfa21 <Decoder map = 0x20e842e13bc9>)
    3: /* anonymous */ [0x6821db51471](this=0x3b97e2402301 <JSGlobal Object>,0x06821db514d1 <Object map = 0x...

FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory
 1: 0xa18150 node::Abort() [/usr/local/bin/node]
 2: 0xa1855c node::OnFatalError(char const*, char const*) [/usr/local/bin/node]
 3: 0xb9715e v8::Utils::ReportOOMFailure(v8::internal::Isolate*, char const*, bool) [/usr/local/bin/node]
 4: 0xb974d9 v8::internal::V8::FatalProcessOutOfMemory(v8::internal::Isolate*, char const*, bool) [/usr/local/bin/node]
 5: 0xd54755  [/usr/local/bin/node]
 6: 0xd54de6 v8::internal::Heap::RecomputeLimits(v8::internal::GarbageCollector) [/usr/local/bin/node]
 7: 0xd616a5 v8::internal::Heap::PerformGarbageCollection(v8::internal::GarbageCollector, v8::GCCallbackFlags) [/usr/local/bin/node]
 8: 0xd62555 v8::internal::Heap::CollectGarbage(v8::internal::AllocationSpace, v8::internal::GarbageCollectionReason, v8::GCCallbackFlags) [/usr/local/bin/node]
 9: 0xd6500c v8::internal::Heap::AllocateRawWithRetryOrFail(int, v8::internal::AllocationType, v8::internal::AllocationOrigin, v8::internal::AllocationAlignment) [/usr/local/bin/node]
10: 0xd2ba2b v8::internal::Factory::NewFillerObject(int, bool, v8::internal::AllocationType, v8::internal::AllocationOrigin) [/usr/local/bin/node]
11: 0x106dffe v8::internal::Runtime_AllocateInYoungGeneration(int, unsigned long*, v8::internal::Isolate*) [/usr/local/bin/node]
12: 0x140a8f9  [/usr/local/bin/node]
{"message":"worker 13108 exit normally from SIGABRT","level":"info","workerId":0,"env":"prod"}

Testing locally showed that if I start a single worker process which is the only consumer in its group, memory consumption is constant. However, if I joined this process to the existing kubernetes group which is constantly re-balancing, memory began spiking up after a consumer re-join.
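
One way to correlate heap growth with re-joins is to sample memory each time the consumer emits a group-join event. The sketch below is an assumption-laden illustration rather than what was actually run: `trackHeapOnEvent` is a hypothetical helper that works with any object exposing `on(eventName, listener)`.

```javascript
// Hypothetical helper: record the V8 heap size every time `eventName` fires.
// Returns the array that the samples are appended to.
function trackHeapOnEvent(emitter, eventName, samples = []) {
  emitter.on(eventName, () => {
    samples.push({
      at: new Date().toISOString(),
      // heapUsed is reported in bytes; convert to megabytes.
      heapUsedMb: process.memoryUsage().heapUsed / (1024 * 1024),
    });
  });
  return samples;
}
```

With a KafkaJS consumer this could be wired up as `trackHeapOnEvent(consumer, consumer.events.GROUP_JOIN)`, since KafkaJS consumers expose instrumentation events through `consumer.on`. A steadily rising `heapUsedMb` across successive joins would be consistent with the behavior described above, though it would not by itself identify what is being retained.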

I also recorded a 200 MB heap snapshot, which took about 10 minutes to process on fairly new hardware (Core i7, 16 GB RAM), but I couldn't trace the exact retention cause for the largest object. Let me know if that might help debug this issue.

Env:

iAlex97 commented 3 years ago

I also tested against the latest beta version and the previous version, 1.14.0, with the same results.

emimarz commented 2 years ago

This still happens in 2022. Any news?

iAlex97 commented 2 years ago

We migrated that particular project to node-rdkafka, but we are still looking at using kafkajs@2.0 in apps with few clients and little traffic, because it is pure JavaScript.

joerg-walter-de commented 2 years ago

It seems that I have the same problem with v2.2.0.

I have multiple consumers and, lately, one client per consumer. I am calling disconnect() on several of them concurrently.

I am getting errors like {"level":"WARN","timestamp":"2022-09-01T12:15:11.464Z","logger":"kafkajs","message":"[Runner] The group is rebalancing, re-joining","groupId":"C9C4A4A8-92B6-4A4C-9EF3-7CCE4E26A1BF","memberId":"XXXX","error":"The group is rebalancing, so a rejoin is needed"} for consumers that I had just, apparently successfully, disconnected.
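
To check whether such warnings really arrive after a disconnect has resolved, the JSON log lines can be filtered by timestamp. The following is a rough sketch that assumes the log format shown in this thread (one JSON object per line with `logger`, `timestamp`, `message`, and `error` fields); `rebalanceLogsAfter` is a hypothetical helper, not part of KafkaJS.

```javascript
// Hypothetical helper: return the kafkajs log entries that mention
// rebalancing and were logged after `disconnectedAt` (an ISO 8601 timestamp).
function rebalanceLogsAfter(lines, disconnectedAt) {
  const cutoff = new Date(disconnectedAt).getTime();

  return lines
    .map((line) => {
      try {
        return JSON.parse(line);
      } catch {
        return null; // skip lines that are not JSON (e.g. stack traces)
      }
    })
    .filter((entry) => {
      if (!entry || entry.logger !== 'kafkajs') return false;
      const text = `${entry.message || ''} ${entry.error || ''}`;
      return /rebalancing/i.test(text)
        && new Date(entry.timestamp).getTime() > cutoff;
    });
}
```

Recording the time at which each `disconnect()` promise resolves and passing it as `disconnectedAt` would show which memberIds keep logging rebalance activity afterwards.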