Open davidmcote opened 1 week ago
Hey, I wrote a small test with the issue:
Looking into the code, it seems that when the message is added to the Redelivery List, it is still in the pending list. (Also the Timeout is updated)
func TestConsumerNumAckPending(t *testing.T) {
	s := RunBasicJetStreamServer(t)
	defer s.Shutdown()

	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"test"},
	})
	require_NoError(t, err)

	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:   "dur",
		AckPolicy: nats.AckExplicitPolicy,
		AckWait:   2 * time.Second,
	})
	require_NoError(t, err)

	for i := 0; i < 5; i++ {
		_, err := js.Publish("test", nil)
		require_NoError(t, err)
	}

	info, err := js.ConsumerInfo("TEST", "dur")
	require_NoError(t, err)
	require_Equal(t, info.NumAckPending, 0)
	require_Equal(t, info.NumPending, 5)

	// Pull a single message and never ack it.
	req := JSApiConsumerGetNextRequest{Batch: 1}
	reqb, _ := json.Marshal(req)
	sub := natsSubSync(t, nc, nats.NewInbox())
	err = nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.dur", sub.Subject, reqb)
	require_NoError(t, err)
	_ = natsNexMsg(t, sub, time.Second)

	info, err = js.ConsumerInfo("TEST", "dur")
	require_NoError(t, err)
	require_Equal(t, info.NumAckPending, 1)
	require_Equal(t, info.NumPending, 4)

	// Expect the counters to reset once AckWait has expired.
	checkFor(t, 10*time.Second, 1*time.Second, func() error {
		info, err := js.ConsumerInfo("TEST", "dur")
		if err != nil {
			return err
		}
		if info.NumAckPending != 0 {
			return fmt.Errorf("Expected no ack pending, got %d", info.NumAckPending)
		}
		if info.NumPending != 5 {
			return fmt.Errorf("Expected 5 pending, got %d", info.NumPending)
		}
		return nil
	})
}
What server version?
If there is no max redelivery set and you are using a pull consumer that never requests any more messages, that is correct: num_ack_pending will not go down.
The test is failing with the "main" branch.
I saw that the seq is added to the RedeliveryQueue but not removed from Pending, and when it is sent to the consumer, the timeout is set again.
What server version are you running? We did improve the num_pending when the consumer is a pull based consumer and no additional requests for messages are received.
@davidmcote 2.10.18
The test that I posted is failing on current "main":
$ git log -1 | cat
commit 32c26d6bdb30b7ac0f5921fdfa0bf018c285cd3c
Merge: 869d3ad2 695cec9f
Author: Derek Collison <derek@nats.io>
Date: Wed Nov 20 13:09:35 2024 -0800
$ go test -timeout 60s -run ^TestConsumerNumAckPending$ github.com/nats-io/nats-server/v2/server -count=1 -v -race
=== RUN TestConsumerNumAckPending
jetstream_consumer_test.go:2583: Expected no ack pending, got 1
--- FAIL: TestConsumerNumAckPending (10.07s)
FAIL
FAIL github.com/nats-io/nats-server/v2/server 11.087s
FAIL
If the system can try to deliver, num ack pending will stay fixed until the message is acked or max deliveries is hit.
This indeed sounds like it's intended behaviour.
When a message is delivered, for example through the next message call above, it will count toward num ack pending.
If you don't ACK, it will take until AckWait before you get the message redelivered, and the message's metadata will reflect that with a delivery count of 2. During that time the message is still pending acknowledgement and will be redelivered to you after the AckWait, which is why it counts toward num ack pending.
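The delivery and redelivery cycle described above can be illustrated with a small toy model. This is only a sketch under stated assumptions, not nats-server code: `toyConsumer`, its fields, and its methods are hypothetical names, time is modeled as whole seconds, and the handling of a delivery limit is a simplification.

```go
package main

import "fmt"

// toyConsumer is a hypothetical, simplified model of a consumer's ack
// tracking. It is not nats-server code. Time is modeled as whole seconds.
type toyConsumer struct {
	ackWait    int            // seconds a delivery may stay unacked
	maxDeliver int            // 0 means unlimited deliveries
	pending    map[uint64]int // stream sequence -> ack deadline (seconds)
	delivered  map[uint64]int // stream sequence -> delivery count
}

func newToyConsumer(ackWait, maxDeliver int) *toyConsumer {
	return &toyConsumer{
		ackWait:    ackWait,
		maxDeliver: maxDeliver,
		pending:    make(map[uint64]int),
		delivered:  make(map[uint64]int),
	}
}

// deliver hands seq to a client at time now and (re)starts its ack deadline.
func (c *toyConsumer) deliver(seq uint64, now int) {
	c.delivered[seq]++
	c.pending[seq] = now + c.ackWait
}

// ack removes seq from pending.
func (c *toyConsumer) ack(seq uint64) { delete(c.pending, seq) }

// expire processes ack deadlines at time now. Messages that have used up
// maxDeliver leave pending; all others stay pending and are counted as
// waiting for redelivery.
func (c *toyConsumer) expire(now int) (redeliverable int) {
	for seq, deadline := range c.pending {
		if now < deadline {
			continue
		}
		if c.maxDeliver > 0 && c.delivered[seq] >= c.maxDeliver {
			delete(c.pending, seq)
			continue
		}
		redeliverable++
	}
	return redeliverable
}

func (c *toyConsumer) numAckPending() int { return len(c.pending) }

func main() {
	c := newToyConsumer(2, 0) // AckWait of 2s, no delivery limit
	c.deliver(1, 0)
	// Ten seconds later the deadline has passed, yet the message is still
	// pending: it is only waiting for the next pull request.
	fmt.Println("redeliverable:", c.expire(10), "ack pending:", c.numAckPending())
	c.ack(1)
	fmt.Println("ack pending after ack:", c.numAckPending())
}
```

In this model an expired deadline only marks a message as ready for redelivery; the pending count changes on an ack or once the delivery limit is exhausted.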
Num ack pending only goes down if:
- The message is acked.
- The message expires via MaxAge (meaning it can't be delivered anymore, as the message is removed).
- MaxDeliver is set up for the consumer. After the message has hit the maximum number of deliveries, num ack pending will decrease and this message will not be delivered to you again.
I was doing some tests with NakWithDelay. If MaxAckPending == Msg with NakWithDelay, messages will stay in the pending hashmap, limiting the option to get another message.
func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
	// ...
	// Check if we have max pending.
	if o.maxp > 0 && len(o.pending) >= o.maxp {
		// maxp only set when ack policy != AckNone and user set MaxAckPending
		// Stall if we have hit max pending.
		return nil, 0, errMaxAckPending
	}
	// ...
I have written a test that fails on the last line, as I was expecting to be allowed to continue consuming other messages:
func TestConsumerNumAckPendingWithNakDelay(t *testing.T) {
	s := RunBasicJetStreamServer(t)
	defer s.Shutdown()

	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"test"},
	})
	require_NoError(t, err)

	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:       "dur",
		AckPolicy:     nats.AckExplicitPolicy,
		AckWait:       2 * time.Second,
		MaxAckPending: 2,
	})
	require_NoError(t, err)

	for i := 0; i < 5; i++ {
		_, err := js.Publish("test", nil)
		require_NoError(t, err)
	}

	info, err := js.ConsumerInfo("TEST", "dur")
	require_NoError(t, err)
	require_Equal(t, info.NumAckPending, 0)
	require_Equal(t, info.NumPending, 5)

	req := JSApiConsumerGetNextRequest{Batch: 1}
	reqb, _ := json.Marshal(req)
	sub := natsSubSync(t, nc, nats.NewInbox())

	// First message: NAK with a one-minute delay.
	err = nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.dur", sub.Subject, reqb)
	require_NoError(t, err)
	msg := natsNexMsg(t, sub, time.Second)
	err = msg.NakWithDelay(time.Minute)
	require_NoError(t, err)

	// Second message: NAK with a one-minute delay.
	err = nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.dur", sub.Subject, reqb)
	require_NoError(t, err)
	msg = natsNexMsg(t, sub, time.Second)
	err = msg.NakWithDelay(time.Minute)
	require_NoError(t, err)

	info, err = js.ConsumerInfo("TEST", "dur")
	require_NoError(t, err)
	require_Equal(t, info.NumAckPending, 2)
	require_Equal(t, info.NumPending, 3)

	err = nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.dur", sub.Subject, reqb)
	require_NoError(t, err)
	// Fails because the server never responds as getNextMsg() returns errMaxAckPending
	_ = natsNexMsg(t, sub, time.Second)
}
If MaxAckPending == Msg with NakWithDelay, messages will stay in the pending hashmap, limiting the option to get another message.
Correct, that's the intended behaviour. When you call next message, it increases the num ack pending count; if you then NAK the message, it still counts toward it. A message that's scheduled for redelivery also needs to be acked/termed at some point, so it counts toward num ack pending. It's still a pending message.
With two slight modifications to your test, it will start to pass:
1. Shorten the delay on the NakWithDelay calls:
err = msg.NakWithDelay(4*time.Second)
2. Wait longer than the NakWithDelay on the last next message call:
_ = natsNexMsg(t, sub, 5*time.Second)
You could also leave the NakWithDelay at a minute and then wait more than a minute on the last next message call, but that will of course take longer to show the same result as the shorter timeouts above.
If you want to get the third message while the first two messages are scheduled for redelivery after a minute, you'll need to raise MaxAckPending to 3. (If not set explicitly, MaxAckPending defaults to 1000.)
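The interaction between NakWithDelay and MaxAckPending explained above can be sketched with a toy pull consumer. This is a simplified illustration, not the server's implementation: `toyPull`, `nextMsg`, `nakWithDelay`, and `errNoMessages` are hypothetical names, time is modeled as whole seconds, and the ordering (due redeliveries first, then the max-pending check) is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errMaxAckPending = errors.New("max ack pending reached")
	errNoMessages    = errors.New("no messages available")
)

// toyPull is a hypothetical model of a pull consumer over a stream holding
// sequences 1..lastSeq. Time is modeled as whole seconds.
type toyPull struct {
	maxAckPending int
	ackWait       int
	nextSeq       uint64
	lastSeq       uint64
	pending       map[uint64]int // sequence -> time at which redelivery is due
}

func newToyPull(maxAckPending, ackWait int, msgs uint64) *toyPull {
	return &toyPull{
		maxAckPending: maxAckPending,
		ackWait:       ackWait,
		nextSeq:       1,
		lastSeq:       msgs,
		pending:       make(map[uint64]int),
	}
}

// nextMsg returns the sequence to deliver at time now.
func (p *toyPull) nextMsg(now int) (uint64, error) {
	// Due redeliveries go first; they already occupy a pending slot.
	var due uint64
	for seq, at := range p.pending {
		if now >= at && (due == 0 || seq < due) {
			due = seq
		}
	}
	if due != 0 {
		p.pending[due] = now + p.ackWait
		return due, nil
	}
	// A new message needs a free slot, and NAKed messages still hold theirs.
	if p.maxAckPending > 0 && len(p.pending) >= p.maxAckPending {
		return 0, errMaxAckPending
	}
	if p.nextSeq > p.lastSeq {
		return 0, errNoMessages
	}
	seq := p.nextSeq
	p.nextSeq++
	p.pending[seq] = now + p.ackWait
	return seq, nil
}

// nakWithDelay reschedules seq for redelivery without freeing its slot.
func (p *toyPull) nakWithDelay(seq uint64, now, delay int) {
	if _, ok := p.pending[seq]; ok {
		p.pending[seq] = now + delay
	}
}

func main() {
	p := newToyPull(2, 2, 5) // MaxAckPending 2, AckWait 2s, 5 messages
	for i := 0; i < 2; i++ {
		seq, _ := p.nextMsg(0)
		p.nakWithDelay(seq, 0, 60)
	}
	_, err := p.nextMsg(1)
	fmt.Println("third pull with MaxAckPending=2:", err)
}
```

Constructing the model with a limit of 3 lets the third pull return sequence 3, and shortening the delay to 4 seconds lets a pull at second 5 return the redelivered sequence 1, which matches the two ways out described above.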
Thank you for the explanation. This issue can be closed if this is the expected behaviour.
Observed behavior
If an application terminates before acking its messages from a durable JetStream Consumer and no further batches of messages are requested from the Consumer, num_ack_pending in ConsumerInfo never becomes zero.
Expected behavior
num_ack_pending should become zero after the JetStream Consumer's AckWait period expires on all outstanding messages.
Server and client version
nats-server 2.10.18
Host environment
Any. Apple M3 Pro.
Steps to reproduce
1. Create a stream and durable consumer w/ explicit ack & ackwait.
2. Receive a message without acknowledging it.
3. Poll ConsumerInfo.
Notice that last activity is greater than the ack wait period yet num_ack_pending still shows there's an outstanding message.
The message is immediately ready for redelivery if another request comes along to pull messages.
It seems the ConsumerInfo counters are only advanced by requests which mutate the underlying stream.
As a workaround, I tried to request a zero-sized batch.
natscli appears to no-op on the client side for such a silly request.
I crafted the batch request manually, but nats-server clamps my batchsize back up to 1 and this counts against the delivery count of the sacrificial message.
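Since num_ack_pending keeps counting messages that are only waiting for redelivery, one client-side option is to interpret it together with the consumer's idle time. The following is only a sketch of that idea: `consumerSnapshot`, its fields, and `inFlight` are hypothetical names, and it assumes the values have already been read from a ConsumerInfo response and that no NakWithDelay or in-progress ack has extended a deadline beyond AckWait.

```go
package main

import (
	"fmt"
	"time"
)

// consumerSnapshot holds the few values this sketch needs. The struct and
// its field names are hypothetical, not part of any NATS client library.
type consumerSnapshot struct {
	NumAckPending int
	Idle          time.Duration // time since the consumer's last activity
	AckWait       time.Duration // the consumer's configured AckWait
}

// inFlight estimates how many messages a client may still be processing.
// Once the consumer has been idle for longer than AckWait, every ack
// deadline has passed, so the outstanding messages are only waiting for
// redelivery. This assumes no deadline was extended beyond AckWait.
func inFlight(s consumerSnapshot) int {
	if s.Idle > s.AckWait {
		return 0
	}
	return s.NumAckPending
}

func main() {
	s := consumerSnapshot{
		NumAckPending: 1,
		Idle:          10 * time.Second,
		AckWait:       2 * time.Second,
	}
	fmt.Println("estimated in flight:", inFlight(s))
}
```

This does not change what the server reports; it only gives a monitoring client a way to distinguish messages that are being worked on from messages that are parked until the next pull request.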