imweijh / dailywork

Logstash aggregate filter: reassembling multi-line logs that arrive non-contiguously and out of order #6

Open imweijh opened 2 years ago

imweijh commented 2 years ago

The raw log events:

{
    "@timestamp":"2022-03-29T04:00:24.177Z",
    "message": "original_event_3",
    "textPayload": "# Query_time: 10.183602  Lock_time: 0.000386 Rows_sent: 823492  Rows_examined: 825476",
    "tags": ["cloudsql-slow-log"],
    "receiveTimestamp": "2022-03-29T04:00:36.305655815Z",
    "cloud.project.id": "ABC"
}
{
    "@timestamp": "2022-03-29T04:00:30.177Z",
    "message": "original_event_1",
    "textPayload": "# Time: 2022-03-29T08:37:50.228345Z",
    "tags": ["cloudsql-slow-log"],
    "receiveTimestamp": "2022-03-29T04:00:36.305655815Z",
    "cloud.project.id": "ABC"
}
{
    "@timestamp": "2022-03-29T04:00:40.177Z",
    "message": "original_event_6",
    "textPayload": "FROM reporting.v_v3_profile_identities",
    "tags": ["cloudsql-slow-log"],
    "receiveTimestamp": "2022-03-29T04:00:36.305655815Z",
    "cloud.project.id": "ABC"
}
{
    "@timestamp": "2022-03-29T04:00:55.177Z",
    "message": "original_event_4",
    "textPayload": "SET timestamp=1648543070;",
    "tags": ["cloudsql-slow-log"],
    "receiveTimestamp": "2022-03-29T04:00:36.305655815Z",
    "cloud.project.id": "ABC"
}

After Logstash processing, the expected output is the following single, reassembled event (the raw sample above shows only some of the fragments):

{
    "@timestamp":"2022-03-29T04:00:30.177Z",
    "textPayload": "# Time: 2022-03-29T08:37:50.228345Z
                    # User@Host: mule_transaction_report[mule_transaction_report] @  [194.255.15.66]  thread_id: 666318  server_id: 1912109277
                    # Query_time: 10.183602  Lock_time: 0.000386 Rows_sent: 823492  Rows_examined: 825476
                    SET timestamp=1648543070;
                    SELECT identifier as GCNUMBER
                    FROM reporting.v_v3_profile_identities 
                    where handle ='card_pos' 
                    and identifier >= '80029914'
                    and identifier <= '83539938'
                    order by identifier asc;"
    "tags": ["cloudsql-slow-log"]
    "receiveTimestamp": "2022-03-29T04:00:36.305655815Z"
    "cloud.project.id": "ABC"
}

Badger gave a brilliant solution. Every fragment of a given slow-query entry carries the same receiveTimestamp, so it can serve as the aggregation key:

aggregate {
    # All fragments of one slow-query entry share the same
    # receiveTimestamp, so it identifies the aggregation task.
    task_id => "%{receiveTimestamp}"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "receiveTimestamp"
    timeout => 3
    code => '
        map["textPayload"] ||= []
        map["@timestamp"] ||= event.get("@timestamp")
        map["tags"] ||= event.get("tags")
        map["cloud.project.id"] ||= event.get("cloud.project.id")
        # The numeric suffix of "message" ("original_event_3" -> 3) is the
        # fragment position; storing the payload at that index restores
        # the original order however the fragments arrive.
        i = event.get("message").sub(/\D+/, "").to_i
        map["textPayload"][i-1] = event.get("textPayload")
        # Drop the fragment; only the aggregated event is emitted on timeout.
        event.cancel
    '
    timeout_code => '
        # This code operates on the generated event,
        # not on the map from which it is generated.
        event.set("textPayload", event.get("textPayload").join("\n"))
    '
}
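
To see why the sparse-array assignment restores order, here is a minimal plain-Ruby sketch of the same logic, runnable outside Logstash (payloads abbreviated from the raw events above):

# Sample fragments, abbreviated from the raw logs; note they arrive
# out of order (3, 1, 6, 4), and fragments 2 and 5 never show up.
events = [
  { "message" => "original_event_3", "textPayload" => "# Query_time: 10.183602  Lock_time: 0.000386" },
  { "message" => "original_event_1", "textPayload" => "# Time: 2022-03-29T08:37:50.228345Z" },
  { "message" => "original_event_6", "textPayload" => "FROM reporting.v_v3_profile_identities" },
  { "message" => "original_event_4", "textPayload" => "SET timestamp=1648543070;" }
]

lines = []
events.each do |event|
  # sub(/\D+/, "") deletes the leading non-digits: "original_event_3" -> "3"
  i = event["message"].sub(/\D+/, "").to_i
  lines[i - 1] = event["textPayload"]  # place each fragment by its sequence number
end

# nil slots (missing fragments) become empty lines in the join
puts lines.join("\n")

Fragments that never arrive are left as nil and join as empty lines, so the reassembled payload keeps its line positions even if some events are lost.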

https://discuss.elastic.co/t/logstash-aggregate-filter-not-working-for-gcp-mysql-slow-query-logs/301103/3
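
To reproduce this locally, a minimal pipeline could look like the sketch below (test-aggregate.conf is a placeholder name). Note that the aggregate filter only works correctly with a single pipeline worker, because the map is state shared across events:

# test-aggregate.conf -- hypothetical local test harness.
# Run with one worker so every fragment reaches the same aggregate map:
#   bin/logstash -f test-aggregate.conf -w 1
input {
    # Feed the raw events as one JSON object per line.
    stdin { codec => json_lines }
}
filter {
    # ... the aggregate { ... } block from Badger's answer goes here ...
}
output {
    stdout { codec => rubydebug }
}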