nats-io / nats-server

High-Performance server for NATS.io, the cloud and edge native messaging system.
https://nats.io
Apache License 2.0

Consumer not receiving messages when power off and restart #4566

Closed shivabohemian closed 9 months ago

shivabohemian commented 1 year ago

What version were you using?

nats-server: v2.9.15 (the same in v2.10.0) client: v1.29.0

What environment was the server running in?

uname -a
Linux xxxx 5.10.110 #55 SMP Thu Aug 24 05:28:09 PDT 2023 aarch64 GNU/Linux

Is this defect reproducible?

First, I would like to explain my background. I have deployed nats-server and my application on a Debian server. Every time my application starts up, it creates a persistent JetStream stream and several corresponding consumers (using AddStream and AddConsumer), then subscribes using nats.BindStream. Under normal circumstances, I can receive and consume messages. However, the following steps lead to a state where messages are no longer consumed:

1. I perform a power shutdown and reboot of my linux server.
2. I proceed to carry out business operations to send NATS js messages.

Result: I can see the logs of the messages I sent in the 'nats-server -DV' logs, similar to this:

[796] 2023/09/20 12:07:07.947632 [TRC] 127.0.0.1:36830 - cid:21 - <<- MSG_PAYLOAD: [{xxxxx}]

In my application logs, when using js.Publish to send messages, I print the returned PubAck, which outputs the following information:

[DEBU] pub ack: &{Stream:xxxx Sequence:1 Duplicate:false Domain:}

The above information indicates that the message has been sent to the NATS server. At the same time, I can see an increase in the number of messages in the jsz monitoring on port 8222. However, my consumer cannot consume the messages. After restarting the NATS server, the message count in jsz monitoring becomes 0 again, suggesting that they might have been deleted.

(Note: In version v2.9.15, it seems that only the first few messages cannot be consumed initially, and the subsequent messages automatically recover consumption. However, in version v2.10.0, none of the later messages can be consumed unless the stream is deleted and recreated.)

Additionally, I'd like to post my subscription code. Perhaps the key point is that multiple consumers are created concurrently.

streamName := "TASK_STREAM"
config := &nats.StreamConfig{
    Name:      streamName,
    Retention: nats.InterestPolicy,
    Subjects:  []string{"task.>"},
}
_, err := n.js.AddStream(config)
if err != nil {
    log.Logger.Warnf("InitTaskConsumer add stream error: %v, will retry", err)
    _ = n.js.DeleteStream(streamName)
    _, _ = n.js.AddStream(config)
}

go func() {
    consumerName := "taskConsumer"
    filterSubject := "task.1"
    consumerCfg := &nats.ConsumerConfig{
        Durable:           consumerName,
        FilterSubject:     filterSubject,
        AckPolicy:         nats.AckExplicitPolicy,
        AckWait:           1 * time.Minute,
        MaxDeliver:        0,
        MaxAckPending:     50,
        MaxRequestBatch:   1,
        MaxRequestExpires: 30 * time.Second,
    }
    var err1 error
    _, err1 = n.js.AddConsumer(streamName, consumerCfg)
    if err1 != nil {
        log.Logger.Warnf("InitTaskConsumer add consumer error: %v, will retry", err1)
        _ = n.js.DeleteConsumer(streamName, consumerName)
        _, _ = n.js.AddConsumer(streamName, consumerCfg)
    }
    sub, err1 := n.js.PullSubscribe(filterSubject, consumerName, nats.BindStream(streamName))
    for {
        msgs, _ := sub.Fetch(1)
        if len(msgs) == 0 {
            continue
        }
        xxxxxxx
    }
}()

go func() {
    // other consumer
    consumerName := "taskConsumer2"
    filterSubject := "task.2"
    // the same code...
}()

Given the capability you are leveraging, describe your expectation?

Messages should be consumed normally.

Given the expectation, what is the defect you are observing?

As mentioned above, the messages have entered the NATS server but cannot be consumed.

neilalexander commented 1 year ago

Need more information here about your setup. Is the JetStream store_dir definitely in a persistent location, i.e. it's not being deleted when the system restarts?

When NATS Server starts up, it should show logging messages about the assets it is recovering from disk. If none of those are logged then your store_dir is not persistent. i.e.

[9627] 2023/09/20 11:20:25.067692 [INF]   Starting restore for stream '$G > foo'
[9627] 2023/09/20 11:20:25.069232 [INF]   Restored 1 messages for stream '$G > foo' in 2ms

Another thing to note is that if you are using InterestPolicy and there are no consumers on the stream yet, then any messages that are sent to the stream will be deleted as there are no consumers "interested" in the message.

Finally, you may want to revisit that AddStream error check, as if your first AddStream call fails for whatever reason then you will indiscriminately delete and recreate the stream, which will delete all messages with it.

shivabohemian commented 1 year ago

Thanks for your reply~

My nats-server.conf:

# Client port of 4222 on all interfaces
port: 4222

# HTTP monitoring port
monitor_port: 8222

jetstream {
    store_dir: /userdata/nats
    max_mem: 100M
    max_file: 500M
}

In the /userdata/nats directory, the corresponding jetstream/$G/streams/TASK_STREAM directory has also been generated, and it contains persistent files. Below is the startup log for NATS, which indicates the use of persistence.

[5875] 2023/09/20 19:57:49.187132 [INF]   Starting restore for stream '$G > TASK_STREAM'
[5875] 2023/09/20 19:57:49.190313 [INF]   Restored 2 messages for stream '$G > TASK_STREAM' in 3ms

But that's not the issue here, as I faced the problem of newly sent messages not being consumed after restarting the nats-server and the application. And I encountered this issue once again. After a power outage and reboot, the message was successfully sent but not consumed. Here is the content of the message.

[856] 2023/09/20 19:51:55.719554 [TRC] 127.0.0.1:59470 - cid:5 - <<- [PUB task.1 _INBOX.qNAn2Aqb9y9RBBRS5xIcDA.aTqjJpHP 191]
[856] 2023/09/20 19:51:55.719923 [TRC] 127.0.0.1:59470 - cid:5 - <<- MSG_PAYLOAD: ["{xxxxxxxx}"]
[856] 2023/09/20 19:51:55.721415 [TRC] 127.0.0.1:59470 - cid:5 - ->> [MSG _INBOX.qNAn2Aqb9y9RBBRS5xIcDA.aTqjJpHP 1 37]
[856] 2023/09/20 19:52:00.254858 [TRC] 127.0.0.1:59470 - cid:5 - ->> [HMSG _INBOX.qNAn2Aqb9y9RBBRS5xIcII.qNAn2Aqb9y9RBBRS5xIdG8 3 81 81]
[856] 2023/09/20 19:52:00.257562 [TRC] 127.0.0.1:59470 - cid:5 - ->> [HMSG _INBOX.qNAn2Aqb9y9RBBRS5xIcEs.qNAn2Aqb9y9RBBRS5xIdHq 2 81 81]

!!!!!! Fortunately, I was able to resolve the issue by using js.DeleteConsumer followed by js.AddConsumer each time I started the application. I'm not sure what happened, perhaps there was some confusion with the consumers?

As you mentioned, there's a new issue here. When I use InterestPolicy, does the js.DeleteConsumer operation remove messages that have been persisted but not yet consumed?

Jarema commented 9 months ago

@shivabohemian Did the issue happen again in a recent version of the server?

shivabohemian commented 9 months ago

> @shivabohemian Did the issue happen again in a recent version of the server?

It works in the latest version, v2.10.6. By the way, does js.AddConsumer need to be called every time the application restarts? And would it fail if there were modifications in the consumer configuration? I'm using InterestPolicy, and when I call js.DeleteConsumer before each application restart, any existing unconsumed messages are deleted.

derekcollison commented 9 months ago

It depends. If the consumer is backed by filestore (by default it inherits this from the parent stream) and has not passed an inactive threshold while the server was down, the consumer will persist.

shivabohemian commented 9 months ago

Got it, thank you! I've tested it, and using js.DeleteConsumer indeed deletes messages. I should use js.UpdateConsumer to update the consumer.
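To illustrate why deleting the consumer loses messages under InterestPolicy, here is a toy in-memory model. It is a simplification and not how the server is implemented: the `stream` type and its methods are hypothetical, filter subjects and acks are ignored, and the only rule modeled is that a message is kept while at least one consumer still needs it.

```go
package main

import "fmt"

// stream is a toy model of an interest-retention stream (not server code).
type stream struct {
	consumers map[string]bool         // names of existing consumers
	pending   map[int]map[string]bool // sequence -> consumers that still need the message
	nextSeq   int
}

func newStream() *stream {
	return &stream{
		consumers: map[string]bool{},
		pending:   map[int]map[string]bool{},
	}
}

func (s *stream) addConsumer(name string) { s.consumers[name] = true }

// publish keeps the message only if some consumer exists to be interested in it.
func (s *stream) publish() {
	s.nextSeq++
	if len(s.consumers) == 0 {
		return // no interest: the message is not retained
	}
	need := map[string]bool{}
	for c := range s.consumers {
		need[c] = true
	}
	s.pending[s.nextSeq] = need
}

// deleteConsumer removes the consumer's interest; messages that nobody
// else needs are dropped along with it.
func (s *stream) deleteConsumer(name string) {
	delete(s.consumers, name)
	for seq, need := range s.pending {
		delete(need, name)
		if len(need) == 0 {
			delete(s.pending, seq)
		}
	}
}

// stored reports how many messages the model currently retains.
func (s *stream) stored() int { return len(s.pending) }

func main() {
	s := newStream()
	s.publish() // published before any consumer exists
	fmt.Println("stored with no consumers:", s.stored())

	s.addConsumer("taskConsumer")
	s.publish()
	s.publish()
	fmt.Println("stored with one consumer:", s.stored())

	s.deleteConsumer("taskConsumer")
	fmt.Println("stored after deleting the consumer:", s.stored())
}
```

In this model, updating a consumer in place would leave `pending` untouched, which matches the observation in this thread that js.UpdateConsumer avoids the message loss seen with js.DeleteConsumer.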

dimuska139 commented 3 months ago

Is it the same? https://github.com/nats-io/nats-server/issues/4736