From a1829fe59071da78e84f31e0c0ac62123097bdc9 Mon Sep 17 00:00:00 2001 From: DDSRem <1448139087@qq.com> Date: Wed, 4 Feb 2026 23:24:14 +0800 Subject: [PATCH] feat: u115 global rate limiting strategy --- app/modules/filemanager/storages/u115.py | 110 +++++++++++++------- app/utils/limit.py | 123 ++++++++++++++++++++--- 2 files changed, 181 insertions(+), 52 deletions(-) diff --git a/app/modules/filemanager/storages/u115.py b/app/modules/filemanager/storages/u115.py index 47a8799e..cff7b2a8 100644 --- a/app/modules/filemanager/storages/u115.py +++ b/app/modules/filemanager/storages/u115.py @@ -3,7 +3,7 @@ import secrets import time from pathlib import Path from threading import Lock -from typing import List, Optional, Tuple, Union, Dict +from typing import List, Optional, Tuple, Union from hashlib import sha256 import oss2 @@ -20,7 +20,7 @@ from app.modules.filemanager.storages import transfer_process from app.schemas.types import StorageSchema from app.utils.singleton import WeakSingleton from app.utils.string import StringUtils -from app.utils.limit import QpsRateLimiter +from app.utils.limit import QpsRateLimiter, RateStats lock = Lock() @@ -46,22 +46,23 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): # 文件块大小,默认10MB chunk_size = 10 * 1024 * 1024 - # 流控重试间隔时间 - retry_delay = 70 + # 下载接口单独限流 + download_endpoint = "/open/ufile/downurl" + # 风控触发后休眠时间(秒) + limit_sleep_seconds = 3600 def __init__(self): super().__init__() self._auth_state = {} self.session = httpx.Client(follow_redirects=True, timeout=20.0) self._init_session() - self.qps_limiter: Dict[str, QpsRateLimiter] = { - "/open/ufile/files": QpsRateLimiter(4), - "/open/folder/get_info": QpsRateLimiter(3), - "/open/ufile/move": QpsRateLimiter(2), - "/open/ufile/copy": QpsRateLimiter(2), - "/open/ufile/update": QpsRateLimiter(2), - "/open/ufile/delete": QpsRateLimiter(2), - } + # 接口限流 + self._download_limiter = QpsRateLimiter(1) + self._api_limiter = QpsRateLimiter(3) + self._limit_until = 0.0 + self._limit_lock = Lock() + # 总体 QPS/QPM/QPH 统计 + self._rate_stats = RateStats(source="115") def _init_session(self): """ @@ -209,8 +210,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): try: resp = self.session.get( - f"{settings.U115_AUTH_SERVER}/u115/token", - params={"state": state} + f"{settings.U115_AUTH_SERVER}/u115/token", params={"state": state} ) if resp is None: return {}, "无法连接到授权服务器" @@ -221,12 +221,14 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): if status == "completed": data = result.get("data", {}) if data: - self.set_config({ - "refresh_time": int(time.time()), - "access_token": data.get("access_token"), - "refresh_token": data.get("refresh_token"), - "expires_in": data.get("expires_in"), - }) + self.set_config( + { + "refresh_time": int(time.time()), + "access_token": data.get("access_token"), + "refresh_token": data.get("refresh_token"), + "expires_in": data.get("expires_in"), + } + ) self._auth_state = {} return {"status": 2, "tip": "授权成功"}, "" return {}, "授权服务器返回数据不完整" @@ -292,11 +294,24 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): # 错误日志标志 no_error_log = kwargs.pop("no_error_log", False) # 重试次数 - retry_times = kwargs.pop("retry_limit", 5) + retry_times = kwargs.pop("retry_limit", 3) - # qps 速率限制 - if endpoint in self.qps_limiter: - self.qps_limiter[endpoint].acquire() + # 按接口类型限流 + if endpoint == self.download_endpoint: + self._download_limiter.acquire() + else: + self._api_limiter.acquire() + self._rate_stats.record() + + # 风控冷却期间阻止所有接口调用,统一等待 + with self._limit_lock: + wait_until = self._limit_until + if wait_until > time.time(): + wait_secs = wait_until - time.time() + logger.info( + f"【115】风控冷却中,本请求等待 {wait_secs:.0f} 秒后再调用接口..." + ) + time.sleep(wait_secs) try: resp = self.session.request(method, f"{self.base_url}{endpoint}", **kwargs) @@ -310,13 +325,24 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): kwargs["retry_limit"] = retry_times - # 处理速率限制 if resp.status_code == 429: - reset_time = 5 + int(resp.headers.get("X-RateLimit-Reset", 60)) - logger.debug( - f"【115】{method} 请求 {endpoint} 限流,等待{reset_time}秒后重试" + self._rate_stats.log_stats("warning") + if retry_times <= 0: + logger.error( + f"【115】{method} 请求 {endpoint} 触发限流(429),重试次数用尽!" + ) + return None + with self._limit_lock: + self._limit_until = max( + self._limit_until, + time.time() + self.limit_sleep_seconds, + ) + logger.warning( + f"【115】触发限流(429),全体接口进入风控冷却 {self.limit_sleep_seconds} 秒,随后重试..." ) - time.sleep(reset_time) + time.sleep(self.limit_sleep_seconds) + kwargs["retry_limit"] = retry_times - 1 + kwargs["no_error_log"] = no_error_log return self._request_api(method, endpoint, result_key, **kwargs) # 处理请求错误 @@ -329,6 +355,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): ) return None kwargs["retry_limit"] = retry_times - 1 + kwargs["no_error_log"] = no_error_log sleep_duration = 2 ** (5 - retry_times + 1) logger.info( f"【115】{method} 请求 {endpoint} 错误 {e},等待 {sleep_duration} 秒后重试..." @@ -339,20 +366,27 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): # 返回数据 ret_data = resp.json() if ret_data.get("code") not in (0, 20004): - error_msg = ret_data.get("message") + error_msg = ret_data.get("message", "") if not no_error_log: logger.warn(f"【115】{method} 请求 {endpoint} 出错:{error_msg}") if "已达到当前访问上限" in error_msg: + self._rate_stats.log_stats("warning") if retry_times <= 0: logger.error( - f"【115】{method} 请求 {endpoint} 达到访问上限,重试次数用尽!" + f"【115】{method} 请求 {endpoint} 触发风控(访问上限),重试次数用尽!" ) return None - kwargs["retry_limit"] = retry_times - 1 - logger.info( - f"【115】{method} 请求 {endpoint} 达到访问上限,等待 {self.retry_delay} 秒后重试..." + with self._limit_lock: + self._limit_until = max( + self._limit_until, + time.time() + self.limit_sleep_seconds, + ) + logger.warning( + f"【115】触发风控(访问上限),全体接口进入风控冷却 {self.limit_sleep_seconds} 秒,随后重试..." ) - time.sleep(self.retry_delay) + time.sleep(self.limit_sleep_seconds) + kwargs["retry_limit"] = retry_times - 1 + kwargs["no_error_log"] = no_error_log return self._request_api(method, endpoint, result_key, **kwargs) return None @@ -879,7 +913,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): def copy(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool: """ - 企业级复制实现(支持目录递归复制) + 复制 """ if fileitem.fileid is None: fileitem = self.get_item(Path(fileitem.path)) @@ -912,7 +946,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): def move(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool: """ - 原子性移动操作实现 + 移动 """ if fileitem.fileid is None: fileitem = self.get_item(Path(fileitem.path)) @@ -950,7 +984,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): def usage(self) -> Optional[schemas.StorageUsage]: """ - 获取带有企业级配额信息的存储使用情况 + 存储使用情况 """ try: resp = self._request_api("GET", "/open/user/info", "data") diff --git a/app/utils/limit.py b/app/utils/limit.py index e9a90acd..a3e48d74 100644 --- a/app/utils/limit.py +++ b/app/utils/limit.py @@ -98,8 +98,14 @@ class ExponentialBackoffRateLimiter(BaseRateLimiter): 每次触发限流时,等待时间会成倍增加,直到达到最大等待时间 """ - def __init__(self, base_wait: float = 60.0, max_wait: float = 600.0, backoff_factor: float = 2.0, - source: str = "", enable_logging: bool = True): + def __init__( + self, + base_wait: float = 60.0, + max_wait: float = 600.0, + backoff_factor: float = 2.0, + source: str = "", + enable_logging: bool = True, + ): """ 初始化 ExponentialBackoffRateLimiter 实例 :param base_wait: 基础等待时间(秒),默认值为 60 秒(1 分钟) @@ -156,7 +162,9 @@ class ExponentialBackoffRateLimiter(BaseRateLimiter): current_time = time.time() with self.lock: self.next_allowed_time = current_time + self.current_wait - self.current_wait = min(self.current_wait * self.backoff_factor, self.max_wait) + self.current_wait = min( + self.current_wait * self.backoff_factor, self.max_wait + ) wait_time = self.next_allowed_time - current_time self.log_warning(f"触发限流,将在 {wait_time:.2f} 秒后允许继续调用") @@ -168,8 +176,13 @@ class WindowRateLimiter(BaseRateLimiter): 如果超过允许的最大调用次数,则限流直到窗口期结束 """ - def __init__(self, max_calls: int, window_seconds: float, - source: str = "", enable_logging: bool = True): + def __init__( + self, + max_calls: int, + window_seconds: float, + source: str = "", + enable_logging: bool = True, + ): """ 初始化 WindowRateLimiter 实例 :param max_calls: 在时间窗口内允许的最大调用次数 @@ -190,7 +203,10 @@ class WindowRateLimiter(BaseRateLimiter): current_time = time.time() with self.lock: # 清理超出时间窗口的调用记录 - while self.call_times and current_time - self.call_times[0] > self.window_seconds: + while ( + self.call_times + and current_time - self.call_times[0] > self.window_seconds + ): self.call_times.popleft() if len(self.call_times) < self.max_calls: @@ -225,8 +241,12 @@ class CompositeRateLimiter(BaseRateLimiter): 当任意一个限流策略触发限流时,都会阻止调用 """ - def __init__(self, limiters: List[BaseRateLimiter], source: str = "", enable_logging: bool = True): - + def __init__( + self, + limiters: List[BaseRateLimiter], + source: str = "", + enable_logging: bool = True, + ): """ 初始化 CompositeRateLimiter 实例 :param limiters: 要组合的限流器列表 @@ -263,7 +283,9 @@ class CompositeRateLimiter(BaseRateLimiter): # 通用装饰器:自定义限流器实例 -def rate_limit_handler(limiter: BaseRateLimiter, raise_on_limit: bool = False) -> Callable: +def rate_limit_handler( + limiter: BaseRateLimiter, raise_on_limit: bool = False +) -> Callable: """ 通用装饰器,允许用户传递自定义的限流器实例,用于处理限流逻辑 该装饰器可灵活支持任意继承自 BaseRateLimiter 的限流器 @@ -344,8 +366,14 @@ def rate_limit_handler(limiter: BaseRateLimiter, raise_on_limit: bool = False) - # 装饰器:指数退避限流 -def rate_limit_exponential(base_wait: float = 60.0, max_wait: float = 600.0, backoff_factor: float = 2.0, - raise_on_limit: bool = False, source: str = "", enable_logging: bool = True) -> Callable: +def rate_limit_exponential( + base_wait: float = 60.0, + max_wait: float = 600.0, + backoff_factor: float = 2.0, + raise_on_limit: bool = False, + source: str = "", + enable_logging: bool = True, +) -> Callable: """ 装饰器,用于应用指数退避限流策略 通过逐渐增加调用等待时间控制调用频率。每次触发限流时,等待时间会成倍增加,直到达到最大等待时间 @@ -359,14 +387,21 @@ def rate_limit_exponential(base_wait: float = 60.0, max_wait: float = 600.0, bac :return: 装饰器函数 """ # 实例化 ExponentialBackoffRateLimiter,并传入相关参数 - limiter = ExponentialBackoffRateLimiter(base_wait, max_wait, backoff_factor, source, enable_logging) + limiter = ExponentialBackoffRateLimiter( + base_wait, max_wait, backoff_factor, source, enable_logging + ) # 使用通用装饰器逻辑包装该限流器 return rate_limit_handler(limiter, raise_on_limit) # 装饰器:时间窗口限流 -def rate_limit_window(max_calls: int, window_seconds: float, - raise_on_limit: bool = False, source: str = "", enable_logging: bool = True) -> Callable: +def rate_limit_window( + max_calls: int, + window_seconds: float, + raise_on_limit: bool = False, + source: str = "", + enable_logging: bool = True, +) -> Callable: """ 装饰器,用于应用时间窗口限流策略 在固定的时间窗口内限制调用次数,当调用次数超过最大值时,触发限流,直到时间窗口结束 @@ -407,3 +442,63 @@ class QpsRateLimiter: self.next_call_time = max(now, self.next_call_time) + self.interval if sleep_duration > 0: time.sleep(sleep_duration) + + +class RateStats: + """ + 请求速率统计:记录时间戳,计算 QPS / QPM / QPH + """ + + def __init__(self, window_seconds: float = 7200, source: str = ""): + """ + :param window_seconds: 统计窗口(秒),默认 2 小时,用于计算 QPH + :param source: 日志来源标识 + """ + self._window = window_seconds + self._source = source + self._lock = threading.Lock() + self._timestamps: deque = deque() + + def record(self) -> None: + """ + 记录一次请求 + """ + t = time.time() + with self._lock: + self._timestamps.append(t) + while self._timestamps and t - self._timestamps[0] > self._window: + self._timestamps.popleft() + + def _count_since(self, seconds: float) -> int: + t = time.time() + with self._lock: + return sum(1 for ts in self._timestamps if t - ts <= seconds) + + def get_qps(self) -> float: + """ + 最近 1 秒内请求数 + """ + return self._count_since(1.0) + + def get_qpm(self) -> float: + """ + 最近 1 分钟内请求数 + """ + return self._count_since(60.0) + + def get_qph(self) -> float: + """ + 最近 1 小时内请求数 + """ + return self._count_since(3600.0) + + def log_stats(self, level: str = "info") -> None: + """ + 输出当前 QPS/QPM/QPH + """ + qps, qpm, qph = self.get_qps(), self.get_qpm(), self.get_qph() + msg = f"QPS={qps} QPM={qpm} QPH={qph}" + if self._source: + msg = f"[{self._source}] {msg}" + log_fn = getattr(logger, level, logger.info) + log_fn(msg)