onebeyond / rascal

A config driven wrapper for amqp.node supporting multi-host connections, automatic error recovery, redelivery flood protection, transparent encryption / decryption and channel pooling.
MIT License

(Question) How to gracefully close a publisher service? #176

Closed AllanDaemon closed 3 years ago

AllanDaemon commented 3 years ago

Given a process that only publishes to RabbitMQ, how do I close it gracefully, so that all messages queued for publication are actually sent rather than lost?

To better explain, here is a minimal version of the .mjs file I'm using to test latency, which is how I found the issue (the original is at the end of the post):

import rascal from 'rascal';
const Broker = rascal.BrokerAsPromised;

const RABBIT_MQ_URL = process.env.RMQ_URI;
console.log(`RABBIT_MQ_URL:`, RABBIT_MQ_URL);

const rascal_confs = {
  vhosts: {
    "/": {
      connection: {url: RABBIT_MQ_URL},
      exchanges: { 'rmq.stress.test': {type: 'topic'}, },
      queues: { 'RMQ_Stress_Test': {}, },
      bindings: {
        'external_bind': {
          source: 'rmq.stress.test',
          destination: 'RMQ_Stress_Test',
          bindingKey: 'sender',
          destinationType: 'queue',
        }
      },
    }
  },
  publications: {
    test : {
      vhost: "/",
      exchange: "rmq.stress.test",
      routingKey: "sender",
    },
  }
}

let broker;

async function rabbitmqSetup() {
  console.log(`Setting up RabbitMQ:`, rascal_confs.vhosts['/'].connection);
  broker = await Broker.create(rascal_confs);
  broker.on('error', (err) => { console.error(err); process.exit(-1); });
  console.log(`Setup done`);
}

await rabbitmqSetup();

async function rabbitmqSend(msg) {
  const ret = await broker.publish('test', msg);
  ret.on('error', console.error);
  console.log(ret)
}

for (var i = 0; i < 100; i++) {
  await rabbitmqSend(`Test ${i}`);
  console.log(`${i}`);
}

await broker.shutdown();

If I run this code, it exits with no error, but only one of the published messages, and sometimes none, actually reaches RabbitMQ.

If I remove the last line with await broker.shutdown(), all the messages are delivered to RabbitMQ, but the process keeps running forever and never exits.

I was looking for something like await broker.finishSendingEverything(), but I couldn't find anything of the sort (I looked in the documentation, Google, the code in the repository, StackOverflow, etc.).

The main problem is that when the process has to be restarted, it cannot lose anything. Alternatively, instead of waiting for everything remaining to be sent, I'd like a way to read the pending messages, save them somewhere else, and try to send them again after the restart.

I tried to investigate the source code, but because the broker in BrokerAsPromised is encapsulated in a closure, there is practically nothing I can do from the outside.

So how do I handle process shutdown without losing data?

Basic info

Original file used to test (with latency measurements)

import { hrtime } from 'process';

import rascal from 'rascal';
const Broker = rascal.BrokerAsPromised;

const RABBIT_MQ_URL = process.env.RMQ_URI;
console.log(`RABBIT_MQ_URL:`, RABBIT_MQ_URL);

const rascal_confs = {
  vhosts: {
    "/": {
      connection: {url: RABBIT_MQ_URL},
      exchanges: { 'rmq.stress.test': {type: 'topic'}, },
      queues: { 'RMQ_Stress_Test': {}, },
      bindings: {
        'external_bind': {
          source: 'rmq.stress.test',
          destination: 'RMQ_Stress_Test',
          bindingKey: 'sender',
          destinationType: 'queue',
        }
      },
    }
  },
  publications: {
    test : {
      vhost: "/",
      exchange: "rmq.stress.test",
      routingKey: "sender",
    },
  }
}

let broker;

async function rabbitmqSetup() {
  console.log(`Setting up RabbitMQ:`, rascal_confs.vhosts['/'].connection);
  broker = await Broker.create(rascal_confs);
  broker.on('error', (err) => { console.error(err); process.exit(-1); });
  console.log(`Setup done`);
}

await rabbitmqSetup();

async function rabbitmqSend(msg) {
  const ret = await broker.publish('test', msg);
  ret.on('error', console.error);
  console.log(ret)
}

var total = 0;
var lat_sum = 0;
var lat_max = 0;
var lat_min = 9999999999999;

for(var i=0; i<100; i++)
{
  const start = hrtime.bigint();
  await rabbitmqSend(`Test ${i}`);
  const end = hrtime.bigint();

  const lat = Number(end - start)/1000000;
  total += 1;
  lat_sum += lat;
  if (lat < lat_min) lat_min = lat;
  if (lat > lat_max) lat_max = lat;

  console.log(`${i}`, lat)
}

console.log(`\n[${i}]:\tavg: ${lat_sum/total}\tmax: ${lat_max}\tmin: ${lat_min}`);

await broker.shutdown();

cressie176 commented 3 years ago

Hi @AllanDaemon,

Given a process that only publishes to RabbitMQ, how do I close it gracefully, so that all messages queued for publication are actually sent rather than lost?

You need some mechanism (e.g. a counter of outstanding messages) for monitoring in-flight messages, and to prevent shutdown until all of them have been successfully published, as indicated by a 'success' event and the absence of a 'return' event.

broker.publish('p1', 'some message', (err, publication) => {
  if (err) throw err; // publication didn't exist
  publication
    .on('success', (messageId) => {
      console.log('Message id was: ', messageId);
    })
    .on('error', (err, messageId) => {
      console.error('Error was: ', err.message);
    })
    .on('return', (message) => {
      console.warn('Message was returned: ', message.properties.messageId);
    });
});

'return' events should be emitted before 'success', but because 'success' only indicates that the message was successfully delivered to the broker, you will still get a 'success' event even when a message is unroutable.

However, this does not guarantee that you will never lose messages. Your application could still crash or be rudely killed while messages are in flight. You cannot solve this through RabbitMQ alone; you need a detection and recovery mechanism built into your wider systems architecture.
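One way to implement such a mechanism with BrokerAsPromised (the API used in the question) is a small in-flight counter: increment before each publish, decrement when the publication emits 'success' or 'error', and wait for the count to drain before calling broker.shutdown(). The following is a minimal sketch; `InflightTracker` is a hypothetical helper, not part of Rascal's API:

```javascript
// Hypothetical helper (not part of Rascal): counts in-flight publishes
// and lets callers await the moment the count returns to zero.
class InflightTracker {
  constructor() {
    this.count = 0;
    this.waiters = [];
  }
  up() {
    this.count += 1;
  }
  down() {
    this.count -= 1;
    if (this.count === 0) {
      for (const resolve of this.waiters) resolve();
      this.waiters = [];
    }
  }
  drain() {
    // Resolves immediately if nothing is in flight.
    if (this.count === 0) return Promise.resolve();
    return new Promise((resolve) => this.waiters.push(resolve));
  }
}

// Usage with BrokerAsPromised (assumes the 'test' publication from the
// question's config):
//
//   const tracker = new InflightTracker();
//   async function send(msg) {
//     tracker.up();
//     const publication = await broker.publish('test', msg);
//     publication
//       .on('success', () => tracker.down())
//       .on('error', (err) => { console.error(err); tracker.down(); });
//   }
//   ...
//   await tracker.drain();   // wait until every publish has been confirmed
//   await broker.shutdown();
```

Note that the counter only tracks publishes this process initiated; it says nothing about messages your application has accepted but not yet handed to the broker.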

cressie176 commented 3 years ago

To give a little more context as to why Rascal shouldn't keep track of outstanding messages when publishing, imagine the following scenario...

  1. start workflow (either through an HTTP request or incoming message)
  2. publish message 1
  3. write to a database
  4. publish message 2
  5. finish workflow (e.g. send an HTTP response or ack the incoming message)

Even if Rascal refused to shut down while there are outstanding messages, the process could still stop halfway through the workflow, e.g. during step 3, when no publishes are in flight; therefore you need to monitor the outstanding work independently of Rascal.
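That application-level monitoring can be sketched as gating shutdown on whole workflows rather than on individual publishes. This is a hypothetical sketch (`track` and `gracefulShutdown` are made-up names, not Rascal API):

```javascript
// Hypothetical application-level helpers (not part of Rascal): register
// each in-progress workflow promise, and on shutdown wait for all of
// them to settle before closing the broker.
const active = new Set();

// Register a workflow promise; it removes itself once settled.
function track(promise) {
  active.add(promise);
  promise
    .finally(() => active.delete(promise))
    .catch(() => {}); // errors are handled by the workflow's own caller
  return promise;
}

// Wait for every registered workflow, then close the broker,
// e.g. gracefulShutdown(() => broker.shutdown()).
async function gracefulShutdown(closeBroker) {
  await Promise.allSettled([...active]);
  await closeBroker();
}
```

Hooked up to SIGTERM, this defers broker.shutdown() until steps 1-5 of any in-progress workflow have completed, which is exactly the state Rascal itself cannot know about.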

With this in mind, are you OK to close?