joekiller / logstash-kafka

Kafka plugin for Logstash
Apache License 2.0
206 stars 63 forks source link

Logstash Producer Error #20

Closed vjoonutula closed 10 years ago

vjoonutula commented 10 years ago

Hi, As part of POC project, I started exploring Kafka as a replacement of Redis by using logstash-kafka plugin but during initial test runs, I am receiving following exception on Producer and Kafka broker,

Setup :-

Logstash instance with Kafka output --> Kafka Broker and Zookeeper --> Logstash instance with Kafka input

Producer conf -> kafka { broker_list => "x.x.x.x.x:9092" }

Error :-

output received {:event=>{"message"=>"test", "@version"=>"1", "@timestamp"=>"2014-05-17T10:43:46.197Z", "type"=>"stdin-type", "host"=>"XXXXXXXX"}, :level=>:debug, :file=>"(eval)", :line=>"16"}

kafka producer threw exception, restarting {:exception=>#<KafkaError: Got FailedToSendMessageException: Failed to send messages after 3 tries.>, :level=>:warn, :file=>"logstash/outputs/kafka.rb", :line=>"69"}

On Kafka Server I am seeing following logs :-

2014-05-17 03:33:14,553] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log) [2014-05-17 03:33:14,554] INFO Created log for partition [test,0] in /tmp/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager) [2014-05-17 03:33:14,555] WARN Partition [test,0] on broker 0: No checkpointed highwatermark is found for partition test,0 [2014-05-17 03:33:14,615] INFO Closing socket connection to /x.x.x.x. (kafka.network.Processor) [2014-05-17 03:33:14,969] INFO Closing socket connection to /x.x.x.x. (kafka.network.Processor) [2014-05-17 03:33:15,077] INFO Closing socket connection to /x.x.x.x. (kafka.network.Processor) [2014-05-17 03:33:15,186] INFO Closing socket connection to /x.x.x.x. (kafka.network.Processor)

Thanks Venkat

joekiller commented 10 years ago

Can you run logstash with a -v to turn on verbose logging? Also please post the full config file. Are you running on windows or Linux? Finally did you build the plugin manually or via the makefile? On May 17, 2014 3:10 PM, "vjoonutula" notifications@github.com wrote:

Hi, As part of POC project, I started exploring Kafka as a replacement of Redis by using logstash-kafka plugin but during initial test runs, I am receiving following exception on Producer and Kafka broker,

Setup :-

Logstash instance with Kafka output --> Kafka Broker and Zookeeper --> Logstash instance with Kafka input

Producer conf -> kafka { broker_list => "x.x.x.x.x:9092" }

Error :-

output received {:event=>{"message"=>"test", "@versionhttps://github.com/version"=>"1", "@timestamp https://github.com/timestamp"=>"2014-05-17T10:43:46.197Z", "type"=>"stdin-type", "host"=>"XXXXXXXX"}, :level=>:debug, :file=>"(eval)", :line=>"16"}

kafka producer threw exception, restarting {:exception=>#, :level=>:warn, :file=>"logstash/outputs/kafka.rb", :line=>"69"}

On Kafka Server I am seeing following logs :-

2014-05-17 03:33:14,553] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log) [2014-05-17 03:33:14,554] INFO Created log for partition [test,0] in /tmp/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager) [2014-05-17 03:33:14,555] WARN Partition [test,0] on broker 0: No checkpointed highwatermark is found for partition test,0http://kafka.cluster.Partition [2014-05-17 03:33:14,615] INFO Closing socket connection to /x.x.x.x. (kafka.network.Processor) [2014-05-17 03:33:14,969] INFO Closing socket connection to /x.x.x.x. (kafka.network.Processor) [2014-05-17 03:33:15,077] INFO Closing socket connection to /x.x.x.x. (kafka.network.Processor) [2014-05-17 03:33:15,186] INFO Closing socket connection to /x.x.x.x. (kafka.network.Processor)

Thanks Venkat

— Reply to this email directly or view it on GitHubhttps://github.com/joekiller/logstash-kafka/issues/20 .

vjoonutula commented 10 years ago

Hi,

I built plugin using makefile using default version's for JRuby, Logstash and Scala. Current POC is being performed on CentOS release 6.4 version and Kafka broker version is "kafka_2.8.0-0.8.1".

Logstash.conf,

input { stdin { type => "stdin-type" } } output { kafka { broker_list => "x.x.x.x:9092" } }

Logs with Verbose option:

Using milestone 1 output plugin 'kafka'. This plugin should work, but would benefit from use by folks like you. Please let us know if you find bugs or have suggestions on how to improve this plugin. For more information on plugin milestones, see http://logstash.net/docs/1.4.0/plugin-milestones {:level=>:warn, :file=>"logstash/config/mixin.rb", :line=>"209"} config LogStash::Codecs::JSON/@charset = "UTF-8" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@broker_list = "x.x.x.x:9092" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@type = "" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@tags = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@exclude_tags = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@codec = <LogStash::Codecs::JSON charset=>"UTF-8"> {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@workers = 1 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@topic_id = "test" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@compression_codec = "none" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@compressed_topics = "" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@request_required_acks = 0 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@serializer_class = "kafka.serializer.StringEncoder" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@partitioner_class = "kafka.producer.DefaultPartitioner" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@request_timeout_ms = 10000 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@producer_type = "sync" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@key_serializer_class = nil {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@message_send_max_retries = 3 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@retry_backoff_ms = 100 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@topic_metadata_refresh_interval_ms = 600000 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@queue_buffering_max_ms = 5000 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@queue_buffering_max_messages = 10000 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@queue_enqueue_timeout_ms = -1 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@batch_num_messages = 200 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@send_buffer_bytes = 102400 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} config LogStash::Outputs::Kafka/@client_id = "" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"105"} Pipeline started {:level=>:info, :file=>"logstash/pipeline.rb", :line=>"78"}

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties). log4j:WARN Please initialize the log4j system properly. SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Registering kafka producer {:topic_id=>"test", :broker_list=>"x.x.x.x:9092", :level=>:info, :file=>"logstash/outputs/kafka.rb", :line=>"61"}

testmessage

output received {:event=>{"message"=>"testmessage", "@version"=>"1", "@timestamp"=>"2014-05-18T02:50:20.865Z", "type"=>"stdin-type", "host"=>"y.y.y.y"}, :level=>:debug, :file=>"(eval)", :line=>"16"} kafka producer threw exception, restarting {:exception=>#<KafkaError: Got FailedToSendMessageException: Failed to send messages after 3 tries.>, :level=>:warn, :file=>"logstash/outputs/kafka.rb", :line=>"69"}

Thanks Venkat J

vjoonutula commented 10 years ago

Hi,

Issue seems to be resolved after changing hostname variable in Zookeeper config's from localhost to IP address.

Thanks for your help and great tool,

Thanks, Venkat