mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-04-05 11:47:50 +08:00
feat(rate-limiter): add source context for enhanced logging
This commit is contained in:
@@ -47,18 +47,20 @@ class RateLimiter:
|
||||
通过增加等待时间逐步减少调用的频率,以避免触发限流
|
||||
"""
|
||||
|
||||
def __init__(self, base_wait: int = 60, max_wait: int = 600, backoff_factor: int = 1):
|
||||
def __init__(self, base_wait: int = 60, max_wait: int = 600, backoff_factor: int = 1, source: str = ""):
|
||||
"""
|
||||
初始化 RateLimiter 实例
|
||||
:param base_wait: 基础等待时间(秒),默认值为 60 秒(1 分钟)
|
||||
:param max_wait: 最大等待时间(秒),默认值为 600 秒(10 分钟)
|
||||
:param backoff_factor: 等待时间的递增倍数,默认值为 1
|
||||
:param source: 业务来源或上下文信息,默认值为 ""
|
||||
"""
|
||||
self.next_allowed_time = 0
|
||||
self.current_wait = base_wait
|
||||
self.base_wait = base_wait
|
||||
self.max_wait = max_wait
|
||||
self.backoff_factor = backoff_factor
|
||||
self.source = f"[{source}]" if source else ""
|
||||
self.lock = threading.Lock()
|
||||
|
||||
def can_call(self) -> bool:
|
||||
@@ -70,7 +72,8 @@ class RateLimiter:
|
||||
with self.lock:
|
||||
if current_time >= self.next_allowed_time:
|
||||
return True
|
||||
logger.warn(f"限流期间,跳过调用:将在 {self.next_allowed_time - current_time:.2f} 秒后允许继续调用")
|
||||
wait_time = self.next_allowed_time - current_time
|
||||
logger.warn(f"{self.source}限流期间,跳过调用,将在 {wait_time:.2f} 秒后允许继续调用")
|
||||
return False
|
||||
|
||||
def reset(self):
|
||||
@@ -80,7 +83,7 @@ class RateLimiter:
|
||||
"""
|
||||
with self.lock:
|
||||
if self.next_allowed_time != 0 or self.current_wait > self.base_wait:
|
||||
logger.info(f"调用成功,重置限流等待时长,并允许立即调用")
|
||||
logger.info(f"{self.source}调用成功,重置限流等待时间为{self.base_wait}秒")
|
||||
self.next_allowed_time = 0
|
||||
self.current_wait = self.base_wait
|
||||
|
||||
@@ -92,18 +95,19 @@ class RateLimiter:
|
||||
current_time = time.time()
|
||||
with self.lock:
|
||||
self.next_allowed_time = current_time + self.current_wait
|
||||
logger.warn(f"触发限流:将在 {self.current_wait} 秒后允许继续调用")
|
||||
logger.warn(f"{self.source}触发限流,将在 {self.current_wait} 秒后允许继续调用")
|
||||
self.current_wait = min(self.current_wait * self.backoff_factor, self.max_wait)
|
||||
|
||||
|
||||
def rate_limit_handler(base_wait: int = 60, max_wait: int = 600, backoff_factor: int = 1,
|
||||
raise_on_limit: bool = True) -> Callable:
|
||||
raise_on_limit: bool = True, source: str = "") -> Callable:
|
||||
"""
|
||||
装饰器,用于处理限流逻辑
|
||||
:param base_wait: 基础等待时间(秒),默认值为 60 秒(1 分钟)
|
||||
:param max_wait: 最大等待时间(秒),默认值为 600 秒(10 分钟)
|
||||
:param backoff_factor: 等待时间的递增倍数,默认值为 1
|
||||
:param raise_on_limit: 是否在触发限流异常时抛出异常,默认为 True
|
||||
:param source: 业务来源或上下文信息,默认为 ""
|
||||
:return: 装饰器函数
|
||||
"""
|
||||
rate_limiter = RateLimiter(base_wait, max_wait, backoff_factor)
|
||||
@@ -112,17 +116,20 @@ def rate_limit_handler(base_wait: int = 60, max_wait: int = 600, backoff_factor:
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs) -> Optional[Any]:
|
||||
if not rate_limiter.can_call():
|
||||
message = f"{source}调用因限流被跳过,当前等待时间为 {rate_limiter.current_wait} 秒"
|
||||
if raise_on_limit:
|
||||
raise RateLimitExceededException("调用因限流被跳过")
|
||||
raise RateLimitExceededException(message)
|
||||
logger.warn(message)
|
||||
return None
|
||||
|
||||
try:
|
||||
result = func(*args, **kwargs)
|
||||
rate_limiter.reset() # 调用成功,重置等待时间
|
||||
rate_limiter.reset()
|
||||
return result
|
||||
except APIRateLimitException as e:
|
||||
rate_limiter.trigger_limit()
|
||||
logger.error(f"触发限流:{str(e)}")
|
||||
message = f"{source}外部调用触发限流:{str(e)},本地等待时间增加到 {rate_limiter.current_wait} 秒"
|
||||
logger.error(message)
|
||||
if raise_on_limit:
|
||||
raise e
|
||||
return None
|
||||
|
||||
Reference in New Issue
Block a user