tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

Response without match #257

Closed: jafri closed this issue 5 years ago

jafri commented 5 years ago

I'm seeing a lot of these errors:

{"level":"WARN","timestamp":"2019-02-04T15:33:30.509Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"client","broker":"localhost:29092","correlationId":145}

This is the command:

this.producer.send({
  topic: 'messages',
  idempotent: true,
  messages: [
    { value: JSON.stringify(message) }
  ],
})

I'm sending 100K of these messages and, interestingly, I end up with anywhere between 110K and 120K messages (retried 5 times).

tulios commented 5 years ago

Hi @jafri, interesting. Are you using 1.5.0-beta.1? Also, can you share how you are posting these messages: is it a Promise.all, sequential, etc.?

Thanks.

jafri commented 5 years ago

It's a for loop, using 1.5.0-beta.1, with multiple messages per send.

My newer tests show that messages are delivered properly when the timeout is increased to 60 seconds.

I thought idempotent mode handles timeouts and prevents duplicates?


for (const messages of messageBox) {
  // Note: send is not awaited, so every iteration starts a new request immediately
  this.producer.send({
    topic: 'messages',
    idempotent: true,
    messages: messages.map(message => ({ value: JSON.stringify(message) })),
  })
}
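The question above assumes that idempotent mode stops retries from creating duplicates. In Kafka, that guarantee comes from the broker: an idempotent producer tags each batch with a producer ID and a per-partition sequence number, and the broker does not write a batch whose sequence it has already written. The following is a simplified, hypothetical JavaScript model of that check; the `PartitionLog` class and its fields are illustrative only, not a Kafka or kafkajs API, and it ignores producer epochs and multi-record batches.

```javascript
// Hypothetical, simplified model of broker-side deduplication for an
// idempotent producer. Not Kafka's implementation: it ignores producer
// epochs and multi-record batches, and treats each record as its own batch.
class PartitionLog {
  constructor() {
    this.records = []
    this.lastSequence = new Map() // "producerId:partition" -> last written sequence
  }

  append({ producerId, partition, sequence, value }) {
    const key = `${producerId}:${partition}`
    const last = this.lastSequence.has(key) ? this.lastSequence.get(key) : -1
    if (sequence <= last) {
      // Already written: a retry of this record is acknowledged but not stored again.
      return { duplicate: true }
    }
    if (sequence !== last + 1) {
      // A gap means a record went missing; this model rejects it instead of writing out of order.
      throw new Error(`Out of order sequence for ${key}: expected ${last + 1}, got ${sequence}`)
    }
    this.records.push(value)
    this.lastSequence.set(key, sequence)
    return { duplicate: false }
  }
}

// Usage: the first attempt is written, but its response never reaches the client
// (for example, the client timed out), so the producer retries with the same sequence.
const partitionLog = new PartitionLog()
partitionLog.append({ producerId: 1, partition: 0, sequence: 0, value: 'a' })
partitionLog.append({ producerId: 1, partition: 0, sequence: 0, value: 'a' }) // retry
partitionLog.append({ producerId: 1, partition: 0, sequence: 1, value: 'b' })
console.log(partitionLog.records) // [ 'a', 'b' ]
```

One consequence of this design: deduplication only covers retries of the same batch. If application code sends a message again through a new send call, it gets a new sequence number and is stored twice.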
tulios commented 5 years ago

I think this is a bug: the request queue should ignore timeouts when the producer is idempotent. Can you increase the timeout for now? I will get this fixed in the next version (1.5.0-beta.2).
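The log line in this issue includes a correlationId, which suggests the warning is emitted when a response cannot be matched to a pending request. As a rough illustration of how a timeout leads there, here is a hypothetical `SimpleRequestQueue` (not kafkajs's actual `RequestQueue`, whose internals may differ): once a request times out, it is dropped from the in-flight map, so a response that arrives later has nothing to match. If the broker did process that request, a retry triggered by the timeout can write the same data again unless the broker deduplicates it.

```javascript
// Hypothetical, simplified request queue: NOT kafkajs's RequestQueue.
// Requests are tracked by correlation ID; a timed-out request is forgotten,
// so a late response for it can no longer be matched.
class SimpleRequestQueue {
  constructor({ requestTimeout }) {
    this.requestTimeout = requestTimeout
    this.nextCorrelationId = 0
    this.inflight = new Map() // correlationId -> { resolve, timer }
    this.unmatched = [] // correlation IDs whose response arrived too late
  }

  // Registers a new request; returns its correlation ID and a promise for the response.
  push() {
    const correlationId = this.nextCorrelationId++
    const promise = new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        // Give up: the caller sees a timeout (and may retry), even though
        // the broker could still be processing the original request.
        this.inflight.delete(correlationId)
        reject(new Error(`Request ${correlationId} timed out`))
      }, this.requestTimeout)
      this.inflight.set(correlationId, { resolve, timer })
    })
    return { correlationId, promise }
  }

  // Called when a response for `correlationId` arrives from the broker.
  fulfill(correlationId, payload) {
    const entry = this.inflight.get(correlationId)
    if (!entry) {
      this.unmatched.push(correlationId)
      console.warn(`[SimpleRequestQueue] Response without match, correlationId: ${correlationId}`)
      return
    }
    clearTimeout(entry.timer)
    this.inflight.delete(correlationId)
    entry.resolve(payload)
  }
}

// Usage: the request times out after 10 ms, but the "broker" answers after 50 ms.
const queue = new SimpleRequestQueue({ requestTimeout: 10 })
const { correlationId, promise } = queue.push()
promise.catch(error => console.log(error.message)) // Request 0 timed out
setTimeout(() => queue.fulfill(correlationId, { errorCode: 0 }), 50)
```

In this sketch, raising `requestTimeout` above the worst-case response time lets the late response match again, which is consistent with the report that a 60-second timeout made the problem go away.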

tulios commented 5 years ago

Looking at your code now, can you add await to the send calls? As written, you are generating a lot of concurrent promises.
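To make the difference concrete, the following sketch compares a fire-and-forget loop like the one above with a loop that awaits each send. `makeFakeSend` is a hypothetical stand-in for `producer.send()` (it only waits a few milliseconds) that counts how many calls are in flight at once; no Kafka client is involved.

```javascript
// Hypothetical stand-in for producer.send(): waits `latencyMs` and tracks
// how many calls are in flight at the same time.
function makeFakeSend(latencyMs) {
  const stats = { inFlight: 0, maxInFlight: 0, calls: 0 }
  const send = async batch => {
    stats.calls++
    stats.inFlight++
    stats.maxInFlight = Math.max(stats.maxInFlight, stats.inFlight)
    await new Promise(resolve => setTimeout(resolve, latencyMs))
    stats.inFlight--
    return { sent: batch.length }
  }
  return { send, stats }
}

// Like the loop in the issue: nothing is awaited inside the loop, so every
// send starts immediately. The promises are collected only so we can wait
// for them at the end.
async function sendWithoutAwait(send, batches) {
  const pending = []
  for (const batch of batches) {
    pending.push(send(batch))
  }
  return Promise.all(pending)
}

// The suggested fix: await each send before starting the next one.
async function sendSequentially(send, batches) {
  const results = []
  for (const batch of batches) {
    results.push(await send(batch))
  }
  return results
}

const batches = Array.from({ length: 20 }, (_, i) => [`message-${i}`])
const burst = makeFakeSend(5)
const sequential = makeFakeSend(5)

sendWithoutAwait(burst.send, batches)
  .then(() => sendSequentially(sequential.send, batches))
  .then(() => {
    console.log(burst.stats.maxInFlight) // 20
    console.log(sequential.stats.maxInFlight) // 1
  })
```

Without await, all 20 calls start on the same tick and are in flight together; with await, there is never more than one.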

jafri commented 5 years ago

I will look into it, but since the Node.js app can easily handle the concurrent promises, I believe there is an issue somewhere with the experimental idempotency guarantee.

tulios commented 5 years ago

I ran some tests, and it looks fine. Since you are not awaiting send, you are spawning a huge number of promises at once (100K) and completely overloading the system. I used the example producer in the repo to test:

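// From the example producer in the repo, where `producer` and `sendMessage`
// are defined. Each iteration waits for two concurrent sends to finish.
const run = async () => {
  await producer.connect()
  for (let i = 0; i < 100000; i++) {
    await Promise.all([sendMessage(), sendMessage()])
  }
}

run().catch(e => console.error(`[example/producer] ${e.message}`, e))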

sendMessage generates between 10 and 1000 messages per call, so I end up with ~1M published messages.

[ { topicName: 'topic-test',
    partition: 1,
    errorCode: 0,
    baseOffset: '161592',
    logAppendTime: '-1' },
  { topicName: 'topic-test',
    partition: 4,
    errorCode: 0,
    baseOffset: '159825',
    logAppendTime: '-1' },
  { topicName: 'topic-test',
    partition: 2,
    errorCode: 0,
    baseOffset: '162346',
    logAppendTime: '-1' },
  { topicName: 'topic-test',
    partition: 5,
    errorCode: 0,
    baseOffset: '161405',
    logAppendTime: '-1' },
  { topicName: 'topic-test',
    partition: 0,
    errorCode: 0,
    baseOffset: '161060',
    logAppendTime: '-1' },
  { topicName: 'topic-test',
    partition: 3,
    errorCode: 0,
    baseOffset: '161235',
    logAppendTime: '-1' } ]
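The test above keeps the number of concurrent sends bounded: each iteration of `await Promise.all([sendMessage(), sendMessage()])` has at most two sends in flight, instead of 100K. A common generalization is a small worker pool that caps concurrency at N. The `mapWithConcurrency` helper and `fakeSend` below are illustrative assumptions, not part of kafkajs.

```javascript
// Runs `task` over `items` with at most `limit` calls in flight at once,
// and returns the results in input order.
async function mapWithConcurrency(items, limit, task) {
  const results = new Array(items.length)
  let next = 0
  // Each worker claims the next unprocessed index until none are left.
  // Claiming is synchronous, so two workers never take the same index.
  const worker = async () => {
    while (next < items.length) {
      const index = next++
      results[index] = await task(items[index], index)
    }
  }
  const workerCount = Math.min(limit, items.length)
  await Promise.all(Array.from({ length: workerCount }, () => worker()))
  return results
}

// Hypothetical stand-in for producer.send(): records the peak number of
// concurrent calls and returns how many messages the batch contained.
let inFlight = 0
let peak = 0
const fakeSend = async batch => {
  inFlight++
  peak = Math.max(peak, inFlight)
  await new Promise(resolve => setTimeout(resolve, 2))
  inFlight--
  return batch.length
}

// 50 batches of 1..50 messages, sent with at most 2 requests in flight.
const messageBatches = Array.from({ length: 50 }, (_, i) => Array(i + 1).fill('m'))
mapWithConcurrency(messageBatches, 2, fakeSend).then(counts => {
  console.log(peak) // 2
  console.log(counts.reduce((sum, n) => sum + n, 0)) // 1275
})
```

Unlike the Promise.all pairs, which wait for the slower of the two sends before starting the next pair, the pool starts a new send as soon as any slot frees up.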

But I agree that even if it overloads the system, it should work. I will check the Java implementation to see whether it also ignores request timeouts for idempotent producers. It shouldn't be a problem in real-world scenarios, but I will take a look anyway. Thanks.

tulios commented 5 years ago

I checked the Java client, and the request timeout is used even for idempotent producers. So my opinion stands: you just generated too many promises on the same tick. We are still looking into whether we can make some improvements, but for now, I will close this issue. Feel free to re-open.