logstash-plugins / logstash-input-syslog

Apache License 2.0
37 stars 38 forks source link

Support RFC5424 message format #15

Open jordansissel opened 9 years ago

jordansissel commented 9 years ago

(This issue was originally filed by @suyograo at https://github.com/elastic/logstash/issues/1667)


Logstash has the syslog input which only supports messages in RFC3164 (with some modifications). It would be useful to add a codec which supports RFC5424 messages which could be used with inputs like TCP. With this support users would not have to use a grok filter with SYSLOG5424LINE pattern

arabold commented 9 years ago

Loggly, Logentries and other SaaS logging services are using RFC5424 for shipping logs to their servers. Even if Logstash has it's own format (Lumberjack), I favor using syslog for interoperability with those services. I've started to write my own parsing in Logstash, to work around the limitations of the logstash-input-syslog plugin. So I'll leave this here in case it might be helpful for someone:

input {
  tcp {
    type => "syslog"
    port => 5544
    tags => []
    add_field => { "_index_name" => "syslog" }
  }
}
filter {
    # Manually parse the log, as we want to support both RCF3164 and RFC5424
    grok {
      break_on_match => true
      match => [ 
        "message", "%{SYSLOG5424LINE}",
        "message", "%{SYSLOGLINE}"
      ]
    }

    if [syslog5424_ts] {
      # Handle RFC5424 formatted Syslog messages
      mutate {
        remove_field => [ "message", "host" ]
        add_tag => [ "syslog5424" ]
      }
      mutate {
        # Use a friendlier naming scheme
        rename => { 
          "syslog5424_app"  => "program"
          "syslog5424_msg"  => "message"
          "syslog5424_host" => "host"
        }
        remove_field => [ "syslog5424_ver", "syslog5424_proc" ]
      }
      if [syslog5424_pri] {
        # Calculate facility and severity from the syslog PRI value
        ruby {
          code => "event['severity'] = event['syslog5424_pri'].modulo(8)"
        }
        ruby {
          code => "event['facility'] = (event['syslog5424_pri'] / 8).floor"
        }
        mutate {
          remove_field => [ "syslog5424_pri" ]
        }
      }
      if [syslog5424_sd] {
        # All structured data needs to be in format [key=value,key=value,...]
        mutate {
          # Remove wrapping brackets
          gsub => [ "syslog5424_sd", "[\[\]]", "" ]
        }
        kv {
          # Convert the structured data into Logstash fields
          source => "syslog5424_sd"
          field_split => ","
          value_split => "="
          remove_field => [ "syslog5424_sd" ]
        }
      }
      date {
        match => [ "syslog5424_ts", "ISO8601" ]
        remove_field => [ "syslog5424_ts", "timestamp" ]
      }
    }
    else {
      # Handle RFC3164 formatted Syslog messages
      mutate {
        add_tag => [ "syslog3164" ]
      }
    }
}
output {
  # ...
}

This works pretty with in combination with Logspout.

KeaneYu commented 9 years ago

+1

dngferreira commented 8 years ago

+1

ivan-mjch commented 8 years ago

@arabold Thanks for the config, but you do realise, that the snippet above parses attributes in a form incompatible with RFC5424?

Implementation listed above is incorrect and lacking in multiple ways, if intended to parse RFC5424, aside from using a kv format incompatible with RFC5424. This is not meant as a critique of arabold's code, rather to show those interested in using RFC5424 format, that the above code is not RFC5424 compliant and how much more would be needed to properly parse and handle it.

As usual, this is my simplistic opinion and may be incorrect and incomplete:

dforste commented 8 years ago

If you want to you can augment the above with this code in place of the kv filter. You also should remove the Mutate as it breaks the format of the field. The below should be RFC 5424 Compliant without taking into account encoding issues that may be present.

    if [syslog5424_sd] {
      ruby {
        code => '
          def extract_syslog5424_sd( syslog5424_sd )
            sd = {}
            syslog5424_sd.scan(/\[(?<element>.*?[^\\])\]/) { |element| 
              data = element[0].match(/(?<sd_id>[^\ ]+) (?<sd_params>.*)/)
              sd[data[:sd_id]] = {}
              data[:sd_params].scan(/(.*?[=]".*?[^\\]")/) { |set| 
                set = set[0].match(/(?<param_name>.*?)[=](?<param_value>.*)/)
                sd[data[:sd_id]][set[:param_name].lstrip] = set[:param_value]
              }
            }
            return sd
          end 
          event["syslog5424_sd"] = extract_syslog5424_sd( event["syslog5424_sd"] )
        '
      }
    }
    date {
      match => [ "syslog5424_ts", "ISO8601" ]
      remove_field => [ "syslog5424_ts", "timestamp" ]
    }
  }
idsvandermolen commented 8 years ago

You probably would want to strip double quotes from param_value by modifying line in above ruby filter with:

set = set[0].match(/(?<param_name>.*?)[=]\"(?<param_value>.*)\"/)
ghost commented 8 years ago

Is there any sight on this feature being implemented in the near future? I need to capture syslog from 2000+ cisco switches and routers, currently it is nearly impossible to do so.

martsa1 commented 7 years ago

In case anyone else ends up here looking for a way to work with syslog in either RFC5424 or RFC3164, I had to make some tweaks to the above suggestions to support logstash 5.x.

I also noted an issue in the RegEx that caused failures if an SD Element has no SD Params. According to the spec an SD Element can have 0+ Params.

def extract_syslog5424_sd(syslog5424_sd)
  sd = {}
  syslog5424_sd.scan(/\[(?<element>.*?[^\\])\]/) do |element|
    data = element[0].match(/(?<sd_id>[^\ ]+)(?<sd_params> .*)?/)
    sd[data[:sd_id]] = {}
    next if data.nil? || data[:sd_params].nil?
    data[:sd_params].scan(/(.*?[=]".*?[^\\]")/) do |set|
      set = set[0].match(/(?<param_name>.*?)[=]\"(?<param_value>.*)\"/)
      sd[data[:sd_id]][set[:param_name].lstrip] = set[:param_value]
    end
  end
  sd
end
event.set('syslog5424_sd', extract_syslog5424_sd(event.get('syslog5424_sd')))

Note also that I had to concatenate that with ;'s as well which looks like:

ruby {
          code => 'def extract_syslog5424_sd(syslog5424_sd); sd = {}; syslog5424_sd.scan(/\[(?<element>.*?[^\\])\]/) do |element|; data = element[0].match(/(?<sd_id>[^\ ]+)(?<sd_params> .*)?/); sd[data[:sd_id]] = {}; next if data.nil? || data[:sd_params].nil?; data[:sd_params].scan(/(.*?[=]".*?[^\\]")/) do |set|; set = set[0].match(/(?<param_name>.*?)[=]\"(?<param_value>.*)\"/); sd[data[:sd_id]][set[:param_name].lstrip] = set[:param_value]; end; end; sd; end; event.set("syslog5424_sd", extract_syslog5424_sd(event.get("syslog5424_sd")));'
     }

Thanks for the template to work with!

jobec commented 6 years ago

There's a small bug in @ABitMoreDepth 's code. If you send an empty value (e.g. x="") it breaks. I had to change this line data[:sd_params].scan(/(.*?[=]".*?[^\\]")/) do |set| into data[:sd_params].scan(/ (.*?[=](?:""|".*?[^\\]"))/) do |set| eventually ending up with this complete config...

filter {
    grok {
        match => {
            "message" => "<%{NONNEGINT:syslog_pri}>%{NONNEGINT:version}%{SPACE}(?:-|%{TIMESTAMP_ISO8601:syslog_timestamp})%{SPACE}(?:-|%{IPORHOST:hostname})%{SPACE}(?:%{SYSLOG5424PRINTASCII:program}|-)%{SPACE}(?:-|%{SYSLOG5424PRINTASCII:process_id})%{SPACE}(?:-|%{SYSLOG5424PRINTASCII:message_id})%{SPACE}(?:-|(?<structured_data>(\[.*?[^\\]\])+))(?:%{SPACE}%{GREEDYDATA:syslog_message}|)"
        }
        add_tag => [ "match" ]
    }
    if "match" in [tags] {
        syslog_pri {
            remove_field => "syslog_pri"
        }
        date {
            match => [ "syslog_timestamp", "ISO8601" ]
            remove_field => "syslog_timestamp"
        }
        if [structured_data] {
            ruby {
                code => '
                    def extract_syslog5424_sd(syslog5424_sd)
                        sd = {}
                        syslog5424_sd.scan(/\[(?<element>.*?[^\\])\]/) do |element|
                            data = element[0].match(/(?<sd_id>[^\ ]+)(?<sd_params> .*)?/)
                            sd_id = data[:sd_id].split("@", 2)[0]
                            sd[sd_id] = {}
                            next if data.nil? || data[:sd_params].nil?
                            data[:sd_params].scan(/ (.*?[=](?:""|".*?[^\\]"))/) do |set|
                                set = set[0].match(/(?<param_name>.*?)[=]\"(?<param_value>.*)\"/)
                                sd[sd_id][set[:param_name]] = set[:param_value]
                            end
                        end
                        sd
                    end
                    event.set("[sd]", extract_syslog5424_sd(event.get("[structured_data]")))
                '
                remove_field => "structured_data"
            }
        }
    }
}
tomsommer commented 6 years ago

+1 for handling this natively

caryyu commented 6 years ago

+1

JPvRiel commented 6 years ago

While I agree it would be nice for logstash to natively support RFC5424, some pointers from a lot of experience with this amounted to:

If you want to go with the rsyslog + kafka approach, I did build a container to tackle it docker-rsyslog. However, I noticed that rsyslog upstream is finally maturing their support for containers and there are official rsyslog images.

Not necessarily promoting rsyslog here, given it (in my opinion) has fairly complicated configuration syntax and its documentation lags often referencing old legacy style syslogd config syntax, but I can vouch for it performing remarkably well.

If you want to stick to pure logstash, my hack was the following syslog grok pattern which you can use in conjunction with tcp / udp input plugins.

# This is a flexable grok pattern file for syslog. By default, it attempts to be
# relaxed and accomodate implimentation variations.

# valid priority range from 0 to 191, but 00 or 001 technically not legitimate
# according to RFC 3164.
SYSLOGPRINUMSTRICT (?:0|(?:(?:[1-9][0-9])|(?:1[0-8][0-9])|(?:19[0-1])))
# the example below is less precise but hopefully faster. Rather use range 
# checking logic in conf.
SYSLOGPRINUMRELAXED [0-9]{1,3}
SYSLOGPRISTRICT <%{SYSLOGPRINUMSTRICT:priority:int}>
SYSLOGPRIRELAXED <%{SYSLOGPRINUMRELAXED:priority:int}>
SYSLOGPRI %{SYSLOGPRIRELAXED}

# RFC3164
SYSLOG3164TIMESTAMPSTRICT (?:(?:Jan)|(?:Feb)|(?:Mar)|(?:Apr)|(?:May)|(?:Jun)|(?:Jul)|(?:Aug)|(?:Sep)|(?:Oct)|(?:Nov)|(?:Dec)) (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9]) (?:2[0123]|[01][0-9]):(?:[0-5][0-9]):(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
# Try be even more flexable then RFC3164 and also allow ISO8601 timestamps.
SYSLOG3164TIMESTAMPRELAXED (?:%{MONTH} +%{MONTHDAY} %{TIME})|%{TIMESTAMP_ISO8601}
SYSLOG3164TIMESTAMP %{SYSLOG3164TIMESTAMPRELAXED:timestamp_source}
# Hostname or IP allowed in RFC 3164, but not supposed to be FQDN. Can be 
# flexable and allow it.
HOSTNAMEONLY (?!-)[a-zA-Z0-9-]{1,63}(?<!-)
SYSLOG3164HOSTNAMESTRICT (?:%{HOSTNAMEONLY}|%{IP})
SYSLOG3164HOSTNAMERELAXED %{IPORHOST}
SYSLOG3164HOSTNAME %{SYSLOG3164HOSTNAMERELAXED:host_source}
# For the RFC3164 header, avoid matching RFC 5424 with a negative lookhead for a
# 5424 version number. Also assume that given a timestamp, a hostname aught 
# to follow 
SYSLOG3164HDR ^(?:%{SYSLOGPRI}(?!%{SYSLOG5424VER} ))?(?:%{SYSLOG3164TIMESTAMP} (:?%{SYSLOG3164HOSTNAME} )?)?
# The pattern below is bit stricter than the RFC definiton for tags. Technically 
# the tag is supposed to be only alphanumeric and terminate on first 
# non-alphanum character. However, many programs don't obey that. Generally 
# a colon or left sqaure bracket terminates the tag. In addition, exclude '<'
# character as not appropriate for a program name, given it can cause confusion 
# with a syslog priority header
SYSLOG3164TAG [^:\[<]{1,32}
SYSLOG3164PID \[%{POSINT:pid}\]
SYSLOG3164CONTENT %{GREEDYDATA:message_content}
SYSLOG3164MSG (%{SYSLOG3164TAG:program}(?:%{SYSLOG3164PID})?: ?)?%{SYSLOG3164CONTENT}
SYSLOG3164 %{SYSLOG3164HDR}%{SYSLOG3164MSG:message_syslog}

# RFC5424
SYSLOG5424VER [0-9]{1,2}
# Timestamp is ISO8601 - the version in grok-patterns wasn't as strict as it was defined in the RFC
SYSLOG5424TIMESTAMPSTRICT [0-9]{4}-(?:0[1-9]|1[0-2])-(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])T(?:[01][0-9]|2[0123]):(?:[0-5][0-9]):(?:[0-5][0-9])(?:[.][0-9]{1,6})?(?:Z|[+-](?:[01][0-9]|2[0123]):[0-5][0-9])
SYSLOG5424TIMESTAMPRELAXED %{TIMESTAMP_ISO8601}
SYSLOG5424TIMESTAMP %{SYSLOG5424TIMESTAMPRELAXED}
# Hostname can be FQDN, DNS label/hostname only or IP
SYSLOGRFC5424HOSTNAME %{IPORHOST}
SYSLOG5424PRINTASCII [!-~]+
SYSLOG5424APPNAME [!-~]{1,48}
SYSLOG5424PROCID [!-~]{1,128}
SYSLOG5424MSGID [!-~]{1,32}
# Practically, only one version for now, and trying to parse future versions 
# would be unwise. So 1 'hardcoded'.
SYSLOG5424HDR ^%{SYSLOGPRI}1 (?:%{SYSLOG5424TIMESTAMP:timestamp_source}|-) (?:%{SYSLOGRFC5424HOSTNAME:host_source}|-) (?:%{SYSLOG5424APPNAME:program}|-) (?:%{SYSLOG5424PROCID:pid}|-) (?:%{SYSLOG5424MSGID:msgid}|-)
# Replace the 1 above with %{SYSLOG5424VER:syslog_version} to cater for 
# additional versions.
SYSLOG5424STRUCTDATA \[%{DATA}\]+
SYSLOG5424MSG %{GREEDYDATA:message_content}
SYSLOG5424 %{SYSLOG5424HDR} (?<message_syslog>(?:%{SYSLOG5424STRUCTDATA:structured_data}|-)( ?%{SYSLOG5424MSG})?)

# Try match and capture RFC 5424 first, given RFC 3164 allows messages without any syslog header. 
# Otherwise, RFC 3164 could accidentally capture an RFC 5424 priority and header as the tag or host of a raw message
SYSLOG %{SYSLOG5424}|%{SYSLOG3164}

And this is an example logstash conf using the above

input {
  tcp {
    host => "127.0.0.1"
    port => "5514"
    type => "syslog"
  }
  udp {
    host => "127.0.0.1"
    port => "5514"
    type => "syslog"
  }
}

# determine and parse type of syslog message
filter {
  if [type] == "syslog" { 

    # look for and, if found, decode syslog priority
    if [message] =~ "^<[0-9]{1,3}>" { 
      grok {
        match => [ "message", "^<%{NONNEGINT:priority:int}>" ]
      }
      if [priority] <= 191 {
        # check for RFC 3164 vs RFC 5424
        if [message] =~ "^<[0-9]{1,3}>[0-9]{1,2} " {
          mutate {
            add_tag => ["syslog_rfc5424"]
          }
        }
        else {
      mutate {
            add_tag =>  ["syslog_rfc3164"]
          }
        }
      }
      else {
        mutate {  
          add_tag => ["syslog_priority_invalid"]
        }
      }
    } 
    else {
      # only RFC 3164 allows a message to specify no priority
      mutate {  
    add_tag => [ "syslog_rfc3164", "syslog_priority_missing" ]
      }
    }
    # RFC 3164 suggests adding priority if it's missing. 
    # Even if missing, syslog_pri filter adds the default priority.
    syslog_pri {
      syslog_pri_field_name => "priority"
    }
    mutate {
      # follow convention used by logstash syslog input plugin
      rename => { "syslog_severity_code" => "severity" }
      rename => { "syslog_facility_code" => "facility" }
      rename => { "syslog_facility" => "facility_label" }
      rename => { "syslog_severity" => "severity_label" }
    }

    # parse both RFC 3164 and 5424
    grok {
      patterns_dir => "../patterns"
      match => [ "message", "%{SYSLOG}" ]
      tag_on_failure => [ "_grokparsefailure_syslog" ]
    }

    # Check if a timestamp source was found and work out elapsed time recieving log
    # Note, mutate filter will convert a date object to a string not in ISO8601 format, so rather use ruby filter
    ruby {
        code => "event['timestamp_logstash'] = event['@timestamp']"
    }
    if [timestamp_source] {
      date {
        locale => en
        # assume timezone for cases where it isn't provided
        timezone => "Africa/Johannesburg"
        match => [ "timestamp_source", "MMM d H:m:s", "MMM d H:m:s", "ISO8601" ]
      }
      # add a field for delta (in seconds) between logsource and logstash
      ruby {
        code => "event['time_elapsed_logstash'] = event['timestamp_logstash'] - event['@timestamp']"
      }
    }
    else {
      mutate {
    add_tag => ["syslog_timestamp_source_missing"]
      }
    }

    # Check if a host source was found
    if ! [host_source] {
      mutate {
    add_tag => ["syslog_host_source_missing"]
      }
    }

    # discard redundant info
    mutate {
      remove_field => [ "priority" ] #redundant and less useful once severity and facility are decoded
      replace => { "message" => "%{message_content}" } 
      remove_field => [ "message_syslog", "message_content" ] #already in content message
    } 
  }
}

output {
  stdout { codec => rubydebug }
}

If you look at the above mess, running rsyslog and dealing with it's cumbersome config to produce nice JSON output is possibly worth the pain anyhow.

jobec commented 6 years ago

That doesn't parse the structured data in the message into separate fields.

And also, it also isn't correctly extracting the structured data because it doesn't take into account that there can be a ] somewhere in the structured data.

Try putting this through your rule:

<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog - ID47 [exampleSDID@32473 iut="3" eventSource="Application[\"123\"]" eventID="1011"] An application event log entry...
JPvRiel commented 6 years ago

That doesn't parse the structured data in the message into separate fields.

Yip, what I did was limited and abandoned once I hit performance limitations and hadn't yet worried about structured data... @ABitMoreDepth's approach and your adaptation from it will likely work better for using native logstash, which given lower volumes or lots of CPU to throw at the problem is less effort than the rsyslog way of things (which can also neatly parse structured element data into JSON objects and index directly to elastic if you like)...

Hacking at logstash to handle syslog at high volumes is something I recommend against (unless someone comes up with some small miracles to make this input plugin perform better).

martsa1 commented 6 years ago

@JPvRiel do you have any numbers for the kind of performance you were seeing before you moved away from logstash?

I haven't had the chance to get much volume through the logstash plugin yet (couple of thousand per second max, but only in short bursts). I was planning to scale the logstash container and handle volume horizontally when the need arises, but its an interesting point you make about CPU consumption, I just wondered what kind of throughput you got to before starting to feel that logstash wasn't going to scale well enough for you?

1 million events per second is pretty hefty!

JPvRiel commented 6 years ago

do you have any numbers for the kind of performance you were seeing before you moved away from logstash?

It was about about 2 years ago I moved away from my regex/grok pattern hackery for RFC5424 in logstash and onto rsyslog. At the time, I don't recall benchmarking it in detail, since one can't simply just run time rsyslog ... vs time logstash ... with say one million sample messages, given that unfair heavy startup cost jruby would have vs rsyslog (native c). Perhaps letting both initialise, then gathering before and after cpu counters for processing n of the same sample message set could be a way to quantify it.

However, I didn't go as far as the above because I know rsyslog has "hand crafted" parser modules for RFC3164 and RFC5424 aimed specifically at just those kinds of syslog headers, without needing to rely on heavier generic pattern matching regexes. I'm venturing a safe bet that rsyslog (C code with targeted header specific string matching) versus logstash (JVM + JRuby + regex) is going to be no contest.

1 million events per second is pretty hefty!

Just firewalls, DNS, and proxies logging into syslog pushes volume quickly. Add the trend of microservices in even medium size organisations can start loading a central syslog service.

Trolldemorted commented 4 years ago

This issue has been open for over 5 years. Any chance logstash will get native support for RFC 5424 soonish? The RFC will literally be older than our interns soon, and parts of our equipment use RFC 5424 only :|

ypid-geberit commented 4 years ago

@Trolldemorted Just my two cents. Possibly look at alternative solutions for parsing (RFC 5424) logs. Some that come to my mind:

JPvRiel commented 4 years ago

Possibly look at alternative solutions for parsing (RFC 5424) logs

Yes, exactly! I ended up using rsyslog. Once you work past it's niche config syntax (bit of a learning curve!), it's able to output syslog as JSON. It also has a an elasticsearch or kafka module for output.

My workaround was rsyslog in a container (https://github.com/JPvRiel/docker-rsyslog), but I also used kafka since logstash buffering/queuing support wasn't a feature until fairly recently. From this example, I learnt rsyslog has mature and performant syslog handling features (consumes much less CPU compared to logstash!), including parsing both RFC3164, RFC5424 and being able to deal with odd legacy operating systems like Solaris and AIX, neither of which follow the RFCs nicely.

hackery commented 3 years ago

Is there something fundamentally hard about implementing this natively in Logstash? RFC5424 is widespread and the workarounds all seem pretty ugly. Surprised no contributors weighing in on this since @jordansissel opened the issue ...

JPvRiel commented 3 years ago

RFC5424 is widespread

While the RFC is widespread, you'll be surprised at how many vendors don't properly follow it and add special quirks. For teams who have just one or two well formatted syslog sources properly formatted, sure, a better native RFC5424 module support would be helpful. But try do this at scale in enterprise with odd things like Solaris, AIX, security devices that don't even follow RFC3164 nicely, etc and you'll find that the 10+ years of hacking done in rsyslog can handle most situations with performant C code much better than any attempt to re-implement all that quirky logic in logstash.

Weather implemented in Java (or worse JRuby), I'm doubting it's going to be quick or compete with rsyslog in terms of performance. Rsyslog also has an omelasticsearch output module.

Again, too bad rsyslog custom config syntax is even worse than Logstash's config / domain specific language...

Hopefully some day the legacy way of logging with syslog dies (it's over 20 years old now) and more mature JSON-based formats with HTTP event collection patterns emerge. Until then, you'll be hard pressed to find a better fully open source implementation than rsyslog.

ypid-geberit commented 3 years ago

you'll be surprised at how many vendors don't properly follow it and add special quirks.

I fully agree …

Until then, you'll be hard pressed to find a better fully open source implementation than rsyslog.

Hands down, look at http://vector.dev/, it is not yet feature complete with Logstash but we already replaced parts of what was previously done with Logstash. It can even replace most of the Beats products like Filebeat.