nats-io / nats.node

Node.js client for NATS, the cloud native messaging system.
https://nats.io
Apache License 2.0

[JetStream] - ack doesn't delete message #589

Closed iam-mishgun closed 1 year ago

iam-mishgun commented 1 year ago

What version were you using?

nats-server 2.9.21, nats.js 2.16.0

What environment was the server running in?

macOS (M2 Max), Docker, with this docker-compose.yaml:

  nats:
    image: 'nats:2.9.21-alpine3.18'
    expose:
      - '4222'
      - '8222'
    ports:
      - '4222:4222'
      - '8222:8222'
    command: --js --http_port 8222 -p 4222

Is this defect reproducible?

const { connect, AckPolicy } = require('nats')

const run = async () => {
  const stream = 'b'

  // 4222 is the client port published by the docker-compose file above
  const nc = await connect({ servers: ['nats://localhost:4222'] })
  const jsm = await nc.jetstreamManager()
  await jsm.streams.add({ name: stream, subjects: ['a.*'], no_ack: false })
  const js = jsm.jetstream()

  await jsm.consumers.add(stream, {
    durable_name: 'me',
    ack_policy: AckPolicy.Explicit
  })

  const consumer = await js.consumers.get(stream, 'me')

  process.on('SIGINT', async () => {
    await consumer.delete()
    process.exit()
  })
  process.on('SIGTERM', async () => {
    await consumer.delete()
    process.exit()
  })

  // await js.publish('a.abc', JSON.stringify({ a: 1 }))
  const messages = await consumer.consume({
    max_messages: -1
  })
  for await (const msg of messages) {
    console.log(msg.json())
    msg.ack() // await msg.ackAck() returns true every time
  }
}

run()

After stopping my code, nats-cli reports this stream info:

Information for Stream b created 2023-08-24 09:11:31

             Subjects: a.*
             Replicas: 1
              Storage: File

Options:

            Retention: Limits
     Acknowledgements: true
       Discard Policy: Old
     Duplicate Window: 2m0s
    Allows Msg Delete: true
         Allows Purge: true
       Allows Rollups: false

Limits:

     Maximum Messages: unlimited
  Maximum Per Subject: unlimited
        Maximum Bytes: unlimited
          Maximum Age: unlimited
 Maximum Message Size: unlimited
    Maximum Consumers: unlimited

State:

             Messages: 2
                Bytes: 80 B
             FirstSeq: 1 @ 2023-08-24T06:11:31 UTC
              LastSeq: 2 @ 2023-08-24T06:11:43 UTC
     Active Consumers: 0
   Number of Subjects: 1

Given the capability you are leveraging, describe your expectation?

After stopping my code, I expect the nats-cli stream info to show 0 messages.

Given the expectation, what is the defect you are observing?

The acked messages stay in the stream, and when I rerun my code I receive them again.

iam-mishgun commented 1 year ago

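The stream should be created with work-queue retention, so that acknowledged messages are removed from it:

  const { RetentionPolicy } = require('nats')

  await jsm.streams.add({
    name: stream,
    subjects: ['a.*'],
    no_ack: true,
    retention: RetentionPolicy.Workqueue // <- this
  })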
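
For anyone hitting the same thing, here is a rough illustration of why the original setup replayed acked messages. The stream info above shows `Retention: Limits`. Under that policy an ack only updates the consumer's delivery state, and the messages stay in the stream until a limit removes them. Because the SIGINT/SIGTERM handlers delete the consumer, that state is lost and the re-created consumer starts again from the first message. The sketch below is a toy in-memory model, not nats.js or server code. `ToyStream`, `simulate` and the `'limits'` / `'workqueue'` strings are hypothetical names made up for this example.

```javascript
// Toy in-memory model of stream retention. NOT the NATS implementation:
// ToyStream and simulate are hypothetical names used only for illustration.
class ToyStream {
  constructor(retention) {
    this.retention = retention // 'limits' or 'workqueue'
    this.messages = new Map()  // sequence number -> payload
    this.lastSeq = 0
    this.consumers = new Map() // consumer name -> { acked: Set of sequences }
  }

  publish(payload) {
    this.lastSeq += 1
    this.messages.set(this.lastSeq, payload)
    return this.lastSeq
  }

  addConsumer(name) {
    // A new consumer has no ack history, so it starts from the first message.
    this.consumers.set(name, { acked: new Set() })
  }

  deleteConsumer(name) {
    // Deleting the consumer throws away its ack state.
    this.consumers.delete(name)
  }

  // Sequences still stored in the stream that this consumer has not acked.
  pending(name) {
    const consumer = this.consumers.get(name)
    return [...this.messages.keys()].filter((seq) => !consumer.acked.has(seq))
  }

  ack(name, seq) {
    this.consumers.get(name).acked.add(seq)
    // Only work-queue retention removes a message once it is acked.
    if (this.retention === 'workqueue') {
      this.messages.delete(seq)
    }
  }
}

function simulate(retention) {
  const stream = new ToyStream(retention)
  stream.publish('{"a":1}')
  stream.publish('{"a":2}')

  // First run: consume and ack everything.
  stream.addConsumer('me')
  for (const seq of stream.pending('me')) {
    stream.ack('me', seq)
  }
  const storedAfterAck = stream.messages.size

  // Stop (the signal handler deletes the consumer), then rerun.
  stream.deleteConsumer('me')
  stream.addConsumer('me')
  const redelivered = stream.pending('me').length

  return { storedAfterAck, redelivered }
}

const limits = simulate('limits')
const workqueue = simulate('workqueue')
console.log('limits:', limits)
console.log('workqueue:', workqueue)
```

In this model the `'limits'` run still holds both messages after they are acked and hands both to the re-created consumer, while the `'workqueue'` run ends with an empty stream. If the goal is only to avoid redelivery, keeping the durable consumer instead of deleting it on shutdown should also preserve the ack state, but that is an assumption about the intended use rather than something stated in this issue.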