vutran1710 / PyrateLimiter

⚔️Python Rate-Limiter using Leaky-Bucket Algorithm Family
https://pyratelimiter.readthedocs.io
MIT License
334 stars 36 forks source link

RedisBucket no rate limit #149

Closed grayguest closed 7 months ago

grayguest commented 7 months ago
import http
import json
import re
import time
import urllib3
from urllib.parse import urlencode, quote

import redis
import requests
from bs4 import BeautifulSoup
from loguru import logger
from pyrate_limiter import Duration, Limiter, RedisBucket, RequestRate, SQLiteBucket
from requests_ratelimiter import LimiterSession

from config import settings

class EtherscanAPI(object):
    """_summary_
    https://docs.etherscan.io/

    My API Plan: FREE API PLAN
    API calls per second: 5 calls

    Args:
        object (_type_): _description_

    Returns:
        _type_: _description_
    """

    proto = 'https://'
    domain = 'api.etherscan.io'
    second_rate = RequestRate(5, Duration.SECOND)  # 5 requests per second
    redis_pool = redis.ConnectionPool(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        password=settings.REDIS_PASSWORD,
        db=settings.REDIS_DB,
        # max_connections=20,
        # decode_responses=True,
    )
    limiter = Limiter(
        second_rate,
        bucket_class=RedisBucket,
        bucket_kwargs={'redis_pool': redis_pool, 'bucket_name': 'etherscan_api'},
    )

    @classmethod
    @limiter.ratelimit(
        'etherscan_api', delay=True, max_delay=360
    )  # xxx
    def get(cls, path='/api', **params):
        """

        :param path: 
        :param params:
        :return:
        """
        url = cls.proto + cls.domain + path
        cls.session = requests.Session()

        cls.session.headers.update(
            {
                "User-Agent": settings.UA,
                # 'Accept-encoding': 'gzip',
                "Connection": "keep-alive",
            }
        )
        cls.session.proxies = settings.PROXIES
        r = cls.session.get(url, params=params, timeout=30)
        if r.status_code == http.HTTPStatus.OK:
            try:
                rsp = r.json()
            except json.decoder.JSONDecodeError as jde:
                return r
            if rsp["status"] == '1' and rsp["message"] == "OK":
                return rsp
            else:
                logger.error(
                    "request etherscan api error, response data's status code is error: %s, %s, %s, %s"
                    % (path, rsp["result"], rsp["status"], rsp["message"])
                )
                return False
        else:
            logger.error(
                "request etherscan api error, request url: %s, param: %s, response status_code: %s"
                % (url, params, r.status_code)
            )
            return False

    @classmethod
    def get_gas_price(cls):
        payload = {"module": "gastracker", "action": "gasoracle", "apikey": settings.ETHERSCAN_API_KEY}
        return cls.get('/api', **payload)

if __name__ == '__main__':
    start = time.time()

    # Send requests that stay within the defined rate limit
    # for i in range(20):
    #     EtherscanAPI.get_gas_price()
    #     print(f'[t+{time.time()-start:.2f}] Sent request {i+1}')

    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers=20) as executor:
        futures = [executor.submit(EtherscanAPI.get_gas_price) for _ in range(20)]
    print([future.result() for future in futures])

Output

/tests/test_reque
sts-ratelimiter.py 
2024-02-04 18:23:27.242 | ERROR    | __main__:get:80 - request etherscan api error, response data's status code is error: /api, Max rate limit reached, 0, NOTOK
2024-02-04 18:23:27.257 | ERROR    | __main__:get:80 - request etherscan api error, response data's status code is error: /api, Max rate limit reached, 0, NOTOK
2024-02-04 18:23:27.272 | ERROR    | __main__:get:80 - request etherscan api error, response data's status code is error: /api, Max rate limit reached, 0, NOTOK
2024-02-04 18:23:27.298 | ERROR    | __main__:get:80 - request etherscan api error, response data's status code is error: /api, Max rate limit reached, 0, NOTOK
[{'status': '1', 'message': 'OK', 'result': {'LastBlock': '19154330', 'SafeGasPrice': '15', 'ProposeGasPrice': '15', 'FastGasPrice': '16', 'suggestBaseFee': '14.161850777', 'gasUsedRatio': '0.3678232,0.387201766666667,0.886431266666667,0.462735133333333,0.2426072'}},
vutran1710 commented 7 months ago

Looks like you are using pyrate-limiter v2. Can you try switching to v3? It would help me to debug the problem faster.

vutran1710 commented 7 months ago

As in v3, Limiter is no longer constructed using the arguments like this anymore

limiter = Limiter(
        second_rate,
        bucket_class=RedisBucket,
        bucket_kwargs={'redis_pool': redis_pool, 'bucket_name': 'etherscan_api'},
    )
grayguest commented 7 months ago

I have changed to v3.2.0, and The current performance is that except for the error encountered during the first run, there are no further problems later, and we are observing, thank you for your effort, sir. In addition, I would like to check whether the usage in my code is correct.

cls.limiter.try_acquire('etherscan_api')

the try_acquire function's argument string is random assign?

import http
import json
import re
import time
import urllib3
from urllib.parse import urlencode, quote

import redis
import requests
from bs4 import BeautifulSoup
from loguru import logger
from pyrate_limiter import BucketFullException, Duration, Limiter, Rate, RedisBucket, SQLiteBucket

# from requests_ratelimiter import LimiterSession

from config import settings

class EtherscanAPI(object):
    """_summary_
    https://docs.etherscan.io/

    My API Plan: FREE API PLAN
    API calls per second: 5 calls

    Args:
        object (_type_): _description_

    Returns:
        _type_: _description_
    """

    proto = 'https://'
    domain = 'api.etherscan.io'
    second_rate = [Rate(5, Duration.SECOND)]  # 5 requests per second
    redis_pool = redis.ConnectionPool(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        password=settings.REDIS_PASSWORD,
        db=settings.REDIS_DB,
        # max_connections=20,
        # decode_responses=True,
    )
    redis_db = redis.Redis(connection_pool=redis_pool)
    bucket_key = "etherscan_api"
    bucket = RedisBucket.init(second_rate, redis_db, bucket_key)
    # limiter = Limiter(
    #     second_rate,
    #     bucket_class=RedisBucket,
    #     bucket_kwargs={'redis_pool': redis_pool, 'bucket_name': 'clashshunter_etherscan_api'},
    # )
    limiter = Limiter(bucket, raise_when_fail=False, max_delay=10000)

    @classmethod
    def get(cls, path='/api', **params):
        """

        :param path: 
        :param params:
        :return:
        """
        # try:
        #     cls.limiter.try_acquire('etherscan_api')
        # except BucketFullException as err:
        #     print(err)
        #     print(err.meta_info)
        cls.limiter.try_acquire('etherscan_api')
        url = cls.proto + cls.domain + path
        cls.session = requests.Session()

        cls.session.headers.update(
            {
                "User-Agent": settings.UA,
                # 'Accept-encoding': 'gzip',
                "Connection": "keep-alive",
            }
        )
        cls.session.proxies = settings.PROXIES
        r = cls.session.get(url, params=params, timeout=30)
        if r.status_code == http.HTTPStatus.OK:
            try:
                rsp = r.json()
            except json.decoder.JSONDecodeError as jde:
                return r
            if rsp["status"] == '1' and rsp["message"] == "OK":
                return rsp
            else:
                logger.error(
                    "request etherscan api error, response data's status code is error: %s, %s, %s, %s"
                    % (path, rsp["result"], rsp["status"], rsp["message"])
                )
                return False
        else:
            logger.error(
                "request etherscan api error, request url: %s, param: %s, response status_code: %s"
                % (url, params, r.status_code)
            )
            return False

    @classmethod
    def get_gas_price(cls):
        payload = {"module": "gastracker", "action": "gasoracle", "apikey": settings.ETHERSCAN_API_KEY}
        return cls.get('/api', **payload)

if __name__ == '__main__':
    start = time.time()

    # Send requests that stay within the defined rate limit
    # for i in range(20):
    #     EtherscanAPI.get_gas_price()
    #     print(f'[t+{time.time()-start:.2f}] Sent request {i+1}')

    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers=20) as executor:
        futures = [executor.submit(EtherscanAPI.get_gas_price) for _ in range(20)]
    print([future.result() for future in futures])
vutran1710 commented 7 months ago

Glad its working now! The code is correct.