Open SeanReece opened 3 months ago
Thanks for the feedback!
We don't yet plan to implement the on stuff in the near future. Using await on the connect method is identical to consumer.on('ready' ...), since internally, we're using that lifecycle event to determine when the connect promise resolves.
What states are you looking to track? I can try and help you with those particular cases (for instance, for rebalance, we have made a rebalance callback available), and think if a more generalized approach is possible.
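To illustrate why awaiting connect and listening for 'ready' observe the same moment, here is a minimal sketch of a promise that resolves on a lifecycle event. FakeClient and its connect method are hypothetical stand-ins built on Node's EventEmitter; they are not the library's actual internals.

```javascript
const { EventEmitter, once } = require('node:events')

// Hypothetical stand-in for a client that emits 'ready' once connected.
class FakeClient extends EventEmitter {
  // Resolves when the 'ready' lifecycle event fires, so `await connect()`
  // and `on('ready', ...)` are driven by the same event.
  connect() {
    const ready = once(this, 'ready')
    // Simulate the underlying connection completing asynchronously.
    setImmediate(() => this.emit('ready'))
    return ready.then(() => undefined)
  }
}

async function demo() {
  const client = new FakeClient()
  const seen = []
  client.on('ready', () => seen.push('ready event'))
  await client.connect()
  seen.push('connect resolved')
  return seen
}
```

In this sketch the listener runs first and the awaited promise continues right after it, which is why a one-off readiness check can rely on await alone. Ongoing state changes after the initial connect are a separate matter.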
Thanks for the quick response.
We need to know the ongoing health / connectivity of the clients. For example, if our consumer gets removed from the group, or disconnects for some reason, our service would report this to Kubernetes as unhealthy and be gracefully shut down.
Specifying rebalance_cb won't work, since as soon as that callback is specified we lose the default rebalancing behaviour and need to implement it ourselves (we could, but that's overkill when all you want is the current state).
We're doing something like this with KafkaJS. Basically, we just need to know whether the consumer is connected or not.
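// Connection state, updated by the KafkaJS instrumentation events below.
let consumerConnected = false
let rebalancing = false

// A consumer that is in the middle of a rebalance still counts as healthy.
export function isConsumerHealthy() {
  return consumerConnected || rebalancing
}

const { CONNECT, DISCONNECT, REBALANCING, GROUP_JOIN } = kafkaConsumer.events

kafkaConsumer.on(CONNECT, () => {
  consumerConnected = true
  rebalancing = false
})

// Joining the group means any in-progress rebalance has finished.
kafkaConsumer.on(GROUP_JOIN, () => {
  consumerConnected = true
  rebalancing = false
})

kafkaConsumer.on(DISCONNECT, () => {
  consumerConnected = false
  rebalancing = false
})

kafkaConsumer.on(REBALANCING, () => {
  rebalancing = true
})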
It would be much easier to just be able to access the state of the consumer/producer/admin client. But events do give us a chance to react to state changes if necessary.
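For context, a health flag like the isConsumerHealthy check above is typically surfaced to Kubernetes through a small HTTP endpoint. The following is only a sketch: the /readyz path, port 8080, and the state object are assumptions for illustration, not something taken from the thread.

```javascript
const http = require('node:http')

// Hypothetical health state, updated elsewhere by consumer event handlers.
const state = { consumerConnected: false, rebalancing: false }

function isConsumerHealthy() {
  return state.consumerConnected || state.rebalancing
}

// Maps the health flag to an HTTP status a Kubernetes probe can interpret:
// 200 means ready, 503 means not ready.
function readinessResponse(healthy) {
  return healthy
    ? { status: 200, body: 'ok' }
    : { status: 503, body: 'consumer not connected' }
}

// Assumed endpoint path; adjust to match the probe configuration.
function createProbeServer() {
  return http.createServer((req, res) => {
    if (req.url !== '/readyz') {
      res.writeHead(404)
      res.end()
      return
    }
    const { status, body } = readinessResponse(isConsumerHealthy())
    res.writeHead(status, { 'Content-Type': 'text/plain' })
    res.end(body)
  })
}

// Usage (assumed port): createProbeServer().listen(8080)
```

Keeping readinessResponse separate from the server makes the mapping easy to check without opening a socket.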
Is there any reason why there are no plans to expose events in the KafkaJS API? It's supported through the node-rdkafka API, so it doesn't seem too difficult to implement.
Make state a public property getter on consumer/producer/admin.
kafkaConsumer.state === ConsumerStates.CONNECTED
Expose isConnected() etc. from internalClient.
kafkaProducer.isConnected()
Expose an internalClient eventEmitter passthrough.
kafkaConsumer.on('rebalancing', () => {})
This could be implemented by just passing the listener to internalClient without having to expose internalClient. I don't think we need to support event parity with KafkaJS, so just exposing KafkaConsumerEvents would be fine.
Maybe something like this:
// lib/kafkajs/_consumer.js
// Sorry for the typescript :)
on(event: KafkaConsumerEvents, listener: EventListener<KafkaConsumerEvents>) {
  // Private fields must be accessed through `this`.
  return this.#internalConsumer.on(event, listener)
}
Hey @SeanReece, we're generally very slow to expose any public interface because it means that we'll have to support it forever (we will go as far as deprecating something, but removing it completely and breaking something that previously worked is almost impossible).
In general, the idea of re-emitting events actually does make sense, because one can get them through the node-rdkafka API already. It's probably possible to emit a subset of the 'on' events, and throw an error if someone tries to register a function for an unsupported event type.
I'll discuss this internally and see if we can possibly expose such an API. We also need a way to expose client-level errors, which isn't present at the moment, so maybe we could use this for that reason too.
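The idea of re-emitting only a subset of events and rejecting the rest could look roughly like the sketch below. ConsumerWrapper, SUPPORTED_EVENTS, and the simulate helper are hypothetical names for illustration, and the chosen event names are just an example subset of node-rdkafka-style events, not a decided API.

```javascript
const { EventEmitter } = require('node:events')

// Hypothetical subset of events the wrapper agrees to re-emit.
const SUPPORTED_EVENTS = new Set(['ready', 'disconnected', 'rebalance'])

class ConsumerWrapper {
  // Stand-in for the wrapped internal client; never exposed directly.
  #internalConsumer = new EventEmitter()

  // Forwards the listener to the internal client, but only for
  // supported event types; anything else fails loudly.
  on(event, listener) {
    if (!SUPPORTED_EVENTS.has(event)) {
      throw new Error(`Unsupported event type: ${event}`)
    }
    this.#internalConsumer.on(event, listener)
    return this
  }

  // For this sketch only: simulates the internal client firing an event.
  simulate(event, ...args) {
    this.#internalConsumer.emit(event, ...args)
  }
}
```

Throwing at registration time, rather than silently ignoring the listener, means a typo or an unsupported event name surfaces immediately instead of producing a handler that never fires.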
Two things specifically for your case though:
sessionTimeout (default 30) seconds between the start of the rebalance and the assignment of partitions.
You can specify rebalance_cb in the KafkaJS consumer interface without needing to implement rebalancing; I've been testing this for a while. As long as you return undefined, the client will take that as a signal you aren't changing the assignment and will use the default implementation, or whatever partitionAssigners you configured.
In my rebalance_cb, I apply a small debounce and then emit an event to higher levels of the system.
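A rough sketch of that debounce-and-emit approach follows. The makeRebalanceObserver name, the (err, assignment) parameters, the 'rebalance' event name, and the 500 ms delay are all assumptions for illustration; the only behaviour taken from the comment above is that the callback returns undefined so the default assignment logic stays in place.

```javascript
const { EventEmitter } = require('node:events')

// Builds a callback intended to be passed as rebalance_cb. It only observes
// rebalances and returns undefined, leaving assignment to the client.
// The timer functions are injectable so the debounce can be tested.
function makeRebalanceObserver(
  emitter,
  { delayMs = 500, schedule = setTimeout, cancel = clearTimeout } = {}
) {
  let timer = null
  return function rebalanceCb(err, assignment) {
    if (timer !== null) cancel(timer)
    // Debounce: only the last rebalance in a burst is reported.
    timer = schedule(() => {
      timer = null
      emitter.emit('rebalance', { code: err && err.code, assignment })
    }, delayMs)
    return undefined
  }
}

// Usage (hypothetical wiring):
// const health = new EventEmitter()
// health.on('rebalance', (info) => console.log('rebalanced', info))
// const rebalance_cb = makeRebalanceObserver(health)
```

Debouncing matters here because a single group change can trigger several callback invocations in quick succession, and higher layers usually only care about the settled result.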
First of all, I just want to say how happy I am to see Confluent release an official JS client. Love the idea of making it compatible with KafkaJS/node-rdkafka. :+1:
Now for the issue
I don't see a way to get the current state / health of a consumer or producer using the KafkaJS API.
When using the node-rdkafka API, it looks like I can hook into lifecycle events using consumer.on('ready', ...) etc. But when using the KafkaJS API, the .on method is not implemented, and the state is marked private. With KafkaJS we were previously hooking into these lifecycle events to keep track of the consumer state, which we used for our Kubernetes readiness probes.
Is there any plan to implement events on the KafkaJS API?