Open · nealabh opened this issue 4 years ago
VERSION: Logstash 5.6, Kafka 2.2.1, Kafka output plugin 5.1.11
OS: Ubuntu 16.04
CONFIG:

input {
  elasticsearch {
    hosts => "xxx:9200"
    size => 1000
    index => "xxx"
    scroll => "5m"
    docinfo => true
  }
}
output {
  kafka {
    bootstrap_servers => "xxx:9094"
    codec => json
    topic_id => "xxx"
    ssl => true
    ssl_truststore_location => "/tmp/kafka.client.truststore.jks"
  }
}
ERROR:

[2020-04-30T12:11:28,697][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name bufferpool-wait-time
[2020-04-30T12:11:28,699][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name buffer-exhausted-records
[2020-04-30T12:11:28,735][INFO ][org.apache.kafka.clients.producer.KafkaProducer] Closing the Kafka producer with timeoutMillis = 0 ms.
[2020-04-30T12:11:28,736][DEBUG][org.apache.kafka.clients.producer.KafkaProducer] The Kafka producer has closed.
[2020-04-30T12:11:28,738][ERROR][logstash.outputs.kafka ] Unable to create Kafka producer from given configuration {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka producer}
[2020-04-30T12:11:28,740][ERROR][logstash.pipeline ] Error registering plugin {:plugin=>"#<LogStash::OutputDelegator:0x580afd45 @namespaced_metric=#<LogStash::Instrument::NamespacedMetric:0x3ff04e36 @metric=#<LogStash::Instrument::Metric:0x233ba0df @collector=#<LogStash::Instrument::Collector:0x4063a978 @agent=nil, @metric_store=#<LogStash::Instrument::MetricStore:0x4f0c94e4 @store=#, @structured_lookup_mutex=#, @fast_lookup=#>>>, @namespace_name=[:stats, :pipelines, :main, :plugins, :outputs, :\"112805a66f2d1d9c7627582eafaa30a5bd45a13a-2\"]>, @metric=#<LogStash::Instrument::NamespacedMetric:0x5106c983 @metric=#<LogStash::Instrument::Metric:0x233ba0df @collector=#<LogStash::Instrument::Collector:0x4063a978 @agent=nil, @metric_store=#<LogStash::Instrument::MetricStore:0x4f0c94e4 @store=#, @structured_lookup_mutex=#, @fast_lookup=#>>>, @namespace_name=[:stats, :pipelines, :main, :plugins, :outputs]>, @logger=#<LogStash::Logging::Logger:0xf557fe4 @logger=#>, @out_counter=LogStash::Instrument::MetricType::Counter - namespaces: [:stats, :pipelines, :main, :plugins, :outputs, :\"112805a66f2d1d9c7627582eafaa30a5bd45a13a-2\", :events] key: out value: 0, @in_counter=LogStash::Instrument::MetricType::Counter - namespaces: [:stats, :pipelines, :main, :plugins, :outputs, :\"112805a66f2d1d9c7627582eafaa30a5bd45a13a-2\", :events] key: in value: 0, @strategy=#<LogStash::OutputDelegatorStrategies::Shared:0x34fe0e7b @output=<LogStash::Outputs::Kafka bootstrap_servers=>\"b-1.host:9094,host2:9094\", codec=><LogStash::Codecs::JSON id=>\"json_92740a35-849e-4794-90d9-64ab2acfe240\", enable_metric=>true, charset=>\"UTF-8\">, topic_id=>\"test-migration-topic\", ssl=>true, ssl_truststore_location=>\"/tmp/kafka.client.truststore.jks\", id=>\"112805a66f2d1d9c7627582eafaa30a5bd45a13a-2\", enable_metric=>true, workers=>1, acks=>\"1\", batch_size=>16384, block_on_buffer_full=>true, buffer_memory=>33554432, compression_type=>\"none\", key_serializer=>\"org.apache.kafka.common.serialization.StringSerializer\", linger_ms=>0, max_request_size=>1048576, metadata_fetch_timeout_ms=>60000, metadata_max_age_ms=>300000, receive_buffer_bytes=>32768, reconnect_backoff_ms=>10, retry_backoff_ms=>100, send_buffer_bytes=>131072, security_protocol=>\"PLAINTEXT\", sasl_mechanism=>\"GSSAPI\", timeout_ms=>30000, value_serializer=>\"org.apache.kafka.common.serialization.StringSerializer\">>, @id=\"112805a66f2d1d9c7627582eafaa30a5bd45a13a-2\", @time_metric=LogStash::Instrument::MetricType::Counter - namespaces: [:stats, :pipelines, :main, :plugins, :outputs, :\"112805a66f2d1d9c7627582eafaa30a5bd45a13a-2\", :events] key: duration_in_millis value: 0, @metric_events=#<LogStash::Instrument::NamespacedMetric:0x1070ed5f @metric=#<LogStash::Instrument::Metric:0x233ba0df @collector=#<LogStash::Instrument::Collector:0x4063a978 @agent=nil, @metric_store=#<LogStash::Instrument::MetricStore:0x4f0c94e4 @store=#, @structured_lookup_mutex=#, @fast_lookup=#>>>, @namespace_name=[:stats, :pipelines, :main, :plugins, :outputs, :\"112805a66f2d1d9c7627582eafaa30a5bd45a13a-2\", :events]>, @output_class=LogStash::Outputs::Kafka>", :error=>"Failed to construct kafka producer"}
[2020-04-30T12:11:28,748][ERROR][logstash.agent ] Pipeline aborted due to error {:exception=>org.apache.kafka.common.KafkaException: Failed to construct kafka producer, :backtrace=>["org.apache.kafka.clients.producer.KafkaProducer.(org/apache/kafka/clients/producer/KafkaProducer.java:335)", "org.apache.kafka.clients.producer.KafkaProducer.(org/apache/kafka/clients/producer/KafkaProducer.java:188)", "java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor.java:423)", "RUBY.create_producer(/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-5.1.11/lib/logstash/outputs/kafka.rb:334)", "RUBY.register(/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-5.1.11/lib/logstash/outputs/kafka.rb:195)", "RUBY.register(/usr/share/logstash/logstash-core/lib/logstash/output_delegator_strategies/shared.rb:9)", "RUBY.register(/usr/share/logstash/logstash-core/lib/logstash/output_delegator.rb:43)", "RUBY.register_plugin(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:290)", "RUBY.register_plugins(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:301)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1613)", "RUBY.register_plugins(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:301)", "RUBY.start_workers(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:310)", "RUBY.run(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:235)", "RUBY.start_pipeline(/usr/share/logstash/logstash-core/lib/logstash/agent.rb:408)", "java.lang.Thread.run(java/lang/Thread.java:748)"]}
[2020-04-30T12:11:28,759][DEBUG][logstash.agent ] Starting puma
[2020-04-30T12:11:28,759][DEBUG][logstash.agent ] Trying to start WebServer {:port=>9600}
[2020-04-30T12:11:28,761][DEBUG][logstash.api.service ] [api-service] start
[2020-04-30T12:11:28,781][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2020-04-30T12:11:31,761][DEBUG][logstash.instrument.periodicpoller.os] PeriodicPoller: Stopping
[2020-04-30T12:11:31,761][DEBUG][logstash.instrument.periodicpoller.jvm] PeriodicPoller: Stopping
[2020-04-30T12:11:31,761][DEBUG][logstash.instrument.periodicpoller.persistentqueue] PeriodicPoller: Stopping
[2020-04-30T12:11:31,762][DEBUG][logstash.instrument.periodicpoller.deadletterqueue] PeriodicPoller: Stopping
[2020-04-30T12:11:31,765][WARN ][logstash.agent ] stopping pipeline {:id=>"main"}
[2020-04-30T12:11:31,766][DEBUG][logstash.pipeline ] Closing inputs
[2020-04-30T12:11:31,769][DEBUG][logstash.inputs.elasticsearch] stopping {:plugin=>"LogStash::Inputs::Elasticsearch"}
[2020-04-30T12:11:31,769][DEBUG][logstash.pipeline ] Closed inputs
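To rule out the truststore itself, here is a minimal standalone Java producer using the same settings (a sketch only; the host, topic, and truststore path are the same placeholders as above, and it assumes the same Kafka 2.2.1 client jar on the classpath):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SslProducerCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Same broker endpoint the Logstash output points at (placeholder).
        props.put("bootstrap.servers", "xxx:9094");
        // The two settings that work from the CLI per the AWS MSK guide.
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "/tmp/kafka.client.truststore.jks");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // If the constructor succeeds here but Logstash still fails with
        // "Failed to construct kafka producer", the truststore is fine and
        // the problem is in how the plugin assembles the producer config.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("xxx", "ssl-check")).get();
            System.out.println("producer constructed and message sent over SSL");
        }
    }
}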
Steps to reproduce: use the versions above with this config. I was able to send data without SSL (PLAINTEXT), but it fails with SSL.
On the other hand, from the CLI I am able to produce to the same brokers with the same settings, following https://docs.aws.amazon.com/msk/latest/developerguide/produce-consume.html: security.protocol=SSL and ssl.truststore.location=/tmp/kafka.client.truststore.jks.
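Concretely, the CLI setup from that guide amounts to a client.properties with just those two lines, passed to the console producer (same placeholder host and topic):

# client.properties
security.protocol=SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks

bin/kafka-console-producer.sh --broker-list xxx:9094 \
    --producer.config client.properties --topic xxx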
I am not sure whether these versions are unsupported together, whether this is a plugin bug, or whether my config is missing something. Please help.
I have to use Logstash 5.6 because my input is Elasticsearch 2.3.5, so that is the highest version I can run.
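One detail I notice in the "Error registering plugin" dump above: the resolved producer settings still show security_protocol=>"PLAINTEXT" even though ssl => true is set. As a guess on my part (not a confirmed fix for this plugin version), forcing the protocol explicitly in the output block may be worth trying:

output {
  kafka {
    bootstrap_servers => "xxx:9094"
    codec => json
    topic_id => "xxx"
    ssl => true
    security_protocol => "SSL"   # guess: override the PLAINTEXT default shown in the dump
    ssl_truststore_location => "/tmp/kafka.client.truststore.jks"
  }
}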