jayus0821 / swagger-hack

自动化爬取并自动测试所有swagger接口
934 stars 102 forks source link

魔改了一下,渗透测试目标效果很好,就不提pr了,作者大大感兴趣可以放进repo里面。 #3

Open Kai5174 opened 3 years ago

Kai5174 commented 3 years ago

相较原repo改动如下

个人比较喜欢用这个做logger,因此要新加一个依赖

pip install loguru

代码如下:

import json
import random

import requests
import csv
from loguru import logger
import urllib3

urllib3.disable_warnings()

USER_AGENTS = [
    "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.8) Gecko Fedora/1.9.0.8-1.fc10 Kazehakase/0.5.6",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_3) AppleWebKit/535.20 (KHTML, like Gecko) Chrome/19.0.1036.7 Safari/535.20",
    "Opera/9.80 (Macintosh; Intel Mac OS X 10.6.8; U; fr) Presto/2.9.168 Version/11.52",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:81.0) Gecko/20100101 Firefox/81.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_3) AppleWebKit/535.20 (KHTML, like Gecko) Chrome/19.0.1036.7 Safari/535.20",
    "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
    "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Acoo Browser; SLCC1; .NET CLR 2.0.50727; Media Center PC 5.0; .NET CLR 3.0.04506)",
    "Mozilla/4.0 (compatible; MSIE 7.0; AOL 9.5; AOLBuild 4337.35; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
    "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)",
    "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 2.0.50727; Media Center PC 6.0)",
    "Mozilla/5.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 1.0.3705; .NET CLR 1.1.4322)",
    "Mozilla/4.0 (compatible; MSIE 7.0b; Windows NT 5.2; .NET CLR 1.1.4322; .NET CLR 2.0.50727; InfoPath.2; .NET CLR 3.0.04506.30)",
    "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN) AppleWebKit/523.15 (KHTML, like Gecko, Safari/419.3) Arora/0.3 (Change: 287 c9dfb30)",
]

class HttpMixin:

    def _random_x_forwarded_for(self):
        return "%d.%d.%d.%d" % (
            random.randint(1, 254),
            random.randint(1, 254),
            random.randint(1, 254),
            random.randint(1, 254),
        )

    def get_or_confuse_headers(self, headers=None, testShiro=False, isRest=False):
        if headers is None:
            headers = {
                "User-Agent": random.choice(USER_AGENTS),
                "X-Forwarded-For": self._random_x_forwarded_for(),
                # "Referer": "http://www.baidu.com",
            }
            if not testShiro:
                headers['Cookie'] = "rememberMe=xx"
        else:
            headers["User-Agent"] = random.choice(USER_AGENTS)
            headers["X-Forwarded-For"] = self._random_x_forwarded_for()
            headers["Referer"] = "http://www.baidu.com"
            if headers.get("Cookie"):
                if 'rememberMe=xx' not in headers.get("Cookie"):
                    headers["Cookie"] += ";" + "rememberMe=xx"
                else:
                    pass
            else:
                headers["Cookie"] = "rememberMe=xx"

        if isRest:
            headers["Content-Type"] = "Application/json"

        return headers

class SwaggerHack(HttpMixin):
    def __init__(self, proxy_ip, proxy_port):
        f = open('swagger.csv', 'w', newline='', encoding='utf-8')  # 写到csv中
        self.writer = csv.writer(f)
        self.proxy_ip = proxy_ip
        self.proxy_port = proxy_port

    def run(self, url: str):
        specs = self.get_specs(url)
        logger.info("[+] 共抓取到 %d 个标准" % (len(specs)))
        try:
            self.writer.writerow(
                ["标准", "summary", "path", "method", "consumes", "url", "num of params", "data", "status_code",
                 "response"])
        except Exception as e:
            logger.info(e)

        for spec in specs:
            spec_url = url + spec['url']
            pre = spec['url'].split('/')[1]
            logger.info("[+] : 开始测试 %s 标准" % spec_url)
            self.check_spec(spec_url, url)
            # break

    def get_proxy_dict(self):
        proxies = {
            "http": "http://{}:{}".format(self.proxy_ip, self.proxy_port),
            "https": "http://{}:{}".format(self.proxy_ip, self.proxy_port),
        }
        return proxies

    def get_specs(self, url):  # 获取标准列表
        specs_url = url + "/swagger-resources"
        res = requests.get(url=specs_url, proxies=self.get_proxy_dict(), verify=False)
        specs = json.loads(res.text)
        return specs

    def check_spec(self, spec_url, url):  # 前一个是接口文档,用于分析,后一个是文档对应的实际接口请求地址
        res = requests.get(url=spec_url, proxies=self.get_proxy_dict(), verify=False)
        headers = self.get_or_confuse_headers(isRest=True)
        try:
            paths = json.loads(res.text)['paths']
            logger.info("[+] : 此标准下共有 %d 个接口" % (len(paths)))
        except Exception as e:
            logger.exception(e)
            return 0

        for path in paths:
            logger.info("[+] : 开始测试接口 %s " % (path))
            methods = paths[path]
            for method in methods:
                tags = paths[path][method]['tags'][0]
                summary = paths[path][method]['summary']
                operationId = paths[path][method]['operationId']
                if 'consumes' in paths[path][method].keys():  # json格式
                    consumes = paths[path][method]['consumes'][0]
                else:
                    consumes = '0'

                if consumes != '0':  # 如果是json格式传输  post/put #post和post都是发送的json,但是接口文档并没有如何构造json的参数,目前只是随便发送一个
                    logger.info("使用json格式传输")
                    json_array = {}
                    if 'parameters' in paths[path][method]:
                        parameters = paths[path][method]['parameters']
                        logger.info("接口参数个数为 %d" % (len(parameters)))
                        for parameter in parameters:
                            position = parameter.get('in')
                            if position == "header":
                                if parameter.get('type') == 'boolean':
                                    headers[parameter['name']] = 'true'
                                else:
                                    headers[parameter['name']] = '1'
                            else:
                                if parameter.get('type') == "boolean":  # 布尔型全为true,string和数字全部为1
                                    json_array[parameter['name']] = 'true'
                                else:
                                    json_array[parameter['name']] = '1'
                    else:
                        json_array = ''
                        logger.info("接口参数个数为 %d" % (0))

                    logger.info("构造请求参数...")
                    json_string = json.dumps(json_array)
                    logger.info(json_string)

                    if method == "post":
                        res = requests.post(url=url + path, data=json_string, headers=headers, verify=False,
                                            proxies=self.get_proxy_dict())
                    elif method == "put":
                        logger.info("[!] {} 存有put方法,我不敢测".format(url))
                    try:  # post居然也可能没参数
                        row = [spec_url, summary, path, method, consumes, url + path,
                               str(len(paths[path][method]['parameters'])), json_string, res.status_code, res.text]
                    except:
                        row = [spec_url, summary, path, method, consumes, url + path, '0', json_string, res.status_code,
                               res.text]
                    self.writer.writerow(row)
                else:  # 不是json传输
                    if "{" in path:
                        parameter = paths[path][method]['parameters'][0]
                        print(parameter)
                        try:
                            if parameter.get('type') == "boolean":  # 布尔型全为true,string和数字全部为1
                                tmp = "true"
                            else:
                                tmp = "1"
                        except Exception as e:
                            tmp = "{1}"
                            logger.exception(e)
                        if method == 'get':
                            res = requests.get(url=url + path[:path.index('{')] + tmp, verify=False,
                                               proxies=self.get_proxy_dict())
                        elif method == 'delete':
                            logger.info("[!] {} 存有delete方法,我不敢测".format(url))

                        row = [spec_url, summary, path, method, consumes, url + path[:path.index('{')],
                               str(len(paths[path][method]['parameters'])), "", res.status_code, res.text]
                        self.writer.writerow(row)

                    else:
                        query_string = ''
                        if 'parameters' in paths[path][method]:
                            parameters = paths[path][method]['parameters']
                            num_of_param = len(parameters)
                            for parameter in parameters:
                                try:
                                    if parameter['type'] == "boolean":  # 布尔型全为true,string和数字全部为1
                                        query_string += "&%s=true" % (parameter['name'])
                                    else:
                                        query_string += "&%s=1" % (parameter['name'])
                                except:
                                    query_string += "&%s={1}" % (parameter['name'])
                        else:
                            query_string = ''
                            num_of_param = 0
                        query_string = query_string[1:]

                        if method == "get":
                            res = requests.get(url=url + path + "?" + query_string, verify=False,
                                               proxies=self.get_proxy_dict())
                        elif method == "delete":
                            logger.info("[!] {} 存有delete方法,我不敢测".format(url))

                        row = [spec_url, summary, path, method, consumes, url + path + "?" + query_string,
                               str(num_of_param), "", res.status_code, res.text]
                        self.writer.writerow(row)

if __name__ == '__main__':
    target_url = "https://xxx.xxx.xxx/"
    swaggerHack = SwaggerHack("localhost", 8080)
    swaggerHack.run(target_url)
jayus0821 commented 3 years ago

师傅太强了 爱了

jayus0821 commented 3 years ago

最近有点忙,后面把师傅的加进来

Kai5174 commented 3 years ago

最近有点忙,后面把师傅的加进来

可能还需要确认下通用性,跑我当前的场景是ok的。总有蛋疼的场景。。。