Open 499689317 opened 4 years ago
const AMQPClient = require("amqp10").Client;
const Policy = require("amqp10").Policy;
// Fallback names used when a caller omits the session / receiver / sender name.
// Link cache keys are built from them as `<sessionName>.<linkName>`.
const DEFAULT_SESSION = "default_session";
const DEFAULT_RECEIVER = "default_receiver";
const DEFAULT_SENDER = "default_sender";
/**
 * Wrapper around an amqp10 client that caches sessions, senders and receivers
 * by name so that each one is created at most once per instance.
 *
 * Sender/receiver cache keys have the form `<sessionName>.<linkName>`; the
 * same string is also what gets passed to the client as the link address.
 */
class ActiveMQ {
  /**
   * @param {string} url - Broker URL handed to the client by connect().
   */
  constructor(url) {
    this._url = url;
    // NOTE(review): the EventHub policy is used although the class is named
    // ActiveMQ — confirm this is the intended policy.
    this._client = new AMQPClient(Policy.EventHub);
    this._senders = {};
    this._sessions = {};
    this._receivers = {};
  }

  /**
   * Connects the client to the URL given to the constructor.
   *
   * @param {object} [opt] - Passed through to the client's connect().
   * @returns {Promise<*>} Whatever the client's connect() resolves with.
   * @throws {Error} If the instance has been destroyed or connecting fails.
   */
  async connect(opt) {
    if (!this._client) {
      throw new Error("connect activemq client is null");
    }
    const conn = await this._client.connect(this._url, opt);
    if (conn instanceof Error) {
      throw conn;
    }
    console.log(`${this._url} connected`);
    return conn;
  }

  /**
   * Creates a session and caches it under `sessionName`.
   *
   * @param {string} [sessionName] - Cache name; falsy values use DEFAULT_SESSION.
   * @param {object} [opt] - Passed through to the client's createSession().
   * @returns {Promise<*>} The new session.
   * @throws {Error} If the instance is destroyed or the name is already cached.
   */
  async createSession(sessionName, opt) {
    if (!this._client) {
      throw new Error("session activemq client is null");
    }
    sessionName = sessionName || DEFAULT_SESSION;
    if (this._sessions[sessionName]) {
      throw new Error(`session ${sessionName} is existed`);
    }
    const session = await this._client.createSession(opt);
    if (session instanceof Error) {
      throw session;
    }
    this._sessions[sessionName] = session;
    return session;
  }

  /**
   * Returns the cached session for `sessionName`, creating it on first use.
   *
   * @param {string} [sessionName] - Falsy values use DEFAULT_SESSION.
   * @returns {Promise<*>} The session.
   */
  async getSession(sessionName) {
    sessionName = sessionName || DEFAULT_SESSION;
    const session = this._sessions[sessionName];
    if (session) {
      return session;
    }
    return await this.createSession(sessionName);
  }

  /**
   * Creates a sender on the named session and caches it.
   *
   * @param {string} [senderName] - Falsy values use DEFAULT_SENDER.
   * @param {string} [sessionName] - Falsy values use DEFAULT_SESSION.
   * @returns {Promise<*>} The new sender.
   * @throws {Error} If the instance is destroyed or the sender already exists.
   */
  async createSender(senderName, sessionName) {
    if (!this._client) {
      throw new Error("sender activemq client is null");
    }
    // Falsy names fall back to the defaults, matching getSender().
    sessionName = sessionName || DEFAULT_SESSION;
    senderName = `${sessionName}.${senderName || DEFAULT_SENDER}`;
    if (this._senders[senderName]) {
      throw new Error(
        `sender ${senderName} is existed in session ${sessionName}`
      );
    }
    // getSession() is async: without the await a pending Promise (always
    // truthy) would be handed to the client instead of the session.
    const session = await this.getSession(sessionName);
    if (!session) {
      throw new Error(`${senderName} session ${sessionName} is null`);
    }
    const sender = await this._client.createSender(senderName, {}, session);
    if (sender instanceof Error) {
      throw sender;
    }
    this._senders[senderName] = sender;
    return sender;
  }

  /**
   * Creates a receiver on the named session and caches it.
   *
   * @param {string} [receiverName] - Falsy values use DEFAULT_RECEIVER.
   * @param {string} [sessionName] - Falsy values use DEFAULT_SESSION.
   * @returns {Promise<*>} The new receiver.
   * @throws {Error} If the instance is destroyed or the receiver already exists.
   */
  async createReceiver(receiverName, sessionName) {
    if (!this._client) {
      throw new Error("receiver activemq client is null");
    }
    // Falsy names fall back to the defaults, matching getReceiver().
    sessionName = sessionName || DEFAULT_SESSION;
    receiverName = `${sessionName}.${receiverName || DEFAULT_RECEIVER}`;
    if (this._receivers[receiverName]) {
      throw new Error(
        `receiver ${receiverName} is existed in session ${sessionName}`
      );
    }
    // Must be awaited for the same reason as in createSender().
    const session = await this.getSession(sessionName);
    if (!session) {
      throw new Error(`${receiverName} session ${sessionName} is null`);
    }
    const receiver = await this._client.createReceiver(
      receiverName,
      {
        // Messages are accepted explicitly in subscribe().
        // NOTE(review): confirm the meaning of rcvSettleMode 0 against the
        // client library's constants.
        attach: { rcvSettleMode: 0 },
      },
      session
    );
    if (receiver instanceof Error) {
      throw receiver;
    }
    this._receivers[receiverName] = receiver;
    return receiver;
  }

  /**
   * Returns the cached sender, creating it on first use.
   *
   * @param {string} [senderName] - Falsy values use DEFAULT_SENDER.
   * @param {string} [sessionName] - Falsy values use DEFAULT_SESSION.
   * @returns {Promise<*>} The sender.
   */
  async getSender(senderName, sessionName) {
    senderName = senderName || DEFAULT_SENDER;
    sessionName = sessionName || DEFAULT_SESSION;
    const sender = this._senders[`${sessionName}.${senderName}`];
    if (sender) {
      return sender;
    }
    return await this.createSender(senderName, sessionName);
  }

  /**
   * Returns the cached receiver, creating it on first use.
   *
   * @param {string} [receiverName] - Falsy values use DEFAULT_RECEIVER.
   * @param {string} [sessionName] - Falsy values use DEFAULT_SESSION.
   * @returns {Promise<*>} The receiver.
   */
  async getReceiver(receiverName, sessionName) {
    receiverName = receiverName || DEFAULT_RECEIVER;
    sessionName = sessionName || DEFAULT_SESSION;
    const receiver = this._receivers[`${sessionName}.${receiverName}`];
    if (receiver) {
      return receiver;
    }
    return await this.createReceiver(receiverName, sessionName);
  }

  /**
   * Sends `message` through the named sender, creating the sender on demand.
   *
   * @param {string} [senderName] - Falsy values use DEFAULT_SENDER.
   * @param {*} message - Payload passed unchanged to the sender's send().
   * @param {string} [sessionName] - Falsy values use DEFAULT_SESSION.
   * @returns {Promise<void>} Settles once the sender's send() has settled.
   * @throws {Error} If no sender could be obtained or the send fails.
   */
  async send(senderName, message, sessionName) {
    const sender = await this.getSender(senderName, sessionName);
    if (!sender) {
      throw new Error(
        `the sender ${senderName} in session ${sessionName} send message error`
      );
    }
    // Awaited so that a failed send rejects this call instead of becoming an
    // unhandled rejection.
    await sender.send(message);
  }

  /**
   * Attaches listeners to the named receiver, creating it on demand. Each
   * received message is accepted and then handed to `cb`.
   *
   * @param {string} [receiverName] - Falsy values use DEFAULT_RECEIVER.
   * @param {function(*): void} [cb] - Called with every received message.
   * @param {object} [opt] - Currently unused.
   * @param {string} [sessionName] - Falsy values use DEFAULT_SESSION.
   * @returns {Promise<void>}
   */
  async subscribe(receiverName, cb, opt, sessionName) {
    const receiver = await this.getReceiver(receiverName, sessionName);
    receiver.on("errorReceived", (err) => {
      console.error(err);
    });
    receiver.on("message", (message) => {
      receiver.accept(message);
      cb && cb(message);
    });
  }

  /**
   * Disconnects the client and drops every cached reference. The instance
   * must not be used afterwards.
   */
  destroy() {
    const client = this._client;
    this._url = "";
    this._client = null;
    this._sessions = null;
    this._senders = null;
    this._receivers = null;
    if (client) {
      // Close the underlying connection rather than leaking it; failures are
      // only logged because destroy() is synchronous.
      Promise.resolve(client.disconnect()).catch((err) => {
        console.error(err);
      });
    }
  }
}
module.exports = ActiveMQ;
测试
const ActiveMQ = require("./activemq");

// Demo: subscribe to a queue and publish a timestamp to it once per second.
(async function () {
  const mq = new ActiveMQ("amqp://admin:admin@localhost:5672");
  await mq.connect();
  const tickQueue = "queue-schedule-coin-expand";
  // Wait until the listeners are attached before starting to publish.
  await mq.subscribe(tickQueue, function (message) {
    console.log("receive message", message);
  });
  setInterval(function () {
    const msg = {
      data: Date.now(),
    };
    // send() is async: report failures instead of leaving the promise floating.
    mq.send(tickQueue, JSON.stringify(msg)).catch(function (err) {
      console.error("send failed", err);
    });
  }, 1000);
})().catch(function (err) {
  // Connection or subscription failures would otherwise be unhandled rejections.
  console.error(err);
  process.exitCode = 1;
});
注
const rhea = require("rhea");
// Fallback names used when a caller omits the session or link name.
const DEFAULT_SESSION = "default_session";
const DEFAULT_QUEUE = "default_queue";
// NOTE(review): DEFAULT_TOPIC is not referenced anywhere in the visible code.
const DEFAULT_TOPIC = "default_topic";
/**
 * Wrapper around a rhea connection that caches sessions, senders and receivers
 * by name so that each one is created at most once per instance.
 *
 * Link addresses (also used as cache keys) have the form
 * `queue://<sessionName>.<linkName>` or `topic://<sessionName>.<linkName>`.
 */
class ActiveMQ {
  /**
   * @param {string} [url] - Optional broker URL such as
   *   `amqp://user:pass@host:5672`. When omitted, connect() uses
   *   admin/admin@localhost:5672.
   */
  constructor(url) {
    this._url = url;
    this._conn = null;
    this._sessions = {};
    this._senders = {};
    this._receivers = {};
  }

  /**
   * Builds the rhea connection options, overriding the built-in defaults with
   * whatever host, port and credentials the constructor URL provides.
   *
   * @returns {object} Options for rhea.connect().
   * @throws {Error} If the URL cannot be parsed.
   */
  _connectionOptions() {
    const options = {
      transport: "tcp",
      host: "localhost",
      username: "admin",
      password: "admin",
      port: "5672",
      reconnect: true,
    };
    if (!this._url) {
      return options;
    }
    let parsed;
    try {
      parsed = new URL(this._url);
    } catch (err) {
      throw new Error(`invalid broker url: ${this._url}`);
    }
    if (parsed.hostname) {
      options.host = parsed.hostname;
    }
    if (parsed.port) {
      options.port = parsed.port;
    }
    // URL components are percent-encoded; credentials must be decoded.
    if (parsed.username) {
      options.username = decodeURIComponent(parsed.username);
    }
    if (parsed.password) {
      options.password = decodeURIComponent(parsed.password);
    }
    return options;
  }

  /**
   * Opens the connection. May only be called once per instance.
   *
   * @returns {*} The rhea connection.
   * @throws {Error} If a connection already exists.
   */
  connect() {
    if (this._conn) {
      throw new Error("connected already");
    }
    this._conn = rhea.connect(this._connectionOptions());
    // Log instead of throwing: an exception thrown from an event listener
    // cannot be caught by the caller and would defeat `reconnect: true`.
    this._conn.on("error", (context) => {
      console.error("connect error", context);
    });
    if (this._conn.socket) {
      this._conn.socket.setNoDelay(true);
    }
    return this._conn;
  }

  /**
   * Builds a link address with the destination-type prefix.
   *
   * @param {string} name - Destination name.
   * @param {boolean} [isTopic] - Truthy for `topic://`, otherwise `queue://`.
   * @returns {string} The prefixed address.
   */
  formatName(name, isTopic) {
    return isTopic ? `topic://${name}` : `queue://${name}`;
  }

  /**
   * Creates and begins a session and caches it under `sessionName`.
   *
   * @param {string} [sessionName] - Falsy values use DEFAULT_SESSION.
   * @returns {*} The new session.
   * @throws {Error} If connect() has not been called.
   */
  createSession(sessionName) {
    if (!this._conn) {
      throw new Error("connect failed to create session");
    }
    const name = sessionName || DEFAULT_SESSION;
    const session = this._conn.create_session();
    session.observers.on("session_close", (context) => {
      console.error(context);
      // Evict the closed session so the next getSession() creates a fresh one.
      // Skipped after destroy() and when a newer session took over the name.
      if (this._sessions && this._sessions[name] === session) {
        delete this._sessions[name];
      }
    });
    session.begin();
    this._sessions[name] = session;
    return session;
  }

  /**
   * Returns the cached session for `sessionName`, creating it on first use.
   *
   * @param {string} [sessionName] - Falsy values use DEFAULT_SESSION.
   * @returns {*} The session.
   */
  getSession(sessionName) {
    const name = sessionName || DEFAULT_SESSION;
    return this._sessions[name] || this.createSession(name);
  }

  /**
   * Attaches a sender on the named session and caches it.
   *
   * @param {string} [senderName] - Falsy values use DEFAULT_QUEUE.
   * @param {string} [sessionName] - Falsy values use DEFAULT_SESSION.
   * @param {boolean} [isTopic] - Truthy to target a topic instead of a queue.
   * @returns {*} The new sender.
   * @throws {Error} If the sender already exists or no session is available.
   */
  createSender(senderName, sessionName, isTopic) {
    sessionName = sessionName || DEFAULT_SESSION;
    senderName = this.formatName(
      `${sessionName}.${senderName || DEFAULT_QUEUE}`,
      isTopic
    );
    if (this._senders[senderName]) {
      throw new Error(
        `sender ${senderName} is existed in session ${sessionName}`
      );
    }
    const session = this.getSession(sessionName);
    if (!session) {
      throw new Error(`${senderName} session ${sessionName} failed`);
    }
    const sender = session.attach_sender({ target: senderName });
    this._senders[senderName] = sender;
    return sender;
  }

  /**
   * Returns the cached sender, creating it on first use.
   *
   * @param {string} [senderName] - Falsy values use DEFAULT_QUEUE.
   * @param {string} [sessionName] - Falsy values use DEFAULT_SESSION.
   * @param {boolean} [isTopic] - Truthy to target a topic instead of a queue.
   * @returns {*} The sender.
   */
  getSender(senderName, sessionName, isTopic) {
    senderName = senderName || DEFAULT_QUEUE;
    sessionName = sessionName || DEFAULT_SESSION;
    const key = this.formatName(`${sessionName}.${senderName}`, isTopic);
    return (
      this._senders[key] || this.createSender(senderName, sessionName, isTopic)
    );
  }

  /**
   * Attaches a receiver (with autoaccept disabled) on the named session and
   * caches it.
   *
   * @param {string} [receiverName] - Falsy values use DEFAULT_QUEUE.
   * @param {string} [sessionName] - Falsy values use DEFAULT_SESSION.
   * @param {boolean} [isTopic] - Truthy to read from a topic instead of a queue.
   * @returns {*} The new receiver.
   * @throws {Error} If the receiver already exists or no session is available.
   */
  createReceiver(receiverName, sessionName, isTopic) {
    sessionName = sessionName || DEFAULT_SESSION;
    receiverName = this.formatName(
      `${sessionName}.${receiverName || DEFAULT_QUEUE}`,
      isTopic
    );
    if (this._receivers[receiverName]) {
      throw new Error(
        `receiver ${receiverName} is existed in session ${sessionName}`
      );
    }
    const session = this.getSession(sessionName);
    if (!session) {
      throw new Error(`${receiverName} session ${sessionName} is null`);
    }
    const receiver = session.attach_receiver({
      source: receiverName,
      // Deliveries are accepted explicitly in subscribe().
      autoaccept: false,
    });
    this._receivers[receiverName] = receiver;
    return receiver;
  }

  /**
   * Returns the cached receiver, creating it on first use.
   *
   * @param {string} [receiverName] - Falsy values use DEFAULT_QUEUE.
   * @param {string} [sessionName] - Falsy values use DEFAULT_SESSION.
   * @param {boolean} [isTopic] - Truthy to read from a topic instead of a queue.
   * @returns {*} The receiver.
   */
  getReceiver(receiverName, sessionName, isTopic) {
    receiverName = receiverName || DEFAULT_QUEUE;
    sessionName = sessionName || DEFAULT_SESSION;
    const key = this.formatName(`${sessionName}.${receiverName}`, isTopic);
    return (
      this._receivers[key] ||
      this.createReceiver(receiverName, sessionName, isTopic)
    );
  }

  /**
   * Sends `message` through the named sender, creating the sender on demand.
   *
   * @param {string} [senderName] - Falsy values use DEFAULT_QUEUE.
   * @param {*} message - Message passed unchanged to the sender's send().
   * @param {string} [sessionName] - Falsy values use DEFAULT_SESSION.
   * @param {boolean} [isTopic] - Truthy to publish to a topic.
   * @throws {Error} If no sender could be obtained or send() throws.
   */
  send(senderName, message, sessionName, isTopic) {
    const sender = this.getSender(senderName, sessionName, isTopic);
    if (!sender) {
      throw new Error(
        `the sender ${senderName} in session ${sessionName} send message error`
      );
    }
    sender.send(message);
  }

  /**
   * Attaches listeners to the named receiver, creating it on demand. Each
   * delivery is accepted and its message is then handed to `cb`.
   *
   * @param {string} [receiverName] - Falsy values use DEFAULT_QUEUE.
   * @param {function(*): void} [cb] - Called with every received message.
   * @param {object} [opt] - Currently unused.
   * @param {string} [sessionName] - Falsy values use DEFAULT_SESSION.
   * @param {boolean} [isTopic] - Truthy to subscribe to a topic.
   * @throws {Error} If no receiver could be obtained.
   */
  subscribe(receiverName, cb, opt, sessionName, isTopic) {
    const receiver = this.getReceiver(receiverName, sessionName, isTopic);
    if (!receiver) {
      throw new Error(
        `the receiver ${receiverName} in session ${sessionName} not init`
      );
    }
    receiver.observers.on("message", (context) => {
      context.delivery.accept();
      cb && cb(context.message);
    });
    // Logged rather than thrown: exceptions raised inside event listeners
    // cannot be caught by the caller of subscribe().
    receiver.observers.on("receiver_error", (context) => {
      console.error("receiver_error", context);
    });
    receiver.observers.on("receiver_close", (context) => {
      console.error("receiver close", context);
    });
  }

  /**
   * Closes the connection and drops every cached reference. The instance must
   * not be used afterwards.
   */
  destroy() {
    const conn = this._conn;
    this._url = "";
    this._conn = null;
    this._sessions = null;
    this._senders = null;
    this._receivers = null;
    if (conn) {
      // Close the underlying connection rather than leaking it.
      conn.close();
    }
  }
}
module.exports = ActiveMQ;
测试
const ActiveMQ = require("./connect");

// Demo: listen on both the topic and the queue named "test", then publish a
// timestamp to each of them once per second.
// The same can be done by hand with createSession() / createReceiver() /
// createSender() and the returned links' own `observers` and `send()`.
(() => {
  const mq = new ActiveMQ();
  mq.connect();
  const queue = "test";

  const onTopicMessage = (message) => {
    console.log("topic: ", message);
  };
  const onQueueMessage = (message) => {
    console.log("queue: ", message);
  };

  // Fifth argument `true` selects the topic destination.
  mq.subscribe(queue, onTopicMessage, {}, "", true);
  mq.subscribe(queue, onQueueMessage);

  const publishTick = () => {
    const msg = { body: Date.now() };
    mq.send(queue, msg);
    mq.send(queue, msg, "", true);
  };
  setInterval(publishTick, 1000);
})();
本地搭建activemq环境
1. 搜索镜像:`docker search activemq`。没有官方指定的镜像可供下载,只有星级最高的 webcenter/activemq。
2. 拉取 activemq 镜像:`docker pull webcenter/activemq:5.14.3`
3. 启动容器(61616 为 tcp 端口,amqp 端口为 5672):`docker run --name activemq -p 61616:61616 -p 5672:5672 -p 8161:8161 webcenter/activemq:5.14.3`
4. 进入 activemq 管理后台:http://127.0.0.1:8161