Open secpool2000 opened 6 months ago
我没有这个的使用需求,如果你用的比较多,欢迎提交一个 PR 来支持
外企用的是 Slack + 邮箱,需求 +1
推送到 Slack 的解决方案:用 Python 脚本接收 watchvuln 的 webhook 并转发到 Slack。docker-compose 配置如下:
version: '3'
services:
  watchvuln:
    image: zemal/watchvuln:latest
    container_name: watchvuln
    restart: always
    volumes:
      - /opt/watchvuln/config:/config
    environment:
      # Address of the host running webhook_forwarder.py; the Flask app in
      # that script listens on port 5000 and serves the /webhook route.
      - WEBHOOK_URL=http://10.0.0.68:5000/webhook
      - INTERVAL=30m
      # Path inside the container; resolved through the /config volume above.
      - WHITELIST_FILE=/config/whitelist.txt
Slack webhook 转发脚本(保存为 webhook_forwarder.py),后台运行:nohup python3 webhook_forwarder.py &
import json
import logging
import os

import requests
from flask import Flask, request, jsonify
app = Flask(__name__)

# Log at INFO level so every received webhook payload is recorded.
logging.basicConfig(level=logging.INFO)

# Slack incoming-webhook URL. Read from the SLACK_WEBHOOK_URL environment
# variable when set, so the real (secret) URL does not have to be written
# into this file; otherwise fall back to the placeholder below, which must
# be replaced with your own Slack webhook URL.
SLACK_WEBHOOK_URL = os.environ.get(
    'SLACK_WEBHOOK_URL',
    'https://hooks.slack.com/services/xxxx',
)
def send_to_slack(message, timeout=10):
    """Post a message payload to the configured Slack webhook.

    Args:
        message: JSON-serializable dict forming the Slack payload
            (for example ``{"text": ...}`` or ``{"blocks": [...]}``).
        timeout: Seconds to wait for Slack before giving up. Without a
            timeout ``requests`` may block indefinitely on a stalled
            connection.

    Returns:
        The ``requests.Response`` returned by Slack.

    Raises:
        requests.RequestException: On connection errors or timeout.
    """
    # ``json=`` serializes the payload and sets the
    # ``Content-Type: application/json`` header for us.
    return requests.post(SLACK_WEBHOOK_URL, json=message, timeout=timeout)
@app.route('/webhook', methods=['POST'])
def receive_webhook():
    """Receive a watchvuln webhook and forward it to Slack.

    The JSON body is expected to be an object with a ``type`` field
    (``watchvuln-initial``, ``watchvuln-text`` or ``watchvuln-vulninfo``)
    and a ``content`` object that is handed to the matching formatter.

    Returns:
        A ``(json_response, status_code)`` tuple: 200 on success, 400 for a
        missing/invalid body or an unknown type, 500 when Slack cannot be
        reached or answers with a non-200 status.
    """
    # silent=True: a malformed or non-JSON body yields None, so the caller
    # gets our JSON 400 below instead of a framework-generated error page.
    data = request.get_json(silent=True)
    if not data:
        return jsonify({'error': 'No data received'}), 400
    if not isinstance(data, dict):
        # Valid JSON but not an object (e.g. a list): .get() would crash.
        return jsonify({'error': 'Invalid payload'}), 400

    # Log the received payload to make later troubleshooting easier.
    logging.info("Received webhook data: %s", data)

    webhook_type = data.get("type", "")
    content = data.get("content")
    if not isinstance(content, dict):
        # Missing or null content: let the formatters use their defaults.
        content = {}

    # Pick the Slack message builder that matches the webhook type.
    formatters = {
        "watchvuln-initial": format_initial_message,
        "watchvuln-text": format_text_message,
        "watchvuln-vulninfo": format_vulninfo_message,
    }
    formatter = formatters.get(webhook_type)
    if formatter is None:
        return jsonify({'error': 'Unknown webhook type'}), 400
    message = formatter(content)

    # Forward to Slack; network failures are reported as a JSON 500 rather
    # than surfacing as an unhandled exception.
    try:
        response = send_to_slack(message)
    except requests.RequestException as exc:
        logging.exception("Failed to reach Slack")
        return jsonify({'error': 'Failed to send message to Slack', 'response': str(exc)}), 500

    if response.status_code != 200:
        return jsonify({'error': 'Failed to send message to Slack', 'response': response.text}), 500
    return jsonify({'status': 'success'}), 200
def format_initial_message(content):
    """Build the Slack payload for a ``watchvuln-initial`` webhook.

    Args:
        content: Dict read for the keys ``version``, ``vuln_count``,
            ``interval`` and ``provider`` (a list of dicts with ``name``,
            ``display_name`` and ``link``). Missing keys fall back to
            defaults; a null ``provider`` is treated as an empty list.

    Returns:
        ``{"blocks": [...]}``: a summary section, a divider, then one
        section per provider.
    """
    version = content.get("version", "N/A")
    vuln_count = content.get("vuln_count", 0)
    interval = content.get("interval", "N/A")
    # ``or []`` guards against an explicit null, which .get() passes through.
    providers = content.get("provider") or []

    blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": (
                    f"*Vulnerability Watch Initial Report*\n"
                    f"Version: `{version}`\n"
                    f"Total Vulnerabilities: `{vuln_count}`\n"
                    f"Check Interval: `{interval}`"
                ),
            },
        },
        {"type": "divider"},
    ]

    for provider in providers:
        if not isinstance(provider, dict):
            continue  # skip malformed entries instead of crashing
        name = provider.get("name") or "N/A"
        display_name = provider.get("display_name") or name
        link = provider.get("link")
        # Only emit Slack link markup when there is a URL to point at.
        link_text = f"<{link}|{name}>" if link else name
        blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"*{display_name}*\nLink: {link_text}",
            },
        })

    return {"blocks": blocks}
def format_text_message(content):
    """Build the Slack payload for a ``watchvuln-text`` webhook.

    Args:
        content: Dict read for the key ``message``.

    Returns:
        ``{"text": ...}`` with a bold heading followed by the message. A
        missing, null or empty message is replaced by a placeholder.
    """
    # ``or`` (rather than a .get() default) also covers null/empty values.
    message_text = content.get("message") or "No message provided"
    return {"text": f"*WatchVuln Notification*\n{message_text}"}
def _truncate_for_slack(text, limit=3000):
    """Clip *text* to *limit* characters, ending with an ellipsis if cut.

    Slack documents a 3000-character maximum for section block text.
    """
    if len(text) <= limit:
        return text
    return text[:limit - 1] + "…"


def _mrkdwn_section(text):
    """Return a Slack section block holding *text* as mrkdwn."""
    return {
        "type": "section",
        "text": {"type": "mrkdwn", "text": _truncate_for_slack(text)},
    }


def format_vulninfo_message(content):
    """Build the Slack payload for a ``watchvuln-vulninfo`` webhook.

    Args:
        content: Dict read for the keys ``title``, ``description``,
            ``severity``, ``cve``, ``disclosure``, ``references`` (list),
            ``tags`` (list) and ``from`` (source URL). Missing, null or
            empty values are replaced by placeholders.

    Returns:
        ``{"blocks": [...]}`` with five blocks: headline fields,
        description, a divider, references, and tags plus source.
    """
    title = content.get("title") or "No title provided"
    description = content.get("description") or "No description provided"
    severity = content.get("severity") or "N/A"
    cve = content.get("cve") or "N/A"
    disclosure = content.get("disclosure") or "N/A"
    references = content.get("references") or []
    tags = content.get("tags") or []
    from_source = content.get("from") or ""

    if references:
        references_text = "\n".join(f"<{ref}>" for ref in references)
    else:
        references_text = "No references provided"

    # str() keeps join() from raising if a tag is not a string.
    tags_text = ", ".join(str(tag) for tag in tags) if tags else "N/A"
    # Avoid emitting an empty "<>" link when no source URL was supplied.
    source_text = f"<{from_source}>" if from_source else "N/A"

    blocks = [
        _mrkdwn_section(
            f"*Vulnerability Alert*\n*Title*: {title}\n*Severity*: {severity}\n"
            f"*CVE*: {cve}\n*Disclosure Date*: {disclosure}"
        ),
        _mrkdwn_section(f"*Description*: {description}"),
        {"type": "divider"},
        _mrkdwn_section(f"*References*:\n{references_text}"),
        _mrkdwn_section(f"*Tags*: {tags_text}\n*Source*: {source_text}"),
    ]
    return {"blocks": blocks}
if __name__ == '__main__':
    # Listen on all interfaces, port 5000, so the forwarder is reachable at
    # the host address used in WEBHOOK_URL (http://<host>:5000/webhook).
    app.run(host='0.0.0.0', port=5000)
我没有这个的使用需求,如果你用的比较多,欢迎提交一个 PR 来支持