BruceChen7 / gitblog

My blog
6 stars 1 forks source link

jaeger agent解析 #24

Open BruceChen7 opened 4 years ago

BruceChen7 commented 4 years ago

jaeger-agent的作用

Agent部署在每个集群机器中,用来转发jaeger-client的流量信息给collector,充当一个proxy。

![jaeger agent](https://upload.cc/i/2JAQkp.png)

入口

// Run starts the agent without blocking. It binds a TCP listener on
// a.httpServer.Addr, records the address actually bound, serves HTTP on a
// background goroutine, and starts one goroutine per entry in a.processors.
// The only error it returns is a failure to bind the HTTP listener.
func (a *Agent) Run() error {
    ln, err := net.Listen("tcp", a.httpServer.Addr)
    if err != nil {
        return err
    }
    // Remember the listener's real address and keep the listener in a.closer.
    a.httpAddr.Store(ln.Addr().String())
    a.closer = ln

    serveHTTP := func() {
        port := ln.Addr().(*net.TCPAddr).Port
        a.logger.Info("Starting jaeger-agent HTTP server", zap.Int("http-port", port))
        if serveErr := a.httpServer.Serve(ln); serveErr != nil {
            a.logger.Error("http server failure", zap.Error(serveErr))
        }
        a.logger.Info("agent's http server exiting")
    }
    go serveHTTP()

    // One goroutine per configured processor; Run does not wait for them.
    for _, p := range a.processors {
        go p.Serve()
    }
    return nil
}

可以看到,Run 直接启动了 1 + len(a.processors) 个 goroutine,默认配置(3 个 processor)下就是 4 个:1 个运行 HTTP server,另外 3 个各运行一个 processor 的 Serve,分别接收不同协议的数据,把 thrift 格式的数据解析成 Batch 类型的数据,然后通过 TChannel RPC 请求转发给 collector,事情就这么简单。下面看一下 agent 能够处理的数据格式:

// defaultProcessors lists the built-in (data model, thrift protocol, listen
// address) combinations:
//   - model "zipkin" with the "compact" protocol on :5775
//   - model "jaeger" with the "compact" protocol on :6831
//   - model "jaeger" with the "binary" protocol on :6832
var defaultProcessors = []struct {
    model    Model
    protocol Protocol
    hostPort string
}{
    {hostPort: ":5775", model: "zipkin", protocol: "compact"},
    {hostPort: ":6831", model: "jaeger", protocol: "compact"},
    {hostPort: ":6832", model: "jaeger", protocol: "binary"},
}

// Thrift wire protocols the agent accepts; they are the keys of protocolFactoryMap.
// NOTE(review): these look like specs from a `const (...)` block that the post
// omits — confirm. If so, only compactProtocol is typed: binaryProtocol supplies
// its own value without a type, so Go makes it an untyped string constant rather
// than a Protocol. Consider `binaryProtocol Protocol = "binary"` for consistency.
compactProtocol Protocol = "compact"  
binaryProtocol           = "binary"

// protocolFactoryMap resolves a configured Protocol to the thrift protocol
// factory that getProcessors hands to GetThriftProcessor.
protocolFactoryMap = map[Protocol]thrift.TProtocolFactory{
    binaryProtocol:  thrift.NewTBinaryProtocolFactoryDefault(),
    compactProtocol: thrift.NewTCompactProtocolFactory(),
}

// getProcessors builds one processor per entry in b.Processors, in order.
//
// For each entry it:
//   - looks up the thrift protocol factory for cfg.Protocol;
//   - picks the agent handler for cfg.Model, constructed around rep;
//   - scopes mFactory with "protocol" and "model" tags;
//   - builds the processor with cfg.GetThriftProcessor.
//
// It returns an error, and no processors, if an entry names an unknown protocol
// or data model, or if GetThriftProcessor fails for any entry.
func (b *Builder) getProcessors(rep reporter.Reporter, mFactory metrics.Factory, logger *zap.Logger) ([]processors.Processor, error) {
    retMe := make([]processors.Processor, len(b.Processors))
    for idx, cfg := range b.Processors {
        protoFactory, ok := protocolFactoryMap[cfg.Protocol]
        if !ok {
            return nil, fmt.Errorf("cannot find protocol factory for protocol %v", cfg.Protocol)
        }

        // Choose the handler for the data model; both variants wrap the same reporter.
        var handler processors.AgentProcessor
        switch cfg.Model {
        case jaegerModel:
            handler = jaegerThrift.NewAgentProcessor(rep)
        case zipkinModel:
            handler = zipkinThrift.NewAgentProcessor(rep)
        default:
            return nil, fmt.Errorf("cannot find agent processor for data model %v", cfg.Model)
        }

        // Per-processor metrics, tagged by protocol and model. Named procMetrics
        // (not metrics) so the local does not shadow the imported metrics package.
        procMetrics := mFactory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{
            "protocol": string(cfg.Protocol),
            "model":    string(cfg.Model),
        }})

        // Creates the underlying buffered server and returns the ThriftProcessor.
        processor, err := cfg.GetThriftProcessor(procMetrics, protoFactory, handler, logger)
        if err != nil {
            return nil, err
        }
        retMe[idx] = processor
    }
    return retMe, nil
}

这3个服务分别对应3种 model/protocol 组合:zipkin + compact(:5775)、jaeger + compact(:6831)、jaeger + binary(:6832),官方推荐使用6831端口。

数据的处理

// Serve initiates the readers and starts serving traffic.
//
// It registers s.numProcessors workers with s.processing and starts each one
// running processBuffer, which drains the server's data channel. It then runs
// s.server.Serve() (the TBufferedServer) on the calling goroutine. That server
// receives data from upstream, puts it on the data channel and tracks buffer metrics.
func (s *ThriftProcessor) Serve() {
    workers := s.numProcessors
    s.processing.Add(workers)
    for started := 0; started < workers; started++ {
        go s.processBuffer()
    }
    s.server.Serve()
}

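Serve 干了两件事情:一是启动 numProcessors 个 goroutine 运行 processBuffer,从 DataChan 中取数据并处理;二是在当前 goroutine 中调用 s.server.Serve(),从上游接收数据并放入 DataChan。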

 // processBuffer is the worker loop started by Serve. It consumes buffers from
 // s.server.DataChan() until that channel is closed. For each buffer it:
 //   - writes the payload into a pooled thrift protocol's transport;
 //   - hands the protocol to s.handler.Process;
 //   - returns the protocol to the pool and releases the buffer via DataRecd.
 // s.processing.Done() runs when the loop exits.
 func (s *ThriftProcessor) processBuffer() {
     defer s.processing.Done()
     for readBuf := range s.server.DataChan() {
         // Borrow a protocol object instead of building one per payload.
         protocol := s.protocolPool.Get().(thrift.TProtocol)
         payload := readBuf.GetBytes()
         // Load the payload into the transport; never hand the handler a
         // message whose bytes could not be written in full.
         if _, err := protocol.Transport().Write(payload); err != nil {
             s.logger.Error("Failed to write payload to thrift transport", zap.Error(err))
             s.metrics.HandlerProcessError.Inc(1)
         } else {
             s.logger.Debug("Span(s) received by the agent", zap.Int("bytes-received", len(payload)))
             // Decode the message and dispatch it to the agent handler.
             if ok, err := s.handler.Process(protocol, protocol); !ok {
                 s.logger.Error("Processor failed", zap.Error(err))
                 s.metrics.HandlerProcessError.Inc(1)
             }
         }
         // NOTE(review): if Process stops part-way through a message, unread bytes
         // may remain in the transport when it goes back to the pool — confirm
         // the pooled transport is reset before reuse.
         s.protocolPool.Put(protocol)
         s.server.DataRecd(readBuf) // acknowledge receipt and release the buffer
     }
 }

上报给collector

// Process handles one emitBatch message. It decodes AgentEmitBatchArgs from
// iprot, finishes reading the message, and forwards args.Batch to the handler's
// EmitBatch (the reporter). success is false only when the arguments cannot be
// decoded. An EmitBatch failure is returned as err with success still true.
func (p *agentProcessorEmitBatch) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) {
    args := AgentEmitBatchArgs{}
    readErr := args.Read(iprot)
    iprot.ReadMessageEnd()
    if readErr != nil {
        return false, readErr
    }
    if emitErr := p.handler.EmitBatch(args.Batch); emitErr != nil {
        return true, emitErr
    }
    return true, nil
}

EmitBatch

// From cmd/agent/app/reporter/tchannel/reporter.go.
// EmitBatch implements EmitBatch() of Reporter
//
// The batch is sent to the collector as a one-element SubmitBatches call.
// submitAndReport runs that call and receives the batch's span count.
func (r *Reporter) EmitBatch(batch *jaeger.Batch) error {
    submit := func(ctx thrift.Context) error {
        batches := []*jaeger.Batch{batch}
        _, err := r.jClient.SubmitBatches(ctx, batches)
        return err
    }
    spanCount := int64(len(batch.Spans))
    return r.submitAndReport(submit, "Could not submit jaeger batch", spanCount)
}
// RPC request, from thrift-gen/jaeger/tchan-jaeger.go.
// SubmitBatches calls the collector's "submitBatches" method over TChannel.
// If the call itself succeeds but reports no success, a generic error is returned.
func (c *tchanCollectorClient) SubmitBatches(ctx thrift.Context, batches []*Batch) ([]*BatchSubmitResponse, error) {
    args := CollectorSubmitBatchesArgs{Batches: batches}
    var resp CollectorSubmitBatchesResult
    success, err := c.client.Call(ctx, c.thriftService, "submitBatches", &args, &resp)
    if err == nil && !success {
        err = fmt.Errorf("received no result or unknown exception for submitBatches")
    }
    return resp.GetSuccess(), err
}
 // Batch holds a list of spans together with a single Process.
 // NOTE(review): the protobuf struct tags suggest this is the protobuf model
 // type, not the thrift jaeger.Batch that Reporter.EmitBatch receives in this
 // post — confirm which definition was meant to be shown here.
 type Batch struct {
    Spans   []*Span  `protobuf:"bytes,1,rep,name=spans" json:"spans,omitempty"`       // spans in this batch
    Process *Process `protobuf:"bytes,2,opt,name=process" json:"process,omitempty"`   // the Process attached to the batch
}

 Agent 通过 Uber 开发的 RPC 框架 TChannel 把数据提交给 Collector。SubmitBatches 的参数是 []*Batch,每个 Batch 又包含多个 Span,因此数据是批量提交的。

参考资料