songquanpeng / message-pusher

搭建专属于你的消息推送服务,支持多种消息推送方式,支持 Markdown,基于 Golang 仅单可执行文件,开箱即用
https://msgpusher.com
MIT License
2.58k stars 385 forks source link

API 请求频率限制失效 #98

Open Ai-Yolo opened 1 year ago

Ai-Yolo commented 1 year ago

例行检查

问题描述

通过下面的python测试脚本进行压测发现未对API接口进行限制。

复现步骤

import requests
import time
import threading
from concurrent.futures import ThreadPoolExecutor

# 设置请求次数和延迟
NUM_REQUESTS = 500
DELAY = 0.01  # 10ms

# URL 和 数据  修改参数
URL = 'https://push.xxxxx.com/push/root'
PARAMS = {
    'title': 'test',
    'description': 'test',
    'channel': 'bark'
}
HEADERS = {'Content-Type': 'application/json'}

# 计数器和锁
success_count = 0
failure_count = 0
counter_lock = threading.Lock()

# 用于记录每次请求的响应时间
response_times = []

# 初始化session
session = requests.Session()

# 发出GET请求的函数
def make_get_request():
    global success_count
    global failure_count
    start_time = time.time()
    try:
        response = session.get(URL, params=PARAMS)
        if response.status_code == 200:
            with counter_lock:
                success_count += 1
        else:
            with counter_lock:
                failure_count += 1
    except Exception as e:
        print(f"GET request failed with error: {e}")
        with counter_lock:
            failure_count += 1
    finally:
        response_times.append(time.time() - start_time)
    time.sleep(DELAY)

# 发出POST请求的函数
def make_post_request():
    global success_count
    global failure_count
    start_time = time.time()
    try:
        response = session.post(URL, json=PARAMS, headers=HEADERS)
        if response.status_code == 200:
            with counter_lock:
                success_count += 1
        else:
            with counter_lock:
                failure_count += 1
    except Exception as e:
        print(f"POST request failed with error: {e}")
        with counter_lock:
            failure_count += 1
    finally:
        response_times.append(time.time() - start_time)
    time.sleep(DELAY)

# 记录测试开始时间
start_time = time.time()

# 使用线程池并发发出请求
with ThreadPoolExecutor(max_workers=20) as executor:
    # GET 请求
    for _ in range(NUM_REQUESTS):
        executor.submit(make_get_request)

    # 重置计数器
    success_count = 0
    failure_count = 0
    response_times = []

    # POST 请求
    for _ in range(NUM_REQUESTS):
        executor.submit(make_post_request)

# 记录测试结束时间
end_time = time.time()

# 输出成功和失败的次数
print(f'Success count: {success_count}')
print(f'Failure count: {failure_count}')

# 输出测试时间和平均响应时间
print(f'Test duration: {end_time - start_time:.3f} seconds')
print(f'Average response time: {sum(response_times) / len(response_times):.3f} seconds')

预期结果

希望可以改成对单用户或全局进行请求数量限制

相关截图

PK%9Q{@O5SHDZQPPFHB{68A

songquanpeng commented 1 year ago

Okay