nats-io / nats-streaming-server

NATS Streaming System Server
https://nats.io
Apache License 2.0

All unacknowledged messages are delivered at once on subscription resumption #1009

Open martin-spinks opened 4 years ago

martin-spinks commented 4 years ago

Background We've noticed a behaviour that we would like to suggest a change for. We are running a number of subscribers (in both Node and Python), each subscribed with a durable name, a queue group, and manual acks. When running multiple instances of these subscribers, messages are distributed across the instances, and subscribers can drop out and come back without problems (thanks to the durable name). Not all messages that are sent will be acknowledged: if a piece of work fails, its message is not acknowledged and nats-streaming will retry it later.

Request We have noticed that if all the subscribers close their connections, the first one back gets a huge influx of unacknowledged messages (it can be a lot, and we suspect it may be the entire backlog). Can nats-streaming therefore honour the maxInFlight setting for that channel when attempting to redeliver all the unacknowledged messages on resumption?

We understand this is related to #732 and #187. We don't have any problem with unacknowledged messages being redelivered first on resume; this is actually fine for our use case. But having all messages redelivered at once is a problem, as there could be a LOT of them. This could put the first listening subscription under sudden heavy load.

To replicate Assume a running nats-streaming-server and a subscriber (below, in Node) that only acknowledges messages whose payload is the string bob. (subscriber.js)

const stan = require('node-nats-streaming');
const os = require('os');

// Client Id
const clientId = `${os.hostname()}.${process.pid}`.replace(/\./g, '-');

// Transport
const transport = stan.connect('test-cluster', clientId);
transport.on('connect', function () {
  // Acknowledge only messages whose payload is 'bob'; everything else stays un-acked
  const caller = (msg) => {
    console.log('Function', msg.getSequence(), msg.getData());
    if (msg.getData() === 'bob') {
      console.log('Got a bob message');
      msg.ack();
    }
  };

  // Thin wrapper used as the subscription's message handler
  const wrapper = (msg) => {
    caller(msg);
  };

  const exit = () => {
    transport.close();
  }

  // Set options
  const options = transport.subscriptionOptions();
  options.setAckWait(2000);
  options.setMaxInFlight(2);
  options.setManualAckMode(true);
  options.setDurableName('nlp.a');

  // Subscribe
  const subscription = transport.subscribe('nlp.a', 'q.nlp.a', options);
  subscription.on('message', wrapper);

  process.on('SIGTERM', exit);
  process.on('SIGINT', exit);
  process.on('SIGQUIT', exit);

  // Done
  console.log('Ready');
})

If we run multiple instances of this, messages are shared between the instances (via the queue group). If one instance is stopped and restarted, it resumes and joins the group again. We then send bob and non-bob messages with the following publisher: (caller.js)

const stan = require('node-nats-streaming');
const os = require('os');

// Client Id
const clientId = `${os.hostname()}.${process.pid}`.replace(/\./g, '-');

// Transport
const transport = stan.connect('test-cluster', clientId);
transport.on('connect', function () {

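  // Publish 8 messages that the subscriber will never acknowledge
  for (let i = 0; i < 8; i++) {
    transport.publish('nlp.a', JSON.stringify({ a: 'a' }), function (err, uid) {
      if (err) return console.error('Publish failed', err);
      console.log('Message received ', uid);
    });
  }

  // Publish 8 'bob' messages that the subscriber will acknowledge
  for (let i = 0; i < 8; i++) {
    transport.publish('nlp.a', 'bob', function (err, uid) {
      if (err) return console.error('Publish failed', err);
      console.log('Bob message received ', uid);
    });
  }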
})

You can see the bob messages arriving and being acknowledged, and after some time you can observe all the retries of the other messages happening. If you then close both instances of subscriber.js and start just one instance of subscriber.js again, it receives more than maxInFlight messages (2 in this example). In our situation this number could get very large very quickly.
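
Until the server limits this, one possible client-side mitigation is to buffer incoming messages and work on only a few at a time. The sketch below is an assumption on our part, not something node-nats-streaming provides: `createLimiter`, `push` and `stats` are hypothetical names, and the messages are stand-ins that only implement `getSequence()`. Note that messages waiting in the buffer longer than the ack wait would be redelivered by the server, so the sketch drops duplicates it already holds.

```javascript
// Hypothetical client-side limiter; not part of node-nats-streaming.
function createLimiter(limit, handler) {
  const queue = [];         // messages waiting for a free slot
  const known = new Set();  // sequences queued or in progress (used to drop duplicates)
  let active = 0;           // handlers currently running

  const drain = () => {
    while (active < limit && queue.length > 0) {
      const msg = queue.shift();
      active++;
      Promise.resolve()
        .then(() => handler(msg))
        .catch(() => {}) // a failed message stays un-acked so the server retries it
        .finally(() => {
          active--;
          known.delete(msg.getSequence());
          drain();
        });
    }
  };

  return {
    push(msg) {
      const seq = msg.getSequence();
      if (known.has(seq)) return false; // redelivery of a message we already hold
      known.add(seq);
      queue.push(msg);
      drain();
      return true;
    },
    stats: () => ({ active, queued: queue.length }),
  };
}

// Usage with stand-in messages; a real handler would call msg.ack() on success.
const fakeMsg = (seq) => ({ getSequence: () => seq });
const limiter = createLimiter(2, async (msg) => { /* do the work, then ack */ });
for (let seq = 1; seq <= 5; seq++) limiter.push(fakeMsg(seq));
const duplicateAccepted = limiter.push(fakeMsg(3)); // simulated redelivery
const snapshot = limiter.stats();
console.log(snapshot, duplicateAccepted);
```

In subscriber.js this would presumably be wired in as `subscription.on('message', (msg) => limiter.push(msg))`. It only protects the worker from the burst; the messages are still sent over the wire, so it does not replace a server-side limit.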

Environment

nats-streaming-server version 0.16.2, nats-server: v2.0.4

kozlovic commented 4 years ago

Redeliveries do not honor MaxInflight because, if the server sent messages up to MaxInflight and the client did not ack them (or simply missed them), the server would be unable to redeliver anything: the number of outstanding messages would already be at MaxInflight.
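
The reasoning above can be illustrated with a toy model. This is only a sketch for readers of the thread, not the server's implementation; `makeSubscription`, `deliverNew` and `redeliver` are hypothetical names. It shows that once the outstanding count has reached the limit, applying the same check to redeliveries would block them for good.

```javascript
// Toy model, not the server's implementation. All names here are hypothetical.
function makeSubscription(maxInFlight) {
  return { maxInFlight, outstanding: new Set() };
}

// First delivery: only allowed while there is room under maxInFlight.
function deliverNew(sub, seq) {
  if (sub.outstanding.size >= sub.maxInFlight) return false;
  sub.outstanding.add(seq);
  return true;
}

// Redelivery of a message that is still outstanding (un-acked).
// With applyLimit set, the same size check is used as for new messages.
function redeliver(sub, seq, applyLimit) {
  if (!sub.outstanding.has(seq)) return false; // already acked, nothing to resend
  if (applyLimit && sub.outstanding.size >= sub.maxInFlight) return false;
  return true;
}

const sub = makeSubscription(2);
deliverNew(sub, 1);
deliverNew(sub, 2);

// Suppose the client missed both messages, so neither is ever acked.
const resentWithLimit = redeliver(sub, 1, true);     // blocked: already at the limit
const resentWithoutLimit = redeliver(sub, 1, false); // allowed: limit is bypassed
const newAccepted = deliverNew(sub, 3);              // new messages still wait for an ack

console.log({ resentWithLimit, resentWithoutLimit, newAccepted });
```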

But in the case of a subscription restart, we may be able to limit the number of redelivered messages to MaxInflight. That is likely to happen only for queue subs (since this is the only case where more un-ack'ed messages than MaxInflight can be assigned to a subscription), or in cases where a non-queue durable subscription is restarted with a lower MaxInflight I guess...

Will look into that, thanks!
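
For illustration, limiting the redelivered batch on a subscription restart could look roughly like the model below. This is a hedged sketch of the idea discussed in this thread, not actual or planned server code: `createResumedSub` and its fields are hypothetical, and the model ignores ack-wait timers entirely. It sends the oldest un-acked messages first, capped at maxInFlight, and releases the next one only when an ack arrives.

```javascript
// Toy model of the proposed behaviour; not the server's code. Names are hypothetical.
function createResumedSub(pendingSeqs, maxInFlight) {
  const waiting = [...pendingSeqs].sort((a, b) => a - b); // oldest sequence first
  const inFlight = new Set();
  const sent = []; // order in which messages were handed to the subscriber

  const fill = () => {
    while (inFlight.size < maxInFlight && waiting.length > 0) {
      const seq = waiting.shift();
      inFlight.add(seq);
      sent.push(seq);
    }
  };
  fill(); // initial batch on resume, capped at maxInFlight

  return {
    sent,
    ack(seq) {
      // Only a known in-flight message frees a slot for the next one.
      if (inFlight.delete(seq)) fill();
    },
  };
}

// Eight un-acked messages are pending when the first subscriber comes back.
const resumed = createResumedSub([5, 1, 3, 2, 4, 6, 7, 8], 2);
const initialBatch = [...resumed.sent];
resumed.ack(1);
const afterOneAck = [...resumed.sent];
console.log(initialBatch, afterOneAck);
```

With maxInFlight set to 2, as in subscriber.js, the resumed subscriber in this model starts with two messages instead of the whole backlog.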

martin-spinks commented 4 years ago

Hi @kozlovic, thank you so much for your response and for looking into this. You are correct: the first subscription back (say, after a restart) will potentially get a lot of messages (in our case it's a queue + durable subscription).

For us, it would be sensible to be able to limit the number that are redelivered, something like the maxInFlight for the channel, which would mean that first subscription back wasn't swamped with a bucket load of messages.

Is this a feature you see being added in the future?