apache / pulsar-client-node

Apache Pulsar NodeJS Client
https://pulsar.apache.org/
Apache License 2.0

Consumer.acknowledge() not registered in client #385

Open fhomam opened 6 months ago

fhomam commented 6 months ago

After receiving a message, our script makes an API call through axios to process the Pulsar message. consumer.receive() and the processing logic run in a while loop until receive() times out when the queue is empty.

It appears that if we call acknowledge() before the API call, the client behaves correctly: messages are acked, and receive() does not return any more messages once all messages are acked.

But when we ack after the API call, which is the desired way to ack the message, receive() seems to go into a loop and keeps receiving messages that were already acked. However, if we quit the script and re-run it, the queue is indeed empty and no messages are received. Similarly, peeking at the messages through pulsar-admin for the given topic-partition while the script is running confirms that the messages are acked.

BewareMyPower commented 6 months ago

Could you add a simple example to reproduce? You mean that the following loop does not work?

  while (true) {
    const msg = await consumer.receive();
    process(msg); // process is the API call you mentioned
    consumer.acknowledge(msg);
  }

but the following loop works?

  while (true) {
    const msg = await consumer.receive();
    consumer.acknowledge(msg);
    process(msg); // process is the API call you mentioned
  }

fhomam commented 6 months ago

The order of execution you've provided above is correct, with await calls all around. The API calls are to an LLM service. I noticed the issue doesn't appear in one script, where the API returns relatively faster than in the other. I just tried this again with receiverQueueSize set to 1, but that didn't seem to solve the issue.

But trying again and setting ackTimeoutMs to 0, from an earlier value of 10000, does seem to resolve the issue: ack works as expected, and the while loop terminates as expected with: "Error: Failed to receive message: TimeOut"

axios: 1.6.8
pulsar-client: 1.10.0
pulsar (standalone on docker): 3.2.2
node: v20.9.0

Apologies, a code sample is a bit involved, but I hope the above sheds some more light.
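
For readers following along, here is a minimal sketch of the loop and consumer options described above. It is not the reporter's actual script. The topic and subscription names, the drainQueue helper, the processMessage callback, and the 5000 ms receive timeout are all assumptions made for illustration. The consumer is passed in as a parameter so the loop can be exercised without a broker.

```javascript
// Consumer options as discussed in the thread.
// The topic and subscription names are hypothetical placeholders.
const consumerOptions = {
  topic: 'persistent://public/default/my-topic', // hypothetical
  subscription: 'my-subscription',               // hypothetical
  receiverQueueSize: 1, // tried in the thread; reported not to help
  ackTimeoutMs: 0,      // was 10000; 0 is the value reported to resolve the issue
};

// Receive, process, then acknowledge, awaiting every step.
// Returns the number of messages handled once receive() times out.
async function drainQueue(consumer, processMessage, receiveTimeoutMs = 5000) {
  let handled = 0;
  while (true) {
    let msg;
    try {
      msg = await consumer.receive(receiveTimeoutMs);
    } catch (err) {
      // The thread reports "Failed to receive message: TimeOut" on an empty queue.
      if (/TimeOut/i.test(String(err && err.message))) return handled;
      throw err; // anything else is unexpected
    }
    await processMessage(msg);       // e.g. the slow API call made through axios
    await consumer.acknowledge(msg); // ack only after processing has finished
    handled += 1;
  }
}

// Usage against a real broker (not executed here; serviceUrl is an assumption):
//   const Pulsar = require('pulsar-client');
//   const client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650' });
//   const consumer = await client.subscribe(consumerOptions);
//   await drainQueue(consumer, async (msg) => { /* call the API here */ });
//   await consumer.close();
//   await client.close();
```

Swapping ackTimeoutMs between 0 and 10000 in consumerOptions while keeping processMessage slow should be enough to compare the two behaviours reported above, assuming the processing time is what triggers the difference.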

BewareMyPower commented 6 months ago

> But trying again and setting ackTimeoutMs to 0, from an earlier value of 10000, does seem to resolve the issue

Interesting. I will take a further look.