diff --git a/app/core/event.py b/app/core/event.py index ffa54ec3..80e46c0e 100644 --- a/app/core/event.py +++ b/app/core/event.py @@ -1,6 +1,7 @@ import copy import importlib import inspect +import random import threading import time import traceback @@ -12,11 +13,13 @@ from app.helper.message import MessageHelper from app.helper.thread import ThreadHelper from app.log import logger from app.schemas.types import EventType, ChainEventType +from app.utils.limit import ExponentialBackoffRateLimiter from app.utils.singleton import Singleton DEFAULT_EVENT_PRIORITY = 10 # 事件的默认优先级 MIN_EVENT_CONSUMER_THREADS = 1 # 最小事件消费者线程数 -EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 30 # 事件队列空闲时的超时时间(秒) +INITIAL_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 1 # 事件队列空闲时的初始超时时间(秒) +MAX_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 60 # 事件队列空闲时的最大超时时间(秒) class Event: @@ -78,7 +81,6 @@ class EventManager(metaclass=Singleton): self.__disabled_handlers = set() # 禁用的事件处理器集合 self.__disabled_classes = set() # 禁用的事件处理器类集合 self.__lock = threading.Lock() # 线程锁 - self.__condition = threading.Condition(self.__lock) # 条件变量 def start(self): """ @@ -87,7 +89,7 @@ class EventManager(metaclass=Singleton): # 启动消费者线程用于处理广播事件 self.__event.set() for _ in range(MIN_EVENT_CONSUMER_THREADS): - thread = threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True) + thread = threading.Thread(target=self.__broadcast_consumer_loop, daemon=True) thread.start() self.__consumer_threads.append(thread) # 将线程对象保存到列表中 @@ -136,8 +138,6 @@ class EventManager(metaclass=Singleton): event = Event(etype, data, priority) if isinstance(etype, EventType): self.__trigger_broadcast_event(event) - with self.__condition: - self.__condition.notify() elif isinstance(etype, ChainEventType): return self.__trigger_chain_event(event) else: @@ -399,20 +399,24 @@ class EventManager(metaclass=Singleton): return class_obj - def __fixed_broadcast_consumer(self): + def __broadcast_consumer_loop(self): """ - 固定的后台广播消费者线程,持续从队列中提取事件 + 持续从队列中提取事件的后台广播消费者线程 """ + jitter_factor = 0.1 + rate_limiter = ExponentialBackoffRateLimiter(base_wait=INITIAL_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS, + max_wait=MAX_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS, + backoff_factor=2.0, + source="BroadcastConsumer", + enable_logging=False) while self.__event.is_set(): - # 使用 Condition 优化队列的等待机制,避免频繁触发超时 - with self.__condition: - # 阻塞等待,直到有事件插入 - self.__condition.wait() - try: - priority, event = self.__event_queue.get(timeout=EVENT_QUEUE_IDLE_TIMEOUT_SECONDS) - self.__dispatch_broadcast_event(event) - except Empty: - logger.debug("Queue is empty, waiting for new events") + try: + priority, event = self.__event_queue.get(timeout=rate_limiter.current_wait) + rate_limiter.reset() + self.__dispatch_broadcast_event(event) + except Empty: + rate_limiter.current_wait = rate_limiter.current_wait * random.uniform(1, 1 + jitter_factor) + rate_limiter.trigger_limit() @staticmethod def __log_event_lifecycle(event: Event, stage: str): diff --git a/app/utils/limit.py b/app/utils/limit.py index 7be22a81..27dc7fcd 100644 --- a/app/utils/limit.py +++ b/app/utils/limit.py @@ -97,7 +97,7 @@ class ExponentialBackoffRateLimiter(BaseRateLimiter): 每次触发限流时,等待时间会成倍增加,直到达到最大等待时间 """ - def __init__(self, base_wait: int = 60, max_wait: int = 600, backoff_factor: float = 2.0, + def __init__(self, base_wait: float = 60.0, max_wait: float = 600.0, backoff_factor: float = 2.0, source: str = "", enable_logging: bool = True): """ 初始化 ExponentialBackoffRateLimiter 实例 @@ -108,7 +108,7 @@ class ExponentialBackoffRateLimiter(BaseRateLimiter): :param enable_logging: 是否启用日志记录,默认为 True """ super().__init__(source, enable_logging) - self.next_allowed_time = 0 + self.next_allowed_time = 0.0 self.current_wait = base_wait self.base_wait = base_wait self.max_wait = max_wait @@ -144,7 +144,7 @@ class ExponentialBackoffRateLimiter(BaseRateLimiter): with self.lock: if self.next_allowed_time != 0 or self.current_wait > self.base_wait: self.log_info(f"调用成功,重置限流等待时间为 {self.base_wait} 秒") - self.next_allowed_time = 0 + self.next_allowed_time = 0.0 self.current_wait = self.base_wait def trigger_limit(self): @@ -155,8 +155,8 @@ class ExponentialBackoffRateLimiter(BaseRateLimiter): current_time = time.time() with self.lock: self.next_allowed_time = current_time + self.current_wait - self.log_warning(f"触发限流,将在 {self.current_wait} 秒后允许继续调用") self.current_wait = min(self.current_wait * self.backoff_factor, self.max_wait) + self.log_warning(f"触发限流,将在 {self.current_wait} 秒后允许继续调用") # 时间窗口限流器 @@ -166,7 +166,7 @@ class WindowRateLimiter(BaseRateLimiter): 如果超过允许的最大调用次数,则限流直到窗口期结束 """ - def __init__(self, max_calls: int, window_seconds: int, + def __init__(self, max_calls: int, window_seconds: float, source: str = "", enable_logging: bool = True): """ 初始化 WindowRateLimiter 实例 @@ -308,7 +308,7 @@ def rate_limit_handler(limiter: BaseRateLimiter, raise_on_limit: bool = False) - # 装饰器:指数退避限流 -def rate_limit_exponential(base_wait: int = 60, max_wait: int = 600, backoff_factor: float = 2.0, +def rate_limit_exponential(base_wait: float = 60.0, max_wait: float = 600.0, backoff_factor: float = 2.0, raise_on_limit: bool = False, source: str = "", enable_logging: bool = True) -> Callable: """ 装饰器,用于应用指数退避限流策略 @@ -329,7 +329,7 @@ def rate_limit_exponential(base_wait: int = 60, max_wait: int = 600, backoff_fac # 装饰器:时间窗口限流 -def rate_limit_window(max_calls: int, window_seconds: int, +def rate_limit_window(max_calls: int, window_seconds: float, raise_on_limit: bool = False, source: str = "", enable_logging: bool = True) -> Callable: """ 装饰器,用于应用时间窗口限流策略