flashcatcloud / categraf

one-stop telemetry collector for nightingale
https://flashcat.cloud/docs/
MIT License

Goroutine leak #1094

Open showsmall opened 2 days ago

showsmall commented 2 days ago

Relevant config.toml

[global]
print_configs = false
hostname = ""
omit_hostname = false
interval = 15
providers = ["local"]
concurrency = -1
[global.labels]
[log]
file_name = "/Data/Categraf/categraf-v0.3.82-linux-amd64/run.log"
max_size = 100
max_age = 1
max_backups = 7
local_time = true
compress = false
[writer_opt]
batch = 1000
chan_size = 1000000
[[writers]]
url = "http://10.10.2.19:17000/prometheus/v1/write"
basic_auth_user = ""
basic_auth_pass = ""
timeout = 5000
dial_timeout = 2500
max_idle_conns_per_host = 100
[http]
enable = false
address = ":9100"
print_access = false
run_mode = "release"
ignore_hostname = false
agent_host_tag = ""
ignore_global_labels = false
[ibex]
enable = false
interval = "1000ms"
servers = ["127.0.0.1:20090"]
meta_dir = "./meta"
[heartbeat]
enable = true
url = "http://10.10.2.19:17000/v1/n9e/heartbeat"
interval = 10
basic_auth_user = ""
basic_auth_pass = ""
timeout = 5000
dial_timeout = 2500
max_idle_conns_per_host = 100
[prometheus]
enable = false
scrape_config_file = "/path/to/in_cluster_scrape.yaml"
log_level = "info"

Logs from categraf

[logs]
api_key = "ef4ahfbwzwwtlwfpbertgq1i6mq0ab1q"
enable = true
send_to = "xxx.xxx.xxx.xxx"
send_type = "http"
topic = "flashcatcloud"
use_compress = false
send_with_tls = false
batch_wait = 5
run_path = "/Data/Categraf/categraf-v0.3.82-linux-amd64/run"
open_files_limit = 500
scan_period = 10
frame_size = 9000
chan_size = 1000
pipeline=4
kafka_version="3.3.2"
batch_max_concurrence = 0
batch_max_size=100
batch_max_content_size=1000000
producer_timeout= 10
sasl_enable = false
sasl_user = "admin"
sasl_password = "admin"
sasl_mechanism= "PLAIN"
sasl_version=1
sasl_handshake = true
enable_collect_container=false
collect_container_all = false
# There are 16 similar collection rules in total
  [[logs.items]]
  type = "file"
  path = "/xxx/xxx/xxx/log/error/2024-11-*.log"
  source = "xxx-error"
  service = "xxx-error"
    [[logs.items.log_processing_rules]]
      type = "multi_line"
      name = "new_line_with_date"
      pattern="\\d{4}-\\d{2}-\\d{2}"

System info

v0.3.82

Docker

No response

Steps to reproduce

Monitoring showed a sudden rise in the thread count: [image]

After enabling pprof and investigating, I strongly suspect that log collection is the cause.

(pprof) top
Showing nodes accounting for 1468, 99.86% of 1470 total
Dropped 49 nodes (cum <= 7)
Showing top 10 nodes out of 19
      flat  flat%   sum%        cum   cum%
      1468 99.86% 99.86%       1468 99.86%  runtime.gopark
         0     0% 99.86%         19  1.29%  flashcat.cloud/categraf/agent.(*InputReader).startInput
         0     0% 99.86%        283 19.25%  flashcat.cloud/categraf/logs/decoder.(*Decoder).run
         0     0% 99.86%        281 19.12%  flashcat.cloud/categraf/logs/decoder.(*MultiLineHandler).run
         0     0% 99.86%        283 19.25%  flashcat.cloud/categraf/logs/decoder.(*SingleLineParser).run
         0     0% 99.86%        283 19.25%  flashcat.cloud/categraf/logs/input/file.(*Tailer).forwardMessages
         0     0% 99.86%        283 19.25%  flashcat.cloud/categraf/logs/input/file.(*Tailer).readForever
         0     0% 99.86%        283 19.25%  flashcat.cloud/categraf/logs/input/file.(*Tailer).wait (inline)
         0     0% 99.86%          7  0.48%  internal/poll.(*FD).Read
         0     0% 99.86%          8  0.54%  internal/poll.(*pollDesc).wait
(pprof)

(pprof) list Tailer
Total: 1470
ROUTINE ======================== flashcat.cloud/categraf/logs/input/file.(*Tailer).forwardMessages in /root/go/src/sre/categraf/logs/input/file/tailer.go
         0        283 (flat, cum) 19.25% of Total
         .          .    244:func (t *Tailer) forwardMessages() {
         .          .    245:    defer func() {
         .          .    246:        // the decoder has successfully been flushed
         .          .    247:        atomic.StoreInt32(&t.shouldStop, 1)
         .          .    248:        close(t.done)
         .          .    249:    }()
         .        283    250:    for output := range t.decoder.OutputChan {
         .          .    251:        offset := t.decodedOffset + int64(output.RawDataLen)
         .          .    252:        identifier := t.Identifier()
         .          .    253:        if !t.shouldTrackOffset() {
         .          .    254:            offset = 0
         .          .    255:            identifier = ""
ROUTINE ======================== flashcat.cloud/categraf/logs/input/file.(*Tailer).readForever in /root/go/src/sre/categraf/logs/input/file/tailer.go
         0        283 (flat, cum) 19.25% of Total
         .          .    171:func (t *Tailer) readForever() {
         .          .    172:    defer t.onStop()
         .          .    173:    for {
         .          .    174:        n, err := t.read()
         .          .    175:        if err != nil {
         .          .    176:            return
         .          .    177:        }
         .          .    178:        t.recordBytes(int64(n))
         .          .    179:
         .          .    180:        select {
         .          .    181:        case <-t.stop:
         .          .    182:            if n != 0 && atomic.LoadInt32(&t.didFileRotate) == 1 {
         .          .    183:                log.Println("W! Tailer stopped after rotation close timeout with remaining unread data")
         .          .    184:            }
         .          .    185:            // stop reading data from file
         .          .    186:            return
         .          .    187:        default:
         .          .    188:            if n == 0 {
         .          .    189:                // wait for new data to come
         .        283    190:                t.wait()
         .          .    191:            }
         .          .    192:        }
         .          .    193:    }
         .          .    194:}
         .          .    195:
ROUTINE ======================== flashcat.cloud/categraf/logs/input/file.(*Tailer).wait in /root/go/src/sre/categraf/logs/input/file/tailer.go
         0        283 (flat, cum) 19.25% of Total
         .          .    312:func (t *Tailer) wait() {
         .        283    313:    time.Sleep(t.sleepDuration)
         .          .    314:}
         .          .    315:
         .          .    316:func (t *Tailer) recordBytes(n int64) {
         .          .    317:    t.bytesRead += n
         .          .    318:    t.file.Source.BytesRead.Add(n)
(pprof)

(pprof) list Decoder
Total: 1470
ROUTINE ======================== flashcat.cloud/categraf/logs/decoder.(*Decoder).run in /root/go/src/sre/categraf/logs/decoder/decoder.go
         0        283 (flat, cum) 19.25% of Total
         .          .    225:func (d *Decoder) run() {
         .        283    226:    for data := range d.InputChan {
         .          .    227:        d.decodeIncomingData(data.content)
         .          .    228:    }
         .          .    229:    // finish to stop decoder
         .          .    230:    d.lineParser.Stop()
         .          .    231:}
(pprof)

[root@devsystem categraf]# go tool pprof --text ./categraf goroutine.pprof
File: categraf
Type: goroutine
Time: Nov 24, 2024 at 12:07am (CST)
Showing nodes accounting for 1468, 99.86% of 1470 total
Dropped 49 nodes (cum <= 7)
      flat  flat%   sum%        cum   cum%
      1468 99.86% 99.86%       1468 99.86%  runtime.gopark
         0     0% 99.86%         19  1.29%  flashcat.cloud/categraf/agent.(*InputReader).startInput
         0     0% 99.86%        283 19.25%  flashcat.cloud/categraf/logs/decoder.(*Decoder).run
         0     0% 99.86%        281 19.12%  flashcat.cloud/categraf/logs/decoder.(*MultiLineHandler).run
         0     0% 99.86%        283 19.25%  flashcat.cloud/categraf/logs/decoder.(*SingleLineParser).run
         0     0% 99.86%        283 19.25%  flashcat.cloud/categraf/logs/input/file.(*Tailer).forwardMessages
         0     0% 99.86%        283 19.25%  flashcat.cloud/categraf/logs/input/file.(*Tailer).readForever
         0     0% 99.86%        283 19.25%  flashcat.cloud/categraf/logs/input/file.(*Tailer).wait (inline)
         0     0% 99.86%          7  0.48%  internal/poll.(*FD).Read
         0     0% 99.86%          8  0.54%  internal/poll.(*pollDesc).wait
         0     0% 99.86%          8  0.54%  internal/poll.(*pollDesc).waitRead (inline)
         0     0% 99.86%          8  0.54%  internal/poll.runtime_pollWait
         0     0% 99.86%          7  0.48%  net.(*conn).Read
         0     0% 99.86%          7  0.48%  net.(*netFD).Read
         0     0% 99.86%        863 58.71%  runtime.chanrecv
         0     0% 99.86%        861 58.57%  runtime.chanrecv2
         0     0% 99.86%          8  0.54%  runtime.netpollblock
         0     0% 99.86%        311 21.16%  runtime.selectgo
         0     0% 99.86%        286 19.46%  time.Sleep

Expected behavior

Goroutine leak

Actual behavior

Goroutine leak

Additional info

No response

showsmall commented 2 days ago

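The blue line drops because I restarted the process, which brought the thread count back down. After each restart, the problem reappears once categraf has been running for a few hours, a day or more, or several days. [image] [image] [image]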