nats-io / nats-server

High-Performance server for NATS.io, the cloud and edge native messaging system.
https://nats.io
Apache License 2.0

work queue stream losing messages #5612

Closed aubergenepizza closed 2 months ago

aubergenepizza commented 3 months ago

Observed behavior

In an R3 cluster, when creating a work-queue-policy stream with explicit acks and a push consumer, it's possible to lose messages. The way to reproduce it is to repeatedly create and delete ephemeral consumers that listen to different subjects (I've attached a script that can quite reliably reproduce it). Based on the docs, I'd expect the messages not to be deleted: the consumers never ack them, so a work queue stream with explicit acks has to retain them.

Expected behavior

All messages sent should be present in the stream.

Server and client version

nats-server: v2.10.16, v2.10.17
nats --version: 0.1.1

Host environment

linux, either cloud k8s or a local cluster (kind) with 3 replicas for the servers

Steps to reproduce

The main idea is to send some messages to different subjects in the stream, subscribe to the delivery subjects, then repeatedly create and delete ephemeral consumers that target the delivery subjects. On my system, with a kind cluster in an R3 setup, this script reliably causes message loss:

#!/bin/bash

nats stream create test \
  --subjects 1 --subjects 2 --subjects 3 --subjects 4 --subjects 5 \
  --subjects 6 --subjects 7 --subjects 8 --subjects 9 --subjects 10 \
  --storage file --replicas 3 --retention work \
  --deny-delete --no-deny-purge --defaults

nats publish 2 --count 10 "test alskdjalksdjalskdjaksdjlaksjdlkajsdlakjsdlakjsdlakjdlakjsdlaksjdlj"
nats publish 5 --count 10 "test alskdjalksdjalskdjaksdjlaksjdlkajsdlakjsdlakjsdlakjdlakjsdlaksjdlj"
nats publish 7 --count 10 "test alskdjalksdjalskdjaksdjlaksjdlkajsdlakjsdlakjsdlakjdlakjsdlaksjdlj"
nats publish 9 --count 10 "test alskdjalksdjalskdjaksdjlaksjdlkajsdlakjsdlakjsdlakjdlakjsdlaksjdlj"

nats sub bob &
nats sub bob2 &

for i in {0..5}; do
        nats consumer rm test test --force
        nats consumer add test test --trace --replay instant --deliver=all \
          --ephemeral --ack explicit --max-deliver=-1 --max-pending=1 \
          --target bob --defaults \
          --filter 1 --filter 2 --filter 3 --filter 4 --filter 5
        sleep 7
        nats consumer rm test test --force
        nats consumer add test test --trace --replay instant --deliver=all \
          --ephemeral --ack explicit --max-deliver=-1 --max-pending=1 \
          --target bob2 --defaults \
          --filter 6 --filter 7 --filter 8 --filter 9 --filter 10
        sleep 7
done

kill %1
kill %2
nats stream info test | grep 'Messages: 40' || (echo $(nats stream info test | grep '  Messages: ') expected 40; exit 1)
nats stream rm test --force

aubergenepizza commented 3 months ago

I've been able to reproduce this on 2.10.17, as well as with pull consumers.

aubergenepizza commented 2 months ago

@neilalexander were you able to reproduce the bug?

derekcollison commented 2 months ago

I am taking a look; I cannot re-create this from the top of main with a server test replicating what you are doing. Will run your bash script next to see if it is doing something different.

derekcollison commented 2 months ago

Running the script with latest NATS cli and the server from main also succeeds. Will try the 2.10.17 release now.

derekcollison commented 2 months ago

ok running against 2.10.17 release also shows no issues..

derekcollison commented 2 months ago

Using your script from above and a 3 node cluster.

derekcollison commented 2 months ago

Apologies I can see only 20 msgs sometimes with 2.10.17..

derekcollison commented 2 months ago

On 2.10.18-RC1 seems good.

aubergenepizza commented 2 months ago

@derekcollison thanks for taking a look. I tried testing with 2.10.18-RC1, and the bug still reproduces for me. Since there's no docker image available, I built from source using go1.22.2 and commit 7c645cf48983ccbd41f9fd334d33390b787b1420, then configured a 3-server cluster with the below configs. I also see some logs on server 1 which I'm not sure are related:

[186749] 2024/07/10 10:52:42.877728 [INF] JetStream cluster new consumer leader for '$G > test > test'
[186749] 2024/07/10 10:52:49.894909 [INF] 127.0.0.1:51894 - rid:96 - Route connection created
[186749] 2024/07/10 10:52:49.895129 [ERR] 127.0.0.1:51894 - rid:96 - attempted to connect to route port
[186749] 2024/07/10 10:52:49.895136 [INF] 127.0.0.1:51894 - rid:96 - Router connection closed: Incorrect Port
[186749] 2024/07/10 10:52:49.895142 [ERR] 127.0.0.1:51894 - rid:96 - attempted to connect to route port
[186749] 2024/07/10 10:52:49.904968 [INF] 127.0.0.1:51908 - rid:98 - Route connection created
[186749] 2024/07/10 10:52:49.905165 [ERR] 127.0.0.1:51908 - rid:98 - attempted to connect to route port
[186749] 2024/07/10 10:52:49.905171 [INF] 127.0.0.1:51908 - rid:98 - Router connection closed: Incorrect Port
[186749] 2024/07/10 10:52:49.905176 [ERR] 127.0.0.1:51908 - rid:98 - attempted to connect to route port
[186749] 2024/07/10 10:52:57.722124 [INF] 127.0.0.1:60162 - rid:103 - Route connection created
[186749] 2024/07/10 10:52:57.722610 [ERR] 127.0.0.1:60162 - rid:103 - attempted to connect to route port
[186749] 2024/07/10 10:52:57.722634 [INF] 127.0.0.1:60162 - rid:103 - Router connection closed: Incorrect Port
[186749] 2024/07/10 10:52:57.722640 [ERR] 127.0.0.1:60162 - rid:103 - attempted to connect to route port
[186749] 2024/07/10 10:52:57.732641 [INF] 127.0.0.1:60166 - rid:105 - Route connection created
[186749] 2024/07/10 10:52:57.732796 [ERR] 127.0.0.1:60166 - rid:105 - attempted to connect to route port
[186749] 2024/07/10 10:52:57.732802 [INF] 127.0.0.1:60166 - rid:105 - Router connection closed: Incorrect Port
[186749] 2024/07/10 10:52:57.732824 [ERR] 127.0.0.1:60166 - rid:105 - attempted to connect to route port
[186749] 2024/07/10 10:52:57.874399 [INF] JetStream cluster new consumer leader for '$G > test > test'

server 1

server_name=n1-c1
listen=4222

accounts {
  $SYS {
    users = [
      { user: "admin",
        pass: "$2a$11$DRh4C0KNbNnD8K/hb/buWe1zPxEHrLEiDmuq1Mi0rRJiH/W25Qidm"
      }
    ]
  }
}

jetstream {
   store_dir=nats/storage
}

cluster {
  name: C1
  listen: 0.0.0.0:6222
  routes: [
    nats://localhost:6223
    nats://localhost:6224
  ]
}

server 2

server_name=n2-c1
listen=4224

accounts {
  $SYS {
    users = [
      { user: "admin",
        pass: "$2a$11$DRh4C0KNbNnD8K/hb/buWe1zPxEHrLEiDmuq1Mi0rRJiH/W25Qidm"
      }
    ]
  }
}

jetstream {
   store_dir=nats/storage-2
}

cluster {
  name: C1
  listen: 0.0.0.0:6224
  routes: [
    nats://localhost:6222
    nats://localhost:6223
  ]
}

server 3

server_name=n3-c1
listen=4223

accounts {
  $SYS {
    users = [
      { user: "admin",
        pass: "$2a$11$DRh4C0KNbNnD8K/hb/buWe1zPxEHrLEiDmuq1Mi0rRJiH/W25Qidm"
      }
    ]
  }
}

jetstream {
   store_dir=nats/storage-3
}

cluster {
  name: C1
  listen: 0.0.0.0:6223
  routes: [
    nats://localhost:6222
    nats://localhost:6224
  ]
}
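
For reference, a local cluster like this can be brought up by pointing nats-server at each config. This is a minimal sketch; the filenames n1.conf, n2.conf and n3.conf are hypothetical names for the three configs above.

```shell
# Assumes the three configs above were saved as n1.conf, n2.conf and n3.conf.
# Starts the R3 cluster in the background; each server uses its own store_dir.
nats-server -c n1.conf &
nats-server -c n2.conf &
nats-server -c n3.conf &
```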

derekcollison commented 2 months ago

ok will continue to test with your script to see.

kozlovic commented 2 months ago

@derekcollison It seems that messages are incorrectly removed from this place: https://github.com/nats-io/nats-server/blob/e01679d4ca4a8f9f2de564c48c7721d0f9f5209e/server/consumer.go#L5614

With this test and the original code, we get AckFloor.Stream == 0 and Delivered.Stream == 21, with sequence 21 pending. This is a consumer with a filtered subject that never received any message below sequence 21, yet it still decides to remove messages 1..20. That does not seem right?
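To illustrate the observation above with a toy model (this is not the server's actual code; the function names and shapes here are invented for illustration): a cleanup that drops every stream sequence below the consumer's delivered sequence is wrong for a filtered consumer, because sequences the filter never matched were neither delivered nor acked, as AckFloor.Stream == 0 indicates.

```go
package main

import "fmt"

// buggyRemovals mimics deciding removals from the delivered stream
// sequence alone: everything below it is dropped, even though a
// filtered consumer may never have received those sequences.
func buggyRemovals(deliveredStream uint64) []uint64 {
	var removed []uint64
	for seq := uint64(1); seq < deliveredStream; seq++ {
		removed = append(removed, seq)
	}
	return removed
}

// safeRemovals only drops sequences the consumer actually delivered
// and acknowledged. With AckFloor.Stream == 0, nothing has been acked,
// so nothing should be removed.
func safeRemovals(acked map[uint64]bool, deliveredStream uint64) []uint64 {
	var removed []uint64
	for seq := uint64(1); seq < deliveredStream; seq++ {
		if acked[seq] {
			removed = append(removed, seq)
		}
	}
	return removed
}

func main() {
	// Numbers from the comment above: AckFloor.Stream == 0,
	// Delivered.Stream == 21, sequence 21 pending.
	fmt.Println(len(buggyRemovals(21)))                   // 20 messages wrongly dropped
	fmt.Println(len(safeRemovals(map[uint64]bool{}, 21))) // 0 removed
}
```

This matches the symptom in the repro script: 20 of the 40 published messages vanish without ever being acknowledged.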

derekcollison commented 2 months ago

Looking..

derekcollison commented 2 months ago

I see it now.. Working on fix.