mozilla-services / heka

DEPRECATED: Data collection and processing made easy.
http://hekad.readthedocs.org/

heka gets wedged on Kafka output #1172

davidbirdsong closed this issue 9 years ago

davidbirdsong commented 9 years ago

This is from commit: 7fcbd42b32ac7d75e709e73a60eae386a8ad181a

I'll add that I got the same results from similar code I was testing for a few hours yesterday. The outcome is always the same: heka gets completely wedged, the queue report shows the Kafka output as fully backed up, and all the sarama goroutines are executing the same line of code.

Notably, the plugin runner for KafkaOutput is always stuck in:

github.com/Shopify/sarama.(*brokerProducer).flushIfOverCapacity(0xc20813a580, 0x320000000000001)
    /home/david/src/third-party/heka/build/heka/src/github.com/Shopify/sarama/producer.go:292 +0xa0

Meanwhile, what appear to be the two sarama flusher goroutines are both blocked sending errors on the error channel that our plugin runner reads from:

goroutine 63 [chan send, 3 minutes]:
github.com/Shopify/sarama.func·009(0x0, 0x0)
    /home/david/src/third-party/heka/build/heka/src/github.com/Shopify/sarama/producer.go:337 +0xfc
github.com/Shopify/sarama.(*brokerProducer).flushRequest(0xc20813a500, 0xc20811a000, 0xc2107a8000, 0x5f63, 0x7000, 0x7f16d2829e58, 0x5781)
    /home/david/src/third-party/heka/build/heka/src/github.com/Shopify/sarama/producer.go:394 +0x2ed
....
goroutine 64 [chan send, 3 minutes]:
github.com/Shopify/sarama.func·009(0x0, 0x0)
    /home/david/src/third-party/heka/build/heka/src/github.com/Shopify/sarama/producer.go:337 +0xfc
github.com/Shopify/sarama.(*brokerProducer).flushRequest(0xc20813a580, 0xc20811a000, 0xc2133b2000, 0xad50, 0xb400, 0x7f16d2827e28, 0x8db4)

Here's the full stack dump, captured by sending SIGQUIT: http://f.cl.ly/items/3o2t402E352S262O0L3e/heka-kafa-wedge.txt

Queue report:

2014/11/05 19:13:17 Queue report initiated.
2014/11/05 19:13:17 ========[heka.all-report]========

====Globals====
inputRecycleChan:
    InChanCapacity: 500
    InChanLength: 0
injectRecycleChan:
    InChanCapacity: 500
    InChanLength: 444
Router:
    InChanCapacity: 200
    InChanLength: 153
    ProcessMessageCount: 6.78281e+06

====Inputs====
inbound:
main_accum:

====Decoders====
inbound-10.0.2.20-session_sync_decoder:
    InChanCapacity: 200
    InChanLength: 0
    ProcessMessageCount: 805
    ProcessMessageAvgDuration: 101678
inbound-10.0.2.19-session_sync_decoder:
    InChanCapacity: 200
    InChanLength: 0
    ProcessMessageCount: 4.684581e+06
    ProcessMessageAvgDuration: 74075
inbound-10.0.1.82-session_sync_decoder:
    InChanCapacity: 200
    InChanLength: 0
    ProcessMessageCount: 2.097356e+06
    ProcessMessageAvgDuration: 170969

====Filters====
stats_session_decode_success:
    InChanCapacity: 200
    InChanLength: 0
    MatchChanCapacity: 200
    MatchChanLength: 0
    MatchAvgDuration: 249
stats_session_decode_error:
    InChanCapacity: 200
    InChanLength: 0
    MatchChanCapacity: 200
    MatchChanLength: 0
    MatchAvgDuration: 529
====Outputs====
sessions_to_kafka:
    InChanCapacity: 200
    InChanLength: 200
    MatchChanCapacity: 200
    MatchChanLength: 200
    MatchAvgDuration: 256
    ProcessMessageCount: 6.728459e+06
stats_forwarder:
    InChanCapacity: 200
    InChanLength: 0
    MatchChanCapacity: 200
    MatchChanLength: 0
    MatchAvgDuration: 156
http_dash:
    InChanCapacity: 200
    InChanLength: 0
    MatchChanCapacity: 200
    MatchChanLength: 0
    MatchAvgDuration: 255
bad_match_output:
    InChanCapacity: 200
    InChanLength: 0
    MatchChanCapacity: 200
    MatchChanLength: 0
    MatchAvgDuration: 131
voutput:
    InChanCapacity: 200
    InChanLength: 0
    MatchChanCapacity: 200
    MatchChanLength: 0
    MatchAvgDuration: 116

====Encoders====
sessions_to_kafka-ProtobufEncoder:
    ProcessMessageCount: 6.728459e+06
    ProcessMessageAvgDuration: 5389
stats_forwarder-ProtobufEncoder:
    ProcessMessageCount: 23
    ProcessMessageAvgDuration: 2001
bad_match_output-RstEncoder:
voutput-RstEncoder:
========

My KafkaOutput plugin settings are pretty simple; I went with the defaults for most options:

[sessions_to_kafka]
Type = "KafkaOutput"
addrs = ["svac21:8092"]
partitioner = "Hash"
hash_variable = "Fields[kafka_key]"
topic_variable = "Fields[kafka_topic]"
message_matcher = 'Logger == "sessions" && Type != "decode_error"'
encoder = "ProtobufEncoder"
use_framing = false

[ProtobufEncoder]

rafrombrc commented 9 years ago

Resolved in 6222cf03d453a839b9584c1d7b8f84ffe31c508e by moving errChan processing into a separate goroutine.