mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-02-14 07:55:07 +08:00
feat(event): optimize handler
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
import inspect
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
@@ -60,9 +61,8 @@ class EventManager(metaclass=Singleton):
|
||||
"""
|
||||
self.__executor = ThreadHelper(max_workers=max_workers) # 动态线程池,用于消费事件
|
||||
self.__event_queue = PriorityQueue() # 优先级队列
|
||||
self.__broadcast_subscribers: Dict[EventType, List[Callable[[Dict], None]]] = {} # 广播事件的订阅者
|
||||
self.__chain_subscribers: Dict[
|
||||
ChainEventType, List[tuple[int, Callable[[Dict], None]]]] = {} # 链式事件的订阅者(优先级+处理器)
|
||||
self.__broadcast_subscribers: Dict[EventType, List[Callable]] = {} # 广播事件的订阅者
|
||||
self.__chain_subscribers: Dict[ChainEventType, List[tuple[int, Callable]]] = {} # 链式事件的订阅者
|
||||
self.__disabled_handlers = set() # 禁用的事件处理器集合
|
||||
self.__disabled_classes = set() # 禁用的事件处理器类集合
|
||||
self.__lock = threading.Lock() # 线程锁
|
||||
@@ -73,7 +73,7 @@ class EventManager(metaclass=Singleton):
|
||||
threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True).start()
|
||||
|
||||
def send_event(self, etype: Union[EventType, ChainEventType], data: Optional[Dict] = None,
|
||||
priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Dict]:
|
||||
priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Event]:
|
||||
"""
|
||||
发送事件,根据事件类型决定是广播事件还是链式事件
|
||||
:param etype: 事件类型 (EventType 或 ChainEventType)
|
||||
@@ -91,7 +91,7 @@ class EventManager(metaclass=Singleton):
|
||||
else:
|
||||
logger.error(f"Unknown event type: {etype}")
|
||||
|
||||
def add_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable[[Dict], None],
|
||||
def add_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable,
|
||||
priority: int = DEFAULT_EVENT_PRIORITY):
|
||||
"""
|
||||
注册事件处理器,将处理器添加到对应的事件订阅列表中
|
||||
@@ -115,7 +115,7 @@ class EventManager(metaclass=Singleton):
|
||||
self.__broadcast_subscribers[event_type].append(handler)
|
||||
logger.debug(f"Subscribed to broadcast event: {event_type.value}, Handler: {handler.__name__}")
|
||||
|
||||
def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable[[Dict], None]):
|
||||
def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable):
|
||||
"""
|
||||
移除事件处理器,将处理器从对应事件的订阅列表中删除
|
||||
:param event_type: 事件类型 (EventType 或 ChainEventType)
|
||||
@@ -130,96 +130,105 @@ class EventManager(metaclass=Singleton):
|
||||
self.__broadcast_subscribers[event_type].remove(handler)
|
||||
logger.debug(f"Unsubscribed from broadcast event: {event_type.value}, Handler: {handler.__name__}")
|
||||
|
||||
def disable_event_handler(self, handler_name: str, class_name: Optional[str] = None):
|
||||
def disable_event_handler(self, target: Union[Callable, type]):
|
||||
"""
|
||||
禁用指定名称的事件处理器或事件处理类,防止其响应事件
|
||||
:param handler_name: 要禁用的事件处理器名称
|
||||
:param class_name: 可选,要禁用的事件处理器类名称。如果提供,将禁用该类的所有处理器
|
||||
禁用指定的事件处理器或事件处理器类
|
||||
:param target: 处理器函数或类
|
||||
"""
|
||||
if class_name:
|
||||
self.__disabled_classes.add(class_name)
|
||||
logger.debug(f"Disabled event handler class: {class_name}")
|
||||
identifier = self.__get_handler_identifier(target)
|
||||
if isinstance(target, type):
|
||||
self.__disabled_classes.add(identifier)
|
||||
logger.debug(f"Disabled event handler class: {identifier}")
|
||||
else:
|
||||
self.__disabled_handlers.add(handler_name)
|
||||
logger.debug(f"Disabled event handler: {handler_name}")
|
||||
self.__disabled_handlers.add(identifier)
|
||||
logger.debug(f"Disabled event handler: {identifier}")
|
||||
|
||||
def enable_event_handler(self, handler_name: str, class_name: Optional[str] = None):
|
||||
def enable_event_handler(self, target: Union[Callable, type]):
|
||||
"""
|
||||
启用指定名称的事件处理器或事件处理类,使其可以继续响应事件
|
||||
:param handler_name: 要启用的事件处理器名称
|
||||
:param class_name: 可选,要启用的事件处理器类名称。如果提供,将启用该类的所有处理器
|
||||
启用指定的事件处理器或事件处理器类
|
||||
:param target: 处理器函数或类
|
||||
"""
|
||||
if class_name:
|
||||
self.__disabled_classes.discard(class_name)
|
||||
logger.debug(f"Enabled event handler class: {class_name}")
|
||||
identifier = self.__get_handler_identifier(target)
|
||||
if isinstance(target, type):
|
||||
self.__disabled_classes.discard(identifier)
|
||||
logger.debug(f"Enabled event handler class: {identifier}")
|
||||
else:
|
||||
self.__disabled_handlers.discard(handler_name)
|
||||
logger.debug(f"Enabled event handler: {handler_name}")
|
||||
self.__disabled_handlers.discard(identifier)
|
||||
logger.debug(f"Enabled event handler: {identifier}")
|
||||
|
||||
def check(self, etype: Union[EventType, ChainEventType]) -> bool:
|
||||
"""
|
||||
检查是否有启用的事件处理器可以响应某个事件类型
|
||||
:param etype: 事件类型 (EventType 或 ChainEventType)
|
||||
:return: 返回是否存在可用的处理器
|
||||
"""
|
||||
if isinstance(etype, ChainEventType):
|
||||
handlers = self.__chain_subscribers.get(etype, [])
|
||||
return any(
|
||||
handler.__name__ not in self.__disabled_handlers and
|
||||
handler.__qualname__.split(".")[0] not in self.__disabled_classes
|
||||
for _, handler in handlers
|
||||
)
|
||||
else:
|
||||
handlers = self.__broadcast_subscribers.get(etype, [])
|
||||
return any(
|
||||
handler.__name__ not in self.__disabled_handlers and
|
||||
handler.__qualname__.split(".")[0] not in self.__disabled_classes
|
||||
for handler in handlers
|
||||
)
|
||||
|
||||
def visualize_handlers(self) -> List[Dict[str, str]]:
|
||||
def visualize_handlers(self) -> List[Dict]:
|
||||
"""
|
||||
可视化所有事件处理器,包括是否被禁用的状态
|
||||
:return: 处理器列表,包含处理器名称、类名和状态
|
||||
:return: 处理器列表,包含事件类型、处理器标识符、优先级(如果有)和状态
|
||||
"""
|
||||
handler_info = []
|
||||
with self.__lock:
|
||||
for event_type, handlers in self.__broadcast_subscribers.items():
|
||||
for handler in handlers:
|
||||
class_name = handler.__qualname__.split(".")[0]
|
||||
status = (
|
||||
"disabled" if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes else "enabled"
|
||||
)
|
||||
handler_info.append({
|
||||
"event_type": event_type.value,
|
||||
"handler_name": handler.__name__,
|
||||
"class_name": class_name,
|
||||
"status": status
|
||||
})
|
||||
for event_type, handlers in self.__chain_subscribers.items():
|
||||
for priority, handler in handlers:
|
||||
class_name = handler.__qualname__.split(".")[0]
|
||||
status = (
|
||||
"disabled" if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes else "enabled"
|
||||
)
|
||||
handler_info.append({
|
||||
"event_type": event_type.value,
|
||||
"handler_name": handler.__name__,
|
||||
"class_name": class_name,
|
||||
"priority": priority,
|
||||
"status": status
|
||||
})
|
||||
# 统一处理广播事件和链式事件
|
||||
for event_type, subscribers in {**self.__broadcast_subscribers, **self.__chain_subscribers}.items():
|
||||
for handler_data in subscribers:
|
||||
if isinstance(subscribers, dict):
|
||||
priority, handler = handler_data
|
||||
else:
|
||||
priority = None
|
||||
handler = handler_data
|
||||
# 获取处理器的唯一标识符
|
||||
handler_id = self.__get_handler_identifier(handler)
|
||||
# 检查处理器的启用状态
|
||||
status = "enabled" if self.__is_handler_enabled(handler) else "disabled"
|
||||
# 构建处理器信息字典
|
||||
handler_dict = {
|
||||
"event_type": event_type.value,
|
||||
"handler_identifier": handler_id,
|
||||
"status": status
|
||||
}
|
||||
if priority is not None:
|
||||
handler_dict["priority"] = priority
|
||||
handler_info.append(handler_dict)
|
||||
return handler_info
|
||||
|
||||
def __trigger_chain_event(self, event: Event) -> Dict:
|
||||
@staticmethod
|
||||
def __get_handler_identifier(target: Union[Callable, type]) -> str:
|
||||
"""
|
||||
触发链式事件,按顺序调用订阅的处理器
|
||||
:param event: 要处理的事件对象
|
||||
:return: 返回处理后的事件数据
|
||||
获取处理器或处理器类的唯一标识符,包括模块名和类名
|
||||
:param target: 处理器函数或类
|
||||
:return: 唯一标识符
|
||||
"""
|
||||
if isinstance(target, type):
|
||||
# 如果是类,使用模块名和类名
|
||||
module_name = target.__module__
|
||||
class_name = target.__qualname__
|
||||
return f"{module_name}.{class_name}"
|
||||
else:
|
||||
# 如果是函数或方法,使用 inspect.getmodule 来获取模块名
|
||||
module = inspect.getmodule(target)
|
||||
module_name = module.__name__ if module else "unknown_module"
|
||||
qualname = target.__qualname__
|
||||
return f"{module_name}.{qualname}"
|
||||
|
||||
def __is_handler_enabled(self, handler: Callable) -> bool:
|
||||
"""
|
||||
检查处理器是否已启用(没有被禁用)
|
||||
:param handler: 处理器函数
|
||||
:return: 如果处理器启用则返回 True,否则返回 False
|
||||
"""
|
||||
# 获取处理器的唯一标识符
|
||||
handler_id = self.__get_handler_identifier(handler)
|
||||
|
||||
# 获取处理器所属类的唯一标识符
|
||||
class_id = self.__get_handler_identifier(handler.__self__.__class__) if hasattr(handler, '__self__') else None
|
||||
|
||||
# 检查处理器或类是否被禁用,只要其中之一被禁用则返回 False
|
||||
if handler_id in self.__disabled_handlers or (class_id is not None and class_id in self.__disabled_classes):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def __trigger_chain_event(self, event: Event) -> Event:
|
||||
"""
|
||||
触发链式事件,按顺序调用订阅的处理器,并记录处理耗时
|
||||
"""
|
||||
logger.debug(f"Triggering synchronous chain event: {event}")
|
||||
self.__dispatch_chain_event(event)
|
||||
return event.event_data
|
||||
return event
|
||||
|
||||
def __trigger_broadcast_event(self, event: Event):
|
||||
"""
|
||||
@@ -231,22 +240,18 @@ class EventManager(metaclass=Singleton):
|
||||
|
||||
def __dispatch_chain_event(self, event: Event):
|
||||
"""
|
||||
同步方式调度链式事件,按优先级顺序逐个调用事件处理器
|
||||
同步方式调度链式事件,按优先级顺序逐个调用事件处理器,并记录每个处理器的处理时间
|
||||
:param event: 要调度的事件对象
|
||||
"""
|
||||
handlers = self.__chain_subscribers.get(event.event_type, [])
|
||||
self.__log_event_lifecycle(event, "started")
|
||||
for priority, handler in handlers:
|
||||
class_name = handler.__qualname__.split(".")[0]
|
||||
if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes:
|
||||
start_time = time.time()
|
||||
try:
|
||||
handler(event.event_data)
|
||||
logger.debug(
|
||||
f"Handler {handler.__qualname__} (Priority: {priority}) "
|
||||
f"completed in {time.time() - start_time:.3f}s")
|
||||
except Exception as e:
|
||||
self.__handle_event_error(event, handler, e)
|
||||
start_time = time.time()
|
||||
self.__safe_invoke_handler(handler, event)
|
||||
logger.debug(
|
||||
f"Handler {handler.__qualname__} (Priority: {priority}) "
|
||||
f"completed in {time.time() - start_time:.3f}s")
|
||||
|
||||
self.__log_event_lifecycle(event, "completed")
|
||||
|
||||
def __dispatch_broadcast_event(self, event: Event):
|
||||
@@ -256,18 +261,19 @@ class EventManager(metaclass=Singleton):
|
||||
"""
|
||||
handlers = self.__broadcast_subscribers.get(event.event_type, [])
|
||||
for handler in handlers:
|
||||
class_name = handler.__qualname__.split(".")[0]
|
||||
if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes:
|
||||
self.__executor.submit(self.__safe_invoke_handler, handler, event)
|
||||
self.__executor.submit(self.__safe_invoke_handler, handler, event)
|
||||
|
||||
def __safe_invoke_handler(self, handler: Callable[[Dict], None], event: Event):
|
||||
def __safe_invoke_handler(self, handler: Callable, event: Event):
|
||||
"""
|
||||
安全调用事件处理器,捕获异常并记录日志
|
||||
:param handler: 要调用的处理器
|
||||
:param event: 事件对象
|
||||
"""
|
||||
if not self.__is_handler_enabled(handler):
|
||||
logger.debug(f"Handler {handler.__qualname__} is disabled. Skipping execution.")
|
||||
return
|
||||
try:
|
||||
handler(event.event_data)
|
||||
handler(event)
|
||||
except Exception as e:
|
||||
self.__handle_event_error(event, handler, e)
|
||||
|
||||
@@ -316,7 +322,7 @@ class EventManager(metaclass=Singleton):
|
||||
- 或事件类型成员的列表
|
||||
"""
|
||||
|
||||
def decorator(f: Callable[[Dict], None]):
|
||||
def decorator(f: Callable):
|
||||
event_list = []
|
||||
|
||||
# 如果传入的是列表,处理每个事件类型
|
||||
|
||||
@@ -7,7 +7,6 @@ import traceback
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Type
|
||||
|
||||
from app.helper.sites import SitesHelper
|
||||
from watchdog.events import FileSystemEventHandler
|
||||
from watchdog.observers import Observer
|
||||
|
||||
@@ -18,6 +17,7 @@ from app.db.plugindata_oper import PluginDataOper
|
||||
from app.db.systemconfig_oper import SystemConfigOper
|
||||
from app.helper.module import ModuleHelper
|
||||
from app.helper.plugin import PluginHelper
|
||||
from app.helper.sites import SitesHelper
|
||||
from app.log import logger
|
||||
from app.schemas.types import SystemConfigKey
|
||||
from app.utils.crypto import RSAUtils
|
||||
@@ -156,7 +156,7 @@ class PluginManager(metaclass=Singleton):
|
||||
# 未安装的不加载
|
||||
if plugin_id not in installed_plugins:
|
||||
# 设置事件状态为不可用
|
||||
eventmanager.disable_event_handler(class_name=plugin_id)
|
||||
eventmanager.disable_event_handler(plugin)
|
||||
continue
|
||||
# 生成实例
|
||||
plugin_obj = plugin()
|
||||
@@ -167,9 +167,9 @@ class PluginManager(metaclass=Singleton):
|
||||
logger.info(f"加载插件:{plugin_id} 版本:{plugin_obj.plugin_version}")
|
||||
# 启用的插件才设置事件注册状态可用
|
||||
if plugin_obj.get_state():
|
||||
eventmanager.enable_event_handler(class_name=plugin_id)
|
||||
eventmanager.enable_event_handler(plugin)
|
||||
else:
|
||||
eventmanager.disable_event_handler(class_name=plugin_id)
|
||||
eventmanager.disable_event_handler(plugin)
|
||||
except Exception as err:
|
||||
logger.error(f"加载插件 {plugin_id} 出错:{str(err)} - {traceback.format_exc()}")
|
||||
|
||||
@@ -179,15 +179,18 @@ class PluginManager(metaclass=Singleton):
|
||||
:param plugin_id: 插件ID
|
||||
:param conf: 插件配置
|
||||
"""
|
||||
if not self._running_plugins.get(plugin_id):
|
||||
plugin = self._running_plugins.get(plugin_id)
|
||||
if not plugin:
|
||||
return
|
||||
self._running_plugins[plugin_id].init_plugin(conf)
|
||||
if self._running_plugins[plugin_id].get_state():
|
||||
# 设置启用的插件事件注册状态可用
|
||||
eventmanager.enable_event_handler(class_name=plugin_id)
|
||||
# 初始化插件
|
||||
plugin.init_plugin(conf)
|
||||
# 检查插件状态并启用/禁用事件处理器
|
||||
if plugin.get_state():
|
||||
# 启用插件类的事件处理器
|
||||
eventmanager.enable_event_handler(type(plugin))
|
||||
else:
|
||||
# 设置事件状态为不可用
|
||||
eventmanager.disable_event_handler(class_name=plugin_id)
|
||||
# 禁用插件类的事件处理器
|
||||
eventmanager.disable_event_handler(type(plugin))
|
||||
|
||||
def stop(self, pid: str = None):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user