mozilla-services / heka

DEPRECATED: Data collection and processing made easy.
http://hekad.readthedocs.org/

KafkaOutput with use_buffering = true never updates checkpoint.txt #1886

Closed: ericpai closed this issue 8 years ago

ericpai commented 8 years ago

Heka code branch: versions/0.10, OS: CentOS 7, Go version: 1.5.3

When I use KafkaOutput with the following configuration:

[lain_kafka_output]
type = "KafkaOutput"
message_matcher = "Type == 'lain.docker_syslog' || Type == 'lain-app-log'"
topic_variable = "Fields[kafka_topic]"
addrs = ["10.100.67.30:9092"]
encoder = "SyslogLainEncoder"
max_buffer_time = 15000
use_buffering = true

[lain_kafka_output.buffering]
max_file_size = 268435456 #256MB
max_buffer_size = 1073741824 #1GB
full_action = "block"

I can read all the messages from Kafka, but the old log files in output_queue/lain_kafka_output are never removed. Here is the content of output_queue/lain_kafka_output:

[root@xyz-101 lain_kafka_output]# ll -h
total 537M
-rw-r--r--. 1 root root 256M Mar 15 01:35 0.log
-rw-r--r--. 1 root root 256M Mar 15 09:15 1.log
-rw-r--r--. 1 root root  25M Mar 15 09:59 2.log
-rw-r--r--. 1 root root 676K Mar 15 10:00 3.log
-rw-r--r--. 1 root root    3 Mar 15 09:59 checkpoint.txt
[root@xyz-101 lain_kafka_output]# cat checkpoint.txt
0 0[root@xyz-101 lain_kafka_output]#

I found that the content of checkpoint.txt is always 0 0. In queue_buffer.go, updateCursor() calls writeCheckpoint() to record the position given by queueCursor. However, in the Run() loop of kafka_output.go, updateCursor() is never called after a message is processed successfully, so the checkpoint never moves. I think this is the cause.

I also ran another test using LogOutput instead of KafkaOutput, and checkpoint.txt was updated as expected.

I forked this branch, added the updateCursor() call, recompiled, and tested it in the same environment; checkpoint.txt was updated there as well.

ericpai commented 8 years ago

I can't get the unit test to pass because the program hangs when mockOutputRunner calls UpdateCursor(), although the change works well in the production environment. Is there a known issue with the gomock library?

trixpan commented 8 years ago

This is a duplicate of

https://github.com/mozilla-services/heka/issues/1749

ericpai commented 8 years ago

@trixpan Thanks for your reply. Your solution is to switch to SyncProducer, which may hurt performance. Is there a solution that keeps AsyncProducer?
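One possible answer to the AsyncProducer question, offered only as a sketch: update the checkpoint from delivery confirmations instead of from the send call. With sarama's AsyncProducer, confirmations arrive on `Successes()` when `Producer.Return.Successes` is enabled, and `ProducerMessage.Metadata` can carry the queue cursor. Because confirmations can arrive out of order (for example across partitions), the checkpoint should only advance over a contiguous run of confirmed messages. The code below simulates this with a plain channel; `ack`, `ackTracker`, and the "<file id> <offset>" cursor strings are hypothetical, and this is not necessarily what #1887 does.

```go
package main

import "fmt"

// ack is a hypothetical delivery confirmation. With sarama it would be built
// from a *ProducerMessage received on Successes(), cursor taken from Metadata.
type ack struct {
	Seq    uint64 // order in which the message was read from the buffer
	Cursor string // buffer position just after this message (assumed format)
}

// ackTracker commits a cursor only when every earlier message is confirmed,
// so an in-flight message is never skipped even if later ones finish first.
type ackTracker struct {
	next    uint64            // oldest sequence number not yet committed
	pending map[uint64]string // confirmed, waiting on earlier messages
	commit  func(cursor string)
}

func newAckTracker(commit func(string)) *ackTracker {
	return &ackTracker{pending: make(map[uint64]string), commit: commit}
}

// handle records one confirmation and commits the newest cursor that has no
// unconfirmed message before it. Duplicate or stale acks are ignored.
func (t *ackTracker) handle(a ack) {
	if a.Seq < t.next {
		return
	}
	t.pending[a.Seq] = a.Cursor
	var latest string
	advanced := false
	for {
		c, ok := t.pending[t.next]
		if !ok {
			break
		}
		delete(t.pending, t.next)
		latest, advanced = c, true
		t.next++
	}
	if advanced {
		t.commit(latest)
	}
}

func main() {
	successes := make(chan ack)
	// Simulate confirmations arriving out of order.
	go func() {
		for _, seq := range []uint64{1, 0, 3, 4, 2} {
			successes <- ack{Seq: seq, Cursor: fmt.Sprintf("0 %d", (seq+1)*100)}
		}
		close(successes)
	}()

	var committed []string
	t := newAckTracker(func(c string) { committed = append(committed, c) })
	for a := range successes {
		t.handle(a)
	}
	fmt.Printf("%q\n", committed) // prints ["0 200" "0 500"]
}
```

A message that fails (it would arrive on sarama's `Errors()` channel instead) stalls the tracker at that point, which is the safe default for at-least-once delivery; the output then has to re-send it, or explicitly acknowledge it if dropping is acceptable.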

rafrombrc commented 8 years ago

Fixed in #1887.