nats-io / nats-server

High-Performance server for NATS.io, the cloud and edge native messaging system.
https://nats.io
Apache License 2.0
15.84k stars 1.4k forks source link

Interest policy: nats server won't delete acked messages if consumers are not keeping up #2873

Closed lukasGemela closed 2 years ago

lukasGemela commented 2 years ago

Defect

when using Interest policy, nats server won't delete acked messages if consumers are not keeping up

Make sure that these boxes are checked before submitting your issue -- thank you!

Versions of nats-server and affected client libraries used:

nats-server 2.7.2 python nats client 2.0.0

OS/Container environment:

docker container nats 2.7.2 alpine

Steps or code to reproduce the issue:

1) create jetstream with following config

Configuration:

             Subjects: messages.*
     Acknowledgements: true
            Retention: File - Interest
             Replicas: 1
       Discard Policy: New
     Duplicate Window: 2m0s
    Allows Msg Delete: true
         Allows Purge: true
       Allows Rollups: false
     Maximum Messages: unlimited
        Maximum Bytes: unlimited
          Maximum Age: unlimited
 Maximum Message Size: unlimited
    Maximum Consumers: unlimited
  1. create two polling consumers, one for messages.msg1 and another for messages.msg2
Configuration:

        Durable Name: MSG_XXX_CONSUMER
           Pull Mode: true
      Filter Subject: messages.message1
      Deliver Policy: All
          Ack Policy: Explicit
            Ack Wait: 30s
       Replay Policy: Instant
     Max Ack Pending: 20,000
   Max Waiting Pulls: 512
  1. consumers are slower than inbound producer but ack messages normally within configured timeframe
  2. messages get stacked up inside nats server filesystem, waiting for consumer to pick them up
  3. stop producer

Expected result:

messages should be eventually all consumed, acked and removed from nats file system

Actual result:

messages are all consumed, acked, but stay on nats file system. when we delete durable consumers from server and reregister consumers without producing any messages on stream, messages eventually disappear from nats server

Screenshot 2022-02-16 at 13 21 48

aricart commented 2 years ago

@lukasGemela can you output the status of the stream? (nats stream info)

lukasGemela commented 2 years ago
Configuration:

             Subjects: kpis.*
     Acknowledgements: true
            Retention: File - Interest
             Replicas: 1
       Discard Policy: New
     Duplicate Window: 2m0s
    Allows Msg Delete: true
         Allows Purge: true
       Allows Rollups: false
     Maximum Messages: unlimited
        Maximum Bytes: unlimited
          Maximum Age: unlimited
 Maximum Message Size: unlimited
    Maximum Consumers: unlimited

Cluster Information:

                 Name: nats
               Leader: nats-2

State:

             Messages: 17,385
                Bytes: 31 MiB
             FirstSeq: 3,708,310 @ 2022-02-16T09:37:25 UTC
              LastSeq: 4,078,085 @ 2022-02-16T14:46:59 UTC
     Deleted Messages: 352391
     Active Consumers: 3
derekcollison commented 2 years ago

Can you show us what nats c info for each of the consumers show as well?

lukasGemela commented 2 years ago

Can you show us what nats c info for each of the consumers show as well?

we have it running now with running producer again so numbers will be probably not corresponding with numbers above. But messages in streams are not dropping down, actually very slowly rising

Information for Consumer kpis > KPI_BMC_CONSUMER created 2022-02-16T09:31:31+01:00

Configuration:

        Durable Name: KPI_BMC_CONSUMER
           Pull Mode: true
      Filter Subject: kpis.BMC
      Deliver Policy: All
          Ack Policy: Explicit
            Ack Wait: 30s
       Replay Policy: Instant
     Max Ack Pending: 20,000
   Max Waiting Pulls: 512

Cluster Information:

                Name: nats
              Leader: nats-2

State:

   Last Delivered Message: Consumer sequence: 215,257 Stream sequence: 4,136,841 Last delivery: 1m11s ago
     Acknowledgment floor: Consumer sequence: 215,257 Stream sequence: 4,136,841 Last Ack: 1m11s ago
         Outstanding Acks: 0 out of maximum 20,000
     Redelivered Messages: 0
     Unprocessed Messages: 0
            Waiting Pulls: 5 of maximum 512
Information for Consumer kpis > KPI_TDP_CONSUMER created 2022-02-16T09:25:29+01:00

Configuration:

        Durable Name: KPI_TDP_CONSUMER
           Pull Mode: true
      Filter Subject: kpis.TDP
      Deliver Policy: All
          Ack Policy: Explicit
            Ack Wait: 30s
       Replay Policy: Instant
     Max Ack Pending: 20,000
   Max Waiting Pulls: 512

Cluster Information:

                Name: nats
              Leader: nats-2

State:

   Last Delivered Message: Consumer sequence: 214,660 Stream sequence: 4,136,841 Last delivery: 2m39s ago
     Acknowledgment floor: Consumer sequence: 214,660 Stream sequence: 4,136,840 Last Ack: 2m39s ago
         Outstanding Acks: 0 out of maximum 20,000
     Redelivered Messages: 0
     Unprocessed Messages: 0
            Waiting Pulls: 6 of maximum 512
Information for Consumer kpis > KPI_TDP_NEW_CONSUMER created 2022-02-16T10:33:13+01:00

Configuration:

        Durable Name: KPI_TDP_NEW_CONSUMER
           Pull Mode: true
      Filter Subject: kpis.TDP
      Deliver Policy: All
          Ack Policy: Explicit
            Ack Wait: 30s
       Replay Policy: Instant
     Max Ack Pending: 20,000
   Max Waiting Pulls: 512

Cluster Information:

                Name: nats
              Leader: nats-2

State:

   Last Delivered Message: Consumer sequence: 214,272 Stream sequence: 4,136,841 Last delivery: 3m2s ago
     Acknowledgment floor: Consumer sequence: 214,272 Stream sequence: 4,136,840 Last Ack: 3m2s ago
         Outstanding Acks: 0 out of maximum 20,000
     Redelivered Messages: 0
     Unprocessed Messages: 0
            Waiting Pulls: 5 of maximum 512
tbeets commented 2 years ago

@derekcollison

Was just about to create same/similar issue from slack community report. Here's a Go test that is showing the same:


func TestJetStreamClusterInterestRetentionWithFilteredConsumersExtra(t *testing.T) {
    c := createJetStreamClusterExplicit(t, "R3S", 3)
    defer c.shutdown()

    subjectNameZero := "foo.a.b.c.0"
    subjectNameOne := "foo.a.b.c.1"
    // subjectNameZero := "foo"
    // subjectNameOne := "baz"
    // Client based API
    nc, js := jsClientConnect(t, c.randomServer())
    defer nc.Close()

    _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo.>"}, Retention: nats.InterestPolicy, Replicas: 3})
    if err != nil {
        t.Fatalf("Unexpected error: %v", err)
    }

    msg := []byte("FILTERED")
    sendMsg := func(subj string) {
        t.Helper()
        if _, err = js.Publish(subj, msg); err != nil {
            t.Fatalf("Unexpected publish error: %v", err)
        }
    }

    jsq, err := nc.JetStream(nats.MaxWait(250 * time.Millisecond))
    if err != nil {
        t.Fatalf("Unexpected error: %v", err)
    }

    checkState := func(expected uint64) {
        t.Helper()
        checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
            t.Helper()
            si, err := jsq.StreamInfo("TEST")
            if err != nil {
                t.Fatalf("Unexpected error: %v", err)
            }
            if si.State.Msgs != expected {
                return fmt.Errorf("Expected %d msgs, got %d", expected, si.State.Msgs)
            }
            return nil
        })
    }

    subZero, err := js.PullSubscribe(subjectNameZero, "dlc-0")
    if err != nil {
        t.Fatalf("Unexpected error zero: %v", err)
    }
    subOne, err := js.PullSubscribe(subjectNameOne, "dlc-1")
    if err != nil {
        t.Fatalf("Unexpected error one: %v", err)
    }

    // Now send a bunch of messages
    for i := 0; i < 1000; i++ {
        sendMsg(subjectNameZero)
        sendMsg(subjectNameOne)
    }

    // should be 2000 in total
    checkState(2000)

    // fetch and acknowledge, count records to ensure no errors acknowledging
    getAndAckBatch := func(sub *nats.Subscription) {
        t.Helper()
        successCounter := 0
        messages, err := sub.Fetch(1000)
        if err != nil {
            t.Fatalf("Unexpected error fetching: %v", err)
        }
        for _, message := range messages {
            err = message.Ack()
            if err != nil {
                t.Fatalf("Unexpected error acknowledge: %v", err)
            }
            successCounter++
        }

        if successCounter != 1000 {
            t.Fatalf("Unexpected number of acknowleges for subscription %v", sub)
        }
    }

    // fetch records subscription zero
    getAndAckBatch(subZero)

    // fetch records for subscription one
    getAndAckBatch(subOne)

    // not sure if needed, added a delay...
    time.Sleep(12 * time.Second)

    // since we have 1000 messages on each subject, 2000 in total and we have got this far and acknowledged would expect checkstate to return 0
    checkState(0)

    // tidy up
    if err := js.DeleteConsumer("TEST", "dlc-0"); err != nil {
        t.Fatalf("Unexpected error: %v", err)
    }
    if err := js.DeleteConsumer("TEST", "dlc-1"); err != nil {
        t.Fatalf("Unexpected error: %v", err)
    }
}
tbeets commented 2 years ago

Anecdotally have seen the same with simple test using NATS CLI. Have also played with above to reduce the number of messages and increase the wait time after total consumption, to check. Does not seem to necessarily be high-volume based.

tbeets commented 2 years ago

Thank you to @RowSolAdmin (John) who submitted above and described same/similar issue here as well:

https://github.com/RowSolAdmin/jetstream_test

derekcollison commented 2 years ago

Will take a look, thanks.

derekcollison commented 2 years ago

Thanks to the test I can see what is happening. Thanks!

Working on a fix as we speak.

derekcollison commented 2 years ago

ok fixed, thanks so much for the patience. Will be part of nightly once merged and also part of next release, which should be soon, like next week.

RowSolAdmin commented 2 years ago

Excellent! brilliant news thanks to all involved for help and support.

Kaarel commented 2 years ago

Can confirm that after upgrading to 2.7.3 messages are not accumulating to the stream with retention policy interest any more. However our upgrade process itself was a bit rocky:

The first go without deleting JetStream store_dir reported "[WRN] Error recreating stream "my_stream": deleted message" and "[WRN] Stream create failed for 'USERS > my_stream': deleted message"

nats consumer report my_stream would either print

nats: error: stream not found (10059), try --help

or

╭─────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│                                Consumer report for my_stream with 2 consumers                           │
├──────────┬──────┬────────────┬──────────┬─────────────┬─────────────┬─────────────┬───────────┬─────────┤
│ Consumer │ Mode │ Ack Policy │ Ack Wait │ Ack Pending │ Redelivered │ Unprocessed │ Ack Floor │ Cluster │
├──────────┼──────┼────────────┼──────────┼─────────────┼─────────────┼─────────────┼───────────┼─────────┤
╰──────────┴──────┴────────────┴──────────┴─────────────┴─────────────┴─────────────┴───────────┴─────────╯

nats stream report would print

Obtaining Stream stats

╭──────────────────────────────────────────────────────────────────────────────────────────────╮
│                                         Stream Report                                        │
├──────────┬─────────┬───────────┬───────────┬──────────┬──────┬─────────┬─────────────────────┤
│ Stream   │ Storage │ Consumers │ Messages  │ Bytes    │ Lost │ Deleted │ Replicas            │
├──────────┼─────────┼───────────┼───────────┼──────────┼──────┼─────────┼─────────────────────┤
│ my_stream│ File    │ 2         │ 1,037,073 │ 1024 MiB │ 0    │ 2026428 │ ns_1!, ns_2!, ns_3! │
╰──────────┴─────────┴───────────┴───────────┴──────────┴──────┴─────────┴─────────────────────╯

nats stream info my_stream would either print

nats: error: could not request Stream info: stream not found (10059)

or

Information for Stream my_stream created 0001-01-01T00:00:00Z

Configuration:

             Subjects: my_stream.>
     Acknowledgements: true
            Retention: File - Interest
             Replicas: 3
       Discard Policy: Old
     Duplicate Window: 2m0s
    Allows Msg Delete: true
         Allows Purge: true
       Allows Rollups: false
     Maximum Messages: unlimited
        Maximum Bytes: 1.0 GiB
          Maximum Age: 30d0h0m0s
 Maximum Message Size: unlimited
    Maximum Consumers: unlimited

Cluster Information:

                 Name: nats-cluster
               Leader:
              Replica: ns_3, outdated, seen 7m9s ago, 6,325,122 operations behind
              Replica: ns_2, outdated, seen 1m30s ago
              Replica: ns_1, outdated, not seen, 6,325,122 operations behind

State:

             Messages: 1,037,073
                Bytes: 1024 MiB
             FirstSeq: 3,895,414 @ 2022-02-20T00:45:21 UTC
              LastSeq: 6,958,914 @ 2022-02-25T11:45:12 UTC
     Deleted Messages: 2026428
     Active Consumers: 2

nats stream ls would sometimes print the stream sometimes not.

After deleting store_dir on each cluster node and restarting each cluster server the system seems to be back to normal. Message sequences and ack floor have been reset but that's expected I guess post rm store_dir.

derekcollison commented 2 years ago

Thanks for the information and your patience. Is the system fully operational now as far as you can tell?

Kaarel commented 2 years ago

So far so good. It's only been three days but everything looks stable. The issue here https://github.com/nats-io/nats-server/issues/2872 took several weeks before we really noticed. Prior to activating JetStream we had NATS running for over two years with absolutely no issues. I like the analogy of NATS providing a dial tone. I hope we can go back to not thinking about running NATS in our infra :) Thanks for your quick responses.

derekcollison commented 2 years ago

We are doing our absolute best to get to that goal as well. Thanks.