imeay / blog

my blog
0 stars 0 forks source link

aliyun-ons (node版)之 集群模式下模拟广播消费 #20

Open imeay opened 6 years ago

imeay commented 6 years ago

说在前头的概念

MQ 是基于发布订阅模型的消息系统。 消息的订阅方订阅关注的 Topic,以获取并消费消息。 由于订阅方应用一般是分布式系统,以集群方式部署有多台机器。 因此 MQ 约定以下概念。

集群:MQ 约定使用相同 Consumer ID 的订阅者属于同一个集群。同一个集群下的订阅者消费逻辑必须完全一致(包括 Tag 的使用),这些订阅者在逻辑上可以认为是一个消费节点。

集群消费:当使用集群消费模式时,MQ 认为任意一条消息只需要被集群内的任意一个消费者处理即可。

广播消费:当使用广播消费模式时,MQ 会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次

ons 包

非官方包

https://github.com/XadillaX/aliyun-ons

官方包

https://github.com/ali-sdk/ali-ons

项目实践之多服务单 Consumer ID 实现广播消费模式

从官方的描述来说,集群消费模式下,多台服务器同一个消费者ID,只会有一个消费节点能接收到消息 官方的ons包是支持广播消费的,但是项目中(node) 用到的是非官方包,并没有看到广播消费的配置参数, 想到 redis的 pub/sub ,就想到或许能模拟成广播消费的模式

MQ 部分

我觉得看 aliyun-ons 这个就可以了

redis 部分

const _ = require('lodash');
const redis = require('redis');
const {createClient} = redis;
let sub_client;
const redis_const = {
  CHANNEL_LIST: ['channel_name'],
  CHANNEL_PREFIX: 'XXX:PubSub:',
};
const init_sub_connection = async () => {
    sub_client = createClient({
      host: 'localhost',
      port: 9736,
    });
    sub_client.select(1);
  };

  const init_subscribe = async () => {
    const channels = _.map(redis_const.CHANNEL_LIST, (channel) => {
      return redis_const.CHANNEL_PREFIX + channel;
    });
    await sub_client.subscribe(channels);

    sub_client.on('subscribe', function (channel, count) {
    });
  };

  const on_subscribe_event = async () => {
    sub_client.on('message', function(channel, message) {
      channel = channel.replace(redis_const.CHANNEL_PREFIX, '');
      console.log(message);
    });
  };

  const start = async () => {
    await init_sub_connection();
    await init_subscribe();
    await on_subscribe_event();
  };

  start();

redis publish 部分

省略一堆代码

......
await redis_pub_service.publish_message({
channel: 'channel_name',
message: JSON.stringify({ status }),
});

当集群的某个消费者接收到消息时,把消息的内容通过 redis 发布到某个频道, 集群中所有订阅到这个频道的,都会接收到消息, 从而实现了自己业务系统内的广播消费