feat: u115 global rate limiting strategy

This commit is contained in:
DDSRem
2026-02-04 23:24:14 +08:00
parent 2b2b39365c
commit a1829fe590
2 changed files with 181 additions and 52 deletions

View File

@@ -3,7 +3,7 @@ import secrets
import time
from pathlib import Path
from threading import Lock
from typing import List, Optional, Tuple, Union, Dict
from typing import List, Optional, Tuple, Union
from hashlib import sha256
import oss2
@@ -20,7 +20,7 @@ from app.modules.filemanager.storages import transfer_process
from app.schemas.types import StorageSchema
from app.utils.singleton import WeakSingleton
from app.utils.string import StringUtils
from app.utils.limit import QpsRateLimiter
from app.utils.limit import QpsRateLimiter, RateStats
lock = Lock()
@@ -46,22 +46,23 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
# 文件块大小默认10MB
chunk_size = 10 * 1024 * 1024
# 流控重试间隔时间
retry_delay = 70
# 下载接口单独限流
download_endpoint = "/open/ufile/downurl"
# 风控触发后休眠时间(秒)
limit_sleep_seconds = 3600
def __init__(self):
super().__init__()
self._auth_state = {}
self.session = httpx.Client(follow_redirects=True, timeout=20.0)
self._init_session()
self.qps_limiter: Dict[str, QpsRateLimiter] = {
"/open/ufile/files": QpsRateLimiter(4),
"/open/folder/get_info": QpsRateLimiter(3),
"/open/ufile/move": QpsRateLimiter(2),
"/open/ufile/copy": QpsRateLimiter(2),
"/open/ufile/update": QpsRateLimiter(2),
"/open/ufile/delete": QpsRateLimiter(2),
}
# 接口限流
self._download_limiter = QpsRateLimiter(1)
self._api_limiter = QpsRateLimiter(3)
self._limit_until = 0.0
self._limit_lock = Lock()
# 总体 QPS/QPM/QPH 统计
self._rate_stats = RateStats(source="115")
def _init_session(self):
"""
@@ -209,8 +210,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
try:
resp = self.session.get(
f"{settings.U115_AUTH_SERVER}/u115/token",
params={"state": state}
f"{settings.U115_AUTH_SERVER}/u115/token", params={"state": state}
)
if resp is None:
return {}, "无法连接到授权服务器"
@@ -221,12 +221,14 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
if status == "completed":
data = result.get("data", {})
if data:
self.set_config({
"refresh_time": int(time.time()),
"access_token": data.get("access_token"),
"refresh_token": data.get("refresh_token"),
"expires_in": data.get("expires_in"),
})
self.set_config(
{
"refresh_time": int(time.time()),
"access_token": data.get("access_token"),
"refresh_token": data.get("refresh_token"),
"expires_in": data.get("expires_in"),
}
)
self._auth_state = {}
return {"status": 2, "tip": "授权成功"}, ""
return {}, "授权服务器返回数据不完整"
@@ -292,11 +294,24 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
# 错误日志标志
no_error_log = kwargs.pop("no_error_log", False)
# 重试次数
retry_times = kwargs.pop("retry_limit", 5)
retry_times = kwargs.pop("retry_limit", 3)
# qps 速率限制
if endpoint in self.qps_limiter:
self.qps_limiter[endpoint].acquire()
# 按接口类型限流
if endpoint == self.download_endpoint:
self._download_limiter.acquire()
else:
self._api_limiter.acquire()
self._rate_stats.record()
# 风控冷却期间阻止所有接口调用,统一等待
with self._limit_lock:
wait_until = self._limit_until
if wait_until > time.time():
wait_secs = wait_until - time.time()
logger.info(
f"【115】风控冷却中本请求等待 {wait_secs:.0f} 秒后再调用接口..."
)
time.sleep(wait_secs)
try:
resp = self.session.request(method, f"{self.base_url}{endpoint}", **kwargs)
@@ -310,13 +325,24 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
kwargs["retry_limit"] = retry_times
# 处理速率限制
if resp.status_code == 429:
reset_time = 5 + int(resp.headers.get("X-RateLimit-Reset", 60))
logger.debug(
f"【115】{method} 请求 {endpoint} 限流,等待{reset_time}秒后重试"
self._rate_stats.log_stats("warning")
if retry_times <= 0:
logger.error(
f"【115】{method} 请求 {endpoint} 触发限流(429),重试次数用尽!"
)
return None
with self._limit_lock:
self._limit_until = max(
self._limit_until,
time.time() + self.limit_sleep_seconds,
)
logger.warning(
f"【115】触发限流(429),全体接口进入风控冷却 {self.limit_sleep_seconds} 秒,随后重试..."
)
time.sleep(reset_time)
time.sleep(self.limit_sleep_seconds)
kwargs["retry_limit"] = retry_times - 1
kwargs["no_error_log"] = no_error_log
return self._request_api(method, endpoint, result_key, **kwargs)
# 处理请求错误
@@ -329,6 +355,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
)
return None
kwargs["retry_limit"] = retry_times - 1
kwargs["no_error_log"] = no_error_log
sleep_duration = 2 ** (5 - retry_times + 1)
logger.info(
f"【115】{method} 请求 {endpoint} 错误 {e},等待 {sleep_duration} 秒后重试..."
@@ -339,20 +366,27 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
# 返回数据
ret_data = resp.json()
if ret_data.get("code") not in (0, 20004):
error_msg = ret_data.get("message")
error_msg = ret_data.get("message", "")
if not no_error_log:
logger.warn(f"【115】{method} 请求 {endpoint} 出错:{error_msg}")
if "已达到当前访问上限" in error_msg:
self._rate_stats.log_stats("warning")
if retry_times <= 0:
logger.error(
f"【115】{method} 请求 {endpoint} 达到访问上限,重试次数用尽!"
f"【115】{method} 请求 {endpoint} 触发风控(访问上限),重试次数用尽!"
)
return None
kwargs["retry_limit"] = retry_times - 1
logger.info(
f"【115】{method} 请求 {endpoint} 达到访问上限,等待 {self.retry_delay} 秒后重试..."
with self._limit_lock:
self._limit_until = max(
self._limit_until,
time.time() + self.limit_sleep_seconds,
)
logger.warning(
f"【115】触发风控(访问上限),全体接口进入风控冷却 {self.limit_sleep_seconds} 秒,随后重试..."
)
time.sleep(self.retry_delay)
time.sleep(self.limit_sleep_seconds)
kwargs["retry_limit"] = retry_times - 1
kwargs["no_error_log"] = no_error_log
return self._request_api(method, endpoint, result_key, **kwargs)
return None
@@ -879,7 +913,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
def copy(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:
"""
企业级复制实现(支持目录递归复制)
复制
"""
if fileitem.fileid is None:
fileitem = self.get_item(Path(fileitem.path))
@@ -912,7 +946,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
def move(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:
"""
原子性移动操作实现
移动
"""
if fileitem.fileid is None:
fileitem = self.get_item(Path(fileitem.path))
@@ -950,7 +984,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
def usage(self) -> Optional[schemas.StorageUsage]:
"""
获取带有企业级配额信息的存储使用情况
存储使用情况
"""
try:
resp = self._request_api("GET", "/open/user/info", "data")

View File

@@ -98,8 +98,14 @@ class ExponentialBackoffRateLimiter(BaseRateLimiter):
每次触发限流时,等待时间会成倍增加,直到达到最大等待时间
"""
def __init__(self, base_wait: float = 60.0, max_wait: float = 600.0, backoff_factor: float = 2.0,
source: str = "", enable_logging: bool = True):
def __init__(
self,
base_wait: float = 60.0,
max_wait: float = 600.0,
backoff_factor: float = 2.0,
source: str = "",
enable_logging: bool = True,
):
"""
初始化 ExponentialBackoffRateLimiter 实例
:param base_wait: 基础等待时间(秒),默认值为 60 秒1 分钟)
@@ -156,7 +162,9 @@ class ExponentialBackoffRateLimiter(BaseRateLimiter):
current_time = time.time()
with self.lock:
self.next_allowed_time = current_time + self.current_wait
self.current_wait = min(self.current_wait * self.backoff_factor, self.max_wait)
self.current_wait = min(
self.current_wait * self.backoff_factor, self.max_wait
)
wait_time = self.next_allowed_time - current_time
self.log_warning(f"触发限流,将在 {wait_time:.2f} 秒后允许继续调用")
@@ -168,8 +176,13 @@ class WindowRateLimiter(BaseRateLimiter):
如果超过允许的最大调用次数,则限流直到窗口期结束
"""
def __init__(self, max_calls: int, window_seconds: float,
source: str = "", enable_logging: bool = True):
def __init__(
self,
max_calls: int,
window_seconds: float,
source: str = "",
enable_logging: bool = True,
):
"""
初始化 WindowRateLimiter 实例
:param max_calls: 在时间窗口内允许的最大调用次数
@@ -190,7 +203,10 @@ class WindowRateLimiter(BaseRateLimiter):
current_time = time.time()
with self.lock:
# 清理超出时间窗口的调用记录
while self.call_times and current_time - self.call_times[0] > self.window_seconds:
while (
self.call_times
and current_time - self.call_times[0] > self.window_seconds
):
self.call_times.popleft()
if len(self.call_times) < self.max_calls:
@@ -225,8 +241,12 @@ class CompositeRateLimiter(BaseRateLimiter):
当任意一个限流策略触发限流时,都会阻止调用
"""
def __init__(self, limiters: List[BaseRateLimiter], source: str = "", enable_logging: bool = True):
def __init__(
self,
limiters: List[BaseRateLimiter],
source: str = "",
enable_logging: bool = True,
):
"""
初始化 CompositeRateLimiter 实例
:param limiters: 要组合的限流器列表
@@ -263,7 +283,9 @@ class CompositeRateLimiter(BaseRateLimiter):
# 通用装饰器:自定义限流器实例
def rate_limit_handler(limiter: BaseRateLimiter, raise_on_limit: bool = False) -> Callable:
def rate_limit_handler(
limiter: BaseRateLimiter, raise_on_limit: bool = False
) -> Callable:
"""
通用装饰器,允许用户传递自定义的限流器实例,用于处理限流逻辑
该装饰器可灵活支持任意继承自 BaseRateLimiter 的限流器
@@ -344,8 +366,14 @@ def rate_limit_handler(limiter: BaseRateLimiter, raise_on_limit: bool = False) -
# 装饰器:指数退避限流
def rate_limit_exponential(base_wait: float = 60.0, max_wait: float = 600.0, backoff_factor: float = 2.0,
raise_on_limit: bool = False, source: str = "", enable_logging: bool = True) -> Callable:
def rate_limit_exponential(
base_wait: float = 60.0,
max_wait: float = 600.0,
backoff_factor: float = 2.0,
raise_on_limit: bool = False,
source: str = "",
enable_logging: bool = True,
) -> Callable:
"""
装饰器,用于应用指数退避限流策略
通过逐渐增加调用等待时间控制调用频率。每次触发限流时,等待时间会成倍增加,直到达到最大等待时间
@@ -359,14 +387,21 @@ def rate_limit_exponential(base_wait: float = 60.0, max_wait: float = 600.0, bac
:return: 装饰器函数
"""
# 实例化 ExponentialBackoffRateLimiter并传入相关参数
limiter = ExponentialBackoffRateLimiter(base_wait, max_wait, backoff_factor, source, enable_logging)
limiter = ExponentialBackoffRateLimiter(
base_wait, max_wait, backoff_factor, source, enable_logging
)
# 使用通用装饰器逻辑包装该限流器
return rate_limit_handler(limiter, raise_on_limit)
# 装饰器:时间窗口限流
def rate_limit_window(max_calls: int, window_seconds: float,
raise_on_limit: bool = False, source: str = "", enable_logging: bool = True) -> Callable:
def rate_limit_window(
max_calls: int,
window_seconds: float,
raise_on_limit: bool = False,
source: str = "",
enable_logging: bool = True,
) -> Callable:
"""
装饰器,用于应用时间窗口限流策略
在固定的时间窗口内限制调用次数,当调用次数超过最大值时,触发限流,直到时间窗口结束
@@ -407,3 +442,63 @@ class QpsRateLimiter:
self.next_call_time = max(now, self.next_call_time) + self.interval
if sleep_duration > 0:
time.sleep(sleep_duration)
class RateStats:
"""
请求速率统计:记录时间戳,计算 QPS / QPM / QPH
"""
def __init__(self, window_seconds: float = 7200, source: str = ""):
"""
:param window_seconds: 统计窗口(秒),默认 2 小时,用于计算 QPH
:param source: 日志来源标识
"""
self._window = window_seconds
self._source = source
self._lock = threading.Lock()
self._timestamps: deque = deque()
def record(self) -> None:
"""
记录一次请求
"""
t = time.time()
with self._lock:
self._timestamps.append(t)
while self._timestamps and t - self._timestamps[0] > self._window:
self._timestamps.popleft()
def _count_since(self, seconds: float) -> int:
t = time.time()
with self._lock:
return sum(1 for ts in self._timestamps if t - ts <= seconds)
def get_qps(self) -> float:
"""
最近 1 秒内请求数
"""
return self._count_since(1.0)
def get_qpm(self) -> float:
"""
最近 1 分钟内请求数
"""
return self._count_since(60.0)
def get_qph(self) -> float:
"""
最近 1 小时内请求数
"""
return self._count_since(3600.0)
def log_stats(self, level: str = "info") -> None:
"""
输出当前 QPS/QPM/QPH
"""
qps, qpm, qph = self.get_qps(), self.get_qpm(), self.get_qph()
msg = f"QPS={qps} QPM={qpm} QPH={qph}"
if self._source:
msg = f"[{self._source}] {msg}"
log_fn = getattr(logger, level, logger.info)
log_fn(msg)