nats-io / nats-server

High-Performance server for NATS.io, the cloud and edge native messaging system.
https://nats.io
Apache License 2.0

Discard policy does not apply #4878

Closed tyler-eon closed 6 months ago

tyler-eon commented 11 months ago

Observed behavior

When the host running nats-server ran out of disk space, the durable message stream I was running stopped accepting new messages despite having a discard policy set to "old". To resolve this I had to explicitly set "max bytes" to a value less than the total usable storage space on the file system. Only then did the stream correctly start discarding older messages to make room for new ones.

Expected behavior

I have a durable message stream that was created without specifying "max bytes" (i.e. it defaulted to -1). Additionally, I had a discard policy of "old". I expected that when the file system ran out of space that the oldest messages stored on disk would be discarded, as per the discard policy.

Server and client version

Server version: 2.10.1
Client version: 0.1.1

Host environment

Running on Kubernetes using the NACK operator.

Each node has 10GB of attached persistent disk storage.

The NATS pod shows it is running nats:2.10.1-alpine for the container image.

Steps to reproduce

If you want to reproduce the workaround, set "max bytes" after noticing that new messages are being discarded. After a few seconds, older messages will be pruned until the stream is back under "max bytes", and from that point forward it will continue to discard older messages properly.

The problem, to reiterate, is that you can't have unlimited "max bytes", or a value that accidentally exceeds the usable space on your server; otherwise a file system error appears to be raised that NATS does not catch, and incoming messages end up being discarded rather than the configured discard policy being respected.
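
For anyone wanting to apply the same workaround programmatically, here's a minimal sketch using the Go client (the stream name and sizes are hypothetical, and this assumes the nats.go JetStream API rather than NACK):

package main

import (
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Fetch the current config so only the limit changes.
	info, err := js.StreamInfo("JOBS") // hypothetical stream name
	if err != nil {
		log.Fatal(err)
	}

	cfg := info.Config
	cfg.MaxBytes = 8 * 1024 * 1024 * 1024 // e.g. 8 GiB, safely below a 10 GiB disk
	cfg.Discard = nats.DiscardOld         // oldest messages are pruned once the cap is hit

	if _, err := js.UpdateStream(&cfg); err != nil {
		log.Fatal(err)
	}
}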

tyler-eon commented 11 months ago

I double-checked the documentation to see if there was any mention about the need to specify these limits when using file-based storage but the only thing I could find was this:

It's recommended that you set specific limits though

I'm not sure if it's specifically because I'm using the interest-based retention policy, but I would still expect the discard policy to be effective regardless. Or at the very least I would expect the documentation to mention that not setting resource limits could cause errant behavior such as what I describe above.

ripienaar commented 11 months ago

With no limits set - max age, max bytes etc - you are saying never discard anything. Then it is correct to refuse new messages and not discard old ones - since you are telling us specifically to never discard anything.

What we should improve though is to not accept discard "old" on a stream with no limits, as that's just not a sensible setting - you can't both discard old and never discard anything.
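
Roughly, such a check could look like this (a sketch using nats.go config types, not the actual nats-server validation code):

// Reject DiscardOld on a stream where every limit is unlimited.
// Sketch only - treats <= 0 / zero duration as "unlimited", per nats.go defaults.
func validateDiscard(cfg *nats.StreamConfig) error {
	unlimited := cfg.MaxMsgs <= 0 && cfg.MaxBytes <= 0 && cfg.MaxAge == 0 &&
		cfg.MaxMsgsPerSubject <= 0
	if unlimited && cfg.Discard == nats.DiscardOld {
		return fmt.Errorf("discard policy 'old' requires at least one limit (max msgs, bytes, age or msgs-per-subject)")
	}
	return nil
}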

derekcollison commented 11 months ago

DiscardOld and DiscardNew do not have a semantic effect if no limits are in place.

tyler-eon commented 11 months ago

With no limits set - max age, max bytes etc - you are saying never discard anything.

@ripienaar

Hmmm, I guess that's a matter of interpretation. To me, it doesn't mean never discard anything; it just means use "dynamic" limits, i.e. go as far as you can until you can't anymore. Especially if you can set a discard policy.

If discard policies were disallowed without any limits being set, I think I'd be able to intuit that I would never see messages discarded without setting at least one resource limit. But right now being able to set "unlimited storage" and "discard old messages" feels more like "use as much storage as possible and then start discarding old messages when you run out of space." Which, again, I think makes this a matter of interpretation, since there's no explicit mention of how these values affect each other.

Maybe we can change the label from defect to a feature request or something? I'm not sure if disallowing the discard policy is necessary, I'll let others decide on that, but I think at least a change in the documentation would be beneficial. At the very least, that the discard policy only works when resource limits are specified, just to make it obvious why you should prefer setting resource limits over leaving everything unbounded.

Also, I'm 99% certain that this is implicitly tied to my other issue regarding consumer interest. The interest policy is supposed to delete messages once all interested consumers ack them, but I'm having to use a workaround to ensure interest is registered on my consumer, which is probably causing ack-confusion. I'm guessing not many people with an interest-based policy would ever actually run into this scenario. Still, it's definitely a real edge case that was extremely frustrating to debug without any significant documentation on the interconnectedness of these configuration values.

ripienaar commented 11 months ago

I would strongly disagree with discarding data if not given limits.

Imagine you have logs on the same partition as data. Your logs run full and then the server just deletes all your data on its own? Likewise a backup runs but there isn’t space for it or countless other reasons disks can go full, it would be extremely bad behaviour. No limits mean no limits :)

I missed the bit about interest issues - can you link a discussion about those?

tyler-eon commented 11 months ago

While it feels like NATS durable message stores are similar to logs, they really aren't. In some applications, it might be more important to receive new messages than it is to persist old messages. That's because NATS is a message queue, not a log store. If I was running Kafka and I had an unexpected surge in the number of messages I was pushing through it and I ran into storage limit issues, I 100% would prefer to delete old messages and receive new ones. Additionally, there are log storage solutions that specifically let you decide what to do if you reach max storage capacity and deleting older logs is definitely one such option, so it's not like it's unheard of even with regards to log storage.

And if you consider other classifications of software systems, you can see this "dynamic limits" ideology is common. Take Kubernetes as an example: I could deploy something with no resource limits. If you try to deploy something else and there's no room in your cluster (and it can't auto-scale to make more space), Kubernetes doesn't just immediately tell you "no, too bad"; it looks for deployments that have gone beyond their requested resources and restarts those in an effort to reasonably reclaim space so that new deployments have a chance to succeed. And in the case of deployments without resource limits, it assumes a resource request of zero, meaning 100% of the resources they consume are over the requested amount.

So again, we can disagree on this, I don't mind where NATS wants to land on the issue either way, but if you're not stating the position clearly in the documentation then it's a matter of interpretation. Do people see NATS as a real-time message broker? In such cases, they could reasonably expect to lose older messages in favor of new ones. Do people see NATS as a durable log storage for events? Then sure, maybe in those cases it's reasonable to expect failure on adding new messages to that "log". But the way in which NATS is being used absolutely would influence the assumptions about how these configuration options would work in conjunction with each other.

As for the interest thing, it's https://github.com/nats-io/nats-server/issues/4611

The TL;DR on that issue is:

For some unknown reason, I needed to add a subscriber without a consumer group in order to generate active interest on a consumer configured to use a consumer group. I have a set of Elixir application instances that use the consumer group to ensure each message is only delivered once among the group, and then I have a Python application that subscribes (without a consumer group name) to the same subject and just does nothing. If I don't have the Python client connected, the stream/consumer assumes no active interest and won't send anything along to the Elixir application instances.

At this point it's assumed that maybe the Elixir client is missing a step or formatting its subscription command differently from other, more supported clients, but I can't tell if that's the case. From what I can see, the Elixir client sends SUB <subject> along with any supported options (like a consumer group name) and it receives messages from the subscribed subjects, so it's definitely working when it subscribes itself. But when subscribing as part of a consumer group to a stream/consumer, that alone doesn't appear to generate active interest.
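
For comparison, here's roughly the Go-client equivalent of what the Elixir instances are doing; on the wire both should end up sending the same SUB <subject> <queue> <sid> command (the deliver subject and group name below are made up):

// Core-NATS queue subscription against the consumer's deliver subject.
// Names are hypothetical; nc is an established *nats.Conn.
sub, err := nc.QueueSubscribeSync("consumer.deliver.subject", "group-one")
if err != nil {
	log.Fatal(err)
}
defer sub.Unsubscribe()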

tyler-eon commented 11 months ago

I just tested this out using the NATS CLI and I think there's still a problem with the discard policy not applying even when you have a single client connected with active interest: if the consumer is configured with a set of subject filters, the stream will store every message regardless of those filters. Let me give an example:

I made a stream that accepts messages from the subject pattern test.*. Then I made a consumer which filters for the subject test.foo. If I send a mix of test.foo, test.bar, and test.baz, I can see the stream builds up messages in its file storage even though I'm receiving and acknowledging all incoming test.foo messages from the consumer subject. My assumption (and it's just that, I don't know if this is the case) is that test.bar and test.baz messages are being retained even though there's no interest for them.

The interest-based retention policy states the following:

Retention based on the consumer interest in the stream and messages. The base case is that there are zero consumers defined for a stream. If messages are published to the stream, they will be immediately deleted as there is no interest. This implies that consumers need to be bound to the stream ahead of messages being published to the stream. Once a given message is ack'ed by all consumers, the message is deleted.

This tells me a few things:

  1. If no consumers are bound to the stream, messages are automatically deleted. That means no future consumer bound to the stream can ever receive those messages, because nothing was available to receive them when they were published. This is very important to keep in mind.
  2. Every consumer bound to a stream must ack a message before it is deleted. If even one consumer fails to ack a message it'll stay until some other limit is reached and it gets deleted.

What I'm inferring from this test is: if you use subject filters in your consumer, the subjects you don't consume will continue to be stored in the stream. This is in direct opposition to the idea in point 1 above. If no consumers are bound to the stream, there's no storage of messages to accommodate future consumers. Similarly, I would expect that if no currently-bound consumer can ever receive a particular message (because of the subject filters), that message would be deleted, because of the implicit idea that the stream does not attempt to store messages for future consumers.

Now this could be called another case of interpretation but the implication is definitely leaning toward messages not being retained if there's no currently-bound consumer that can receive them. I would expect those messages to be deleted if the set of subject filters across consumers means that message will never reach a connected client.

EDIT: Looking over the documentation again, it states that the interest is in both the stream and the messages. So I would definitely expect that if a message has no interest among the consumers that it would not persist.

ripienaar commented 11 months ago
$ nats s add INTEREST --retention interest --subjects 'js.in.interest.>' --defaults
$ nats c add INTEREST ONE --filter 'js.in.interest.one' --pull --defaults

# we see the message is accepted
$ nats req js.in.interest.foo x 
17:58:32 Sending request on "js.in.interest.foo"
17:58:32 Received with rtt 1.937393ms
{"stream":"INTEREST", "domain":"hub", "seq":1}

# but not stored as it has no interest
$ nats s info INTEREST
...
State:

              Messages: 0

# we now put a message in where there is interest
$ nats req js.in.interest.one x
# and it is retained
$ nats s info INTEREST
...
State:

              Messages: 1

# we ack it
$  nats c next INTEREST ONE --ack
# it got removed
$ nats s info INTEREST
...
State:

              Messages: 0
tyler-eon commented 11 months ago

Yeah, I confirmed that if I match the subject filters on the consumer to those on the stream and then just ack any subjects I don't care to handle, all my retained messages disappear. So retention is definitely happening for messages that can never receive acknowledgement because there's no interest for them.

I'd be alright with this if the design was to prefer message storage over interest, but again the documentation (and the design) clearly states that messages do not persist if there are no consumers at the time a message is delivered. Presumably because there's no way for that message to be delivered to any clients in that particular moment. I would assume that same presumption would apply to messages that never make it to clients because of the combined subject filters.

ripienaar commented 11 months ago

As above, they are not retained if there is no interest.

So it could be that you are somehow triggering a bug that a simple reproduction isn't triggering. It would be good to get a clear set of steps to reproduce.

In my test above, it's not keeping messages with no interest.

tyler-eon commented 11 months ago

That's super weird. Maybe that's still related to a non-consumer group client being connected. Let me see if I can manage to finagle something where I drop that hack of a client and maybe that will help clear things up...

tyler-eon commented 11 months ago

Ok, I think I'll close out the other ticket because upgrading the Elixir client from 1.6 to 1.7 seems to have solved the issue (even though I can't see in the change log how that would be possible, but whatever, I'll take the win). However, I'm still seeing a huge backlog of messages which continues to grow despite my consumer being up-to-date on the incoming messages it filters for.

$ nats s info
...
              Subjects: match.*
...
State:

              Messages: 1,334,982
                 Bytes: 4.7 GiB
        First Sequence: 1,507,990,599 @ 2023-12-13 16:07:25 UTC
         Last Sequence: 1,510,261,308 @ 2023-12-13 18:16:47 UTC
      Deleted Messages: 935,728
      Active Consumers: 1
    Number of Subjects: 3

$ nats c info
Configuration:
...
         Filter Subjects: match.one, match.two
...
State:

  Last Delivered Message: Consumer sequence: 17,457,949 Stream sequence: 1,510,304,403 Last delivery: 11ms ago
    Acknowledgment Floor: Consumer sequence: 17,457,949 Stream sequence: 1,510,304,403 Last Ack: 9ms ago
        Outstanding Acks: 0 out of maximum 1,000
    Redelivered Messages: 0
    Unprocessed Messages: 0
         Active Interest: Active using Queue Group group-one

The stream info shows 3 subjects when the consumer is only filtering on 2. Could that be the issue here?

ripienaar commented 11 months ago

Show nats s subjects for the stream

tyler-eon commented 11 months ago

Ah, I was not aware I could do that.

~ # nats s subjects
[default] ? Select a Stream JOBS

╭────────────────────────────────────────────╮
│         4 Subjects in stream JOBS          │
├─────────────┬───────┬────────────┬─────────┤
│ Subject     │ Count │ Subject    │ Count   │
├─────────────┼───────┼────────────┼─────────┤
│ match.one   │ 1     │ match.four │ 643,489 │
│ match.three │ 2,457 │ match.five │ 688,237 │
╰─────────────┴───────┴────────────┴─────────╯
ripienaar commented 11 months ago

So that seems like a bug - we have had many interest stream bugs in the past. You're on the latest server?

tyler-eon commented 11 months ago

Yeah, 2.10.1.

ripienaar commented 11 months ago

So not the latest

tyler-eon commented 11 months ago

Oh, I tried using NACK and the NATS Helm chart to make sure it was up-to-date and it didn't change anything. Maybe I did something wrong...

tyler-eon commented 11 months ago

Ok, that seems to have done it. 2.10.7. Still seeing the same problem though. Numbers for the subjects which aren't in the consumer filter are still rising steadily and not being deleted.

ripienaar commented 11 months ago

OK, so that does seem like a bug - which I can't exactly repro here - so if you have some guidance or ability to trigger this, that would be useful.

tyler-eon commented 11 months ago

Let me try a few more tests on my end and see if any of them reproduce it reliably.

tyler-eon commented 11 months ago

I haven't tried this just yet as I'm wrapping up something else at the moment, but I just had a thought: maybe it's a difference between using * and > wildcards? I'll have to try that out later and see if that makes a difference.
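
(For context, * matches exactly one token while > matches one or more trailing tokens, so js.in.interest.* matches js.in.interest.one but not js.in.interest.one.two, whereas js.in.interest.> matches both.)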

tyler-eon commented 7 months ago

@ripienaar

After running an obscene number of tests - I basically started with your example and worked my way, one config change at a time, toward the setup where I see the issue - I think I figured out why this appeared to be an issue.

In the very beginning, I had my consumer without any filter, and I would just ACK messages I didn't care about in my code. But there were minor performance implications from this, as my code would have to expend resources to accept the incoming message, parse the message body, and then ultimately decide it didn't care about it because of the subject it was sent on. And so I changed to using subject filters on the consumer to ensure I would only be sent messages I actually cared to process in my application.

While this does make sense in retrospect, I guess I assumed that eventually all of those "old" messages for subjects I didn't care about would just go away. But they didn't, because my application was extremely efficient at working through the other messages as they came in, meaning there was rarely a need for purging old messages.

What does not make sense is that even after changing the consumer to filter by more specific subjects, the other subjects I previously wanted but no longer received would still get retained by the stream. It's like the stream did not recognize that those subjects would no longer be deliverable to any consumer.

Here's my test which managed to finally replicate the behavior I am still seeing even today:

$ nats s info INTEREST
Information for Stream INTEREST created 2024-03-27 14:02:46

              Subjects: js.in.interest.*, js.in.other.interest, js.in.never.interest
              Replicas: 1
               Storage: File

Options:

             Retention: Interest
       Acknowledgments: true
        Discard Policy: Old
      Duplicate Window: 2m0s
            Direct Get: true
     Allows Msg Delete: true
          Allows Purge: true
        Allows Rollups: false

Limits:

      Maximum Messages: unlimited
   Maximum Per Subject: unlimited
         Maximum Bytes: unlimited
           Maximum Age: unlimited
  Maximum Message Size: unlimited
     Maximum Consumers: unlimited

State:

              Messages: 0
                 Bytes: 0 B
        First Sequence: 14
         Last Sequence: 13 @ 2024-03-27 15:07:42 UTC
      Active Consumers: 1

$ nats c info INTEREST ONE
Information for Consumer INTEREST > ONE created 2024-03-27T14:02:58-07:00

Configuration:

                    Name: ONE
        Delivery Subject: consumer.js.in.interest.one
         Filter Subjects: js.in.interest.*, js.in.other.interest
          Deliver Policy: All
     Deliver Queue Group: group-one
              Ack Policy: Explicit
                Ack Wait: 30.00s
           Replay Policy: Instant
         Max Ack Pending: 1,000
            Flow Control: false

State:

  Last Delivered Message: Consumer sequence: 12 Stream sequence: 13 Last delivery: 15m3s ago
    Acknowledgment Floor: Consumer sequence: 12 Stream sequence: 13 Last Ack: 14m58s ago
        Outstanding Acks: 0 out of maximum 1,000
    Redelivered Messages: 0
    Unprocessed Messages: 0
         Active Interest: Active using Queue Group group-one

The stream is set up as in your example and the consumer is almost exactly the same, with one key difference: the filter subject is catching js.in.interest.*.

Now I send four messages to my application and wait for them to be received and acknowledged.

$ nats req js.in.interest.one x
$ nats req js.in.interest.two x
$ nats req js.in.interest.three x
$ nats req js.in.other.interest x
$ nats s info INTEREST
Information for Stream INTEREST created 2024-03-27 14:02:46

              Subjects: js.in.interest.*, js.in.other.interest, js.in.never.interest
              Replicas: 1
               Storage: File

Options:

             Retention: Interest
       Acknowledgments: true
        Discard Policy: Old
      Duplicate Window: 2m0s
            Direct Get: true
     Allows Msg Delete: true
          Allows Purge: true
        Allows Rollups: false

Limits:

      Maximum Messages: unlimited
   Maximum Per Subject: unlimited
         Maximum Bytes: unlimited
           Maximum Age: unlimited
  Maximum Message Size: unlimited
     Maximum Consumers: unlimited

State:

              Messages: 4
                 Bytes: 200 B
        First Sequence: 14 @ 2024-03-27 15:23:34 UTC
         Last Sequence: 17 @ 2024-03-27 15:23:35 UTC
      Active Consumers: 1
    Number of Subjects: 4

My test application sleeps for a couple seconds after receiving a message before acknowledging it, just to ensure I can see how many messages make it to the stream before they get ACK'd. So after waiting a few seconds I check the stream again.

$ nats s info INTEREST
Information for Stream INTEREST created 2024-03-27 14:02:46

              Subjects: js.in.interest.*, js.in.other.interest, js.in.never.interest
              Replicas: 1
               Storage: File

Options:

             Retention: Interest
       Acknowledgments: true
        Discard Policy: Old
      Duplicate Window: 2m0s
            Direct Get: true
     Allows Msg Delete: true
          Allows Purge: true
        Allows Rollups: false

Limits:

      Maximum Messages: unlimited
   Maximum Per Subject: unlimited
         Maximum Bytes: unlimited
           Maximum Age: unlimited
  Maximum Message Size: unlimited
     Maximum Consumers: unlimited

State:

              Messages: 0
                 Bytes: 0 B
        First Sequence: 18
         Last Sequence: 17 @ 2024-03-27 15:23:35 UTC
      Active Consumers: 1

So far everything is working as expected. We see that all four messages are persisted in the stream initially, eventually getting acknowledged and removed. Perfect.

Now we edit the consumer to only accept a subset of js.in.interest.* subjects, specifically js.in.interest.one.

$ nats c edit INTEREST ONE --filter 'js.in.interest.one' --filter 'js.in.other.interest'
Information for Consumer INTEREST > ONE created 2024-03-27T14:02:58-07:00

Configuration:

                    Name: ONE
        Delivery Subject: consumer.js.in.interest.one
         Filter Subjects: js.in.other.interest, js.in.interest.one
          Deliver Policy: All
     Deliver Queue Group: group-one
              Ack Policy: Explicit
                Ack Wait: 30.00s
           Replay Policy: Instant
         Max Ack Pending: 1,000
            Flow Control: false

State:

  Last Delivered Message: Consumer sequence: 12 Stream sequence: 13 Last delivery: 15m3s ago
    Acknowledgment Floor: Consumer sequence: 12 Stream sequence: 13 Last Ack: 14m58s ago
        Outstanding Acks: 0 out of maximum 1,000
    Redelivered Messages: 0
    Unprocessed Messages: 0
         Active Interest: Active using Queue Group group-one

Now we send the same four messages we sent last time plus one additional message on a subject that we have never filtered on with any consumers. Only js.in.interest.one and js.in.other.interest should be committed to the stream, the others - especially js.in.never.interest - should all be discarded immediately.

$ nats req js.in.interest.one x
$ nats req js.in.interest.two x
$ nats req js.in.interest.three x
$ nats req js.in.other.interest x
$ nats req js.in.never.interest x
$ nats s info INTEREST
Information for Stream INTEREST created 2024-03-27 14:02:46

              Subjects: js.in.interest.*, js.in.other.interest, js.in.never.interest
              Replicas: 1
               Storage: File

Options:

             Retention: Interest
       Acknowledgments: true
        Discard Policy: Old
      Duplicate Window: 2m0s
            Direct Get: true
     Allows Msg Delete: true
          Allows Purge: true
        Allows Rollups: false

Limits:

      Maximum Messages: unlimited
   Maximum Per Subject: unlimited
         Maximum Bytes: unlimited
           Maximum Age: unlimited
  Maximum Message Size: unlimited
     Maximum Consumers: unlimited

State:

              Messages: 5
                 Bytes: 251 B
        First Sequence: 18 @ 2024-03-27 15:23:55 UTC
         Last Sequence: 22 @ 2024-03-27 15:24:50 UTC
      Active Consumers: 1
    Number of Subjects: 5

Whoa! All 5 messages ended up in the stream; none were discarded. While it might be reasonable to expect the mixup to affect only messages on the subject pattern js.in.interest.*, it's surprising to see that js.in.never.interest is also being stored in the stream when it would not have been prior to the config change.
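
To spell out my expectation: under interest retention I'd have assumed the server performs a check on publish along these lines (a hypothetical sketch, not the actual nats-server implementation):

// shouldStore sketches the interest check I'd expect on publish: keep a
// message only if some consumer's filter matches its subject. Hypothetical,
// not actual nats-server code.
func shouldStore(subject string, consumers []*nats.ConsumerInfo) bool {
	for _, c := range consumers {
		filters := c.Config.FilterSubjects
		if len(filters) == 0 && c.Config.FilterSubject != "" {
			filters = []string{c.Config.FilterSubject}
		}
		for _, f := range filters {
			if subjectMatches(f, subject) {
				return true
			}
		}
	}
	return false
}

// subjectMatches implements NATS wildcard matching: "*" matches exactly one
// token, ">" matches one or more trailing tokens.
func subjectMatches(filter, subject string) bool {
	f := strings.Split(filter, ".")
	s := strings.Split(subject, ".")
	for i, tok := range f {
		if tok == ">" {
			return len(s) > i
		}
		if i >= len(s) || (tok != "*" && tok != s[i]) {
			return false
		}
	}
	return len(f) == len(s)
}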

I wait a few seconds for the messages that are being sent to the consumer to be acknowledged and check to see if those extraneous messages are still persisted within the stream.

$ nats s info INTEREST
Information for Stream INTEREST created 2024-03-27 14:02:46

              Subjects: js.in.interest.*, js.in.other.interest, js.in.never.interest
              Replicas: 1
               Storage: File

Options:

             Retention: Interest
       Acknowledgments: true
        Discard Policy: Old
      Duplicate Window: 2m0s
            Direct Get: true
     Allows Msg Delete: true
          Allows Purge: true
        Allows Rollups: false

Limits:

      Maximum Messages: unlimited
   Maximum Per Subject: unlimited
         Maximum Bytes: unlimited
           Maximum Age: unlimited
  Maximum Message Size: unlimited
     Maximum Consumers: unlimited

State:

              Messages: 3
                 Bytes: 151 B
        First Sequence: 18 @ 2024-03-27 15:23:55 UTC
         Last Sequence: 22 @ 2024-03-27 15:24:50 UTC
      Deleted Messages: 2
      Active Consumers: 1
    Number of Subjects: 3

$ nats s subjects INTEREST
╭─────────────────────────────────────────────────────────────╮
│                3 Subjects in stream INTEREST                │
├──────────────────────┬───────┬──────────────────────┬───────┤
│ Subject              │ Count │ Subject              │ Count │
├──────────────────────┼───────┼──────────────────────┼───────┤
│ js.in.interest.three │ 1     │ js.in.never.interest │ 1     │
│ js.in.interest.two   │ 1     │                      │       │
╰──────────────────────┴───────┴──────────────────────┴───────╯

Yup, they are still there. The two messages matching our consumer filters were acknowledged and removed but the remaining messages were not.

"Downgrading" the consumer filter messed up the behavior of the stream when deciding which messages to discard based on interest. It's as though this particular configuration flow sets the stream to permanently retain all messages that it receives, regardless of the state of its consumers.

I tried to nats c rm the consumer and add a new one with the "correct" configuration, and that didn't work. Once the stream itself is in this state there is no recovery without re-creating the stream, which I'd rather avoid doing because I don't want to risk losing messages during a stream change.
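
If re-creating the stream is off the table, the closest thing to a manual cleanup I can think of is walking the stream and deleting the stranded messages by sequence. This is an untested sketch assuming the nats.go JetStreamManager API; the dead set is whatever subjects your consumers no longer filter for:

// cleanupStranded deletes messages on subjects that no consumer filters for.
// Untested sketch; error handling trimmed.
func cleanupStranded(js nats.JetStreamManager, stream string, dead map[string]bool) error {
	info, err := js.StreamInfo(stream)
	if err != nil {
		return err
	}
	for seq := info.State.FirstSeq; seq <= info.State.LastSeq; seq++ {
		msg, err := js.GetMsg(stream, seq)
		if err != nil {
			continue // interior sequence gaps from already-deleted messages
		}
		if dead[msg.Subject] {
			if err := js.DeleteMsg(stream, seq); err != nil {
				return err
			}
		}
	}
	return nil
}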

On a related note, NACK (managing NATS resources via Kubernetes manifests) does not seem to actually update certain fields, such as the subject filters for a consumer; I have to manually use nats c edit on my consumers to do that.

EDIT: Breaking up the huge code block into segments with better explanations separating them.

tyler-eon commented 7 months ago

I think it's safe to say that the issue is not "Discard policy does not apply" but rather "changing a consumer subject filter configuration does not reflect properly in the associated stream", or something to that effect. Let me know if you want me to change the title of this issue, or feel free to change it yourself.

As a side note, I did update to version 2.10.12 before trying this, just to see if maybe my issue was resolved in the latest version, and clearly it is still a problem.

ripienaar commented 7 months ago

This is great thanks @tyler-eon

I can confirm this with some CLI testing but also with a unit test:

func TestInterestConsumerFilterEdit(t *testing.T) {
    s := RunBasicJetStreamServer(t)
    defer s.Shutdown()

    // Client for API requests.
    nc, js := jsClientConnect(t, s)
    defer nc.Close()

    _, err := js.AddStream(&nats.StreamConfig{
        Name:      "INTEREST",
        Retention: nats.InterestPolicy,
        Subjects:  []string{"interest.>"},
    })
    require_NoError(t, err)

    _, err = js.AddConsumer("INTEREST", &nats.ConsumerConfig{
        Durable:       "C0",
        FilterSubject: "interest.>",
        AckPolicy:     nats.AckExplicitPolicy,
    })
    require_NoError(t, err)

    for i := 0; i < 10; i++ {
        _, err = js.Publish(fmt.Sprintf("interest.%d", i), []byte(strconv.Itoa(i)))
        require_NoError(t, err)
    }

    // we check we got 10 messages then pull one, ack it and check we have 9 - so basic interest works
    nfo, err := js.StreamInfo("INTEREST")
    require_NoError(t, err)
    if nfo.State.Msgs != 10 {
        t.Fatalf("expected 10 messages got %d", nfo.State.Msgs)
    }

    sub, err := js.PullSubscribe("interest.>", "C0", nats.Bind("INTEREST", "C0"))
    require_NoError(t, err)

    msgs, err := sub.Fetch(1)
    require_NoError(t, err)
    if len(msgs) != 1 {
        t.Fatalf("did not get enough messages")
    }
    err = msgs[0].AckSync()
    require_NoError(t, err)

    nfo, err = js.StreamInfo("INTEREST")
    require_NoError(t, err)
    if nfo.State.Msgs != 9 {
        t.Fatalf("expected 9 messages got %d", nfo.State.Msgs)
    }

    // now we lower the consumer interest from all subjects to 1, then check the stream state and check if interest behaviours still work
    _, err = js.UpdateConsumer("INTEREST", &nats.ConsumerConfig{
        Durable:       "C0",
        FilterSubject: "interest.1",
        AckPolicy:     nats.AckExplicitPolicy,
    })
    require_NoError(t, err)

    msgs, err = sub.Fetch(1)
    require_NoError(t, err)
    if len(msgs) != 1 {
        t.Fatalf("did not get enough messages")
    }
    if msgs[0].Subject != "interest.1" {
        t.Fatalf("expected interest.1 message got: %v", msgs[0].Subject)
    }
    err = msgs[0].AckSync()
    require_NoError(t, err)

    // we should now have no messages, we acked the only message on ...1 and the rest have no interest
    nfo, err = js.StreamInfo("INTEREST", &nats.StreamInfoRequest{
        SubjectsFilter: ">",
    })
    require_NoError(t, err)
    if nfo.State.Msgs != 0 {
        // here we will fail as things stand with messages in subjects that now has no interest
        //
        // if instead of failing we published into the subjects with no interest those new messages
        // will be dropped correctly but old messages in those subjects will remain
        t.Fatalf("expected 0 messages got messages: %#v", nfo.State.Subjects)
    }
}
tyler-eon commented 7 months ago

Wonderful, I'm glad to hear you could repro the issue (well, glad I'm not crazy, not glad there's a bug haha). Thanks for the patience while I drove myself mad trying to figure out what the problem could be.

derekcollison commented 7 months ago

Thanks @ripienaar. @neilalexander could you take a look?

tyler-eon commented 6 months ago

Sorry to be pedantic, but can we at least change the labels on this so that, to the casual observer, it doesn't look like a stale issue?

Jarema commented 6 months ago

Yes, @tyler-eon it seems the issue is still there. Removing the stale flag. We will revisit this one.