logstash-plugins / logstash-output-elasticsearch_java

Java API Implementation of Elasticsearch Output
Apache License 2.0
0 stars 8 forks source link

Logstash 2.3.4 getting stuck while attempting to install template in elasticsearch using transport protocol #41

Open debraj-manna opened 8 years ago

debraj-manna commented 8 years ago

After discussing the issue in logstash forum posting the issue here.

Whenever I am trying to use logstash 2.3.4with the below config it is getting stuck.

input {
    kafka {
        zk_connect => "kafka:2181"
        group_id => "logstash"
        topic_id => "logstash_logs2"
        reset_beginning => false
        consumer_threads => 3
    }
}

filter {
  if [app] == "walle_slowquery" or [app] == "walle_slowindex" {
    ruby {
        code => "event['timestamp'] = event['@timestamp']"
    }
  }
  grok {
    match => [
     "timestamp", "^(?<app_log_time>%{YEAR}-%{MONTHNUM}-%{MONTHDAY})"
    ]
  }
  mutate {
    rename => {
      "app_log_time" => "[@metadata][app_log_time]"
    }
  }
}

output {
  if [env] == "prod" or [env] == "common" {
    elasticsearch_java {
      #For daily index creation used the time notation, Remove if not required.
      index => "jabong-%{env}-%{app}-%{iver}-%{[@metadata][app_log_time]}"
      cluster => "elasticsearch"
      network_host => "172.16.84.230"
      hosts => ["es-master1:9300", "es-master2:9300", "es-master3:9300"]
      protocol => "transport"
    }
    file {
      path => "/var/log/shop/%{env}/%{app}/%{app}_%{host}_%{[@metadata][app_log_time]}.log"
    }
    stdout { codec => rubydebug }
  }
}

The log that I am seeing when starting logstash with debug flag is below:-

root@logstash-indexer:/opt/logstash-2.3.4# bin/logstash -f conf/logstash_indexer.conf --debug
Reading config file {:config_file=>"/opt/logstash-2.3.4/conf/logstash_indexer.conf", :level=>:debug, :file=>"logstash/config/loader.rb", :line=>"69", :method=>"local_config"}
Plugin not defined in namespace, checking for plugin file {:type=>"input", :name=>"kafka", :path=>"logstash/inputs/kafka", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"}
Plugin not defined in namespace, checking for plugin file {:type=>"codec", :name=>"json", :path=>"logstash/codecs/json", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"}
config LogStash::Codecs::JSON/@charset = "UTF-8" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Inputs::Kafka/@zk_connect = "kafka:2181" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Inputs::Kafka/@group_id = "logstash" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Inputs::Kafka/@topic_id = "logstash_logs2" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Inputs::Kafka/@reset_beginning = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Inputs::Kafka/@consumer_threads = 3 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Inputs::Kafka/@codec = <LogStash::Codecs::JSON charset=>"UTF-8"> {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Inputs::Kafka/@add_field = {} {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Inputs::Kafka/@white_list = nil {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Inputs::Kafka/@black_list = nil {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Inputs::Kafka/@auto_offset_reset = "largest" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Inputs::Kafka/@auto_commit_interval_ms = 1000 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Inputs::Kafka/@queue_size = 20 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Inputs::Kafka/@rebalance_max_retries = 4 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Inputs::Kafka/@rebalance_backoff_ms = 2000 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Inputs::Kafka/@consumer_timeout_ms = -1 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Inputs::Kafka/@consumer_restart_on_error = true {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Inputs::Kafka/@consumer_restart_sleep_ms = 0 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Inputs::Kafka/@decorate_events = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Inputs::Kafka/@consumer_id = nil {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Inputs::Kafka/@fetch_message_max_bytes = 1048576 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Inputs::Kafka/@decoder_class = "kafka.serializer.DefaultDecoder" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Inputs::Kafka/@key_decoder_class = "kafka.serializer.DefaultDecoder" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
Plugin not defined in namespace, checking for plugin file {:type=>"filter", :name=>"ruby", :path=>"logstash/filters/ruby", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"}
config LogStash::Filters::Ruby/@code = "event['timestamp'] = event['@timestamp']" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Ruby/@add_tag = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Ruby/@remove_tag = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Ruby/@add_field = {} {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Ruby/@remove_field = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Ruby/@periodic_flush = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
Plugin not defined in namespace, checking for plugin file {:type=>"filter", :name=>"grok", :path=>"logstash/filters/grok", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"}
config LogStash::Filters::Grok/@match = {"timestamp"=>"^(?<app_log_time>%{YEAR}-%{MONTHNUM}-%{MONTHDAY})"} {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Grok/@add_tag = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Grok/@remove_tag = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Grok/@add_field = {} {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Grok/@remove_field = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Grok/@periodic_flush = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Grok/@patterns_dir = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Grok/@patterns_files_glob = "*" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Grok/@break_on_match = true {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Grok/@named_captures_only = true {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Grok/@keep_empty_captures = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Grok/@singles = true {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Grok/@tag_on_failure = ["_grokparsefailure"] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Grok/@overwrite = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
Plugin not defined in namespace, checking for plugin file {:type=>"filter", :name=>"mutate", :path=>"logstash/filters/mutate", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"}
config LogStash::Filters::Mutate/@rename = {"app_log_time"=>"[@metadata][app_log_time]"} {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Mutate/@add_tag = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Mutate/@remove_tag = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Mutate/@add_field = {} {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Mutate/@remove_field = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Filters::Mutate/@periodic_flush = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
Plugin not defined in namespace, checking for plugin file {:type=>"output", :name=>"elasticsearch_java", :path=>"logstash/outputs/elasticsearch_java", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"}
Plugin not defined in namespace, checking for plugin file {:type=>"output", :name=>"file", :path=>"logstash/outputs/file", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"}
Plugin not defined in namespace, checking for plugin file {:type=>"output", :name=>"stdout", :path=>"logstash/outputs/stdout", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"}
starting agent {:level=>:info, :file=>"logstash/agent.rb", :line=>"207", :method=>"execute"}
starting pipeline {:id=>"main", :level=>:info, :file=>"logstash/agent.rb", :line=>"469", :method=>"start_pipeline"}
Settings: Default pipeline workers: 12
log4j java properties setup {:log4j_level=>"DEBUG", :level=>:debug, :file=>"logstash/logging.rb", :line=>"89", :method=>"setup_log4j"}
Registering kafka {:group_id=>"logstash", :topic_id=>"logstash_logs2", :zk_connect=>"kafka:2181", :level=>:info, :file=>"logstash/inputs/kafka.rb", :line=>"133", :method=>"register"}
Running kafka {:group_id=>"logstash", :topic_id=>"logstash_logs2", :zk_connect=>"kafka:2181", :level=>:info, :file=>"logstash/inputs/kafka.rb", :line=>"140", :method=>"run"}
Plugin not defined in namespace, checking for plugin file {:type=>"codec", :name=>"plain", :path=>"logstash/codecs/plain", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"}
config LogStash::Codecs::Plain/@charset = "UTF-8" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@index = "jabong-%{env}-%{app}-%{iver}-%{[@metadata][app_log_time]}" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@cluster = "elasticsearch" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@network_host = "172.16.84.230" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@hosts = ["172.16.84.230:9300"] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@protocol = "transport" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@codec = <LogStash::Codecs::Plain charset=>"UTF-8"> {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@workers = 1 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@manage_template = true {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@template_name = "logstash" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@template_overwrite = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@parent = nil {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@flush_size = 500 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@idle_flush_time = 1 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@upsert = "" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@doc_as_upsert = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@max_retries = 3 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@script = "" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@script_type = "inline" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@script_lang = "" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@script_var_name = "event" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@scripted_upsert = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@retry_max_interval = 2 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@retry_max_items = 500 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@retry_on_conflict = 1 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@pipeline = nil {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@action = "index" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@max_inflight_requests = 50 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
config LogStash::Outputs::ElasticSearchJava/@sniffing = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"}
Using mapping template from {:path=>nil, :level=>:info, :file=>"logstash/outputs/elasticsearch/template_manager.rb", :line=>"6", :method=>"install_template"}
Attempting to install template {:manage_template=>{"template"=>"logstash-*", "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "omit_norms"=>true}, "dynamic_templates"=>[{"message_field"=>{"match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"string", "index"=>"analyzed", "omit_norms"=>true, "fielddata"=>{"format"=>"disabled"}}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"string", "index"=>"analyzed", "omit_norms"=>true, "fielddata"=>{"format"=>"disabled"}, "fields"=>{"raw"=>{"type"=>"string", "index"=>"not_analyzed", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"string", "index"=>"not_analyzed"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"float"}, "longitude"=>{"type"=>"float"}}}}}}}, :level=>:info, :file=>"logstash/outputs/elasticsearch/template_manager.rb", :line=>"8", :method=>"install_template"}

I have verified the issue is only with elasticsearch_java output plugin. If I modify my output plugin like below then I am seeing my logs in console.

output {
  if [env] == "prod" or [env] == "common" {
    stdout { codec => rubydebug }
  }
}

The logstash logs also shows the below message:-

Pipeline main started {:file=>"logstash/agent.rb",:line=>"473", :method=>"start_pipeline"}

If I replace the elasticsearch_java with the elasticsearch output plugin then also everything works fine:-

output {
  if [env] == "prod" or [env] == "common" {
    elasticsearch {
      index => "jabong-%{env}-%{app}-%{iver}-%{[@metadata][app_log_time]}"
      hosts => ["es-master1","es-master2","es-master3"]
    }
    file {
      path => "/var/log/shop/%{env}/%{app}/%{app}_%{host}_%{[@metadata][app_log_time]}.log"
    }
    stdout { codec => rubydebug }
  }
}

In my current set-up I am using logstash 1.5.3 there also everything is working fine with the below config:-

input {
    kafka {
        zk_connect => "kafka:2181"
        group_id => "logstash"
        topic_id => "logstash_logs2"
        reset_beginning => false
        consumer_threads => 3
    }
}

filter {
  if [app] == "walle_slowquery" or [app] == "walle_slowindex" {
    ruby {
        code => "event['timestamp'] = event['@timestamp']"
    }
  }
  grok {
    match => [
     "timestamp", "^(?<app_log_time>%{YEAR}-%{MONTHNUM}-%{MONTHDAY})"
    ]
  }
  mutate {
    rename => {
      "app_log_time" => "[@metadata][app_log_time]"
    }
  }
}

output {
  if [env] == "prod" or [env] == "common" {
    elasticsearch {
      index => "jabong-%{env}-%{app}-%{iver}-%{[@metadata][app_log_time]}"
      cluster => "elasticsearch"
      host => ["es-master1:9300", "es-master2:9300","es-master3:9300"]
      protocol => "transport"
    }
    file {
      path => "/var/log/shop/%{env}/%{app}/%{app}_%{host}_%{[@metadata][app_log_time]}.log"
    }
    stdout { codec => rubydebug }
  }
}

Environment:-

jordansissel commented 8 years ago

On my phone so github is hard to browse, but I believe that the es Java plugin you have uses the Elasticsearch Java library version 2.1.0 and I am not sure it is compatible with Elasticsearch 1.7 server.

Also, this plugin is probably not being maintained much anymore.

Recommendations:

On Monday, August 29, 2016, Debraj Manna notifications@github.com wrote:

After discussing the issue in logstash forum https://discuss.elastic.co/t/logstash-2-3-4-getting-stuck-while-attempting-to-install-template-in-elasticsearch/59098 posting the issue here.

Whenever I am trying to use logstash 2.3.4 with the below config it is getting stuck.

input { kafka { zk_connect => "kafka:2181" group_id => "logstash" topic_id => "logstash_logs2" reset_beginning => false consumer_threads => 3 } }

filter { if [app] == "walle_slowquery" or [app] == "walle_slowindex" { ruby { code => "event['timestamp'] = event['@timestamp']" } } grok { match => [ "timestamp", "^(?%{YEAR}-%{MONTHNUM}-%{MONTHDAY})" ] } mutate { rename => { "app_log_time" => "[@metadata][app_log_time]" } } }

output { if [env] == "prod" or [env] == "common" { elasticsearch_java {

For daily index creation used the time notation, Remove if not required.

  index => "jabong-%{env}-%{app}-%{iver}-%{[@metadata][app_log_time]}"
  cluster => "elasticsearch"
  network_host => "172.16.84.230"
  hosts => ["es-master1:9300", "es-master2:9300", "es-master3:9300"]
  protocol => "transport"
}
file {
  path => "/var/log/shop/%{env}/%{app}/%{app}_%{host}_%{[@metadata][app_log_time]}.log"
}
stdout { codec => rubydebug }

} }

The log that I am seeing when starting logstash with debug flag is below:-

root@logstash-indexer:/opt/logstash-2.3.4# bin/logstash -f conf/logstash_indexer.conf --debug Reading config file {:config_file=>"/opt/logstash-2.3.4/conf/logstash_indexer.conf", :level=>:debug, :file=>"logstash/config/loader.rb", :line=>"69", :method=>"local_config"} Plugin not defined in namespace, checking for plugin file {:type=>"input", :name=>"kafka", :path=>"logstash/inputs/kafka", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"} Plugin not defined in namespace, checking for plugin file {:type=>"codec", :name=>"json", :path=>"logstash/codecs/json", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"} config LogStash::Codecs::JSON/@charset = "UTF-8" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Inputs::Kafka/@zk_connect = "kafka:2181" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Inputs::Kafka/@group_id = "logstash" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Inputs::Kafka/@topic_id = "logstash_logs2" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Inputs::Kafka/@reset_beginning = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Inputs::Kafka/@consumer_threads = 3 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Inputs::Kafka/@codec = <LogStash::Codecs::JSON charset=>"UTF-8"> {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Inputs::Kafka/@add_field = {} {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Inputs::Kafka/@white_list = nil {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Inputs::Kafka/@black_list = nil {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Inputs::Kafka/@auto_offset_reset = "largest" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Inputs::Kafka/@auto_commit_interval_ms = 1000 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Inputs::Kafka/@queue_size = 20 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Inputs::Kafka/@rebalance_max_retries = 4 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Inputs::Kafka/@rebalance_backoff_ms = 2000 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Inputs::Kafka/@consumer_timeout_ms = -1 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Inputs::Kafka/@consumer_restart_on_error = true {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Inputs::Kafka/@consumer_restart_sleep_ms = 0 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Inputs::Kafka/@decorate_events = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Inputs::Kafka/@consumer_id = nil {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Inputs::Kafka/@fetch_message_max_bytes = 1048576 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Inputs::Kafka/@decoder_class = "kafka.serializer.DefaultDecoder" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Inputs::Kafka/@key_decoder_class = "kafka.serializer.DefaultDecoder" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} Plugin not defined in namespace, checking for plugin file {:type=>"filter", :name=>"ruby", :path=>"logstash/filters/ruby", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"} config LogStash::Filters::Ruby/@code = "event['timestamp'] = event['@timestamp']" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Ruby/@add_tag = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Ruby/@remove_tag = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Ruby/@add_field = {} {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Ruby/@remove_field = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Ruby/@periodic_flush = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} Plugin not defined in namespace, checking for plugin file {:type=>"filter", :name=>"grok", :path=>"logstash/filters/grok", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"} config LogStash::Filters::Grok/@match = {"timestamp"=>"^(?%{YEAR}-%{MONTHNUM}-%{MONTHDAY})"} {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Grok/@add_tag = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Grok/@remove_tag = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Grok/@add_field = {} {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Grok/@remove_field = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Grok/@periodic_flush = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Grok/@patterns_dir = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Grok/@patterns_filesglob = "" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Grok/@break_on_match = true {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Grok/@named_captures_only = true {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Grok/@keep_empty_captures = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Grok/@singles = true {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Grok/@tag_on_failure = ["_grokparsefailure"] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Grok/@overwrite = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} Plugin not defined in namespace, checking for plugin file {:type=>"filter", :name=>"mutate", :path=>"logstash/filters/mutate", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"} config LogStash::Filters::Mutate/@rename = {"app_log_time"=>"[@metadata][app_log_time]"} {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Mutate/@add_tag = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Mutate/@remove_tag = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Mutate/@add_field = {} {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Mutate/@remove_field = [] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Filters::Mutate/@periodic_flush = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} Plugin not defined in namespace, checking for plugin file {:type=>"output", :name=>"elasticsearch_java", :path=>"logstash/outputs/elasticsearch_java", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"} Plugin not defined in namespace, checking for plugin file {:type=>"output", :name=>"file", :path=>"logstash/outputs/file", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"} Plugin not defined in namespace, checking for plugin file {:type=>"output", :name=>"stdout", :path=>"logstash/outputs/stdout", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"} starting agent {:level=>:info, :file=>"logstash/agent.rb", :line=>"207", :method=>"execute"} starting pipeline {:id=>"main", :level=>:info, :file=>"logstash/agent.rb", :line=>"469", :method=>"start_pipeline"} Settings: Default pipeline workers: 12 log4j java properties setup {:log4j_level=>"DEBUG", :level=>:debug, :file=>"logstash/logging.rb", :line=>"89", :method=>"setup_log4j"} Registering kafka {:group_id=>"logstash", :topic_id=>"logstash_logs2", :zk_connect=>"kafka:2181", :level=>:info, :file=>"logstash/inputs/kafka.rb", :line=>"133", :method=>"register"} Running kafka {:group_id=>"logstash", :topic_id=>"logstash_logs2", :zk_connect=>"kafka:2181", :level=>:info, :file=>"logstash/inputs/kafka.rb", :line=>"140", :method=>"run"} Plugin not defined in namespace, checking for plugin file {:type=>"codec", :name=>"plain", :path=>"logstash/codecs/plain", :level=>:debug, :file=>"logstash/plugin.rb", :line=>"76", :method=>"lookup"} config LogStash::Codecs::Plain/@charset = "UTF-8" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@index = "jabong-%{env}-%{app}-%{iver}-%{[@metadata][app_log_time]}" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@cluster = "elasticsearch" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@network_host = "172.16.84.230" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@hosts = ["172.16.84.230:9300"] {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@protocol = "transport" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@codec = <LogStash::Codecs::Plain charset=>"UTF-8"> {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@workers = 1 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@manage_template = true {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@template_name = "logstash" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@template_overwrite = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@parent = nil {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@flush_size = 500 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@idle_flush_time = 1 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@upsert = "" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@doc_as_upsert = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@max_retries = 3 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@script = "" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@script_type = "inline" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@script_lang = "" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@script_var_name = "event" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@scripted_upsert = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@retry_max_interval = 2 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@retry_max_items = 500 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@retry_on_conflict = 1 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@pipeline = nil {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@action = "index" {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@max_inflight_requests = 50 {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} config LogStash::Outputs::ElasticSearchJava/@sniffing = false {:level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"153", :method=>"config_init"} Using mapping template from {:path=>nil, :level=>:info, :file=>"logstash/outputs/elasticsearch/template_manager.rb", :line=>"6", :method=>"install_template"} Attempting to install template {:managetemplate=>{"template"=>"logstash-", "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"default"=>{"_all"=>{"enabled"=>true, "omit_norms"=>true}, "dynamic_templates"=>[{"message_field"=>{"match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"string", "index"=>"analyzed", "omit_norms"=>true, "fielddata"=>{"format"=>"disabled"}}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"string", "index"=>"analyzed", "omit_norms"=>true, "fielddata"=>{"format"=>"disabled"}, "fields"=>{"raw"=>{"type"=>"string", "index"=>"not_analyzed", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"string", "index"=>"not_analyzed"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"float"}, "longitude"=>{"type"=>"float"}}}}}}}, :level=>:info, :file=>"logstash/outputs/elasticsearch/template_manager.rb", :line=>"8", :method=>"install_template"}

I have verified the issue is only with elasticsearch_java output plugin. If I modify my output plugin like below then I am seeing my logs in console.

output { if [env] == "prod" or [env] == "common" { stdout { codec => rubydebug } } }

The logstash logs also shows the below message:-

Pipeline main started {:file=>"logstash/agent.rb",:line=>"473", :method=>"start_pipeline"}

If I replace the elasticsearch_java with the elasticsearch output plugin then also everything works fine:-

output { if [env] == "prod" or [env] == "common" { elasticsearch { index => "jabong-%{env}-%{app}-%{iver}-%{[@metadata][app_logtime]}" hosts => ["es-master1","es-master2","es-master3"] } file { path => "/var/log/shop/%{env}/%{app}/%{app}%{host}_%{[@metadata][app_log_time]}.log" } stdout { codec => rubydebug } } }

In my current set-up I am using logstash 1.5.3 there also everything is working fine with the below config:-

input { kafka { zk_connect => "kafka:2181" group_id => "logstash" topic_id => "logstash_logs2" reset_beginning => false consumer_threads => 3 } }

filter { if [app] == "walle_slowquery" or [app] == "walle_slowindex" { ruby { code => "event['timestamp'] = event['@timestamp']" } } grok { match => [ "timestamp", "^(?%{YEAR}-%{MONTHNUM}-%{MONTHDAY})" ] } mutate { rename => { "app_log_time" => "[@metadata][app_log_time]" } } }

output { if [env] == "prod" or [env] == "common" { elasticsearch { index => "jabong-%{env}-%{app}-%{iver}-%{[@metadata][app_logtime]}" cluster => "elasticsearch" host => ["es-master1:9300", "es-master2:9300","es-master3:9300"] protocol => "transport" } file { path => "/var/log/shop/%{env}/%{app}/%{app}%{host}_%{[@metadata][app_log_time]}.log" } stdout { codec => rubydebug } } }

Environment:-

  • OS - Debian 8 - 64 Bit
  • ElasticSearch Version - 1.7.1
  • Logstash Version - 2.3.4
  • logstash-output-elasticsearch_java - 2.1.3
  • logstash-input-kafka - 2.0.8

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/logstash-plugins/logstash-output-elasticsearch_java/issues/41, or mute the thread https://github.com/notifications/unsubscribe-auth/AAIC6nTwtlPLcYewMyh-HXzzpDClcwgkks5qkob7gaJpZM4JvNK5 .

debraj-manna commented 8 years ago

Thanks @jordansissel for your prompt reply.

use the well-maintained Elasticsearch output, not this elasticsearch_java one

I was suggested the same thing in logstash forum. It is working fine with the elasticsearch plugin. But we cannot use it now because elasticsearch does not support the transport protocol. We decided to use transport protocol because in our set-up (which is a bit legacy about 2 years old) we saw the light transport protocol seems to have better performance than the http protocol.

try with an ES 2.x cluster

We were trying to upgrade to logstash 2.3.4 from logstash 1.5.3 so that we can use beats in place of logstash-forwarder. We are not seeing any issue in our ES Cluster. Upgrading the entire ES cluster to 2.x will be little challenging for us at the moment.

Can you suggest something else? Is it possible to use some version of elasticsearch_java which supports 1.7 ES Cluster?

acchen97 commented 8 years ago

@debraj-manna as @jordansissel mentioned, the elasticsearch output plugin (HTTP protocol) is the highly recommended option for communicating with ES and has been our default protocol since 2.0. HTTP has functional parity with node/transport and is compatible with ES 1.x/2.x and our upcoming 5.0 release. The performance differences between the protocols aren't really an issue anymore (see here). Please note the elasticsearch_java plugin will be a community maintained entity soon in the future.

Can you please using the elasticsearch plugin instead?

We were trying to upgrade to logstash 2.3.4 from logstash 1.5.3 so that we can use beats in place of logstash-forwarder. We are not seeing any issue in our ES Cluster.

Quick note on migrating to Filebeat - I'd recommend trying out LS 2.4 as we've made a large enhancement in the Beats input (v3.1.0+) which improves performance significantly.

debraj-manna commented 8 years ago

Thanks @acchen97 . We are planning to move to HTTP protocol along with the elasticsearch output plugin as it is heavily recommended.

But I have couple of doubts:-

The performance differences between the protocols aren't really an issue anymore

I have gone through the link that you have shared. One doubt does this hold for Elasticsearch Cluster 1.7 along with Logstash 2.x or this is only for Elasticsearch Cluster 2.x alongwith Logstash 2.x?

As mentioned here I tried installing logstash-output-elasticsearch_java for ES Cluster 1.7 but it is giving me the below error:-

root@logstash-indexer:/opt/logstash-2.4.0# bin/logstash-plugin install --version 1.5.x logstash-output-elasticsearch_java
Validating logstash-output-elasticsearch_java-1.5.x
Plugin logstash-output-elasticsearch_java version 1.5.x does not exist
ERROR: Installation aborted, verification failed for logstash-output-elasticsearch_java 1.5.x

Is the support for ES Cluster 1.7 dropped for this plugin?

andrewvc commented 8 years ago

As far as performance goes it should be true with 1.x as well. Just remember you'll need to set the workers setting for the ES output a bit higher for HTTP. This won't use any more resources but it will bring perf in line. I'd start with three workers.

The reason your install failed is that 1.5.x is not an actual version. You'll need to install the latest 1.x version which is 1.0.0.

The java protocol is tightly tied to the Elasticsearch release. It is not possible for one plugin to support multiple ES versions due to this.