amqp-node / amqplib

AMQP 0-9-1 library and client for Node.JS
https://amqp-node.github.io/amqplib/
Other
3.69k stars 474 forks source link

Support Auto-reconnection #25

Open skeggse opened 11 years ago

skeggse commented 11 years ago

At the moment, when a socket closes it's basically permanent. All kinds of methods are replaced in favor of functions which only throw and there's no reconnect support.

Even in a data center, connections can fail or time out, and reconnection would allow the connection to fail and recover from the failure, instead of simply giving up.

This might need to happen at the connect.js level given the current setup, but that would put messages that have been sent but not acked into an uncertain place where it's unclear if the message has reached the server or just fallen. I know from experience that trying to get reconnection to work well can be tricky. For a counterexample, see node-amqp's reconnection, which appears to leak everywhere.

michaelklishin commented 11 years ago

Recovery of AMQP 0-9-1 connections involves more than simply reconnecting. You may want to get familiar with what other clients do. Fortunately, there are good examples of how consumer recovery can work with no effort on the developer end.

It's a lot more involved for producers as socket issues may be detected in several seconds. As such, not only publishers that cannot afford to lose a message need to use publisher confirms, they also need to keep a write-ahead log of messages that can be replayed if the publisher fails before outstanding confirms arrive.

michaelklishin commented 11 years ago

For example, Hutch adds a disk-based write-ahead log on top of Bunny among other things.

skeggse commented 11 years ago

Would it not make sense to roll that into the parent library? Maybe not disk-based, but a write-ahead log?

squaremo commented 11 years ago

At the moment, when a socket closes it's basically permanent. All kinds of methods are replaced in favor of functions which only throw and there's no reconnect support.

When the connection drops you are pretty much hosed, and usually the best thing to do is start from scratch -- assert all your exchanges and queues again and go from there. There are some things the client can do for you (like Michael's Ruby client); but I am rather wary of second-guessing the application code. You might want such-and-such a queue to disappear with the connection.

Regarding messages, yes you can use confirmations (now with callbacks!), which can change your guarantee from "at most once" (a message might not make it, and you won't know) to "at least once" (you might not get a confirmation, and so resend a message).

(Aside: Do you guys get github emails from the future or something? You always have about three posts back and forth before I manage to comment ..)

michaelklishin commented 11 years ago

RAM-based write-ahead log is not an option for people who cannot afford to lose a single message (such as a couple of payment processors that are known to use Bunny). Others often simply don't need a WAL.

In any case, I'm not at all convinced it should be part of client libraries. amqplib should try to provide about as much as Ruby clients do. Those provide over 2 years of automatic connection recovery experience to learn from.

squaremo commented 11 years ago

Would it not make sense to roll that into the parent library? Maybe not disk-based, but a write-ahead log?

It'd make an interesting example, at least -- put a buffer in front of writes, and recover from that in the case of connection drops.

The usual example for guaranteed delivery is moving rows from a database (or files from a filesystem), so the "reliable storage" bit is implicit in the scenario.

skeggse commented 11 years ago

When the connection drops you are pretty much hosed, and usually the best thing to do is start from scratch -- assert all your exchanges and queues again and go from there.

In every case I can think of, the starting from scratch can be taken care of by the library, and if you want a transient queue or exchange you define it as such with autoDelete and let the library or RabbitMQ take care of arbitrary naming. Then, on reconnect, the library or RabbitMQ just create a new queue/exchange in the same manner. If you bound it to something specific, it would ensure the bind to the specific entity, and if it were to another transient entity it would recreate that bind.

Those steps, however, would quickly become convoluted and difficult to get right in an abstract enough manner to suit all users.

RAM-based write-ahead log is not an option for people who cannot afford to lose a single message (such as a couple of payment processors that are known to use Bunny). Others often simply don't need a WAL.

That's a good point. For us, we need nearly as much reliability, so RAM-based might not make sense (but what happens if the hard drive fails and causes the system to go down? Then you're really hosed.

That said, I feel like it might be beneficial for those who don't need quite as much reliability.

It'd make an interesting example, at least -- put a buffer in front of writes, and recover from that in the case of connection drops.

That's more or less the extended version of what I was attempting to communicate in our other discussion. Instead of just keeping a buffer of unsent commands, keep a buffer of unacknowledged commands. This would allow one to publish transparently through the library and assume that it will reach its destination--provided the producer doesn't crash.

skeggse commented 11 years ago

I don't know the right answer. Auto-reconnection is a feature many database libraries support, though this isn't a database library. Regardless, in my ideal world the end-developer wouldn't need to worry about reliability as much. Reconnections are common enough that it seems a useful feature.

(Aside: Do you guys get github emails from the future or something? You always have about three posts back and forth before I manage to comment ..)

Yup, on every single one. Make sure you're watching the thread--it's a button at the bottom of issue pages.

michaelklishin commented 11 years ago

Nobody is saying that reconnection should not be supported, just that it is not as trivial as recovering TCP connection.

skeggse commented 11 years ago

Oh, absolutely. Who would you rather be in charge of recovery, the library or every developer using the library? Better to get it right once.

squaremo commented 11 years ago

Who would you rather be in charge of recovery, the library or every developer using the library?

If there was a definitive way to recover, the library; without that, every developer.

skeggse commented 11 years ago

Exactly.

cmoesel commented 11 years ago

I'll provide my point of view as a developer who is considering switching from node-amqp to amqp.node: The lack of a reconnection feature is a barrier to me making the switch. If it was just a matter of changing the library, it would be an easy decision-- but currently it also means I need to design and implement all the reconnection logic too-- and that's enough to dissuade me from switching right now. I suspect there are other node-amqp users like me.

Ideally, node.amqp would support a common-sense reconnection strategy that works for 80% of its users, and provide a way to turn it off for the other 20% who want to implement it in their own applications. That's what I'd love to see... Of course, it's easy to say that when I'm not the one who has to code it up. ;-)

squaremo commented 11 years ago

@cmoesel So the reconnection in node-amqp works for you?

cmoesel commented 11 years ago

@squaremo It works well enough. I think there are leaks in there, but so far it has not negatively impacted us.

skeggse commented 11 years ago

Ideally, node.amqp would support a common-sense reconnection strategy that works for 80% of its users, and provide a way to turn it off for the other 20% who want to implement it in their own applications.

This. You could take it a step further and provide a reconnect hook for say an 18% who have a specific reconnect strategy that fits into the reconnect envelope. Basically, on reconnect at a socket level, by default try to reconstruct the old state. If the user would prefer, give them a means to easily tap into the same information used in the default reconnection mechanism and set up the new connection. For the last, say, 2%, who need something very specialized, just let them disable it. Not strictly necessary, but providing a common structure for reconnection could help with maintaining the reconnection aspect of the project.

That was a little convoluted...let me know if I can clarify something.

squaremo commented 11 years ago

Basically, on reconnect at a socket level, by default try to reconstruct the old state.

This is a bad idea. It assumes that the desired state is always the same. What if, for example, queues are constructed according to received data?

I would use something like a fixture; that is, a procedure that creates a known base state at each connection. You could do, for example,

function fixture(conn) {
  return conn.createChannel().then(function (ch) {
    conn.on('error', reconnectAtSomePoint);
    ch.assertQueue(TASKS);
    ch.assertExchange(PUBSUB);
    ch.close();
    return conn;
  });
}

function reconnect() {
  amqp.connect(URL).then(fixture).then(main);
}

function main(conn) {
   // set up consumers etc.
}

There's nothing extra needed in amqplib in order to accomplish that yourself. Helpers for e.g., reconnection strategies (reconnectAtSomePoint) would make a nice addition at some point, I agree.

michaelklishin commented 11 years ago

Bunny tries to reconstruct the state it knows about with a couple of exceptions:

This works very well in practice but also relies on the fact that exchanges in queues in Bunny are objects, so we can track what the client itself has declared.

skeggse commented 11 years ago

This is a bad idea. It assumes that the desired state is always the same. What if, for example, queues are constructed according to received data?

In our case, they are constructed according to received data. And once I've created them, I expect them to stay the same. It looks like this is related to the strategy node-amqp uses, except that it doesn't care about what's on the channels. All it does for reconnection is to reconnection the underlying transport socket, then reconnect each channel. Channel "reconnection" is basically just connection: it uses the same strategy as it starts with.

Even this simple approach seems like it might work in many cases. You'd lose autoDelete entities, but you'd keep persistent ones.

I would use something like a fixture; that is, a procedure that creates a known base state at each connection.

That was sorta my idea for the 18% case, except that the fixture would know what the previous configuration was, if any. Not strictly required, and not necessarily useful, but could hint at some things.

Server-named queues always get a new name from the server

That's what node-amqp lacks, and it's a pretty big loss. On reconnect, your code might not continue to operate.

Channel ids are not guaranteed to be the same (nobody should rely on them anyway)

Agreed, but if you just keep your Channel objects, why not use the channel ids? They already exist, just keep them! If the last state was valid--no overlapping channel ids, the new state will be too, right?

michaelklishin commented 11 years ago

Agreed, but if you just keep your Channel objects, why not use the channel ids? They already exist, just keep them! If the last state was valid--no overlapping channel ids, the new state will be too, right?

Yes and that's how it works most of the time, but if you open and close channels all the time (not necessary but some do that to get very fine-grained error handling), at the time of recovery there may be concurrent channel id allocations. For similar cases, there are ways to "recover" a channel manually: keep the object but allocate a new number and open it. So they are not guaranteed to stay the same between connection recoveries, even though almost always they do.

gjohnson commented 10 years ago

Since your talking reconnections... Thoughts on supporting multiple hosts?

benmoss commented 10 years ago

I implemented a really simple reconnection strategy in my app: https://gist.github.com/benmoss/e93125d1fb3561be9276 (excuse the CoffeeScript :smile:)

For my use case just dropping messages is ok until reconnection is possible, though it's not hard to imagine how you could implement a buffer.

squaremo commented 10 years ago

I implemented a really simple reconnection strategy in my app: https://gist.github.com/benmoss/e93125d1fb3561be9276 (excuse the CoffeeScript :smile:)

It reads quite nicely in CoffeeScript, I reckon.

For my use case just dropping messages is ok until reconnection is possible, though it's not hard to imagine how you could implement a buffer.

Exactly, if dropped messages did matter, you could extend it to use confirmations and a replay buffer. Although, more moving parts = more failure modes.

squaremo commented 10 years ago

Since your talking reconnections... Thoughts on supporting multiple hosts?

That would certainly go hand-in-hand with reconnection. A typical scheme is to supply a collection of connection points (URLs, it would be here) for a cluster and try each in turn.

By the way, this is the kind of thing I worry about with automatic reconnection: http://status.cloudamqp.com/incidents/56bhzt813hg9 (more info: https://twitter.com/CloudAMQP/status/455806520370798592; underlying problem: https://github.com/postwait/node-amqp/issues/262). This particular issue was pretty straight-forward -- trying to redeclare queues with server-generated names will always fail -- but AMQP is full of corners like this, and that's not taking into account the idiosyncrasies of a given application. Making assumptions about the reconnection properties of queues is dangerous.

That's not to say it can't be figured out. The RabbitMQ team certainly seem to think it's possible, since they (well, @michaelklishin for the most part, as I understand it) added reconnection to the Java client, and that is based, I would think, on how Bunny does it -- which presumably has worked very well in practice.

I'm not comfortable with all the things the RabbitMQ Java client does for its recovery -- rebasing delivery tags, for example -- which come rather close to second-guessing the server behaviour (or, to put it another way, are tightly coupled with the server).

I would prefer to require applications to be deliberate about recovery, help them where it is possible to do so without making additional assumptions, and to surface failures where it is not.

richzw commented 9 years ago

I am not familiar with CoffeeScript, so I try to convert the https://gist.github.com/benmoss/e93125d1fb3561be9276 to JavaScript.

Here it is https://gist.github.com/richzw/57177f3fecbeb921819c

@squaremo and @benmoss, Could you please help me review it? Thanks in advance.

I want to know what does the createFakeChannel_ function mean?

cmoesel commented 9 years ago

@richzw, coffeescript compiles into javascript, so if you need javascript, you don't need to manually convert it-- you can just take the result of compiling the coffeescript. Here is the result of the coffeescript you referenced above: http://pastebin.com/REPR1zP2

benmoss commented 9 years ago

@richzw the createFakeChannel function was there so that if publish is called while the server is disconnected it will be a no-op, rather than throwing an exception. It allows the other parts of the program to work while just dropping messages instead of sending them to RabbitMQ. This may not be acceptable for all programs, but it worked well enough for mine.

richzw commented 9 years ago

@cmoesel and @benmoss thanks for your help.

@benmoss, I want to publish message to amqp if the channel is ok, otherwise, continue to reconnect to amqp until the connection is ok. so

when(amqp.connect( this.addr_ ))
    .with( this )
    .then( this.createChannel_ )
    .then( this.createExchange_ )
    .then( this.handleUnrouteableMessages_ )
    .then( this.publish_, this.handleDisconnections_ )
    .catch( this.reconnect_ )

It seems works well.

Now I want to establish one permanent connection with amqp when my apps starts, then call publish function once there are messages to be sent. Is it doable? or Is that possible to do that with your codes?

benmoss commented 9 years ago

@richzw yeah, just create an instance of the MessageBus in your app startup and use it as a global object. i've seen this done by having a globals module that has all the global objects. wherever you need to publish from the rest of your app:

// globals.js
module.exports = {
  messageBus: new MessageBus()
};
// somewhere_else.js
var messageBus = require("globals").messageBus;

messageBus.publish("order.created", {orderNumber: 555});
richzw commented 9 years ago

hi @benmoss and @squaremo

In this gist https://gist.github.com/richzw/6b4d348c6b8abdc8176e, I want to Sender class to handle connection failure recover through re-connection. Here are the test codes.

var Sender = require('./sender.js');
var sender = new Sender( amqpAddress, 3000);

function sendMessage( key, msg ) {
    var byteBuffer = msg.encode().toBuffer();
    sender.deliverMessage( key, byteBuffer );
}

// send message every 5 seconds
var CronJob = require('cron').CronJob;
var send_message_job = new CronJob({
    cronTime: '*/5 * * * * *',
    onTick: function(){
        sendMessage( key, message );
    },
    start: true,
});

send_message_job.start();

Then I try to stop and start amqp service randomly. And the failure recover can be well handled sometime, but it can failed with errors

Error: read ECONNRESET at errnoException (net.js:905:11) at TCP.onread (net.js:559:19)

Can anyone help me figure out how to handle it?

benmoss commented 9 years ago

@richzw sorry I can't help you

jwalton commented 9 years ago

Ok, I've taken a stab at fixing this whole reconnection problem. Check out https://github.com/benbria/node-amqp-connection-manager and see if it solves your use case. If it doesn't, or if you have suggestions, I'm certainly interested in hearing about it. :)

firemyst13 commented 9 years ago

Is there something horribly wrong with what I've done here, in regards to re-establishing a connection? It seems to work to reconnect to the server (I've been starting/stopping it for testing...)

function connectRMQ() {
  amqp.connect(config.rabbitmq.URI).then(function(conn) {
    conn.on('close', function() {
      console.error('Lost connection to RMQ.  Reconnecting in 60 seconds...');
      return setTimeout(connectRMQ, 60 * 1000);
    });
    return conn.createChannel().then(function(ch) {
        var ok = ch.assertQueue(config.rabbitmq.queue, {durable: true});
        ok = ok.then(function() {
            ch.prefetch(1);
            ch.consume(config.rabbitmq.queue, doWork, {noAck: false});
        });
        return ok.then(function() {
            console.log(" [*] Waiting in %s.", config.rabbitmq.queue);
        });

        function doWork(msg) {
            var body = msg.content.toString();
            console.log(" [x] Received %s", body);
            setTimeout(function() {
                ch.ack(msg);
            }, config.rabbitmq.timeout);
        }
    });
  }).then(null, function() {
     setTimeout(connectRMQ, 10 * 1000);
     return console.log('connection failed');
  });
}

connectRMQ();
luoyjx commented 8 years ago

my implemention.

function MQ() {
    this.connection = null;
    this.init();
}

/**
 * init
 * @return {[type]} [description]
 */
MQ.prototype.init = function init() {
    var self = this;

    Connection
        .createConnection(config.rabbitMQ_url)
        .then(function (conn) {

            self.connection = conn;

            conn.on('error', function(err) {
                log.error('[mq] connection error ', err);
                self.reconnect();
            });

            log.info('[mq] create connection success');

            return Channel
                .createChannel(conn);
        })
        .then(function (ch) {

            process.once('SIGINT', function() { 
                log.info('kill by signal SIGINT');
                ch.close();
                self.connection.close(); 
                self.connection = null;
                process.exit(0);
            });

            ch.on('error', function(error) {
                // ch.close();
                log.error('[mq] channel error: ', error);
            });

            log.info('[mq] create channel success');

            return Exchange
                .assertExchange(ch, config.exchange_name, ExchangeTypes.DIRECT, {durable: false})
                .then(function () {
                    log.info('[mq] assert exchange [%s] [%s]', config.exchange_name, ExchangeTypes.DIRECT);

                    return Queue
                        .assertQueue(ch, QUEUE_NAME, {exclusive: false, durable: false}); 
                })
                .then(function (queue) {
                    log.info('[mq] assert queue [%s] success', QUEUE_NAME);

                    log.debug(queue);

                    // bind to exchange
                    return Queue.
                        bindQueue(ch, QUEUE_NAME, config.exchange_name, ROUTE_KEY);
                })
                .then(function() {
                    log.info('[mq] bind queue [%s] to exchange [%s]', QUEUE_NAME, config.exchange_name);

                    return Consume
                        .consume(self.connection, ch, QUEUE_NAME);
                })
        })
        .catch(function (err) {
            log.error('[mq] Init failed , error: ', err);
            self.reconnect();
        });
};

/**
 * reconnect
 * @return {[type]} [description]
 */
MQ.prototype.reconnect = function() {
    var self = this;

    log.info('[mq] try reconnect 3 seconds later');

    setTimeout(function () {
        self.init();
        self.reconnectCount++;
    }, 3000);   
}
magicdawn commented 7 years ago

Ha, just see ioredis Redis client do connection recover stuff well https://github.com/luin/ioredis#auto-reconnect

code https://github.com/luin/ioredis/blob/v2.5.0/lib/redis/event_handler.js#L62-L103

dskrvk commented 2 years ago

In my experience we have to listen to both the error and the close events. I tried listening to only one event type, but each one only covers some of the situations:

  1. When rabbitmq is stopped, the client emits an error event.
  2. When rabbitmq forcefully terminates the connection, a close event is emitted.

I went ahead and subscribed to both, however, now in situation 1 both events are emitted, and if we're not careful we'll end up reconnecting twice, which eventually leads to PRECONDITION_FAILED - unknown delivery tag errors (see https://github.com/squaremo/amqp.node/issues/271#issuecomment-243795316). The below implementation takes this into account.

function connect(onMessage, attempt = 0) {
    let reconnecting = false;

    function _onConnectionError(err) {
        if (reconnecting) {
            return Promise.resolve();
        }
        reconnecting = true;
        const nextDelay = nextExponentialDelay(attempt, MAX_DELAY, DELAY_STEP, DELAY_RATE);
        logger.error(`AMQP channel error, retrying in ${nextDelay} s`, err);
        return new Promise(resolve => setTimeout(() => connect(onMessage, attempt + 1).then(resolve), nextDelay * 1000));
    }

    return mq.connect(amqpConn).then(connection => {
        return connection.createChannel().then(channel => {
            // recreate the exchange/queue topology as needed, then:
            attempt = 0;
            reconnecting = false;
            connection.once("close", _onConnectionError);
            connection.once("error", _onConnectionError);
            logger.info("Waiting for messages");
            return channel;
        });
    }).catch(_onConnectionError);
}
bmxpiku commented 1 year ago

`> In my experience we have to listen to both theerrorand theclose` events. I tried listening to only one event type, but each one only covers some of the situations:

  1. When rabbitmq is stopped, the client emits an error event.
  2. When rabbitmq forcefully terminates the connection, a close event is emitted.

I went ahead and subscribed to both, however, now in situation 1 both events are emitted, and if we're not careful we'll end up reconnecting twice, which eventually leads to PRECONDITION_FAILED - unknown delivery tag errors (see #271 (comment)). The below implementation takes this into account.

function connect(onMessage, attempt = 0) {
    let reconnecting = false;

    function _onConnectionError(err) {
        if (reconnecting) {
            return Promise.resolve();
        }
        reconnecting = true;
        const nextDelay = nextExponentialDelay(attempt, MAX_DELAY, DELAY_STEP, DELAY_RATE);
        logger.error(`AMQP channel error, retrying in ${nextDelay} s`, err);
        return new Promise(resolve => setTimeout(() => connect(onMessage, attempt + 1).then(resolve), nextDelay * 1000));
    }

    return mq.connect(amqpConn).then(connection => {
        return connection.createChannel().then(channel => {
            // recreate the exchange/queue topology as needed, then:
            attempt = 0;
            reconnecting = false;
            connection.once("close", _onConnectionError);
            connection.once("error", _onConnectionError);
            logger.info("Waiting for messages");
            return channel;
        });
    }).catch(_onConnectionError);
}

it works nice, one thing to add would be some condition to break, in case you wont like infinite restart loop

export const connect = async (attempt = 1): Promise<void> => {
  let reconnecting = false;

  if (attempt === 10) {
    throw new RabbitMQError('Maximum reconnect attempts exceeded.');
  }

  async function onConnectionError(err): Promise<void> {
    connection = null;
    if (reconnecting) {
      return Promise.resolve();
    }
    reconnecting = true;
    logger.error(`AMQP channel error, retrying in ${attempt} s`, err);
    await setTimeout(attempt * 1000);
    return connect(attempt + 1);
  }

  try {
    connection = await amqp.connect(RABBITMQ_URL);
    if (connection) {
      logger.info('AMPQ client connected');
      connection.once('error', onConnectionError);
      connection.once('close', onConnectionError);
      reconnecting = false;
    }
  } catch (e) {
    await onConnectionError(e);
  }
};
devthejo commented 7 months ago

Here is my workaround:

class ReconnectableProxy {
  constructor(target) {
    this.target = target
    this.callLog = []
    this.proxy = new Proxy({}, this.createHandler())
  }

  createHandler() {
    return {
      get: (obj, prop) => {
        if (typeof this.target[prop] === "function") {
          return (...args) => {
            this.callLog.push({ method: prop, args })
            return this.target[prop](...args)
          }
        }
        return this.target[prop]
      },
    }
  }

  setTarget(newTarget) {
    this.target = newTarget
  }

  async reconnect(newTarget) {
    this.target = newTarget
    await this.replayCalls()
  }

  async replayCalls() {
    for (const { method, args } of this.callLog) {
      await this.target[method](...args)
    }
  }

  getProxy() {
    return this.proxy
  }
}

const proxyManager = new ReconnectableProxy()
let reconnecting = false
const createConnection = async () => {
  const onConnectionError = async () => {
    if (reconnecting) {
      return
    }
    reconnecting = true
    await yaRetry(
      async (_bail) => {
        logger.debug("rabbitmq disconnected, trying to reconnect")
        const conn = await createConnection()
        await proxyManager.reconnect(conn)
        logger.debug("reconnected")
      },
      {
        retries: 10,
        minTimeout: 1000,
        maxTimeout: 30000,
        ...(autoReconnect.retryOptions || {}),
      }
    )
    reconnecting = false
  }
  const conn = await amqplib.connect(amqpURL)
  conn.on("close", onConnectionError)
  // conn.on("error", onConnectionError)
  return conn
}

const conn = await createConnection()
proxyManager.setTarget(conn)
const connProxy = proxyManager.getProxy()

// then use connProxy in the code instead of conn

imlpemented here: https://codeberg.org/devthefuture/modjo/src/branch/master/plugins/amqp/index.js it will work great if you have fixed channels, and you want to re-sub on reconnect, but doesn't work well in case you do many dynamic sub and unsub it suitable for a "classic" queue worker

cressie176 commented 7 months ago

@devthejo

Something to watch out for...

  conn.once("error", onConnectionError)

You can get two error events. Because once will remove the error handler after the first, the second will be unhandled and can crash your application. See https://github.com/onebeyond/rascal/issues/122 for a little more context.

devthejo commented 7 months ago

thanks @cressie176 for pointing this, I'm just realizing it now and I'm trying to find a better implementation, I will edit my post when it will be done

edit: finally I edit with only the change to on, and removing on error, my final implementation will be a lot more complicated, but I want to handle channel auto-reconnect independently of connection (is this a good idea ?)