wongzeon / ICP-Checker

ICP备案查询,可查询企业或域名的ICP备案信息,自动完成滑动验证,保存结果到Excel表格,适用于新版的工信部备案管理系统网站,告别频繁拖动验证,以及某站*工具要开通VIP才可查看备案信息的坑
GNU Affero General Public License v3.0
533 stars 87 forks source link

【功能建议】批量功能 #28

Open ghost opened 10 months ago

ghost commented 10 months ago

拿cursor改了几个版本,有用到的地方库主可以参考下

# -*- coding: utf-8 -*-
"""
ICP-Checker.py
日期:2023-10-01
作者:soapffz
改编自:https://github.com/wongzeon/ICP-Checker

此脚本用于批量查询域名的备案信息。它首先获取必要的Cookie和Token,然后对输入的域名进行查询。查询结果包括备案信息和不支持备案的域名。

主要功能如下:
1. 批量查询:支持从文件中读取域名进行批量查询。
2. 备案信息获取:对每个域名,获取其备案信息,包括域名主办方、域名、备案许可证号、网站备案号、域名类型、网站前置审批项、是否限制接入、审核通过日期等。
3. 不支持备案的域名:对于不支持备案的域名,会打印出相应的提示信息。
4. 查询间隔:在批量查询中,每次查询之间有10秒的间隔,以防止频繁查询导致的问题。

注意:此脚本需要在Python 3环境下运行,并需要安装requests和tqdm等第三方库。
"""

import re
import os
import cv2
import time
import base64
import hashlib
import requests
import openpyxl as xl
from openpyxl.styles import Alignment
import argparse
import sys
import logging
from tqdm import tqdm
import os
import subprocess
import logging

# Configure root logging exactly once. The original code added two stream
# handlers and then called logging.basicConfig, which is a no-op when the
# root logger already has handlers — so the INFO level was never applied
# (INFO messages were dropped) and WARNING+ records went out twice.
# One stdout handler plus an explicit level fixes both.
formatter = logging.Formatter("%(message)s")
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
root_logger = logging.getLogger("")
root_logger.addHandler(handler)
root_logger.setLevel(logging.INFO)

# Bypass any configured HTTP(S) proxy for all hosts.
os.environ["no_proxy"] = "*"

# Command line: one positional argument, either a single domain or a file
# containing one domain per line.
arg_parser = argparse.ArgumentParser(
    description="Check ICP for a domain or a list of domains from a file."
)
arg_parser.add_argument(
    "input", help="The domain or the file containing a list of domains."
)
args = arg_parser.parse_args()

# All query results are written below ./outs/.
output_directory = "./outs/"
os.makedirs(output_directory, exist_ok=True)

# Shared session so TCP connections are reused across requests.
http_session = requests.Session()

class CustomException(Exception):
    """Raised when an HTTP request or the cookie/token handshake fails."""
    pass

def send_request(
    url, method="get", headers=None, data=None, json=None, timeout=(3.06, 27)
):
    """Send an HTTP request through the shared session.

    Args:
        url: Target URL.
        method: HTTP verb, default ``"get"``.
        headers: Optional header dict, passed straight to requests.
        data: Optional form body.
        json: Optional JSON body.
        timeout: (connect, read) timeout tuple.

    Returns:
        The ``requests.Response`` object.

    Raises:
        CustomException: When the underlying request fails for any reason.
    """
    try:
        # Use the module-level http_session (created for exactly this
        # purpose but previously unused) so connections are pooled and
        # reused across calls instead of a fresh handshake per request.
        response = http_session.request(
            method, url, headers=headers, data=data, json=json, timeout=timeout
        )
        return response
    except requests.RequestException as e:
        # Chain the cause so the original traceback survives.
        raise CustomException(f"请求失败: {e}") from e

def retrieve_cookies():
    """Fetch the ``__jsluid_s`` cookie from beian.miit.gov.cn.

    Retries up to three times with a 3-second pause between attempts.

    Returns:
        The cookie value string.

    Raises:
        CustomException: When the cookie is still missing after 3 attempts.
    """
    ua_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.41 Safari/537.36 Edg/101.0.1210.32"
    }
    for _attempt in range(3):
        response = send_request("https://beian.miit.gov.cn/", headers=ua_headers)
        jar = requests.utils.dict_from_cookiejar(response.cookies)
        if "__jsluid_s" in jar:
            return jar["__jsluid_s"]
        # Cookie not issued yet — wait briefly before the next attempt.
        time.sleep(3)
    raise CustomException("获取Cookie失败,请重试!")

def retrieve_token():
    """Obtain a business token from the query API's auth endpoint.

    The authKey is the MD5 of a fixed secret plus the current millisecond
    timestamp, as the site's frontend computes it.

    Returns:
        The token string on success, or ``-1`` on any failure
        (callers test for ``-1``).
    """
    timeStamp = round(time.time() * 1000)
    authSecret = "testtest" + str(timeStamp)
    authKey = hashlib.md5(authSecret.encode(encoding="UTF-8")).hexdigest()
    auth_data = {"authKey": authKey, "timeStamp": timeStamp}
    url = "https://hlwicpfwc.miit.gov.cn/icpproject_query/api/auth"
    try:
        t_response = requests.post(
            url=url, data=auth_data, headers=base_header, timeout=(3.06, 27)
        ).json()
        token = t_response["params"]["bussiness"]
    except Exception:
        # Narrowed from a bare `except:` (which would also swallow
        # KeyboardInterrupt/SystemExit); -1 keeps the caller protocol.
        return -1
    return token

def retrieve_check_pic(token):
    """Download the slider-captcha image pair and compute the gap offset.

    Args:
        token: Auth token from ``retrieve_token()``.

    Returns:
        ``{"key": uuid, "value": offset}`` on success, ``-1`` on failure.
    """
    url = "https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/getCheckImage"
    base_header["Accept"] = "application/json, text/plain, */*"
    base_header.update({"Content-Length": "0", "token": token})
    try:
        p_request = requests.post(
            url=url, data="", headers=base_header, timeout=(3.06, 27)
        ).json()
        p_uuid = p_request["params"]["uuid"]
        big_image = p_request["params"]["bigImage"]
        small_image = p_request["params"]["smallImage"]
    except Exception:
        # Narrowed from a bare `except:`; -1 is the caller-visible failure value.
        return -1
    # Decode both base64 images to disk, then template-match the small
    # piece against the background to find the slider gap position.
    try:
        with open("bigImage.jpg", "wb") as f:
            f.write(base64.b64decode(big_image))
        with open("smallImage.jpg", "wb") as f:
            f.write(base64.b64decode(small_image))
        # NOTE(review): cv2.COLOR_GRAY2RGB is a cvtColor code, not an
        # imread flag. Kept as-is to preserve the original matching
        # behavior — confirm the intended imread flag upstream.
        background_image = cv2.imread("bigImage.jpg", cv2.COLOR_GRAY2RGB)
        fill_image = cv2.imread("smallImage.jpg", cv2.COLOR_GRAY2RGB)
        position_match = cv2.matchTemplate(
            background_image, fill_image, cv2.TM_CCOEFF_NORMED
        )
        # [3] is maxLoc; its x coordinate is the best-match column.
        max_loc = cv2.minMaxLoc(position_match)[3][0]
        mouse_length = max_loc + 1
    finally:
        # Always remove the temp images, even if decoding/matching raised;
        # previously a failure here leaked both files in the CWD.
        for name in ("bigImage.jpg", "smallImage.jpg"):
            if os.path.exists(name):
                os.remove(name)
    return {"key": p_uuid, "value": mouse_length}

def retrieve_sign(check_data, token):
    """Submit the captcha answer and receive the query signature.

    Args:
        check_data: ``{"key": uuid, "value": offset}`` from
            ``retrieve_check_pic()``.
        token: Auth token.

    Returns:
        The sign string on success, ``-1`` on failure.
    """
    check_url = "https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/checkImage"
    base_header.update(
        {"Content-Length": "60", "token": token, "Content-Type": "application/json"}
    )
    try:
        pic_sign = requests.post(
            check_url, json=check_data, headers=base_header, timeout=(3.06, 27)
        ).json()
        sign = pic_sign["params"]
    except Exception:
        # Narrowed from a bare `except:`; -1 is the caller-visible failure value.
        return -1
    return sign

def query_base(info):
    # 过滤空值和特殊字符,只允许 - —《》. () 分别用于域名和公司名
    try:
        if info == "":
            raise ValueError("InputNone")
        info = re.sub("[^\\u4e00-\\u9fa5-A-Za-z0-9,-.()《》—()]", "", info)
        info = (
            info.replace(" ", "")
            .replace("https://www.", "")
            .replace("http://www.", "")
            .replace("http://", "")
        )
        input_zh = re.compile("[\u4e00-\u9fa5]")
        zh_match = input_zh.search(info)
        if zh_match:
            info_result = info
        else:
            # 检测是否为可备案的域名类型(类型同步日期2022/01/06)
            input_url = re.compile(
                r"([^.]+)(?:\.(?:GOV\.cn|ORG\.cn|AC\.cn|MIL\.cn|NET\.cn|EDU\.cn|COM\.cn|BJ\.cn|TJ\.cn|SH\.cn|CQ\.cn|HE\.cn|SX\.cn|NM\.cn|LN\.cn|JL\.cn|HL\.cn|JS\.cn|ZJ\.cn|AH\.cn|FJ\.cn|JX\.cn|SD\.cn|HA\.cn|HB\.cn|HN\.cn|GD\.cn|GX\.cn|HI\.cn|SC\.cn|GZ\.cn|YN\.cn|XZ\.cn|SN\.cn|GS\.cn|QH\.cn|NX\.cn|XJ\.cn|TW\.cn|HK\.cn|MO\.cn|cn|REN|WANG|CITIC|TOP|SOHU|XIN|COM|NET|CLUB|XYZ|VIP|SITE|SHOP|INK|INFO|MOBI|RED|PRO|KIM|LTD|GROUP|BIZ|AUTO|LINK|WORK|LAW|BEER|STORE|TECH|FUN|ONLINE|ART|DESIGN|WIKI|LOVE|CENTER|VIDEO|SOCIAL|TEAM|SHOW|COOL|ZONE|WORLD|TODAY|CITY|CHAT|COMPANY|LIVE|FUND|GOLD|PLUS|GURU|RUN|PUB|EMAIL|LIFE|CO|FASHION|FIT|LUXE|YOGA|BAIDU|CLOUD|HOST|SPACE|PRESS|WEBSITE|ARCHI|ASIA|BIO|BLACK|BLUE|GREEN|LOTTO|ORGANIC|PET|PINK|POKER|PROMO|SKI|VOTE|VOTO|ICU|LA))",
                flags=re.IGNORECASE,
            )
            info_result = input_url.search(info)
            if info_result is None:
                if info.split(".")[0] == "":
                    raise ValueError("OnlyDomainInput")
                raise ValueError("ValidType")
            else:
                info_result = info_result.group()
        info_data = {
            "pageNum": "1",
            "pageSize": "40",
            "serviceType": 1,
            "unitName": info_result,
        }
        return info_data
    except ValueError as e:
        if str(e) == "InputNone" or str(e) == "OnlyDomainInput":
            logging.error(f"[-] 请正确输入域名: {info}")
        else:
            logging.error(f"[-] {info} 不支持备案")

def retrieve_beian_info(info_data, p_uuid, token, sign):
    """Query the ICP record API with the prepared parameters.

    Args:
        info_data: Query payload from query_base() (may be None when the
            input was rejected — TODO confirm the API's response to that).
        p_uuid: Captcha uuid from retrieve_check_pic().
        token: Auth token.
        sign: Captcha signature from retrieve_sign().

    Returns:
        A list of record rows; empty on failure or when nothing was found.
        NOTE(review): the success branch that should fill domain_list is
        omitted in this version (see the ellipsis comment below), so as
        written the function always returns an empty list.
    """
    global base_header
    domain_list = []
    info_url = "https://hlwicpfwc.miit.gov.cn/icpproject_query/api/icpAbbreviateInfo/queryByCondition"
    base_header.update(
        {"Content-Length": "78", "uuid": p_uuid, "token": token, "sign": sign}
    )
    max_retries = 3
    for _ in range(max_retries):
        try:
            beian_info = requests.post(
                url=info_url, json=info_data, headers=base_header, timeout=(3.06, 27)
            ).json()
            if not beian_info["success"]:
                if beian_info["code"] in [401, 429]:
                    # On 401/429, re-acquire the cookie and token before retrying.
                    logging.info("[+] 正在获取Cookie,请稍等……")
                    cookie = retrieve_cookies()
                    base_header = {
                        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.41 Safari/537.36 Edg/101.0.1210.32",
                        "Origin": "https://beian.miit.gov.cn",
                        "Referer": "https://beian.miit.gov.cn/",
                        "Cookie": f"__jsluid_s={cookie}",
                    }
                    if cookie != -1:
                        token = retrieve_token()
                        if token == -1:
                            raise CustomException("获取Token失败")
                        logging.info("[+] Retrieving Token, please wait……")
                        if token != -1:
                            logging.info("[+] Token retrieved, querying, please wait……")
                            check_data = retrieve_check_pic(token)
                            if check_data != -1:
                                sign = retrieve_sign(check_data, token)
                                p_uuid = check_data["key"]
                                if sign != -1:
                                    # Fresh credentials acquired — retry the query.
                                    base_header.update(
                                        {
                                            "Content-Length": "78",
                                            "uuid": p_uuid,
                                            "token": token,
                                            "sign": sign,
                                        }
                                    )
                                    continue
                logging.error(
                    f'[-] 请求错误: CODE {beian_info["code"]} MSG {beian_info["msg"]}'
                )
                return domain_list
            # On success, process the data and leave the retry loop.
            # ... (data-processing code omitted)
            break
        except Exception as e:
            logging.error(f"[-] 意外错误: {e}")
            return domain_list
    return domain_list

def save_data(domain_list):
    """
    Print the final result and save it to an Excel workbook under
    ./outs/, applying column widths, frozen panes and cell alignment.
    Returns "OK" on success, -1 when the workbook is locked, or None
    when there is nothing to save.
    """
    # Number of rows to write; an empty list means the domain has no
    # record — or the upstream lookup failed.
    total_row = len(domain_list)
    # NOTE(review): the 1 -> 0 adjustment together with
    # range(start, total_row + 1) below relies on the bare-except in the
    # write loop to skip out-of-range indices — fragile, kept as-is.
    if total_row == 1:
        total_row = 0
    elif total_row == 0:
        logging.info("[!] 所查域名无备案")
        return
    logging.info(f"[+] 查询结果如下:\n\n{domain_list}")
    # The workbook is saved under the ./outs/ folder.
    file_path = os.path.join(output_directory, "备案信息.xlsx")
    # Append to an existing workbook, or create one and set up the title
    # row, column widths, frozen panes and text layout.
    if os.path.exists(file_path):
        wb = xl.load_workbook(file_path)
        ws = wb["备案信息"]
        max_row = ws.max_row
        start = max_row + 1
        total_row = total_row + start
        after_title = 0
    else:
        wb = xl.Workbook()
        ws = wb.active
        ws.title = "备案信息"
        title_list = [
            "域名主办方",
            "域名",
            "备案许可证号",
            "网站备案号",
            "域名类型",
            "网站前置审批项",
            "是否限制接入",
            "审核通过日期",
        ]
        for i in range(0, 8):
            ws.cell(1, i + 1).value = title_list[i]
        col_width = {
            "A": 45,
            "B": 40,
            "C": 22,
            "D": 24,
            "E": 9,
            "F": 15,
            "G": 13,
            "H": 21,
        }
        for k, v in col_width.items():
            ws.column_dimensions[k].width = v
        ws.freeze_panes = "A2"
        start = 0
        after_title = 2
    # Write the query data row by row.
    for j in range(start, total_row + 1):
        for k in range(0, 8):
            try:
                ws.cell(j + after_title, k + 1).value = domain_list[j - start][k]
            except:
                continue
    # Center every cell horizontally and vertically.
    for row in range(ws.max_row):
        for col in range(ws.max_column):
            ws.cell(row + 1, col + 1).alignment = Alignment(
                horizontal="center", vertical="center"
            )
    try:
        wb.save(file_path)
    except PermissionError:
        logging.error("[!] 备案信息登记表格已打开,无法写入文件。如需写入,请关闭文件后重新执行!")
        return -1
    logging.info(f"[+] 查询结果保存在:{file_path}")
    return "OK"

import time

def _refresh_credentials():
    """(Re)build base_header with a fresh cookie and complete the captcha
    handshake.

    Returns:
        (p_uuid, token, sign) on success.

    Raises:
        CustomException: When any step of the handshake fails.
    """
    global base_header
    logging.info("[+] 正在获取Cookie,请稍等……")
    cookie = retrieve_cookies()
    base_header = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.41 Safari/537.36 Edg/101.0.1210.32",
        "Origin": "https://beian.miit.gov.cn",
        "Referer": "https://beian.miit.gov.cn/",
        "Cookie": f"__jsluid_s={cookie}",
    }
    logging.info("[+] Retrieving Token, please wait……")
    token = retrieve_token()
    if token == -1:
        raise CustomException("获取Token失败")
    logging.info("[+] Token retrieved, querying, please wait……")
    check_data = retrieve_check_pic(token)
    if check_data == -1:
        raise CustomException("获取验证图片失败")
    sign = retrieve_sign(check_data, token)
    if sign == -1:
        raise CustomException("获取Sign失败")
    return check_data["key"], token, sign


def main(input):
    """Query ICP records for a single domain or a file of domains.

    Fixes the original control flow, which never terminated: after a
    single-domain query the `while True` loop spun forever, and a batch
    file was re-queried endlessly. Credentials are now refreshed every
    20 queries (the original counter check could never fire again
    because the whole file was consumed inside one loop iteration), and
    the function returns once all input has been processed.

    Args:
        input: A domain string, or a path to a file with one domain per line.
    """
    try:
        # Build the work list up front: file of domains, or a single one.
        if os.path.isfile(input):
            with open(input) as f:
                domains = [line.strip() for line in f if line.strip()]
        else:
            domains = [input]
        p_uuid = token = sign = None
        with tqdm(total=len(domains), ncols=70, position=0, leave=True) as pbar:
            for query_count, domain in enumerate(domains):
                # The server invalidates sessions quickly; redo the
                # cookie/token/captcha handshake every 20 queries.
                if query_count % 20 == 0:
                    p_uuid, token, sign = _refresh_credentials()
                logging.info(f"\n[+] 正在查询 {domain} ……")
                info = query_base(domain)
                domain_list = retrieve_beian_info(info, p_uuid, token, sign)
                if domain_list:
                    logging.info(f"\n{domain} 备案信息为:\n{domain_list}")
                else:
                    logging.info(f"\n{domain} 不支持备案")
                save_data(domain_list)
                pbar.update()
                # Throttle between queries to reduce the chance of a 429
                # ban; no need to sleep after the last one.
                if query_count + 1 < len(domains):
                    time.sleep(8)
    except CustomException as e:
        logging.error(f"[-] {e}\n")

if __name__ == "__main__":
    # CLI entry point: args.input is the positional domain/file argument
    # parsed at import time above.
    main(args.input)

检测到401时会重新生成cookie和token;但如果触发429屏蔽,即使把查询间隔设置为8秒或10秒,仍会被长时间封禁

还改了一个加代理池的版本,但是用代理IP会有SSL的问题

wongzeon commented 10 months ago

感谢,这个写得不错👍