logstash-plugins / logstash-output-stomp

Apache License 2.0
4 stars 13 forks source link

Output to ActiveMQ causes java.lang.OutOfMemoryError #5

Open jpastuszek opened 8 years ago

jpastuszek commented 8 years ago

I am outputting messages to ActiveMQ and the messages are getting delivered. However, the plugin accumulates OnStomp::Components::Frame objects until it runs out of heap space, so no matter how much heap space I give it, it will eventually (within a matter of minutes in my case) run out of memory. If I use the redis output plugin instead, everything is stable.

# Read events from Redis: uses the "channel" data type with the key below,
# and decodes each incoming message with the json codec.
input {
    redis {
        host => "prod.log.abc.net"
        data_type => "channel"
        key => "event://prod.logstash.abc.net/logline"
        codec => json
    }
}
# Receive syslog messages on port 5514; events get type "syslog", which the
# filter section keys on.
input {
    syslog {
        type => syslog
        # NOTE: running on the standard port 514 would need root, hence 5514
        port => 5514
    }
}

# Post-process events of type "syslog":
#   1. keep a copy of the raw message,
#   2. re-parse with a more relaxed grok pattern if the first parse failed,
#   3. normalize the timestamp, priority/severity fields and message text,
#   4. fill in missing fields and assign a UUID in "@id".
# Events of any other type pass through untouched.
filter {
    if [type] == "syslog" {
        # Preserve the unmodified message before any rewriting below.
        mutate {
            replace => {
                "_original_syslog_message" => "%{message}"
            }
        }

        if "_grokparsefailure" in [tags] {
            # Try a more relaxed pattern to parse syslog.
            # cron run-parts sends "()" chars in the program name, which do not match
            # the built-in %{PROG} pattern, so match any program name instead.
            # All optional groups are non-capturing "(?:...)"; only named captures
            # are meant to produce fields.
            grok {
                overwrite => "message"
                match => {
                    "message" => "(?m)<%{POSINT:priority}>(?:%{SYSLOGTIMESTAMP:timestamp}|%{TIMESTAMP_ISO8601:timestamp8601}) (?:%{SYSLOGFACILITY} )?(?:%{SYSLOGHOST:logsource} )?(?<program>[^ \[]+)(?:\[%{POSINT:pid}\])?: %{GREEDYDATA:message}"
                }
                add_tag => ["class:syslog/relaxed"]
                remove_tag => ["_grokparsefailure"]
                tag_on_failure => ["failed:syslog/relaxed"]
            }
        }

        # Handle LF escape http://blog.gerhards.net/2013/09/imfile-multi-line-messages.html
        # Note: can't be done with mutate gsub since the parser of this file will fail on \n... perhaps fixed in latest
        # Replaces the literal text "#012" with a real line feed (10.chr).
        ruby { code => "event['message'].gsub!('#012', 10.chr)" }

        # If we have timestamp8601 then use it as the time source (it has milliseconds)
        if [timestamp8601] {
            mutate {
                rename => {
                    "timestamp8601" => "timestamp"
                }
            }
        }

        # Set @timestamp to syslog event time
        if [timestamp] {
            date {
                match => [ "timestamp",
                    "MMM  d HH:mm:ss",
                    "MMM dd HH:mm:ss",
                    "ISO8601"
                ]
                remove_field => 'timestamp'
                timezone => 'UTC'
            }
        }

        # Value can be any of: "emergency", "alert", "critical", "error", "warning", "notice", "informational", "debug"
        syslog_pri {
            syslog_pri_field_name => "priority"
            add_tag => ["class:syslog"]
        }
        # Rename the decoded severity/facility to the field names used downstream
        # and drop the intermediate priority fields.
        mutate {
            rename => [
                "syslog_severity", "log_level",
                "syslog_facility", "facility"
            ]
            remove_field => [
                "priority",
                "severity",
                "facility_label",
                "severity_label",
                "syslog_facility_code",
                "syslog_severity_code"
            ]
        }

        # Add missing fields
        if ! [logsource] {
            mutate {
                replace => {
                    "logsource" => "%{host}"
                }
            }
        }

        if ! [program] {
            mutate {
                replace => {
                    "program" => "unknown"
                }
            }
        }

        # Normalize line endings and filter control chars:
        # CRLF -> LF, drop remaining CRs, then strip one trailing LF.
        if [message] {
            mutate {
                gsub => ["message", "\r\n", "\n"]
            }
            mutate {
                gsub => ["message", "\r", ""]
            }
            mutate {
                gsub => ["message", "\n$", ""]
            }
        }

        # Assign ID
        uuid {
            target => "@id"
        }
    }
}

# Send every event over STOMP to the queue below on the local host, using
# the given credentials (per the report, the broker is ActiveMQ).
output {
    stomp {
        host => "127.0.0.1"
        destination => "/queue/logstash_indexer_1"
        user => "logstash"
        password => "logstash"
    }
}