# Global configuration
logging.level: debug
max_procs: 4
fields:
env: test
# Filebeat input
filebeat.inputs:
- type: log
tail_files: true
paths:
- /var/log/containers/*_skywalking_*.log
- /var/log/containers/*_istio-system_*.log
fields:
platform: monitor
type: application
- type: log
tail_files: true
paths:
- /data/logs/**/applicationlog/*.log
- /data/logs/**/applicationlog/*/*.*.*.log
fields:
platform: docker
type: application
multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
multiline.negate: true
multiline.match: after
- type: log
tail_files: true
paths:
- /data/logs/**/applicationlog/errorMonitor.json
fields:
platform: docker
type: error-json
multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
multiline.negate: true
multiline.match: after
- type: log
tail_files: true
paths:
- /data/logs/**/applicationlog/*.error
fields:
platform: docker
type: application-error
multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
multiline.negate: true
multiline.match: after
- type: log
tail_files: true
json.keys_under_root: true
json.overwrite_keys: true
paths:
- /data/logs/kube-audit/*.json
fields:
platform: audit
type: audit
- type: log
tail_files: true
paths:
- /data/logs/**/tomcatlog/*.out
fields:
platform: docker
type: tomcat
multiline.pattern: '^[0-9]{2}-[a-zA-Z]+-[0-9]{4}|^[0-9]{4}-[0-9]{2}-[0-9]{2}|^\S'
multiline.negate: true
multiline.match: after
- type: log
tail_files: true
paths:
- /data/logs/traefik/access.log
fields:
platform: traefik
type: ingress
- type: log
tail_files: true
paths:
- /data/logs/traefik/traefik.log
fields:
platform: traefik
type: traefik
- type: log
tail_files: true
paths:
- /data/logs/kong/**/*.log
- /data/logs/kong/**/*.ERROR
- /data/logs/kong/**/*.INFO
- /data/logs/kong/**/*.WARNING
fields:
platform: kong
type: ingress-kong
symlinks: true
- type: log
tail_files: true
paths:
- /data/logs/**/wrapperlog/*.log
fields:
platform: docker
type: wrapper
multiline.pattern: '^[0-9]{4}\/[0-9]{2}\/[0-9]{2}'
multiline.negate: true
multiline.match: after
- type: log
tail_files: true
paths:
- /data/logs/kubernetes/*.INFO
- /data/logs/kubernetes/*.WARNING
- /data/logs/kubernetes/*.ERROR
- /data/logs/kubernetes/*.FATAL
fields:
platform: docker
type: k8s
multiline.pattern: '\\$'
multiline.negate: false
multiline.match: before
processors:
- dissect:
tokenizer: "/%{?data}/%{?logs}/%{project}/%{?applogs}/%{?app}/%{?appname}/%{?log}/"
field: "source"
target_prefix: "namespaces"
- script:
lang: javascript
source: >
function process(event) {
var msg = event.Get("message");
msg = msg.replace(/[\u4e00-\u9fa5]{2,5}/g, "XXXXXXXXXXXXXXXX");
msg = msg.replace(/\d{11}/g, "XXXXXXXXXXXXXX");
event.Put("message",msg);
}
tag_on_exception: true
# Filebeat Output
output.kafka:
# initial brokers for reading cluster metadata
hosts: ["kafka01:9092","kafka02:9092","kafka03:9092"]
# message topic selection + partitioning
topics:
- topic: "bruce-uat"
when.contains:
fields.platform: "docker"
partition.round_robin:
reachable_only: false
required_acks: 1
compression: snappy
max_message_bytes: 1000000
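For reference, here is a rough Python sketch of what the two processors above do to an event; the sample path and message are made up, and Filebeat's dissect/script processors remain the authoritative behavior:

```python
import re

# Hypothetical container log path shaped like the dissect tokenizer
# "/%{?data}/%{?logs}/%{project}/%{?applogs}/%{?app}/%{?appname}/%{?log}/".
sample_source = "/data/logs/my-project/applicationlog/my-app/my-app-0/app.log"

# dissect keeps only the named key (%{project}); %{?...} keys are discarded,
# and the result lands under target_prefix "namespaces".
segments = sample_source.strip("/").split("/")
event = {"namespaces": {"project": segments[2]}}   # {'project': 'my-project'}

# The script processor masks runs of 2-5 CJK characters (e.g. names) and
# 11-digit numbers (e.g. CN mobile numbers) before the event reaches Kafka.
message = "用户张三 called 13800138000 and got HTTP 500"
message = re.sub(r"[\u4e00-\u9fa5]{2,5}", "XXXXXXXXXXXXXXXX", message)
message = re.sub(r"\d{11}", "XXXXXXXXXXXXXX", message)
print(event, message)
```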
The Logstash configuration is organized around pipelines.yml: each pipeline handles one group of log streams, and each pipeline's config directory in turn contains an input.conf, a filter.conf, and an output.conf. For the k8s pipeline these are the 10_input.conf, 20_filter_grok.conf, 25_filter_common.conf, and output.conf shown below.
# cat pipelines.yml
----------------------------------------------------------------------------
# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
# https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html
#- pipeline.id: main
# path.config: "/etc/logstash/conf.d/*.conf"
# pipeline.workers: 30
# pipeline.batch.size: 400
- pipeline.id: baidu
path.config: "/etc/logstash/baidu.d/*.conf"
pipeline.workers: 200
pipeline.batch.size: 400
- pipeline.id: baidu-bapp
path.config: "/etc/logstash/baidu-bapp.d/*.conf"
pipeline.workers: 80
pipeline.batch.size: 400
- pipeline.id: baidu-tea
path.config: "/etc/logstash/baidu-tea.d/*.conf"
pipeline.workers: 15
pipeline.batch.size: 400
- pipeline.id: k8s
path.config: "/etc/logstash/k8s.d/*.conf"
pipeline.workers: 30
pipeline.batch.size: 400
# cd /etc/logstash/k8s.d/
# cat 10_input.conf
-----------------------------------------------------------------------------------------
input {
kafka {
bootstrap_servers => "kafka01:9092,kafka02:9092,kafka03:9092"
topics_pattern => "k8s-.*"
group_id => "logstash_bj"
client_id => "logstash_bj"
codec => "json"
consumer_threads => 1
decorate_events => true
auto_offset_reset => "earliest"
max_poll_interval_ms => "600000"
max_poll_records => "500"
session_timeout_ms => "30000"
}
}
# cat 20_filter_grok.conf
------------------------------------------------------------------------------------------------------------------------------------------
filter {
if ![json] {
if [kubernetes][namespace_name] == "ba-ctbm" and [kubernetes][container_name] == "db" and [kubernetes][labels][app.baidu/name] == "cockrochdb" {
grok {
match => { "log" => "(?<level>[A-Z])[0-9]{6} %{TIME} %{POSINT} %{GREEDYDATA:message}"}
}
}
if [kubernetes][namespace_name] == "ba-ctbm" and [kubernetes][container_name] == "cp-kafka-connect-server" and [kubernetes][labels][app.baidu/name] == "cp-kafka-connect" {
grok {
match => { "log" => "\[%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}\] %{LOGLEVEL:level}%{GREEDYDATA:message}"}
}
}
if [kubernetes][namespace_name] == "ba-ctbm" and [kubernetes][container_name] == "postgresql" and [kubernetes][labels][app.baidu/name] == "postgresql-ha" {
grok {
match => { "log" => "%{DATE} %{TIME}.[0-9]{3} GMT \[%{POSINT}\] (?<level>[A-Z]+): %{GREEDYDATA:message}"}
}
}
if [kubernetes][namespace_name] == "ba-ctbm" and [kubernetes][container_name] == "redis" and [kubernetes][labels][app.baidu/name] == "redis" {
grok {
match => { "log" => "%{POSINT}:(?<role>[A-Z]) %{MONTHDAY} %{MONTH} %{YEAR} %{HOUR}:%{MINUTE}:%{SECOND} (?<level>[*#.-]) %{GREEDYDATA:message}"}
}
}
if [kubernetes][namespace_name] == "ba-ctbm" and [kubernetes][container_name] == "sentinel" and [kubernetes][labels][app.baidu/name] == "redis" {
grok {
match => { "log" => "%{POSINT}:(?<role>[A-Z]) %{MONTHDAY} %{MONTH} %{YEAR} %{HOUR}:%{MINUTE}:%{SECOND} (?<level>[*#.-]) %{GREEDYDATA:message}"}
}
}
if [kubernetes][namespace_name] == "bj-observability" and [kubernetes][container_name] == "couchdb" and [kubernetes][labels][app.baidu/name] == "couchdb" {
grok {
match => { "log" => "couchdb %{TIME} %{LOGLEVEL:level} ==> %{GREEDYDATA:message}"}
}
}
}
}
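As a quick sanity check, the first grok pattern above (the CockroachDB/glog-style prefix) can be exercised with a roughly equivalent Python regex; the log line below is invented, and grok's TIME/POSINT/GREEDYDATA are only approximated:

```python
import re

# Rough equivalent of:
#   (?<level>[A-Z])[0-9]{6} %{TIME} %{POSINT} %{GREEDYDATA:message}
pattern = re.compile(
    r"(?P<level>[A-Z])[0-9]{6} "
    r"(?P<time>\d{1,2}:\d{2}:\d{2}(?:\.\d+)?) "
    r"(?P<pid>\d+) "
    r"(?P<message>.*)"
)

line = "I210312 10:15:30.123456 145 server started, listening on :26257"
m = pattern.match(line)
print(m.groupdict() if m else "no match")
# {'level': 'I', 'time': '10:15:30.123456', 'pid': '145',
#  'message': 'server started, listening on :26257'}
```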
# cat 25_filter_common.conf
---------------------------------------------------------------------------------------
filter {
# fixup of wrong labels / inconsistent naming
mutate {
rename => ["[kubernetes][labels][app]","[kubernetes][labels][application]"]
rename => ["[json][msg]","[message]"]
rename => ["[json][lvl]","[level]"]
rename => ["[json][message]","[message]"]
rename => ["[json][level]","[level]"]
uppercase => [ "[level]" ]
lowercase => [ "[kubernetes][labels][app.baidu/part-of]" ]
remove_field => ["[kubernetes][docker_id]", "[kubernetes][labels][logging.baidu.net/storage]",
"[kubernetes][labels][statefulset.kubernetes.io/pod-name]","[stream]","[kubernetes][container_hash]","[kubernetes][annotations][timestamp]","[kubernetes][annotations]" ]
}
mutate {
gsub => [ "[json][timestamp]", " ", "T" ]
}
translate {
field => "[level]"
destination => "[level]"
override => true
dictionary => {
"10" => "TRACE"
"20" => "DEBUG"
"30" => "INFO"
"40" => "WARN"
"50" => "ERROR"
"60" => "FATAL"
}
}
if [time] {
date {
match => [ "[time]","ISO8601" ]
}
} else {
date {
match => [ "[@metadata][kafka][timestamp]","ISO8601" ]
}
}
# routing
if [kubernetes][labels][app.baidu/log-index] {
mutate {
add_field => { "[@metadata][index]" => "k8s-%{[kubernetes][labels][app.baidu/log-index]}" }
}
} else if [kubernetes][labels][app.baidu/part-of] {
mutate {
add_field => { "[@metadata][index]" => "k8s-%{[kubernetes][labels][app.baidu/part-of]}" }
}
} else {
mutate {
add_field => { "[@metadata][index]" => "%{[@metadata][kafka][topic]}" }
}
}
if [level] in ["DEBUG","TRACE"] {
mutate {
replace => { "[@metadata][index]" => "%{[@metadata][index]}-debug" }
}
}
# workaround for possibly old data trying to write to read only indices
ruby {
code => 'event.set("[@metadata][now]", Time.now.strftime("%Y.%m.%d"))'
}
}
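The index routing in the filter above reduces to a small decision tree; here is a Python sketch of it for readability (field names mirror the config, the sample labels are invented):

```python
def target_index(event):
    """Mirrors the routing mutate/if blocks of 25_filter_common.conf."""
    labels = event.get("kubernetes", {}).get("labels", {})
    if labels.get("app.baidu/log-index"):
        index = "k8s-" + labels["app.baidu/log-index"]
    elif labels.get("app.baidu/part-of"):
        index = "k8s-" + labels["app.baidu/part-of"]
    else:
        # fall back to the Kafka topic exposed by decorate_events
        index = event["@metadata"]["kafka"]["topic"]
    # DEBUG/TRACE events are diverted to a separate "-debug" index
    if event.get("level") in ("DEBUG", "TRACE"):
        index += "-debug"
    return index

# Hypothetical event routed via the part-of label:
evt = {"kubernetes": {"labels": {"app.baidu/part-of": "billing"}},
       "@metadata": {"kafka": {"topic": "k8s-app"}},
       "level": "DEBUG"}
print(target_index(evt))   # -> k8s-billing-debug
```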
Another example:
[filter.conf.txt](https://github.com/user-attachments/files/17539241/filter.conf.txt)
# cat output.conf
----------------------------------------------------------------------------
output {
elasticsearch {
hosts => ["http://10.26.8.128:9200","http://10.26.8.131:9200","http://10.26.8.132:9200","http://10.26.8.147:9200"]
index => "%{[@metadata][target_index]}"
user => "elastic"
password => "XXXXXXXXXXXXXXXXX"
}
}
# cat elasticsearch.yml
----------------------------------------------------------------------------
cluster.name: bruce
node.name: node5
node.master: true
node.data: true
network.host: 10.26.8.132
http.port: 9200
path.data: /esdata
path.logs: /opt/es_logs
path.repo: /esdata/backup
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
cluster.max_shards_per_node: 900000
indices.query.bool.max_clause_count: 10240
indices.fielddata.cache.size: 40%
indices.memory.index_buffer_size: 40%
http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-headers: Authorization,X-Requested-With,Content-Length,Content-Type
transport.tcp.port: 9300
cluster.initial_master_nodes: ["10.26.8.132:9300","10.26.8.130:9300","10.26.8.131:9300","10.26.8.129:9300","10.26.8.133:9300"]
discovery.seed_hosts: ["10.26.8.132:9300","10.26.8.130:9300","10.26.8.131:9300","10.26.8.129:9300","10.26.8.133:9300"]
xpack.ml.enabled: false
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: /opt/es/config/elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: /opt/es/config/elastic-certificates.p12
discovery.zen.fd.ping_timeout: 60s
discovery.zen.fd.ping_interval: 10s
thread_pool.search.size: 64
thread_pool.search.queue_size: 1000
thread_pool.get.size: 32
thread_pool.get.queue_size: 1000
thread_pool.write.size: 32
thread_pool.write.queue_size: 10000
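With xpack.security enabled as above, a quick authenticated health call confirms the node is reachable and the cluster has formed; a minimal sketch using the requests library, with the host from this file and the same redacted elastic credentials as in output.conf:

```python
import requests

# Basic-auth cluster health check against one node from elasticsearch.yml.
resp = requests.get(
    "http://10.26.8.132:9200/_cluster/health",
    auth=("elastic", "XXXXXXXXXXXXXXXXX"),   # redacted, as in output.conf
    timeout=10,
)
resp.raise_for_status()
print(resp.json()["status"])   # "green", or "yellow" while replicas allocate
```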
Filebeat configuration for a second example:
filebeat.inputs:
- type: log
enabled: true
paths:
- /opt/hsis*/log/*.json.log
fields:
type: "hsos"
ignore_older: 2h
tags: ["json"]
exclude_files: ['.gz$']
processors:
- drop_fields:
fields: ["ecs", "input.type", "log.offset", "version","agent"]
output.kafka:
hosts: ["elkkafka01:9092","elkkafka02:9092","elkkafka03:9092"]
topic: "logstash-%{[fields.type]}"
username: filebeat
password: ReOwchIg
timeout: 5
required_acks: 1
compression: gzip
max_message_bytes: 1000000
The matching Logstash pipeline for the second example:
input {
kafka {
bootstrap_servers => "elkkafka01:9092,elkkafka02:9092,elkkafka03:9092"
security_protocol => "SASL_PLAINTEXT"
sasl_mechanism => "PLAIN"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='logstash' password='XXX';"
topics_pattern => "logstash-.*"
group_id => "logstash_uat"
client_id => "logstash_uat"
codec => "json"
consumer_threads => 1
decorate_events => true
auto_offset_reset => "earliest"
}
}
filter {
if [fields][type] {
mutate {
add_field => { "type" => "%{[fields][type]}" }
add_field => { "path" => "%{[log][file][path]}" }
}
mutate {
remove_field => [ "[fields][type]","[log][file][path]" ]
}
}
if "json" in [tags] {
json {
source => "message"
}
}
}
output {
elasticsearch {
hosts => [ "localhost" ]
index => "logstash-%{type}-%{+YYYY.MM.dd}"
}
}
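End to end, this second pipeline is simple: Filebeat stamps fields.type=hsos and the "json" tag, the event lands on the logstash-hsos topic, Logstash lifts fields.type to type, parses the JSON message, and writes to a daily index. A Python walk-through of that transformation (the message and path are invented):

```python
import json
from datetime import date

# Hypothetical event as consumed from the "logstash-hsos" topic.
event = {
    "message": '{"level":"INFO","msg":"order created"}',
    "fields": {"type": "hsos"},
    "log": {"file": {"path": "/opt/hsis01/log/app.json.log"}},
    "tags": ["json"],
}

# mutate: copy fields.type -> type and log.file.path -> path, drop the originals
event["type"] = event["fields"].pop("type")
event["path"] = event["log"]["file"].pop("path")

# json filter: parse the message into top-level fields
if "json" in event["tags"]:
    event.update(json.loads(event["message"]))

# elasticsearch output: "logstash-%{type}-%{+YYYY.MM.dd}" is a daily index keyed
# on the event timestamp; today's date stands in for it here.
print(f"logstash-{event['type']}-{date.today():%Y.%m.%d}")
```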
Logging systems are very common at IT companies.
Log processing data flow
Setup
References