woshikid / blog

Apache License 2.0
8 stars 1 forks source link

Logstash学习笔记 #44

Open woshikid opened 6 years ago

woshikid commented 6 years ago

安装

curl -L -O https://artifacts.elastic.co/downloads/logstash/logstash-7.4.2.tar.gz
tar xzvf logstash-7.4.2.tar.gz
cd logstash-7.4.2/

启动

Windows需要双引号

本地测试

bin/logstash -e 'input { stdin {} } output { stdout {} }'
bin/logstash -e 'input { stdin {} } output { stdout { codec => rubydebug } }'

输出到ES

bin/logstash -e 'input { stdin {} } output { elasticsearch { hosts => localhost } }'

多重输出

bin/logstash -e 'input { stdin {} } output { elasticsearch { hosts => localhost } stdout {} }'

使用配置文件启动

bin/logstash -f config/logstash.conf
bin/logstash -f config/logstash.conf --config.test_and_exit
bin/logstash -f config/logstash.conf --config.reload.automatic

配置文件

条件判断支持:if, else if, else, ==, !=, <, >, <=, >=, =~, !~, in, not in, and, or, nand, xor, !, ()

logstash-filebeat.conf

input {
  beats {
    port => "5044"
  }
}

filter {
  mutate {
    remove_field => ["tags", "ecs"]
  }
}

output {
  if [@metadata][pipeline] {
    elasticsearch {
      hosts => "127.0.0.1:9200"
      pipeline => "%{[@metadata][pipeline]}"
#     index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
#     user => "elastic"
#     password => "elastic"
#     action => "create" # data stream
    }
  } else {
    redis {
      host => '192.168.1.104'
      data_type => 'list'
      key => 'logstash:redis'
    }
  }
}

logstash-apache.conf 文件路径必须为绝对路径 使用默认index => "logstash-%{+YYYY.MM.dd}"将会触发默认ILM而生成logstash-YYYY.MM.dd-000001格式的index,默认策略为达到50GB或30天后发生rollover 使用ilm_rollover_alias将覆盖index设置

input {
  file {
    path => [ "/tmp/*_log" ]
    start_position => beginning
#   type => "apache"
#   codec => multiline {
#     pattern => "^%{TIMESTAMP_ISO8601}"
#     negate => true
#     what => "previous"
#   }
  }
}

filter {
  if [path] =~ "access" {
    grok {
#     patterns_dir => ["./patterns"]
#     pattern_definitions => {"TOMCAT_TIME" => "%{MONTHDAY}-%{MONTH}-%{YEAR} %{HOUR}:%{MINUTE}:%{SECOND}"}
      keep_empty_captures => true
      match => { "message" => ["%{HTTPD_COMBINEDLOG}", "%{CATALINALOG}", "%{TOMCATLOG}"] }
#     match => { "message" => '%{IP:clientip}\s+%{TIMESTAMP_ISO8601:timestamp} %{WORD:verb} %{URIPATHPARAM:path} %{NUMBER:bytes} %{NOTSPACE:data} "(?<agent>[^"]*)"'}
#     overwrite => [ "message" ]
    }
    mutate {
      convert => {
        "bytes" => "integer"
        "response" => "integer"
      }
      gsub => ["referrer", "\"", ""]
      gsub => ["agent", "\"", ""]
      replace => { "type" => "apache_access" }
    }
    date {
      match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
    geoip {
      source => "clientip"
    }
#   ruby {
#     code => "event.set('interceptors', event.get('interceptors').split(/\n/))"
#   }
  }
}

output {
  stdout { codec => dots }
  if ! [tags] {
    elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "apache-%{+YYYY.MM.dd}"
#     ilm_rollover_alias => "logstash"
#     ilm_pattern => "{now/d}-000001"
#     ilm_policy => "logstash-policy"
    }
  }
}

logstash-csv.conf

input {
  file {
    path => ["/tmp/*.csv"]
    start_position => beginning
  }
}

filter {
  csv {
    separator => ","
    columns => ["id", "name", "age"]
  }
  mutate {
    convert => { "age" => "integer" }
  }
}

output {
  stdout { codec => rubydebug }
  file {
    path => "/path/to/target/file.json"
  }
}

logstash-gelf.conf

input {
  gelf {
    port => 12201
    #use_tcp => true # 同时使用tcp与udp
  }
}

output {
  stdout {}
}

grok正则表达式

https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

文件位置记录

logstash会记录文件读取位置

logstash-x.x.x/data/plugins/inputs/file/.sincedb_xxxx

删除文件即可重新读取日志文件

使用最新的geoip

wget http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.tar.gz

geoip {
  source => "ip"
  target => "ip_location"
  database => "/path/to/GeoLite2-City.mmdb"
# fields => [ "ip", "city_name", "region_name", "country_name", "location" ]
}

排除内网地址

if "_grokparsefailure" not in [tags] {
  if [ip] !~ "^127\.|^192\.168\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[01]\.|^10\." {
    geoip {
      source => "ip"
    }
  }
}

解析userAgent

if [agent] != "-" {
  useragent {
    source => "agent"
#   target => "ua"
  }
}

解析日期

注意日期day之前有时是一个空格有时是两个空格 默认target=@timestamp

filter {
  date {
    locale => "en_US"
    match => [ "logdate", "MMM dd yyyy HH:mm:ss", "MMM  d yyyy HH:mm:ss", "ISO8601" ]
    target => "@timestamp"
    add_field => { "foo_%{somefield}" => "Hello world, from %{host}" }
  }
}

日期格式

ISO8601
2011-04-19T03:44:01.103Z

UNIX
1326149001
1326149001.132

UNIX_MS
1366125117000

year
yyyy: 2015
yy: 15

month of the year
M: 1
MM: 01
MMM: Jan
MMMM: January

day of the month
d: 1
dd: 01

hour of the day (24H) 
H: 0
HH: 00

minutes of the hour
m: 0
mm: 00

seconds of the minute
s: 0
ss: 0

fraction of a second
S: 0
SS: 00
SSS: 000

time zone offset or identity
Z: -0700
ZZ: -07:00
ZZZ: America/Los_Angeles

time zone names
z

week of the year
w: 1
ww: 01

day of the year 
D: 1

day of the week
e: 1

day of the week
E, EE, EEE: Mon
EEEE: Monday

"2015-01-01T01:12:23"
"yyyy-MM-dd'T'HH:mm:ss"
JeffDeans commented 3 years ago

test