closertb / closertb.github.io

浏览issue 或 我的网站,即可查看我的所有博客
https://closertb.site
32 stars 0 forks source link

Koa-spring: Node进程通信的实践 #42

Open closertb opened 4 years ago

closertb commented 4 years ago

关于这篇文章

如果你感兴趣,可以fork项目,自己体验一下
Koa-springhttps://github.com/closertb/koa-spring
related-client: https://github.com/closertb/koa-spring-client
技术栈:koa + Sequelize + routing-controllers + typescript
上篇:Koa-spring:后端太忙,让我自己写服务(上)
下篇:Koa-spring:后端太忙,让我自己写服务(下)

Node多进程

进程与线程

这里只立个概念,有很多文章是讲Node的进程与线程的,这里推荐(很多文章年代久远,但都是精华):

简单粗暴高效,直到我为了程序更健壮,用cluster模式开启了多进程,显然,上面的方案不再奏效,进程与进程间的cache是分别存在不同的内存块的,所以,只有换方案,仍然没有上redis,而是考虑用进程间的通信来解决。下面是一个简单的示意图:

image

简单来讲,就是利用主进程与工作进程之间的IPC通道进行通信,登录成功后,工作进程将鉴权信息发送给主进程,主进程进行存储。下一次请求发起鉴权时,工作进程给主进程发送一个读取缓存的消息,并开启监听;主进程监听到这个消息,并读取缓存中的鉴权信息,然后再发送给工作进程;工作进程收到后,判断携带的token是否和主进程缓存的token一致,一致则鉴权通过,部分示例代码:

// index.ts
const worker = cluster.fork();
//监听message事件
worker.on('message', (msg: ActionBody) => {
  const { type, payload } = msg;
  if(type == 'readCache') {
    const { uid, id } = payload;
    const res = cache.get(id) || {};
    worker.send({ type: 'sendCache', uid, payload: res })
  }
  if(type == 'saveCache') {
    cache.put(payload.id, payload, ExpiredTime);
  }
});

// userController.ts, 发起缓存鉴权信息消息
process.send({ type: 'saveCache', payload: res });

// AuthCheckMiddleWare.ts
function readCache(id: string) {
  return new Promise((res, rej) => {
    process.on('message', (m) = {
      if (m.uid === id) {
          res(m.payload);
      } else {
        rej({});
      }
    });
    process.send({ type: 'readCache', payload: { id, uid: id }});
  });
}

const user: any = await readCache(uid);
if(user && user.token === token) {
    ctx.user = user;
    await next();
} else {
  ctx.body = {
    code: 120001,
    message: uid ? '登录超时,请重新登录' : '请先登录',
    status: 'error'
  };
}

上面展示的是一个简易版的进程通信,在低并发请求时,运行是没有问题的。但如果你是老江湖,就会发现很多bug。

闭包与立即执行函数

上一节展示的代码,有多少bug呢?简单列举一下:

所以基于以上问题,对readCache函数作了如下改动:

type Callback = (m: ActionBody) => boolean;

function generateUid() {
  const random = Math.floor(26 * Math.random() + 65);
  return `${Date.now()}-${String.fromCharCode(random)}`;
}

// 回调队列
const callbackList: Array<Callback> = [];

process.on('message', (m: ActionBody = { type: 'sendCache' }) => {
  let tempList = callbackList.slice();
  tempList.forEach((callback, i) => {
    callback = tempList[i];
    if (callback && callback(m)) {
      callbackList.splice(i, 1); // 成功处理了响应的或则响应已过期,移除这个回调;
    }
  });
});

function addCallback(callback: Callback) {
  callbackList.push(callback);
}

function readCache(id: string) {
  return new Promise((res, rej) => {
    try {
      const uid = generateUid();
      addCallback(((uid: string) => {
        let status = false;
        let timeout = setTimeout(() => { // 5秒超时读取,防止永久未回调,导致回调一直存在于回调列表中: 可能性很小
          rej({ message: '授权验证超时' });
          status = true;
        }, 5000);
        return (m: ActionBody) => {
        // 必须在过期前响应
        if (!status && m.uid === uid) {
          clearTimeout(timeout);  
          res(m.payload);
          return true;
        }
        return status;
      }
      })(uid));
      process.send({ type: 'readCache', payload: { id, uid }});
    } catch (error) {
      rej(error);
    }
  });
}

可以看到,针对上面提到的问题,作了如下几方面的改进:

通信的优化就讲完了,但为什么和章节标题闭包与立即执行半毛钱关系没有。是有的,我又在闭包上跌了一跤,为了长记性,我故意用了这样一个标题。事情经过是这样的,开始添加回调函数不是上面那样写的,而是下面这样:

try {
  const uid = generateUid();
  addCallback((m: ActionBody) => {
    // 必须在过期前响应
    if (m.uid === uid) {
      res(m.payload);
      return true;
    }
    return false;
  }
  });
  process.send({ type: 'readCache', payload: { id, uid }});
}

看起来没什么毛病,无并发运行起来也没毛病。但当我加大并发量时,就会偶尔报权限错误,最后一排查,发现是m.uid === uid这一行的uid与期望的不一致,被篡改成了下一次请求生成的uid(抱头思考N分钟,哦,原来,闭包的锅)。解决闭包最好的方法是什么,立即执行函数,然后才有了上面的写法。如果对还原这个过程有兴趣,可以在主进程的响应里,添加一个500ms的延迟。

至此,我的这一次前端向Node服务端的探索告一段落,但好多想做的还没实现。愿下一次机会来临时,有能力让Graphql在我的服务中落地,用了Github的API V4后, 发现Graphql对架构要求极高,这能力怎么学,还得倒腾,倒腾。

系列索引

上篇:Koa-spring:后端太忙,让我自己写服务(上)
下篇:Koa-spring:后端太忙,让我自己写服务(下)