observIQ / stanza

Fast and lightweight log transport and processing.
Apache License 2.0
180 stars 24 forks source link

GoFlow stop fail bug #775

Open jxplus opened 1 month ago

jxplus commented 1 month ago

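Describe the bug: The goroutine started by GoflowInput.Start() never stops, because flow.FlowRoutine itself contains an infinite loop and never returns, so the n.ctx.Done() check that follows it is never reached. See the code below: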

  1. operator/builtin/input/goflow/goflow.go: Start(), line 109

    
    // Start creates a cancellable context (stored in n.ctx / n.cancel), launches
    // a background goroutine that runs the goflow listener matching n.mode, and
    // returns nil immediately without waiting for that goroutine.
    func (n *GoflowInput) Start() error {
    n.ctx, n.cancel = context.WithCancel(context.Background())
    
    go func() {
        ...
        for {
            n.Infof("Starting Goflow on %s:%d in %s mode", n.address, n.port, n.mode)
            // Each mode builds its own goflow state object and then calls
            // FlowRoutine with the same worker count, address and port.
            switch n.mode {
            case modeSflow:
                flow := &utils.StateSFlow{Transport: n, Logger: n}
                goflowErr = flow.FlowRoutine(n.workers, n.address, n.port, reuse)
            case modeNetflowV5:
                flow := &utils.StateNFLegacy{Transport: n, Logger: n}
                goflowErr = flow.FlowRoutine(n.workers, n.address, n.port, reuse)
            case modeNetflowIPFIX:
                flow := &utils.StateNetFlow{Transport: n, Logger: n}
                goflowErr = flow.FlowRoutine(n.workers, n.address, n.port, reuse)
            }
    
            // Reported as never executed: per this issue, FlowRoutine runs its
            // own endless UDP read loop and does not return, so this
            // non-blocking cancellation check is not reached and cancelling
            // n.ctx cannot stop the goroutine.
            select {
            case <-n.ctx.Done():
                return
            default:
            }
                       ...
        }
    }()
    
    return nil
    }`
  2. C:/Users/Administrator/go/pkg/mod/github.com/observiq/goflow/v3@v3.4.4/utils/utils.go:256

    
    ecb := DefaultErrorCallback{
        Logger: logger,
    }
       ...
       // this is also an infinite loop
    for {
        size, pktAddr, _ := udpconn.ReadFromUDP(payload)
        payloadCut := make([]byte, size)
        copy(payloadCut, payload[0:size])
    
        baseMessage := BaseMessage{
            Src:     pktAddr.IP,
            Port:    pktAddr.Port,
            Payload: payloadCut,
        }
        processor.ProcessMessage(baseMessage)
    
        MetricTrafficBytes.With(
            prometheus.Labels{
                "remote_ip":   pktAddr.IP.String(),
                "remote_port": strconv.Itoa(pktAddr.Port),
                "local_ip":    localIP,
                "local_port":  strconv.Itoa(addrUDP.Port),
                "type":        name,
            }).
            Add(float64(size))
        MetricTrafficPackets.With(
            prometheus.Labels{
                "remote_ip":   pktAddr.IP.String(),
                "remote_port": strconv.Itoa(pktAddr.Port),
                "local_ip":    localIP,
                "local_port":  strconv.Itoa(addrUDP.Port),
                "type":        name,
            }).
            Inc()
        MetricPacketSizeSum.With(
            prometheus.Labels{
                "remote_ip":   pktAddr.IP.String(),
                "remote_port": strconv.Itoa(pktAddr.Port),
                "local_ip":    localIP,
                "local_port":  strconv.Itoa(addrUDP.Port),
                "type":        name,
            }).
            Observe(float64(size))
    }
    }

To Reproduce Steps to reproduce the behavior:

  1. Go to '...'
  2. Click on '....'
  3. Scroll down to '....'
  4. See error

Expected behavior A clear and concise description of what you expected to happen.

Screenshots If applicable, add screenshots to help explain your problem.

Environment:

Additional context Add any other context about the problem here.