diff --git a/app/core/plugin.py b/app/core/plugin.py index 243c4aaf..efefc7b7 100644 --- a/app/core/plugin.py +++ b/app/core/plugin.py @@ -3,8 +3,6 @@ import concurrent.futures import importlib.util import inspect import os -import threading -import time import traceback from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Type @@ -21,8 +19,9 @@ from app.helper.module import ModuleHelper from app.helper.plugin import PluginHelper from app.helper.sites import SitesHelper from app.log import logger -from app.utils.crypto import RSAUtils from app.schemas.types import SystemConfigKey +from app.utils.crypto import RSAUtils +from app.utils.limit import rate_limit_window from app.utils.object import ObjectUtils from app.utils.singleton import Singleton from app.utils.string import StringUtils @@ -30,14 +29,6 @@ from app.utils.system import SystemUtils class PluginMonitorHandler(FileSystemEventHandler): - # 计时器 - __reload_timer = None - # 防抖时间间隔 - __debounce_interval = 0.5 - # 最近一次修改时间 - __last_modified = 0 - # 修改间隔 - __timeout = 2 def on_modified(self, event): """ @@ -50,10 +41,6 @@ class PluginMonitorHandler(FileSystemEventHandler): if not event_path.name.endswith(".py") or "pycache" in event_path.parts: return - current_time = time.time() - if current_time - self.__last_modified < self.__timeout: - return - self.__last_modified = current_time # 读取插件根目录下的__init__.py文件,读取class XXXX(_PluginBase)的类名 try: plugins_root = settings.ROOT_PATH / "app" / "plugins" @@ -75,15 +62,12 @@ class PluginMonitorHandler(FileSystemEventHandler): if line.startswith("class") and "(_PluginBase)" in line: pid = line.split("class ")[1].split("(_PluginBase)")[0].strip() if pid: - # 防抖处理,通过计时器延迟加载 - if self.__reload_timer: - self.__reload_timer.cancel() - self.__reload_timer = threading.Timer(self.__debounce_interval, self.__reload_plugin, [pid]) - self.__reload_timer.start() + self.__reload_plugin(pid) except Exception as e: logger.error(f"插件文件修改后重载出错:{str(e)}") @staticmethod + @rate_limit_window(max_calls=1, window_seconds=2, source="PluginMonitor", enable_logging=False) def __reload_plugin(pid): """ 重新加载插件 diff --git a/app/modules/douban/__init__.py b/app/modules/douban/__init__.py index 801d4de6..41f9cd27 100644 --- a/app/modules/douban/__init__.py +++ b/app/modules/douban/__init__.py @@ -15,8 +15,9 @@ from app.modules.douban.douban_cache import DoubanCache from app.modules.douban.scraper import DoubanScraper from app.schemas import MediaPerson, APIRateLimitException from app.schemas.types import MediaType -from app.utils.common import retry, rate_limit_handler +from app.utils.common import retry from app.utils.http import RequestUtils +from app.utils.limit import rate_limit_exponential class DoubanModule(_ModuleBase): @@ -145,7 +146,7 @@ class DoubanModule(_ModuleBase): return None - @rate_limit_handler(backoff_factor=2, source="douban_info", raise_on_limit=False) + @rate_limit_exponential(source="douban_info") def douban_info(self, doubanid: str, mtype: MediaType = None, raise_exception: bool = True) -> Optional[dict]: """ 获取豆瓣信息 @@ -602,7 +603,7 @@ class DoubanModule(_ModuleBase): return [] @retry(Exception, 5, 3, 3, logger=logger) - @rate_limit_handler(source="match_doubaninfo", raise_on_limit=False) + @rate_limit_exponential(source="match_doubaninfo") def match_doubaninfo(self, name: str, imdbid: str = None, mtype: MediaType = None, year: str = None, season: int = None, raise_exception: bool = False) -> dict: diff --git a/app/utils/common.py b/app/utils/common.py index 6219dc8e..4ba1aa7e 100644 --- a/app/utils/common.py +++ b/app/utils/common.py @@ -1,10 +1,7 @@ -import functools -import threading import time -from typing import Any, Callable, Optional, Tuple +from typing import Any -from app.log import logger -from app.schemas import ImmediateException, RateLimitExceededException, LimitException +from app.schemas import ImmediateException def retry(ExceptionToCheck: Any, @@ -39,111 +36,3 @@ def retry(ExceptionToCheck: Any, return f_retry return deco_retry - - -class RateLimiter: - """ - 限流器类,用于处理调用的限流逻辑 - 通过增加等待时间逐步减少调用的频率,以避免触发限流 - """ - - def __init__(self, base_wait: int = 60, max_wait: int = 600, backoff_factor: float = 2.0, source: str = ""): - """ - 初始化 RateLimiter 实例 - :param base_wait: 基础等待时间(秒),默认值为 60 秒(1 分钟) - :param max_wait: 最大等待时间(秒),默认值为 600 秒(10 分钟) - :param backoff_factor: 等待时间的递增倍数,默认值为 2.0,表示指数退避 - :param source: 业务来源或上下文信息,默认值为 "" - """ - self.next_allowed_time = 0 - self.current_wait = base_wait - self.base_wait = base_wait - self.max_wait = max_wait - self.backoff_factor = backoff_factor - self.source = source - self.lock = threading.Lock() - - def can_call(self) -> Tuple[bool, str]: - """ - 检查是否可以进行下一次调用 - :return: 如果当前时间超过下一次允许调用的时间,返回 True;否则返回 False - """ - current_time = time.time() - with self.lock: - if current_time >= self.next_allowed_time: - return True, "" - wait_time = self.next_allowed_time - current_time - message = self.format_log(f"限流期间,跳过调用,将在 {wait_time:.2f} 秒后允许继续调用") - logger.info(message) - return False, message - - def reset(self): - """ - 重置等待时间 - 当调用成功时调用此方法,重置当前等待时间为基础等待时间 - """ - with self.lock: - if self.next_allowed_time != 0 or self.current_wait > self.base_wait: - logger.info(self.format_log(f"调用成功,重置限流等待时间为{self.base_wait}秒")) - self.next_allowed_time = 0 - self.current_wait = self.base_wait - - def trigger_limit(self): - """ - 触发限流 - 当触发限流异常时调用此方法,增加下一次允许调用的时间并更新当前等待时间 - """ - current_time = time.time() - with self.lock: - self.next_allowed_time = current_time + self.current_wait - logger.warn(self.format_log(f"触发限流,将在 {self.current_wait} 秒后允许继续调用")) - self.current_wait = min(self.current_wait * self.backoff_factor, self.max_wait) - - def format_log(self, message: str) -> str: - """ - 格式化日志消息 - :param message: 日志内容 - :return: 格式化后的日志消息 - """ - return f"[{self.source}] {message}" if self.source else message - - -def rate_limit_handler(base_wait: int = 60, max_wait: int = 600, backoff_factor: float = 2.0, - raise_on_limit: bool = True, source: str = "") -> Callable: - """ - 装饰器,用于处理限流逻辑,支持动态控制是否在限流时抛出异常 - :param base_wait: 基础等待时间(秒),默认值为 60 秒(1 分钟) - :param max_wait: 最大等待时间(秒),默认值为 600 秒(10 分钟) - :param backoff_factor: 等待时间的递增倍数,默认值为 2.0 - :param raise_on_limit: 控制默认情况下是否在限流时抛出异常,默认为 True(限流时抛出异常)。 - 如果在函数调用时传入 `raise_exception` 参数,则以传入值为准。 - :param source: 业务来源或上下文信息,默认为 "" - :return: 装饰器函数 - """ - rate_limiter = RateLimiter(base_wait, max_wait, backoff_factor, source) - - def decorator(func: Callable) -> Callable: - @functools.wraps(func) - def wrapper(*args, **kwargs) -> Optional[Any]: - # 动态检查是否传入了 raise_exception,否则使用默认的 raise_on_limit - raise_exception = kwargs.get("raise_exception", raise_on_limit) - can_call, message = rate_limiter.can_call() - if not can_call: - if raise_exception: - raise RateLimitExceededException(message) - return None - - try: - result = func(*args, **kwargs) - rate_limiter.reset() - return result - except LimitException as e: - rate_limiter.trigger_limit() - logger.error(rate_limiter.format_log(f"触发限流:{str(e)}")) - if raise_exception: - raise e - return None - - return wrapper - - return decorator diff --git a/app/utils/limit.py b/app/utils/limit.py new file mode 100644 index 00000000..7be22a81 --- /dev/null +++ b/app/utils/limit.py @@ -0,0 +1,348 @@ +import functools +import threading +import time +from collections import deque +from typing import Any, Tuple, List, Callable, Optional + +from app.log import logger +from app.schemas import RateLimitExceededException, LimitException + + +# 抽象基类 +class BaseRateLimiter: + """ + 限流器基类,定义了限流器的通用接口,用于子类实现不同的限流策略 + 所有限流器都必须实现 can_call、reset 方法 + """ + + def __init__(self, source: str = "", enable_logging: bool = True): + """ + 初始化 BaseRateLimiter 实例 + :param source: 业务来源或上下文信息,默认为空字符串 + :param enable_logging: 是否启用日志记录,默认为 True + """ + self.source = source + self.enable_logging = enable_logging + self.lock = threading.Lock() + + @property + def reset_on_success(self) -> bool: + """ + 是否在成功调用后自动重置限流器状态,默认为 False + """ + return False + + def can_call(self) -> Tuple[bool, str]: + """ + 检查是否可以进行调用 + :return: 如果允许调用,返回 True 和空消息,否则返回 False 和限流消息 + """ + raise NotImplementedError + + def reset(self): + """ + 重置限流状态 + """ + raise NotImplementedError + + def trigger_limit(self): + """ + 触发限流 + """ + pass + + def record_call(self): + """ + 记录一次调用 + """ + pass + + def format_log(self, message: str) -> str: + """ + 格式化日志消息 + :param message: 日志内容 + :return: 格式化后的日志消息 + """ + return f"[{self.source}] {message}" if self.source else message + + def log(self, level: str, message: str): + """ + 根据日志级别记录日志 + :param level: 日志级别 + :param message: 日志内容 + """ + if self.enable_logging: + log_method = getattr(logger, level, None) + if not callable(log_method): + log_method = logger.info + log_method(self.format_log(message)) + + def log_info(self, message: str): + """ + 记录信息日志 + """ + self.log("info", message) + + def log_warning(self, message: str): + """ + 记录警告日志 + """ + self.log("warning", message) + + +# 指数退避限流器 +class ExponentialBackoffRateLimiter(BaseRateLimiter): + """ + 基于指数退避的限流器,用于处理单次调用频率的控制 + 每次触发限流时,等待时间会成倍增加,直到达到最大等待时间 + """ + + def __init__(self, base_wait: int = 60, max_wait: int = 600, backoff_factor: float = 2.0, + source: str = "", enable_logging: bool = True): + """ + 初始化 ExponentialBackoffRateLimiter 实例 + :param base_wait: 基础等待时间(秒),默认值为 60 秒(1 分钟) + :param max_wait: 最大等待时间(秒),默认值为 600 秒(10 分钟) + :param backoff_factor: 等待时间的递增倍数,默认值为 2.0,表示指数退避 + :param source: 业务来源或上下文信息,默认值为 "" + :param enable_logging: 是否启用日志记录,默认为 True + """ + super().__init__(source, enable_logging) + self.next_allowed_time = 0 + self.current_wait = base_wait + self.base_wait = base_wait + self.max_wait = max_wait + self.backoff_factor = backoff_factor + self.source = source + + @property + def reset_on_success(self) -> bool: + """ + 指数退避限流器在调用成功后应重置等待时间 + """ + return True + + def can_call(self) -> Tuple[bool, str]: + """ + 检查是否可以进行调用,如果当前时间超过下一次允许调用的时间,则允许调用 + :return: 如果允许调用,返回 True 和空消息,否则返回 False 和限流消息 + """ + current_time = time.time() + with self.lock: + if current_time >= self.next_allowed_time: + return True, "" + wait_time = self.next_allowed_time - current_time + message = f"限流期间,跳过调用,将在 {wait_time:.2f} 秒后允许继续调用" + self.log_info(message) + return False, self.format_log(message) + + def reset(self): + """ + 重置等待时间 + 当调用成功时调用此方法,重置当前等待时间为基础等待时间 + """ + with self.lock: + if self.next_allowed_time != 0 or self.current_wait > self.base_wait: + self.log_info(f"调用成功,重置限流等待时间为 {self.base_wait} 秒") + self.next_allowed_time = 0 + self.current_wait = self.base_wait + + def trigger_limit(self): + """ + 触发限流 + 当触发限流异常时调用此方法,增加下一次允许调用的时间并更新当前等待时间 + """ + current_time = time.time() + with self.lock: + self.next_allowed_time = current_time + self.current_wait + self.log_warning(f"触发限流,将在 {self.current_wait} 秒后允许继续调用") + self.current_wait = min(self.current_wait * self.backoff_factor, self.max_wait) + + +# 时间窗口限流器 +class WindowRateLimiter(BaseRateLimiter): + """ + 基于时间窗口的限流器,用于限制在特定时间窗口内的调用次数 + 如果超过允许的最大调用次数,则限流直到窗口期结束 + """ + + def __init__(self, max_calls: int, window_seconds: int, + source: str = "", enable_logging: bool = True): + """ + 初始化 WindowRateLimiter 实例 + :param max_calls: 在时间窗口内允许的最大调用次数 + :param window_seconds: 时间窗口的持续时间(秒) + :param source: 业务来源或上下文信息,默认值为 "" + :param enable_logging: 是否启用日志记录,默认为 True + """ + super().__init__(source, enable_logging) + self.max_calls = max_calls + self.window_seconds = window_seconds + self.call_times = deque() + + def can_call(self) -> Tuple[bool, str]: + """ + 检查是否可以进行调用,如果在时间窗口内的调用次数少于最大允许次数,则允许调用。 + :return: 如果允许调用,返回 True 和空消息,否则返回 False 和限流消息 + """ + current_time = time.time() + with self.lock: + # 清理超出时间窗口的调用记录 + while self.call_times and current_time - self.call_times[0] > self.window_seconds: + self.call_times.popleft() + + if len(self.call_times) < self.max_calls: + return True, "" + else: + wait_time = self.window_seconds - (current_time - self.call_times[0]) + message = f"限流期间,跳过调用,将在 {wait_time:.2f} 秒后允许继续调用" + self.log_info(message) + return False, self.format_log(message) + + def reset(self): + """ + 重置时间窗口内的调用记录 + 当调用成功时调用此方法,清空时间窗口内的调用记录 + """ + with self.lock: + self.call_times.clear() + + def record_call(self): + """ + 记录当前时间戳,用于限流检查 + """ + current_time = time.time() + with self.lock: + self.call_times.append(current_time) + + +# 组合限流器 +class CompositeRateLimiter(BaseRateLimiter): + """ + 组合限流器,可以组合多个限流策略 + 当任意一个限流策略触发限流时,都会阻止调用 + """ + + def __init__(self, limiters: List[BaseRateLimiter], source: str = "", enable_logging: bool = True): + + """ + 初始化 CompositeRateLimiter 实例 + :param limiters: 要组合的限流器列表 + :param source: 业务来源或上下文信息,默认值为 "" + :param enable_logging: 是否启用日志记录,默认为 True + """ + super().__init__(source, enable_logging) + self.limiters = limiters + + def can_call(self) -> Tuple[bool, str]: + """ + 检查是否可以进行调用,当组合的任意限流器触发限流时,阻止调用。 + :return: 如果所有限流器都允许调用,返回 True 和空消息,否则返回 False 和限流信息。 + """ + for limiter in self.limiters: + can_call, message = limiter.can_call() + if not can_call: + return False, message + return True, "" + + def reset(self): + """ + 重置所有组合的限流器状态 + """ + for limiter in self.limiters: + limiter.reset() + + def record_call(self): + """ + 记录所有组合的限流器的调用时间 + """ + for limiter in self.limiters: + limiter.record_call() + + +# 通用装饰器:自定义限流器实例 +def rate_limit_handler(limiter: BaseRateLimiter, raise_on_limit: bool = False) -> Callable: + """ + 通用装饰器,允许用户传递自定义的限流器实例,用于处理限流逻辑 + 该装饰器可灵活支持任意继承自 BaseRateLimiter 的限流器 + + :param limiter: 限流器实例,必须继承自 BaseRateLimiter + :param raise_on_limit: 控制在限流时是否抛出异常,默认为 False + :return: 装饰器函数 + """ + + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + def wrapper(*args, **kwargs) -> Optional[Any]: + # 检查是否传入了 "raise_exception" 参数,优先使用该参数,否则使用默认的 raise_on_limit 值 + raise_exception = kwargs.get("raise_exception", raise_on_limit) + + # 检查是否可以进行调用,调用 limiter.can_call() 方法 + can_call, message = limiter.can_call() + if not can_call: + # 如果调用受限,并且 raise_exception 为 True,则抛出限流异常 + if raise_exception: + raise RateLimitExceededException(message) + # 如果不抛出异常,则返回 None 表示跳过调用 + return None + + # 如果调用允许,执行目标函数,并记录一次调用 + try: + result = func(*args, **kwargs) + limiter.record_call() + if limiter.reset_on_success: + limiter.reset() + return result + except LimitException as e: + # 如果目标函数触发了限流相关的异常,执行限流器的触发逻辑(如递增等待时间) + limiter.trigger_limit() + logger.error(limiter.format_log(f"触发限流:{str(e)}")) + # 如果 raise_exception 为 True,则抛出异常,否则返回 None + if raise_exception: + raise e + return None + + return wrapper + + return decorator + + +# 装饰器:指数退避限流 +def rate_limit_exponential(base_wait: int = 60, max_wait: int = 600, backoff_factor: float = 2.0, + raise_on_limit: bool = False, source: str = "", enable_logging: bool = True) -> Callable: + """ + 装饰器,用于应用指数退避限流策略 + 通过逐渐增加调用等待时间控制调用频率。每次触发限流时,等待时间会成倍增加,直到达到最大等待时间 + + :param base_wait: 基础等待时间(秒),默认值为 60 秒(1 分钟) + :param max_wait: 最大等待时间(秒),默认值为 600 秒(10 分钟) + :param backoff_factor: 等待时间递增的倍数,默认值为 2.0,表示指数退避 + :param raise_on_limit: 控制在限流时是否抛出异常,默认为 False + :param source: 业务来源或上下文信息,默认为空字符串 + :param enable_logging: 是否启用日志记录,默认为 True + :return: 装饰器函数 + """ + # 实例化 ExponentialBackoffRateLimiter,并传入相关参数 + limiter = ExponentialBackoffRateLimiter(base_wait, max_wait, backoff_factor, source, enable_logging) + # 使用通用装饰器逻辑包装该限流器 + return rate_limit_handler(limiter, raise_on_limit) + + +# 装饰器:时间窗口限流 +def rate_limit_window(max_calls: int, window_seconds: int, + raise_on_limit: bool = False, source: str = "", enable_logging: bool = True) -> Callable: + """ + 装饰器,用于应用时间窗口限流策略 + 在固定的时间窗口内限制调用次数,当调用次数超过最大值时,触发限流,直到时间窗口结束 + + :param max_calls: 时间窗口内允许的最大调用次数 + :param window_seconds: 时间窗口的持续时间(秒) + :param raise_on_limit: 控制在限流时是否抛出异常,默认为 False + :param source: 业务来源或上下文信息,默认为空字符串 + :param enable_logging: 是否启用日志记录,默认为 True + :return: 装饰器函数 + """ + # 实例化 WindowRateLimiter,并传入相关参数 + limiter = WindowRateLimiter(max_calls, window_seconds, source, enable_logging) + # 使用通用装饰器逻辑包装该限流器 + return rate_limit_handler(limiter, raise_on_limit)