dromara / domain-admin

域名SSL证书监测平台、SSL证书申请自动续签。Domain and SSL Cert monitor System.
https://domain-admin.cn/
MIT License
1.67k stars 241 forks source link

【功能请求】增加CDN和宝塔Nginx自动部署 #111

Open fcwys opened 3 months ago

fcwys commented 3 months ago

1、支持阿里云、腾讯云、多吉云CDN自动部署(可参考这个),可选指定域名启用或禁用自动更新; 2、支持宝塔面板Nginx自动部署,可选指定域名启用或禁用自动更新(通过API); 非常感谢!!!

多吉云API:https://docs.dogecloud.com/cdn/api-cert-upload Nginx配置解析可使用库:https://github.com/nginxinc/crossplane

附:解析Nginx配置文件获取站点域名及证书私钥路径:

import os
import re
import crossplane

# nginx.conf文件路径
nginx_conf = 'C:\\Users\\admin\\Desktop\\nginx\\conf\\nginx.conf'

# 解析server块获取所有站点的server_name、ssl_certificate、ssl_certificate_key
def ParseServerBlock(block: dict, config_dir: str):
    """
    解析server块获取所有站点的server_name、ssl_certificate、ssl_certificate_key
    :param block: server块
    :param config_dir: nginx.conf所在目录
    :return: 站点信息字典
    """
    s = {'server_name': [], 'cert_file': '', 'key_file': ''}
    # 遍历server块的所有子块
    for sub_block in block['block']:
        if sub_block['directive'] == 'server_name':
            # 解析server_name参数
            for arg in sub_block['args']:
                # 正则校验域名格式是否正确
                if re.search(r'^[0-9\.]+$|^([a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}$', arg):
                    s['server_name'].append(arg)
        elif sub_block['directive'] == 'ssl_certificate':
            # 解析ssl_certificate参数
            if sub_block['args']:
                # 若路径不是绝对路径,则拼接nginx.conf所在目录路径
                if os.path.isabs(sub_block['args'][0]):
                    s['cert_file'] = os.path.abspath(sub_block['args'][0])
                else:
                    s['cert_file'] = os.path.abspath(os.path.join(config_dir, sub_block['args'][0]))
        elif sub_block['directive'] == 'ssl_certificate_key':
            # 解析ssl_certificate_key参数
            if sub_block['args']:
                # 若路径不是绝对路径,则拼接nginx.conf所在目录路径
                if os.path.isabs(sub_block['args'][0]):
                    s['key_file'] = os.path.abspath(sub_block['args'][0])
                else:
                    s['key_file'] = os.path.abspath(os.path.join(config_dir, sub_block['args'][0]))
    return s

# 解析nginx.conf文件获取所有站点信息
def ParseSites(config_file: str):
    """
    解析nginx.conf文件获取所有站点信息
    :param config_file: nginx.conf文件路径
    :return: 所有站点信息列表
    """
    sites = []
    config_dir = os.path.dirname(config_file)    # 获取nginx.conf所在目录
    config = crossplane.parse(config_file)    # 解析nginx.conf文件
    if 'errors' in config and config['errors']:
        print('>>> 解析nginx.conf文件失败,请检查配置文件语法是否正确')
        return False
    # 遍历所有配置块
    for site in config['config']:
        if site['file'].endswith('.conf'):
            for parse in site['parsed']:
                # 若是http块则遍历server块(通常对于nginx.conf文件)
                if parse['directive'] == 'http':
                    for http_block in parse['block']:
                        if http_block['directive'] == 'server':
                            s = ParseServerBlock(http_block, config_dir)  # 解析server块
                            if len(s['server_name']) > 0:
                                sites.append(s)
                # 若是server块,则直接解析
                elif parse['directive'] == 'server':
                    s = ParseServerBlock(parse, config_dir)   # 解析server块
                    if len(s['server_name']) > 0:
                        sites.append(s)
    return sites

if __name__ == '__main__':
    sites = ParseSites(nginx_conf)
    # print(sites)
    for site in sites:
        print(site)

附:个人实现的宝塔获取站点列表、上传证书、重启Nginx代码,希望能帮上忙:

# -*- coding: utf-8 -*-
import hashlib
import time
import requests
import prettytable

# 宝塔面板操作类
class BtPanel:
    __BTURL = ''
    __APIKEY = ''
    __REQ = requests.session()

    # 初始化宝塔面板
    def __init__(self, host: str, apisk: str):
        '''
        :param host: 宝塔面板地址(末尾不加/)
        :param apisk: 宝塔面板API密钥
        '''
        self.__BTURL = host
        self.__APIKEY = apisk

    # 计算MD5
    def __GetMD5(self, s: str):
        '''
        计算字符串的MD5值
        :param s: 待计算的字符串
        :return: MD5值
        '''
        m = hashlib.md5()
        m.update(s.encode('utf-8'))
        return m.hexdigest()

    # 签名计算
    def __GetToken(self):
        request_time = int(time.time())    # 获取请求时间戳
        request_token = self.__GetMD5(str(request_time) + '' + self.__GetMD5(self.__APIKEY)),   # 生成请求签名
        return {'request_time': request_time, 'request_token': request_token}

    # 获取站点列表
    def GetSites(self, showlog=True):
        '''
        获取宝塔面板站点列表
        :showlog: 是否输出结果
        :return: 站点列表数据
        '''
        if showlog:
            print('\n### 获取站点列表...')
        tk = self.__GetToken()    # 获取签名
        playload = {
            'request_time': tk['request_time'],
            'request_token': tk['request_token'],
            'p': 1,
            'limit': 100,
            'order': 'id'
        }
        res = self.__REQ.post(url=self.__BTURL + '/data?action=getData&table=sites', data=playload).json()
        # 判断请求是否成功
        if 'status' in res and 'msg' in res and not res['status']:
            print('>>> 获取站点列表失败:', res['msg'])
            return False
        # 使用prettytable输出站点列表
        if showlog:
            tb = prettytable.PrettyTable()
            tb.field_names = ['网站名', '站点类型', '备注', 'SSL域名', 'SSL到期时间', 'SSL剩余天数', '证书品牌', '状态']
            tb.align['网站名'] = 'l'
            tb.align['备注'] = 'l'
            tb.align['SSL域名'] = 'l'
            tb.align['证书品牌'] = 'l'
            for site in res['data']:
                site_status = '运行' if site['status'] == '1' else '停止'
                if site['ssl'] == -1:
                    sslinfo = {'notAfter': '-', 'endtime': '-', 'subject': '-', 'issuer': '-'}
                else:
                    sslinfo = {'notAfter': site['ssl']['notAfter'], 'endtime': site['ssl']['endtime'], 'subject': site['ssl']['subject'], 'issuer': site['ssl']['issuer']}
                tb.add_row([site['name'], site['project_type'], site['ps'], sslinfo['subject'], sslinfo['notAfter'], sslinfo['endtime'], sslinfo['issuer'], site_status])
            print(tb)
        return res['data']

    # 设置站点SSL证书
    def SetSSL(self, site_name: str, ssl_cert_content: str, ssl_key_content: str):
        '''
        设置站点SSL证书
        :param site_name: 站点名称(域名)
        :param ssl_cert_content: ssl证书内容
        :param ssl_key_content: ssl私钥内容
        :return: 设置结果
        '''
        print('\n### 设置站点SSL证书...')
        tk = self.__GetToken()    # 获取签名
        playload = {
            'request_time': tk['request_time'],
            'request_token': tk['request_token'],
            'type': 0,
            'siteName': site_name,
            'key': ssl_key_content,
            'csr': ssl_cert_content
        }
        res = self.__REQ.post(url=self.__BTURL + '/site?action=SetSSL', data=playload).json()
        # 判断请求是否成功
        if 'status' in res and 'msg' in res and not res['status']:
            print('>>> 设置站点SSL证书失败:', res['msg'])
            return False
        print('>>>', res['msg'])    # 输出结果
        return res['status']

    # 获取证书夹列表
    def GetCertList(self, showlog=True):
        '''
        获取证书夹列表
        :showlog: 是否输出结果
        :return: 证书夹列表数据
        '''
        if showlog:
            print('\n### 获取证书夹列表...')
        tk = self.__GetToken()    # 获取签名
        playload = {
            'request_time': tk['request_time'],
            'request_token': tk['request_token'],
            'force_refresh': 0    # 0:获取本地证书 1:获取云端证书
        }
        res = self.__REQ.post(url=self.__BTURL + '/ssl?action=get_cert_list', data=playload).json()
        # 判断请求是否成功
        if 'status' in res and 'msg' in res and not res['status']:
            print('>>> 获取证书夹列表失败:', res['msg'])
            return False
        # 使用prettytable输出证书夹列表
        if showlog:
            tb = prettytable.PrettyTable()
            tb.field_names = ['域名', 'SSL到期时间', 'SSL剩余天数', '证书品牌', '可选域名']
            tb.align['域名'] = 'l'
            tb.align['证书品牌'] = 'l'
            tb.align['可选域名'] = 'l'
            for cert in res:
                tb.add_row([cert['subject'], cert['info']['notAfter'], cert['endtime'], cert['info']['issuer'], cert['dns']])
            print(tb)
        return res

    # 删除过期SSL证书
    def DelExpiredSSL(self):
        '''
        删除过期SSL证书
        :showlog: 是否输出结果
        :return: 删除结果
        '''
        print('\n### 删除过期SSL证书...')
        certs = self.GetCertList(showlog=False)    # 获取证书列表
        if not certs:
            return False
        expiredcerts = []    # 过期证书列表
        for cert in certs:
            if cert['endtime'] < 0:    # 证书已过期
                expiredcerts.append(cert['subject'])    # 加入过期证书列表
                tk = self.__GetToken()    # 获取签名
                playload = {
                    'request_time': tk['request_time'],
                    'request_token': tk['request_token'],
                    'local': 1,
                    'ssl_hash': cert['hash']
                }
                res = self.__REQ.post(url=self.__BTURL + '/ssl?action=remove_cloud_cert', data=playload).json()
                # 判断请求是否成功
                if 'status' in res and 'msg' in res and not res['status']:
                    print('>>> 删除过期SSL证书失败:', res['msg'])
                    return False
                print('>>> 过期证书', cert['subject'], res['msg'])
                return res['status']
        if len(expiredcerts) == 0:
            print('>>> 证书夹内未发现过期证书')
        return expiredcerts

    # 重启Nginx
    def RestartNginx(self):
        '''
        重启Nginx
        :return: 重启结果
        '''
        tk = self.__GetToken()    # 获取签名
        playload = {
            'request_time': tk['request_time'],
            'request_token': tk['request_token'],
            'name': 'nginx',
            'type': 'restart'
        }
        res = self.__REQ.post(url=self.__BTURL + '/system?action=ServiceAdmin', data=playload).json()
        # 判断请求是否成功
        if 'status' in res and 'msg' in res and not res['status']:
            print('>>> 重启Nginx失败:', res['msg'])
            return False
        return res.json()['status']

    # 获取系统基本信息
    def GetSystemInfo(self, showlog=True):
        '''
        获取系统基本信息
        :showlog: 是否输出结果
        :return: 系统基本信息
        '''
        if showlog:
            print('\n### 获取系统基本信息...')
        tk = self.__GetToken()    # 获取签名
        playload = {
            'request_time': tk['request_time'],
            'request_token': tk['request_token']
        }
        res = self.__REQ.post(url=self.__BTURL + '/system?action=GetSystemTotal', data=playload).json()
        # 判断请求是否成功
        # 判断res是否存在status字段,若不存在则说明请求失败
        if 'status' in res and 'msg' in res and not res['status']:
            print('>>> 获取系统基本信息失败:', res['msg'])
            return False
        res['memUtilization'] = round(res['memRealUsed'] / res['memTotal'], 2) * 100    # 内存使用率
        if showlog:
            print('>>> 面板地址:', self.__BTURL)
            print('>>> 操作系统:', res['system'])
            print('>>> 面板版本:', res['version'])
            print('>>> 运行时间:', res['time'])
            print('>>> CPU使用率:', res['cpuRealUsed'], '%')
            print('>>> 内存使用率:', res['memUtilization'], '%')
            print('>>> 内存总量:', round(res['memTotal'] / 1024, 2), 'GB')
        return res

上述代码已经过测试可行。

mouday commented 3 months ago

好的,我研究下