tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License
3.7k stars 524 forks

Last 2 messages order swapped while sending message with idempotent: true configuration #1353

Closed karthikn1050 closed 2 years ago

karthikn1050 commented 2 years ago

Describe the bug I'm new to Kafka and am trying to send ordered messages with KafkaJS. I send a sequence of strings to the broker in a for loop, with the idempotent option enabled and maxInFlightRequests: 1. When I consume the messages, the last two values arrive swapped. Example: I send the sequence "K" "A" "R" "T" "H" "I" "K" "E" "Y" "A" "N", and the consumer receives "K" "A" "R" "T" "H" "I" "K" "E" "Y" "N" "A", with the last two messages swapped. It happens even with only two strings: if I send "A" "B", I receive "B" "A". This happens only with idempotent: true; otherwise the values arrive in the correct order. I want to keep the idempotent option to prevent duplicates. Please advise on how to solve this.

To Reproduce The code I am using for the producer:

const { Kafka, logLevel, CompressionTypes } = require('kafkajs')

const kafka = new Kafka({
  logLevel: logLevel.DEBUG,
  brokers: [`${host}:9092`],
  clientId: 'example-producer',
})

const topic = 'topic-test'
const producer = kafka.producer({
  maxInFlightRequests: 1,
  idempotent: true,
})

const sendMessage = () => {
  var str = 'karthikeyan'
  for (var i in str) {
    console.log(i, str)
    console.log(" number = " + str.charAt(i))
    // One send() call per character of the string
    producer
      .send({
        topic,
        compression: CompressionTypes.GZIP,
        messages: [
          { value: `" number = "${str.charAt(i)}`, key: "testt" },
        ],
        acks: -1,
      })
      .then(console.log)
      .catch(e => console.error(`[example/producer] ${e.message}`, e))
  }
}

const run = async () => {
  await producer.connect()
  sendMessage()
}

run().catch(e => console.error(`[example/producer] ${e.message}`, e))
app.listen(8000);

Expected behavior I want to receive messages in the order in which I send them.

Observed behavior The last two messages are swapped.

Environment:

Nevon commented 2 years ago

You are not waiting for producer.send() to resolve before starting the next send. Essentially, you are queuing up all the requests immediately, so the order in which they finish is arbitrary.

Doing this properly would look like:

for (const letter of "karthikeyan".split("")) {
  await producer.send({ topic, messages: [{ value: letter, key: "test" }]})
}
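
If it helps, the same idea can be wrapped in a small helper. This is only a sketch: sendInOrder and sendAsBatch are hypothetical names, not part of KafkaJS, and the producer argument is assumed to be an already connected KafkaJS producer (or anything with a compatible async send method). The batch variant assumes all messages are known up front. Since they share a key, they should land on the same partition in array order.

```javascript
// Sketch: send one message per letter, waiting for each send to resolve
// before starting the next one.
// `producer` is assumed to expose an async send({ topic, messages }) method.
async function sendInOrder(producer, topic, text) {
  const results = []
  for (const letter of text.split("")) {
    // The loop does not advance until this send() has resolved.
    const result = await producer.send({
      topic,
      messages: [{ value: letter, key: "test" }],
    })
    results.push(result)
  }
  return results
}

// Alternative sketch: put every letter into a single send() call.
// The messages array keeps the letters in their original order.
async function sendAsBatch(producer, topic, text) {
  const messages = text.split("").map(letter => ({ value: letter, key: "test" }))
  return producer.send({ topic, messages })
}
```

Either helper would be called from an async function after producer.connect() has resolved, for example `await sendInOrder(producer, topic, "karthikeyan")`.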

This has nothing to do with KafkaJS; it is just how async code works in JavaScript. Consider the following:

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))
const randomBetween = (min, max) => Math.floor(Math.random() * (max - min + 1) + min)

for (const letter of "Hello world".split("")) {
  sleep(randomBetween(100, 1000)).then(() => console.log(letter))
}

This would log the letters of "Hello world" in random order, because every timer is started immediately and each one fires after its own random delay.
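
For contrast, here is a sketch of the same toy example with each step awaited. The collectInOrder name is made up for illustration, and the delays are shortened so it finishes quickly. Because the loop waits for every sleep to resolve before moving on, the random delays no longer affect the order.

```javascript
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))
const randomBetween = (min, max) => Math.floor(Math.random() * (max - min + 1) + min)

// Awaiting each sleep means the next iteration cannot start until the
// previous one has finished, so letters are collected in input order.
async function collectInOrder(text) {
  const seen = []
  for (const letter of text.split("")) {
    await sleep(randomBetween(1, 20))
    seen.push(letter)
  }
  return seen
}

// Logs "Hello world" once every letter has been collected.
collectInOrder("Hello world").then(seen => console.log(seen.join("")))
```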