nats-io / nats.go

Golang client for NATS, the cloud native messaging system.
https://nats.io
Apache License 2.0
5.53k stars 696 forks source link

JetStream consumer does not resume receiving messages after successful reconnect #1729

Open njkleiner opened 2 weeks ago

njkleiner commented 2 weeks ago

Observed behavior

I am experiencing a bug where a JetStream consumer will sometimes become "stuck" after a reconnect.

Concretely, a consumer will sometimes not resume receiving messages and instead reach a state where it permanently throws "no heartbeat" errors after a successful reconnect to the server.

Expected behavior

The JetStream consumer should resume receiving messages after a successful reconnect (which it sometimes does).

Server and client version

I am using the main Branch of nats.go as of commit c7cf3452dd6359bdf40cbad0c39d900cbeba81e2.

Host environment

I am running these tests using Go 1.23.2 (darwin/arm64), with the asynctimerchan=1 GODEBUG setting enabled.

I am using the Consumer.Consume method and the following ConsumerConfig

jetstream.ConsumerConfig{
    Durable: "",

    AckPolicy:     jetstream.AckExplicitPolicy,
    DeliverPolicy: jetstream.DeliverAllPolicy,
}

Steps to reproduce

After a lot of manual debugging, I believe the issue is a race condition in the core NATS code, where a call to Subscription.pCond.Wait will block forever, because no subsequent call to pCond.Signal or pCond.Broadcast ever occurs in spite of the interrupted connection having been successfully reconnected.

I have attached two example logs each that demonstrate a "stuck" consumer and a consumer that is not "stuck" respectively. See the attached patch for the context w.r.t. the debug messages in these logs.

stuck2.log notstuck2.log stuck.log notstuck.log

0001-add-debug-messages.patch

piotrpio commented 1 week ago

Hello @njkleiner, thanks for creating the issue. I'll look at this, but in the meantime could you please check if your consumer is still there after the reconnect (at the point at which Consume is stuck)? You're using an ephemeral consumer (Durable is empty) and you don't explicitly set InactiveThreshold in your consumer config, so the server uses the default, which is 5 seconds. So if the client is disconnected and does not send pull requests for al least 5 seconds, thew consumer will be automatically deleted.

I'm obviously not saying that's the case and I'll be looking at this regardless, but this is a pretty common case so would be nice if you could check.

njkleiner commented 1 week ago

Sure, I can take a look.

But if this turns out to be the case, I would still argue that the client should not continue blocking indefinitely -- upon a successful reconnect -- when a consumer is deleted server side (and instead return an error immediately, on reconnect).

piotrpio commented 1 week ago

@njkleiner I agree with you, but there is not much we can do to make it better. We do not have a way of knowing the consumer has been deleted unless it happened during an active pull request, which results in us getting a Consumer Deleted status. But in case of reconnect we have no way of knowing unless we ask for consumer info on each reconnect, which is quite a heavy operation.

derekcollison commented 1 week ago

If the consumer does heartbeats the client will detect it is gone eventually.

njkleiner commented 1 week ago

I haven't had time to take a detailed look yet, but it appears that at least both of the example logs I have provided where the consumer gets "stuck" represent cases where there has been a disconnect of at least five seconds.

So you might be right about the consumer being deleted on the server side @piotrpio, I will investigate further next week.

@derekcollison I am not entirely sure how to interpret your comment. As I have originally stated, a "stuck" consumer eventually enters an indefinite state of heartbeat errors.

So the client detected it to be "gone" in that sense, but I have not treated these errors as unrecoverable so far -- my logic was that, if I assume an unstable connection where reconnects may occur, heartbeat timeouts are expected as well, and that successful reconnects would imply that heartbeat timeouts may, in principle, be recovered eventually as well.

Are you saying that indefinite heartbeat errors as described are indicative of a consumer that was deleted server side? And, if so, is there a way to distinguish these heartbeat errors from other (recoverable) heartbeat errors on the client side?

derekcollison commented 1 week ago

I think if the system can delete consumers out from underneath of apps, then the heartbeat should be required and iff the heartbeat fails, do a consumer info to determine if the consumer still exists, and if not take appropriate action to resolve.

Jarema commented 1 week ago

Let's clear out some points.

1. Missing Heartbeat:

A missing heartbeat indicates that the client did not receive a heartbeat message from the server-side consumer in time. This can be due to several factors, including:

• Temporary unavailability of JetStream • Network issues • Underlying infrastructure restarts or problems

While a few missed heartbeats may be transient and recoverable, persistent issues over a longer period suggest it’s worth checking what’s happening with the consumer. There’s no definitive threshold for how many missed heartbeats are too many or whether this should be treated as a terminal error—it largely depends on your specific architecture, topology, and infrastructure.

2. Handling Terminal Errors:

We initially treated some errors as terminal but realized we cannot always determine if an error is truly terminal. In many scenarios, consumers are managed separately from where messages are consumed; systems managing streams and consumers might independently create or delete them, and that should not force action by consuming apps. Because of this, we currently believe that even a “Consumer Deleted” error should not be considered terminal. Making errors terminal forces users to take immediate action. By treating errors as non-terminal, we provide users with all available information and options.