zhangxiang958 / Blog

阿翔的个人技术博客,博文写在 Issues 里,如有收获请 star 鼓励~
https://github.com/zhangxiang958/zhangxiang958.github.io/issues
153 stars 11 forks source link

RPC in Golang(一) #55

Open zhangxiang958 opened 4 years ago

zhangxiang958 commented 4 years ago

RPC,即远程过程调用,在 grpc 等协议里面,数据的传输是通过二进制的方式进行传输的。而我们在使用 rpc 框架的时候,为了屏蔽底层的数据传输细节,像通过调用本地代码一样完成远程调用,往往会发现有一个 stub 模块。

stub 模块

什么是 stub 模块?其实就是将客户端的请求参数打包成网络信息,然后通过网络发送给服务方。服务端也有 stub 模块,同样也是封装了将网络消息解包,调用本地方法的过程。

简单的 RPC 框架

服务端

客户端传到服务端的是一个函数名,服务端需要维护一个函数名到函数具体逻辑的映射关系。具体是怎么处理映射关系呢?请看下面代码:

package main

import (
    "fmt"
    "reflect"
)

func CallMeBaby(lastName, firstName string) (string, error) {
    res := fmt.Sprintf("%s%s is my baby!", lastName, firstName)
    return res, nil
}

func main() {
    funcs := make(map[string]reflect.Value)
    funcs["CallMeBaby"] = reflect.ValueOf(CallMeBaby)

    req := []reflect.Value{
        reflect.ValueOf("xiang"),
        reflect.ValueOf("zhang"),
    }

    val := funcs["CallMeBaby"].Call(req)
    var rsp []interface{}
    for _, v := range val {
        rsp = append(rsp, v.Interface())
    }

    fmt.Println("result", rsp)
}

像上面这样的代码,将函数与函数名维护在一个 Map 里面,类似这样,如果客户端传过来函数名字,就能通过 Call 方法来将参数传进函数并执行。

类似这样,我们可以提供一个注册的方式,用服务端编写者能够将方法进行注册,将方法放进映射里面。

import (
    "reflect"
)

type Server struct {
    handlers map[string]reflect.Value
}

func (s *Server) Register(name string, f interface{}) {
    if f, exist := s.handlers[name]; exist {
        return;
    }

    s.handlers[name] = reflect.ValueOf(f);
}

这里网络传输是基于 TCP 协议的,我们还需要建立一个 TCP 服务来接收网络信息的传输。

func (s *Server) Serve(addr string) {
    server, err := net.Listen("tcp", addr)
    if err != nil {
        log.Fatal("serve start err", err)
    }
    fmt.Printf("serve listening on %s\n", addr)
    defer server.Close()

    for {
        conn, err := server.Accept()
        if err != nil {
            log.Fatal("serve accep err", err)
            continue
        }
        go func() {
            defer conn.Close()
            buf := make([]byte, 4096)
            for {
                n, err := conn.Read(buf)
                if err != nil || n == 0 {
                    log.Fatal("tcp handler", err)
                    break
                }
                receiveStr := strings.TrimSpace(string(buf[0:n]))
                fmt.Printf("receive: %v", receiveStr)
                conn.Write([]byte(receiveStr))
            }
            fmt.Printf("Connection from %v closed. \n", conn.RemoteAddr())
        }()
    }
}

这里需要使用 while 循环来不停地接收客户端传来的信息,golang 中没有 while 关键字,直接使用 for,省略掉循环条件就可以了。

这里需要使用协程来保证服务端的并发。假设这里只是传一个函数名过来,让服务端执行,并返回函数值,相关代码如下:

go func() {
    defer conn.Close()
    buf := make([]byte, 4096)
    for {
        n, err := conn.Read(buf)
        if err != nil || n == 0 {
            log.Fatal("tcp handler", err)
            break
        }
        receiveStr := strings.TrimSpace(string(buf[0:n]))
        fmt.Printf("receive: %v", receiveStr)
        f, RPCExist := s.handlers[receiveStr]
        if !RPCExist {
            err := fmt.Sprintf("rpc %s is not exist", receiveStr)
            conn.Write([]byte(err))
            continue
        }
        req := []reflect.Value{
            reflect.ValueOf("rpc"),
            reflect.ValueOf("name"),
        }
        result := f.Call(req)
        var rsp []interface{}
        for _, v := range result {
            rsp = append(rsp, v.Interface())
        }
        fmt.Println(rsp)
        sss := rsp[0].(string)
        conn.Write([]byte(sss))
    }
    fmt.Printf("Connection from %v closed. \n", conn.RemoteAddr())
}()

此时,服务端编写者已有简易版的 rpc 框架的写法了:

package main

import (
    "fmt"

    rpc "rpc-lite"
)

func test(n, m string) string {
    res := fmt.Sprintln("testS", n, m)
    return res
}

func main() {
    server := rpc.NewServer()

    server.Register("test", test)

    server.Serve()
}

只需要客户端使用 net 模块的 Dial 方法,建立 socket 连接,并传送数据 "test",就可以执行服务端的代码并拿到对应的返回数据了。