From 7a7225ba4568e99c1ceaa27daf6f6cd214c3977f Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Thu, 5 Sep 2024 00:46:36 +0800 Subject: [PATCH] fix(rate-limiter): optimize log --- app/utils/common.py | 47 ++++++++++++++++++++++++++------------------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/app/utils/common.py b/app/utils/common.py index f115109c..6219dc8e 100644 --- a/app/utils/common.py +++ b/app/utils/common.py @@ -1,10 +1,10 @@ import functools import threading import time -from typing import Any, Callable, Optional +from typing import Any, Callable, Optional, Tuple from app.log import logger -from app.schemas import ImmediateException, APIRateLimitException, RateLimitExceededException +from app.schemas import ImmediateException, RateLimitExceededException, LimitException def retry(ExceptionToCheck: Any, @@ -63,7 +63,7 @@ class RateLimiter: self.source = source self.lock = threading.Lock() - def can_call(self) -> bool: + def can_call(self) -> Tuple[bool, str]: """ 检查是否可以进行下一次调用 :return: 如果当前时间超过下一次允许调用的时间,返回 True;否则返回 False @@ -71,10 +71,11 @@ class RateLimiter: current_time = time.time() with self.lock: if current_time >= self.next_allowed_time: - return True + return True, "" wait_time = self.next_allowed_time - current_time - logger.warn(f"{self.source}限流期间,跳过调用,将在 {wait_time:.2f} 秒后允许继续调用") - return False + message = self.format_log(f"限流期间,跳过调用,将在 {wait_time:.2f} 秒后允许继续调用") + logger.info(message) + return False, message def reset(self): """ @@ -83,7 +84,7 @@ class RateLimiter: """ with self.lock: if self.next_allowed_time != 0 or self.current_wait > self.base_wait: - logger.info(f"{self.source}调用成功,重置限流等待时间为{self.base_wait}秒") + logger.info(self.format_log(f"调用成功,重置限流等待时间为{self.base_wait}秒")) self.next_allowed_time = 0 self.current_wait = self.base_wait @@ -95,45 +96,51 @@ class RateLimiter: current_time = time.time() with self.lock: self.next_allowed_time = current_time + self.current_wait - logger.warn(f"{self.source}触发限流,将在 {self.current_wait} 秒后允许继续调用") + logger.warn(self.format_log(f"触发限流,将在 {self.current_wait} 秒后允许继续调用")) self.current_wait = min(self.current_wait * self.backoff_factor, self.max_wait) + def format_log(self, message: str) -> str: + """ + 格式化日志消息 + :param message: 日志内容 + :return: 格式化后的日志消息 + """ + return f"[{self.source}] {message}" if self.source else message -def rate_limit_handler(base_wait: int = 60, max_wait: int = 600, backoff_factor: int = 1, + +def rate_limit_handler(base_wait: int = 60, max_wait: int = 600, backoff_factor: float = 2.0, raise_on_limit: bool = True, source: str = "") -> Callable: """ - 装饰器,用于处理限流逻辑 + 装饰器,用于处理限流逻辑,支持动态控制是否在限流时抛出异常 :param base_wait: 基础等待时间(秒),默认值为 60 秒(1 分钟) :param max_wait: 最大等待时间(秒),默认值为 600 秒(10 分钟) - :param backoff_factor: 等待时间的递增倍数,默认值为 1 + :param backoff_factor: 等待时间的递增倍数,默认值为 2.0 :param raise_on_limit: 控制默认情况下是否在限流时抛出异常,默认为 True(限流时抛出异常)。 如果在函数调用时传入 `raise_exception` 参数,则以传入值为准。 :param source: 业务来源或上下文信息,默认为 "" :return: 装饰器函数 """ - rate_limiter = RateLimiter(base_wait, max_wait, backoff_factor) + rate_limiter = RateLimiter(base_wait, max_wait, backoff_factor, source) def decorator(func: Callable) -> Callable: @functools.wraps(func) def wrapper(*args, **kwargs) -> Optional[Any]: # 动态检查是否传入了 raise_exception,否则使用默认的 raise_on_limit raise_exception = kwargs.get("raise_exception", raise_on_limit) - message = f"{source}调用因限流被跳过,当前等待时间为 {rate_limiter.current_wait} 秒" - if not rate_limiter.can_call(): - if raise_on_limit: + can_call, message = rate_limiter.can_call() + if not can_call: + if raise_exception: raise RateLimitExceededException(message) - logger.warn(message) return None try: result = func(*args, **kwargs) rate_limiter.reset() return result - except APIRateLimitException as e: + except LimitException as e: rate_limiter.trigger_limit() - message = f"{source}外部调用触发限流:{str(e)},本地等待时间增加到 {rate_limiter.current_wait} 秒" - logger.error(message) - if raise_on_limit: + logger.error(rate_limiter.format_log(f"触发限流:{str(e)}")) + if raise_exception: raise e return None