elastic / beats

:tropical_fish: Beats - Lightweight shippers for Elasticsearch & Logstash
https://www.elastic.co/products/beats

[libbeat] Is there a goroutine leak in libbeat? #19193

Closed: erenming closed this issue 4 years ago

erenming commented 4 years ago

Version: 7.6.2+

Hi all!

I'm running a load test against filebeat with docker compose, and I found that the number of github.com/elastic/beats/libbeat/reader/readfile.(*TimeoutReader).Next.func1 goroutines keeps growing, which looks like a goroutine leak.

The load test setup is as follows:

  1. I generate 100 files concurrently,
  2. write log lines to them without any sleep for about five seconds (a sketch of such a generator follows this list), and
  3. limit the container to 0.2 CPUs and 512Mi of memory.
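
For reference, a minimal sketch of such a generator (illustrative only; the file names, paths, and line format are assumptions, not the exact script I used):

// loadgen.go: write docker-style JSON log lines into 100 files
// concurrently, without sleeping, for about five seconds.
package main

import (
    "fmt"
    "os"
    "path/filepath"
    "sync"
    "time"
)

func main() {
    const numFiles = 100
    deadline := time.Now().Add(5 * time.Second)

    var wg sync.WaitGroup
    for i := 0; i < numFiles; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            dir := filepath.Join("/tmp/containers", fmt.Sprintf("c%03d", i))
            if err := os.MkdirAll(dir, 0755); err != nil {
                return
            }
            f, err := os.Create(filepath.Join(dir, fmt.Sprintf("c%03d-json.log", i)))
            if err != nil {
                return
            }
            defer f.Close()
            for n := 0; time.Now().Before(deadline); n++ {
                // docker json-file format: one JSON object per line
                fmt.Fprintf(f, `{"log":"2020-06-15 17:40:00.000 INFO [main] test line %d\n","stream":"stdout","time":"%s"}`+"\n",
                    n, time.Now().UTC().Format(time.RFC3339Nano))
            }
        }(i)
    }
    wg.Wait()
}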

This is the memory and CPU utilization, and the goroutine pprof:

[screenshot attached: memory/CPU utilization and goroutine pprof]

This is the goroutine profile comparison over a period of about 30 minutes: [screenshot attached: goroutine profile comparison]

Then, in order to verify my hypothesis, I added a variable to monitor the number of these goroutines and ran a test on my Mac. I changed timeout.go like this:

var exit int64
func init() {
    go func() {
        for {
            fmt.Printf("\n************\nthe number of Next.Func1=%d\n************\n", atomic.LoadInt64(&exit))
            time.Sleep(time.Second*3)
        }
    }()
}

// Next returns the next line. If no line was returned before timeout, the
// configured timeout error is returned.
// For handling timeouts a goroutine is started for reading lines from
// configured line reader. Only when underlying reader returns an error, the
// goroutine will be finished.
func (r *TimeoutReader) Next() (reader.Message, error) {
    if !r.running {
        r.running = true
        go func() {
            atomic.AddInt64(&exit, 1)
            for {
                message, err := r.reader.Next()
                r.ch <- lineMessage{message, err}
                if err != nil {
                    fmt.Printf("*******get error: %s********\n", err)
                    break
                }
            }
            atomic.AddInt64(&exit, -1)
        }()
    }
    timer := time.NewTimer(r.timeout)
    select {
    case msg := <-r.ch:
        if msg.err != nil {
            r.running = false
        }
        timer.Stop()
        return msg.line, msg.err
    case <-timer.C:
        return reader.Message{}, r.signal
    }
}

After running filebeat, the counter exit does not decrease when the close_timeout mechanism is triggered. The output log looks like this:

2020-06-15T17:41:53.022+0800    INFO    [crawler]       beater/crawler.go:141   Starting input (ID: %d)5446179678553644909
2020-06-15T17:41:53.022+0800    INFO    [crawler]       beater/crawler.go:108   Loading and starting Inputs completed. Enabled inputs: 1

************
the number of Next.Func1=0
************

************
the number of Next.Func1=0
************

************
the number of Next.Func1=0
************
2020-06-15T17:42:03.028+0800    INFO    log/harvester.go:297    Harvester started for file: /tmp/containers/f004a2d52d22fac5d68dfec0dd48a68f2db4f649a41b94e34a7e4b900a29ec5d/f004a2d52d22fac5d68dfec0dd48a68f2db4f649a41b94e34a7e4b900a29ec5d-json.log

************
the number of Next.Func1=1
************

************
the number of Next.Func1=1
************

************
the number of Next.Func1=1
************
2020-06-15T17:42:13.029+0800    INFO    log/harvester.go:288    Closing harvester because close_timeout was reached: /tmp/containers/f004a2d52d22fac5d68dfec0dd48a68f2db4f649a41b94e34a7e4b900a29ec5d/f004a2d52d22fac5d68dfec0dd48a68f2db4f649a41b94e34a7e4b900a29ec5d-json.log
2020-06-15T17:42:13.029+0800    INFO    harvester/forwarder.go:52       Input outlet closed
2020-06-15T17:42:13.030+0800    INFO    log/harvester.go:297    Harvester started for file: /tmp/containers/f004a2d52d22fac5d68dfec0dd48a68f2db4f649a41b94e34a7e4b900a29ec5d/f004a2d52d22fac5d68dfec0dd48a68f2db4f649a41b94e34a7e4b900a29ec5d-json.log

************
the number of Next.Func1=2
************

************
the number of Next.Func1=2
************

************
the number of Next.Func1=2
************
2020-06-15T17:42:23.020+0800    INFO    [monitoring]    log/log.go:145  Non-zero metrics in the last 30s        {"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":8313,"time":{"ms":8313}},"total":{"ticks":42021,"time":{"ms":42022},"value":42021},"user":{"ticks":33708,"time":{"ms":33709}}},"info":{"ephemeral_id":"6363532c-ce3c-4a2c-a976-72279ff808ab","uptime":{"ms":30019}},"memstats":{"gc_next":14654576,"memory_alloc":10392224,"memory_total":11659725544,"rss":43659264},"runtime":{"goroutines":27}},"filebeat":{"events":{"active":496,"added":146421,"done":145925},"harvester":{"closed":1,"files":{"d488c4ab-aaa8-4df9-9d0c-d06a2ea83894":{"last_event_published_time":"2020-06-15T17:42:23.020Z","last_event_timestamp":"2020-06-02T02:34:50.966Z","name":"/tmp/containers/f004a2d52d22fac5d68dfec0dd48a68f2db4f649a41b94e34a7e4b900a29ec5d/f004a2d52d22fac5d68dfec0dd48a68f2db4f649a41b94e34a7e4b900a29ec5d-json.log","read_offset":826518320,"size":1459328465,"start_time":"2020-06-15T17:42:13.030Z"}},"open_files":1,"running":1,"started":2}},"libbeat":{"config":{"module":{"running":0}},"output":{"events":{"acked":145920,"batches":285,"total":145920},"type":"file","write":{"bytes":909353752}},"pipeline":{"clients":1,"events":{"active":492,"filtered":5,"published":146412,"total":146417},"queue":{"acked":145920}}},"registrar":{"states":{"cleanup":1,"current":1,"update":145925},"writes":{"success":288,"total":288}},"system":{"cpu":{"cores":8},"load":{"1":3.6602,"15":3.7178,"5":3.5918,"norm":{"1":0.4575,"15":0.4647,"5":0.449}}}}}}

************
the number of Next.Func1=2
************
2020-06-15T17:42:23.031+0800    INFO    log/harvester.go:288    Closing harvester because close_timeout was reached: /tmp/containers/f004a2d52d22fac5d68dfec0dd48a68f2db4f649a41b94e34a7e4b900a29ec5d/f004a2d52d22fac5d68dfec0dd48a68f2db4f649a41b94e34a7e4b900a29ec5d-json.log

************
the number of Next.Func1=2
************

************
the number of Next.Func1=2
************

************
the number of Next.Func1=2
************
2020-06-15T17:42:33.037+0800    INFO    log/harvester.go:297    Harvester started for file: /tmp/containers/f004a2d52d22fac5d68dfec0dd48a68f2db4f649a41b94e34a7e4b900a29ec5d/f004a2d52d22fac5d68dfec0dd48a68f2db4f649a41b94e34a7e4b900a29ec5d-json.log

************
the number of Next.Func1=3
************

************
the number of Next.Func1=3
************
^C2020-06-15T17:42:38.785+0800  INFO    beater/filebeat.go:369  Stopping filebeat
2020-06-15T17:42:38.785+0800    INFO    beater/crawler.go:148   Stopping Crawler
2020-06-15T17:42:38.785+0800    INFO    beater/crawler.go:158   Stopping 1 inputs
2020-06-15T17:42:38.785+0800    INFO    [crawler]       beater/crawler.go:163   Stopping input: 5446179678553644909
2020-06-15T17:42:38.785+0800    INFO    input/input.go:138      input ticker stopped
2020-06-15T17:42:38.785+0800    INFO    harvester/forwarder.go:52       Input outlet closed
2020-06-15T17:42:38.785+0800    INFO    beater/crawler.go:178   Crawler stopped
2020-06-15T17:42:38.785+0800    INFO    registrar/registrar.go:367      Stopping Registrar
2020-06-15T17:42:38.785+0800    INFO    registrar/registrar.go:293      Ending Registrar
2020-06-15T17:42:38.805+0800    INFO    [monitoring]    log/log.go:153  Total non-zero metrics  {"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":10855,"time":{"ms":10855}},"total":{"ticks":55548,"time":{"ms":55548},"value":55548},"user":{"ticks":44693,"time":{"ms":44693}}},"info":{"ephemeral_id":"6363532c-ce3c-4a2c-a976-72279ff808ab","uptime":{"ms":45803}},"memstats":{"gc_next":17527168,"memory_alloc":14666952,"memory_total":16085510736,"rss":43769856},"runtime":{"goroutines":15}},"filebeat":{"events":{"active":705,"added":202018,"done":201313},"harvester":{"closed":3,"open_files":0,"running":0,"started":3}},"libbeat":{"config":{"module":{"running":0}},"output":{"events":{"acked":201818,"batches":395,"total":201818},"type":"file","write":{"bytes":1257734758}},"pipeline":{"clients":0,"events":{"active":193,"filtered":7,"published":202011,"total":202018},"queue":{"acked":201818}}},"registrar":{"states":{"cleanup":1,"current":1,"update":201313},"writes":{"success":399,"total":399}},"system":{"cpu":{"cores":8},"load":{"1":3.5132,"15":3.7046,"5":3.562,"norm":{"1":0.4391,"15":0.4631,"5":0.4453}}}}}}
2020-06-15T17:42:38.805+0800    INFO    [monitoring]    log/log.go:154  Uptime: 45.804771862s
2020-06-15T17:42:38.805+0800    INFO    [monitoring]    log/log.go:131  Stopping metrics logging.
2020-06-15T17:42:38.806+0800    INFO    instance/beat.go:462    filebeat stopped.

And this is my filebeat.yml:

paths: /tmp/filebeat/data
logging:
  level: info
  to_stderr: true
filebeat.inputs:
  - type: container
    enabled: true
    paths: ${INPUTS_CONTAINER_PATH:/tmp/containers/*/*.log}
    stream: ${INPUTS_CONTAINER_STREAM:all}
    encoding: ${INPUTS_CONTAINER_ENCODING:utf-8}
    ignore_older: ${INPUTS_CONTAINER_IGNORE_OLDER:2h}
    max_bytes: ${INPUTS_CONTAINER_MAX_BYTES:51200}
    multiline.pattern: '${TERMINUS_PATTERN:^\d{4}-\d{2}-\d{2}[^\d]+\d{2}:\d{2}:\d{2}[^\s]+\s+([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn(?:ing)?|WARN(?:ING)?|[Ee]rr(?:or)?|ERR(?:OR)?|[Cc]rit(?:ical)?|CRIT(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|[Ee]merg(?:ency)?|EMERG(?:ENCY)?)[\s-]+\[(.*?)][\s-]+}'
    multiline.negate: false
    multiline.match: after
    multiline.max_lines: ${INPUTS_CONTAINER_MULTILINE_MAX_LINES:500}
    multiline.timeout: ${INPUTS_CONTAINER_MULTILINE_TIMEOUT:1s}
    close_inactive: 5m
    close_removed: true
    close_timeout: 2m
    clean_removed: true
    mem_usage_percent: 50
queue:
  mem:
    events: ${QUEUE_MEM_EVENTS:1024}
    flush.min_events: ${QUEUE_MEM_FLUSH_MIN_EVENTS:512}
    flush.timeout: ${QUEUE_MEM_FLUSH_TIMEOUT:10s}
processors:
output.file:
  path: /tmp/filebeat
elasticmachine commented 4 years ago

Pinging @elastic/integrations-services (Team:Services)

urso commented 4 years ago

Thanks for investigating. This indeed looks like a goroutine leak. The internal goroutine quits on error (e.g. EOF), but if the input is closed early, we leak the routine.

Edit: checking the code I think the LineReader would propagate an error forcing this routine to shut down when the input quits. I wonder if we can reproduce the issue with one file only.

erenming commented 4 years ago

@urso Thanks for the reminder. Today, in order to reproduce it, I ran a test with only one file. I edited timeout.go so that it reports the status of the goroutine. This is the code:

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package readfile

import (
    "errors"
    "fmt"
    "sync/atomic"
    "time"

    "github.com/elastic/beats/v7/libbeat/reader"
)

var (
    errTimeout = errors.New("timeout")
    tid        uint64
)

// TimeoutReader will signal some configurable timeout error if no
// new line can be returned in time.
type TimeoutReader struct {
    id      uint64
    reader  reader.Reader
    timeout time.Duration
    signal  error
    running bool
    ch      chan lineMessage
}

type lineMessage struct {
    line reader.Message
    err  error
}

// NewTimeoutReader returns a new timeout reader from an input line reader.
func NewTimeoutReader(reader reader.Reader, signal error, t time.Duration) *TimeoutReader {
    if signal == nil {
        signal = errTimeout
    }
    return &TimeoutReader{
        id:      atomic.AddUint64(&tid, 1), // use the returned value instead of re-reading tid
        reader:  reader,
        signal:  signal,
        timeout: t,
        ch:      make(chan lineMessage, 1),
    }
}

var exit int64

func init() {
    go func() {
        for {
            fmt.Printf("************the number of Next.Func1=%d************\n", atomic.LoadInt64(&exit))
            time.Sleep(time.Second * 5)
        }
    }()
}

// Next returns the next line. If no line was returned before timeout, the
// configured timeout error is returned.
// For handling timeouts a goroutine is started for reading lines from
// configured line reader. Only when underlying reader returns an error, the
// goroutine will be finished.
func (r *TimeoutReader) Next() (reader.Message, error) {
    if !r.running {
        r.running = true
        go func() {
            atomic.AddInt64(&exit, 1)
            fmt.Printf("*******<TimeoutReader: %d>'s routine start*********\n", r.id)
            for {
                message, err := r.reader.Next()
                r.ch <- lineMessage{message, err}
                if err != nil {
                    break
                }
            }
            fmt.Printf("*******<TimeoutReader: %d>'s routine end*********\n", r.id)
            atomic.AddInt64(&exit, -1)
        }()
    }
    timer := time.NewTimer(r.timeout)
    select {
    case msg := <-r.ch:
        if msg.err != nil {
            r.running = false
        }
        timer.Stop()
        return msg.line, msg.err
    case <-timer.C:
        return reader.Message{}, r.signal
    }
}

And I found something strange. When I append a line to the log file every second, the goroutine can exit normally; this is the log: normal.log. But when I start from a log file that was already generated with about 40,000 lines, some of the goroutines leak; this is the log for that scenario: leak.log. So, is there some hidden mechanism here? This is my filebeat.yml:

paths: ./data
logging:
  level: info
  to_stderr: true
filebeat.inputs:
  - type: container
    enabled: true
    paths: ${INPUTS_CONTAINER_PATH:/tmp/containers/f004a2d52d22fac5d68dfec0dd48a68f2db4f649a41b94e34a7e4b900a29ec5d/f004a2d52d22fac5d68dfec0dd48a68f2db4f649a41b94e34a7e4b900a29ec5d-json.log}
    stream: ${INPUTS_CONTAINER_STREAM:all}
    encoding: ${INPUTS_CONTAINER_ENCODING:utf-8}
    ignore_older: ${INPUTS_CONTAINER_IGNORE_OLDER:2h}
    max_bytes: ${INPUTS_CONTAINER_MAX_BYTES:51200}
    multiline.pattern: '${TERMINUS_PATTERN:^\d{4}-\d{2}-\d{2}[^\d]+\d{2}:\d{2}:\d{2}[^\s]+\s+([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn(?:ing)?|WARN(?:ING)?|[Ee]rr(?:or)?|ERR(?:OR)?|[Cc]rit(?:ical)?|CRIT(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|[Ee]merg(?:ency)?|EMERG(?:ENCY)?)[\s-]+\[(.*?)][\s-]+}'
    multiline.negate: false
    multiline.match: after
    multiline.max_lines: ${INPUTS_CONTAINER_MULTILINE_MAX_LINES:500}
    multiline.timeout: ${INPUTS_CONTAINER_MULTILINE_TIMEOUT:1s}
    close_inactive: 3m
    close_removed: true
    close_timeout: 10s
    clean_removed: true
queue:
  mem:
    events: ${QUEUE_MEM_EVENTS:1024}
    flush.min_events: ${QUEUE_MEM_FLUSH_MIN_EVENTS:512}
    flush.timeout: ${QUEUE_MEM_FLUSH_TIMEOUT:10s}
processors:
output.file:
  path: /tmp/filebeat
erenming commented 4 years ago

Hi all, I think I may have found the reason for the goroutine leak. When the close_timeout event is triggered, the input (harvester) does some cleanup. At that point the goroutine may still be inside r.reader.Next(), and it will then block on r.ch <- lineMessage{message, err} because nobody is reading from the channel anymore. So even though the LineReader returns an error once the input's cleanup is done, the goroutine is already blocked on the send and never reaches its exit path. Hence the goroutine leaks.
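
In isolation, the blocking pattern looks roughly like this (a stripped-down sketch with made-up names, not beats code):

package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    // ch mirrors TimeoutReader.ch: buffer size 1
    ch := make(chan string, 1)

    // readNext stands in for r.reader.Next(): it always has another line
    readNext := func(i int) (string, error) { return fmt.Sprintf("line %d", i), nil }

    go func() {
        for i := 0; ; i++ {
            msg, err := readNext(i)
            ch <- msg // blocks forever once the buffer is full and nobody receives
            if err != nil {
                break // the exit path the leaked goroutine never reaches
            }
        }
        fmt.Println("reader goroutine exited") // never printed in this scenario
    }()

    <-ch // the consumer reads once, then goes away (like a harvester hitting close_timeout)

    time.Sleep(100 * time.Millisecond)
    // 2 goroutines remain: main and the reader stuck on the channel send
    fmt.Println("goroutines:", runtime.NumGoroutine())
}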

To verify this, I used another goroutine to perform the channel send. As a result, the Next.func1 goroutine no longer blocks and exits normally. The code looks like this:

func (r *TimeoutReader) Next() (reader.Message, error) {
    if !r.running {
        r.running = true
        go func() {
            for {
                message, err := r.reader.Next()
                go func() {
                    // send in a separate goroutine so the reading loop
                    // never blocks and can reach the error check below
                    r.ch <- lineMessage{message, err}
                }()
                if err != nil {
                    break
                }
            }
        }()
    }
    timer := time.NewTimer(r.timeout)
    select {
    case msg := <-r.ch:
        if msg.err != nil {
            r.running = false
        }
        timer.Stop()
        return msg.line, msg.err
    case <-timer.C:
        return reader.Message{}, r.signal
    }
}
urso commented 4 years ago

Thanks for investigating and all the details you shared. I think the issue is very clear now.

Thinking about this and other potential issues, I'd say the reader interface should also require io.Closer, so we can ensure a more structured top-down shutdown as well.
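
Something along these lines, perhaps (a rough, untested sketch only, not necessarily the change we would actually make): give TimeoutReader a done channel plus a Close method, and let the reading goroutine select on done while sending, so it can exit even when the consumer is gone. It assumes a done chan struct{} field is added to the struct and initialized in NewTimeoutReader, and that "io" is added to the imports of timeout.go.

// Close stops the reading goroutine; if the underlying reader also
// implements io.Closer, it is closed so a blocked Next() call can return.
func (r *TimeoutReader) Close() error {
    close(r.done)
    if c, ok := r.reader.(io.Closer); ok {
        return c.Close()
    }
    return nil
}

func (r *TimeoutReader) Next() (reader.Message, error) {
    if !r.running {
        r.running = true
        go func() {
            for {
                message, err := r.reader.Next()
                select {
                case r.ch <- lineMessage{message, err}:
                case <-r.done:
                    return // shutdown requested: don't block on a channel nobody reads
                }
                if err != nil {
                    return
                }
            }
        }()
    }
    timer := time.NewTimer(r.timeout)
    defer timer.Stop()
    select {
    case msg := <-r.ch:
        if msg.err != nil {
            r.running = false
        }
        return msg.line, msg.err
    case <-timer.C:
        return reader.Message{}, r.signal
    case <-r.done:
        // treat a closed reader like end of input (sketch simplification)
        return reader.Message{}, io.EOF
    }
}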