fix(event): replace condition-based wait with exponential backoff

This commit is contained in:
InfinityPacer
2024-09-22 02:38:28 +08:00
parent 748836df23
commit 5fc5838abd
2 changed files with 27 additions and 23 deletions

View File

@@ -1,6 +1,7 @@
import copy
import importlib
import inspect
import random
import threading
import time
import traceback
@@ -12,11 +13,13 @@ from app.helper.message import MessageHelper
from app.helper.thread import ThreadHelper
from app.log import logger
from app.schemas.types import EventType, ChainEventType
from app.utils.limit import ExponentialBackoffRateLimiter
from app.utils.singleton import Singleton
DEFAULT_EVENT_PRIORITY = 10 # 事件的默认优先级
MIN_EVENT_CONSUMER_THREADS = 1 # 最小事件消费者线程数
EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 30 # 事件队列空闲时的超时时间(秒)
INITIAL_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 1 # 事件队列空闲时的初始超时时间(秒)
MAX_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 60 # 事件队列空闲时的最大超时时间(秒)
class Event:
@@ -78,7 +81,6 @@ class EventManager(metaclass=Singleton):
self.__disabled_handlers = set() # 禁用的事件处理器集合
self.__disabled_classes = set() # 禁用的事件处理器类集合
self.__lock = threading.Lock() # 线程锁
self.__condition = threading.Condition(self.__lock) # 条件变量
def start(self):
"""
@@ -87,7 +89,7 @@ class EventManager(metaclass=Singleton):
# 启动消费者线程用于处理广播事件
self.__event.set()
for _ in range(MIN_EVENT_CONSUMER_THREADS):
thread = threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True)
thread = threading.Thread(target=self.__broadcast_consumer_loop, daemon=True)
thread.start()
self.__consumer_threads.append(thread) # 将线程对象保存到列表中
@@ -136,8 +138,6 @@ class EventManager(metaclass=Singleton):
event = Event(etype, data, priority)
if isinstance(etype, EventType):
self.__trigger_broadcast_event(event)
with self.__condition:
self.__condition.notify()
elif isinstance(etype, ChainEventType):
return self.__trigger_chain_event(event)
else:
@@ -399,20 +399,24 @@ class EventManager(metaclass=Singleton):
return class_obj
def __fixed_broadcast_consumer(self):
def __broadcast_consumer_loop(self):
"""
固定的后台广播消费者线程,持续从队列中提取事件
持续从队列中提取事件的后台广播消费者线程
"""
jitter_factor = 0.1
rate_limiter = ExponentialBackoffRateLimiter(base_wait=INITIAL_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS,
max_wait=MAX_EVENT_QUEUE_IDLE_TIMEOUT_SECONDS,
backoff_factor=2.0,
source="BroadcastConsumer",
enable_logging=False)
while self.__event.is_set():
# 使用 Condition 优化队列的等待机制,避免频繁触发超时
with self.__condition:
# 阻塞等待,直到有事件插入
self.__condition.wait()
try:
priority, event = self.__event_queue.get(timeout=EVENT_QUEUE_IDLE_TIMEOUT_SECONDS)
self.__dispatch_broadcast_event(event)
except Empty:
logger.debug("Queue is empty, waiting for new events")
try:
priority, event = self.__event_queue.get(timeout=rate_limiter.current_wait)
rate_limiter.reset()
self.__dispatch_broadcast_event(event)
except Empty:
rate_limiter.current_wait = rate_limiter.current_wait * random.uniform(1, 1 + jitter_factor)
rate_limiter.trigger_limit()
@staticmethod
def __log_event_lifecycle(event: Event, stage: str):

View File

@@ -97,7 +97,7 @@ class ExponentialBackoffRateLimiter(BaseRateLimiter):
每次触发限流时,等待时间会成倍增加,直到达到最大等待时间
"""
def __init__(self, base_wait: int = 60, max_wait: int = 600, backoff_factor: float = 2.0,
def __init__(self, base_wait: float = 60.0, max_wait: float = 600.0, backoff_factor: float = 2.0,
source: str = "", enable_logging: bool = True):
"""
初始化 ExponentialBackoffRateLimiter 实例
@@ -108,7 +108,7 @@ class ExponentialBackoffRateLimiter(BaseRateLimiter):
:param enable_logging: 是否启用日志记录,默认为 True
"""
super().__init__(source, enable_logging)
self.next_allowed_time = 0
self.next_allowed_time = 0.0
self.current_wait = base_wait
self.base_wait = base_wait
self.max_wait = max_wait
@@ -144,7 +144,7 @@ class ExponentialBackoffRateLimiter(BaseRateLimiter):
with self.lock:
if self.next_allowed_time != 0 or self.current_wait > self.base_wait:
self.log_info(f"调用成功,重置限流等待时间为 {self.base_wait}")
self.next_allowed_time = 0
self.next_allowed_time = 0.0
self.current_wait = self.base_wait
def trigger_limit(self):
@@ -155,8 +155,8 @@ class ExponentialBackoffRateLimiter(BaseRateLimiter):
current_time = time.time()
with self.lock:
self.next_allowed_time = current_time + self.current_wait
self.log_warning(f"触发限流,将在 {self.current_wait} 秒后允许继续调用")
self.current_wait = min(self.current_wait * self.backoff_factor, self.max_wait)
self.log_warning(f"触发限流,将在 {self.current_wait} 秒后允许继续调用")
# 时间窗口限流器
@@ -166,7 +166,7 @@ class WindowRateLimiter(BaseRateLimiter):
如果超过允许的最大调用次数,则限流直到窗口期结束
"""
def __init__(self, max_calls: int, window_seconds: int,
def __init__(self, max_calls: int, window_seconds: float,
source: str = "", enable_logging: bool = True):
"""
初始化 WindowRateLimiter 实例
@@ -308,7 +308,7 @@ def rate_limit_handler(limiter: BaseRateLimiter, raise_on_limit: bool = False) -
# 装饰器:指数退避限流
def rate_limit_exponential(base_wait: int = 60, max_wait: int = 600, backoff_factor: float = 2.0,
def rate_limit_exponential(base_wait: float = 60.0, max_wait: float = 600.0, backoff_factor: float = 2.0,
raise_on_limit: bool = False, source: str = "", enable_logging: bool = True) -> Callable:
"""
装饰器,用于应用指数退避限流策略
@@ -329,7 +329,7 @@ def rate_limit_exponential(base_wait: int = 60, max_wait: int = 600, backoff_fac
# 装饰器:时间窗口限流
def rate_limit_window(max_calls: int, window_seconds: int,
def rate_limit_window(max_calls: int, window_seconds: float,
raise_on_limit: bool = False, source: str = "", enable_logging: bool = True) -> Callable:
"""
装饰器,用于应用时间窗口限流策略