SOHU-Co / kafka-node

Node.js client for Apache Kafka 0.8 and later.
MIT License

After calling removeTopics on a consumer and then adding the topic back, I receive duplicate messages #1452

Open: opened by davesargrad 3 years ago

My code is simple. Here is how I initialize the client:

const init = (broker_address) => {
    // console.log("stream_from_server conn: connecting stream ");
    // console.log("stream_from_server init: connecting to broker ", broker_address);
    const client = new kafka.KafkaClient(broker_address);
    const offset = new kafka.Offset(client)
    const consumer = new kafka.Consumer(client, [  ], { autoCommit: false, fromOffset: true });

    return {client: client, offset: offset, consumer: consumer};
}

I use this to join a topic:

const join = (client_offset_consumer, topic, on_msg_cb) => {
    console.log("stream_from_server join: ", topic);
    client_offset_consumer.offset.fetchLatestOffsets([topic], (err, offsets) => {
        if (err) {
            console.log(`error fetching latest offsets ${err}`)
            return
        }
        var latest = 1
        Object.keys(offsets[topic]).forEach( o => {
            latest = offsets[topic][o] > latest ? offsets[topic][o] : latest
        })

        client_offset_consumer.consumer.addTopics([ { topic: topic, partition: 0,  offset: latest } ], (err, added) => {
            console.log("stream_from_server joined: ", added);
            console.log("stream_from_server joined consumer payloads: ", client_offset_consumer.consumer.payloads);
            client_offset_consumer.consumer.on('message',  (message) => {
                console.log("my message from kafka ", "offset - ", message.offset, "highwateroffset - ", message.highWaterOffset);
                on_msg_cb(message);
            });
        }, true);
    })

}
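The offset-selection step in `join` can be pulled out into a pure function so it is easy to check. This sketch assumes, as the loop above implies, that `fetchLatestOffsets` hands back an object shaped `{ [topic]: { [partition]: latestOffset } }`. The original takes the maximum across all partitions but subscribes only partition 0, which works for a single-partition topic. The hypothetical helper `buildPayloads` (not part of kafka-node) instead builds one `addTopics` payload per partition.

```javascript
// Hypothetical helper (not part of kafka-node): turn the result of
// offset.fetchLatestOffsets([topic], cb) into addTopics() payloads.
// Assumes offsets has the shape { [topic]: { [partition]: latestOffset } }.
function buildPayloads(offsets, topic) {
  const perPartition = offsets[topic];
  if (!perPartition) {
    // Topic missing from the response (e.g. it does not exist yet).
    return [];
  }
  return Object.keys(perPartition).map((partition) => ({
    topic,
    partition: Number(partition), // object keys are strings
    // Start at the latest offset so only new messages are delivered.
    offset: perPartition[partition],
  }));
}

// Usage with a made-up response for a single-partition topic.
const example = buildPayloads({ "avl-vam": { 0: 865 } }, "avl-vam");
console.log(example);
```

The result could be passed directly as the first argument of `consumer.addTopics(payloads, cb, true)`.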

I use this to leave a topic:

const leave = (client_offset_consumer, topic) => {
    console.log("stream_from_server leave: ", topic);
    client_offset_consumer.consumer.removeTopics([ topic ], (err, removed) => {
        console.log("stream_from_server - left: ", removed);
    });
}
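One way to keep exactly one `message` listener on the consumer, no matter how often topics are joined and left, is to register it once and route each message by `message.topic` through a handler map. This is a hypothetical sketch. `createRouter` and `handlers` are illustrative names, and a plain `EventEmitter` stands in for `kafka.Consumer` so the example runs without a broker. Real code would call `addTopics`/`removeTopics` where the comments indicate.

```javascript
const { EventEmitter } = require("events");

// Hypothetical router: one 'message' listener for the whole consumer,
// dispatching to per-topic callbacks kept in a Map.
function createRouter(consumer) {
  const handlers = new Map();
  consumer.on("message", (message) => {
    const handler = handlers.get(message.topic);
    if (handler) handler(message);
  });
  return {
    join(topic, onMsg) {
      // Real code would call consumer.addTopics(...) here.
      handlers.set(topic, onMsg);
    },
    leave(topic) {
      // Real code would call consumer.removeTopics([topic], cb) here.
      handlers.delete(topic);
    },
  };
}

// Usage: join, leave, and rejoin the same topic with a stand-in consumer.
const fakeConsumer = new EventEmitter();
const router = createRouter(fakeConsumer);
let received = 0;
const onMsg = () => { received += 1; };

router.join("avl-vam", onMsg);
router.leave("avl-vam");
router.join("avl-vama", onMsg);
router.leave("avl-vama");
router.join("avl-vam", onMsg);

// Simulated message; the router delivers it exactly once.
fakeConsumer.emit("message", { topic: "avl-vam", offset: 868 });
console.log(fakeConsumer.listenerCount("message"), received); // 1 1
```

Because joining only updates the map, rejoining a topic replaces its handler instead of stacking another listener.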

In the following experiment I have a topic named "avl-vam". I join it, leave it, join and then leave a second, nonexistent topic "avl-vama", and then join the original topic again.

The following diagnostic output shows that I receive three messages (offsets 865, 866, and 867) on the topic before leaving it.

[screenshot]

I then leave that first topic, and then join and leave a second topic, "avl-vama".

[screenshot]

I then rejoin the original topic "avl-vam".

The very next message that arrives (offset 868) is delivered three times.

[screenshot]

Each subsequent message is also delivered three times.

[screenshot]

If I leave the topic again (by calling removeTopics), join and then leave a second nonexistent topic, "avl-vamb", and then rejoin the original topic, each message is delivered five times at the same offset.

[screenshot]
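A plausible cause, offered as a hypothesis rather than a confirmed diagnosis: `join` calls `consumer.on('message', ...)` inside the `addTopics` callback. Every join therefore adds another listener to the same consumer, and nothing in `leave` removes it. The listener count matches what is observed. There are three joins before the triplicate deliveries and five before the quintuplicate ones. The sketch below replays the experiment against a plain `EventEmitter` standing in for the consumer, an assumption made so it runs without Kafka. `joinLikeOriginal` and `leaveLikeOriginal` are hypothetical reductions of `join` and `leave` that keep only the listener behavior.

```javascript
const { EventEmitter } = require("events");

const consumer = new EventEmitter(); // stand-in for kafka.Consumer

// Hypothetical reduction of join(): like the original, it registers a new
// 'message' listener on every call and never removes it.
function joinLikeOriginal(topic, onMsg) {
  consumer.on("message", (message) => onMsg(message));
}

// The original leave() only calls removeTopics; listeners stay attached.
function leaveLikeOriginal(topic) {
  // (no listener removal here, mirroring the original)
}

const deliveries = [];
const record = (message) => deliveries.push(message.offset);

// Sequence from the report: avl-vam, avl-vama, then avl-vam again.
joinLikeOriginal("avl-vam", record);
leaveLikeOriginal("avl-vam");
joinLikeOriginal("avl-vama", record);
leaveLikeOriginal("avl-vama");
joinLikeOriginal("avl-vam", record);
consumer.emit("message", { topic: "avl-vam", offset: 868 }); // simulated
const afterThirdJoin = deliveries.length; // one delivery per listener

// Second round: leave, join/leave avl-vamb, then rejoin avl-vam.
leaveLikeOriginal("avl-vam");
joinLikeOriginal("avl-vamb", record);
leaveLikeOriginal("avl-vamb");
joinLikeOriginal("avl-vam", record);
deliveries.length = 0;
consumer.emit("message", { topic: "avl-vam", offset: 869 }); // simulated
console.log(afterThirdJoin, deliveries.length); // 3 5
```

If this is the cause, registering the listener once (for example in `init`) or removing it in `leave` with `consumer.removeListener('message', handler)` would keep each message from being delivered more than once.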

This seems like a bug. Please advise.

I am using kafka-node 5.0.0.

I am using Kafka 2.8.0.