```python
# config.py
import os
from pathlib import Path


class Config:
    # Base paths
    BASE_DIR = Path(__file__).parent
    MODEL_DIR = BASE_DIR / "models"
    DATA_DIR = BASE_DIR / "data"
    LOG_DIR = BASE_DIR / "logs"

    # Create required directories
    for dir_path in [MODEL_DIR, DATA_DIR, LOG_DIR]:
        dir_path.mkdir(exist_ok=True)

    # Database configuration
    DB_CONFIG = {
        'host': os.getenv('DB_HOST', 'localhost'),
        'port': int(os.getenv('DB_PORT', 5432)),
        'database': os.getenv('DB_NAME', 'aiops'),
        'user': os.getenv('DB_USER', 'aiops_user'),
        'password': os.getenv('DB_PASSWORD', 'aiops_pass')
    }

    # Redis configuration (caching and task queue)
    REDIS_CONFIG = {
        'host': os.getenv('REDIS_HOST', 'localhost'),
        'port': int(os.getenv('REDIS_PORT', 6379)),
        'db': int(os.getenv('REDIS_DB', 0))
    }

    # Elasticsearch configuration (log storage)
    ES_CONFIG = {
        'hosts': os.getenv('ES_HOSTS', 'http://localhost:9200').split(','),
        'index_prefix': 'aiops-'
    }

    # Alerting configuration
    ALERT_CONFIG = {
        'webhook_url': os.getenv('ALERT_WEBHOOK', ''),
        'email_config': {
            'smtp_host': os.getenv('SMTP_HOST', 'smtp.gmail.com'),
            'smtp_port': int(os.getenv('SMTP_PORT', 587)),
            'smtp_user': os.getenv('SMTP_USER', ''),
            'smtp_password': os.getenv('SMTP_PASSWORD', '')
        }
    }

    # Model configuration
    MODEL_CONFIG = {
        'anomaly_detection': {
            'train_interval': 3600,      # training interval (seconds)
            'prediction_window': 3600,   # prediction window (seconds)
            'contamination': float(os.getenv('ANOMALY_CONTAMINATION', 0.1))
        },
        'prediction': {
            'horizon': int(os.getenv('PREDICTION_HORIZON', 24)),
            'retrain_interval': 86400    # retrain daily
        }
    }
```

```python
# database.py
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, JSON
from sqlalchemy.orm import declarative_base, sessionmaker

from config import Config

Base = declarative_base()
engine = create_engine(
    f"postgresql://{Config.DB_CONFIG['user']}:{Config.DB_CONFIG['password']}@"
    f"{Config.DB_CONFIG['host']}:{Config.DB_CONFIG['port']}/{Config.DB_CONFIG['database']}"
)
Session = sessionmaker(bind=engine)


class MetricData(Base):
    __tablename__ = 'metric_data'

    id = Column(Integer, primary_key=True)
    timestamp = Column(DateTime, nullable=False)
    metric_name = Column(String, nullable=False)
    value = Column(Float, nullable=False)
    labels = Column(JSON)


class Alert(Base):
    __tablename__ = 'alerts'

    id = Column(Integer, primary_key=True)
    timestamp = Column(DateTime, nullable=False)
    severity = Column(String, nullable=False)
    metric_name = Column(String, nullable=False)
    value = Column(Float)
    message = Column(String)
    status = Column(String, default='open')


# Create tables if they do not exist, so a fresh database works out of the box
Base.metadata.create_all(engine)
```

```python
# models/anomaly_detection.py
import logging
from typing import Tuple

import joblib
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

logger = logging.getLogger(__name__)


class AnomalyDetector:
    def __init__(self, model_path: str, config: dict):
        self.model_path = model_path
        self.config = config
        self.model = None
        self.scaler = StandardScaler()
        self.load_or_create_model()

    def load_or_create_model(self):
        try:
            # Model and scaler are persisted together: predict() needs the
            # same scaler that was fitted at training time, so loading only
            # the model would leave the scaler unfitted after a restart
            saved = joblib.load(self.model_path)
            self.model = saved['model']
            self.scaler = saved['scaler']
            logger.info("Loaded existing anomaly detection model")
        except Exception:
            logger.info("Creating new anomaly detection model")
            self.model = IsolationForest(
                contamination=self.config['contamination'],
                random_state=42
            )

    def train(self, data: np.ndarray):
        try:
            scaled_data = self.scaler.fit_transform(data)
            self.model.fit(scaled_data)
            # Dump model and fitted scaler as one artifact
            joblib.dump({'model': self.model, 'scaler': self.scaler}, self.model_path)
            logger.info("Successfully trained and saved anomaly detection model")
        except Exception as e:
            logger.error(f"Error training anomaly detection model: {str(e)}")
            raise

    def predict(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        scaled_data = self.scaler.transform(data)
        predictions = self.model.predict(scaled_data)
        # IsolationForest marks anomalies with -1
        anomaly_indices = np.where(predictions == -1)[0]
        return predictions, anomaly_indices
```
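A quick way to sanity-check the detector before wiring it into the scheduler is to run it on synthetic data with known outliers. A minimal sketch, assuming illustrative values (the Gaussian parameters, contamination setting, and model path here are not from the original code):

```python
import numpy as np
from models.anomaly_detection import AnomalyDetector

# Hypothetical smoke test: 500 normal points plus a few obvious outliers
rng = np.random.default_rng(42)
values = np.concatenate([rng.normal(100, 5, 500), [250.0, -40.0, 310.0]])
data = values.reshape(-1, 1)  # detector expects shape (n_samples, n_features)

detector = AnomalyDetector('anomaly_detector_test.joblib', {'contamination': 0.01})
detector.train(data)
predictions, anomaly_indices = detector.predict(data)
print(f"Flagged {len(anomaly_indices)} of {len(data)} points as anomalous")
```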
```python
# alerting/alert_manager.py
import logging
import smtplib
from email.mime.text import MIMEText
from typing import Dict, Any

import requests

logger = logging.getLogger(__name__)


class AlertManager:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.webhook_url = config['webhook_url']
        self.email_config = config['email_config']

    def send_alert(self, alert_data: Dict[str, Any]):
        try:
            self._send_webhook_alert(alert_data)
            self._send_email_alert(alert_data)
            logger.info(f"Alert sent successfully: {alert_data['message']}")
        except Exception as e:
            logger.error(f"Failed to send alert: {str(e)}")

    def _send_webhook_alert(self, alert_data: Dict[str, Any]):
        # Skip silently when no webhook is configured
        if not self.webhook_url:
            return
        try:
            response = requests.post(
                self.webhook_url,
                json=alert_data,
                timeout=5
            )
            response.raise_for_status()
        except Exception as e:
            logger.error(f"Webhook alert failed: {str(e)}")
            raise

    def _send_email_alert(self, alert_data: Dict[str, Any]):
        # Skip silently when SMTP credentials are not configured
        if not all([self.email_config['smtp_user'], self.email_config['smtp_password']]):
            return
        try:
            msg = MIMEText(alert_data['message'])
            msg['Subject'] = f"AIOps Alert: {alert_data['severity'].upper()}"
            msg['From'] = self.email_config['smtp_user']
            msg['To'] = self.email_config['smtp_user']

            with smtplib.SMTP(self.email_config['smtp_host'],
                              self.email_config['smtp_port']) as server:
                server.starttls()
                server.login(
                    self.email_config['smtp_user'],
                    self.email_config['smtp_password']
                )
                server.send_message(msg)
        except Exception as e:
            logger.error(f"Email alert failed: {str(e)}")
            raise
```
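The webhook path can be exercised without a real incident-management endpoint by pointing `webhook_url` at a throwaway local receiver. A test-only sketch, assuming port 8099 is free (the port and handler are hypothetical; the empty email config makes `_send_email_alert` return early):

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

from alerting.alert_manager import AlertManager

class EchoHandler(BaseHTTPRequestHandler):
    """Prints whatever JSON body the AlertManager posts."""
    def do_POST(self):
        body = self.rfile.read(int(self.headers['Content-Length']))
        print("Webhook received:", json.loads(body))
        self.send_response(200)
        self.end_headers()

    def log_message(self, *args):  # silence default request logging
        pass

server = HTTPServer(('localhost', 8099), EchoHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

manager = AlertManager({
    'webhook_url': 'http://localhost:8099/alert',
    'email_config': {'smtp_host': '', 'smtp_port': 0,
                     'smtp_user': '', 'smtp_password': ''}
})
manager.send_alert({'severity': 'high', 'metric_name': 'cpu_usage',
                    'value': 97.3, 'message': 'test alert'})
server.shutdown()
```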
```python
# main.py
import logging
import logging.handlers
import sys
import time
from datetime import datetime

import pandas as pd
import schedule

from config import Config
from database import Session, MetricData, Alert
from models.anomaly_detection import AnomalyDetector
from alerting.alert_manager import AlertManager


# Configure logging
def setup_logging():
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )
    logger.addHandler(console_handler)

    # File handler
    file_handler = logging.handlers.RotatingFileHandler(
        Config.LOG_DIR / 'aiops.log',
        maxBytes=10485760,  # 10MB
        backupCount=5
    )
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )
    logger.addHandler(file_handler)

    return logger


class AIOpsManager:
    def __init__(self):
        self.logger = setup_logging()
        self.anomaly_detector = AnomalyDetector(
            str(Config.MODEL_DIR / 'anomaly_detector.joblib'),
            Config.MODEL_CONFIG['anomaly_detection']
        )
        self.alert_manager = AlertManager(Config.ALERT_CONFIG)

    def collect_metrics(self) -> pd.DataFrame:
        """Collect metric data from the database."""
        session = Session()
        try:
            metrics = session.query(MetricData).order_by(
                MetricData.timestamp.desc()
            ).limit(1000).all()

            data = pd.DataFrame([{
                'timestamp': m.timestamp,
                'value': m.value,
                'metric_name': m.metric_name,
                **(m.labels or {})  # labels may be NULL in the database
            } for m in metrics])
            return data
        finally:
            session.close()

    def process_metrics(self):
        """Process metric data and detect anomalies."""
        try:
            # Collect metric data
            data = self.collect_metrics()
            if data.empty:
                self.logger.info("No metrics data to process")
                return

            # Detect anomalies
            predictions, anomaly_indices = self.anomaly_detector.predict(
                data[['value']].values
            )

            # Handle anomalies
            if len(anomaly_indices) > 0:
                anomalies = data.iloc[anomaly_indices]
                self.handle_anomalies(anomalies)
        except Exception as e:
            self.logger.error(f"Error processing metrics: {str(e)}")

    def handle_anomalies(self, anomalies: pd.DataFrame):
        """Handle detected anomalies."""
        session = Session()
        try:
            for _, anomaly in anomalies.iterrows():
                # Create an alert record
                alert = Alert(
                    timestamp=datetime.now(),
                    severity='high',
                    metric_name=anomaly['metric_name'],
                    value=float(anomaly['value']),
                    message=f"Anomaly detected for {anomaly['metric_name']}: {anomaly['value']}"
                )
                session.add(alert)

                # Send the alert
                self.alert_manager.send_alert({
                    'timestamp': alert.timestamp.isoformat(),
                    'severity': alert.severity,
                    'metric_name': alert.metric_name,
                    'value': alert.value,
                    'message': alert.message
                })
            session.commit()
        except Exception as e:
            session.rollback()
            self.logger.error(f"Error handling anomalies: {str(e)}")
        finally:
            session.close()

    def train_models(self):
        """Periodically retrain models."""
        try:
            data = self.collect_metrics()
            if not data.empty:
                self.anomaly_detector.train(data[['value']].values)
                self.logger.info("Successfully trained models")
        except Exception as e:
            self.logger.error(f"Error training models: {str(e)}")

    def run(self):
        """Run the AIOps manager."""
        self.logger.info("Starting AIOps Manager")

        # Set up scheduled jobs
        schedule.every(1).minutes.do(self.process_metrics)
        schedule.every(1).hours.do(self.train_models)

        # Main loop
        while True:
            try:
                schedule.run_pending()
                time.sleep(1)
            except Exception as e:
                self.logger.error(f"Error in main loop: {str(e)}")
                time.sleep(10)  # back off before retrying after an error


if __name__ == "__main__":
    manager = AIOpsManager()
    manager.run()
```
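For an end-to-end dry run, `collect_metrics` needs rows in `metric_data` to pick up. A seeding sketch, assuming the Postgres instance from `DB_CONFIG` is reachable (the metric name, host label, and value distribution are illustrative, not part of the original project):

```python
# seed_metrics.py -- illustrative helper for local testing
import random
from datetime import datetime, timedelta

from database import Session, MetricData

session = Session()
try:
    now = datetime.now()
    for i in range(200):
        # one sample per minute, mostly ~60% CPU with an occasional spike
        value = random.gauss(60, 5) if i % 50 else 99.0
        session.add(MetricData(
            timestamp=now - timedelta(minutes=i),
            metric_name='cpu_usage',
            value=value,
            labels={'host': 'web-01'}
        ))
    session.commit()
finally:
    session.close()
```

With the table seeded, running `main.py` should train on the first hourly tick and flag the injected spikes on the minute-level `process_metrics` runs.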