elastic / logstash

Logstash - transport and process your logs, events, or other data
https://www.elastic.co/products/logstash

Memory leak (rabbitmq-input plugin) #3992

Open · Alykoff opened this issue 9 years ago

Alykoff commented 9 years ago

I have a memory leak in Logstash (tried 1.5.4 and 1.4.4). I suppose it's the elasticsearch output plugin.

I am using phusion/baseimage (a special Ubuntu Docker image).

> java -version
java version "1.8.0_45"
Java(TM) SE Runtime Environment (build 1.8.0_45-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)

My input, filter, and output configs:

input {
  rabbitmq {
    type => "logs"
    codec => "json"
    durable => true
    ack => false
    queue => "logs"
    threads => 1
    user => "guest1"
    password => "***"
    host => "10.**.**.***"
  }

  rabbitmq {
    type => "collectors"
    codec => "json"
    durable => true
    ack => false
    queue => "collectors"
    threads => 1
    user => "guest1"
    password => "***"
    host => "10.**.**.***"
  }
}

filter {
  json {
    source => "@message"
  }

  mutate {
    remove_field => [ "kafka" ]
  }

  if [type] == 'logs' {
    mutate {
      add_tag => [ "_logs" ]
    }
  }

  if [type] == 'collector_users' {
    mutate {
      add_tag => [ "_users", "supermap", "output" ]
    }
  }

  if [type] == 'collector_internal' {
    mutate {
      add_tag => [ "_internal", "output" ]
    }
  }

  if [type] == 'collector_errors' {
    mutate {
      add_tag => [ "_errors", "output" ]
    }
  }

  if [type] == 'bughunt' {
    mutate {
      add_tag => [ "_bughunt" ]
    }
  }

  if ('supermap' in [tags]) and ('prod' in [env]) and ([marker] in [
     'topic.some',
     'topic.other'
  ]){
    mutate {
      remove_tag => [ "supermap" ]
    }
    clone {
      remove_tag => [ "_users", "output" ]
      add_tag => [ "supermap" ]
    }
  } else {
    mutate {
      remove_tag => [ "supermap" ]
    }
  }

  if 'supermap' in [tags] {
    anonymize {
      key => "not used"
      algorithm => "MD5"
      fields => [ "fields.ID", "fields.service_sessionID" ]
    }
  }
}
output {
  if 'output' in [tags] {
    rabbitmq {
      workers => 1
      durable => true
      persistent => true
      codec => "json"
      exchange => "output-logs-exchange"
      exchange_type => "direct"
      host => "10.**.**.***"
      vhost => "outputq"
      key => "output-logs"
      user => "guest1"
      password => "***"
    }
  }

  if 'es_users' in [tags] {
    elasticsearch {
      codec => "json"
      cluster => "cluster_name"
      host => "elasticsearch-cpu"
      index => "collector_users-%{+YYYY.MM.dd}"
      workers => 1
    }
  }
}

(OUTPUT_3):

output {
  if 'output' in [tags] {
    rabbitmq {
      workers => 1
      durable => true
      persistent => true
      codec => "json"
      exchange => "output-logs-exchange"
      exchange_type => "direct"
      host => "10.**.**.***"
      vhost => "outputq"
      key => "output-logs"
      user => "guest1"
      password => "***"
    }
  }

  if 'es_users' in [tags] {
    elasticsearch {
      codec => "json"
      cluster => "cluster_name"
      host => "elasticsearch-cpu"
      index => "collector_users-%{+YYYY.MM.dd}"
      workers => 1
    }
  }
  if 'es_internal' in [tags] {
    elasticsearch {
      codec => "json"
      cluster => "cluster_name"
      host => "elasticsearch-cpu"
      index => "collector_internal-%{+YYYY.MM.dd}"
      workers => 1
    }
  }
}
Alykoff commented 9 years ago

Hm... I don't have the memory leak if I use this input:

input {
  rabbitmq {
    type => "logs"
    codec => "json"
    durable => true
    ack => true
    queue => "logs"
    threads => 1
    user => "guest1"
    password => "***"
    host => "10.**.**.***"
  }

  rabbitmq {
    type => "collectors"
    codec => "json"
    durable => true
    ack => true
    queue => "collectors"
    threads => 1
    user => "guest1"
    password => "***"
    host => "10.**.**.***"
  }
}

I changed the ack param; now it's:

ack => true
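
For context on why this one setting matters: RabbitMQ's basic.qos (prefetch) limit only applies to consumers that use manual acknowledgements, so with ack => false the broker pushes deliveries as fast as it can and the client has to buffer whatever the pipeline has not yet processed. Below is a minimal sketch of the bounded, manually-acked mode, written against the Bunny client purely for illustration (the plugin itself sits on March Hare under JRuby); the host and the puts stand-in are placeholders:

require "bunny"

# Placeholder connection details, mirroring the report's redacted config.
conn = Bunny.new(host: "10.0.0.1", username: "guest1", password: "***")
conn.start

ch = conn.create_channel
# basic.qos: allow at most 256 unacknowledged deliveries in flight.
# This cap is what bounds client-side memory; it has no effect on
# auto-ack consumers, which is what ack => false amounts to.
ch.prefetch(256)

q = ch.queue("logs", durable: true)
q.subscribe(manual_ack: true, block: true) do |delivery_info, _properties, payload|
  puts payload # stand-in for the pipeline's real work on the message
  # Ack only after processing; the broker then releases the next delivery.
  ch.ack(delivery_info.delivery_tag)
end

Under ack => false the consumer runs in auto-ack mode instead: prefetch is ignored and the only flow control is the client's own buffering, which would match the unbounded heap growth reported above.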

Alykoff commented 9 years ago

This memory leak is a bug in the logstash-input-rabbitmq plugin.

suyograo commented 9 years ago

Thanks @Alykoff for the detailed report. We'll put this on our todo list.

michaelklishin commented 9 years ago

@Alykoff if changing the acknowledgement mode alters the behaviour, this strongly suggests that the plugin doesn't acknowledge deliveries correctly (or at all). The memory usage breakdown indicates it is indeed the RabbitMQ plugin's threads that use the majority of the heap.
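
To make that failure mode concrete: below is a deliberately broken sketch, again using Bunny with placeholder connection details, of a consumer that requests manual acknowledgements but never sends them, which is one way to end up with a heap dominated by the consumer's buffered deliveries:

require "bunny"

# Placeholder connection details again.
conn = Bunny.new(host: "10.0.0.1", username: "guest1", password: "***")
conn.start

ch = conn.create_channel
q  = ch.queue("logs", durable: true)

# Deliberately broken: manual acks are requested, but ch.ack is never
# called. With no prefetch cap the broker pushes deliveries as fast as
# it can; every message stays unacknowledged forever, and any backlog
# the block cannot keep up with accumulates in the client's consumer
# queue, so the heap grows until the process falls over.
q.subscribe(manual_ack: true, block: true) do |_delivery_info, _properties, payload|
  puts payload # no acknowledgement ever follows
end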