Open aikar opened 6 years ago
To give an example of what I have done in my implementation and would like to see in the library directly:
const consumer = this._consumer = new ConsumerGroupStream({
    kafkaHost: this._kafkaHost,
    groupId: this._groupId,
    sessionTimeout: consumerConfig.getIntConfig('session-timeout', 30000),
    protocol: consumerConfig.getConfig('protocol', ['roundrobin']),
    autoCommit: false,
    autoCommitIntervalMs: 1000, // This is still used for batching in non force mode
    heartbeatInterval: consumerConfig.getIntConfig("heartbeat", 3000),
    onRebalance: wrapErrors((isMember, cb) => this._onRebalance(consumer, isMember, cb), this._logger, "Rebalance"),
}, this._topics);
consumer.consumerGroup.on('connect', this._onConnect.bind(this, consumer));
consumer.on('error', this._onError.bind(this, consumer));
consumer.on('data', this._onMessage.bind(this, consumer));
_onMessage(consumer: ConsumerGroupStream, msg: KafkaMessage) {
    this.logger.debug("Message", msg);
    let queue = this._queue.get(msg.topic, msg.partition);
    if (!queue) {
        queue = [];
        this._queue.set(msg.topic, msg.partition, queue);
    }
    queue.push(msg);
    this._processQueue(msg.topic, msg.partition);
}
async _onRebalance(consumer: ConsumerGroupStream, isMember: boolean, cb: Function) {
    if (consumer.consumerGroup.closing) {
        // Can potentially fire during close, I don't know why
        return;
    }
    try {
        createPendingResolvable();
        await this.emit("rebalance");
        if (isMember) {
            this.logger.info("Rebalancing");
            // Clear and wait for any pending event dispatches
            await this._clearAndWaitForQueue();
            // Now commit anything in the queue
            const commit = resolvable();
            consumer.commit(null, true, commit.callback);
            await commit;
        }
    } catch (e) {
        this.logger.error("ErrorKafkaRebalance", e);
    } finally {
        cb();
    }
}
async _clearAndWaitForQueue() {
    // Stop any more events from firing
    this._queue.clear();
    // Await on any pending event dispatches and commit them.
    for (const [t, p, pending] of this._pendingEvents.entries()) {
        try {
            this.logger.debug("WaitingOnPendingEvent", {t, p});
            await pending;
        } catch (e) {
            this.logger.error("ErrorWaitingOnEventDispatch", e);
        }
    }
}
_processQueue(topic: string, partition: number) {
    if (this.api.isShuttingDown() || !this._consumer) {
        return;
    }
    const pending = this._pendingEvents.get(topic, partition);
    if (pending) {
        // We have a pending event for this partition. When that one finishes, it will call _processQueue
        return;
    }
    const queue = this._queue.get(topic, partition);
    if (queue && queue.length) {
        const consumer = this._consumer;
        this.api.runTask(this._dispatchEvent(queue.shift()).then(() => {
            this._processQueue(topic, partition);
        }), "KafkaConsumerDispatchEvent").catch(e => {
            this.logger.error("ErrorProcessingQueue", {topic, partition}, e);
            if (consumer === this._consumer) {
                // If there were multiple failures and something has already kicked off
                // the restart, this._consumer will have changed and we skip doing it again
                this.reconnectConsumer();
            }
        });
    } else {
        this._queue.delete(topic, partition);
    }
}
async _dispatchEvent(msg: KafkaMessage) {
    const pendingEvent = resolvable();
    this._pendingEvents.set(msg.topic, msg.partition, pendingEvent);
    const logger = this.logger;
    logger.debug("DispatchEvent", {topic: msg.topic, partition: msg.partition, offset: msg.offset});
    try {
        let event;
        try {
            event = JSON.parse(msg.value);
            if (!event || !event.event) {
                // noinspection ExceptionCaughtLocallyJS
                throw new Error("Message did not specify event name");
            }
        } catch (e) {
            logger.severe("DiscardMalformedMessageJson", msg.value, e);
            await this._commitMessage(msg);
            return;
        }
        const job = await this.master.dispatchEvent(event);
        try {
            await this._commitMessage(msg);
            job.execute();
        } catch (e) {
            logger.severe("CommitFailedAbortJob", e);
            job.abort();
        }
    } finally {
        this._pendingEvents.delete(msg.topic, msg.partition);
        pendingEvent.resolve();
    }
}
async _commitMessage(msg: KafkaMessage) {
    const done = resolvable();
    const commitDebug = {topic: msg.topic, partition: msg.partition, offset: msg.offset};
    this.logger.debug("Committing", commitDebug);
    // force = true commits immediately; false would batch commits made within autoCommitIntervalMs of each other
    this._consumer.commit(msg, true, (e) => {
        if (e) {
            this.logger.debug("CommitError", commitDebug);
            done.reject(e);
        } else {
            this.logger.debug("CommitSuccess", commitDebug);
            done.resolve();
        }
    });
    await done;
}
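The snippets above call a `resolvable()` helper that is not shown. Here is a minimal sketch of what such a helper might look like, assuming it returns a promise that also exposes `resolve`, `reject`, and a Node-style `callback`. The shape is inferred from how it is used above; the `Resolvable` interface name and the implementation are assumptions, not the original code.

```typescript
// Hypothetical helper: inferred from usage, not taken from the original code.
interface Resolvable<T> extends Promise<T> {
  resolve(value: T): void;
  reject(err: unknown): void;
  // Node-style callback: rejects on a truthy error, resolves otherwise.
  callback(err: unknown, value?: T): void;
}

function resolvable<T = void>(): Resolvable<T> {
  let resolve!: (value: T) => void;
  let reject!: (err: unknown) => void;
  // The executor runs synchronously, so both functions are assigned
  // before the promise is returned to the caller.
  const promise = new Promise<T>((res, rej) => {
    resolve = res;
    reject = rej;
  });
  return Object.assign(promise, {
    resolve,
    reject,
    callback: (err: unknown, value?: T): void => {
      if (err) {
        reject(err);
      } else {
        resolve(value as T);
      }
    },
  });
}
```

With something like this, `consumer.commit(null, true, commit.callback); await commit;` reads as "start the commit, then wait for its callback to fire".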
Is there any plan for _onRebalance to support promises?
@jnotnull ideally it would, but promises aren't the API style used by this library, so I kept it using callbacks for consistency.
I would suggest that the next major version bump also convert all APIs to use Promises instead of callbacks.
Though it is possible to wrap callbacks with promises.
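As a rough illustration of wrapping a callback API with a promise, here is a small sketch. `fromCallback`, `fakeCommit`, and `commitAsync` are hypothetical names made up for this example; `fakeCommit` is only a stand-in for a callback-style call and is not part of kafka-node.

```typescript
type NodeCallback<T> = (err: Error | null, result?: T) => void;

// Runs a function that expects a Node-style callback and exposes
// its outcome as a promise.
function fromCallback<T>(run: (cb: NodeCallback<T>) => void): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    run((err, result) => {
      if (err) {
        reject(err);
      } else {
        resolve(result as T);
      }
    });
  });
}

// Stand-in for a callback-style API (hypothetical, not kafka-node itself).
function fakeCommit(offset: number, cb: NodeCallback<number>): void {
  setTimeout(() => {
    if (offset < 0) {
      cb(new Error("bad offset"));
    } else {
      cb(null, offset);
    }
  }, 0);
}

// Promise-returning wrapper that callers can simply await.
function commitAsync(offset: number): Promise<number> {
  return fromCallback<number>((cb) => fakeCommit(offset, cb));
}
```

Node's built-in `util.promisify` covers the same ground for functions whose last argument is an `(err, value)` callback.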
This would be a semver major change, which it appears is the agenda for the next release.
ConsumerGroupStream by design is close to what most people want: 1) consume a message, 2) process the message, 3) record the message as processed.
However, in Node.js, almost everything is asynchronous. I'm struggling to see how one could use autocommit safely and still guarantee processing.
I was forced to use ConsumerGroupStream with auto commit false, and then manually call commit once I am done processing a message.
It would be nice if ConsumerGroupStream stopped implementing the Stream interface, and instead simply took a callback that was capable of returning a promise to tell kafka-node when it is done processing a message.
Once the promise resolves, then kafka-node could do the auto commit.
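To make the idea concrete, here is a sketch of the flow I have in mind. This is not an existing kafka-node API; `Message`, `Handler`, `Commit`, and `consumeSequentially` are all hypothetical names, and the commit function is passed in so the example stays self-contained.

```typescript
interface Message {
  topic: string;
  partition: number;
  offset: number;
  value: string;
}

type Handler = (msg: Message) => Promise<void>;
type Commit = (msg: Message) => Promise<void>;

// Hypothetical consume loop: a message is committed only after the
// handler's promise has resolved. If the handler rejects, the loop stops
// and that message (and everything after it) stays uncommitted.
async function consumeSequentially(
  messages: Message[],
  handler: Handler,
  commit: Commit,
): Promise<number> {
  let committed = 0;
  for (const msg of messages) {
    await handler(msg); // processing must finish first
    await commit(msg);  // then the offset is recorded
    committed++;
  }
  return committed;
}
```

The point of the design is that "processed" and "committed" can never get out of order: a crash or rejection in the handler leaves the offset uncommitted, so the message would be redelivered rather than lost.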
For an async-aware event emitter, I use the events-async package.