Open alian926 opened 3 years ago
/** A topic identifier. Symbols are converted with `toString()` before being used as storage keys. */
type TMessage = string | symbol;

/**
 * Signature of a subscriber callback.
 * NOTE: `Object` is kept (instead of `unknown`) so existing subscriber callbacks stay assignable.
 */
type TPublish = (message: TMessage, data?: Object) => void;

/** Signature of `publish` / `publishSync`: returns whether any subscriber was found. */
type TPublisher = (message: TMessage, data?: Object) => boolean;

type TSubscribe = (message: TMessage, func: TPublish) => string | boolean;

type TSubscribeAll = (func: TPublish) => string | boolean;

/** Module-wide subscription registry: topic -> (token -> subscriber). */
interface IStorages {
  [topic: string]: {
    [token: string]: TPublish;
  };
}

interface IPubSub {
  publish: TPublisher;
  publishSync: TPublisher;
  subscribe: TSubscribe;
  subscribeAll: TSubscribeAll;
  subscribeOnce: (message: TMessage, func: TPublish) => IPubSub;
  clearAllSubscriptions: () => void;
  countSubscriptions: (topic: TMessage) => number;
  unsubscribe: (value: Function | string) => boolean | string;
  getSubscriptions: (topic: TMessage) => string[];
  clearSubscriptions: (topic: TMessage) => void;
  immediateExceptions: boolean;
}

// The methods are attached below, hence the assertion on the partially built object.
const PubSub: IPubSub = {
  immediateExceptions: false,
} as IPubSub;

let storages: IStorages = {};

// Subscriptions are tracked internally by an incrementing id.
let lastUid = -1;

// Topic under which "subscribe to everything" callbacks are stored.
const ALL_SUBSCRIBING_MSG = '*';

const hasOwnProperty = Object.prototype.hasOwnProperty;

/**
 * Own-property test that ignores anything inherited from the prototype chain.
 */
function hasOwn(target: object, key: string): boolean {
  return hasOwnProperty.call(target, key);
}

/**
 * Reports whether the object has at least one own enumerable string-keyed property.
 * (`Object.keys` does not see symbol keys or non-enumerable properties.)
 */
function hasKeys(obj: Object): boolean {
  return Object.keys(obj).length > 0;
}

/**
 * Normalises a topic to the string used as the storage key.
 */
function formatTopic(topic: TMessage): string {
  return typeof topic === 'symbol' ? topic.toString() : topic;
}

/**
 * Returns a function that throws the passed exception, for use as the argument of `setTimeout`.
 *
 * @param ex - the caught value to re-throw (usually an `Error`)
 */
function throwException(ex: unknown): () => never {
  return function reThrowException(): never {
    throw ex;
  };
}

/**
 * Calls a subscriber; anything it throws is re-thrown from a zero-delay timer so that
 * the remaining subscribers are still called.
 */
function callSubscriberWithDelayedExceptions(
  subscriber: TPublish,
  message: TMessage,
  data: Object | undefined
): void {
  try {
    subscriber(message, data);
  } catch (ex: unknown) {
    setTimeout(throwException(ex), 0);
  }
}

/**
 * Calls a subscriber directly; anything it throws propagates to the publisher.
 */
function callSubscriberWithImmediateExceptions(
  subscriber: TPublish,
  message: TMessage,
  data: Object | undefined
): void {
  subscriber(message, data);
}

/**
 * Delivers a message to every subscriber stored under `matchedMessage`.
 *
 * @param originalMessage - the topic that was published (what subscribers receive)
 * @param matchedMessage - the storage key whose subscribers are called
 * @param data - payload handed to each subscriber
 * @param immediateExceptions - when true, subscriber exceptions are not deferred
 */
function deliverMessage(
  originalMessage: string,
  matchedMessage: string,
  data: Object | undefined,
  immediateExceptions: boolean
): void {
  if (!hasOwn(storages, matchedMessage)) {
    return;
  }

  const subscribers = storages[matchedMessage];
  const callSubscriber = immediateExceptions
    ? callSubscriberWithImmediateExceptions
    : callSubscriberWithDelayedExceptions;

  Object.keys(subscribers).forEach((token) => {
    // The key list is a snapshot: skip subscribers that an earlier callback removed
    // during this delivery instead of calling `undefined`.
    if (!hasOwn(subscribers, token)) {
      return;
    }
    callSubscriber(subscribers[token], originalMessage, data);
  });
}

/**
 * Builds the function that performs one delivery: first the exact topic, then every
 * ancestor obtained by trimming at the last '.', (`a.b.c` -> `a.b` -> `a`), and
 * finally the `*` subscribers.
 */
function createDeliveryFunction(
  message: string,
  data: Object | undefined,
  immediateExceptions: boolean
): () => void {
  return function deliverNamespaced(): void {
    let topic = String(message);
    let position = topic.lastIndexOf('.');

    // Deliver the message as it is now.
    deliverMessage(message, message, data, immediateExceptions);

    // Trim the hierarchy and deliver the message to each level.
    while (position !== -1) {
      topic = topic.slice(0, position);
      position = topic.lastIndexOf('.');
      deliverMessage(message, topic, data, immediateExceptions);
    }

    // Deliver to the subscribers of every message.
    deliverMessage(message, ALL_SUBSCRIBING_MSG, data, immediateExceptions);
  };
}

/**
 * Reports whether the exact topic (no hierarchy trimming) has at least one subscriber.
 */
function hasDirectSubscribersFor(message: TMessage): boolean {
  const topic = String(message);
  return hasOwn(storages, topic) && hasKeys(storages[topic]);
}

/**
 * Reports whether publishing `message` would reach anyone: the topic itself, the `*`
 * subscribers, or any ancestor topic (`a.b` also notifies `a`).
 */
function messageHasSubscribers(message: string): boolean {
  let topic = String(message);
  let found =
    hasDirectSubscribersFor(topic) ||
    hasDirectSubscribersFor(ALL_SUBSCRIBING_MSG);
  let position = topic.lastIndexOf('.');

  while (!found && position !== -1) {
    topic = topic.slice(0, position);
    position = topic.lastIndexOf('.');
    found = hasDirectSubscribersFor(topic);
  }

  return found;
}

/**
 * Shared implementation of `publish` / `publishSync`.
 *
 * @param message - topic to publish
 * @param data - payload handed to subscribers
 * @param sync - true delivers on the current call stack, false defers via `setTimeout(…, 0)`
 * @param immediateExceptions - when true, subscriber exceptions are not deferred
 * @returns false when nobody is subscribed (nothing is scheduled), true otherwise
 */
function publish(
  message: TMessage,
  data: Object | undefined,
  sync: boolean,
  immediateExceptions: boolean
): boolean {
  const topic = formatTopic(message);

  if (!messageHasSubscribers(topic)) {
    return false;
  }

  const deliver = createDeliveryFunction(topic, data, immediateExceptions);
  if (sync === true) {
    deliver();
  } else {
    setTimeout(deliver, 0);
  }
  return true;
}

/**
 * Publishes the message asynchronously, passing the data to its subscribers.
 *
 * @param message - the message to publish
 * @param data - the data to pass to subscribers
 * @returns true when at least one subscriber will be notified
 */
PubSub.publish = function (message, data) {
  return publish(message, data, false, PubSub.immediateExceptions);
};

/**
 * Publishes the message synchronously, passing the data to its subscribers.
 *
 * @param message - the message to publish
 * @param data - the data to pass to subscribers
 * @returns true when at least one subscriber was notified
 */
PubSub.publishSync = function (message, data) {
  return publish(message, data, true, PubSub.immediateExceptions);
};

/**
 * Subscribes the passed function to the passed message. Every returned token is unique
 * and should be stored if you need to unsubscribe.
 *
 * @param message - the message to subscribe to
 * @param func - the function to call when a new message is published
 * @returns the subscription token, or false when `func` is not a function
 */
PubSub.subscribe = function (message, func) {
  if (typeof func !== 'function') {
    return false;
  }

  const topic = formatTopic(message);

  // Register the topic on first use. The check must use the formatted key: testing the
  // raw symbol would never match and would wipe the existing subscribers every time.
  if (!hasOwn(storages, topic)) {
    storages[topic] = {};
  }

  // Tokens are forced to strings so they can be used as keys of the storage object
  // and so the format can evolve without breaking callers.
  const token = 'uid_' + String(++lastUid);
  storages[topic][token] = func;

  return token;
};

/**
 * Subscribes the passed function to every message (the `*` topic).
 *
 * @param func - the function to call on any published message
 * @returns the subscription token, or false when `func` is not a function
 */
PubSub.subscribeAll = function (func) {
  return PubSub.subscribe(ALL_SUBSCRIBING_MSG, func);
};

/**
 * Subscribes the passed function to the passed message for a single delivery.
 *
 * @param message - the message to subscribe to
 * @param func - the function to call when the message is published
 * @returns the PubSub object, for chaining
 */
PubSub.subscribeOnce = function (message, func) {
  const token = PubSub.subscribe(message, (...args) => {
    // Unsubscribe before invoking, so a re-publish from inside `func` cannot re-enter it.
    PubSub.unsubscribe(token as string);
    func.apply(this, args);
  });
  return PubSub;
};

/**
 * Clears all subscriptions.
 */
PubSub.clearAllSubscriptions = function clearAllSubscriptions() {
  storages = {};
};

/**
 * Clears the subscriptions of every stored topic whose name starts with `topic`
 * (so `car` also clears `car.drive`).
 */
PubSub.clearSubscriptions = function clearSubscriptions(topic) {
  const prefix = formatTopic(topic);
  Object.keys(storages).forEach((name) => {
    if (name.startsWith(prefix)) {
      delete storages[name];
    }
  });
};

/**
 * Counts the subscriptions of every stored topic whose name starts with `topic`,
 * i.e. the same set of topics that `getSubscriptions` reports.
 *
 * @returns the total number of subscriber callbacks found
 */
PubSub.countSubscriptions = function countSubscriptions(topic) {
  const prefix = formatTopic(topic);
  let count = 0;
  Object.keys(storages).forEach((name) => {
    if (name.startsWith(prefix)) {
      count += Object.keys(storages[name]).length;
    }
  });
  return count;
};

/**
 * Lists the stored topic names that start with `topic` (descendants included).
 */
PubSub.getSubscriptions = function getSubscriptions(topic) {
  const prefix = formatTopic(topic);
  return Object.keys(storages).filter((name) => name.startsWith(prefix));
};

/**
 * Removes subscriptions.
 *
 * - When passed a topic, removes all subscriptions for that topic and its descendants.
 * - When passed a token, removes that specific subscription.
 * - When passed a function, removes all subscriptions for that function.
 *
 * A string is treated as a topic when it is a stored topic or a prefix of one;
 * otherwise it is treated as a token.
 *
 * @param value - a token, function or topic to unsubscribe from
 * @returns true when a topic or function was removed, the token when a token was
 *          removed, false when nothing matched
 * @example
 * const token = PubSub.subscribe('mytopic', myFunc);
 * PubSub.unsubscribe(token);     // by token
 * PubSub.unsubscribe(myFunc);    // by function
 * PubSub.unsubscribe('mytopic'); // by topic
 */
PubSub.unsubscribe = function (value) {
  if (typeof value === 'string') {
    const isTopic =
      hasOwn(storages, value) ||
      Object.keys(storages).some((name) => name.startsWith(value));

    if (isTopic) {
      PubSub.clearSubscriptions(value);
      return true;
    }

    // Tokens are unique, so the first hit is the only one. Own-property lookup keeps
    // inherited names such as "toString" from being mistaken for a token.
    for (const name of Object.keys(storages)) {
      const subscribers = storages[name];
      if (hasOwn(subscribers, value)) {
        delete subscribers[value];
        return value;
      }
    }
    return false;
  }

  let removed = false;
  if (typeof value === 'function') {
    // The same function may be subscribed to several topics; remove every occurrence.
    Object.keys(storages).forEach((name) => {
      const subscribers = storages[name];
      Object.keys(subscribers).forEach((token) => {
        if (subscribers[token] === value) {
          delete subscribers[token];
          removed = true;
        }
      });
    });
  }
  return removed;
};

export default PubSub;