dduo518 / hexo-blog

hexo静态blog点击 https://github.com/chong0808/hexo-blog/issues
3 stars 0 forks source link

TCP 链路追踪 -- jaeger #38

Open dduo518 opened 3 years ago

dduo518 commented 3 years ago

TCP 链路追踪 -- jaeger

TCP的链路追踪不同与http,但是有点跟GRPC一样,由于GRPC也是字节流形式的二进制数据数据,所以在GRPC中会把apentracing api的数据注入到metadata里面,类似于请求头 GRPC是通过拦截器实现的 客户端的实现

func ClientInterceptor(tracer opentracing.Tracer) grpc.UnaryClientInterceptor {
    return func(ctx context.Context, method string, request, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {

        //一个RPC调用的服务端的span,和RPC服务客户端的span构成ChildOf关系
        var parentCtx opentracing.SpanContext
        parentSpan := opentracing.SpanFromContext(ctx)
        if parentSpan != nil {
            parentCtx = parentSpan.Context()
        }
        span := tracer.StartSpan(
            method,
            opentracing.ChildOf(parentCtx),
            opentracing.Tag{Key: string(ext.Component), Value: "gRPC Client"},
            ext.SpanKindRPCClient,
        )

        defer span.Finish()
        md, ok := metadata.FromOutgoingContext(ctx)
        if !ok {
            md = metadata.New(nil)
        } else {
            md = md.Copy()
        }
        // 核心通过将span信息注入到metadata 里面
        err := tracer.Inject(
            span.Context(),
            opentracing.TextMap,
            MDCarrier{md}, // 自定义 carrier
        )

        if err != nil {
            log.Errorf("inject span error :%v", err.Error())
        }

        newCtx := metadata.NewOutgoingContext(ctx, md)
        err = invoker(newCtx, method, request, reply, cc, opts...)

        if err != nil {
            log.Errorf("call error : %v", err.Error())
        }
        return err
    }
}

服务端的实现

// ServerInterceptor Server 端的拦截器
func ServerInterceptor(tracer opentracing.Tracer) grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
        md, ok := metadata.FromIncomingContext(ctx)
        if !ok {
            md = metadata.New(nil)
        }
        // 将数据从metadata 中提取出来
        spanContext, err := tracer.Extract(
            opentracing.TextMap,
            MDCarrier{md},
        )

        if err != nil && err != opentracing.ErrSpanContextNotFound {
            grpclog.Errorf("extract from metadata err: %v", err)
        } else {
            span := tracer.StartSpan(
                info.FullMethod,
                ext.RPCServerOption(spanContext),
                opentracing.Tag{Key: string(ext.Component), Value: "gRPC Server"},
                ext.SpanKindRPCServer,
            )
            defer span.Finish()

            ctx = opentracing.ContextWithSpan(ctx, span)
        }

        return handler(ctx, req)

    }

}

TCP的实现

TCP的实现方式有2种,

1. 一种是GRPC实现方式一样
2. 链路追踪的数据加入到数据流的尾部

第一种方式,在一个完整地数据包里面加个metadata字段,用于承载链路追踪的数据, 这个方式需要自己实现一个注入与提取的方法,metadata实现以下接口

type interface{
  ForeachKey(handler func(key, val string) error) error
  Set(key, val string)
}

一个完整地数据包结构

type Message struct {
    Version      string                 `json:"v"`  // 版本号 1.0 4字节
    EncodingType int8                   `json:"et"` // 消息内容类型 [default:0 json] [1:protobuf] 1字节
    Cmd          uint32                 `json:"c"`  // 消息类型 4字节
    Sig          []byte                 `json:"s"`  // 签名 16 字节
    Time         int64                  `json:"t"`  // 时间戳 8字节
    Content      []byte                 `json:"ct"` // 消息内容
    Metadata     map[string]interface{} `json:"md"` // 元数据
}

metadata的结构


type tcpMetadataCarrier map[string]interface{}

func (tmd tcpMetadataCarrier) ForeachKey(handler func(key, val string) error) error {
    for k, val := range tmd {
        v, ok := val.(string)
        if !ok {
            continue
        }
        if err := handler(k, v); err != nil {
            return err
        }
    }
    return nil
}

func (tmd tcpMetadataCarrier) Set(key, val string) {
    tmd[key] = val
}

func Inject(span opentracing.Span, tmd map[string]interface{}) error {
    c := tcpMetadataCarrier(tmd)
    return opentracing.GlobalTracer().Inject(span.Context(), opentracing.TextMap, c)
}

func Extract(tmd map[string]interface{}) (opentracing.SpanContext, error) {
    c := tcpMetadataCarrier(tmd)
    return opentracing.GlobalTracer().Extract(opentracing.TextMap, c)
}

第二种方式直接将opentracing api的数据追加到数据流的尾部,在解析的时候先解析出业务数据,然后再解析opentracing api的数据,并且可直接以二进制流的方式解析,不需要Unmarshal io write

unc (p *PackIO) Write(ctx context.Context, message *Message) error {
    message.sign()
    var header = make([]byte, 17)
    binary.BigEndian.PutUint32(header[:4], uint32(len(message.Content)))
    binary.BigEndian.PutUint32(header[4:8], message.Cmd)
    binary.BigEndian.PutUint64(header[8:16], uint64(message.Time))
    header[16] = uint8(message.EncodingType)

    var lenNum = make([]byte, 0, defaultHeaderLen)
    var buf = bytes.NewBuffer(lenNum)
    if _, err := buf.Write(header); err != nil {
        return err
    }

    if _, err := buf.Write(message.Sig); err != nil {
        return err
    }
    if _, err := buf.Write(message.Content); err != nil {
        return err
    }
    if _, err := p.writer.Write(buf.Bytes()); err != nil {
        return err
    }

    if p.opts.trace {
        sp := opentracing.SpanFromContext(ctx)
        tra := opentracing.GlobalTracer()
        tracer, ok := tra.(*jaeger.Tracer)
        if ok && sp != nil {
            b := bytes.NewBuffer(make([]byte, 0))
      // 将数据直接以二进制流的方式写入,且jager提供了接口,第三个参数直接传入io writer 接口
            err := tracer.Inject(sp.Context(), opentracing.Binary, b)
            if err != nil {
                return err
            }
            if _, err := p.writer.Write(b.Bytes()); err != nil {
                return err
            }
        }
    }
    return p.writer.Flush()
}

io read

func (p *PackIO) Read(ctx context.Context) (context.Context, *Message, error) {
    for {
        select {
        case <-ctx.Done():
            return nil, nil, fmt.Errorf("read end")
        default:
            var header = make([]byte, defaultHeaderLen)
            if p.opts.readTimeout > 0 {
                _ = p.Conn.(*net.TCPConn).SetReadDeadline(time.Now().Add(p.opts.readTimeout * time.Second))
            }
            if _, err := io.ReadFull(p.reader, header[:]); err != nil {
                return nil, nil, err
            }

            msgLen := int(binary.BigEndian.Uint32(header[:4]))
            var m = &Message{
                Cmd:          binary.BigEndian.Uint32(header[4:8]),
                EncodingType: int8(header[16]),
                Sig:          header[17:33],
                Time:         int64(binary.BigEndian.Uint64(header[8:16])),
                Content:      make([]byte, msgLen),
            }
            if _, err := io.ReadFull(p.reader, m.Content[:]); err != nil {
                return nil, nil, err
            }
            var spanCtx opentracing.SpanContext
            var err error
            if p.opts.trace {
                tra := opentracing.GlobalTracer()
                tracer, ok := tra.(*jaeger.Tracer)
                if ok {
          // 将数据直接从二进制流中提取,且jager提供了接口,第三个参数直接传入io reader 接口
                    spanCtx, err = tracer.Extract(opentracing.Binary, p.reader)
                    if err != nil {
                        return nil, nil, err
                    }
                }
            }

            if !m.check() {
                return nil, nil, fmt.Errorf("uncheck sig message")
            }

            if spanCtx != nil {
                sp := opentracing.StartSpan("receive_packet", opentracing.FollowsFrom(spanCtx))
                sp.Finish()
                return opentracing.ContextWithSpan(ctx, sp), m, nil
            }
            return context.TODO(), m, nil
        }
    }
}

以上俩种方式承载的方式不同,第一种直接将opentracing api的数据加入到业务数据包内,第二种方式将数据追加到字节序的尾部,第二种方式减少了序列化的开销,但是需要客户端与服务端做好协商,如果服务端加入了,客户端没有接收,会有可能造成数据流解析失败