I'm trying to add restartability to the TcpOutput plugin and ran into a kind of blocker. I have buffering enabled too. (Using Heka 0.10.0)
I did a basic test implementation by returning a PluginExitError per 100 messages and tried the same by writing to the output runner StopChan. I've set it to restart infinitely.
In both cases, the plugin restarts at first but seems to fail on subsequent restarts with an unmarshalling error from the _queuebuffer.go
_
error: StreamOutput stopped: can't get record: can't unmarshal record: proto: wrong wireType = 0 for field Uuid
_
`
The wire type changes time to time.
It seems like the restart somehow corrupts buffer entries or the checkpoint file.
Following is the change I did to the tcp_output.go - Inside the ProcessMessage method
if t.processMessageCount % 200 == 0 && t.processMessageCount != 0 { return NewPluginExitError("Exiting the Plugin: %s", err) }
I'm trying to add restartability to the TcpOutput plugin and ran into a kind of blocker. I have buffering enabled too. (Using Heka 0.10.0) I did a basic test implementation by returning a PluginExitError per 100 messages and tried the same by writing to the output runner StopChan. I've set it to restart infinitely. In both cases, the plugin restarts at first but seems to fail on subsequent restarts with an unmarshalling error from the _queuebuffer.go
_
_
` The wire type changes time to time. It seems like the restart somehow corrupts buffer entries or the checkpoint file.
Following is the change I did to the tcp_output.go - Inside the ProcessMessage method
if t.processMessageCount % 200 == 0 && t.processMessageCount != 0 { return NewPluginExitError("Exiting the Plugin: %s", err) }