nats-io / nats.go

Golang client for NATS, the cloud native messaging system.
https://nats.io
Apache License 2.0

jetStreamContext.PublishMsg memory consumption grows over time if the NATS server fails or restarts #1593

Open tvojacek opened 6 months ago

tvojacek commented 6 months ago

Observed behavior

The NATS client consumes all available memory (>4 GB RAM) if the NATS server fails.

The code simply publishes messages into JetStream. The input rate is about 30 messages/sec, each approximately 100 to 2000 bytes long.
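A quick sanity check of the stated load puts the >4 GB figure in perspective. At 30 messages/sec and 100 to 2000 bytes per message, the raw payload is roughly 10.8 MB to 216 MB per hour, so even four hours at the upper bound amounts to about 0.86 GB of payload. The snippet only does that arithmetic; `bytesPerHour` is a hypothetical helper name.

```go
package main

import "fmt"

// bytesPerHour returns the raw payload volume for a steady publish rate.
// Rate and sizes come from the report; the helper itself is hypothetical.
func bytesPerHour(msgsPerSec, msgBytes int) int {
	return msgsPerSec * msgBytes * 3600
}

func main() {
	lo := bytesPerHour(30, 100)  // smallest messages
	hi := bytesPerHour(30, 2000) // largest messages
	fmt.Printf("payload per hour: %.1f MB to %.1f MB\n", float64(lo)/1e6, float64(hi)/1e6)
	// prints "payload per hour: 10.8 MB to 216.0 MB"
}
```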

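package connector

import (
	"context"
	"errors"
	"sync/atomic"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
	"github.com/rs/zerolog/log"
)

// Placeholders for application state not shown in this report
// (config.NatsUri, publisher.publishTimeout, the Prometheus duplicates counter).
var (
	natsURI          = "127.0.0.1"
	publishTimeout   = 5 * time.Second
	streamDuplicates atomic.Int64
	inputChan        = make(chan *nats.Msg)
	jetStreamContext jetstream.JetStream
)

func connect() {
	// Reconnect forever, and keep retrying even if the first connect fails.
	opts := []nats.Option{nats.MaxReconnects(-1), nats.RetryOnFailedConnect(true)}
	natsConnection, err := nats.Connect(natsURI, opts...)
	if err != nil {
		log.Err(err).Str("uri", natsURI).Msg("failed to start connector")
		panic(err)
	}
	jetStreamContext, err = jetstream.New(natsConnection)
	if err != nil {
		panic(err)
	}
}

func publishChan() {
	for msg := range inputChan {
		tryPublishMsg(msg)
	}
}

// tryPublishMsg retries the listed transient errors until the publish succeeds;
// any other error makes it give up.
func tryPublishMsg(msg *nats.Msg) {
	for {
		ctx, cancel := context.WithTimeout(context.Background(), publishTimeout)
		pubAck, err := jetStreamContext.PublishMsg(ctx, msg, jetstream.WithExpectStream("MYDATA"))
		// Release the context now; a defer inside this loop would only run when
		// the function returns, so one deferred call would pile up per retry.
		cancel()
		if err == nil {
			if pubAck.Duplicate {
				streamDuplicates.Add(1)
			}
			return
		}
		switch {
		case errors.Is(err, nats.ErrConnectionReconnecting),
			errors.Is(err, nats.ErrConnectionDraining),
			errors.Is(err, context.DeadlineExceeded),
			errors.Is(err, nats.ErrReconnectBufExceeded),
			errors.Is(err, nats.ErrConnectionClosed):
			log.Info().Msg("retrying")
		default:
			log.Info().Err(err).Msg("unrecoverable error giving up")
			return
		}
	}
}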

Expected behavior

The client's memory consumption should stay bounded. The normal memory footprint of the whole application is 5 to 30 MB.

Server and client version

nats.go 1.33.1. Tested with various NATS server versions: 2.10.9, 2.10.11, 2.10.12, 2.10.4, 2.11.1-preview1.

Host environment

Developer machine running macOS 14.4.0. The application runs natively; the NATS server runs inside a Docker container with a mounted volume.

Steps to reproduce

Start the NATS server inside Docker, start the consumer application, then start the application with the publishing client. Wait several hours. The NATS server becomes overloaded (a bug or a setup problem) and JetStream gets corrupted or unresponsive:

nats-1 | [1] 2024/03/21 12:48:03.248044 [WRN] Internal subscription on "$JS.API.CONSUMER.INFO.MYDATA.FORWARDER" took too long: 5.000799336s
nats-1 | [1] 2024/03/21 12:48:03.248082 [WRN] Internal subscription on "$JS.API.CONSUMER.INFO.MYDATA.FORWARDER" took too long: 5.310615294s
nats-1 | [1] 2024/03/21 12:48:03.845698 [WRN] 172.18.0.4:46162 - cid:17 - Readloop processing time: 14.906449381s

The NATS client then starts caching internally the messages it fails to deliver to the server (attached profile: profile002).
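To tell whether the growth is in the Go heap, and when it starts relative to the server becoming unresponsive, the publisher could log runtime memory statistics periodically. A minimal sketch using only the standard library's `runtime.ReadMemStats`; `heapMB` is a hypothetical helper name, and in a real service the sampling loop would run in its own goroutine for the life of the process.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// heapMB returns the bytes of allocated heap objects (HeapAlloc) and the
// total memory obtained from the OS (Sys), both in MiB.
func heapMB() (heapAlloc, sys float64) {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	const mib = 1 << 20
	return float64(m.HeapAlloc) / mib, float64(m.Sys) / mib
}

func main() {
	// Sample a few times; a long-running publisher would keep this loop going.
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for i := 0; i < 3; i++ {
		<-ticker.C
		heap, sys := heapMB()
		fmt.Printf("heap=%.1f MiB sys=%.1f MiB\n", heap, sys)
	}
}
```

A steadily rising `heap` value while the server is unresponsive, with no matching rise in the application's own queues, would point at memory retained on the client side.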

piotrpio commented 6 months ago

Hello @TomasVojacek, thanks for reporting the issue. I'll be looking at this.