diff --git a/app/utils/common.py b/app/utils/common.py index 5fb82e5c..837e5147 100644 --- a/app/utils/common.py +++ b/app/utils/common.py @@ -47,18 +47,20 @@ class RateLimiter: 通过增加等待时间逐步减少调用的频率,以避免触发限流 """ - def __init__(self, base_wait: int = 60, max_wait: int = 600, backoff_factor: int = 1): + def __init__(self, base_wait: int = 60, max_wait: int = 600, backoff_factor: int = 1, source: str = ""): """ 初始化 RateLimiter 实例 :param base_wait: 基础等待时间(秒),默认值为 60 秒(1 分钟) :param max_wait: 最大等待时间(秒),默认值为 600 秒(10 分钟) :param backoff_factor: 等待时间的递增倍数,默认值为 1 + :param source: 业务来源或上下文信息,默认值为 "" """ self.next_allowed_time = 0 self.current_wait = base_wait self.base_wait = base_wait self.max_wait = max_wait self.backoff_factor = backoff_factor + self.source = f"[{source}]" if source else "" self.lock = threading.Lock() def can_call(self) -> bool: @@ -70,7 +72,8 @@ class RateLimiter: with self.lock: if current_time >= self.next_allowed_time: return True - logger.warn(f"限流期间,跳过调用:将在 {self.next_allowed_time - current_time:.2f} 秒后允许继续调用") + wait_time = self.next_allowed_time - current_time + logger.warn(f"{self.source}限流期间,跳过调用,将在 {wait_time:.2f} 秒后允许继续调用") return False def reset(self): @@ -80,7 +83,7 @@ class RateLimiter: """ with self.lock: if self.next_allowed_time != 0 or self.current_wait > self.base_wait: - logger.info(f"调用成功,重置限流等待时长,并允许立即调用") + logger.info(f"{self.source}调用成功,重置限流等待时间为{self.base_wait}秒") self.next_allowed_time = 0 self.current_wait = self.base_wait @@ -92,18 +95,19 @@ class RateLimiter: current_time = time.time() with self.lock: self.next_allowed_time = current_time + self.current_wait - logger.warn(f"触发限流:将在 {self.current_wait} 秒后允许继续调用") + logger.warn(f"{self.source}触发限流,将在 {self.current_wait} 秒后允许继续调用") self.current_wait = min(self.current_wait * self.backoff_factor, self.max_wait) def rate_limit_handler(base_wait: int = 60, max_wait: int = 600, backoff_factor: int = 1, - raise_on_limit: bool = True) -> Callable: + raise_on_limit: bool = True, source: str = "") -> Callable: """ 装饰器,用于处理限流逻辑 :param base_wait: 基础等待时间(秒),默认值为 60 秒(1 分钟) :param max_wait: 最大等待时间(秒),默认值为 600 秒(10 分钟) :param backoff_factor: 等待时间的递增倍数,默认值为 1 :param raise_on_limit: 是否在触发限流异常时抛出异常,默认为 True + :param source: 业务来源或上下文信息,默认为 "" :return: 装饰器函数 """ rate_limiter = RateLimiter(base_wait, max_wait, backoff_factor) @@ -112,17 +116,20 @@ def rate_limit_handler(base_wait: int = 60, max_wait: int = 600, backoff_factor: @functools.wraps(func) def wrapper(*args, **kwargs) -> Optional[Any]: if not rate_limiter.can_call(): + message = f"{source}调用因限流被跳过,当前等待时间为 {rate_limiter.current_wait} 秒" if raise_on_limit: - raise RateLimitExceededException("调用因限流被跳过") + raise RateLimitExceededException(message) + logger.warn(message) return None try: result = func(*args, **kwargs) - rate_limiter.reset() # 调用成功,重置等待时间 + rate_limiter.reset() return result except APIRateLimitException as e: rate_limiter.trigger_limit() - logger.error(f"触发限流:{str(e)}") + message = f"{source}外部调用触发限流:{str(e)},本地等待时间增加到 {rate_limiter.current_wait} 秒" + logger.error(message) if raise_on_limit: raise e return None