Issue opened by user Hyhyhyhyhyhyh, 4 years ago.
-- Top-10 most-requested URIs for each analysis run.
-- Rows are appended by analyze_nginx_log() via DataFrame.to_sql().
CREATE TABLE IF NOT EXISTS nginx_top_uri (
    id       INT AUTO_INCREMENT PRIMARY KEY,
    datatime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,  -- insertion time of the analysis row
    uri      TEXT NOT NULL,                                 -- normalized request URI (trailing slash stripped)
    cnt      INT NOT NULL                                   -- request count within the analyzed window
);
-- Top-10 client IPs for each analysis run, with geo-lookup result.
-- Rows are appended by analyze_nginx_log() via DataFrame.to_sql().
CREATE TABLE IF NOT EXISTS nginx_top_ip (
    id       INT AUTO_INCREMENT PRIMARY KEY,
    datatime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,  -- insertion time of the analysis row
    ipaddr   VARCHAR(45) NOT NULL,                          -- 45 chars fits IPv6 (incl. IPv4-mapped); the old 16 was IPv4-only
    cnt      INT NOT NULL,                                  -- request count within the analyzed window
    ip_from  VARCHAR(200)                                   -- human-readable location from the Baidu IP API; NULL when lookup fails
);
import datetime
import json
import os

import pandas as pd
import requests
from sqlalchemy import create_engine
def db_connection(database):
    """Create a SQLAlchemy engine for the given MySQL database.

    Connection settings are read from environment variables (DB_HOST,
    DB_USER, DB_PASSWD, DB_PORT) and fall back to the previous hard-coded
    defaults, so existing callers keep working while credentials no longer
    have to live in source control.

    Args:
        database: name of the MySQL schema to connect to.

    Returns:
        A sqlalchemy.engine.Engine; the caller is responsible for
        calling engine.dispose() when done.
    """
    host = os.environ.get('DB_HOST', '127.0.0.1')
    user = os.environ.get('DB_USER', '')
    passwd = os.environ.get('DB_PASSWD', '')
    port = int(os.environ.get('DB_PORT', '3306'))
    charset = 'utf8mb4'
    engine = create_engine(
        f'mysql+mysqldb://{user}:{passwd}@{host}:{port}/{database}?charset={charset}',
        echo=True,        # log every emitted SQL statement
        max_overflow=0,   # no connections beyond the pool size
        pool_size=5,      # connection pool size
        pool_timeout=30,  # seconds to wait for a free pooled connection before raising
        pool_recycle=-1,  # never force-recycle pooled connections
    )
    return engine
# Field positions in one nginx access-log line, split on ' | '.
_LOG_FIELD_INDEX = {
    'ipaddr': 0,          # client address
    'request_time': 2,    # server-side request timestamp
    'request_info': 3,    # request line
    'http_host': 4,       # requested host
    'uri': 5,             # request URI
    'status': 6,          # HTTP status code
    'http_referer': 8,    # referer
    'browser': 9,         # user agent
    'response_time': 11,  # server response time (trailing ' \n' stripped)
}

# Browser-like headers for the Baidu IP-location API.
_HEADERS = {
    'Accept': '*/*',
    'Accept-Encoding': 'gzip, deflate, br',
    'Accept-Language': 'zh-CN,zh;q=0.9,zh-HK;q=0.8,zh-TW;q=0.7',
    'Connection': 'keep-alive',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36'
}


def _parse_log_field(log, info):
    """Extract one named field from a ' | '-separated nginx log line.

    Returns None for an unknown field name (same as the original
    fall-through behavior of the if/elif chain).
    """
    idx = _LOG_FIELD_INDEX.get(info)
    if idx is None:
        return None
    value = log.split(' | ')[idx]
    if info == 'uri':
        # Keep the bare root URI as-is; strip the trailing slash otherwise.
        return value if len(value) == 1 else value.rstrip('/')
    if info == 'response_time':
        return value.replace(' \n', '')
    return value


def _lookup_ip_location(ip):
    """Best-effort lookup of an IP's physical location via Baidu's API.

    Returns None when the request fails, times out, or the response cannot
    be parsed, so one bad lookup no longer aborts the whole analysis run
    (nginx_top_ip.ip_from is nullable).
    """
    url = 'https://sp0.baidu.com/8aQDcjqpAAV3otqbppnN2DJv/api.php?query=' + ip + '&co=&resource_id=6006&t=1578399124011&ie=utf8&oe=gbk&cb=op_aladdin_callback&format=json&tn=baidu&cb=jQuery110205771719877166044_1578399103824&_=1578399103832'
    try:
        # Always pass a timeout: requests.get blocks forever without one.
        res = requests.get(url, headers=_HEADERS, timeout=10)
        return str(res.text).split('data')[1].split('titlecont')[0].split('"')[4]
    except Exception:
        return None


def analyze_nginx_log(nginx_log, begin_date, end_date):
    '''Analyze an nginx container log (docker json-file format).

    Computes the top-10 requested URIs and top-10 client IPs within
    [begin_date, end_date] and appends the results to the nginx_top_uri
    and nginx_top_ip tables.

    Args:
        nginx_log: path to the nginx container's json-file log.
        begin_date: start of the analysis window (anything pd.to_datetime accepts).
        end_date: end of the analysis window (inclusive).

    Returns:
        None on success, or the caught exception when the database write
        fails (preserved from the original contract).
    '''
    # Read the log: one JSON object per line with log/stream/time keys.
    records = []
    with open(nginx_log, encoding='utf8') as f:
        for line in f:
            records.append(json.loads(line))
    # Build the frame directly — DataFrame.append() was removed in pandas 2.0.
    df = pd.DataFrame(records, columns=['log', 'stream', 'time'])
    # Keep only healthy (stdout) lines; .copy() avoids chained-assignment
    # (SettingWithCopyWarning) on the column writes below.
    df_out = df[df.stream == 'stdout'].copy()
    df_out['time'] = pd.to_datetime(df_out['time'])
    begin_date = pd.to_datetime(begin_date, utc=True)
    end_date = pd.to_datetime(end_date, utc=True)
    df_out = df_out[(df_out.time >= begin_date) & (df_out.time <= end_date)].copy()
    # Structure the raw log line into named columns.
    for field in ('ipaddr', 'request_time', 'request_info', 'uri', 'status',
                  'http_referer', 'browser', 'response_time'):
        df_out[field] = df_out['log'].apply(_parse_log_field, args=(field,))
    df_out.drop(['log', 'stream'], axis=1, inplace=True)
    # Drop static-asset requests (js/css/images/fonts).
    df_out = df_out[df_out['uri'].str.contains('js|css|png|svg|woff') == False]
    # Top-10 URIs by request count.
    top_uri = df_out.groupby('uri').count()['time'].sort_values(ascending=False).head(10)
    top_uri = pd.DataFrame(top_uri).reset_index()
    top_uri.rename(columns={'time': 'cnt'}, inplace=True)
    # Top-10 IPs: dedupe on (ip, request time) so each request counts once.
    top_ip = df_out[['ipaddr', 'request_time']].drop_duplicates()
    top_ip = top_ip.groupby('ipaddr').count().sort_values(by='request_time', ascending=False).head(10)
    top_ip = pd.DataFrame(top_ip).reset_index()
    top_ip.rename(columns={'request_time': 'cnt'}, inplace=True)
    # Resolve each top IP to a physical location (best effort).
    top_ip['ip_from'] = top_ip['ipaddr'].apply(_lookup_ip_location)
    # Persist results. engine starts as None so dispose() in finally is safe
    # even when db_connection() itself raises (the original hit NameError there).
    engine = None
    try:
        engine = db_connection('analysis_db')
        top_uri.to_sql('nginx_top_uri', con=engine, if_exists='append', index=False)
        top_ip.to_sql('nginx_top_ip', con=engine, if_exists='append', index=False)
    except Exception as e:
        return e
    finally:
        if engine is not None:
            engine.dispose()
v1.0

Sections: Notes · Example analysis results · Python code