refactor(event): optimize broadcast and chain event

This commit is contained in:
InfinityPacer
2024-09-19 23:55:24 +08:00
parent 2486b9274c
commit 49b6052ab0
2 changed files with 220 additions and 111 deletions

View File

@@ -1,150 +1,251 @@
import threading
import typing
from queue import Queue, Empty
from typing import Dict, Any
import uuid
from enum import Enum
from queue import PriorityQueue, Empty
from typing import Callable, Dict, List, Union, Optional
from app.helper.thread import ThreadHelper
from app.log import logger
from app.schemas.types import EventType, SyncEventType
from app.utils.singleton import Singleton
from app.schemas.types import EventType
class Event:
"""
事件类,封装事件的基本信息
"""
def __init__(self, event_type: Union[EventType, SyncEventType], event_data: Optional[Dict] = None,
priority: int = 10):
"""
:param event_type: 事件的类型,支持 EventType 或 SyncEventType
:param event_data: 可选,事件携带的数据,默认为空字典
:param priority: 可选,广播事件的优先级,默认为 10
"""
self.event_id = str(uuid.uuid4()) # 事件ID
self.event_type = event_type # 事件类型
self.event_data = event_data or {} # 事件数据
self.priority = priority # 事件优先级
def __repr__(self) -> str:
"""
重写 __repr__ 方法用于返回事件的详细信息包括事件类型、事件ID和优先级
"""
event_kind = Event.get_event_kind(self.event_type)
return f"<{event_kind}: {self.event_type.value}, ID: {self.event_id}, Priority: {self.priority}>"
@staticmethod
def get_event_kind(event_type: Union[EventType, SyncEventType]) -> str:
"""
根据事件类型判断事件是广播事件还是链式事件
:param event_type: 事件类型,支持 EventType 或 SyncEventType
:return: 返回 Broadcast Event 或 Chain Event
"""
return "Broadcast Event" if isinstance(event_type, EventType) else "Chain Event"
class EventManager(metaclass=Singleton):
"""
事件管理器
EventManager 负责管理和调度广播事件和链式事件,包括订阅、发送和处理事件
"""
def __init__(self):
# 事件队列
self._eventQueue = Queue()
# 事件响应函数字典
self._handlers: Dict[str, Dict[str, Any]] = {}
# 已禁用的事件响应
self._disabled_handlers = []
def __init__(self, max_workers: int = 50):
"""
:param max_workers: 线程池最大工作线程数,默认 50
"""
self.__executor = ThreadHelper(max_workers=max_workers) # 动态线程池,用于消费事件
self.__event_executor = ThreadHelper(max_workers=3) # 动态线程池,用于处理事件
self.__event_queue = PriorityQueue() # 优先级队列
self.__subscribers: Dict[Union[EventType, SyncEventType], List[Callable[[Dict], None]]] = {} # 订阅者列表
self.__disabled_handlers = set() # 禁用的事件处理器集合
self.__lock = threading.Lock() # 线程锁
self.__dynamic_consuming = False # 标记是否已经在使用动态线程池
def get_event(self):
"""
获取事件
"""
try:
event = self._eventQueue.get(block=True, timeout=1)
handlers = self._handlers.get(event.event_type) or {}
if handlers:
# 去除掉被禁用的事件响应
handlerList = [handler for handler in handlers.values()
if handler.__qualname__.split(".")[0] not in self._disabled_handlers]
return event, handlerList
return event, []
except Empty:
return None, []
# 启动消费者线程用于处理异步事件
threading.Thread(target=self.__fixed_consumer, daemon=True).start()
def check(self, etype: EventType):
def send_event(self, etype: Union[EventType, SyncEventType], data: Optional[Dict] = None, priority: int = 10) -> \
Optional[Dict]:
"""
检查事件是否存在响应,去除掉被禁用的事件响应
发送事件,根据事件类型决定是广播事件还是链式事件
:param etype: 事件类型 (EventType 或 SyncEventType)
:param data: 可选,事件数据
:param priority: 广播事件的优先级,默认为 10
:return: 如果是链式事件,返回处理后的事件数据;否则返回 None
"""
if etype.value not in self._handlers:
return False
handlers = self._handlers.get(etype.value)
return any([handler for handler in handlers.values()
if handler.__qualname__.split(".")[0] not in self._disabled_handlers])
def add_event_listener(self, etype: EventType, handler: type):
"""
注册事件处理
"""
try:
handlers = self._handlers[etype.value]
except KeyError:
handlers = {}
self._handlers[etype.value] = handlers
if handler.__qualname__ in handlers:
handlers.pop(handler.__qualname__)
event = Event(etype, data, priority if isinstance(etype, EventType) else None)
if isinstance(etype, EventType):
self.__trigger_event_async(event, priority)
elif isinstance(etype, SyncEventType):
return self.__trigger_event(event)
else:
logger.debug(f"Event Registed{etype.value} - {handler.__qualname__}")
handlers[handler.__qualname__] = handler
logger.error(f"Unknown event type: {etype}")
def disable_events_hander(self, class_name: str):
def add_event_listener(self, event_type: Union[EventType, SyncEventType], handler: Callable[[Dict], None]) -> None:
"""
标记对应类事件处理为不可用
注册事件处理器,将处理器添加到对应的事件订阅列表中
:param event_type: 事件类型 (EventType 或 SyncEventType)
:param handler: 处理器
"""
if class_name not in self._disabled_handlers:
self._disabled_handlers.append(class_name)
logger.debug(f"Event Disabled{class_name}")
with self.__lock:
if event_type not in self.__subscribers:
self.__subscribers[event_type] = []
self.__subscribers[event_type].append(handler)
event_kind = Event.get_event_kind(event_type)
logger.debug(f"Subscribed to event: {event_type.value} ({event_kind}), Handler: {handler.__name__}")
def enable_events_hander(self, class_name: str):
def remove_event_listener(self, event_type: Union[EventType, SyncEventType],
handler: Callable[[Dict], None]) -> None:
"""
标记对应事件处理为可用
移除事件处理器,将处理器从对应事件的订阅列表中删除
:param event_type: 事件类型 (EventType 或 SyncEventType)
:param handler: 要移除的处理器
"""
if class_name in self._disabled_handlers:
self._disabled_handlers.remove(class_name)
logger.debug(f"Event Enabled{class_name}")
with self.__lock:
if event_type in self.__subscribers:
self.__subscribers[event_type].remove(handler)
event_kind = Event.get_event_kind(event_type)
logger.debug(f"Unsubscribed from event: {event_type.value} ({event_kind}), Handler: {handler.__name__}")
def send_event(self, etype: EventType, data: dict = None, callback: typing.Callable = None):
def disable_event_handler(self, handler_name: str) -> None:
"""
发送事件(异步响应)
禁用指定名称的事件处理器,防止其响应事件
:param handler_name: 要禁用的事件处理器名称
"""
if etype not in EventType:
return
event = Event(etype.value)
event.event_data = data or {}
event.event_callback = callback
logger.debug(f"发送事件:{etype.value} - data{event.event_data}callback{callback}")
self._eventQueue.put(event)
self.__disabled_handlers.add(handler_name)
logger.debug(f"Disabled event handler: {handler_name}")
def send_event_sync(self, etype: EventType, data: dict = None):
def enable_event_handler(self, handler_name: str) -> None:
"""
发送事件(同步响应)
启用指定名称的事件处理器,使其可以继续响应事件
:param handler_name: 要启用的事件处理器名称
"""
result: any = None
condition = threading.Condition()
self.__disabled_handlers.discard(handler_name)
logger.debug(f"Enabled event handler: {handler_name}")
def callback(res):
nonlocal result
if res:
result = res.result()
with condition:
condition.notify_all()
thread = threading.Thread(target=self.send_event, args=(etype, data, callback))
thread.start()
with condition:
condition.wait()
return result
def register(self, etype: [EventType, list]):
def check(self, etype: Union[EventType, SyncEventType]) -> bool:
"""
事件注册
:param etype: 事件类型
检查是否有启用的事件处理器可以响应某个事件类型
:param etype: 事件类型 (EventType 或 SyncEventType)
:return: 返回是否存在可用的处理器
"""
if etype not in self.__subscribers:
return False
handlers = self.__subscribers.get(etype, [])
return any(handler.__name__ not in self.__disabled_handlers for handler in handlers)
def __trigger_event(self, event: Event) -> Dict:
"""
触发链式事件,按顺序调用订阅的处理器
:param event: 要处理的事件对象
:return: 返回处理后的事件数据
"""
logger.debug(f"Triggering synchronous chain event: {event}")
self.__dispatch_event(event)
return event.event_data
def __trigger_event_async(self, event: Event, priority: int) -> None:
"""
触发广播事件,将事件插入到优先级队列中
:param event: 要处理的事件对象
:param priority: 事件的优先级
"""
logger.debug(f"Triggering asynchronous broadcast event: {event}")
self.__event_queue.put((priority, event))
# 当固定消费者无法及时处理时,动态启动线程池
if self.__event_queue.qsize() > 10 and not self.__dynamic_consuming:
self.__dynamic_consuming = True
self.__event_executor.submit(self.__dynamic_consumer)
def __dispatch_event(self, event: Event) -> None:
"""
同步方式调度事件,逐个调用事件处理器
:param event: 要调度的事件对象
"""
handlers = self.__subscribers.get(event.event_type, [])
for handler in handlers:
if handler.__name__ not in self.__disabled_handlers:
handler(event.event_data)
def __dispatch_event_async(self, event: Event) -> None:
"""
异步方式调度事件,通过线程池逐个调用事件处理器
:param event: 要调度的事件对象
"""
handlers = self.__subscribers.get(event.event_type, [])
for handler in handlers:
if handler.__name__ not in self.__disabled_handlers:
self.__executor.submit(handler, event.event_data)
def __fixed_consumer(self) -> None:
"""
固定的后台消费者线程,持续从队列中提取事件处理
该线程始终保持运行状态,确保即使事件量少时也有线程在消费
"""
while True:
try:
# 阻塞方式从队列获取事件
priority, event = self.__event_queue.get(block=True, timeout=1)
logger.debug(f"Fixed consumer processing event: {event}")
self.__dispatch_event_async(event) # 调用事件处理器
except Empty:
continue # 如果队列为空,继续等待
def __dynamic_consumer(self) -> None:
"""
动态消费者线程,通过线程池调度,用于在事件量大时进行扩展
一旦队列为空,则结束动态消费,并重置动态消费标志
"""
while True:
try:
# 非阻塞方式从队列获取事件
priority, event = self.__event_queue.get(block=False)
logger.debug(f"Dynamic consumer processing event: {event}")
self.__dispatch_event_async(event) # 调用事件处理器
except Empty:
self.__dynamic_consuming = False # 队列为空,结束动态消费
break
def register(self, etype: Union[EventType, SyncEventType, List[Union[EventType, SyncEventType]], type]):
"""
事件注册装饰器,用于将函数注册为事件的处理器
:param etype:
- 单个事件类型成员 (如 EventType.MetadataScrape, SyncEventType.PluginAction)
- 事件类型类 (EventType, SyncEventType)
- 或事件类型成员的列表
"""
def decorator(f):
def decorator(f: Callable[[Dict], None]):
event_list = []
# 如果传入的是列表,处理每个事件类型
if isinstance(etype, list):
for et in etype:
self.add_event_listener(et, f)
elif type(etype) == type(EventType):
for et in etype.__members__.values():
self.add_event_listener(et, f)
if isinstance(et, (EventType, SyncEventType)):
event_list.append(et)
else:
raise ValueError(f"列表中无效的事件类型: {et}")
# 如果传入的是 EventType 或 SyncEventType 类,提取该类中的所有成员
elif isinstance(etype, type) and issubclass(etype, Enum):
event_list.extend(etype.__members__.values())
# 如果传入的是单个事件类型成员 (EventType.MetadataScrape 或 SyncEventType.PluginAction)
elif isinstance(etype, (EventType, SyncEventType)):
event_list.append(etype)
else:
self.add_event_listener(etype, f)
raise ValueError(f"无效的事件类型: {etype}")
# 统一注册事件
for event in event_list:
self.add_event_listener(event, f)
return f
return decorator
class Event(object):
"""
事件对象
"""
def __init__(self, event_type=None):
# 事件类型
self.event_type = event_type
# 字典用于保存具体的事件数据
self.event_data = {}
# 事件完成后回调函数
self.event_callback = None
# 实例引用,用于注册事件
# 全局实例定义
eventmanager = EventManager()

View File

@@ -14,7 +14,7 @@ class TorrentStatus(Enum):
DOWNLOADING = "下载中"
# 可监听事件
# 异步广播事件
class EventType(Enum):
# 插件需要重载
PluginReload = "plugin.reload"
@@ -56,6 +56,14 @@ class EventType(Enum):
MetadataScrape = "metadata.scrape"
# 同步链式事件
class SyncEventType(Enum):
# 刮削元数据
MetadataScrape = "metadata.scrape"
# 插件动作
PluginAction = "plugin.action"
# 系统配置Key字典
class SystemConfigKey(Enum):
# 下载器配置