# Added the SlackNotification, TelegramNotification, TwilioSMSNotification, and DiscordNotification classes to handle additional notification endpoints.
# Updated the initialize_notifiers function: it now checks the configuration (DEFAULTS) for each new notification type and initializes the corresponding notifier when the required settings are available.
# Ensured backward compatibility:
# the existing logic and functionality of GotifyNotification, EmailNotification, and UptimeKumaNotification remain unchanged.
import logging
import requests
import smtplib
from email.mime.text import MIMEText
from abc import ABC, abstractmethod
from config import DEFAULTS
# Abstract base class for notification proxies
class NotificationProxy(ABC):
    """Abstract interface implemented by every notification backend.

    Classes inheriting from this must implement ``send_notification``;
    the base class itself cannot be instantiated.
    """

    @abstractmethod
    def send_notification(self, title: str, message: str, priority: int = 5):
        """Deliver a single notification.

        Args:
            title: Short headline of the notification.
            message: Body text of the notification.
            priority: Urgency hint; how (or whether) it is used is up to
                the concrete backend. Defaults to 5.
        """
        pass
# Gotify notification implementation
class GotifyNotification(NotificationProxy):
    """Send notifications to a Gotify server by POSTing to ``<url>/message``."""

    def __init__(self, url: str, token: str, timeout: float = 10.0):
        """Store the connection settings.

        Args:
            url: Base URL of the Gotify server; a trailing slash is tolerated.
            token: Token sent in the ``X-Gotify-Key`` request header.
            timeout: Seconds to wait for the server before the request is
                abandoned. Without it a stalled server would block the
                caller indefinitely.
        """
        self.url = url
        self.token = token
        self.timeout = timeout

    def send_notification(self, title: str, message: str, priority: int = 5):
        """POST the notification as form data.

        Request failures (``requests.exceptions.RequestException``, which
        includes timeouts and non-2xx responses raised by
        ``raise_for_status``) are logged and not re-raised.

        Args:
            title: Notification title.
            message: Notification body.
            priority: Value sent in the ``priority`` form field.
        """
        payload = {'title': title, 'message': message, 'priority': priority}
        headers = {'X-Gotify-Key': self.token}
        # Drop trailing slashes so the endpoint never becomes ".../​/message".
        endpoint = f"{self.url.rstrip('/')}/message"
        try:
            response = requests.post(
                endpoint, data=payload, headers=headers, timeout=self.timeout
            )
            response.raise_for_status()
            logging.info(f"Gotify notification sent: {title} - {message}")
        except requests.exceptions.RequestException as e:
            logging.error(f"Gotify notification failed: {e}")
# Email notification implementation
class EmailNotification(NotificationProxy):
    """Send notifications as plain-text e-mail over SMTP with STARTTLS."""

    def __init__(self, smtp_server: str, port: int, username: str, password: str,
                 from_addr: str, to_addrs: list, timeout: float = 30.0):
        """Store the SMTP settings.

        Args:
            smtp_server: Host name of the SMTP server.
            port: Port of the SMTP server.
            username: Login name used after STARTTLS.
            password: Login password used after STARTTLS.
            from_addr: Sender address, used for the ``From`` header and envelope.
            to_addrs: Recipient addresses. A single string is also accepted
                and is split on commas.
            timeout: Seconds to wait on the SMTP connection before giving up.
        """
        self.smtp_server = smtp_server
        self.port = port
        self.username = username
        self.password = password
        self.from_addr = from_addr
        # A bare string would otherwise be joined character by character
        # when the "To" header is built, so normalise it to a list here.
        if isinstance(to_addrs, str):
            to_addrs = [addr.strip() for addr in to_addrs.split(',') if addr.strip()]
        self.to_addrs = to_addrs
        self.timeout = timeout

    def send_notification(self, title: str, message: str, priority: int = 5):
        """Send ``message`` as an e-mail whose subject is ``title``.

        ``priority`` is accepted for interface compatibility and is not used.
        Any exception raised while connecting, logging in or sending is
        logged and not re-raised.

        Args:
            title: Subject line.
            message: Plain-text body.
            priority: Unused.
        """
        msg = MIMEText(message)
        msg['Subject'] = title
        msg['From'] = self.from_addr
        msg['To'] = ', '.join(self.to_addrs)
        try:
            with smtplib.SMTP(self.smtp_server, self.port, timeout=self.timeout) as server:
                server.starttls()
                server.login(self.username, self.password)
                server.sendmail(self.from_addr, self.to_addrs, msg.as_string())
            logging.info(f"Email sent: {title} - {message}")
        except Exception as e:
            logging.error(f"Failed to send email: {e}")
# Uptime Kuma notification implementation
class UptimeKumaNotification(NotificationProxy):
    """Ping an Uptime Kuma webhook URL with a GET request."""

    def __init__(self, webhook_url: str, timeout: float = 10.0):
        """Store the webhook settings.

        Args:
            webhook_url: URL that is requested on every notification.
            timeout: Seconds to wait for a response before giving up, so a
                stalled endpoint cannot block the caller indefinitely.
        """
        self.webhook_url = webhook_url
        self.timeout = timeout

    def send_notification(self, title: str, message: str, priority: int = 5):
        """Request the webhook URL and log the outcome.

        ``title``, ``message`` and ``priority`` are accepted for interface
        compatibility only; nothing from them is sent. A 200 response is
        logged as success, any other status as an error. Exceptions are
        logged and not re-raised.
        """
        try:
            response = requests.get(self.webhook_url, timeout=self.timeout)
            if response.status_code == 200:
                logging.info("Uptime Kuma notification sent successfully")
            else:
                logging.error(f"Failed to send Uptime Kuma notification: {response.status_code}")
        except Exception as e:
            logging.error(f"Error sending Uptime Kuma notification: {e}")
# Slack notification implementation
class SlackNotification(NotificationProxy):
    """Post notifications to a Slack webhook URL as a JSON ``text`` payload."""

    def __init__(self, webhook_url: str, timeout: float = 10.0):
        """Store the webhook settings.

        Args:
            webhook_url: Slack webhook URL the payload is POSTed to.
            timeout: Seconds to wait for a response before giving up, so a
                stalled endpoint cannot block the caller indefinitely.
        """
        self.webhook_url = webhook_url
        self.timeout = timeout

    def send_notification(self, title: str, message: str, priority: int = 5):
        """POST the title (wrapped in ``*``) and the message on the next line.

        ``priority`` is accepted for interface compatibility and is not
        used. ``requests.exceptions.RequestException`` (including timeouts
        and non-2xx responses raised by ``raise_for_status``) is logged and
        not re-raised.
        """
        payload = {"text": f"*{title}*\n{message}"}
        try:
            response = requests.post(self.webhook_url, json=payload, timeout=self.timeout)
            response.raise_for_status()
            logging.info(f"Slack notification sent: {title} - {message}")
        except requests.exceptions.RequestException as e:
            logging.error(f"Slack notification failed: {e}")
# Telegram notification implementation
class TelegramNotification(NotificationProxy):
    """Send notifications through the Telegram Bot API ``sendMessage`` method."""

    def __init__(self, bot_token: str, chat_id: str, timeout: float = 10.0):
        """Store the bot settings.

        Args:
            bot_token: Bot token; it is embedded in the request URL.
            chat_id: Value sent in the ``chat_id`` form field.
            timeout: Seconds to wait for a response before giving up, so a
                stalled connection cannot block the caller indefinitely.
        """
        self.bot_token = bot_token
        self.chat_id = chat_id
        self.timeout = timeout

    def send_notification(self, title: str, message: str, priority: int = 5):
        """POST ``title`` and ``message`` (separated by a newline) as the text.

        ``priority`` is accepted for interface compatibility and is not
        used. ``requests.exceptions.RequestException`` (including timeouts
        and non-2xx responses raised by ``raise_for_status``) is logged and
        not re-raised.
        """
        url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
        payload = {'chat_id': self.chat_id, 'text': f"{title}\n{message}"}
        try:
            response = requests.post(url, data=payload, timeout=self.timeout)
            response.raise_for_status()
            logging.info(f"Telegram notification sent: {title} - {message}")
        except requests.exceptions.RequestException as e:
            # The token is part of the URL built above, and request errors
            # usually quote that URL; mask the token so it never reaches
            # the log output.
            e = str(e)
            if self.bot_token:
                e = e.replace(str(self.bot_token), "<redacted>")
            logging.error(f"Telegram notification failed: {e}")
# Twilio SMS notification implementation
class TwilioSMSNotification(NotificationProxy):
    """Send notifications as SMS messages through the Twilio client."""

    def __init__(self, account_sid: str, auth_token: str, from_number: str, to_number: str):
        """Store the Twilio credentials and phone numbers.

        Args:
            account_sid: Account SID passed to the Twilio ``Client``.
            auth_token: Auth token passed to the Twilio ``Client``.
            from_number: Sender number passed as ``from_``.
            to_number: Recipient number passed as ``to``.
        """
        self.account_sid = account_sid
        self.auth_token = auth_token
        self.from_number = from_number
        self.to_number = to_number

    def send_notification(self, title: str, message: str, priority: int = 5):
        """Send ``title`` and ``message`` (separated by a newline) as one SMS.

        ``priority`` is accepted for interface compatibility and is not
        used. Every failure is logged and not re-raised, including a
        missing ``twilio`` package and errors while building the client.
        """
        try:
            # Imported here rather than at module level, so importing this
            # module does not itself import twilio. Kept inside the try so
            # an ImportError is reported like any other delivery failure.
            from twilio.rest import Client
            client = Client(self.account_sid, self.auth_token)
            client.messages.create(body=f"{title}\n{message}", from_=self.from_number, to=self.to_number)
            logging.info(f"Twilio SMS sent: {title} - {message}")
        except Exception as e:
            logging.error(f"Twilio SMS notification failed: {e}")
# Discord notification implementation
class DiscordNotification(NotificationProxy):
    """Post notifications to a Discord webhook URL as a JSON ``content`` payload."""

    def __init__(self, webhook_url: str, timeout: float = 10.0):
        """Store the webhook settings.

        Args:
            webhook_url: Discord webhook URL the payload is POSTed to.
            timeout: Seconds to wait for a response before giving up, so a
                stalled endpoint cannot block the caller indefinitely.
        """
        self.webhook_url = webhook_url
        self.timeout = timeout

    def send_notification(self, title: str, message: str, priority: int = 5):
        """POST the title (wrapped in ``**``) and the message on the next line.

        ``priority`` is accepted for interface compatibility and is not
        used. ``requests.exceptions.RequestException`` (including timeouts
        and non-2xx responses raised by ``raise_for_status``) is logged and
        not re-raised.
        """
        payload = {"content": f"**{title}**\n{message}"}
        try:
            response = requests.post(self.webhook_url, json=payload, timeout=self.timeout)
            response.raise_for_status()
            logging.info(f"Discord notification sent: {title} - {message}")
        except requests.exceptions.RequestException as e:
            logging.error(f"Discord notification failed: {e}")
# Initialize and return the list of configured notifiers
def initialize_notifiers():
    """Create a notifier for each backend whose settings are present in DEFAULTS.

    A backend is attempted only when all of its gate keys hold truthy
    values. If constructing a notifier raises, the error is logged and that
    backend is skipped; the remaining backends are still processed.

    Returns:
        list: The notifiers that were created, in the order Email, Gotify,
        Uptime Kuma, Slack, Telegram, Twilio SMS, Discord. Empty when no
        backend is configured.
    """
    active = []

    def has_settings(*keys):
        # True only when every listed key is present with a truthy value.
        return all(DEFAULTS.get(key) for key in keys)

    # Email: gated on server and credentials; sender and recipients are read
    # inside the try, so a missing key is logged instead of raised.
    if has_settings('smtp_server', 'smtp_username', 'smtp_password'):
        try:
            mailer = EmailNotification(
                smtp_server=DEFAULTS['smtp_server'],
                port=DEFAULTS.get('smtp_port', 587),
                username=DEFAULTS['smtp_username'],
                password=DEFAULTS['smtp_password'],
                from_addr=DEFAULTS['smtp_from'],
                to_addrs=DEFAULTS['smtp_to'],
            )
        except Exception as e:
            logging.error(f"Failed to initialize Email notifier: {e}")
        else:
            active.append(mailer)

    # Gotify: needs both the server URL and the token.
    if has_settings('gotify_url', 'gotify_token'):
        try:
            gotify = GotifyNotification(DEFAULTS['gotify_url'], DEFAULTS['gotify_token'])
        except Exception as e:
            logging.error(f"Failed to initialize Gotify notifier: {e}")
        else:
            active.append(gotify)

    # Uptime Kuma: needs only the webhook URL.
    if has_settings('uptime_kuma_webhook_url'):
        try:
            kuma = UptimeKumaNotification(DEFAULTS['uptime_kuma_webhook_url'])
        except Exception as e:
            logging.error(f"Failed to initialize Uptime Kuma notifier: {e}")
        else:
            active.append(kuma)

    # Slack: needs only the webhook URL.
    if has_settings('slack_webhook_url'):
        try:
            slack = SlackNotification(DEFAULTS['slack_webhook_url'])
        except Exception as e:
            logging.error(f"Failed to initialize Slack notifier: {e}")
        else:
            active.append(slack)

    # Telegram: needs both the bot token and the chat ID.
    if has_settings('telegram_bot_token', 'telegram_chat_id'):
        try:
            telegram = TelegramNotification(
                bot_token=DEFAULTS['telegram_bot_token'],
                chat_id=DEFAULTS['telegram_chat_id'],
            )
        except Exception as e:
            logging.error(f"Failed to initialize Telegram notifier: {e}")
        else:
            active.append(telegram)

    # Twilio SMS: needs account SID, auth token and both phone numbers.
    if has_settings('twilio_account_sid', 'twilio_auth_token',
                    'twilio_from_number', 'twilio_to_number'):
        try:
            sms = TwilioSMSNotification(
                account_sid=DEFAULTS['twilio_account_sid'],
                auth_token=DEFAULTS['twilio_auth_token'],
                from_number=DEFAULTS['twilio_from_number'],
                to_number=DEFAULTS['twilio_to_number'],
            )
        except Exception as e:
            logging.error(f"Failed to initialize Twilio SMS notifier: {e}")
        else:
            active.append(sms)

    # Discord: needs only the webhook URL.
    if has_settings('discord_webhook_url'):
        try:
            discord = DiscordNotification(DEFAULTS['discord_webhook_url'])
        except Exception as e:
            logging.error(f"Failed to initialize Discord notifier: {e}")
        else:
            active.append(discord)

    return active
# Send notification to all initialized notifiers
def send_notification(title, message, priority=5):
    """Send one notification through every configured notifier.

    The notifier list is rebuilt with ``initialize_notifiers()`` on each
    call. An exception from one notifier is logged and does not stop the
    others. When no notifier is configured, a warning is logged instead.

    Args:
        title: Notification title passed to each notifier.
        message: Notification body passed to each notifier.
        priority: Priority passed to each notifier. Defaults to 5.
    """
    available = initialize_notifiers()

    # Guard clause: nothing configured, so only report it.
    if not available:
        logging.warning("No notification system configured.")
        return

    for notifier in available:
        try:
            notifier.send_notification(title, message, priority)
        except Exception as e:
            logging.error(f"Failed to send notification using {notifier.__class__.__name__}: {e}")
Key Changes
New Notification Classes: