elastic / logstash

Logstash - transport and process your logs, events, or other data
https://www.elastic.co/products/logstash

Logstash threads not being cleaned up #2005

Closed. bortek closed this issue 9 years ago

bortek commented 9 years ago

Hi guys,

I have an ELK installation with logstash 1.4.2 and elasticsearch 1.1.1. There is a problem with the threads: the total number of threads is constantly increasing, and they are not being cleaned up when they finish. At some point I get a Java OutOfMemoryError and it crashes.

I have installed the fix suggested in this discussion (https://github.com/elasticsearch/logstash-forwarder/issues/212), and since then I don't get OutOfMemory in the logs anymore, but the behaviour is otherwise the same.

Here is a screenshot from JVisualVM which shows this. You can see that the GC activity almost dies and the GC graph looks different after that point. [screenshot: logstash1]

When I look at the threads I can see lots of finished threads which are not being cleaned up and which clog the memory. [screenshot: logstash2]

Can anyone help me shed some light on this and figure out what's happening?

colinsurprenant commented 9 years ago

Thanks for the report. Could you share your config?

bortek commented 9 years ago

Hi Colin!

I think what happens is that it tries to read a lot of data from the logs to catch up from the beginning of the logs to the current time, and the finished threads do not have time to be cleaned up, or are never cleaned up for some reason.

I downgraded logstash to version 1.3 to test whether it shows the same behaviour, and yes it does. After a few hours it got an OutOfMemoryError.

Here is my config, together with the logfile containing the error.

============================== logstash.conf =============================
input {
  udp {
    port => 2515
    type => syslog
  }
  lumberjack {
    port => 6043
    ssl_certificate => "/opt/logstash/etc/selfsigned.crt"
    ssl_key => "/opt/logstash/etc/selfsigned.key"
  }
  file {
    path => "/tmp/request.log"
    type => requestlog
  }
}

filter {
  if [type] == "syslog" {
    mutate { gsub => [ "message", "[\n]", "" ] }
    grok {
      match => { "message" => "%{SYSLOG5424LINE}" }
      add_field => [ "env", "%{syslog5424_app}" ]
    }
    if !("_grokparsefailure" in [tags]) {
      if ("<event" in [syslog5424_msg]) {
        xml { source => "syslog5424_msg" target => "doc" }
        alter { add_field => [ "source", "APP" ] }
      }
    } else {
      grok { match => { "message" => "%{SYSLOGLINE}" } }
    }
  }
  if [type] == "somelogfile" {
    grok {
      match => [ "message", "timestamp=\"%{YEAR:YEAR}-%{MONTHNUM:MONTHNUM}-%{MONTHDAY:MONTHDAY}:%{HOUR:HOUR}:?%{MINUTE:MINUTE}?::?%{SECOND:SECOND} %{ISO8601_TIMEZONE:ISO8601_TIMEZONE}\" sequence=%{NUMBER:sequence} session=\"%{GREEDYDATA:session}\" host=%{GREEDYDATA:hostname} xforwarded=\"%{DATA:forwarded}\" via clienthost=%{DATA:via} inport=%{NUMBER:inport} useragent=\"%{GREEDYDATA:useragent}\" user=%{NOTSPACE:username} request=\"%{GREEDYDATA:request}\" response=%{NUMBER:response} bytes=%{NOTSPACE:bytes} timetaken=%{NOTSPACE:timetaken}" ]
      add_field => [ "timestamp", "%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND} %{ISO8601_TIMEZONE}" ]
      remove_field => [ "YEAR", "MONTHNUM", "MONTHDAY", "HOUR", "MINUTE", "SECOND", "ISO8601_TIMEZONE" ]
    }
    if [request] != "-" {
      grok { match => [ "request", "%{NOTSPACE:werb} %{GREEDYDATA:URL} HTTP/%{GREEDYDATA:httpversion}" ] }
    }
    date {
      match => [ "timestamp" , "YYYY-MM-dd HH:mm:ss Z" , "ISO8601" ]
      remove_field => [ "timestamp" ]
    }
    mutate { gsub => ["timetaken", "out of range", "20000000"] }
    mutate {
      convert => [ "bytes", "integer" ]
      convert => [ "timetaken", "integer" ]
      convert => [ "response", "integer" ]
      convert => [ "inport", "integer" ]
      lowercase => [ "hostname" ]
    }
    alter { add_field => [ "source", "WEB" ] }
    if [useragent] {
      useragent { source => "useragent" target => "ua" }
    }
    if [forwarded] =~ ", " {
      grok { match => [ "forwarded", ", %{NOTSPACE:sourceip}\"" ] }
    } else {
      if [forwarded] != "-" {
        mutate { add_field => [ "sourceip", "%{forwarded}" ] }
      }
    }
    if [sourceip] !~ "(^127.0.0.1)|(^10.)" and [sourceip] {
      geoip { source => "sourceip" }
      mutate {

        # 'coords' will be kept, 'tmplat' is temporary.
        # Both of these new fields are strings.
        add_field => [ "coords", "%{[geoip][longitude]}",
                       "tmplat", "%{[geoip][latitude]}" ]
      }
      mutate {
        # Merge 'tmplat' into 'coords'
        merge => [ "coords", "tmplat" ]
      }
      mutate {
        # Convert our new array of strings back to float
        convert => [ "coords", "float" ]
        # Delete our temporary latitude field
        remove => [ "tmplat" ]
      }

    }
  }
  if [type] == "accessfile" {
    grok {
      match => [ "message", "%{MONTHDAY:MONTHDAY}/%{MONTH:MONTH}/%{YEAR:YEAR}:%{HOUR:HOUR}:?%{MINUTE:MINUTE}?::?%{SECOND:SECOND} %{ISO8601_TIMEZONE:ISO8601_TIMEZONE}] conn=%{NUMBER:conn} op=%{NUMBER:op} ID=%{NUMBER:msgid} - %{NOTSPACE:werb}\s*%{GREEDYDATA:data}" ]
      add_field => [ "timestamp", "%{YEAR}-%{MONTH}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND} %{ISO8601_TIMEZONE}" ]
      remove_field => [ "YEAR", "MONTH", "MONTHDAY", "HOUR", "MINUTE", "SECOND", "ISO8601_TIMEZONE" ]
    }
    date {
      match => [ "timestamp" , "YYYY-MMM-dd HH:mm:ss Z" , "ISO8601" ]
      remove_field => [ "timestamp" ]
    }
    if [werb] == "RESULT" {
      grok { match => [ "data", "err=%{NUMBER:err} tag=%{NUMBER:tag} nentries=%{NUMBER:nentries} etime=%{BASE16FLOAT:etime}" ] }
      mutate {
        convert => [ "err", "integer" ]
        convert => [ "tag", "integer" ]
        convert => [ "nentries", "integer" ]
        convert => [ "etime", "float" ]
      }
    }
    alter { add_field => [ "source", "DS" ] }
  }
  if [type] == "somelogfile2" {
    grok {
      match => [ "message", "%{YEAR:YEAR}-%{MONTHNUM:MONTHNUM}-%{MONTHDAY:MONTHDAY} %{HOUR:HOUR}:?%{MINUTE:MINUTE}?::?%{SECOND:SECOND}:%{NUMBER:PARTSECOND};%{WORD:login};%{NUMBER:base_level};%{SPACE|WORD};%{SPACE|WORD};%{DATA:user};%{USERNAME:username};%{WORD:base_plugin}" ]
      match => [ "message", "%{YEAR:YEAR}-%{MONTHNUM:MONTHNUM}-%{MONTHDAY:MONTHDAY} %{HOUR:HOUR}:?%{MINUTE:MINUTE}?::?%{SECOND:SECOND}:%{NUMBER:PARTSECOND};%{SPACE|WORD:login};%{NUMBER:base_level};%{SPACE|WORD};%{SPACE|WORD};%{SPACE|DATA:bind_user};%{DATA:doing};%{WORD:base_plugin};" ]
      add_field => [ "timestamp", "%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}" ]
      remove_field => [ "YEAR", "MONTHNUM", "MONTHDAY", "HOUR", "MINUTE", "SECOND", "PARTSECOND" ]
    }
    date {
      match => [ "timestamp" , "YYYY-MM-dd HH:mm:ss" , "ISO8601" ]
      remove_field => [ "timestamp" ]
    }
    mutate { convert => [ "base_level", "integer" ] }
    alter { add_field => [ "source", "APP2" ] }
  }
  if [type] == "appdebug" or [type] == "apperror" {
    grok {
      match => [ "message", "%{YEAR:YEAR}-%{MONTHNUM:MONTHNUM}-%{MONTHDAY:MONTHDAY} %{HOUR:HOUR}:?%{MINUTE:MINUTE}?::?%{SECOND:SECOND},%{NUMBER:PARTSECOND} %{NOTSPACE} %{WORD:severity} [Web : %{NUMBER:web}] (%{NOTSPACE:logger}) %{UUID:requestid} %{GREEDYDATA:data}" ]
      add_field => [ "timestamp", "%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}" ]
      remove_field => [ "YEAR", "MONTHNUM", "MONTHDAY", "HOUR", "MINUTE", "SECOND", "PARTSECOND" ]
    }
    date { match => [ "timestamp" , "YYYY-MM-dd HH:mm:ss" , "ISO8601" ] }
    mutate { convert => [ "web", "integer" ] }
    alter { add_field => [ "source", "APP2" ] }
  }
  if [type] == "accesslog" {
    grok {
      match => [ "message", "%{IP:ip} %{DATA:agent} %{USERNAME:username} [%{MONTHDAY:MONTHDAY}/%{MONTH:MONTH}/%{YEAR:YEAR}:%{HOUR:HOUR}:?%{MINUTE:MINUTE}?::?%{SECOND:SECOND} %{ISO8601_TIMEZONE:ISO8601_TIMEZONE}] \"%{WORD:werb} %{GREEDYDATA:URI} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response} %{NOTSPACE:bytes}" ]
      match => [ "message", "[%{MONTHDAY:MONTHDAY}/%{MONTH:MONTH}/%{YEAR:YEAR}:%{HOUR:HOUR}:?%{MINUTE:MINUTE}?::?%{SECOND:SECOND} %{ISO8601_TIMEZONE:ISO8601_TIMEZONE}] %{DATA:forwarded} via %{NOTSPACE:via_ip} %{DATA:via} %{NOTSPACE:server_name} %{NOTSPACE:referer} \"%{DATA:id}\" \"%{WORD:werb} %{GREEDYDATA:URI} HTTP/%{NOTSPACE:httpversion}\" %{NUMBER:response} %{NOTSPACE:bytes} %{NUMBER:timetaken}" ]
      match => [ "message", "[%{MONTHDAY:MONTHDAY}/%{MONTH:MONTH}/%{YEAR:YEAR}:%{HOUR:HOUR}:?%{MINUTE:MINUTE}?::?%{SECOND:SECOND} %{ISO8601_TIMEZONE:ISO8601_TIMEZONE}] %{DATA:forwarded} via %{NOTSPACE:via_ip} %{DATA:via} %{NOTSPACE:server_name} %{NOTSPACE:referer} \"%{DATA:id}\" \"%{WORD:werb} %{GREEDYDATA:URI} HTTP/%{NOTSPACE:httpversion}\" %{NUMBER:response} %{NOTSPACE:timetaken}" ]
      add_field => [ "timestamp", "%{YEAR}-%{MONTH}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}" ]
      remove_field => [ "YEAR", "MONTH", "MONTHDAY", "HOUR", "MINUTE", "SECOND", "ISO8601_TIMEZONE" ]
    }
    date { match => [ "timestamp" , "YYYY-MMM-dd HH:mm:ss" , "ISO8601" ] }

    mutate {
      convert => [ "bytes", "integer" ]
      convert => [ "timetaken", "integer" ]
      convert => [ "response", "integer" ]
    }
    grok {
      match => [ "file", "/var/log/app/%{WORD:source}/" ]
    }

  }
  if [type] == "systemout" {
    multiline { pattern => "^[" negate => true what => "previous" }
    grok {
      match => [ "message", "[%{MONTHNUM:MONTHNUM}/%{MONTHDAY:MONTHDAY}/%{YEAR:YEAR} %{HOUR:HOUR}:?%{MINUTE:MINUTE}?::?%{SECOND:SECOND}:%{NUMBER:secfraction} %{WORD:TZ}] %{NOTSPACE: threadId} %{WORD:shortName}\s%{WORD:eventType}\s%{NOTSPACE:className} %{NOTSPACE:methodName} %{GREEDYDATA:data}" ]
      add_field => [ "timestamp", "20%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}" ]
      remove_field => [ "YEAR", "MONTHNUM", "MONTHDAY", "HOUR", "MINUTE", "SECOND", "TZ", "secfraction" ]
    }
    date { match => [ "timestamp" , "YYYY-M-d HH:mm:ss" ,"ISO8601" ] }
    grok { match => [ "file", "/var/log/app/%{WORD:source}/" ] }

  }
  if [type] == "webauth" {
    multiline { pattern => "PATTERN1" negate => true what => "previous" }
    mutate { gsub => [ "message", "[\n]", ";" ] }
    grok {
      match => [ "message", "%{YEAR}-%{MONTHNUM}-%{MONTHDAY}-%{HOUR}:%{MINUTE}:%{SECOND}+%{GREEDYDATA}file.txt var1\s: %{NUMBER:num1};%{GREEDYDATA}file1.txt fail\s: %{NUMBER:numfail};%{GREEDYDATA}file.txt pwd exp\s: %{NUMBER:num2};%{GREEDYDATA}file2.txt max\s: %{NUMBER:num2}" ]
    }
    mutate {
      convert => [ "num1", "integer" ]
      convert => [ "num2", "integer" ]
      convert => [ "num3", "integer" ]
      convert => [ "num4", "integer" ]
      convert => [ "num5", "integer" ]
    }
    alter { add_field => [ "source", "APP" ] }
  }
  if [type] == "webthreads" {
    multiline { pattern => "PATTERN2" negate => true what => "previous" }
    mutate { gsub => [ "message", "[\n]", ";" ] }
    grok {
      match => [ "message", "%{YEAR}-%{MONTHNUM}-%{MONTHDAY}-%{HOUR}:%{MINUTE}:%{SECOND}+%{GREEDYDATA}web.threads active\s: %{NUMBER:threadsactive};%{GREEDYDATA}web.threads total\s: %{NUMBER:threadstotal};%{GREEDYDATA}web.threads \'default\' active\s: %{NUMBER:threadsdefaultactive};%{GREEDYDATA}web.threads \'default\' total\s_: %{NUMBER:threadsdefaulttotal}" ]
    }
    mutate {
      convert => [ "threadsactive", "integer" ]
      convert => [ "threadstotal", "integer" ]
      convert => [ "threadsdefaultactive", "integer" ]
      convert => [ "threadsdefaulttotal", "integer" ]
    }
    alter { add_field => [ "source", "APP" ] }
  }
}

output {
  elasticsearch {
    bind_port => "10310"
    embedded => true
  }
}

============================== logstash.log ==============================
{:timestamp=>"2014-11-03T16:13:16.727000+0100", :message=>"You are using a deprecated config setting \"remove\" set in mutate. Deprecated settings will continue to work, but are scheduled for removal from logstash in the future. If you have any questions about this, please visit the #logstash channel on freenode irc.", :name=>"remove", :plugin=><LogStash::Filters::Mutate --->, :level=>:warn}
log4j, [2014-11-03T17:45:30.297] WARN: org.elasticsearch.index.engine.robin: [Strong Guy] [logstash-2014.11.03][0] failed engine
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.lucene.util.fst.BytesStore.writeByte(BytesStore.java:83)
    at org.apache.lucene.util.fst.FST.<init>(FST.java:282)
    at org.apache.lucene.util.fst.Builder.<init>(Builder.java:163)
    at org.apache.lucene.codecs.BlockTreeTermsWriter$PendingBlock.compileIndex(BlockTreeTermsWriter.java:420)
    at org.apache.lucene.codecs.BlockTreeTermsWriter$TermsWriter.writeBlocks(BlockTreeTermsWriter.java:569)
    at org.apache.lucene.codecs.BlockTreeTermsWriter$TermsWriter$FindBlocks.freeze(BlockTreeTermsWriter.java:544)
    at org.apache.lucene.util.fst.Builder.freezeTail(Builder.java:214)
    at org.apache.lucene.util.fst.Builder.finish(Builder.java:463)
    at org.apache.lucene.codecs.BlockTreeTermsWriter$TermsWriter.finish(BlockTreeTermsWriter.java:1010)
    at org.apache.lucene.index.FreqProxTermsWriterPerField.flush(FreqProxTermsWriterPerField.java:553)
    at org.apache.lucene.index.FreqProxTermsWriter.flush(FreqProxTermsWriter.java:85)
    at org.apache.lucene.index.TermsHash.flush(TermsHash.java:116)
    at org.apache.lucene.index.DocInverter.flush(DocInverter.java:53)
    at org.apache.lucene.index.DocFieldProcessor.flush(DocFieldProcessor.java:81)
    at org.apache.lucene.index.DocumentsWriterPerThread.flush(DocumentsWriterPerThread.java:465)
    at org.apache.lucene.index.DocumentsWriter.doFlush(DocumentsWriter.java:506)
    at org.apache.lucene.index.DocumentsWriter.flushAllThreads(DocumentsWriter.java:616)
    at org.apache.lucene.index.IndexWriter.getReader(IndexWriter.java:370)
    at org.apache.lucene.index.StandardDirectoryReader.doOpenFromWriter(StandardDirectoryReader.java:285)
    at org.apache.lucene.index.StandardDirectoryReader.doOpenIfChanged(StandardDirectoryReader.java:260)
    at org.apache.lucene.index.StandardDirectoryReader.doOpenIfChanged(StandardDirectoryReader.java:250)
    at org.apache.lucene.index.DirectoryReader.openIfChanged(DirectoryReader.java:170)
    at org.apache.lucene.search.SearcherManager.refreshIfNeeded(SearcherManager.java:118)
    at org.apache.lucene.search.SearcherManager.refreshIfNeeded(SearcherManager.java:58)
    at org.apache.lucene.search.ReferenceManager.doMaybeRefresh(ReferenceManager.java:155)
    at org.apache.lucene.search.ReferenceManager.maybeRefresh(ReferenceManager.java:204)
    at org.elasticsearch.index.engine.robin.RobinEngine.refresh(RobinEngine.java:786)
    at org.elasticsearch.index.shard.service.InternalIndexShard.refresh(InternalIndexShard.java:459)
    at org.elasticsearch.index.shard.service.InternalIndexShard$EngineRefresher$1.run(InternalIndexShard.java:877)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
{:timestamp=>"2014-11-03T17:45:44.814000+0100", :message=>"A plugin had an unrecoverable error. Will restart this plugin.\n Plugin: <LogStash::Inputs::Lumberjack ssl_certificate=>\"/opt/logstash/etc/selfsigned.crt\", ssl_key=>\"/opt/logstash/etc/selfsigned.key\", host=>\"0.0.0.0\">\n Error: GC overhead limit exceeded", :level=>:error}
log4j, [2014-11-03T17:45:58.238] WARN: org.elasticsearch.index.merge.scheduler: [Strong Guy] [logstash-2014.11.03][0] failed to merge
java.lang.IllegalStateException: this writer hit an OutOfMemoryError; cannot complete merge
    at org.apache.lucene.index.IndexWriter.commitMerge(IndexWriter.java:3482)
    at org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:4197)
    at org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:3654)
    at org.apache.lucene.index.ConcurrentMergeScheduler.doMerge(ConcurrentMergeScheduler.java:405)
    at org.apache.lucene.index.TrackingConcurrentMergeScheduler.doMerge(TrackingConcurrentMergeScheduler.java:107)
    at org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:482)
Exception in thread "elasticsearch[Strong Guy][[logstash-2014.11.03][0]: Lucene Merge Thread #241]" org.apache.lucene.index.MergePolicy$MergeException: java.lang.IllegalStateException: this writer hit an OutOfMemoryError; cannot complete merge
    at org.apache.lucene.index.ConcurrentMergeScheduler.handleMergeException(ConcurrentMergeScheduler.java:545)
    at org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider$CustomConcurrentMergeScheduler.handleMergeException(ConcurrentMergeSchedulerProvider.java:110)
    at org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:518)
Caused by: java.lang.IllegalStateException: this writer hit an OutOfMemoryError; cannot complete merge
    at org.apache.lucene.index.IndexWriter.commitMerge(IndexWriter.java:3482)
    at org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:4197)
    at org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:3654)
    at org.apache.lucene.index.ConcurrentMergeScheduler.doMerge(ConcurrentMergeScheduler.java:405)
    at org.apache.lucene.index.TrackingConcurrentMergeScheduler.doMerge(TrackingConcurrentMergeScheduler.java:107)
    at org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:482)

electrical commented 9 years ago

It seems you are running the embedded ES instance. Since that can be a memory hog, I would advise running a separate ES instance.
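For example, the output section could be pointed at a standalone node along these lines. This is only a sketch for logstash 1.4.x; the host, protocol, and port below are placeholders to adapt to your setup:

output {
  elasticsearch {
    # send events to a standalone Elasticsearch node instead of the embedded one
    embedded => false
    protocol => "http"            # "node" or "transport" are other options in 1.4.x
    host => "es01.example.com"    # placeholder, use your own ES host
    port => 9200
  }
}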

bortek commented 9 years ago

Thanks guys for your input! The problem is solved for now by increasing the network timeout parameter on logstash-forwarder from 15 seconds to 60 seconds.

"timeout": 60

It is more of a capacity issue than a problem with logstash :)

logstash-forwarder reads 1024 log rows at a time, and if there are lots of logs that must be sent to the server and the server cannot accept them within the 15-second timeout, the attempt fails. Another thread is then started to do the same job, and it either succeeds or fails again depending on the load on the server. This builds up a lot of threads that are working and mostly failing, and the count keeps increasing until it results in OutOfMemory.

By increasing the timeout we give the server a better chance to receive all the data. Right now the thread count is constant at around 125.

We have only one server, non-clustered, which is perhaps not enough :) Hope this info is helpful for others.

suyograo commented 9 years ago

Closing this. Please open a new issue if you run into similar problems.