tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

HeartbeatInterval not respected for parallel heartbeat calls #1025

Open Nevon opened 3 years ago

Nevon commented 3 years ago

If you make multiple calls to heartbeat in parallel, the heartbeatInterval will not be respected. For example:

await Promise.all([
  heartbeat(),
  heartbeat(),
  heartbeat(),
])

This will cause 3 heartbeat requests to be made, even though after the first one succeeds, the rest should be skipped since the last heartbeat was < heartbeatInterval ms ago. This could be an issue if, for example, you heartbeat after each message in a batch with parallel message processing: you could end up making 50 heartbeats at the same time.
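A minimal sketch of the race, assuming a simplified heartbeat that checks lastRequest against heartbeatInterval before an async request and only updates it afterwards. The class name NaiveHeartbeater and the sendHeartbeat callback are hypothetical stand-ins, not the code in consumerGroup.js. All three parallel calls read lastRequest before any request has completed, so all three pass the check:

```javascript
// Hypothetical, simplified heartbeat; not a copy of consumerGroup.js.
class NaiveHeartbeater {
  constructor({ heartbeatInterval, sendHeartbeat }) {
    this.heartbeatInterval = heartbeatInterval
    this.sendHeartbeat = sendHeartbeat // hypothetical async network call
    this.lastRequest = 0
  }

  async heartbeat() {
    // Every parallel caller reads lastRequest before any request has
    // completed, so all of them pass this check.
    if (Date.now() - this.lastRequest < this.heartbeatInterval) {
      return
    }
    await this.sendHeartbeat()
    // Only updated after the await, which is too late for the other callers.
    this.lastRequest = Date.now()
  }
}

let requests = 0
const naive = new NaiveHeartbeater({
  heartbeatInterval: 3000,
  sendHeartbeat: async () => {
    requests++
    await new Promise(resolve => setTimeout(resolve, 10))
  },
})

Promise.all([naive.heartbeat(), naive.heartbeat(), naive.heartbeat()]).then(() => {
  console.log(`heartbeat requests sent: ${requests}`) // heartbeat requests sent: 3
})
```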

https://github.com/tulios/kafkajs/blob/e1a9d9eecd4cfd29e719f1c02cae6e544df0894a/src/consumer/consumerGroup.js#L386-L401

We'll need to add a lock before reading the value of lastRequest.

XBeg9 commented 3 years ago

@Nevon looks like debouncing should be implemented here, what do you think?

Nevon commented 3 years ago

Kind of, but it's not just a matter of a regular debounce, as there are some additional concerns to keep in mind:

  1. We should still not resolve the waiting promises until one heartbeat has actually resolved.
  2. If the promise rejects, should we reject all waiting promises as well, or let the next one try (I'm leaning towards the latter)?
  3. We should only update the lastRequest if the heartbeat actually succeeded.

I think it's as simple as just adding a lock on line 389 and then releasing it after you either succeed or fail to heartbeat. We already have the lock functionality built, so it's just a matter of dropping that in.
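A minimal sketch of the lock approach, assuming a hypothetical promise-chain Mutex rather than the lock utility KafkaJS already has (its API is not reproduced here). Both the interval check and the request run while the lock is held, lastRequest is only updated on success, and the lock is released in finally so that after a failure the next waiter gets to try:

```javascript
// Hypothetical promise-chain mutex (not KafkaJS's own lock utility).
class Mutex {
  constructor() {
    this.tail = Promise.resolve()
  }

  // Resolves with a release function once every earlier holder has released.
  acquire() {
    let release
    const released = new Promise(resolve => {
      release = resolve
    })
    const acquired = this.tail.then(() => release)
    this.tail = this.tail.then(() => released)
    return acquired
  }
}

class LockedHeartbeater {
  constructor({ heartbeatInterval, sendHeartbeat }) {
    this.heartbeatInterval = heartbeatInterval
    this.sendHeartbeat = sendHeartbeat // hypothetical async network call
    this.lastRequest = 0
    this.mutex = new Mutex()
  }

  async heartbeat() {
    const release = await this.mutex.acquire()
    try {
      // lastRequest is read only while holding the lock, so callers queued
      // behind a successful heartbeat see the updated timestamp and skip.
      if (Date.now() - this.lastRequest < this.heartbeatInterval) {
        return
      }
      await this.sendHeartbeat()
      this.lastRequest = Date.now() // only updated on success
    } finally {
      // Released on success and on failure, so the next waiter can retry.
      release()
    }
  }
}

let lockedRequests = 0
const locked = new LockedHeartbeater({
  heartbeatInterval: 3000,
  sendHeartbeat: async () => {
    lockedRequests++
  },
})

Promise.all([locked.heartbeat(), locked.heartbeat(), locked.heartbeat()]).then(() => {
  console.log(`heartbeat requests sent: ${lockedRequests}`) // heartbeat requests sent: 1
})
```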

t-d-d commented 3 years ago

I have an implementation for this as part of a POC I'm working on for independent broker fetches.

> or let the next one try

I didn't go with this - I let them all fail. (They actually all share the same pending promise.) I'll raise a draft PR for discussion.
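A minimal sketch of the shared pending promise approach, assuming the same hypothetical sendHeartbeat callback (which must return a promise); this is not the code from the PR. Callers that arrive while a request is in flight get the same promise, so they all resolve together or all reject together, and the next call after a failure starts a fresh request:

```javascript
// Hypothetical illustration of the shared-promise pattern for heartbeats.
class SharedPromiseHeartbeater {
  constructor({ heartbeatInterval, sendHeartbeat }) {
    this.heartbeatInterval = heartbeatInterval
    this.sendHeartbeat = sendHeartbeat // hypothetical async network call
    this.lastRequest = 0
    this.pending = null
  }

  heartbeat() {
    // Join the in-flight request, if any: every concurrent caller gets the
    // same promise and therefore the same result or the same rejection.
    if (this.pending) return this.pending
    if (Date.now() - this.lastRequest < this.heartbeatInterval) {
      return Promise.resolve()
    }
    this.pending = this.sendHeartbeat()
      .then(() => {
        this.lastRequest = Date.now() // only updated on success
      })
      .finally(() => {
        // Cleared on success and failure so a later call can start a new request.
        this.pending = null
      })
    return this.pending
  }
}
```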

tulios commented 3 years ago

> We should only update the lastRequest if the heartbeat actually succeeded.

This is already the case, @Nevon.

> If the promise rejects, should we reject all waiting promises as well, or let the next one try (I'm leaning towards the latter).

In this case, it doesn't matter. One failed heartbeat will not break the consumer, and a new heartbeat will happen on the next event, so I would say the best option is to reject all and wait for the next attempt.

I think @t-d-d's PR can work, or a lock as you mentioned previously.

t-d-d commented 3 years ago

Yes, in this case (for the heartbeat) a lock would work as there is an interval and subsequent calls will short-circuit and not heartbeat. In the POC I'm working on, I actually use the 'shared promise' pattern in other places as well (refreshMetaData, joinAndSync.) So I guess I am making the case to add it to utils as it is a useful abstraction.
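A sketch of what a generic helper in utils could look like. The name shareInFlight and the refreshMetadata example are hypothetical, and this is not necessarily the helper that was merged. Concurrent calls share one invocation of the wrapped async function; arguments from callers that join an in-flight call are ignored:

```javascript
// Hypothetical helper: concurrent calls share one in-flight invocation.
const shareInFlight = asyncFunction => {
  let pending = null
  return (...args) => {
    if (pending == null) {
      // Only the first caller's arguments are used; later callers join it.
      pending = asyncFunction(...args).finally(() => {
        pending = null
      })
    }
    return pending
  }
}

// Usage with a hypothetical metadata refresh.
let refreshes = 0
const refreshMetadata = shareInFlight(async () => {
  refreshes++
  await new Promise(resolve => setTimeout(resolve, 10))
  return { brokers: 3 }
})

Promise.all([refreshMetadata(), refreshMetadata()]).then(([a, b]) => {
  console.log(refreshes, a === b) // 1 true
})
```

Whether joining callers should be allowed to pass different arguments is a design choice; this sketch assumes they are interchangeable, as with a heartbeat or a metadata refresh.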

hermanator608 commented 2 years ago

Hey all, it looks like @t-d-d's implementation to fix this bug has been merged to master. What is the cadence for releasing a new version of KafkaJS with this fix? Thanks in advance!