nats-io / nats.go

Golang client for NATS, the cloud native messaging system.
https://nats.io
Apache License 2.0

I often get the error nats: no heartbeat received #1622

Open VuongUranus opened 3 months ago

VuongUranus commented 3 months ago

Observed behavior

I often get the error nats: no heartbeat received. Why does this happen, and what have I misconfigured? When I get this error, my pull consumer seems to have been deleted. I don't know why, but it seems to happen once the time configured in the consumer's InactiveThreshold attribute is exceeded.

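import (
    "errors"
    "time"

    "github.com/nats-io/nats.go/jetstream"
    "github.com/rs/zerolog/log"
)

// ContinuousPolling starts a Messages() iterator on cons and handles
// messages in a background goroutine until the iterator is stopped or
// the consumer is deleted.
func ContinuousPolling(cons jetstream.Consumer, f func([]byte)) (jetstream.MessagesContext, error) {
    iter, err := cons.Messages()
    if err != nil {
        log.Error().Msgf("Error when getting messages from NATS: %v", err)
        return nil, err
    }
    go func() {
        for {
            msg, err := iter.Next()
            if err != nil {
                // Stop on a closed iterator or a deleted consumer; any other
                // error (including jetstream.ErrNoHeartbeat) is logged and retried.
                if errors.Is(err, jetstream.ErrMsgIteratorClosed) || errors.Is(err, jetstream.ErrConsumerDeleted) {
                    return
                }

                log.Error().Msgf("Error when consuming message. Reason: %v", err)
                continue
            }
            // f runs in its own goroutine, so the message is acked
            // before f has finished processing it.
            go f(msg.Data())
            msg.Ack()
        }
    }()

    return iter, nil
}

// setDefaultConsumerConfig builds the pull consumer config used with ContinuousPolling.
func setDefaultConsumerConfig(subjects []string, consumerName ...string) jetstream.ConsumerConfig {
    name := ""
    if len(consumerName) > 0 {
        name = consumerName[0]
    }

    return jetstream.ConsumerConfig{
        Name:              name,
        DeliverPolicy:     jetstream.DeliverNewPolicy,
        FilterSubjects:    subjects,
        AckPolicy:         jetstream.AckAllPolicy,
        Replicas:          1,
        // The server removes the consumer once no client has pulled
        // from it for this long.
        InactiveThreshold: 1 * time.Minute,
    }
}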

Expected behavior

Please explain what causes this error and how to avoid it.

Server and client version

nats-server version: 2.10.12

Host environment

No response

Steps to reproduce

No response

Jarema commented 3 months ago

Hey!

What we don't see here is the context in which both of the provided functions are called. Could you please share it?

InactiveThreshold kicks in when no one is listening for messages from a consumer (that is, no Fetch or Messages call is active).

This means it can happen, for example, if there is a long pause between creating a consumer and consuming its messages, or if the consuming app is down for longer than a minute.

Moving the issue to the nats.go repo, as it's probably a client-side discussion unless we find an issue in the server.

withinboredom commented 2 months ago

I believe this happens if you spend too long processing a message in the message loop. You need to take the message off the "queue" and return control to the library as soon as possible (at least, that is what I found while debugging this issue in my code).

Jarema commented 2 months ago

That should not be the case, especially with such a long inactive threshold. We will, however, try to replicate it by running a slow workload.

withinboredom commented 2 months ago

AFAICT, the issue appears to be that heartbeats are only processed when calling iter.Next(), so you need to call it again as soon as possible.

piotrpio commented 2 months ago

This is not correct. The heartbeats are indeed processed in Next(), but the timer that triggers the error only runs within each Next() call: each time you call iter.Next() the timer is reset, and it is stopped when the method returns. Time spent processing a message between calls is therefore not counted.

withinboredom commented 2 months ago

Indeed! Looking at the code, this error might be caused by networking issues, since the heartbeat timer isn't paused or reset during reconnection:

https://github.com/nats-io/nats.go/blob/8894a274f4f5f00fd602f74cef462184e31d0256/jetstream/pull.go#L608-L630

withinboredom commented 2 months ago

I can confirm that the above PR appears to fix the issue; at least, I haven't seen this message in a while now.

withinboredom commented 2 months ago

@VuongUranus try adding this to your go.mod file and see whether the issue goes away for you too:

replace github.com/nats-io/nats.go => github.com/withinboredom/nats.go patch-1

VuongUranus commented 1 month ago

@withinboredom I can't apply this replace directive: replace github.com/nats-io/nats.go => github.com/withinboredom/nats.go patch-1

piotrpio commented 1 month ago

@VuongUranus The issue mentioned by @withinboredom is fixed here: https://github.com/nats-io/nats.go/pull/1643, we'll be merging it soon.

However, I am still not certain that this is indeed your problem. It would be great if you could verify this. If you still encounter the issue, it would be very helpful if you could answer the questions in this comment: https://github.com/nats-io/nats.go/issues/1622#issuecomment-2068484682

VuongUranus commented 1 month ago

My message handling function runs in a goroutine, so the error is probably not caused by processing messages for too long.

VuongUranus commented 1 month ago

@piotrpio I updated to version 1.36.0 but still get the error nats: no heartbeat received

svirmi commented 1 month ago

I also get "no heartbeat received" with version 1.36.0. Code snippet:

for {
    select {
    case <-nats.Ctx.Done():
        return
    default:
        // Next blocks until a message arrives or an error occurs, so the
        // context is only checked between messages.
        msg, err := consumer.Next()

        if errors.Is(err, jetstream.ErrMsgIteratorClosed) {
            // A closed iterator never yields messages again; continuing
            // here would spin forever logging the same error.
            logger.Error("consumer.Next()", slog.String("nats", err.Error()))
            return
        } else if errors.Is(err, jetstream.ErrNoHeartbeat) {
            logger.Error("consumer.Next() :: jetstream.ErrNoHeartbeat", slog.String("symbol", symbol))
            continue
        } else if err != nil {
            logger.Error("consumer.Next() :: failed to fetch message", slog.String("nats", err.Error()))
            continue
        }

        // msg.Ack()

        err = json.Unmarshal(msg.Data(), &kline)
        if err != nil {
            logger.Error("error in json.Unmarshal(msg.Data(), &kline)", slog.String("nats", err.Error()))
            return
        }

        nats.KLine <- kline
    }
}

Jarema commented 1 month ago

After you get that error, do you get it once and then resume normal operation, or are there many consecutive heartbeat errors? After the error, can you check whether the consumer still exists (with a consumer info call in the client or the CLI)?

VuongUranus commented 1 month ago

This error occurs repeatedly on the same consumer. When it occurs, I searched for the consumer using the CLI but could not find it. I also found that setting NoAck: true in the stream config and AckNonePolicy in the consumer config makes this error occur less often.

Jarema commented 1 month ago

This error means there is some issue with the consumer or with JetStream. Sometimes the client can recover, and sometimes it can't (for example, if the consumer was deleted).

Acking should have nothing to do with it. The most probable reason is a consumer with an inactivity threshold set, which is removed after the given duration of client inactivity.

VuongUranus commented 1 month ago

I use the Consume function to pull messages, so why is the consumer deleted by the inactivity threshold?

VuongUranus commented 1 month ago

I increased InactiveThreshold, but the error still occurs as often as before. However, using NoAck: true and AckPolicy: jetstream.AckNonePolicy does reduce the errors a lot.

kaverhovsky commented 2 weeks ago

@VuongUranus I have encountered this error. It was a run of consecutive nats: no heartbeat received errors just after nats-server had been shut down. I couldn't understand why my client wasn't able to reconnect.

The problem was that I had left out the ReconnectWait option in the client configuration (the Core NATS client, not JetStream). Because of that, all reconnection attempts were exhausted before nats-server came back up, which caused the client connection to close.

Maybe in your case there was a server failure and the reconnection attempts were exhausted.