narcisoguillen / kafka-node-avro

ISC License

Seek to end when creating consumer #5

Closed alaamoustafabakr closed 4 years ago

alaamoustafabakr commented 5 years ago

Hi, I want to read only new messages from the topic, i.e. not all previously unconsumed messages but only messages that arrive after the ConsumerGroup is created. How can I set the offset to the latest message in the topic for each partition using any of the supported options?

narcisoguillen commented 5 years ago

Hey, thanks for using kafka-node-avro! Yeah, if I'm not wrong you could set that up with fromOffset and commitOffsetsOnFirstJoin, and by making sure the consumers are part of the same groupId:

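let myConsumer = addConsumer("my-topic", {
  groupId : 'group-x',            // consumers sharing this id share committed offsets
  fromOffset : 'latest',          // starting point when the group has no committed offsets
  commitOffsetsOnFirstJoin: true  // commit that starting offset the first time the group joins
});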

Hope this helps.

alaamoustafabakr commented 5 years ago

I tested this solution and unfortunately it doesn't work; I am still getting unconsumed messages from before the consumerGroup was created.

I tried another approach: getting the Offset object from the consumerGroup, fetching the latest offsets, and setting the offset for each partition. After that I am still getting unconsumed messages, but it is a workaround:

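// Offset helper exposed by the underlying consumer
const offset = consumerGroup.consumer.getOffset();
offset.fetchLatestOffsets([topic], (err, offsets) => {
    if (err) {
        return; // note: the error is silently dropped here
    }
    // offsets has the shape { topicName: { partition: latestOffset } }
    Object.keys(offsets).forEach( topicName => {
        Object.keys(offsets[topicName]).forEach( partition => {
            // move each partition to its latest offset
            consumerGroup.consumer.setOffset(topicName, partition, offsets[topicName][partition]);
        });
    });
    // start listening only after the offsets have been moved
    listenOnConsumer(consumerGroup, callback, err_callback);
});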
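
For anyone trying the workaround above, the nested loops can be pulled into a small helper and checked without a running broker. This is only a sketch: `flattenLatestOffsets`, `seekToLatest`, and `fakeConsumer` are hypothetical names that are not part of kafka-node-avro, and the input is assumed to have the `{ topic: { partition: offset } }` shape that the callback in the snippet above reads from.

```javascript
// Turn the { topic: { partition: offset } } map into a flat list of seek targets.
function flattenLatestOffsets(offsets) {
  const targets = [];
  for (const [topic, partitions] of Object.entries(offsets)) {
    for (const [partition, offset] of Object.entries(partitions)) {
      // Object keys are strings, so convert the partition id back to a number.
      targets.push({ topic, partition: Number(partition), offset });
    }
  }
  return targets;
}

// Apply every seek target to any object exposing setOffset(topic, partition, offset).
function seekToLatest(consumer, offsets) {
  const targets = flattenLatestOffsets(offsets);
  targets.forEach(({ topic, partition, offset }) => {
    consumer.setOffset(topic, partition, offset);
  });
  return targets;
}

// Hypothetical stand-in consumer that only records the calls it receives.
const fakeConsumer = {
  calls: [],
  setOffset(topic, partition, offset) {
    this.calls.push([topic, partition, offset]);
  }
};

seekToLatest(fakeConsumer, { 'my-topic': { 0: 42, 1: 7 } });
console.log(fakeConsumer.calls);
```

In the workaround, `seekToLatest(consumerGroup.consumer, offsets)` would stand in for the two nested `forEach` loops, assuming the consumer object exposes `setOffset` as it is used there.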