feat(event): update constant and support Condition for thread

This commit is contained in:
InfinityPacer
2024-09-20 00:25:38 +08:00
parent 49b6052ab0
commit e786120e98

View File

@@ -9,6 +9,11 @@ from app.log import logger
from app.schemas.types import EventType, SyncEventType
from app.utils.singleton import Singleton
DEFAULT_EVENT_PRIORITY = 10 # 事件的默认优先级
MIN_EVENT_CONSUMER_THREADS = 1 # 最小事件消费者线程数
MAX_EVENT_WORKER_POOL_SIZE = 50 # 最大事件工作线程池大小
EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 60 # 事件队列空闲时的超时时间(秒)
class Event:
"""
@@ -49,23 +54,23 @@ class EventManager(metaclass=Singleton):
EventManager 负责管理和调度广播事件和链式事件,包括订阅、发送和处理事件
"""
def __init__(self, max_workers: int = 50):
def __init__(self, max_workers: int = MAX_EVENT_WORKER_POOL_SIZE):
"""
:param max_workers: 线程池最大工作线程数,默认 50
:param max_workers: 线程池最大工作线程数
"""
self.__executor = ThreadHelper(max_workers=max_workers) # 动态线程池,用于消费事件
self.__event_executor = ThreadHelper(max_workers=3) # 动态线程池,用于处理事件
self.__event_queue = PriorityQueue() # 优先级队列
self.__subscribers: Dict[Union[EventType, SyncEventType], List[Callable[[Dict], None]]] = {} # 订阅者列表
self.__disabled_handlers = set() # 禁用的事件处理器集合
self.__lock = threading.Lock() # 线程锁
self.__dynamic_consuming = False # 标记是否已经在使用动态线程池
self.__condition = threading.Condition(self.__lock) # 条件变量
# 启动消费者线程用于处理异步事件
threading.Thread(target=self.__fixed_consumer, daemon=True).start()
for _ in range(MIN_EVENT_CONSUMER_THREADS):
threading.Thread(target=self.__fixed_consumer, daemon=True).start()
def send_event(self, etype: Union[EventType, SyncEventType], data: Optional[Dict] = None, priority: int = 10) -> \
Optional[Dict]:
def send_event(self, etype: Union[EventType, SyncEventType], data: Optional[Dict] = None,
priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Dict]:
"""
发送事件,根据事件类型决定是广播事件还是链式事件
:param etype: 事件类型 (EventType 或 SyncEventType)
@@ -73,15 +78,17 @@ class EventManager(metaclass=Singleton):
:param priority: 广播事件的优先级,默认为 10
:return: 如果是链式事件,返回处理后的事件数据;否则返回 None
"""
event = Event(etype, data, priority if isinstance(etype, EventType) else None)
event = Event(etype, data, priority)
if isinstance(etype, EventType):
self.__trigger_event_async(event, priority)
self.__trigger_event_async(event)
with self.__condition:
self.__condition.notify()
elif isinstance(etype, SyncEventType):
return self.__trigger_event(event)
else:
logger.error(f"Unknown event type: {etype}")
def add_event_listener(self, event_type: Union[EventType, SyncEventType], handler: Callable[[Dict], None]) -> None:
def add_event_listener(self, event_type: Union[EventType, SyncEventType], handler: Callable[[Dict], None]):
"""
注册事件处理器,将处理器添加到对应的事件订阅列表中
:param event_type: 事件类型 (EventType 或 SyncEventType)
@@ -95,7 +102,7 @@ class EventManager(metaclass=Singleton):
logger.debug(f"Subscribed to event: {event_type.value} ({event_kind}), Handler: {handler.__name__}")
def remove_event_listener(self, event_type: Union[EventType, SyncEventType],
handler: Callable[[Dict], None]) -> None:
handler: Callable[[Dict], None]):
"""
移除事件处理器,将处理器从对应事件的订阅列表中删除
:param event_type: 事件类型 (EventType 或 SyncEventType)
@@ -107,7 +114,7 @@ class EventManager(metaclass=Singleton):
event_kind = Event.get_event_kind(event_type)
logger.debug(f"Unsubscribed from event: {event_type.value} ({event_kind}), Handler: {handler.__name__}")
def disable_event_handler(self, handler_name: str) -> None:
def disable_event_handler(self, handler_name: str):
"""
禁用指定名称的事件处理器,防止其响应事件
:param handler_name: 要禁用的事件处理器名称
@@ -115,7 +122,7 @@ class EventManager(metaclass=Singleton):
self.__disabled_handlers.add(handler_name)
logger.debug(f"Disabled event handler: {handler_name}")
def enable_event_handler(self, handler_name: str) -> None:
def enable_event_handler(self, handler_name: str):
"""
启用指定名称的事件处理器,使其可以继续响应事件
:param handler_name: 要启用的事件处理器名称
@@ -144,21 +151,15 @@ class EventManager(metaclass=Singleton):
self.__dispatch_event(event)
return event.event_data
def __trigger_event_async(self, event: Event, priority: int) -> None:
def __trigger_event_async(self, event: Event):
"""
触发广播事件,将事件插入到优先级队列中
:param event: 要处理的事件对象
:param priority: 事件的优先级
"""
logger.debug(f"Triggering asynchronous broadcast event: {event}")
self.__event_queue.put((priority, event))
self.__event_queue.put((event.priority, event))
# 当固定消费者无法及时处理时,动态启动线程池
if self.__event_queue.qsize() > 10 and not self.__dynamic_consuming:
self.__dynamic_consuming = True
self.__event_executor.submit(self.__dynamic_consumer)
def __dispatch_event(self, event: Event) -> None:
def __dispatch_event(self, event: Event):
"""
同步方式调度事件,逐个调用事件处理器
:param event: 要调度的事件对象
@@ -168,7 +169,7 @@ class EventManager(metaclass=Singleton):
if handler.__name__ not in self.__disabled_handlers:
handler(event.event_data)
def __dispatch_event_async(self, event: Event) -> None:
def __dispatch_event_async(self, event: Event):
"""
异步方式调度事件,通过线程池逐个调用事件处理器
:param event: 要调度的事件对象
@@ -178,34 +179,24 @@ class EventManager(metaclass=Singleton):
if handler.__name__ not in self.__disabled_handlers:
self.__executor.submit(handler, event.event_data)
def __fixed_consumer(self) -> None:
def __fixed_consumer(self):
"""
固定的后台消费者线程,持续从队列中提取事件处理
该线程始终保持运行状态,确保即使事件量少时也有线程在消费
固定的后台消费者线程,持续从队列中提取事件
"""
while True:
try:
# 阻塞方式从队列获取事件
priority, event = self.__event_queue.get(block=True, timeout=1)
logger.debug(f"Fixed consumer processing event: {event}")
self.__dispatch_event_async(event) # 调用事件处理器
except Empty:
continue # 如果队列为空,继续等待
# 使用 Condition 优化队列的等待机制,避免频繁触发超时
with self.__condition:
# 当队列为空时,线程进入等待状态,直到有新事件到来
while self.__event_queue.empty():
# 阻塞等待,直到有事件插入
self.__condition.wait()
def __dynamic_consumer(self) -> None:
"""
动态消费者线程,通过线程池调度,用于在事件量大时进行扩展
一旦队列为空,则结束动态消费,并重置动态消费标志
"""
while True:
try:
# 非阻塞方式从队列获取事件
priority, event = self.__event_queue.get(block=False)
logger.debug(f"Dynamic consumer processing event: {event}")
self.__dispatch_event_async(event) # 调用事件处理器
except Empty:
self.__dynamic_consuming = False # 队列为空,结束动态消费
break
try:
priority, event = self.__event_queue.get(timeout=EVENT_QUEUE_IDLE_TIMEOUT_SECONDS)
logger.debug(f"Fixed consumer processing event: {event}")
self.__dispatch_event_async(event)
except Empty:
logger.debug("Queue is empty, waiting for new events.")
def register(self, etype: Union[EventType, SyncEventType, List[Union[EventType, SyncEventType]], type]):
"""