From 73fca81641b1db6315fda89ff087485e47aece4f Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Wed, 4 Sep 2024 20:00:53 +0800 Subject: [PATCH] feat(rate-limiter): add rate limiter --- app/schemas/exception.py | 19 +++++++- app/utils/common.py | 98 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 114 insertions(+), 3 deletions(-) diff --git a/app/schemas/exception.py b/app/schemas/exception.py index 63ef784e..655104b7 100644 --- a/app/schemas/exception.py +++ b/app/schemas/exception.py @@ -6,9 +6,26 @@ class ImmediateException(Exception): pass -class APIRateLimitException(ImmediateException): +class LimitException(ImmediateException): + """ + 用于表示本地限流器或外部触发的限流异常的基类。 + 该异常类可用于本地限流逻辑或外部限流处理。 + """ + pass + + +class APIRateLimitException(LimitException): """ 用于表示API速率限制的异常类。 当API调用触发速率限制时,可以抛出此异常以立即终止操作并报告错误。 """ pass + + +class RateLimitExceededException(LimitException): + """ + 用于表示本地限流器触发的异常类。 + 当函数调用频率超过限流器的限制时,可以抛出此异常以停止当前操作并告知调用者限流情况。 + 这个异常通常用于本地限流逻辑(例如 RateLimiter),当系统检测到函数调用频率过高时,触发限流并抛出该异常。 + """ + pass diff --git a/app/utils/common.py b/app/utils/common.py index 4ba1aa7e..5fb82e5c 100644 --- a/app/utils/common.py +++ b/app/utils/common.py @@ -1,7 +1,10 @@ +import functools +import threading import time -from typing import Any +from typing import Any, Callable, Optional -from app.schemas import ImmediateException +from app.log import logger +from app.schemas import ImmediateException, APIRateLimitException, RateLimitExceededException def retry(ExceptionToCheck: Any, @@ -36,3 +39,94 @@ def retry(ExceptionToCheck: Any, return f_retry return deco_retry + + +class RateLimiter: + """ + 限流器类,用于处理调用的限流逻辑 + 通过增加等待时间逐步减少调用的频率,以避免触发限流 + """ + + def __init__(self, base_wait: int = 60, max_wait: int = 600, backoff_factor: int = 1): + """ + 初始化 RateLimiter 实例 + :param base_wait: 基础等待时间(秒),默认值为 60 秒(1 分钟) + :param max_wait: 最大等待时间(秒),默认值为 600 秒(10 分钟) + :param backoff_factor: 等待时间的递增倍数,默认值为 1 + """ + self.next_allowed_time = 0 + self.current_wait = base_wait + self.base_wait = base_wait + self.max_wait = max_wait + self.backoff_factor = backoff_factor + self.lock = threading.Lock() + + def can_call(self) -> bool: + """ + 检查是否可以进行下一次调用 + :return: 如果当前时间超过下一次允许调用的时间,返回 True;否则返回 False + """ + current_time = time.time() + with self.lock: + if current_time >= self.next_allowed_time: + return True + logger.warn(f"限流期间,跳过调用:将在 {self.next_allowed_time - current_time:.2f} 秒后允许继续调用") + return False + + def reset(self): + """ + 重置等待时间 + 当调用成功时调用此方法,重置当前等待时间为基础等待时间 + """ + with self.lock: + if self.next_allowed_time != 0 or self.current_wait > self.base_wait: + logger.info(f"调用成功,重置限流等待时长,并允许立即调用") + self.next_allowed_time = 0 + self.current_wait = self.base_wait + + def trigger_limit(self): + """ + 触发限流 + 当触发限流异常时调用此方法,增加下一次允许调用的时间并更新当前等待时间 + """ + current_time = time.time() + with self.lock: + self.next_allowed_time = current_time + self.current_wait + logger.warn(f"触发限流:将在 {self.current_wait} 秒后允许继续调用") + self.current_wait = min(self.current_wait * self.backoff_factor, self.max_wait) + + +def rate_limit_handler(base_wait: int = 60, max_wait: int = 600, backoff_factor: int = 1, + raise_on_limit: bool = True) -> Callable: + """ + 装饰器,用于处理限流逻辑 + :param base_wait: 基础等待时间(秒),默认值为 60 秒(1 分钟) + :param max_wait: 最大等待时间(秒),默认值为 600 秒(10 分钟) + :param backoff_factor: 等待时间的递增倍数,默认值为 1 + :param raise_on_limit: 是否在触发限流异常时抛出异常,默认为 True + :return: 装饰器函数 + """ + rate_limiter = RateLimiter(base_wait, max_wait, backoff_factor) + + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + def wrapper(*args, **kwargs) -> Optional[Any]: + if not rate_limiter.can_call(): + if raise_on_limit: + raise RateLimitExceededException("调用因限流被跳过") + return None + + try: + result = func(*args, **kwargs) + rate_limiter.reset() # 调用成功,重置等待时间 + return result + except APIRateLimitException as e: + rate_limiter.trigger_limit() + logger.error(f"触发限流:{str(e)}") + if raise_on_limit: + raise e + return None + + return wrapper + + return decorator