nats-io / nats-server

High-Performance server for NATS.io, the cloud and edge native messaging system.
https://nats.io

WorkQueue jetstream messages are not deleted on non-leader nodes when used as mirror source [v2.10.14] #5450

Open qiongzhu opened 5 months ago

qiongzhu commented 5 months ago

Observed behavior

WorkQueue JetStream messages are not deleted on non-leader nodes when the stream is used as a mirror/source origin; the JetStream members hold inconsistent stream history and cannot recover from that state.

Expected behavior

  1. all JetStream nodes should have consistent message history
  2. non-leader WorkQueue stream nodes should delete messages that were successfully mirrored to the new location

Server and client version

nats-server: 2.10.14 and 2.9.25 both have this problem; natscli: 0.1.4

Host environment

Using the official Docker image: nats:2.10.14 or nats:2.9.25.

The official binary releases also have the same problem.

Steps to reproduce

Environment setup: a local 3-node NATS cluster

Create a simple config file nats-account.conf with the following content:

accounts {
  $SYS {
    users = [
      { user: "admin",
        pass: "password"
      }
    ]
  }
}

Run a fully local 3-node cluster with Docker; you can use nats:2.10.14 or nats:2.9.25.

# docker rm -f node1 node2 node3

docker run -d --network=host --name=node1 \
    -v $PWD/nats-account.conf:/nats.conf:ro \
    nats:2.10.14 -a 127.0.0.1 -p 8001 -n node1 \
        --jetstream --store_dir /data \
        --config nats.conf \
        --cluster_name test --cluster nats://127.0.0.1:8101 \
        --routes 'nats://127.0.0.1:8101,nats://127.0.0.1:8102,nats://127.0.0.1:8103'

docker run -d --network=host --name=node2 \
    -v $PWD/nats-account.conf:/nats.conf:ro \
    nats:2.10.14 -a 127.0.0.1 -p 8002 -n node2 \
        --jetstream --store_dir /data \
        --config nats.conf \
        --cluster_name test --cluster nats://127.0.0.1:8102 \
        --routes 'nats://127.0.0.1:8101,nats://127.0.0.1:8102,nats://127.0.0.1:8103'

docker run -d --network=host --name=node3 \
    -v $PWD/nats-account.conf:/nats.conf:ro \
    nats:2.10.14 -a 127.0.0.1 -p 8003 -n node3 \
        --jetstream --store_dir /data \
        --config nats.conf \
        --cluster_name test --cluster nats://127.0.0.1:8103 \
        --routes 'nats://127.0.0.1:8101,nats://127.0.0.1:8102,nats://127.0.0.1:8103'

Then wait some time for the cluster to start up. Now create nats CLI contexts for easy access:

nats context save user -s 'nats://127.0.0.1:8001,nats://127.0.0.1:8002,nats://127.0.0.1:8003'

nats context save sys -s 'nats://admin:password@127.0.0.1:8001,nats://admin:password@127.0.0.1:8002,nats://admin:password@127.0.0.1:8003'

nats context select user

# optional: run following 2 commands to verify cluster works
nats --context=sys server ls
nats account info

Steps to reproduce the problem

Create a JetStream stream to be used as the replication source, with file storage, R=3, and WorkQueue retention:

nats stream add src \
    --subjects="src.>" \
    --storage=file --replicas=3 --retention=work --discard=old \
    --max-age=1d --max-bytes=100mb --max-msgs=-1 --max-msgs-per-subject=-1 \
    --max-msg-size=-1 --dupe-window=10m --allow-rollup \
    --no-deny-delete --no-deny-purge
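
As an optional check (not part of the original reproduction steps), you can confirm the stream was created with WorkQueue retention and 3 replicas before creating the destination:

# optional: inspect the source stream configuration
nats stream info src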

Create a second JetStream stream as the destination, which sources from src:

nats stream add dst \
    --subjects='dst.>' \
    --source src \
    --storage=file --replicas=3 --retention=limits --discard=old \
    --max-age=-1 --max-bytes=100mb --max-msgs=-1 --max-msgs-per-subject=-1 \
    --max-msg-size=-1 --dupe-window=10m --allow-rollup \
    --no-deny-delete --no-deny-purge

Use all defaults for the subsequent questions about how to import the source stream; the full session looks like this:

nats stream add dst \
        --subjects='dst.>' \
        --source src \
        --storage=file --replicas=3 --retention=limits --discard=old \
        --max-age=-1 --max-bytes=100mb --max-msgs=-1 --max-msgs-per-subject=-1 \
        --max-msg-size=-1 --dupe-window=10m --allow-rollup \
        --no-deny-delete --no-deny-purge
? Adjust source "src" start No
? src Source Filter source by subject
? Import "src" from a different JetStream domain No
? Import "src" from a different account No
Stream dst was created

Now we have the following stream report. Notice that the replication report indicates the source replication is working:

nats stream report
Obtaining Stream stats

╭─────────────────────────────────────────────────────────────────────────────────────────────────────╮
│                                            Stream Report                                            │
├────────┬─────────┬───────────┬───────────┬──────────┬───────┬──────┬─────────┬──────────────────────┤
│ Stream │ Storage │ Placement │ Consumers │ Messages │ Bytes │ Lost │ Deleted │ Replicas             │
├────────┼─────────┼───────────┼───────────┼──────────┼───────┼──────┼─────────┼──────────────────────┤
│ dst    │ File    │           │ 0         │ 0        │ 0 B   │ 0    │ 0       │ node1*, node2, node3 │
│ src    │ File    │           │ 0         │ 0        │ 0 B   │ 0    │ 0       │ node1*, node2, node3 │
╰────────┴─────────┴───────────┴───────────┴──────────┴───────┴──────┴─────────┴──────────────────────╯

╭─────────────────────────────────────────────────────────────────────╮
│                         Replication Report                          │
├────────┬────────┬────────────┬───────────────┬────────┬─────┬───────┤
│ Stream │ Kind   │ API Prefix │ Source Stream │ Active │ Lag │ Error │
├────────┼────────┼────────────┼───────────────┼────────┼─────┼───────┤
│ dst    │ Source │            │ src           │ 0.82s  │ 0   │       │
╰────────┴────────┴────────────┴───────────────┴────────┴─────┴───────╯

Now we send 10 messages to the source stream:

for idx in {0..9} ; do nats req 'src.hello' "${idx} | $(date)" ; sleep 1 ; done

Check the stream report; it is OK: the messages were replicated to the dst stream and removed from src (because src is a WorkQueue stream):

Obtaining Stream stats

╭───────────────────────────────────────────────────────────────────────────────────────────────────────╮
│                                             Stream Report                                             │
├────────┬─────────┬───────────┬───────────┬──────────┬─────────┬──────┬─────────┬──────────────────────┤
│ Stream │ Storage │ Placement │ Consumers │ Messages │ Bytes   │ Lost │ Deleted │ Replicas             │
├────────┼─────────┼───────────┼───────────┼──────────┼─────────┼──────┼─────────┼──────────────────────┤
│ src    │ File    │           │ 0         │ 0        │ 0 B     │ 0    │ 0       │ node1*, node2, node3 │
│ dst    │ File    │           │ 0         │ 10       │ 1.2 KiB │ 0    │ 0       │ node1*, node2, node3 │
╰────────┴─────────┴───────────┴───────────┴──────────┴─────────┴──────┴─────────┴──────────────────────╯

╭─────────────────────────────────────────────────────────────────────╮
│                         Replication Report                          │
├────────┬────────┬────────────┬───────────────┬────────┬─────┬───────┤
│ Stream │ Kind   │ API Prefix │ Source Stream │ Active │ Lag │ Error │
├────────┼────────┼────────────┼───────────────┼────────┼─────┼───────┤
│ dst    │ Source │            │ src           │ 0.12s  │ 0   │       │
╰────────┴────────┴────────────┴───────────────┴────────┴─────┴───────╯

Check the stream state of src; it is OK. Please notice that the current leader is node1:

nats stream state src
State for Stream src created 2024-05-19 19:32:01

Cluster Information:

                 Name: test
               Leader: node1
              Replica: node2, current, seen 0.37s ago
              Replica: node3, current, seen 0.37s ago

State:

             Messages: 0
                Bytes: 0 B
             FirstSeq: 11
              LastSeq: 10 @ 2024-05-19T11:37:31 UTC
     Active Consumers: 0

Here is the problem: now we request a cluster election, or restart the current leader of the stream 'src' to force an election, like this:

# step down
nats stream cluster step-down src

# or restart server
docker restart -t 15 node1

Then the stream runs into trouble. For example, after we run 'step-down', the immediate output shows:

Cluster Information:

                 Name: test
               Leader: node2
              Replica: node1, current, seen 0.49s ago
              Replica: node3, current, seen 0.49s ago

State:

             Messages: 10
                Bytes: 740 B
             FirstSeq: 1 @ 2024-05-19T11:37:21 UTC
              LastSeq: 10 @ 2024-05-19T11:37:31 UTC
     Active Consumers: 0
   Number of Subjects: 1

The messages already consumed by replication reappear in stream src. This can be verified via nats stream state src or nats stream get src ${idx}; those messages can indeed be accessed:

qiongzhu@ws:~/workspace$ nats stream get src 1
Item: src#1 received 2024-05-19 11:37:21.465265444 +0000 UTC on Subject src.hello

0 | Sun May 19 07:37:21 PM CST 2024

qiongzhu@ws:~/workspace$ nats stream get src 2
Item: src#2 received 2024-05-19 11:37:22.528294991 +0000 UTC on Subject src.hello

1 | Sun May 19 07:37:22 PM CST 2024

qiongzhu@ws:~/workspace$ nats stream get src 3
Item: src#3 received 2024-05-19 11:37:23.589775421 +0000 UTC on Subject src.hello

2 | Sun May 19 07:37:23 PM CST 2024

Luckily, we can issue several more nats stream cluster step-down src commands to re-elect the correct leader node1 and make the stream correct again.

To make the stream cluster consistent again, the following steps can help (collected as a command sketch after this list):

  1. make sure the current leader is correct; in this case, that is node1
  2. reduce replica count to R=1: nats stream edit src --replicas=1 -f
  3. adjust replica count back to R=3: nats stream edit src --replicas=3 -f
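
The same steps as shell commands, a rough sketch only (the number of step-downs needed before node1 is elected again may vary):

# repeat until node1 is reported as leader of stream src
nats stream cluster step-down src

# shrink to a single replica, then scale back up
nats stream edit src --replicas=1 -f
nats stream edit src --replicas=3 -f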

After that, the stream looks normal, but it is not. We can issue multiple nats stream cluster step-down src commands to select a leader other than node1, for example node2, and then send another 10 messages with for idx in {0..9} ; do nats req 'src.hello' "${idx} | $(date)" ; sleep 1 ; done

Using nats stream report we can see the 10 new messages are now replicated to dst. Then, running nats stream state src, we can see that the 10 new messages in the WorkQueue are not deleted by the replication process:

# nats stream state src
State for Stream src created 2024-05-19 19:32:01

Cluster Information:

                Name: test
              Leader: node2
             Replica: node1, current, seen 17ms ago
             Replica: node3, current, seen 17ms ago

State:

            Messages: 10
               Bytes: 740 B
      First Sequence: 11 @ 2024-05-19 19:59:24 UTC
       Last Sequence: 20 @ 2024-05-19 19:59:33 UTC
    Active Consumers: 0
  Number of Subjects: 1

After that, we can issue multiple nats stream cluster step-down src commands until node1 is elected leader again; we can see that its state is correct, even though this node was not the leader at the time:

 Cluster Information:

                  Name: test
                Leader: node1
               Replica: node2, current, seen 487ms ago
               Replica: node3, current, seen 488ms ago

State:

              Messages: 0
                 Bytes: 0 B
        First Sequence: 21
         Last Sequence: 20 @ 2024-05-19 19:59:33 UTC
      Active Consumers: 0

jnmoyne commented 1 month ago

I can reproduce this, even with 2.11

derekcollison commented 1 month ago

If both sides are WQ, that is the way it is designed. Removing from one stream currently never affects the other source/mirror.

jnmoyne commented 1 month ago

As an aside, what is your use case for streaming from a work queue stream? Work queue streams exist in order to provide exactly-once consumption (transmission plus processing) of messages, not really to be sourced from.

qiongzhu commented 1 month ago

As an aside, what is your use case for streaming from a work queue stream? Work queue streams exist in order to provide exactly-once consumption (transmission plus processing) of messages, not really to be sourced from.

Here is the use case: we have a hub-spoke topology with multiple leafnodes connected to a hub cluster, and we want to 'move' messages from hub to spoke. So we send messages in the hub cluster, record them into a WorkQueue JetStream stream, then 'source' that stream on the spoke side.

In this way, messages that are replicated to the spoke cluster will be deleted from the hub cluster; we then have high availability on both the sender and receiver side even if the leafnode connection is interrupted.
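
For context, a minimal configuration sketch of that topology; the hostnames, ports, file names, and JetStream domain names below are hypothetical and not part of the reproduction above:

# hub.conf (hypothetical): hub cluster node that accepts leafnode connections
jetstream {
  store_dir: /data
  domain: hub
}
leafnodes {
  port: 7422
}

# spoke.conf (hypothetical): spoke node that connects back to the hub
jetstream {
  store_dir: /data
  domain: spoke
}
leafnodes {
  remotes = [
    { url: "nats://hub.example.com:7422" }
  ]
}

With separate JetStream domains, the dst stream on the spoke then sources src from the hub's domain, i.e. answering Yes to the 'Import "src" from a different JetStream domain' prompt shown earlier.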

In this issue, we just found some inconsistency in the WorkQueue JetStream behaviour:

  1. if the 'sources' replication behaves like a consumer, the messages should be deleted from all source stream replicas
  2. otherwise, the messages should be kept in the source stream even though it is a WorkQueue stream