tsy77 / blog

78 stars 2 forks source link

RPC实现原理 #3

Open tsy77 opened 6 years ago

tsy77 commented 6 years ago

在前端业务越来越向后扩展的情况下,RPC的调用也变成了我们获取数据重要的一部分,所以本文主要介绍RPC及其基本原理,主要有以下三部分:

1.RPC client/server的搭建及使用

2.client/server是如何处理RPC请求和调用的,其中包括我们每次用thrift命令生成的service和types到底是干嘛的

3.RPC的原理总结

RPC client/server的搭建及使用

RPC client的搭建和使用大家可以看官网上的实现就好了

官网:https://thrift.apache.org/tutorial/nodejs

client和server的业务逻辑建议大家简单搭建一个client和server看一下,我这里已官网上的例子为例进行分析:

client:

1.createConnection

var connection = thrift.createConnection("localhost", 9090, {
  transport : transport,
  protocol : protocol
});

这里的协议主要包括JSON, XML, plain text, compact binary

2.createClient

var Calculator = require('./gen-nodejs/Calculator');
......
var client = thrift.createClient(Calculator, connection);

这里的createClient其实就是实例化了Calculator.Client对象,以下是代码:

new Calculator.Client(connection.transport, connection.protocol)

Calculator.Client构造函数如下:

var CalculatorClient = exports.Client = function(output, pClass) {
    this.output = output;
    this.pClass = pClass;
    this._seqid = 0;
    this._reqs = {};
};

大家首先关注下_seqid属性,该属性在rpc中很重要,试想一下,我们调用了某个方法,并传入callback,当结果返回时执callback(),那么程序异步获取result后,如何知道其对应callack呢,_seqid和_reqs出现了,RPC在this._reqs[seqid]中存储每个方法的callback,每个方法的seqid是在this._seqid基础上递增而来。

上面client对象中包含了output对象,对象中包含了thrift的接口,thrift协议规定了传输数据和内存中的变量之间的转换及其序列化、反序列化。这些接口可以在文档https://thrift.apache.org/docs/concepts中找到。下面只简单介绍下thrift的writeMessageBegin方便大家理解。

 public void writeMessageBegin(TMessage message) throws TException {  
    if (strictWrite_) {//判断是否强制写入版本号,是  
      int version = VERSION_1 | message.type;  
      writeI32(version);//写入版本号  
      writeString(message.name);//写入功能方法的名称  
      writeI32(message.seqid);//写入客户端的标识,这个标识是自动增加的  
    } else {//否  
      writeString(message.name);//写入功能方法的名称  
      writeByte(message.type);//写入类型  
      writeI32(message.seqid);//写入客户端的标识,这个标识是自动增加的  
    }  
}  

3.client[method](args, callback)

下面以caculate方法为例:

方法首先调用CalculatorClient.prototype.calculate(),其中主要代码如下:

this._reqs[this.seqid()] = callback;
this.send_calculate(logid, w);

this._reqs[this.seqid()]即咱们上面所说的存储callback的地方。

send_calculate()代码如下:

var output = new this.pClass(this.output);
output.writeMessageBegin('calculate', Thrift.MessageType.CALL, this.seqid());
var params = {
    logid: logid,
    w: w
};
var args = new Calculator_calculate_args(params);
args.write(output);
output.writeMessageEnd();
return this.output.flush();

首先调用output.writeMessageBegin()接口,表示消息的开始; 接着args.write(output),其实就是运用thrift接口,把方法所需参数传递过去,代码如下:

    output.writeStructBegin('Calculator_calculate_args');
    if (this.logid !== null && this.logid !== undefined) {
        output.writeFieldBegin('logid', Thrift.Type.I32, 1);
        output.writeI32(this.logid);
        output.writeFieldEnd();
    }
    if (this.w !== null && this.w !== undefined) {
        output.writeFieldBegin('w', Thrift.Type.STRUCT, 2);
        this.w.write(output);
        output.writeFieldEnd();
    }
    output.writeFieldStop();
    output.writeStructEnd();
    return;

最后调用flush清空缓冲区,数据发出

当结果返回后,RPC将调用CalculatorClient.prototype.recv_calculate(),代码如下:

var callback = this._reqs[rseqid] || function() {};
delete this._reqs[rseqid];
......
var result = new Calculator_calculate_result();
result.read(input);
input.readMessageEnd();

if (null !== result.ouch) {
    return callback(result.ouch);
}
if (null !== result.success) {
    return callback(null, result.success);
}
return callback('calculate failed: unknown result');

其实就是实例化一个Calculator_calculate_result对象,调用了thrift各种read接口,最后执行callback。

server:

1.createServer()

var Calculator = require("./gen-nodejs/Calculator");
.....
var server = thrift.createServer(Calculator, {
    calculate: function(logid, work, result) {
        console.log("calculate(", logid, ",", work, ")");

        var val = 0;
        if (work.op == ttypes.Operation.ADD) {
          val = work.num1 + work.num2;
        } else if (work.op === ttypes.Operation.SUBTRACT) {
          val = work.num1 - work.num2;
        } else if (work.op === ttypes.Operation.MULTIPLY) {
          val = work.num1 * work.num2;
        } else if (work.op === ttypes.Operation.DIVIDE) {
          if (work.num2 === 0) {
            var x = new ttypes.InvalidOperation();
            x.whatOp = work.op;
            x.why = 'Cannot divide by 0';
            result(x);
            return;
          }
          val = work.num1 / work.num2;
        } else {
          var x = new ttypes.InvalidOperation();
          x.whatOp = work.op;
          x.why = 'Invalid operation';
          result(x);
          return;
        }

        var entry = new SharedStruct();
        entry.key = logid;
        entry.value = ""+val;
        data[logid] = entry;

        result(null, val);
     },
 }

这里的createServer创建了一个tcp/tls的服务,监听的回调如下:

var self = this;
    stream.on('error', function(err) {
        self.emit('error', err);
    });
    stream.on('data', transport.receiver(function(transportWithData) {
      var input = new protocol(transportWithData);
      var output = new protocol(new transport(undefined, function(buf) {
        try {
            stream.write(buf);
        } catch (err) {
            self.emit('error', err);
            stream.end();
        }
      }));

      try {
        do {
          processor.process(input, output);
          transportWithData.commitPosition();
        } while (true);
      } catch (err) {
        ......
      }
    }));

    stream.on('end', function() {
      stream.end();
    });

这里的process是gen-nodejs/Calculator中export出来的processer的实例,input和output对象分别包含读和写的thrift接口。

2.processor.process()

CalculatorProcessor.prototype.process = function(input, output) {
  var r = input.readMessageBegin();
  if (this['process_' + r.fname]) {
    return this['process_' + r.fname].call(this, r.rseqid, input, output);
  } else {
    input.skip(Thrift.Type.STRUCT);
    input.readMessageEnd();
    var x = new Thrift.TApplicationException(Thrift.TApplicationExceptionType.UNKNOWN_METHOD, 'Unknown function ' + r.fname);
    output.writeMessageBegin(r.fname, Thrift.MessageType.EXCEPTION, r.rseqid);
    x.write(output);
    output.writeMessageEnd();
    output.flush();
  }
}

this['process_' + r.fname].call(this, r.rseqid, input, output),也就是说当client调用caculate方法时,会执行process_caculate方法。

process_caculate方法中,首先实例化CalculatorProcessor.prototype.processcaculate,然后调用其read方法处理参数,接着调用我们注册在createServer()中的方法,最后调用output.writexxxoutput.flush()将结果编码并返回。

至此,我们缕顺了RPC调用和处理的流程,下面我们总结下我们上面一直Calculator.js中到底有什么?

1.服务中每个方法的参数对象(包含read、write方法)

2.服务中每个方法的调用返回结果对象(包含read、write方法)

3.client,其原型中包含所有同service中声明的方法名相同的方法(比如xxxmethod)、send_xxxmethod、recv_xxxmethod

4.processor,其原型中包含与service所有声明的方法一一对应的process方法(process_xxxmethod)

RPC调用原理

1.server端启动程序,侦听端口,实现提供给client调用的函数,保存在一个对象里。

2.client端启动程序,连接服务端,连接完成后发送describe命令,要求server返回它能提供调用的函数名。

3.server端接收到describe命令,把自己可供调用的函数名包装好发送出去

4.client端接收到server发送的函数名,注册到自己的对象里,给每个函数名包装一个方法,使本地调用这些函数时实际上是向server端发送请求:

5.client端调用server端的函数:

1) 给传入的callback函数生成一个唯一ID,称为callbackId,记录到client的一个对象里。

2) 包装好以下数据发送给server端:调用函数名,JSON序列化后的参数列表,callbackId

6.server端接收到上述信息,解析数据,对参数列表反序列化,根据函数名和参数调用函数。

7.函数运行完成后,把结果序列化,连同之前收到的callbackId发送回client端

8.client端接收到函数运行结果和callbackId,根据callbackId取出回调函数,把运行结果传入回调函数中执行。