mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-04-07 20:59:19 +08:00
feat(event): optimize handler
This commit is contained in:
@@ -1,3 +1,4 @@
|
|||||||
|
import inspect
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
@@ -60,9 +61,8 @@ class EventManager(metaclass=Singleton):
|
|||||||
"""
|
"""
|
||||||
self.__executor = ThreadHelper(max_workers=max_workers) # 动态线程池,用于消费事件
|
self.__executor = ThreadHelper(max_workers=max_workers) # 动态线程池,用于消费事件
|
||||||
self.__event_queue = PriorityQueue() # 优先级队列
|
self.__event_queue = PriorityQueue() # 优先级队列
|
||||||
self.__broadcast_subscribers: Dict[EventType, List[Callable[[Dict], None]]] = {} # 广播事件的订阅者
|
self.__broadcast_subscribers: Dict[EventType, List[Callable]] = {} # 广播事件的订阅者
|
||||||
self.__chain_subscribers: Dict[
|
self.__chain_subscribers: Dict[ChainEventType, List[tuple[int, Callable]]] = {} # 链式事件的订阅者
|
||||||
ChainEventType, List[tuple[int, Callable[[Dict], None]]]] = {} # 链式事件的订阅者(优先级+处理器)
|
|
||||||
self.__disabled_handlers = set() # 禁用的事件处理器集合
|
self.__disabled_handlers = set() # 禁用的事件处理器集合
|
||||||
self.__disabled_classes = set() # 禁用的事件处理器类集合
|
self.__disabled_classes = set() # 禁用的事件处理器类集合
|
||||||
self.__lock = threading.Lock() # 线程锁
|
self.__lock = threading.Lock() # 线程锁
|
||||||
@@ -73,7 +73,7 @@ class EventManager(metaclass=Singleton):
|
|||||||
threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True).start()
|
threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True).start()
|
||||||
|
|
||||||
def send_event(self, etype: Union[EventType, ChainEventType], data: Optional[Dict] = None,
|
def send_event(self, etype: Union[EventType, ChainEventType], data: Optional[Dict] = None,
|
||||||
priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Dict]:
|
priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Event]:
|
||||||
"""
|
"""
|
||||||
发送事件,根据事件类型决定是广播事件还是链式事件
|
发送事件,根据事件类型决定是广播事件还是链式事件
|
||||||
:param etype: 事件类型 (EventType 或 ChainEventType)
|
:param etype: 事件类型 (EventType 或 ChainEventType)
|
||||||
@@ -91,7 +91,7 @@ class EventManager(metaclass=Singleton):
|
|||||||
else:
|
else:
|
||||||
logger.error(f"Unknown event type: {etype}")
|
logger.error(f"Unknown event type: {etype}")
|
||||||
|
|
||||||
def add_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable[[Dict], None],
|
def add_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable,
|
||||||
priority: int = DEFAULT_EVENT_PRIORITY):
|
priority: int = DEFAULT_EVENT_PRIORITY):
|
||||||
"""
|
"""
|
||||||
注册事件处理器,将处理器添加到对应的事件订阅列表中
|
注册事件处理器,将处理器添加到对应的事件订阅列表中
|
||||||
@@ -115,7 +115,7 @@ class EventManager(metaclass=Singleton):
|
|||||||
self.__broadcast_subscribers[event_type].append(handler)
|
self.__broadcast_subscribers[event_type].append(handler)
|
||||||
logger.debug(f"Subscribed to broadcast event: {event_type.value}, Handler: {handler.__name__}")
|
logger.debug(f"Subscribed to broadcast event: {event_type.value}, Handler: {handler.__name__}")
|
||||||
|
|
||||||
def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable[[Dict], None]):
|
def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable):
|
||||||
"""
|
"""
|
||||||
移除事件处理器,将处理器从对应事件的订阅列表中删除
|
移除事件处理器,将处理器从对应事件的订阅列表中删除
|
||||||
:param event_type: 事件类型 (EventType 或 ChainEventType)
|
:param event_type: 事件类型 (EventType 或 ChainEventType)
|
||||||
@@ -130,96 +130,105 @@ class EventManager(metaclass=Singleton):
|
|||||||
self.__broadcast_subscribers[event_type].remove(handler)
|
self.__broadcast_subscribers[event_type].remove(handler)
|
||||||
logger.debug(f"Unsubscribed from broadcast event: {event_type.value}, Handler: {handler.__name__}")
|
logger.debug(f"Unsubscribed from broadcast event: {event_type.value}, Handler: {handler.__name__}")
|
||||||
|
|
||||||
def disable_event_handler(self, handler_name: str, class_name: Optional[str] = None):
|
def disable_event_handler(self, target: Union[Callable, type]):
|
||||||
"""
|
"""
|
||||||
禁用指定名称的事件处理器或事件处理类,防止其响应事件
|
禁用指定的事件处理器或事件处理器类
|
||||||
:param handler_name: 要禁用的事件处理器名称
|
:param target: 处理器函数或类
|
||||||
:param class_name: 可选,要禁用的事件处理器类名称。如果提供,将禁用该类的所有处理器
|
|
||||||
"""
|
"""
|
||||||
if class_name:
|
identifier = self.__get_handler_identifier(target)
|
||||||
self.__disabled_classes.add(class_name)
|
if isinstance(target, type):
|
||||||
logger.debug(f"Disabled event handler class: {class_name}")
|
self.__disabled_classes.add(identifier)
|
||||||
|
logger.debug(f"Disabled event handler class: {identifier}")
|
||||||
else:
|
else:
|
||||||
self.__disabled_handlers.add(handler_name)
|
self.__disabled_handlers.add(identifier)
|
||||||
logger.debug(f"Disabled event handler: {handler_name}")
|
logger.debug(f"Disabled event handler: {identifier}")
|
||||||
|
|
||||||
def enable_event_handler(self, handler_name: str, class_name: Optional[str] = None):
|
def enable_event_handler(self, target: Union[Callable, type]):
|
||||||
"""
|
"""
|
||||||
启用指定名称的事件处理器或事件处理类,使其可以继续响应事件
|
启用指定的事件处理器或事件处理器类
|
||||||
:param handler_name: 要启用的事件处理器名称
|
:param target: 处理器函数或类
|
||||||
:param class_name: 可选,要启用的事件处理器类名称。如果提供,将启用该类的所有处理器
|
|
||||||
"""
|
"""
|
||||||
if class_name:
|
identifier = self.__get_handler_identifier(target)
|
||||||
self.__disabled_classes.discard(class_name)
|
if isinstance(target, type):
|
||||||
logger.debug(f"Enabled event handler class: {class_name}")
|
self.__disabled_classes.discard(identifier)
|
||||||
|
logger.debug(f"Enabled event handler class: {identifier}")
|
||||||
else:
|
else:
|
||||||
self.__disabled_handlers.discard(handler_name)
|
self.__disabled_handlers.discard(identifier)
|
||||||
logger.debug(f"Enabled event handler: {handler_name}")
|
logger.debug(f"Enabled event handler: {identifier}")
|
||||||
|
|
||||||
def check(self, etype: Union[EventType, ChainEventType]) -> bool:
|
def visualize_handlers(self) -> List[Dict]:
|
||||||
"""
|
|
||||||
检查是否有启用的事件处理器可以响应某个事件类型
|
|
||||||
:param etype: 事件类型 (EventType 或 ChainEventType)
|
|
||||||
:return: 返回是否存在可用的处理器
|
|
||||||
"""
|
|
||||||
if isinstance(etype, ChainEventType):
|
|
||||||
handlers = self.__chain_subscribers.get(etype, [])
|
|
||||||
return any(
|
|
||||||
handler.__name__ not in self.__disabled_handlers and
|
|
||||||
handler.__qualname__.split(".")[0] not in self.__disabled_classes
|
|
||||||
for _, handler in handlers
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
handlers = self.__broadcast_subscribers.get(etype, [])
|
|
||||||
return any(
|
|
||||||
handler.__name__ not in self.__disabled_handlers and
|
|
||||||
handler.__qualname__.split(".")[0] not in self.__disabled_classes
|
|
||||||
for handler in handlers
|
|
||||||
)
|
|
||||||
|
|
||||||
def visualize_handlers(self) -> List[Dict[str, str]]:
|
|
||||||
"""
|
"""
|
||||||
可视化所有事件处理器,包括是否被禁用的状态
|
可视化所有事件处理器,包括是否被禁用的状态
|
||||||
:return: 处理器列表,包含处理器名称、类名和状态
|
:return: 处理器列表,包含事件类型、处理器标识符、优先级(如果有)和状态
|
||||||
"""
|
"""
|
||||||
handler_info = []
|
handler_info = []
|
||||||
with self.__lock:
|
# 统一处理广播事件和链式事件
|
||||||
for event_type, handlers in self.__broadcast_subscribers.items():
|
for event_type, subscribers in {**self.__broadcast_subscribers, **self.__chain_subscribers}.items():
|
||||||
for handler in handlers:
|
for handler_data in subscribers:
|
||||||
class_name = handler.__qualname__.split(".")[0]
|
if isinstance(subscribers, dict):
|
||||||
status = (
|
priority, handler = handler_data
|
||||||
"disabled" if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes else "enabled"
|
else:
|
||||||
)
|
priority = None
|
||||||
handler_info.append({
|
handler = handler_data
|
||||||
"event_type": event_type.value,
|
# 获取处理器的唯一标识符
|
||||||
"handler_name": handler.__name__,
|
handler_id = self.__get_handler_identifier(handler)
|
||||||
"class_name": class_name,
|
# 检查处理器的启用状态
|
||||||
"status": status
|
status = "enabled" if self.__is_handler_enabled(handler) else "disabled"
|
||||||
})
|
# 构建处理器信息字典
|
||||||
for event_type, handlers in self.__chain_subscribers.items():
|
handler_dict = {
|
||||||
for priority, handler in handlers:
|
"event_type": event_type.value,
|
||||||
class_name = handler.__qualname__.split(".")[0]
|
"handler_identifier": handler_id,
|
||||||
status = (
|
"status": status
|
||||||
"disabled" if handler.__name__ in self.__disabled_handlers or class_name in self.__disabled_classes else "enabled"
|
}
|
||||||
)
|
if priority is not None:
|
||||||
handler_info.append({
|
handler_dict["priority"] = priority
|
||||||
"event_type": event_type.value,
|
handler_info.append(handler_dict)
|
||||||
"handler_name": handler.__name__,
|
|
||||||
"class_name": class_name,
|
|
||||||
"priority": priority,
|
|
||||||
"status": status
|
|
||||||
})
|
|
||||||
return handler_info
|
return handler_info
|
||||||
|
|
||||||
def __trigger_chain_event(self, event: Event) -> Dict:
|
@staticmethod
|
||||||
|
def __get_handler_identifier(target: Union[Callable, type]) -> str:
|
||||||
"""
|
"""
|
||||||
触发链式事件,按顺序调用订阅的处理器
|
获取处理器或处理器类的唯一标识符,包括模块名和类名
|
||||||
:param event: 要处理的事件对象
|
:param target: 处理器函数或类
|
||||||
:return: 返回处理后的事件数据
|
:return: 唯一标识符
|
||||||
|
"""
|
||||||
|
if isinstance(target, type):
|
||||||
|
# 如果是类,使用模块名和类名
|
||||||
|
module_name = target.__module__
|
||||||
|
class_name = target.__qualname__
|
||||||
|
return f"{module_name}.{class_name}"
|
||||||
|
else:
|
||||||
|
# 如果是函数或方法,使用 inspect.getmodule 来获取模块名
|
||||||
|
module = inspect.getmodule(target)
|
||||||
|
module_name = module.__name__ if module else "unknown_module"
|
||||||
|
qualname = target.__qualname__
|
||||||
|
return f"{module_name}.{qualname}"
|
||||||
|
|
||||||
|
def __is_handler_enabled(self, handler: Callable) -> bool:
|
||||||
|
"""
|
||||||
|
检查处理器是否已启用(没有被禁用)
|
||||||
|
:param handler: 处理器函数
|
||||||
|
:return: 如果处理器启用则返回 True,否则返回 False
|
||||||
|
"""
|
||||||
|
# 获取处理器的唯一标识符
|
||||||
|
handler_id = self.__get_handler_identifier(handler)
|
||||||
|
|
||||||
|
# 获取处理器所属类的唯一标识符
|
||||||
|
class_id = self.__get_handler_identifier(handler.__self__.__class__) if hasattr(handler, '__self__') else None
|
||||||
|
|
||||||
|
# 检查处理器或类是否被禁用,只要其中之一被禁用则返回 False
|
||||||
|
if handler_id in self.__disabled_handlers or (class_id is not None and class_id in self.__disabled_classes):
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def __trigger_chain_event(self, event: Event) -> Event:
|
||||||
|
"""
|
||||||
|
触发链式事件,按顺序调用订阅的处理器,并记录处理耗时
|
||||||
"""
|
"""
|
||||||
logger.debug(f"Triggering synchronous chain event: {event}")
|
logger.debug(f"Triggering synchronous chain event: {event}")
|
||||||
self.__dispatch_chain_event(event)
|
self.__dispatch_chain_event(event)
|
||||||
return event.event_data
|
return event
|
||||||
|
|
||||||
def __trigger_broadcast_event(self, event: Event):
|
def __trigger_broadcast_event(self, event: Event):
|
||||||
"""
|
"""
|
||||||
@@ -231,22 +240,18 @@ class EventManager(metaclass=Singleton):
|
|||||||
|
|
||||||
def __dispatch_chain_event(self, event: Event):
|
def __dispatch_chain_event(self, event: Event):
|
||||||
"""
|
"""
|
||||||
同步方式调度链式事件,按优先级顺序逐个调用事件处理器
|
同步方式调度链式事件,按优先级顺序逐个调用事件处理器,并记录每个处理器的处理时间
|
||||||
:param event: 要调度的事件对象
|
:param event: 要调度的事件对象
|
||||||
"""
|
"""
|
||||||
handlers = self.__chain_subscribers.get(event.event_type, [])
|
handlers = self.__chain_subscribers.get(event.event_type, [])
|
||||||
self.__log_event_lifecycle(event, "started")
|
self.__log_event_lifecycle(event, "started")
|
||||||
for priority, handler in handlers:
|
for priority, handler in handlers:
|
||||||
class_name = handler.__qualname__.split(".")[0]
|
start_time = time.time()
|
||||||
if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes:
|
self.__safe_invoke_handler(handler, event)
|
||||||
start_time = time.time()
|
logger.debug(
|
||||||
try:
|
f"Handler {handler.__qualname__} (Priority: {priority}) "
|
||||||
handler(event.event_data)
|
f"completed in {time.time() - start_time:.3f}s")
|
||||||
logger.debug(
|
|
||||||
f"Handler {handler.__qualname__} (Priority: {priority}) "
|
|
||||||
f"completed in {time.time() - start_time:.3f}s")
|
|
||||||
except Exception as e:
|
|
||||||
self.__handle_event_error(event, handler, e)
|
|
||||||
self.__log_event_lifecycle(event, "completed")
|
self.__log_event_lifecycle(event, "completed")
|
||||||
|
|
||||||
def __dispatch_broadcast_event(self, event: Event):
|
def __dispatch_broadcast_event(self, event: Event):
|
||||||
@@ -256,18 +261,19 @@ class EventManager(metaclass=Singleton):
|
|||||||
"""
|
"""
|
||||||
handlers = self.__broadcast_subscribers.get(event.event_type, [])
|
handlers = self.__broadcast_subscribers.get(event.event_type, [])
|
||||||
for handler in handlers:
|
for handler in handlers:
|
||||||
class_name = handler.__qualname__.split(".")[0]
|
self.__executor.submit(self.__safe_invoke_handler, handler, event)
|
||||||
if handler.__name__ not in self.__disabled_handlers and class_name not in self.__disabled_classes:
|
|
||||||
self.__executor.submit(self.__safe_invoke_handler, handler, event)
|
|
||||||
|
|
||||||
def __safe_invoke_handler(self, handler: Callable[[Dict], None], event: Event):
|
def __safe_invoke_handler(self, handler: Callable, event: Event):
|
||||||
"""
|
"""
|
||||||
安全调用事件处理器,捕获异常并记录日志
|
安全调用事件处理器,捕获异常并记录日志
|
||||||
:param handler: 要调用的处理器
|
:param handler: 要调用的处理器
|
||||||
:param event: 事件对象
|
:param event: 事件对象
|
||||||
"""
|
"""
|
||||||
|
if not self.__is_handler_enabled(handler):
|
||||||
|
logger.debug(f"Handler {handler.__qualname__} is disabled. Skipping execution.")
|
||||||
|
return
|
||||||
try:
|
try:
|
||||||
handler(event.event_data)
|
handler(event)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.__handle_event_error(event, handler, e)
|
self.__handle_event_error(event, handler, e)
|
||||||
|
|
||||||
@@ -316,7 +322,7 @@ class EventManager(metaclass=Singleton):
|
|||||||
- 或事件类型成员的列表
|
- 或事件类型成员的列表
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def decorator(f: Callable[[Dict], None]):
|
def decorator(f: Callable):
|
||||||
event_list = []
|
event_list = []
|
||||||
|
|
||||||
# 如果传入的是列表,处理每个事件类型
|
# 如果传入的是列表,处理每个事件类型
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ import traceback
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Type
|
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, Type
|
||||||
|
|
||||||
from app.helper.sites import SitesHelper
|
|
||||||
from watchdog.events import FileSystemEventHandler
|
from watchdog.events import FileSystemEventHandler
|
||||||
from watchdog.observers import Observer
|
from watchdog.observers import Observer
|
||||||
|
|
||||||
@@ -18,6 +17,7 @@ from app.db.plugindata_oper import PluginDataOper
|
|||||||
from app.db.systemconfig_oper import SystemConfigOper
|
from app.db.systemconfig_oper import SystemConfigOper
|
||||||
from app.helper.module import ModuleHelper
|
from app.helper.module import ModuleHelper
|
||||||
from app.helper.plugin import PluginHelper
|
from app.helper.plugin import PluginHelper
|
||||||
|
from app.helper.sites import SitesHelper
|
||||||
from app.log import logger
|
from app.log import logger
|
||||||
from app.schemas.types import SystemConfigKey
|
from app.schemas.types import SystemConfigKey
|
||||||
from app.utils.crypto import RSAUtils
|
from app.utils.crypto import RSAUtils
|
||||||
@@ -156,7 +156,7 @@ class PluginManager(metaclass=Singleton):
|
|||||||
# 未安装的不加载
|
# 未安装的不加载
|
||||||
if plugin_id not in installed_plugins:
|
if plugin_id not in installed_plugins:
|
||||||
# 设置事件状态为不可用
|
# 设置事件状态为不可用
|
||||||
eventmanager.disable_event_handler(class_name=plugin_id)
|
eventmanager.disable_event_handler(plugin)
|
||||||
continue
|
continue
|
||||||
# 生成实例
|
# 生成实例
|
||||||
plugin_obj = plugin()
|
plugin_obj = plugin()
|
||||||
@@ -167,9 +167,9 @@ class PluginManager(metaclass=Singleton):
|
|||||||
logger.info(f"加载插件:{plugin_id} 版本:{plugin_obj.plugin_version}")
|
logger.info(f"加载插件:{plugin_id} 版本:{plugin_obj.plugin_version}")
|
||||||
# 启用的插件才设置事件注册状态可用
|
# 启用的插件才设置事件注册状态可用
|
||||||
if plugin_obj.get_state():
|
if plugin_obj.get_state():
|
||||||
eventmanager.enable_event_handler(class_name=plugin_id)
|
eventmanager.enable_event_handler(plugin)
|
||||||
else:
|
else:
|
||||||
eventmanager.disable_event_handler(class_name=plugin_id)
|
eventmanager.disable_event_handler(plugin)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
logger.error(f"加载插件 {plugin_id} 出错:{str(err)} - {traceback.format_exc()}")
|
logger.error(f"加载插件 {plugin_id} 出错:{str(err)} - {traceback.format_exc()}")
|
||||||
|
|
||||||
@@ -179,15 +179,18 @@ class PluginManager(metaclass=Singleton):
|
|||||||
:param plugin_id: 插件ID
|
:param plugin_id: 插件ID
|
||||||
:param conf: 插件配置
|
:param conf: 插件配置
|
||||||
"""
|
"""
|
||||||
if not self._running_plugins.get(plugin_id):
|
plugin = self._running_plugins.get(plugin_id)
|
||||||
|
if not plugin:
|
||||||
return
|
return
|
||||||
self._running_plugins[plugin_id].init_plugin(conf)
|
# 初始化插件
|
||||||
if self._running_plugins[plugin_id].get_state():
|
plugin.init_plugin(conf)
|
||||||
# 设置启用的插件事件注册状态可用
|
# 检查插件状态并启用/禁用事件处理器
|
||||||
eventmanager.enable_event_handler(class_name=plugin_id)
|
if plugin.get_state():
|
||||||
|
# 启用插件类的事件处理器
|
||||||
|
eventmanager.enable_event_handler(type(plugin))
|
||||||
else:
|
else:
|
||||||
# 设置事件状态为不可用
|
# 禁用插件类的事件处理器
|
||||||
eventmanager.disable_event_handler(class_name=plugin_id)
|
eventmanager.disable_event_handler(type(plugin))
|
||||||
|
|
||||||
def stop(self, pid: str = None):
|
def stop(self, pid: str = None):
|
||||||
"""
|
"""
|
||||||
|
|||||||
Reference in New Issue
Block a user