nats-io / stan.go

NATS Streaming System
https://nats.io
Apache License 2.0

acked messages are re-delivered #353

Closed alexec closed 1 year ago

alexec commented 3 years ago
    if sub, err := sc.QueueSubscribe(x.Subject, queueName, func(msg *stan.Msg) {
        if err := f(context.Background(), msg.Data); err != nil {
            // noop
        } else if err := msg.Ack(); err != nil {
            logger.Error(err, "failed to ack message", "source", sourceName)
        }
    },
        stan.DurableName(queueName),
        stan.SetManualAckMode(),
        stan.StartAt(pb.StartPosition_NewOnly),
        stan.AckWait(30*time.Second),
        stan.MaxInflight(20)); err != nil {
        return fmt.Errorf("failed to subscribe: %w", err)
    }

When I shut down my client, I close the subscription and then close the connection. When I reconnect, I get 20 messages delivered again, even though I've acked them. Do I need to perform some extra action to flush the acks?

kozlovic commented 3 years ago

Several things to keep in mind:

Are you saying that messages that have been ack'ed are redelivered, or simply that messages arrive with "redelivered" flag? If the latter, it is because of what I describe above. If the former, it means that acks did not make it to the server.

You may want to ack and then close the subscription from inside the message callback. This ensures that closing the subscription and acking messages do not happen in parallel, which would cause some acks to fail.

But know that with NATS Streaming there is no proper way to avoid getting "redelivered" messages on subscription close, for the reasons I described above. When you send the ack and then close, there is no way to prevent the server from sending the next message(s) as soon as it receives the ack. Those messages would then be marked as redelivered when your app restarts.

alexec commented 3 years ago

I think you're saying that each message has a "re-delivered" flag and I can inspect that?

kozlovic commented 3 years ago

No, I am asking whether you know for sure that you have ack'ed, say, message 100 and it gets redelivered, or whether you simply see redelivered messages when you restart your application.

What I am saying is this: suppose the server has 10 messages for channel foo and the subscription has a MaxInflight of 1. The server sends message 1 to the subscription, which acks it and then closes. As soon as the server receives the ack for message 1, it sends message 2, and only then receives the subscription close. Message 2 is nevertheless considered delivered, so it may be marked as "redelivered" in the next run.
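The sequence above can be sketched with a toy model. This is not the NATS Streaming server implementation. The `server` type and its methods are hypothetical and encode only the rule described here: a message counts as delivered once it has been sent, and it stays pending until it is acked.

```go
package main

import "fmt"

// delivery is what the toy server hands to the subscription.
type delivery struct {
	Seq         int
	Redelivered bool
}

// server is a toy model of one channel with one durable subscription.
type server struct {
	total       int   // messages stored in the channel (sequences 1..total)
	maxInflight int   // maximum number of unacknowledged messages
	next        int   // next sequence that has never been sent
	pending     []int // sent but not yet acked, in send order
	open        bool  // whether the subscription is currently open
}

func newServer(total, maxInflight int) *server {
	return &server{total: total, maxInflight: maxInflight, next: 1}
}

// fill sends new messages until maxInflight is reached.
func (s *server) fill() []delivery {
	var out []delivery
	for s.open && len(s.pending) < s.maxInflight && s.next <= s.total {
		s.pending = append(s.pending, s.next)
		out = append(out, delivery{Seq: s.next})
		s.next++
	}
	return out
}

// Open (re)starts the subscription: pending messages are sent again,
// flagged as redelivered, followed by any new messages that fit.
func (s *server) Open() []delivery {
	s.open = true
	var out []delivery
	for _, seq := range s.pending {
		out = append(out, delivery{Seq: seq, Redelivered: true})
	}
	return append(out, s.fill()...)
}

// Ack removes seq from the pending list and immediately sends what fits next.
func (s *server) Ack(seq int) []delivery {
	for i, p := range s.pending {
		if p == seq {
			s.pending = append(s.pending[:i], s.pending[i+1:]...)
			break
		}
	}
	return s.fill()
}

// Close stops delivery; whatever is pending stays pending.
func (s *server) Close() { s.open = false }

func main() {
	s := newServer(10, 1)
	fmt.Println(s.Open()) // first run, prints "[{1 false}]"
	fmt.Println(s.Ack(1)) // server sends message 2 right away, prints "[{2 false}]"
	s.Close()             // the close arrives after message 2 was sent
	fmt.Println(s.Open()) // next run, prints "[{2 true}]"
}
```

In this toy model, the same run with a maxInflight of 20 leaves 20 messages pending at close, all of which come back flagged as redelivered. That matches the count in the original report, but it does not by itself establish the cause.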

If you say that you ack'ed message 1 and closed the subscription, but when you restart you receive message 1 again, then we need to figure out why the server did not receive/process the ack. If that is the case, which versions (server and client) are you using? Is the server in standalone or clustered mode?
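One way to tell the two cases apart on the client side is to record which sequence numbers were acked without error and check each incoming message against that record. The sketch below is a hypothetical helper, not part of stan.go. It assumes the caller passes in the sequence number and redelivered flag from the received message (`msg.Sequence` and `msg.Redelivered` on `*stan.Msg`), and that the record would be persisted somewhere to survive a restart, which the example leaves out.

```go
package main

import "fmt"

// ackLog remembers which sequences this client has acked successfully.
// Hypothetical helper: to compare across restarts, the map would have to
// be persisted (for example to disk), which this sketch does not do.
type ackLog struct {
	acked map[uint64]bool
}

func newAckLog() *ackLog { return &ackLog{acked: make(map[uint64]bool)} }

// RecordAck should be called only after the ack call returned no error.
func (l *ackLog) RecordAck(seq uint64) { l.acked[seq] = true }

// Classify reports which of the two cases an incoming message falls into.
func (l *ackLog) Classify(seq uint64, redelivered bool) string {
	switch {
	case l.acked[seq]:
		// The case that needs investigation: the ack did not reach,
		// or was not processed by, the server.
		return "acked earlier, delivered again"
	case redelivered:
		// The expected case: the server had sent it before the close.
		return "redelivered, never acked by this client"
	default:
		return "first delivery"
	}
}

func main() {
	log := newAckLog()
	log.RecordAck(1)
	fmt.Println(log.Classify(1, true))  // prints "acked earlier, delivered again"
	fmt.Println(log.Classify(2, true))  // prints "redelivered, never acked by this client"
	fmt.Println(log.Classify(3, false)) // prints "first delivery"
}
```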

alexec commented 3 years ago

Ah.

> If the subscription is closed while msg.Ack() are happening, it is possible that msg.Ack() will return error if the subscription is closed.

We log any ack errors, no errors are logged.

> Are you saying that messages that have been ack'ed are redelivered, or simply that messages arrive with "redelivered" flag?

The former. We close the connection after acking the messages.

> You may want to ack and then close sub from inside the message callback.

We always get several messages sent again, so even if one ack was concurrent with close, would we get one message re-sent?

> which versions (server and client) are you using?

Stand-alone: https://github.com/argoproj-labs/argo-dataflow/blob/main/config/stan-dev/single-server-stan.yml

stan.io v0.8.3: https://github.com/argoproj-labs/argo-dataflow/blob/main/go.mod#L26

kozlovic commented 3 years ago

> We always get several messages sent again, so even if one ack was concurrent with close, would we get one message re-sent?

No. Closing the connection from the "main thread" causes any messages that are still held internally and not yet dispatched to the callback to be discarded.

To investigate further we need to:

I would start with the upgrade to see if this was an issue that has been fixed since then. Then if you need to proceed with tracing, be aware that running the server with -SDV will produce quite a bit of tracing, so if you have high volume, this may be an issue. In that case, try to reproduce the issue in a dev/qa environment.

kozlovic commented 1 year ago

Closing since there has been no update