mozilla-services / heka

DEPRECATED: Data collection and processing made easy.
http://hekad.readthedocs.org/

Heka is ok when maxprocs = 1, but it stops after 50-60 seconds when I set maxprocs = 4, why? #1954

wangfeiping closed this issue 7 years ago

wangfeiping commented 8 years ago

Heka is ok when maxprocs = 1, but it stops after 50-60 seconds when I set maxprocs = 4. Why? Thanks.

### toml

```
maxprocs = 4
base_dir = "/apps/heka-0_10_0-linux-amd64/share/heka"
share_dir = "/apps/heka-0_10_0-linux-amd64/share/heka"

[log_input]
type="UdpInput"
address = ":514"

[ts]
type = "TokenSplitter"
delimiter = '\n'

[pd]
type = "PayloadRegexDecoder"
match_regex = '(\S*\n)'

[t_input]
type = "TcpInput"
address = ":514"
splitter = "ts"
decoder = "pd"
use_tls = false

[log_filter]
type = "SandboxFilter"
filename = "log_filter_counter.lua"
ticker_interval = 1
message_matcher = "Logger == 'log_input'"
output_limit = 65600

[PayloadEncoder]
append_newlines = false

[FileOutput]
type = "FileOutput"
message_matcher = "Fields[payload_name]=='local_report'"
path = "/apps/logdata/local-160.log"
flush_count = 1000
flush_operator = "OR"
encoder = "PayloadEncoder"

[local_filter]
type = "SandboxFilter"
filename = "local_log_filter.lua"
ticker_interval = 1
message_matcher = "Fields[payload_name]=='local_report'"
output_limit = 65600

[KO]
type = "KafkaOutput"
message_matcher = "Fields[payload_name]=='logFilterCounter'"
partitioner = "RoundRobin"
topic = "LOGBUF30"
addrs = ["10.19.35.160:9092"]
encoder = "ProtobufEncoder"
```

### lua

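```
require "string"
require "os"
require "table"
-- Helper module; its source is not shown in this issue.
local toolbox = require("toolbox")

local tabconcat = table.concat

-- Number of messages processed since the last timer_event.
local counter = 0
local INFLUX_LEVEL = ""

-- Called for each matched message: re-injects the payload and counts it.
function process_message ()
    local payload = read_message("Payload")
    add_to_payload(payload)

    inject_payload("txt", "logFilterCounter")

    counter = counter + 1
    return 0
end

-- Called on every ticker interval: emits the report, then resets the counter.
function timer_event(ns)
    influxReport()
    counter = 0
end

-- Starts the report line with the "<INFLUX> " marker and a datetime string.
function appendInfluxPrefix()
    add_to_payload("<INFLUX> ")
    local dts = toolbox.getOsDatetimeString()
    add_to_payload(dts)
    add_to_payload(" ")
end

-- Builds the report line from the counter and injects it as "local_report".
function influxReport()
    appendInfluxPrefix()
    local _, t = toolbox.getOsTime(nil, 0)
    local pd = {
                INFLUX_LEVEL,
                ",vip=",
                ",ip=",
                ",topic=LOGBUF",
                " size=", counter,
                ",value=1 ",
                t, "000000000"
    }
    local post_data = tabconcat(pd)
    add_to_payload(post_data, "\n")
    inject_payload("txt", "local_report")
end
```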

### heka logs

```
2016/06/23 14:25:35 Pre-loading: [KO]
2016/06/23 14:25:35 Pre-loading: [pd]
2016/06/23 14:25:35 Pre-loading: [log_filter]
2016/06/23 14:25:35 Pre-loading: [ts]
2016/06/23 14:25:35 Pre-loading: [log_input]
2016/06/23 14:25:35 Pre-loading: [local_filter]
2016/06/23 14:25:35 Pre-loading: [PayloadEncoder]
2016/06/23 14:25:35 Pre-loading: [t_input]
2016/06/23 14:25:35 Pre-loading: [FileOutput]
2016/06/23 14:25:35 Pre-loading: [TcpInput]
2016/06/23 14:25:35 Pre-loading: [ProtobufDecoder]
2016/06/23 14:25:35 Loading: [ProtobufDecoder]
2016/06/23 14:25:35 Pre-loading: [ProtobufEncoder]
2016/06/23 14:25:35 Loading: [ProtobufEncoder]
2016/06/23 14:25:35 Pre-loading: [TokenSplitter]
2016/06/23 14:25:35 Loading: [TokenSplitter]
2016/06/23 14:25:35 Pre-loading: [HekaFramingSplitter]
2016/06/23 14:25:35 Loading: [HekaFramingSplitter]
2016/06/23 14:25:35 Pre-loading: [NullSplitter]
2016/06/23 14:25:35 Loading: [NullSplitter]
2016/06/23 14:25:35 Loading: [pd]
2016/06/23 14:25:35 Loading: [PayloadEncoder]
2016/06/23 14:25:35 Loading: [ts]
2016/06/23 14:25:35 Loading: [log_input]
2016/06/23 14:25:35 Loading: [t_input]
2016/06/23 14:25:35 Loading: [TcpInput]
2016/06/23 14:25:35 Loading: [log_filter]
2016/06/23 14:25:35 Loading: [local_filter]
2016/06/23 14:25:35 Loading: [KO]
2016/06/23 14:25:36 Loading: [FileOutput]
2016/06/23 14:25:36 Starting hekad...
2016/06/23 14:25:36 Output started: KO
2016/06/23 14:25:36 Output started: FileOutput
2016/06/23 14:25:36 Filter started: log_filter
2016/06/23 14:25:36 Filter started: local_filter
2016/06/23 14:25:36 MessageRouter started.
2016/06/23 14:25:36 Input started: log_input
2016/06/23 14:25:36 Input started: t_input
2016/06/23 14:25:36 Input started: TcpInput
2016/06/23 14:30:36 Diagnostics: 32 packs have been idle more than 120 seconds.
2016/06/23 14:30:36 Diagnostics: (input) Plugin names and quantities found on idle packs:
2016/06/23 14:30:36 Diagnostics: 30 packs have been idle more than 120 seconds.
2016/06/23 14:30:36 Diagnostics:    log_filter: 32
2016/06/23 14:30:36 Diagnostics: (inject) Plugin names and quantities found on idle packs:
2016/06/23 14:30:36 
2016/06/23 14:30:36 Diagnostics:    KO: 30
2016/06/23 14:30:36 
2016/06/23 14:31:06 Diagnostics: 32 packs have been idle more than 120 seconds.
2016/06/23 14:31:06 Diagnostics: (input) Plugin names and quantities found on idle packs:
2016/06/23 14:31:06 Diagnostics:    log_filter: 32
2016/06/23 14:31:06 
2016/06/23 14:31:06 Diagnostics: 30 packs have been idle more than 120 seconds.
2016/06/23 14:31:06 Diagnostics: (inject) Plugin names and quantities found on idle packs:
2016/06/23 14:31:06 Diagnostics:    KO: 30
2016/06/23 14:31:06 
2016/06/23 14:31:36 Diagnostics: 32 packs have been idle more than 120 seconds.
2016/06/23 14:31:36 Diagnostics: 30 packs have been idle more than 120 seconds.
2016/06/23 14:31:36 Diagnostics: (inject) Plugin names and quantities found on idle packs:
2016/06/23 14:31:36 Diagnostics:    KO: 30
2016/06/23 14:31:36 
2016/06/23 14:31:36 Diagnostics: (input) Plugin names and quantities found on idle packs:
2016/06/23 14:31:36 Diagnostics:    log_filter: 32
2016/06/23 14:31:36 
2016/06/23 14:32:06 Diagnostics: 32 packs have been idle more than 120 seconds.
2016/06/23 14:32:06 Diagnostics: 30 packs have been idle more than 120 seconds.
2016/06/23 14:32:06 Diagnostics: (input) Plugin names and quantities found on idle packs:
2016/06/23 14:32:06 Diagnostics: (inject) Plugin names and quantities found on idle packs:
2016/06/23 14:32:06 Diagnostics:    KO: 30
2016/06/23 14:32:06 
2016/06/23 14:32:06 Diagnostics:    log_filter: 32
2016/06/23 14:32:06 
2016/06/23 14:32:36 Diagnostics: 32 packs have been idle more than 120 seconds.
2016/06/23 14:32:36 Diagnostics: (input) Plugin names and quantities found on idle packs:
2016/06/23 14:32:36 Diagnostics:    log_filter: 32
2016/06/23 14:32:36 
2016/06/23 14:32:36 Diagnostics: 30 packs have been idle more than 120 seconds.
2016/06/23 14:32:36 Diagnostics: (inject) Plugin names and quantities found on idle packs:
2016/06/23 14:32:36 Diagnostics:    KO: 30
```

michaelgibson commented 8 years ago

Increasing your threads (maxprocs) probably increased the number of packs in flight in Heka's pipeline. When Heka runs out of available packs, it blocks until some are freed. You may need to increase "plugin_chansize" (default: 30), "poolsize" (default: 100), or both, to allow more packs to exist in the daemon.

http://hekad.readthedocs.io/en/latest/config/index.html#global-configuration-options
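
As a rough illustration of that suggestion, the two options can be raised in the global `[hekad]` section of the config. The values below (200 and 60) are arbitrary assumptions chosen only to show the shape of the change, not tuned recommendations; suitable numbers depend on message volume and on how quickly the outputs drain.

```
[hekad]
maxprocs = 4
# Assumed value for illustration: twice the default pool of 100 packs.
poolsize = 200
# Assumed value for illustration: twice the default channel depth of 30.
plugin_chansize = 60
```

The 30 idle packs reported for KO in the logs equal the default "plugin_chansize" of 30, which may indicate that the output's channel was full. If the stall persists with larger values, the output may simply not be draining, in which case bigger buffers would only delay the block.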

wangfeiping commented 7 years ago

@michaelgibson Ok, I see, thanks!