mozilla-services / heka

DEPRECATED: Data collection and processing made easy.
http://hekad.readthedocs.org/

Heka HttpOutput issue: Diagnostics: 100 packs have been idle more than 120 seconds #1379

Open hejinlong opened 9 years ago

hejinlong commented 9 years ago

I have a problem and need some help. Thanks.

Heka version is 0.8.3

The details are as follows:

2015/02/28 11:21:23 {"metric":"trend_nginx", "timestamp":1425093683,"tags":{"remote_addr":"172.16.89.127","status":200,"target":"ad","from":"1051293010","uid":"1782536072"},"value":0.017}
2015/02/28 11:23:46 Diagnostics: 100 packs have been idle more than 120 seconds.
2015/02/28 11:23:46 Diagnostics: (input) Plugin names and quantities found on idle packs:
2015/02/28 11:23:46 Diagnostics:    LogOutput: 100
2015/02/28 11:23:46 Diagnostics:    HttpOutput: 51
2015/02/28 11:23:46 
2015/02/28 11:24:16 Diagnostics: 100 packs have been idle more than 120 seconds.
2015/02/28 11:24:16 Diagnostics: (input) Plugin names and quantities found on idle packs:
2015/02/28 11:24:16 Diagnostics:    HttpOutput: 51
2015/02/28 11:24:16 Diagnostics:    LogOutput: 100
2015/02/28 11:24:16 

hekad toml file:

[hekad]
maxprocs = 8
#poolsize = 1000
#plugin_chansize = 30

[NginxLogInput]
type = "LogstreamerInput"
log_directory = "/data0/nginx/logs/"
file_match = 'access\.log'
decoder = "NginxLogDecoder"

[NginxLogDecoder]
type = "SandboxDecoder"
filename = "lua_decoders/nginx_access.lua"

[NginxLogDecoder.config]
log_format = '$remote_addr - $remote_user [$time_local] "$request" $request_body $status $body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for"  "$request_time"'
type = "nginx-access"

[TrendnginxEncoder]

[LogOutput]
type = "LogOutput"
message_matcher = "Type == 'nginx-access'"
encoder = "TrendnginxEncoder"

[HttpOutput]
message_matcher = "Type == 'nginx-access'"
address = "http://127.0.0.1:4242/api/put"
encoder = "TrendnginxEncoder"

rafrombrc commented 9 years ago

The diagnostic messages you've got are saying that messages are being fed into your output plugins but are never being freed up. Whenever a message (aka a "pack") is fed into a filter or an output plugin, Heka records that information. When the message has finished processing and is recycled for reuse, this is also recorded. Your output is saying that 100 messages have been fed into the LogOutput, but even a full 2 minutes later they have not yet been recycled, which is clearly no good; 2 minutes is an eternity for a plugin to spend processing a message.

The LogOutput code for processing a message is very minimal. The message is encoded, either an error is logged or the encoding output is logged, and the pack is recycled. My guess, then, is that something is happening inside the encoder such that the Encode() call is hanging, or at least is taking a very long time to return, gumming up the works.
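
For reference, here is a simplified sketch of the LogOutput path just described: receive a pack, encode it, log the encoded bytes (or the error), and recycle the pack exactly once. It is paraphrased against Heka's pipeline API rather than copied from the real LogOutput source, so treat the exact OutputRunner method names used here as assumptions.

package plugins

import (
    "log"

    "github.com/mozilla-services/heka/pipeline"
)

// SketchLogOutput mirrors the encode/log/recycle loop described above.
type SketchLogOutput struct{}

func (o *SketchLogOutput) Run(or pipeline.OutputRunner, h pipeline.PluginHelper) (err error) {
    var outBytes []byte
    for pack := range or.InChan() {
        if outBytes, err = or.Encode(pack); err != nil {
            or.LogError(err) // encoding failed: report the error
        } else if outBytes != nil {
            log.Print(string(outBytes)) // encoding succeeded: write it to the log
        }
        pack.Recycle() // free the pack exactly once, in the output plugin
    }
    return nil
}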

You seem to be using a custom encoder. Is the source code to that available anywhere?

hejinlong commented 9 years ago

Thank you, rafrombrc.

I use Heka to collect nginx access logs for OpenTSDB. Heka works in real time, while OpenTSDB lags behind, so I suspect the problem comes from this time difference: OpenTSDB is too busy dealing with past data.

BTW, the encoder source code follows:

package plugins

import (
    "bytes"
    "encoding/base64"
    "fmt"
    "github.com/mozilla-services/heka/message"
    "github.com/mozilla-services/heka/pipeline"
    "strconv"
    "strings"
    "time"
)

// TrendnginxEncoder generates a restructured text rendering of a Heka message,
// useful for debugging.
type TrendnginxEncoder struct {
    typeNames []string
}

func (re *TrendnginxEncoder) Init(config interface{}) (err error) {
    // Build a lookup table of lower-cased field value type names.
    re.typeNames = make([]string, len(message.Field_ValueType_name))
    for i, typeName := range message.Field_ValueType_name {
        re.typeNames[i] = strings.ToLower(typeName)
    }
    return
}

func (re *TrendnginxEncoder) writeAttr(buf *bytes.Buffer, name, value string) {
    buf.WriteString(fmt.Sprintf("%s", value))
}

func (re *TrendnginxEncoder) writeField(buf *bytes.Buffer, name, typeName, repr string,
    values []string) {
    // ...
}

func (re *TrendnginxEncoder) Encode(pack *pipeline.PipelinePack) (output []byte, err error) {
    // Writing out the message attributes is easy.
    buf := new(bytes.Buffer)
    //timestamp := time.Unix(0, pack.Message.GetTimestamp()).UTC()
    //re.writeAttr(buf, "Timestamp", timestamp.String())
    t := time.Now().Unix()
    buf.WriteString(fmt.Sprintf("{\"metric\":\"trend_nginx\", \"timestamp\":%d", t))
    // Writing out the dynamic message fields is a bit of a PITA.
    fields := pack.Message.GetFields()
    if len(fields) > 0 {
        v1 := ""
        for _, field := range fields {
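            // Recycle() is called here once for every message field.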
            pack.Recycle()
            // ...
        }
        buf.WriteString(fmt.Sprintf(",\"value\":%s",v1))
    }
    buf.WriteString("}")
    return buf.Bytes(), nil
}

func init() {
    pipeline.RegisterPlugin("TrendnginxEncoder", func() interface{} {
        return new(TrendnginxEncoder)
    })
}

rafrombrc commented 9 years ago

You've got a pack.Recycle() call inside the for loop that's iterating through your message fields. This is wrong for a couple of reasons. First, encoders shouldn't be recycling packs at all; the output plugin "owns" the pack, so recycling should happen in the output. Second, even if it were appropriate to be recycling the pack inside your encoder code, any given plugin should only recycle a pack one time. Recycling the same pack over and over again is definitely going to cause problems, possibly including (but not limited to) the idle pack messages you're seeing in your output.

Try removing that line from your code and see if the problem goes away.
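
For illustration, here is a minimal sketch of the Encode() method above with the pack.Recycle() call removed; the field handling that was elided in the original post is left as a placeholder rather than guessed at:

func (re *TrendnginxEncoder) Encode(pack *pipeline.PipelinePack) (output []byte, err error) {
    buf := new(bytes.Buffer)
    t := time.Now().Unix()
    buf.WriteString(fmt.Sprintf("{\"metric\":\"trend_nginx\", \"timestamp\":%d", t))

    fields := pack.Message.GetFields()
    if len(fields) > 0 {
        v1 := ""
        for _, field := range fields {
            // No pack.Recycle() here: the output plugin owns the pack and
            // recycles it exactly once after Encode() returns.
            _ = field // placeholder for the elided field handling
        }
        buf.WriteString(fmt.Sprintf(",\"value\":%s", v1))
    }
    buf.WriteString("}")
    return buf.Bytes(), nil
}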

hejinlong commented 9 years ago

Thank you very much, rafrombrc!

The problem is still there.

Heka is very busy; it outputs more than 2000 records at the same time.

How can Heka's HttpOutput send that many records per second?

2015/03/10 14:42:33 {"metric":"trend_nginx", "timestamp":1425969753,"tags":{"remote_addr":"202.106.185.37","status":404},"value":0}
2015/03/10 14:42:33 {"metric":"trend_nginx", "timestamp":1425969753,"tags":{"remote_addr":"202.106.185.37","status":404},"value":0}
2015/03/10 14:42:33 {"metric":"trend_nginx", "timestamp":1425969753,"tags":{"remote_addr":"202.106.185.37","status":404},"value":0}
2015/03/10 14:42:33 Plugin 'HttpOutput' error: HTTP Error code returned: 500 500 Internal Server Error - {"error":{"code":500,"message":"Should never be here","trace":"java.lang.RuntimeException: Should never be here\n\tat net.opentsdb.uid.UniqueId.getOrCreateId(UniqueId.java:602) ~[tsdb-2.0.1.jar:]\n\tat net.opentsdb.core.Tags.resolveAllInternal(Tags.java:386) ~[tsdb-2.0.1.jar:]\n\tat net.opentsdb.core.Tags.resolveOrCreateAll(Tags.java:373) ~[tsdb-2.0.1.jar:]\n\tat net.opentsdb.core.IncomingDataPoints.rowKeyTemplate(IncomingDataPoints.java:135) ~[tsdb-2.0.1.jar:]\n\tat net.opentsdb.core.TSDB.addPointInternal(TSDB.java:640) ~[tsdb-2.0.1.jar:]\n\tat net.opentsdb.core.TSDB.addPoint(TSDB.java:549) ~[tsdb-2.0.1.jar:]\n\tat net.opentsdb.tsd.PutDataPointRpc.execute(PutDataPointRpc.java:143) ~[tsdb-2.0.1.jar:]\n\tat net.opentsdb.tsd.RpcHandler.handleHttpQuery(RpcHandler.java:255) [tsdb-2.0.1.jar:]\n\tat net.opentsdb.tsd.RpcHandler.messageReceived(RpcHandler.java:163) [tsdb-2.0.1.jar:]\n\tat org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [netty-3.9.4.Final.jar:na]\n\tat org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [netty-3.9.4.Final.jar:na]\n\tat org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [netty-3.9.4.Final.jar:na]\n\tat org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) [netty-3.9.4.Final.jar:na]\n\tat org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:459) [netty-3.9.4.Final.jar:na]\n\tat org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:536) [netty-3.9.4.Final.jar:na]\n\tat org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:435) [netty-3.9.4.Final.jar:na]\n\tat org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) [netty-3.9.4.Final.jar:na]\n\tat org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [netty-3.9.4.Final.jar:na]\n\tat org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) [netty-3.9.4.Final.jar:na]\n\tat org.jboss.netty.channel.SimpleChannelHandler.messageReceived(SimpleChannelHandler.java:142) [netty-3.9.4.Final.jar:na]\n\tat org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88) [netty-3.9.4.Final.jar:na]\n\tat net.opentsdb.tsd.ConnectionManager.handleUpstream(ConnectionManager.java:87) [tsdb-2.0.1.jar:]\n\tat org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) [netty-3.9.4.Final.jar:na]\n\tat org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) [netty-3.9.4.Final.jar:na]\n\tat org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) [netty-3.9.4.Final.jar:na]\n\tat org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) [netty-3.9.4.Final.jar:na]\n\tat org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) [netty-3.9.4.Final.jar:na]\n\tat org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) [netty-3.9.4.Final.jar:na]\n\tat org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) [netty-3.9.4.Final.jar:na]\n\tat 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) [netty-3.9.4.Final.jar:na]\n\tat org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) [netty-3.9.4.Final.jar:na]\n\tat org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [netty-3.9.4.Final.jar:na]\n\tat org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [netty-3.9.4.Final.jar:na]\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_75]\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(Th
2015/03/10 14:42:33 {"metric":"trend_nginx", "timestamp":1425969753,"tags":{"remote_addr":"202.106.185.37","status":404},"value":0}
2015/03/10 14:45:03 Diagnostics: 100 packs have been idle more than 120 seconds.
2015/03/10 14:45:03 Diagnostics: (input) Plugin names and quantities found on idle packs:
2015/03/10 14:45:03 Diagnostics:    LogOutput: 100
2015/03/10 14:45:03 Diagnostics:    HttpOutput: 51
2015/03/10 14:45:03 

rafrombrc commented 9 years ago

OpenTSDB might be causing some problems for you, but I don't think the "Diagnostics" errors you're seeing are related. The number of packs trapped in LogOutput and HttpOutput differs even though both plugins have the same message_matcher, and LogOutput seems to have more packs trapped than HttpOutput even though it is clearly doing much less work. Both facts point to something inside Heka and, more specifically, something inside your encoder.