nats-io / nats.deno

Deno client for NATS, the cloud native messaging system
https://www.nats.io
Apache License 2.0

JetStream KV gets consumer stuck in a cluster node #691

Closed. ThalesValentim closed this issue 4 months ago.

ThalesValentim commented 4 months ago

Observed behavior

The KV consumer remains active on the server, even though the client no longer has interest in it, after the client reconnects to another available cluster node.

Recently, I discussed the possibility of a nats-server issue (see 5352), and it was suggested that this might be a client bug that keeps interest on the old subscription.

Expected behavior

Consumers should be unsubscribed/removed once the client has reconnected to another available cluster node and no longer has interest in them.

Server and client version

Host environment

Steps to reproduce

Assuming a nats-server cluster with 3 nodes running:

  1. Run an app that watches for JetStream KV updates against the cluster; Node.js example:

    // Node.js example; assumes the "nats" package
    import * as nats from "nats";

    const nc = await nats.connect({
      servers: ['localhost:4111', 'localhost:4222', 'localhost:4333'],
    });
    const sc = nats.StringCodec();
    const js = nc.jetstream();
    const kv = await js.views.kv("testing");
    const jsm = await nc.jetstreamManager();
    const stream = await jsm.streams.get('KV_testing');
    const { config } = await stream.info();

    await jsm.streams.update(config.name, {
      ...config,
      num_replicas: 3,
    });

    const watch = await kv.watch();
    (async () => {
      for await (const e of watch) {
        // do something with the change
        console.log(
          `watch: ${e.key}: ${e.operation} ${e.value ? sc.decode(e.value) : ""}`,
        );
      }
    })();

    await kv.put('hello.word', sc.encode("hi"));
  2. Check which cluster node has been chosen: nats consumer report KV_testing

  3. Force a container restart/node failure so that the client reconnects to another available cluster node

  4. Check the consumer report again: nats consumer report KV_testing

  5. The output shows two consumers instead of the expected one (see the programmatic check sketched below)
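
For reference, a minimal sketch of doing the same check as step 5 programmatically with the client (it assumes the jsm handle from step 1; KV buckets are backed by a stream named KV_<bucket>, here KV_testing):

// Assumption: `jsm` is the JetStreamManager created in the example above.
// List all consumers on the KV backing stream and count them.
const consumers = await jsm.consumers.list("KV_testing").next();
console.log(`consumers on KV_testing: ${consumers.length}`);
for (const ci of consumers) {
  console.log(ci.name, ci.created, ci.cluster?.leader);
}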

aricart commented 4 months ago

The multiple consumers are not the problem: if the ordered consumer (the watcher) fails, the client will recreate it from underneath. The client will attempt to remove the old consumer, but that may not succeed, especially if the cluster is flapping. It will then attempt to recreate the consumer, and the server will prune the old consumer after 5 seconds.

aricart commented 4 months ago

Also, if the consumer is never reaped, do you have 2 processes that are watching/consuming, etc.? As you can see below, we create a new subscription with a new inbox, and the old consumer unsubs.

https://github.com/nats-io/nats.deno/blob/cdce825dfea61564734afeb6964ab387ea713d3c/jetstream/jsclient.ts#L784

Is your watch throwing an error?
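
For what it's worth, a minimal sketch of surfacing such an error from the watch iterator (assuming the same kv handle as in the reproduction code):

// Assumption: `kv` is the KV instance from the reproduction code above.
const watch = await kv.watch();
(async () => {
  try {
    for await (const e of watch) {
      console.log(`watch: ${e.key}: ${e.operation}`);
    }
  } catch (err) {
    // any failure in the underlying ordered consumer would surface here
    console.error("watch failed:", err);
  }
})();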

ThalesValentim commented 4 months ago

> Also, if the consumer is never reaped, do you have 2 processes that are watching/consuming, etc.? As you can see below, we create a new subscription with a new inbox, and the old consumer unsubs.
>
> https://github.com/nats-io/nats.deno/blob/cdce825dfea61564734afeb6964ab387ea713d3c/jetstream/jsclient.ts#L784
>
> Is your watch throwing an error?

Thanks for the explanation.

I have observed that the consumer is never reaped, even with only one KV watch started by the client. We are running 3 clustered nodes; let us name them nats1, nats2, and nats3.

Assuming the app client starts and connects to the nats3 node:

My suspicion is that while the node (nats3) is unavailable, the client can't unsubscribe the old consumer, and it keeps the old subscription active when nats3 comes back.

Checking nats consumer report KV_testing while nats3 has not recovered (failed state), it shows Inaccessible Consumers:

Screenshot 2024-04-29 at 10 27 41

When nats3 is healthy again:

Screenshot 2024-04-29 at 11 30 00

Are updates being propagated through both existing consumers? My expectation is that only one of the consumers listed above should receive the updates through the client's watch callback.

I noticed that the Ack Floor is updated when publishing a change to KV_testing: nats kv put testing test.key test.value

Screenshot 2024-04-29 at 14 31 40

I haven't seen any errors or exceptions while following the steps described.

aricart commented 4 months ago

Couple of things:

When the client disconnects, the server terminates all interest for that client (the subjects the client is interested in). On reconnect, the client will resend its interest.
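
If it helps, a minimal sketch of observing those disconnect/reconnect events on the client side (assuming the nc connection from the reproduction code):

// Assumption: `nc` is the NatsConnection from the reproduction code.
(async () => {
  for await (const s of nc.status()) {
    // emits events such as "disconnect", "reconnect", "update", ...
    console.log("connection status:", s.type, s.data);
  }
})();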

In the case of the ordered consumer, if the server comes back quickly enough, it may still have the consumer (which is reaped after the specified inactive threshold). It would be nice if you could print the full consumer infos for the consumers on KV_testing.
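
For example, a minimal sketch of dumping them with the current client (again assuming the nc connection from the reproduction code):

// Assumption: `nc` is the NatsConnection from the reproduction code.
const jsm = await nc.jetstreamManager();
const consumers = await jsm.consumers.list("KV_testing").next();
for (const ci of consumers) {
  // full consumer info, including config.inactive_threshold (in nanoseconds)
  console.log(JSON.stringify(ci, null, 2));
}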

When the client reconnects, there's a slight chance that the consumer is still live, and the subscription is rewired between the consumer and client, and things resume.

If the client detects a sequence gap, or if the heartbeat monitor detects that the client is not getting messages, the client will recreate the consumer. If you are seeing 2 different consumers and one of them is not going away, then there really are 2 live consumers on that KV.

ThalesValentim commented 4 months ago

Please find the outputs of the consumers' info below:

The first one was the initial consumer, and the last one was created after the node failure:

Screenshot 2024-04-30 at 10 53 03

Before publishing a message:

9H5K3K4DYTB8N6538QY5X4

Screenshot 2024-04-30 at 10 54 30

9H5K3K4DYTB8N6538QY73H

Screenshot 2024-04-30 at 10 54 49

After publishing a message:

9H5K3K4DYTB8N6538QY5X4

Screenshot 2024-04-30 at 10 55 59

9H5K3K4DYTB8N6538QY73H

Screenshot 2024-04-30 at 10 56 22

Subscriptions:

{
  "account": "$G",
  "subject": "_INBOX.9H5K3K4DYTB8N6538QY65L",
  "sid": "2",
  "msgs": 63,
  "cid": 58
}
{
  "account": "$G",
  "subject": "_INBOX.9H5K3K4DYTB8N6538QY6V0",
  "sid": "3",
  "msgs": 80,
  "cid": 58
}
aricart commented 4 months ago

I did another local test, which uses some refactored APIs but shares all the changes in your current client:

import { connect, millis } from "../src/mod.ts";
import { Kvm } from "../kv/mod.ts";

const nc = await connect();

const kvm = new Kvm(nc);
const kv = await kvm.create("A", { replicas: 3, history: 100 });

const w = await kv.watch();
// print the watcher's consumer inactive_threshold in millis (the 5000 in the output below)
console.log(millis((await w._data.consumerInfo()).config.inactive_threshold));
(async () => {
  for await (const e of w) {
    console.log(`${e.key}: ${e.string()} - ${e.revision}`);
  }
})().catch((err) => {
  console.log(err);
  throw err;
});

// publish a new value every 3 seconds; roll the counter back if the put fails
let i = 1;
setInterval(() => {
  const idx = i++;
  kv.put("a", `${idx}`).catch(() => {
    i--;
  });
}, 3000);

// dump the client's active subscription subjects every second
setInterval(() => {
  const subs = nc.protocol.subscriptions.all();
  const subjects = subs.map((sub) => {
    return sub.subject || sub.requestSubject;
  });
  console.log(subjects);
}, 1000);

And ran that against my cluster tool - https://github.com/nats-io/nats.deno/blob/d32538fa38bbd636826d53216ad5b86e5a71708e/tests/helpers/cluster.ts

And started chaos on it - where random servers are restarted, creating a very hostile environment:

5000
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
a: 1 - 1
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
a: 2 - 2
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
a: 3 - 3
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
a: 4 - 4
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
a: 5 - 5
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
a: 6 - 6
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
a: 7 - 7
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
a: 8 - 8
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6RVO" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6RVO" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6RVO" ]
a: 9 - 9
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6RVO" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6RVO" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6RVO" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6SYZ" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6SYZ" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6SYZ" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6SYZ" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6SYZ" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6SYZ" ]
a: 10 - 10
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6SYZ" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6SYZ" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6SYZ" ]
a: 11 - 11
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6SYZ" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6SYZ" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6UI0" ]
a: 12 - 12
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6UI0" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6UI0" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6UI0" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6UI0" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6UI0" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6UI0" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6UI0" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6VT6" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6VT6" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6VT6" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6VT6" ]

Note that at any one point the client only has 2 subscriptions; one of them changes whenever the ordered consumer resets.

This means that somehow a cluster route is possibly staying open (from a previous subscription), but it shows for a fact that the process is NOT listening on that subject. I don't know how long it takes before the server figures out that it has that stale situation. Have you observed when it goes away?

ThalesValentim commented 4 months ago

Thanks for sharing the test and results! I added the logging to my local tests and got the same behavior: the client didn't report listening on multiple subjects. I ran the app test over different timespans, up to two hours, and observed that the consumer disappeared from the report only after stopping the application / dropping the connection.

ThalesValentim commented 4 months ago

Debugging the KV watcher via JetStreamSubscriptionImpl, I found the "old subscription" consumer name at the path _data.info.last:

Screenshot 2024-05-02 at 12 17 11

I also found the current consumer in the watcher config:

Screenshot 2024-05-02 at 12 17 26
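
A minimal sketch of inspecting those internals (note these are non-public fields of the JS clients, so names may change between releases):

// Debugging only: `watch` is the KV watch iterator from the reproduction code.
// `_data` and `info.last` are internal, non-public fields of the client.
const sub = watch as any; // JetStreamSubscriptionImpl under the hood
console.log("last consumer info:", sub._data?.info?.last);
console.log("current consumer:", await sub._data?.consumerInfo());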

consumer report:

Screenshot 2024-05-02 at 12 26 56
aricart commented 4 months ago

@ThalesValentim I found the issue - will have a fix and a release in a bit

aricart commented 4 months ago

@ThalesValentim all the JavaScript clients have been released with the above fix! Thanks for helping me get to the bottom of this!

ThalesValentim commented 4 months ago

> @ThalesValentim all the JavaScript clients have been released with the above fix! Thanks for helping me get to the bottom of this!

Thanks a lot!