499689317 / notes

note
2 stars 0 forks source link

rabbitmq #30

Open 499689317 opened 4 years ago

499689317 commented 4 years ago

docker安装启动mq


499689317 commented 4 years ago

producer.js


let amqp = require('amqplib/callback_api');
var co = require("co");
// 连接URL带上虚拟机名称
var RabbitMQ = {
    host: "amqp://admin:admin@127.0.0.1:5672/my_vhost",
    conn: null,
    chan: null
};
RabbitMQ.connect = function() {
    var self = this;
    return function(next) {
        console.log(self.host);
        amqp.connect(self.host, function(err, conn) {
            if(err) {
                return console.log(err);
            }
            next(null, conn);
        });
    };
};
RabbitMQ.createChannel = function() {
    if(this.chan) {
        return;
    }
    this.chan = this.conn.createChannel();
};
RabbitMQ.createExchange = function(ex, ext) {
    if(!this.chan) {
        return;
    }
    this.chan.assertExchange(ex, ext, {durable: false});
};
RabbitMQ.createQueue = function(bindKey) {
    if(!this.chan) {
        return;
    }
    this.chan.assertQueue(bindKey, {durable: false});
};
RabbitMQ.send = function(queue, msg) {
    if(!this.chan) {
        return;
    }
    this.chan.sendToQueue(queue, Buffer.from(msg), {persistent: false, mandatory: true});
};
(function() {
    var ex = "testEx";
    var ext = "option";
    var bindKey = "testQueue";
    var routeKey = "testQueue";
    var msg = JSON.stringify({id: "xxx"});

    co(function*() {

        RabbitMQ.conn = yield RabbitMQ.connect();
        RabbitMQ.createChannel();
        // RabbitMQ.createExchange(ex, ext);
        RabbitMQ.createQueue(bindKey);

        setInterval(function() {
            console.log(">>>>>> msg");
            RabbitMQ.send(routeKey, msg);
        }, 5000);

    });
})();
499689317 commented 4 years ago

consumer.js


let amqp = require('amqplib');

class RabbitMQ {
    constructor() {
        this.open = amqp.connect(this.hosts[this.index]);
    }
    receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName)
                    .then(function (ok) {
                        return channel.consume(queueName, function (msg) {
                            if (msg !== null) {
                                let data = msg.content.toString();
                                channel.ack(msg);
                                receiveCallBack && receiveCallBack(data);
                            }
                        })
                            .finally(function () {
                                setTimeout(() => {
                                    if (channel) {
                                        channel.close();
                                    }
                                }, 500)
                            });
                    })
            })
            .catch(function () {
                let num = self.index++;
                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index = 0;
                    self.open = amqp.connect(self.hosts[0]);
                }
            });
    }