jakobloekke / jakobloekke-rabbitmq

AMQP-client for Meteor.js
6 stars 1 forks source link

Delayed parameter #4

Open molinto opened 8 years ago

molinto commented 8 years ago

Hi, I have a NormalExchange and a DelayedExchange. I'm using the delay parameter to send surveys after x milliseconds on the DelayedExchange. Where do I put the x value in the publish call, please?

On lib/startup.js:

// RabbitMQ connection settings: host, credentials and virtual host.
var options = {
    host: 'rmq.cloudamqp.com',
    login: 'Dashboard',
    password: 'pwd',
    vhost: 'hmmm'
};

RabbitMQ.ensureConnection(options);

// When 'ready' fires, open both exchanges. Each exchange object is stored on
// RabbitMQ.exchanges and then announced by emitting an event of the same name.
RabbitMQ.on('ready', function () {
    RabbitMQ.connection.exchange('amq.topic', { type: 'topic' }, function (normalExchange) {
        RabbitMQ.exchanges.NormalExchange = normalExchange;
        RabbitMQ.emit('NormalExchange');
    });

    // Built inside the handler so every 'ready' event gets a fresh options object.
    var delayedExchangeOptions = {
        type: 'x-delayed-message',
        durable: true,
        autoDelete: false,
        passive: true,
        arguments: { 'x-delayed-type': "direct" }
    };

    RabbitMQ.connection.exchange('delayed-exchange', delayedExchangeOptions, function (delayedExchange) {
        RabbitMQ.exchanges.DelayedExchange = delayedExchange;
        RabbitMQ.emit('DelayedExchange');
    });
});

// Log whatever is passed along with an 'error' event.
RabbitMQ.on('error', function (connectionError) {
    console.log(connectionError);
});

surveys.js (methods):

try {
        RabbitMQ.exchanges.DelayedExchange.publish('survey.send', new Buffer(post_data), { persistent: true }, function (err, ok) {
                if (err) {
                        console.error("[AMQP] publish", err);
                        return err;
                } else {
                        return ok;
                }
        });
} catch (e) {
        console.error("[AMQP] publish", e.message);
        return false;
}

This is what node uses:

// Example call: exchange "delayed-exchange", routing key "survey.send", a Buffer payload, and 10000 as the delay argument.
exports.delayPublish("delayed-exchange", "survey.send", new Buffer("sending survey after 10 secs"), 10000);

---
// Used for publishing delayed messages
exports.delayPublish = function(exchange, routingKey, content, delay) {
    try {
        pubDelayedChannel.publish(exchange, routingKey, content, { headers: { "x-delay": delay } }, function(err, ok) {
            if (err) {
                console.error("[AMQP delayed] publish", err);
                offlinePubDelayQueue.push([exchange, routingKey, content, delay]);
                pubDelayedChannel.connection.close();
            }
        });
    } catch (e) {
        console.error("[AMQP delayed] failed", e.message);
        offlinePubDelayQueue.push([exchange, routingKey, content, delay]);
    }
};

Any help greatly appreciated.

Sharry

molinto commented 8 years ago

As your package is a wrapper based on node-amqp, I thought this would help: https://github.com/postwait/node-amqp#exchangepublishroutingkey-message-options-callback

I added a headers object to the options param of publish:

// Publish the survey payload as a persistent message, carrying the delay in
// the "x-delay" header of the publish options.
RabbitMQ.exchanges.DelayedExchange.publish(
    'survey.send',
    new Buffer(post_data),
    { persistent: true, headers: { "x-delay": delay } },
    function (publishErr, ok) {
        // These returns leave only this callback.
        if (publishErr) {
            console.error("[AMQP] publish", publishErr);
            return publishErr;
        }
        return ok;
    }
);

But still no message, hmmm.