wangming1993 / issues

记录学习中的一些问题,体会与心得 https://wangming1993.github.io/issues
8 stars 4 forks source link

升级 grpc 到 1.6.0 过程中 context 问题 #70

Open wangming1993 opened 6 years ago

wangming1993 commented 6 years ago

升级 grpc 到 1.6.0 过程中 context 问题

https://github.com/grpc/grpc-go/blob/v1.6.0/Documentation/grpc-metadata.md#retrieving-metadata-from-context

最近在将 grpc 由 1.0.5 升级到 1.6.0 , 发现 metadata 包下面的接口发生了改变:

其中发送 grpc 调用时,需要使用 NewOutgoingContext 去创建 context, 在 grpc 的 service 里面调用

FromIncomingContext 去获取。

吐槽:真心感觉这样很是麻烦,原来的方式就很好啊, 可是人家觉得这样能分清楚

于是看了一下这其中的转化过程,为什么发送的是 NewOutgoingContext, 但是接收需要使用: NewIncomingContext:

client 发送数据

service: MemberService

method: GetMember

下面给出执行流程,但是只会涉及关键代码,部分代码会被省略:

grpc.Invoke(ctx, "/rpc.member.MemberService/GetMember", in, out, c.cc, opts...)

// google.golang.org/grpc/call.go
func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) error {
    return invoke(ctx, method, args, reply, cc, opts...)
}

func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {
    for {
        t, put, err = cc.getTransport(ctx, gopts)
        stream, err = t.NewStream(ctx, callHdr)
        if peer, ok := peer.FromContext(stream.Context()); ok {
            c.peer = peer
        }
        err = sendRequest(ctx, cc.dopts, cc.dopts.cp, &c, callHdr, stream, t, args, topts)
        err = recvResponse(ctx, cc.dopts, t, &c, stream, reply)
        t.CloseStream(stream, nil)
        return stream.Status().Err()
    }
}

// google.golang.org/grpc/transport/http2_client.go
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
    if md, ok := metadata.FromOutgoingContext(ctx); ok {
        for k, vv := range md {
            for _, v := range vv {
                t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
            }
        }
    }
}

server 处理数据

// google.golang.org/grpc/server.go
func (s *Server) Serve(lis net.Listener) error {
    for {
        rawConn, err := lis.Accept()
        // Start a new goroutine to deal with rawConn
        // so we don't stall this Accept loop goroutine.
        go s.handleRawConn(rawConn)
    }
}

func (s *Server) handleRawConn(rawConn net.Conn) {
    s.serveHTTP2Transport(conn, authInfo)
}

func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
    st, err := transport.NewServerTransport("http2", c, config)
    s.serveStreams(st)
}

// google.golang.org/grpc/transport/transport.go
func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
    return newHTTP2Server(conn, config)
}

// google.golang.org/grpc/transport/http2_server.go
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
  return &http2Server{}, nil
}

func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
    for {
        frame, err := t.framer.readFrame()

        switch frame := frame.(type) {
        case *http2.MetaHeadersFrame:
            if t.operateHeaders(frame, handle, traceCtx) {
                t.Close()
                break
            }
        case *http2.DataFrame:
            t.handleData(frame)
    }
}

func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
    var state decodeState
    for _, hf := range frame.Fields {
        if err := state.processHeaderField(hf); err != nil {
        }
    }
    s := &Stream{}
    if len(state.mdata) > 0 {
        s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
    }
}

总结起来就是发送数据时,会根据 metadata.FromOutgoingContext(ctx) 来将数据写入 Header 帧,

server 在接收到客户端请求时,从 Header 帧获取,并写入 metadata.NewIncomingContext(s.ctx, state.mdata)

PilockHulmes commented 6 years ago

视使用的库不同, 有一部分有用的数据会直接存在ctx里而不是ctx的metadata里. 需要当心这个坑.