feat(event): separate implementation of broadcast and chain

This commit is contained in:
InfinityPacer
2024-09-20 13:52:09 +08:00
parent 85cb9f7cd7
commit 3bee5a8a86
2 changed files with 127 additions and 76 deletions

View File

@@ -1,12 +1,12 @@
import threading
import time
import uuid
from enum import Enum
from queue import PriorityQueue, Empty
from typing import Callable, Dict, List, Union, Optional
from app.helper.thread import ThreadHelper
from app.log import logger
from app.schemas.types import EventType, SyncEventType
from app.schemas.types import EventType, ChainEventType
from app.utils.singleton import Singleton
DEFAULT_EVENT_PRIORITY = 10 # 事件的默认优先级
@@ -20,12 +20,12 @@ class Event:
事件类,封装事件的基本信息
"""
def __init__(self, event_type: Union[EventType, SyncEventType], event_data: Optional[Dict] = None,
priority: int = 10):
def __init__(self, event_type: Union[EventType, ChainEventType], event_data: Optional[Dict] = None,
priority: int = DEFAULT_EVENT_PRIORITY):
"""
:param event_type: 事件的类型,支持 EventType 或 SyncEventType
:param event_type: 事件的类型,支持 EventType 或 ChainEventType
:param event_data: 可选,事件携带的数据,默认为空字典
:param priority: 可选,广播事件的优先级,默认为 10
:param priority: 可选,事件的优先级,默认为 10
"""
self.event_id = str(uuid.uuid4()) # 事件ID
self.event_type = event_type # 事件类型
@@ -40,10 +40,10 @@ class Event:
return f"<{event_kind}: {self.event_type.value}, ID: {self.event_id}, Priority: {self.priority}>"
@staticmethod
def get_event_kind(event_type: Union[EventType, SyncEventType]) -> str:
def get_event_kind(event_type: Union[EventType, ChainEventType]) -> str:
"""
根据事件类型判断事件是广播事件还是链式事件
:param event_type: 事件类型,支持 EventType 或 SyncEventType
:param event_type: 事件类型,支持 EventType 或 ChainEventType
:return: 返回 Broadcast Event 或 Chain Event
"""
return "Broadcast Event" if isinstance(event_type, EventType) else "Chain Event"
@@ -60,60 +60,75 @@ class EventManager(metaclass=Singleton):
"""
self.__executor = ThreadHelper(max_workers=max_workers) # 动态线程池,用于消费事件
self.__event_queue = PriorityQueue() # 优先级队列
self.__subscribers: Dict[Union[EventType, SyncEventType], List[Callable[[Dict], None]]] = {} # 订阅者列表
self.__broadcast_subscribers: Dict[EventType, List[Callable[[Dict], None]]] = {} # 广播事件的订阅者
self.__chain_subscribers: Dict[
ChainEventType, List[tuple[int, Callable[[Dict], None]]]] = {} # 链式事件的订阅者(优先级+处理器)
self.__disabled_handlers = set() # 禁用的事件处理器集合
self.__disabled_classes = set() # 禁用的事件处理器类集合
self.__lock = threading.Lock() # 线程锁
self.__condition = threading.Condition(self.__lock) # 条件变量
# 启动消费者线程用于处理异步事件
# 启动消费者线程用于处理广播事件
for _ in range(MIN_EVENT_CONSUMER_THREADS):
threading.Thread(target=self.__fixed_consumer, daemon=True).start()
threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True).start()
def send_event(self, etype: Union[EventType, SyncEventType], data: Optional[Dict] = None,
def send_event(self, etype: Union[EventType, ChainEventType], data: Optional[Dict] = None,
priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Dict]:
"""
发送事件,根据事件类型决定是广播事件还是链式事件
:param etype: 事件类型 (EventType 或 SyncEventType)
:param etype: 事件类型 (EventType 或 ChainEventType)
:param data: 可选,事件数据
:param priority: 广播事件的优先级,默认为 10
:return: 如果是链式事件,返回处理后的事件数据;否则返回 None
"""
event = Event(etype, data, priority)
if isinstance(etype, EventType):
self.__trigger_event_async(event)
self.__trigger_broadcast_event(event)
with self.__condition:
self.__condition.notify()
elif isinstance(etype, SyncEventType):
return self.__trigger_event(event)
elif isinstance(etype, ChainEventType):
return self.__trigger_chain_event(event)
else:
logger.error(f"Unknown event type: {etype}")
def add_event_listener(self, event_type: Union[EventType, SyncEventType], handler: Callable[[Dict], None]):
def add_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable[[Dict], None],
priority: int = DEFAULT_EVENT_PRIORITY):
"""
注册事件处理器,将处理器添加到对应的事件订阅列表中
:param event_type: 事件类型 (EventType 或 SyncEventType)
:param event_type: 事件类型 (EventType 或 ChainEventType)
:param handler: 处理器
:param priority: 可选,事件的优先级,默认为 10
"""
with self.__lock:
if event_type not in self.__subscribers:
self.__subscribers[event_type] = []
self.__subscribers[event_type].append(handler)
event_kind = Event.get_event_kind(event_type)
logger.debug(f"Subscribed to event: {event_type.value} ({event_kind}), Handler: {handler.__name__}")
if isinstance(event_type, ChainEventType):
# 链式事件,按优先级排序
if event_type not in self.__chain_subscribers:
self.__chain_subscribers[event_type] = []
self.__chain_subscribers[event_type].append((priority, handler))
self.__chain_subscribers[event_type].sort(key=lambda x: x[0]) # 按优先级排序
logger.debug(
f"Subscribed to chain event: {event_type.value}, Handler: {handler.__name__}, Priority: {priority}")
else:
# 广播事件
if event_type not in self.__broadcast_subscribers:
self.__broadcast_subscribers[event_type] = []
self.__broadcast_subscribers[event_type].append(handler)
logger.debug(f"Subscribed to broadcast event: {event_type.value}, Handler: {handler.__name__}")
def remove_event_listener(self, event_type: Union[EventType, SyncEventType],
handler: Callable[[Dict], None]):
def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable[[Dict], None]):
"""
移除事件处理器,将处理器从对应事件的订阅列表中删除
:param event_type: 事件类型 (EventType 或 SyncEventType)
:param event_type: 事件类型 (EventType 或 ChainEventType)
:param handler: 要移除的处理器
"""
with self.__lock:
if event_type in self.__subscribers:
self.__subscribers[event_type].remove(handler)
event_kind = Event.get_event_kind(event_type)
logger.debug(f"Unsubscribed from event: {event_type.value} ({event_kind}), Handler: {handler.__name__}")
if isinstance(event_type, ChainEventType) and event_type in self.__chain_subscribers:
self.__chain_subscribers[event_type] = [h for h in self.__chain_subscribers[event_type] if
h[1] != handler]
logger.debug(f"Unsubscribed from chain event: {event_type.value}, Handler: {handler.__name__}")
elif event_type in self.__broadcast_subscribers:
self.__broadcast_subscribers[event_type].remove(handler)
logger.debug(f"Unsubscribed from broadcast event: {event_type.value}, Handler: {handler.__name__}")
def disable_event_handler(self, handler_name: str, class_name: Optional[str] = None):
"""
@@ -141,20 +156,26 @@ class EventManager(metaclass=Singleton):
self.__disabled_handlers.discard(handler_name)
logger.debug(f"Enabled event handler: {handler_name}")
def check(self, etype: Union[EventType, SyncEventType]) -> bool:
def check(self, etype: Union[EventType, ChainEventType]) -> bool:
"""
检查是否有启用的事件处理器可以响应某个事件类型
:param etype: 事件类型 (EventType 或 SyncEventType)
:param etype: 事件类型 (EventType 或 ChainEventType)
:return: 返回是否存在可用的处理器
"""
if etype not in self.__subscribers:
return False
handlers = self.__subscribers.get(etype, [])
return any(
handler.__name__ not in self.__disabled_handlers and
handler.__qualname__.split(".")[0] not in self.__disabled_classes
for handler in handlers
)
if isinstance(etype, ChainEventType):
handlers = self.__chain_subscribers.get(etype, [])
return any(
handler.__name__ not in self.__disabled_handlers and
handler.__qualname__.split(".")[0] not in self.__disabled_classes
for _, handler in handlers
)
else:
handlers = self.__broadcast_subscribers.get(etype, [])
return any(
handler.__name__ not in self.__disabled_handlers and
handler.__qualname__.split(".")[0] not in self.__disabled_classes
for handler in handlers
)
def visualize_handlers(self) -> List[Dict[str, str]]:
"""
@@ -162,14 +183,12 @@ class EventManager(metaclass=Singleton):
:return: 处理器列表,包含处理器名称、类名和状态
"""
handler_info = []
with (self.__lock):
for event_type, handlers in self.__subscribers.items():
with self.__lock:
for event_type, handlers in self.__broadcast_subscribers.items():
for handler in handlers:
class_name = handler.__qualname__.split(".")[0]
status = (
"disabled"
if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes
else "enabled"
"disabled" if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes else "enabled"
)
handler_info.append({
"event_type": event_type.value,
@@ -177,53 +196,71 @@ class EventManager(metaclass=Singleton):
"class_name": class_name,
"status": status
})
for event_type, handlers in self.__chain_subscribers.items():
for priority, handler in handlers:
class_name = handler.__qualname__.split(".")[0]
status = (
"disabled" if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes else "enabled"
)
handler_info.append({
"event_type": event_type.value,
"handler_name": handler.__name__,
"class_name": class_name,
"priority": priority,
"status": status
})
return handler_info
def __trigger_event(self, event: Event) -> Dict:
def __trigger_chain_event(self, event: Event) -> Dict:
"""
触发链式事件,按顺序调用订阅的处理器
:param event: 要处理的事件对象
:return: 返回处理后的事件数据
"""
logger.debug(f"Triggering synchronous chain event: {event}")
self.__dispatch_event(event)
self.__dispatch_chain_event(event)
return event.event_data
def __trigger_event_async(self, event: Event):
def __trigger_broadcast_event(self, event: Event):
"""
触发广播事件,将事件插入到优先级队列中
:param event: 要处理的事件对象
"""
logger.debug(f"Triggering asynchronous broadcast event: {event}")
logger.debug(f"Triggering broadcast event: {event}")
self.__event_queue.put((event.priority, event))
def __dispatch_event(self, event: Event):
def __dispatch_chain_event(self, event: Event):
"""
同步方式调度事件,逐个调用事件处理器
同步方式调度链式事件,按优先级顺序逐个调用事件处理器
:param event: 要调度的事件对象
"""
handlers = self.__subscribers.get(event.event_type, [])
for handler in handlers:
handlers = self.__chain_subscribers.get(event.event_type, [])
self.__log_event_lifecycle(event, "started")
for priority, handler in handlers:
class_name = handler.__qualname__.split(".")[0]
if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes:
start_time = time.time()
try:
handler(event.event_data)
logger.debug(
f"Handler {handler.__qualname__} (Priority: {priority}) "
f"completed in {time.time() - start_time:.3f}s")
except Exception as e:
logger.error(f"Error handling event {event.event_type}: {str(e)}", exc_info=True)
self.__handle_event_error(event, handler, e)
self.__log_event_lifecycle(event, "completed")
def __dispatch_event_async(self, event: Event):
def __dispatch_broadcast_event(self, event: Event):
"""
异步方式调度事件,通过线程池逐个调用事件处理器
异步方式调度广播事件,通过线程池逐个调用事件处理器
:param event: 要调度的事件对象
"""
handlers = self.__subscribers.get(event.event_type, [])
handlers = self.__broadcast_subscribers.get(event.event_type, [])
for handler in handlers:
class_name = handler.__qualname__.split(".")[0]
if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes:
self.__executor.submit(self.__safe_invoke_handler, handler, event)
@staticmethod
def __safe_invoke_handler(handler: Callable[[Dict], None], event: Event):
def __safe_invoke_handler(self, handler: Callable[[Dict], None], event: Event):
"""
安全调用事件处理器,捕获异常并记录日志
:param handler: 要调用的处理器
@@ -232,11 +269,11 @@ class EventManager(metaclass=Singleton):
try:
handler(event.event_data)
except Exception as e:
logger.error(f"Error in asynchronous handler {handler.__name__}: {str(e)}", exc_info=True)
self.__handle_event_error(event, handler, e)
def __fixed_consumer(self):
def __fixed_broadcast_consumer(self):
"""
固定的后台消费者线程,持续从队列中提取事件
固定的后台广播消费者线程,持续从队列中提取事件
"""
while True:
# 使用 Condition 优化队列的等待机制,避免频繁触发超时
@@ -249,16 +286,33 @@ class EventManager(metaclass=Singleton):
try:
priority, event = self.__event_queue.get(timeout=EVENT_QUEUE_IDLE_TIMEOUT_SECONDS)
logger.debug(f"Fixed consumer processing event: {event}")
self.__dispatch_event_async(event)
self.__dispatch_broadcast_event(event)
except Empty:
logger.debug("Queue is empty, waiting for new events.")
def register(self, etype: Union[EventType, SyncEventType, List[Union[EventType, SyncEventType]], type]):
@staticmethod
def __log_event_lifecycle(event: Event, stage: str):
"""
记录事件的生命周期日志
"""
logger.debug(f"{stage} - {event}")
@staticmethod
def __handle_event_error(event: Event, handler: Callable, error: Exception):
"""
全局错误处理器,用于处理事件处理中的异常
"""
logger.error(
f"Global error handler: Event {event.event_type.value} failed in handler {handler.__name__}: {str(error)}")
# 可以将错误事件重新发送到事件队列或执行其他逻辑
# eventmanager.send_event(EventType.SystemError, {"error": str(error), "event_id": event.event_id})
def register(self, etype: Union[EventType, ChainEventType, List[Union[EventType, ChainEventType]], type]):
"""
事件注册装饰器,用于将函数注册为事件的处理器
:param etype:
- 单个事件类型成员 (如 EventType.MetadataScrape, SyncEventType.PluginAction)
- 事件类型类 (EventType, SyncEventType)
- 单个事件类型成员 (如 EventType.MetadataScrape, ChainEventType.PluginAction)
- 事件类型类 (EventType, ChainEventType)
- 或事件类型成员的列表
"""
@@ -268,17 +322,17 @@ class EventManager(metaclass=Singleton):
# 如果传入的是列表,处理每个事件类型
if isinstance(etype, list):
for et in etype:
if isinstance(et, (EventType, SyncEventType)):
if isinstance(et, (EventType, ChainEventType)):
event_list.append(et)
else:
raise ValueError(f"列表中无效的事件类型: {et}")
# 如果传入的是 EventType 或 SyncEventType 类,提取该类中的所有成员
elif isinstance(etype, type) and issubclass(etype, Enum):
# 如果传入的是 EventType 或 ChainEventType 类,提取该类中的所有成员
elif isinstance(etype, type) and issubclass(etype, (EventType, ChainEventType)):
event_list.extend(etype.__members__.values())
# 如果传入的是单个事件类型成员 (EventType.MetadataScrape 或 SyncEventType.PluginAction)
elif isinstance(etype, (EventType, SyncEventType)):
# 如果传入的是单个事件类型成员 (EventType.MetadataScrape 或 ChainEventType.PluginAction)
elif isinstance(etype, (EventType, ChainEventType)):
event_list.append(etype)
else:

View File

@@ -57,11 +57,8 @@ class EventType(Enum):
# 同步链式事件
class SyncEventType(Enum):
# 刮削元数据
MetadataScrape = "metadata.scrape"
# 插件动作
PluginAction = "plugin.action"
class ChainEventType(Enum):
pass
# 系统配置Key字典