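@dataclass(frozen=True)
class WafLogHttpRequest:
    """Immutable view of the ``httpRequest`` object of one AWS WAF log record.

    Every field is copied verbatim from the log entry and describes the
    client's request, so the values are client-controlled. Escape or validate
    them before they are rendered, re-logged or used to build queries.
    """

    # NOTE(review): the AWS WAF log documentation describes ``args`` as the
    # query string; confirm that ``dict`` is the intended annotation.
    args: dict
    client_ip: str
    country: str
    # Copied as-is from the log; may carry cookie or authorization values
    # unless the WAF logging configuration redacts them -- TODO confirm.
    headers: list
    http_method: str
    http_version: str
    request_id: str
    uri: str

    @staticmethod
    def of(log: dict) -> "WafLogHttpRequest":
        """Build an instance from the ``httpRequest`` dict of a WAF log entry.

        Args:
            log: mapping that uses the camelCase keys of the WAF log format.

        Returns:
            A frozen ``WafLogHttpRequest`` holding the values unchanged.

        Raises:
            KeyError: if any expected key is missing from ``log``.
        """
        # Declared static so the factory behaves the same whether it is
        # called on the class or on an instance (matches WafAlbLog.of).
        return WafLogHttpRequest(
            args=log['args'],
            client_ip=log['clientIp'],
            country=log['country'],
            headers=log['headers'],
            http_method=log['httpMethod'],
            http_version=log['httpVersion'],
            request_id=log['requestId'],
            uri=log['uri'],
        )

    # def dumps(self):
    #     return {
    #         "args": self.args,
    #         "client_ip": self.client_ip,
    #         "country": self.country,
    #         "headers": self.headers,
    #         "http_method": self.http_method,
    #         "http_version": self.http_version,
    #         "uri": self.uri
    #     }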
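@dataclass(frozen=True)
class WafAlbLog:
    """Immutable representation of one AWS WAF log record.

    Field values are copied unchanged from the JSON log entry. The nested
    ``http_request`` describes the client's request and is therefore
    client-controlled data.

    See Also: https://docs.aws.amazon.com/waf/latest/developerguide/logging.html
    """

    action: str
    format_version: str
    http_request: WafLogHttpRequest
    # NOTE(review): the commented-out conversion in ``of`` divides this value
    # by 1000, which suggests epoch milliseconds rather than ``str`` -- confirm.
    timestamp: str
    # datetime: str
    http_source_id: str
    http_source_name: str
    non_terminating_matching_rules: list
    # NOTE(review): the ``str`` annotations on request_headers_inserted,
    # response_code_sent, rule_group_list and terminating_rule_match_details
    # are not enforced at runtime; verify them against real log records.
    request_headers_inserted: str
    rate_based_rule_list: list
    response_code_sent: str
    rule_group_list: str
    terminating_rule_id: str
    terminating_rule_match_details: str
    terminating_rule_type: str
    web_acl_id: str

    @staticmethod
    def of(log: dict) -> "WafAlbLog":
        """Build an instance from one decoded WAF log entry.

        Args:
            log: mapping that uses the camelCase keys of the WAF log format.

        Returns:
            A frozen ``WafAlbLog``; ``httpRequest`` is converted through
            ``WafLogHttpRequest.of``.

        Raises:
            KeyError: if any expected key is missing from ``log``.
        """
        return WafAlbLog(
            action=log['action'],
            format_version=log['formatVersion'],
            http_request=WafLogHttpRequest.of(log=log['httpRequest']),
            timestamp=log['timestamp'],
            # datetime = datetime.fromtimestamp( timestamp / 1000).strftime("%Y-%m-%d %H:%M:%S")
            # is_attack_log = attack_check()
            http_source_id=log['httpSourceId'],
            http_source_name=log['httpSourceName'],
            non_terminating_matching_rules=log['nonTerminatingMatchingRules'],
            rate_based_rule_list=log['rateBasedRuleList'],
            request_headers_inserted=log['requestHeadersInserted'],
            response_code_sent=log['responseCodeSent'],
            rule_group_list=log['ruleGroupList'],
            terminating_rule_id=log['terminatingRuleId'],
            terminating_rule_match_details=log['terminatingRuleMatchDetails'],
            terminating_rule_type=log['terminatingRuleType'],
            web_acl_id=log['webaclId'],
        )

    @staticmethod
    def from_gzip(file_name) -> list:
        """Load every WAF log record from a newline-delimited JSON object.

        The object is opened through the module-level ``fs`` filesystem. Its
        first two bytes are checked for the gzip magic number, so both
        gzip-compressed and plain-text log files are accepted.

        Args:
            file_name: path of the log object, as understood by ``fs``.

        Returns:
            A list of ``WafAlbLog`` instances, one per non-blank line.

        Raises:
            KeyError: if a record lacks an expected key.
            json.JSONDecodeError: if a line is not valid JSON.
        """
        gzip_magic = b"\x1f\x8b"
        records = []
        with fs.open(file_name, "rb") as raw:
            is_gzip = raw.read(len(gzip_magic)) == gzip_magic
            raw.seek(0)
            # Decompress as a stream so the file is processed line by line
            # instead of being expanded in memory in one piece.
            stream = gzip.GzipFile(fileobj=raw) if is_gzip else raw
            for line in stream:
                # A blank line (e.g. a trailing newline) is not a record.
                if not line.strip():
                    continue
                # json.loads accepts UTF-8 encoded bytes directly.
                records.append(WafAlbLog.of(json.loads(line)))
        return records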
import s3fs
import gzip
from dataclasses import dataclass
import shlex
import json
from datetime import datetime
# Module-level S3 filesystem handle used by WafAlbLog.from_gzip.
# anon=False makes s3fs send authenticated requests instead of anonymous
# ones; the identity it resolves only needs read access to the bucket that
# holds the WAF logs.
fs = s3fs.S3FileSystem(anon=False)