tsaikd / gogstash

Logstash like, written in golang
MIT License
647 stars 106 forks source link

conditional branching? #80

Open horacimacias opened 5 years ago

horacimacias commented 5 years ago

I'm trying to migrate from logstash to gogstash as the memory usage of my environment is getting out of hand. I couldn't find anything regarding conditional branching on input/filter/output elements like logstash does. Is there any similar thing in gogstash?

tengattack commented 5 years ago

You can use filter/cond & output/cond.

This is what I'm using in production (the grok patterns may contain some errors, because they were copied from a terminal that wrapped long lines):

# Global pipeline settings.
# NOTE(review): chsize is presumably the internal event channel size and
# worker the number of processing workers; confirm against the gogstash docs.
chsize: 5000
worker: 4
# Options applied to events as a whole.
event:
  sort_map_keys: true
  remove_field:
    - "@metadata"

# Event sources: a beats listener and a UDP socket listener.
input:
  # Beats input listening on port 5065.
  - type: "beats"
    port: 5065
    reuseport: true
  # UDP socket input bound to all interfaces on port 5066.
  - type: "socket"
    socket: "udp"
    address: "0.0.0.0:5066"
    reuseport: true
    buffer_size: 65536

# Filter chain. Each top-level entry is a `cond` filter whose nested filters
# are guarded by its `condition` expression.
filter:
  # nginx access-log pipeline: guarded on fileset.module == 'nginx' and
  # fileset.name == 'access'.
  - type: cond
    condition: "[fileset.module] == 'nginx' && [fileset.name] == 'access'"
    filter:
      # Parse the `message` field into nginx.access.* fields. The pattern
      # expects an optional numeric timestamp after the HTTPDATE, a quoted host,
      # and trailing request_time / upstream_response_time values.
      - type: grok
        match:
          - "%{IPORHOST:nginx.access.remote_ip} - %{DATA:nginx.access.user_name} \\[%{HTTPDATE:nginx.access.time}(?:| %{NUMBER:nginx.access.timestamp})\\] \"%{DATA:nginx.access.host}\" \"%{WORD:nginx.access.method} %{DATA:nginx.access.url} HTTP/%{NUMBER:nginx.access.http_version}\" %{NUMBER:nginx.access.response_code:int} %{NUMBER:nginx.access.body_sent.bytes} \"%{DATA:nginx.access.referrer}\" \"%{DATA:nginx.access.agent}\" \"(?:-|%{DATA:nginx.access.x_forwarded_for})\" %{NUMBER:nginx.access.request_time:float} (?:%{NUMBER:nginx.access.upstream_response_time:float}|-)"
        source: message
        # Disabled: optional path to extra grok pattern definitions.
        #patterns_path: /etc/gogstash/grok-patterns
      # Copy the event's current @timestamp into read_timestamp; the date
      # filters further down run after this copy is taken.
      - type: add_field
        key: 'read_timestamp'
        value: '%{@timestamp}'
      # Everything below runs only when the event is not tagged
      # 'gogstash_filter_grok_error'.
      # NOTE(review): presumably the grok filter adds that tag on a failed
      # match; confirm.
      - type: cond
        condition: "!('gogstash_filter_grok_error' IN map([tags]))"
        filter:
          # NOTE(review): presumably drops the raw `message` field now that it
          # has been parsed; confirm.
          - type: remove_field
            remove_message: true
          # Prefer the numeric nginx.access.timestamp when the log line carried
          # one; otherwise fall back to the HTTPDATE text in nginx.access.time.
          - type: cond
            condition: "!empty([nginx.access.timestamp])"
            filter:
              - type: date
                format: ["UNIX"]
                source: "nginx.access.timestamp"
            else_filter:
              # NOTE(review): `joda: true` presumably makes the format a
              # Joda-style pattern rather than a Go layout; confirm.
              - type: date
                format: ["dd/MMM/YYYY:H:m:s Z"]
                joda: true
                source: "nginx.access.time"
          # Remove the raw time fields only when the event is not tagged
          # 'gogstash_filter_date_error', so they remain available for
          # debugging otherwise.
          - type: cond
            condition: "!('gogstash_filter_date_error' IN map([tags]))"
            filter:
              - type: remove_field
                fields: ['nginx.access.timestamp', 'nginx.access.time']
          # When an X-Forwarded-For value was captured, split it on ', '.
          - type: cond
            condition: "!empty([nginx.access.x_forwarded_for])"
            filter:
              - type: mutate
                split: ['nginx.access.x_forwarded_for', ', ']
            # Placeholder left by the author: no else branch is configured.
            #else_filter:
          # User-agent enrichment: reads nginx.access.agent and writes under
          # nginx.access.user_agent, using the regexes file given below.
          - type: useragent
            regexes: /etc/gogstash/regexes.yaml
            source: 'nginx.access.agent'
            target: 'nginx.access.user_agent'
          # GeoIP enrichment: reads nginx.access.remote_ip and writes under
          # nginx.access.geoip, using the GeoLite2 City database file.
          - type: geoip2
            db_path: /etc/gogstash/GeoLite2-City.mmdb
            ip_field: 'nginx.access.remote_ip'
            key: 'nginx.access.geoip'
            quiet: true
            skip_private: true
            flat_format: true

  # nginx error-log pipeline: guarded on fileset.module == 'nginx' and
  # fileset.name == 'error'.
  - type: cond
    condition: "[fileset.module] == 'nginx' && [fileset.name] == 'error'"
    filter:
      # Parse the error line into nginx.error.* fields (time, level, pid, tid,
      # optional connection id, message). No `source` is set here.
      # NOTE(review): presumably the grok filter defaults to `message`; confirm.
      - type: grok
        match:
          - "(?m)%{DATA:nginx.error.time} \\[%{DATA:nginx.error.level}\\] %{NUMBER:nginx.error.pid}#%{NUMBER:nginx.error.tid}: (\\*%{NUMBER:nginx.error.connection_id} )?%{GREEDYDATA:nginx.error.message}"
      # Copy the event's current @timestamp into read_timestamp before the
      # date filter below runs.
      - type: add_field
        key: 'read_timestamp'
        value: '%{@timestamp}'
      # Continue only when the event is not tagged 'gogstash_filter_grok_error'.
      - type: cond
        condition: "!('gogstash_filter_grok_error' IN map([tags]))"
        filter:
          # NOTE(review): presumably drops the raw `message` field; confirm.
          - type: remove_field
            remove_message: true
          # If the shipper reported a timezone, append it to the parsed time so
          # the first date layout below (with offset) can match.
          # NOTE(review): assumes add_field overwrites the existing
          # nginx.error.time value; confirm.
          - type: cond
            condition: "!empty([beat.timezone])"
            filter:
              - type: add_field
                key: 'nginx.error.time'
                value: '%{nginx.error.time} %{beat.timezone}'
          # Two layouts are listed: one with a UTC offset, one without.
          # NOTE(review): these look like Go reference-time layouts; confirm.
          - type: date
            format: ["2006/01/02 15:04:05 -07:00", "2006/01/02 15:04:05"]
            source: "nginx.error.time"
          # Remove the raw time field only when the event is not tagged
          # 'gogstash_filter_date_error'.
          - type: cond
            condition: "!('gogstash_filter_date_error' IN map([tags]))"
            filter:
              - type: remove_field
                fields: ['nginx.error.time']

  # System auth-log pipeline: guarded on fileset.module == 'system' and
  # fileset.name == 'auth'.
  - type: cond
    condition: "[fileset.module] == 'system' && [fileset.name] == 'auth'"
    filter:
      - type: grok
        # Custom pattern matching any characters including newlines; it is used
        # by the last, catch-all match entry.
        patterns:
          GREEDYMULTILINE: "(.|\\n)*"
        # Patterns in listed order: sshd login events (with method and port),
        # sshd user events, sshd "Did not receive identification string", sudo,
        # groupadd, useradd, and a generic program/message fallback.
        # NOTE(review): presumably the first matching pattern wins; confirm.
        match:
          - "%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} sshd(?:\\[%{POSINT:system.auth.pid}\\])?: %{DATA:system.auth.ssh.event} %{DATA:system.auth.ssh.method} for (invalid user )?%{DATA:system.auth.user} from %{IPORHOST:system.auth.ssh.ip} port %{NUMBER:system.auth.ssh.port} ssh2(: %{GREEDYDATA:system.auth.ssh.signature})?"
          - "%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} sshd(?:\\[%{POSINT:system.auth.pid}\\])?: %{DATA:system.auth.ssh.event} user %{DATA:system.auth.user} from %{IPORHOST:system.auth.ssh.ip}"
          - "%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} sshd(?:\\[%{POSINT:system.auth.pid}\\])?: Did not receive identification string from %{IPORHOST:system.auth.ssh[dropped_ip]}"
          - "%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} sudo(?:\\[%{POSINT:system.auth.pid}\\])?: \\s*%{DATA:system.auth.user} :( %{DATA:system.auth.sudo.error} ;)? TTY=%{DATA:system.auth.sudo.tty} ; PWD=%{DATA:system.auth.sudo.pwd} ; USER=%{DATA:system.auth.sudo.user} ; COMMAND=%{GREEDYDATA:system.auth.sudo.command}"
          - "%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} groupadd(?:\\[%{POSINT:system.auth.pid}\\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}"
          - "%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} useradd(?:\\[%{POSINT:system.auth.pid}\\])?: new user: name=%{DATA:system.auth.user.add.name}, UID=%{NUMBER:system.auth.user.add.uid}, GID=%{NUMBER:system.auth.user.add.gid}, home=%{DATA:system.auth.user.add.home}, shell=%{DATA:system.auth.user.add.shell}$"
          - "%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} %{DATA:system.auth.program}(?:\\[%{POSINT:system.auth.pid}\\])?: %{GREEDYMULTILINE:system.auth.message}"
      # Continue only when the event is not tagged 'gogstash_filter_grok_error'.
      - type: cond
        condition: "!('gogstash_filter_grok_error' IN map([tags]))"
        filter:
          # NOTE(review): presumably drops the raw `message` field; confirm.
          - type: remove_field
            remove_message: true
          # Build system.auth.time from the parsed timestamp, prefixed with
          # '%{+@2006}' and suffixed with the shipper timezone when one exists.
          # NOTE(review): '%{+@2006}' presumably expands to the event's year,
          # which the syslog timestamp itself lacks; confirm.
          - type: cond
            condition: "!empty([beat.timezone])"
            filter:
              - type: add_field
                key: 'system.auth.time'
                value: '%{+@2006} %{system.auth.timestamp} %{beat.timezone}'
            else_filter:
              - type: add_field
                key: 'system.auth.time'
                value: '%{+@2006} %{system.auth.timestamp}'
          # Layouts cover zero-padded and space-padded day-of-month, each with
          # and without a UTC offset.
          - type: date
            format: ["2006 Jan 02 15:04:05 -07:00", "2006 Jan  2 15:04:05 -07:00", "2006 Jan 02 15:04:05", "2006 Jan  2 15:04:05"]
            source: "system.auth.time"
          # Remove the helper time field only when the event is not tagged
          # 'gogstash_filter_date_error'.
          - type: cond
            condition: "!('gogstash_filter_date_error' IN map([tags]))"
            filter:
              - type: remove_field
                fields: ['system.auth.time']
          # GeoIP enrichment: reads system.auth.ssh.ip and writes under
          # system.auth.ssh.geoip.
          - type: geoip2
            db_path: /etc/gogstash/GeoLite2-City.mmdb
            ip_field: 'system.auth.ssh.ip'
            key: 'system.auth.ssh.geoip'
            quiet: true
            skip_private: true
            flat_format: true
            cache_size: 100

  # System syslog pipeline: guarded on fileset.module == 'system' and
  # fileset.name == 'syslog'.
  - type: cond
    condition: "[fileset.module] == 'system' && [fileset.name] == 'syslog'"
    filter:
      - type: grok
        # Custom pattern matching any characters including newlines, used for
        # the message body.
        patterns:
          GREEDYMULTILINE: "(.|\\n)*"
        # Single pattern: timestamp, hostname, program, optional pid, message.
        match: 
          - "%{SYSLOGTIMESTAMP:system.syslog.timestamp} %{SYSLOGHOST:system.syslog.hostname} %{DATA:system.syslog.program}(?:\\[%{POSINT:system.syslog.pid}\\])?: %{GREEDYMULTILINE:system.syslog.message}"
      # Continue only when the event is not tagged 'gogstash_filter_grok_error'.
      - type: cond
        condition: "!('gogstash_filter_grok_error' IN map([tags]))"
        filter:
          # NOTE(review): presumably drops the raw `message` field; confirm.
          - type: remove_field
            remove_message: true
          # Build system.syslog.time from the parsed timestamp, prefixed with
          # '%{+@2006}' and suffixed with the shipper timezone when one exists.
          # NOTE(review): '%{+@2006}' presumably expands to the event's year;
          # confirm.
          - type: cond
            condition: "!empty([beat.timezone])"
            filter:
              - type: add_field
                key: 'system.syslog.time'
                value: '%{+@2006} %{system.syslog.timestamp} %{beat.timezone}'
            else_filter:
              - type: add_field
                key: 'system.syslog.time'
                value: '%{+@2006} %{system.syslog.timestamp}'
          # Layouts cover zero-padded and space-padded day-of-month, each with
          # and without a UTC offset.
          - type: date
            format: ["2006 Jan 02 15:04:05 -07:00", "2006 Jan  2 15:04:05 -07:00", "2006 Jan 02 15:04:05", "2006 Jan  2 15:04:05"]
            source: "system.syslog.time"
          # Remove the helper time field only when the event is not tagged
          # 'gogstash_filter_date_error'.
          - type: cond
            condition: "!('gogstash_filter_date_error' IN map([tags]))"
            filter:
              - type: remove_field
                fields: ['system.syslog.time']

# Event sinks. A single `cond` output routes nginx events to a dedicated daily
# index and everything else to an index named after the shipping beat.
output:
  # Disabled: uncomment to also print events to stdout while debugging.
  #- type: stdout
  - type: cond
    condition: "[fileset.module] == 'nginx'"
    # Events with fileset.module == 'nginx' go to a daily "nginx-" index.
    # NOTE(review): the URL embeds basic-auth credentials (masked here); prefer
    # supplying them from outside the committed config.
    output:
      - type: elastic
        url: ["http://logstash:******@172.16.141.226:9200"]
        index: "nginx-%{+@2006.01.02}"
        document_type: "doc"
    # All other events go to a daily index built from @metadata.beat and
    # @metadata.version.
    else_output:
      - type: elastic
        url: ["http://logstash:******@172.16.141.226:9200"]
        index: "%{@metadata.beat}-%{@metadata.version}-%{+@2006.01.02}"
        # The closing quote was missing here, leaving an unterminated
        # double-quoted scalar that makes the whole document unparseable.
        document_type: "doc"
horacimacias commented 5 years ago

awesome thanks! I'll give this a try.

On Tue, Mar 26, 2019 at 2:56 AM tengattack notifications@github.com wrote:

You can use filter/cond & output/cond.

This is what I'm using in production:

chsize: 5000 worker: 4 event: sort_map_keys: true remove_field: ['@metadata'] input:

  • type: beats port: 5065 reuseport: true

  • type: socket socket: udp address: '0.0.0.0:5066' reuseport: true buffer_size: 65536 filter:

  • type: cond condition: "[fileset.module] == 'nginx' && [fileset.name] == 'access'" filter:

    • type: grok match:
      • "%{IPORHOST:nginx.access.remote_ip} - %{DATA:nginx.access.user_name} \[%{HTTPDATE:nginx.access.time}(?:| %{NUMBER:nginx.access.timestamp})\] \"%{DATA:nginx.access.host}\" \"%{WORD:nginx.access.method} %{DATA:nginx.access.url} HTTP/%{NUMBER:nginx.access.http_version}\" %{NUMBER:nginx.access.response_code:int} %{NUMBER:nginx.access.body_sent.bytes} \"%{DATA:nginx.access.referrer}\" \"%{DATA:nginx.access.agent}\" \"(?:-|%{DATA:nginx.access.x_forwarded_for})\" ?%{NUMBER:nginx.access.request_time:float} (?:%{NUMBER:nginx.access.upstream_response_time:float}|-)" source: message

        patterns_path: /etc/gogstash/grok-patterns

    • type: add_field key: 'read_timestamp' value: '%{@timestamp}'
    • type: cond condition: "!('gogstash_filter_grok_error' IN map([tags]))" filter:
      • type: remove_field remove_message: true
      • type: cond condition: "!empty([nginx.access.timestamp])" filter:
        • type: date format: ["UNIX"] source: "nginx.access.timestamp" else_filter:
        • type: date format: ["dd/MMM/YYYY:H:m:s Z"] joda: true source: "nginx.access.time"
      • type: cond condition: "!('gogstash_filter_date_error' IN map([tags]))" filter:
        • type: remove_field fields: ['nginx.access.timestamp', 'nginx.access.time']
      • type: cond condition: "!empty([nginx.access.x_forwarded_for])" filter:
        • type: mutate split: ['nginx.access.x_forwarded_for', ', ']

          else_filter:

      • type: useragent regexes: /etc/gogstash/regexes.yaml source: 'nginx.access.agent' target: 'nginx.access.user_agent'
      • type: geoip2 db_path: /etc/gogstash/GeoLite2-City.mmdb ip_field: 'nginx.access.remote_ip' key: 'nginx.access.geoip' quiet: true skip_private: true flat_format: true
  • type: cond condition: "[fileset.module] == 'nginx' && [fileset.name] == 'error'" filter:

    • type: grok match:
      • "(?m)%{DATA:nginx.error.time} \[%{DATA:nginx.error.level}\] %{NUMBER:nginx.error.pid}#%{NUMBER:nginx.error.tid}: (\*%{NUMBER:nginx.error.connection_id} )?%{GREEDYDATA:nginx.error.message}"
    • type: add_field key: 'read_timestamp' value: '%{@timestamp}'
    • type: cond condition: "!('gogstash_filter_grok_error' IN map([tags]))" filter:
      • type: remove_field remove_message: true
      • type: cond condition: "!empty([beat.timezone])" filter:
        • type: add_field key: 'nginx.error.time' value: '%{nginx.error.time} %{beat.timezone}'
      • type: date format: ["2006/01/02 15:04:05 -07:00", "2006/01/02 15:04:05"] source: "nginx.error.time"
      • type: cond condition: "!('gogstash_filter_date_error' IN map([tags]))" filter:
        • type: remove_field fields: ['nginx.error.time']
  • type: cond condition: "[fileset.module] == 'system' && [fileset.name] == 'auth'" filter:

    • type: grok patterns: GREEDYMULTILINE: "(.|\n)*" match:
      • "%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} sshd(?:\[%{POSINT:system.auth.pid}\])?: %{DATA:system.auth.ssh.event} %{DATA:system.auth.ssh.method} for (invalid user )?%{DATA:system.auth.user} from %{IPORHOST:system.auth.ssh.ip} port %{NUMBER:system.auth.ssh.port} ssh2(: %{GREEDYDATA:system.auth.ssh.signature})?"
      • "%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} sshd(?:\[%{POSINT:system.auth.pid}\])?: %{DATA:system.auth.ssh.event} user %{DATA:system.auth.user} from %{IPORHOST:system.auth.ssh.ip}"
      • "%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} sshd(?:\[%{POSINT:system.auth.pid}\])?: Did not receive identification string from %{IPORHOST:system.auth.ssh[dropped_ip]}"
      • "%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} sudo(?:\[%{POSINT:system.auth.pid}\])?: \s*%{DATA:system.auth.user} :( %{DATA:system.auth.sudo.error} ;)? TTY=%{DATA:system.auth.sudo.tty} ; PWD=%{DATA:system.auth.sudo.pwd} ; USER=%{DATA:system.auth.sudo.user} ; COMMAND=%{GREEDYDATA:system.auth.sudo.command}"
      • "%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} groupadd(?:\[%{POSINT:system.auth.pid}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}"
      • "%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} useradd(?:\[%{POSINT:system.auth.pid}\])?: new user: name=%{DATA:system.auth.user.add.name}, UID=%{NUMBER:system.auth.user.add.uid}, GID=%{NUMBER:system.auth.user.add.gid}, home=%{DATA:system.auth.user.add.home}, shell=%{DATA:system.auth.user.add.shell}$"
      • "%{SYSLOGTIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:system.auth.hostname} %{DATA:system.auth.program}(?:\[%{POSINT:system.auth.pid}\])?: %{GREEDYMULTILINE:system.auth.message}"
    • type: cond condition: "!('gogstash_filter_grok_error' IN map([tags]))" filter:
      • type: remove_field remove_message: true
      • type: cond condition: "!empty([beat.timezone])" filter:
        • type: add_field key: 'system.auth.time' value: '%{+@2006} %{system.auth.timestamp} %{beat.timezone}' else_filter: filter:
        • type: add_field key: 'system.auth.time' value: '%{+@2006} %{system.auth.timestamp} %{beat.timezone}' else_filter:
        • type: add_field key: 'system.auth.time' value: '%{+@2006} %{system.auth.timestamp}'
      • type: date format: ["2006 Jan 02 15:04:05 -07:00", "2006 Jan 2 15:04:05 -07:00", "2006 Jan 02 15:04:05", "2006 Jan 2 15:04:05"] source: "system.auth.time"
      • type: cond condition: "!('gogstash_filter_date_error' IN map([tags]))" filter:
        • type: remove_field fields: ['system.auth.time']
      • type: geoip2 db_path: /etc/gogstash/GeoLite2-City.mmdb ip_field: 'system.auth.ssh.ip' key: 'system.auth.ssh.geoip' quiet: true skip_private: true flat_format: true cache_size: 100
  • type: cond condition: "[fileset.module] == 'system' && [fileset.name] == 'syslog'" filter:

    • type: grok patterns: GREEDYMULTILINE: "(.|\n)*" match:
      • "%{SYSLOGTIMESTAMP:system.syslog.timestamp} %{SYSLOGHOST:system.syslog.hostname} %{DATA:system.syslog.program}(?:\[%{POSINT:system.syslog.pid}\])?: %{GREEDYMULTILINE:system.syslog.message}"
    • type: cond condition: "!('gogstash_filter_grok_error' IN map([tags]))" filter:
      • type: remove_field remove_message: true
      • type: cond condition: "!empty([beat.timezone])" filter:
        • type: add_field key: 'system.syslog.time'output:

          - type: stdout

  • type: cond condition: "[fileset.module] == 'nginx'" output:

— You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/tsaikd/gogstash/issues/80#issuecomment-476443683, or mute the thread https://github.com/notifications/unsubscribe-auth/ABRX2EHiamH_WaveB7ECL-aCJK6J_Kolks5vaX5qgaJpZM4cJtU- .