mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-04-14 10:10:20 +08:00
Merge pull request #2758 from InfinityPacer/feature/event
This commit is contained in:
@@ -1,8 +1,4 @@
|
||||
import copy
|
||||
import importlib
|
||||
import threading
|
||||
import traceback
|
||||
from threading import Thread
|
||||
from typing import Any, Union, Dict
|
||||
|
||||
from app.chain import ChainBase
|
||||
@@ -12,11 +8,9 @@ from app.chain.subscribe import SubscribeChain
|
||||
from app.chain.system import SystemChain
|
||||
from app.chain.transfer import TransferChain
|
||||
from app.core.config import settings
|
||||
from app.core.event import Event as ManagerEvent
|
||||
from app.core.event import eventmanager, EventManager
|
||||
from app.core.event import Event as ManagerEvent, eventmanager
|
||||
from app.core.plugin import PluginManager
|
||||
from app.helper.message import MessageHelper
|
||||
from app.helper.thread import ThreadHelper
|
||||
from app.log import logger
|
||||
from app.scheduler import Scheduler
|
||||
from app.schemas import Notification
|
||||
@@ -41,12 +35,7 @@ class Command(metaclass=Singleton):
|
||||
# 内建命令
|
||||
_commands = {}
|
||||
|
||||
# 退出事件
|
||||
_event = threading.Event()
|
||||
|
||||
def __init__(self):
|
||||
# 事件管理器
|
||||
self.eventmanager = EventManager()
|
||||
# 插件管理器
|
||||
self.pluginmanager = PluginManager()
|
||||
# 处理链
|
||||
@@ -55,8 +44,6 @@ class Command(metaclass=Singleton):
|
||||
self.scheduler = Scheduler()
|
||||
# 消息管理器
|
||||
self.messagehelper = MessageHelper()
|
||||
# 线程管理器
|
||||
self.threader = ThreadHelper()
|
||||
# 内置命令
|
||||
self._commands = {
|
||||
"/cookiecloud": {
|
||||
@@ -172,72 +159,9 @@ class Command(metaclass=Singleton):
|
||||
# 广播注册命令菜单
|
||||
if not settings.DEV:
|
||||
self.chain.register_commands(commands=self.get_commands())
|
||||
# 消息处理线程
|
||||
self._thread = Thread(target=self.__run)
|
||||
# 启动事件处理线程
|
||||
self._thread.start()
|
||||
# 重启msg
|
||||
SystemChain().restart_finish()
|
||||
|
||||
def __run(self):
|
||||
"""
|
||||
事件处理线程
|
||||
"""
|
||||
while not self._event.is_set():
|
||||
event, handlers = self.eventmanager.get_event()
|
||||
if event:
|
||||
logger.info(f"处理事件:{event.event_type} - {handlers}")
|
||||
if not handlers and event.event_callback:
|
||||
event.event_callback()
|
||||
for handler in handlers:
|
||||
names = handler.__qualname__.split(".")
|
||||
[class_name, method_name] = names
|
||||
try:
|
||||
if class_name in self.pluginmanager.get_plugin_ids():
|
||||
# 插件事件
|
||||
result = self.threader.submit(
|
||||
self.pluginmanager.run_plugin_method,
|
||||
class_name, method_name, copy.deepcopy(event)
|
||||
)
|
||||
if event.event_callback:
|
||||
event.event_callback(result)
|
||||
else:
|
||||
# 检查全局变量中是否存在
|
||||
if class_name not in globals():
|
||||
# 导入模块,除了插件和Command本身,只有chain能响应事件
|
||||
try:
|
||||
module = importlib.import_module(
|
||||
f"app.chain.{class_name[:-5].lower()}"
|
||||
)
|
||||
class_obj = getattr(module, class_name)()
|
||||
except Exception as e:
|
||||
logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}")
|
||||
continue
|
||||
else:
|
||||
# 通过类名创建类实例
|
||||
class_obj = globals()[class_name]()
|
||||
# 检查类是否存在并调用方法
|
||||
if hasattr(class_obj, method_name):
|
||||
self.threader.submit(
|
||||
getattr(class_obj, method_name),
|
||||
copy.deepcopy(event)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}")
|
||||
self.messagehelper.put(title=f"{event.event_type} 事件处理出错",
|
||||
message=f"{class_name}.{method_name}:{str(e)}",
|
||||
role="system")
|
||||
self.eventmanager.send_event(
|
||||
EventType.SystemError,
|
||||
{
|
||||
"type": "event",
|
||||
"event_type": event.event_type,
|
||||
"event_handle": f"{class_name}.{method_name}",
|
||||
"error": str(e),
|
||||
"traceback": traceback.format_exc()
|
||||
}
|
||||
)
|
||||
|
||||
def __run_command(self, command: Dict[str, any], data_str: str = "",
|
||||
channel: MessageChannel = None, source: str = None, userid: Union[str, int] = None):
|
||||
"""
|
||||
@@ -292,18 +216,6 @@ class Command(metaclass=Singleton):
|
||||
# 没有参数
|
||||
command['func']()
|
||||
|
||||
def stop(self):
|
||||
"""
|
||||
停止事件处理线程
|
||||
"""
|
||||
logger.info("正在停止事件处理...")
|
||||
self._event.set()
|
||||
try:
|
||||
self._thread.join()
|
||||
logger.info("事件处理停止完成")
|
||||
except Exception as e:
|
||||
logger.error(f"停止事件处理线程出错:{str(e)} - {traceback.format_exc()}")
|
||||
|
||||
def get_commands(self):
|
||||
"""
|
||||
获取命令列表
|
||||
@@ -361,7 +273,7 @@ class Command(metaclass=Singleton):
|
||||
"""
|
||||
发送插件命令
|
||||
"""
|
||||
EventManager().send_event(etype, data)
|
||||
eventmanager.send_event(etype, data)
|
||||
|
||||
@eventmanager.register(EventType.CommandExcute)
|
||||
def command_event(self, event: ManagerEvent) -> None:
|
||||
|
||||
@@ -1,150 +1,461 @@
|
||||
import copy
|
||||
import importlib
|
||||
import inspect
|
||||
import threading
|
||||
import typing
|
||||
from queue import Queue, Empty
|
||||
from typing import Dict, Any
|
||||
import time
|
||||
import traceback
|
||||
import uuid
|
||||
from queue import PriorityQueue, Empty
|
||||
from typing import Callable, Dict, List, Union, Optional
|
||||
|
||||
from app.helper.message import MessageHelper
|
||||
from app.helper.thread import ThreadHelper
|
||||
from app.log import logger
|
||||
from app.schemas.types import EventType, ChainEventType
|
||||
from app.utils.singleton import Singleton
|
||||
from app.schemas.types import EventType
|
||||
|
||||
DEFAULT_EVENT_PRIORITY = 10 # 事件的默认优先级
|
||||
MIN_EVENT_CONSUMER_THREADS = 1 # 最小事件消费者线程数
|
||||
EVENT_QUEUE_IDLE_TIMEOUT_SECONDS = 30 # 事件队列空闲时的超时时间(秒)
|
||||
|
||||
|
||||
class Event:
|
||||
"""
|
||||
事件类,封装事件的基本信息
|
||||
"""
|
||||
|
||||
def __init__(self, event_type: Union[EventType, ChainEventType], event_data: Optional[Dict] = None,
|
||||
priority: int = DEFAULT_EVENT_PRIORITY):
|
||||
"""
|
||||
:param event_type: 事件的类型,支持 EventType 或 ChainEventType
|
||||
:param event_data: 可选,事件携带的数据,默认为空字典
|
||||
:param priority: 可选,事件的优先级,默认为 10
|
||||
"""
|
||||
self.event_id = str(uuid.uuid4()) # 事件ID
|
||||
self.event_type = event_type # 事件类型
|
||||
self.event_data = event_data or {} # 事件数据
|
||||
self.priority = priority # 事件优先级
|
||||
|
||||
def __repr__(self) -> str:
|
||||
"""
|
||||
重写 __repr__ 方法,用于返回事件的详细信息,包括事件类型、事件ID和优先级
|
||||
"""
|
||||
event_kind = Event.get_event_kind(self.event_type)
|
||||
return f"<{event_kind}: {self.event_type.value}, ID: {self.event_id}, Priority: {self.priority}>"
|
||||
|
||||
def __lt__(self, other):
|
||||
"""
|
||||
定义事件对象的比较规则,基于优先级比较
|
||||
优先级小的事件会被认为“更小”,优先级高的事件将被认为“更大”
|
||||
"""
|
||||
return self.priority < other.priority
|
||||
|
||||
@staticmethod
|
||||
def get_event_kind(event_type: Union[EventType, ChainEventType]) -> str:
|
||||
"""
|
||||
根据事件类型判断事件是广播事件还是链式事件
|
||||
:param event_type: 事件类型,支持 EventType 或 ChainEventType
|
||||
:return: 返回 Broadcast Event 或 Chain Event
|
||||
"""
|
||||
return "Broadcast Event" if isinstance(event_type, EventType) else "Chain Event"
|
||||
|
||||
|
||||
class EventManager(metaclass=Singleton):
|
||||
"""
|
||||
事件管理器
|
||||
EventManager 负责管理和调度广播事件和链式事件,包括订阅、发送和处理事件
|
||||
"""
|
||||
|
||||
# 退出事件
|
||||
__event = threading.Event()
|
||||
|
||||
def __init__(self):
|
||||
# 事件队列
|
||||
self._eventQueue = Queue()
|
||||
# 事件响应函数字典
|
||||
self._handlers: Dict[str, Dict[str, Any]] = {}
|
||||
# 已禁用的事件响应
|
||||
self._disabled_handlers = []
|
||||
self.__messagehelper = MessageHelper()
|
||||
self.__executor = ThreadHelper() # 动态线程池,用于消费事件
|
||||
self.__consumer_threads = [] # 用于保存启动的事件消费者线程
|
||||
self.__event_queue = PriorityQueue() # 优先级队列
|
||||
self.__broadcast_subscribers: Dict[EventType, Dict[str, Callable]] = {} # 广播事件的订阅者
|
||||
self.__chain_subscribers: Dict[ChainEventType, Dict[str, tuple[int, Callable]]] = {} # 链式事件的订阅者
|
||||
self.__disabled_handlers = set() # 禁用的事件处理器集合
|
||||
self.__disabled_classes = set() # 禁用的事件处理器类集合
|
||||
self.__lock = threading.Lock() # 线程锁
|
||||
self.__condition = threading.Condition(self.__lock) # 条件变量
|
||||
|
||||
def get_event(self):
|
||||
def start(self):
|
||||
"""
|
||||
获取事件
|
||||
开始广播事件处理线程
|
||||
"""
|
||||
# 启动消费者线程用于处理广播事件
|
||||
self.__event.set()
|
||||
for _ in range(MIN_EVENT_CONSUMER_THREADS):
|
||||
thread = threading.Thread(target=self.__fixed_broadcast_consumer, daemon=True)
|
||||
thread.start()
|
||||
self.__consumer_threads.append(thread) # 将线程对象保存到列表中
|
||||
|
||||
def stop(self):
|
||||
"""
|
||||
停止广播事件处理线程
|
||||
"""
|
||||
logger.info("正在停止事件处理...")
|
||||
self.__event.clear() # 停止广播事件处理
|
||||
try:
|
||||
event = self._eventQueue.get(block=True, timeout=1)
|
||||
handlers = self._handlers.get(event.event_type) or {}
|
||||
if handlers:
|
||||
# 去除掉被禁用的事件响应
|
||||
handlerList = [handler for handler in handlers.values()
|
||||
if handler.__qualname__.split(".")[0] not in self._disabled_handlers]
|
||||
return event, handlerList
|
||||
return event, []
|
||||
except Empty:
|
||||
return None, []
|
||||
# 通过遍历保存的线程来等待它们完成
|
||||
for consumer_thread in self.__consumer_threads:
|
||||
consumer_thread.join()
|
||||
logger.info("事件处理停止完成")
|
||||
except Exception as e:
|
||||
logger.error(f"停止事件处理线程出错:{str(e)} - {traceback.format_exc()}")
|
||||
|
||||
def check(self, etype: EventType):
|
||||
def send_event(self, etype: Union[EventType, ChainEventType], data: Optional[Dict] = None,
|
||||
priority: int = DEFAULT_EVENT_PRIORITY) -> Optional[Event]:
|
||||
"""
|
||||
检查事件是否存在响应,去除掉被禁用的事件响应
|
||||
发送事件,根据事件类型决定是广播事件还是链式事件
|
||||
:param etype: 事件类型 (EventType 或 ChainEventType)
|
||||
:param data: 可选,事件数据
|
||||
:param priority: 广播事件的优先级,默认为 10
|
||||
:return: 如果是链式事件,返回处理后的事件数据;否则返回 None
|
||||
"""
|
||||
if etype.value not in self._handlers:
|
||||
return False
|
||||
handlers = self._handlers.get(etype.value)
|
||||
return any([handler for handler in handlers.values()
|
||||
if handler.__qualname__.split(".")[0] not in self._disabled_handlers])
|
||||
|
||||
def add_event_listener(self, etype: EventType, handler: type):
|
||||
"""
|
||||
注册事件处理
|
||||
"""
|
||||
try:
|
||||
handlers = self._handlers[etype.value]
|
||||
except KeyError:
|
||||
handlers = {}
|
||||
self._handlers[etype.value] = handlers
|
||||
if handler.__qualname__ in handlers:
|
||||
handlers.pop(handler.__qualname__)
|
||||
event = Event(etype, data, priority)
|
||||
if isinstance(etype, EventType):
|
||||
self.__trigger_broadcast_event(event)
|
||||
with self.__condition:
|
||||
self.__condition.notify()
|
||||
elif isinstance(etype, ChainEventType):
|
||||
return self.__trigger_chain_event(event)
|
||||
else:
|
||||
logger.debug(f"Event Registed:{etype.value} - {handler.__qualname__}")
|
||||
handlers[handler.__qualname__] = handler
|
||||
logger.error(f"Unknown event type: {etype}")
|
||||
|
||||
def disable_events_hander(self, class_name: str):
|
||||
def add_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable,
|
||||
priority: int = DEFAULT_EVENT_PRIORITY):
|
||||
"""
|
||||
标记对应类事件处理为不可用
|
||||
注册事件处理器,将处理器添加到对应的事件订阅列表中
|
||||
:param event_type: 事件类型 (EventType 或 ChainEventType)
|
||||
:param handler: 处理器
|
||||
:param priority: 可选,链式事件的优先级,默认为 10;广播事件不需要优先级
|
||||
"""
|
||||
if class_name not in self._disabled_handlers:
|
||||
self._disabled_handlers.append(class_name)
|
||||
logger.debug(f"Event Disabled:{class_name}")
|
||||
with self.__lock:
|
||||
handler_identifier = self.__get_handler_identifier(handler)
|
||||
|
||||
def enable_events_hander(self, class_name: str):
|
||||
"""
|
||||
标记对应类事件处理为可用
|
||||
"""
|
||||
if class_name in self._disabled_handlers:
|
||||
self._disabled_handlers.remove(class_name)
|
||||
logger.debug(f"Event Enabled:{class_name}")
|
||||
|
||||
def send_event(self, etype: EventType, data: dict = None, callback: typing.Callable = None):
|
||||
"""
|
||||
发送事件(异步响应)
|
||||
"""
|
||||
if etype not in EventType:
|
||||
return
|
||||
event = Event(etype.value)
|
||||
event.event_data = data or {}
|
||||
event.event_callback = callback
|
||||
logger.debug(f"发送事件:{etype.value} - data:{event.event_data},callback:{callback}")
|
||||
self._eventQueue.put(event)
|
||||
|
||||
def send_event_sync(self, etype: EventType, data: dict = None):
|
||||
"""
|
||||
发送事件(同步响应)
|
||||
"""
|
||||
result: any = None
|
||||
condition = threading.Condition()
|
||||
|
||||
def callback(res):
|
||||
nonlocal result
|
||||
if res:
|
||||
result = res.result()
|
||||
with condition:
|
||||
condition.notify_all()
|
||||
|
||||
thread = threading.Thread(target=self.send_event, args=(etype, data, callback))
|
||||
thread.start()
|
||||
|
||||
with condition:
|
||||
condition.wait()
|
||||
|
||||
return result
|
||||
|
||||
def register(self, etype: [EventType, list]):
|
||||
"""
|
||||
事件注册
|
||||
:param etype: 事件类型
|
||||
"""
|
||||
|
||||
def decorator(f):
|
||||
if isinstance(etype, list):
|
||||
for et in etype:
|
||||
self.add_event_listener(et, f)
|
||||
elif type(etype) == type(EventType):
|
||||
for et in etype.__members__.values():
|
||||
self.add_event_listener(et, f)
|
||||
if isinstance(event_type, ChainEventType):
|
||||
# 链式事件,按优先级排序
|
||||
if event_type not in self.__chain_subscribers:
|
||||
self.__chain_subscribers[event_type] = {}
|
||||
handlers = self.__chain_subscribers[event_type]
|
||||
if handler_identifier in handlers:
|
||||
handlers.pop(handler_identifier)
|
||||
else:
|
||||
logger.debug(
|
||||
f"Subscribed to chain event: {event_type.value}, "
|
||||
f"Priority: {priority} - {handler_identifier}")
|
||||
handlers[handler_identifier] = (priority, handler)
|
||||
# 根据优先级排序
|
||||
self.__chain_subscribers[event_type] = dict(
|
||||
sorted(self.__chain_subscribers[event_type].items(), key=lambda x: x[1][0])
|
||||
)
|
||||
else:
|
||||
self.add_event_listener(etype, f)
|
||||
# 广播事件
|
||||
if event_type not in self.__broadcast_subscribers:
|
||||
self.__broadcast_subscribers[event_type] = {}
|
||||
handlers = self.__broadcast_subscribers[event_type]
|
||||
if handler_identifier in handlers:
|
||||
handlers.pop(handler_identifier)
|
||||
else:
|
||||
logger.debug(f"Subscribed to broadcast event: {event_type.value} - {handler_identifier}")
|
||||
handlers[handler_identifier] = handler
|
||||
|
||||
def remove_event_listener(self, event_type: Union[EventType, ChainEventType], handler: Callable):
|
||||
"""
|
||||
移除事件处理器,将处理器从对应事件的订阅列表中删除
|
||||
:param event_type: 事件类型 (EventType 或 ChainEventType)
|
||||
:param handler: 要移除的处理器
|
||||
"""
|
||||
with self.__lock:
|
||||
handler_identifier = self.__get_handler_identifier(handler)
|
||||
|
||||
if isinstance(event_type, ChainEventType) and event_type in self.__chain_subscribers:
|
||||
self.__chain_subscribers[event_type].pop(handler_identifier, None)
|
||||
logger.debug(f"Unsubscribed from chain event: {event_type.value} - {handler_identifier}")
|
||||
elif event_type in self.__broadcast_subscribers:
|
||||
self.__broadcast_subscribers[event_type].pop(handler_identifier, None)
|
||||
logger.debug(f"Unsubscribed from broadcast event: {event_type.value} - {handler_identifier}")
|
||||
|
||||
def disable_event_handler(self, target: Union[Callable, type]):
|
||||
"""
|
||||
禁用指定的事件处理器或事件处理器类
|
||||
:param target: 处理器函数或类
|
||||
"""
|
||||
identifier = self.__get_handler_identifier(target)
|
||||
if identifier in self.__disabled_handlers or identifier in self.__disabled_classes:
|
||||
return
|
||||
if isinstance(target, type):
|
||||
self.__disabled_classes.add(identifier)
|
||||
logger.debug(f"Disabled event handler class - {identifier}")
|
||||
else:
|
||||
self.__disabled_handlers.add(identifier)
|
||||
logger.debug(f"Disabled event handler - {identifier}")
|
||||
|
||||
def enable_event_handler(self, target: Union[Callable, type]):
|
||||
"""
|
||||
启用指定的事件处理器或事件处理器类
|
||||
:param target: 处理器函数或类
|
||||
"""
|
||||
identifier = self.__get_handler_identifier(target)
|
||||
if isinstance(target, type):
|
||||
self.__disabled_classes.discard(identifier)
|
||||
logger.debug(f"Enabled event handler class - {identifier}")
|
||||
else:
|
||||
self.__disabled_handlers.discard(identifier)
|
||||
logger.debug(f"Enabled event handler - {identifier}")
|
||||
|
||||
def visualize_handlers(self) -> List[Dict]:
|
||||
"""
|
||||
可视化所有事件处理器,包括是否被禁用的状态
|
||||
:return: 处理器列表,包含事件类型、处理器标识符、优先级(如果有)和状态
|
||||
"""
|
||||
handler_info = []
|
||||
# 统一处理广播事件和链式事件
|
||||
for event_type, subscribers in {**self.__broadcast_subscribers, **self.__chain_subscribers}.items():
|
||||
for handler_data in subscribers:
|
||||
if isinstance(subscribers, dict):
|
||||
priority, handler = handler_data
|
||||
else:
|
||||
priority = None
|
||||
handler = handler_data
|
||||
# 获取处理器的唯一标识符
|
||||
handler_id = self.__get_handler_identifier(handler)
|
||||
# 检查处理器的启用状态
|
||||
status = "enabled" if self.__is_handler_enabled(handler) else "disabled"
|
||||
# 构建处理器信息字典
|
||||
handler_dict = {
|
||||
"event_type": event_type.value,
|
||||
"handler_identifier": handler_id,
|
||||
"status": status
|
||||
}
|
||||
if priority is not None:
|
||||
handler_dict["priority"] = priority
|
||||
handler_info.append(handler_dict)
|
||||
return handler_info
|
||||
|
||||
@staticmethod
|
||||
def __get_handler_identifier(target: Union[Callable, type]) -> str:
|
||||
"""
|
||||
获取处理器或处理器类的唯一标识符,包括模块名和类名
|
||||
:param target: 处理器函数或类
|
||||
:return: 唯一标识符
|
||||
"""
|
||||
if isinstance(target, type):
|
||||
# 如果是类,使用模块名和类名
|
||||
module_name = target.__module__
|
||||
class_name = target.__qualname__
|
||||
return f"{module_name}.{class_name}"
|
||||
else:
|
||||
# 如果是函数或方法,使用 inspect.getmodule 来获取模块名
|
||||
module = inspect.getmodule(target)
|
||||
module_name = module.__name__ if module else "unknown_module"
|
||||
qualname = target.__qualname__
|
||||
return f"{module_name}.{qualname}"
|
||||
|
||||
def __is_handler_enabled(self, handler: Callable) -> bool:
|
||||
"""
|
||||
检查处理器是否已启用(没有被禁用)
|
||||
:param handler: 处理器函数
|
||||
:return: 如果处理器启用则返回 True,否则返回 False
|
||||
"""
|
||||
# 获取处理器的唯一标识符
|
||||
handler_id = self.__get_handler_identifier(handler)
|
||||
|
||||
# 获取处理器所属类的唯一标识符
|
||||
class_id = self.__get_handler_identifier(handler.__self__.__class__) if hasattr(handler, '__self__') else None
|
||||
|
||||
# 检查处理器或类是否被禁用,只要其中之一被禁用则返回 False
|
||||
if handler_id in self.__disabled_handlers or (class_id is not None and class_id in self.__disabled_classes):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def __trigger_chain_event(self, event: Event) -> Event:
|
||||
"""
|
||||
触发链式事件,按顺序调用订阅的处理器,并记录处理耗时
|
||||
"""
|
||||
logger.debug(f"Triggering synchronous chain event: {event}")
|
||||
self.__dispatch_chain_event(event)
|
||||
return event
|
||||
|
||||
def __trigger_broadcast_event(self, event: Event):
|
||||
"""
|
||||
触发广播事件,将事件插入到优先级队列中
|
||||
:param event: 要处理的事件对象
|
||||
"""
|
||||
logger.debug(f"Triggering broadcast event: {event}")
|
||||
self.__event_queue.put((event.priority, event))
|
||||
|
||||
def __dispatch_chain_event(self, event: Event):
|
||||
"""
|
||||
同步方式调度链式事件,按优先级顺序逐个调用事件处理器,并记录每个处理器的处理时间
|
||||
:param event: 要调度的事件对象
|
||||
"""
|
||||
handlers = self.__chain_subscribers.get(event.event_type, {})
|
||||
if not handlers:
|
||||
return
|
||||
self.__log_event_lifecycle(event, "started")
|
||||
for handler_id, (priority, handler) in handlers.items():
|
||||
start_time = time.time()
|
||||
self.__safe_invoke_handler(handler, event)
|
||||
logger.debug(
|
||||
f"Handler {self.__get_handler_identifier(handler)} (Priority: {priority}) ,"
|
||||
f" completed in {time.time() - start_time:.3f}s"
|
||||
)
|
||||
self.__log_event_lifecycle(event, "completed")
|
||||
|
||||
def __dispatch_broadcast_event(self, event: Event):
|
||||
"""
|
||||
异步方式调度广播事件,通过线程池逐个调用事件处理器
|
||||
:param event: 要调度的事件对象
|
||||
"""
|
||||
handlers = self.__broadcast_subscribers.get(event.event_type, {})
|
||||
if not handlers:
|
||||
return
|
||||
for handler_id, handler in handlers.items():
|
||||
self.__executor.submit(self.__safe_invoke_handler, handler, event)
|
||||
|
||||
def __safe_invoke_handler(self, handler: Callable, event: Event):
|
||||
"""
|
||||
调用处理器,处理链式或广播事件
|
||||
:param handler: 处理器
|
||||
:param event: 事件对象
|
||||
"""
|
||||
if not self.__is_handler_enabled(handler):
|
||||
logger.debug(f"Handler {self.__get_handler_identifier(handler)} is disabled. Skipping execution")
|
||||
return
|
||||
|
||||
# 根据事件类型判断是否需要深复制
|
||||
is_broadcast_event = isinstance(event.event_type, EventType)
|
||||
event_to_process = copy.deepcopy(event) if is_broadcast_event else event
|
||||
|
||||
names = handler.__qualname__.split(".")
|
||||
class_name, method_name = names[0], names[1]
|
||||
|
||||
try:
|
||||
from app.core.plugin import PluginManager
|
||||
|
||||
if class_name in PluginManager().get_plugin_ids():
|
||||
# 定义一个插件调用函数
|
||||
def plugin_callable():
|
||||
PluginManager().run_plugin_method(class_name, method_name, event_to_process)
|
||||
|
||||
if is_broadcast_event:
|
||||
self.__executor.submit(plugin_callable)
|
||||
else:
|
||||
plugin_callable()
|
||||
else:
|
||||
# 获取全局对象或模块类的实例
|
||||
class_obj = self.__get_class_instance(class_name)
|
||||
if class_obj and hasattr(class_obj, method_name):
|
||||
method = getattr(class_obj, method_name)
|
||||
if is_broadcast_event:
|
||||
self.__executor.submit(method, event_to_process)
|
||||
else:
|
||||
method(event_to_process)
|
||||
except Exception as e:
|
||||
self.__handle_event_error(event, handler, e)
|
||||
|
||||
@staticmethod
|
||||
def __get_class_instance(class_name: str):
|
||||
"""
|
||||
根据类名获取类实例,首先检查全局变量中是否存在该类,如果不存在则尝试动态导入模块。
|
||||
:param class_name: 类的名称
|
||||
:return: 类的实例
|
||||
"""
|
||||
# 检查类是否在全局变量中
|
||||
if class_name in globals():
|
||||
class_obj = globals()[class_name]()
|
||||
else:
|
||||
# 如果类不在全局变量中,尝试动态导入模块并创建实例
|
||||
# 导入模块,除了插件和Command,只有chain能响应事件
|
||||
try:
|
||||
module = importlib.import_module(f"app.chain.{class_name[:-5].lower()}")
|
||||
class_obj = getattr(module, class_name)()
|
||||
except Exception as e:
|
||||
logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}")
|
||||
return None
|
||||
|
||||
return class_obj
|
||||
|
||||
def __fixed_broadcast_consumer(self):
|
||||
"""
|
||||
固定的后台广播消费者线程,持续从队列中提取事件
|
||||
"""
|
||||
while self.__event.is_set():
|
||||
# 使用 Condition 优化队列的等待机制,避免频繁触发超时
|
||||
with self.__condition:
|
||||
# 阻塞等待,直到有事件插入
|
||||
self.__condition.wait()
|
||||
try:
|
||||
priority, event = self.__event_queue.get(timeout=EVENT_QUEUE_IDLE_TIMEOUT_SECONDS)
|
||||
self.__dispatch_broadcast_event(event)
|
||||
except Empty:
|
||||
logger.debug("Queue is empty, waiting for new events")
|
||||
|
||||
@staticmethod
|
||||
def __log_event_lifecycle(event: Event, stage: str):
|
||||
"""
|
||||
记录事件的生命周期日志
|
||||
"""
|
||||
logger.debug(f"{stage} - {event}")
|
||||
|
||||
def __handle_event_error(self, event: Event, handler: Callable, e: Exception):
|
||||
"""
|
||||
全局错误处理器,用于处理事件处理中的异常
|
||||
"""
|
||||
logger.error(f"事件处理出错:{str(e)} - {traceback.format_exc()}")
|
||||
|
||||
names = handler.__qualname__.split(".")
|
||||
class_name, method_name = names[0], names[1]
|
||||
|
||||
self.__messagehelper.put(title=f"{event.event_type} 事件处理出错",
|
||||
message=f"{class_name}.{method_name}:{str(e)}",
|
||||
role="system")
|
||||
self.send_event(
|
||||
EventType.SystemError,
|
||||
{
|
||||
"type": "event",
|
||||
"event_type": event.event_type,
|
||||
"event_handle": f"{class_name}.{method_name}",
|
||||
"error": str(e),
|
||||
"traceback": traceback.format_exc()
|
||||
}
|
||||
)
|
||||
|
||||
def register(self, etype: Union[EventType, ChainEventType, List[Union[EventType, ChainEventType]], type]):
|
||||
"""
|
||||
事件注册装饰器,用于将函数注册为事件的处理器
|
||||
:param etype:
|
||||
- 单个事件类型成员 (如 EventType.MetadataScrape, ChainEventType.PluginAction)
|
||||
- 事件类型类 (EventType, ChainEventType)
|
||||
- 或事件类型成员的列表
|
||||
"""
|
||||
|
||||
def decorator(f: Callable):
|
||||
# 将输入的事件类型统一转换为列表格式
|
||||
if isinstance(etype, list):
|
||||
event_list = etype # 传入的已经是列表,直接使用
|
||||
else:
|
||||
event_list = [etype] # 不是列表则包裹成单一元素的列表
|
||||
|
||||
# 遍历列表,处理每个事件类型
|
||||
for event in event_list:
|
||||
if isinstance(event, (EventType, ChainEventType)):
|
||||
self.add_event_listener(event, f)
|
||||
elif isinstance(event, type) and issubclass(event, (EventType, ChainEventType)):
|
||||
# 如果是 EventType 或 ChainEventType 类,提取该类中的所有成员
|
||||
for et in event.__members__.values():
|
||||
self.add_event_listener(et, f)
|
||||
else:
|
||||
raise ValueError(f"无效的事件类型: {event}")
|
||||
|
||||
return f
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
class Event(object):
|
||||
"""
|
||||
事件对象
|
||||
"""
|
||||
|
||||
def __init__(self, event_type=None):
|
||||
# 事件类型
|
||||
self.event_type = event_type
|
||||
# 字典用于保存具体的事件数据
|
||||
self.event_data = {}
|
||||
# 事件完成后回调函数
|
||||
self.event_callback = None
|
||||
|
||||
|
||||
# 实例引用,用于注册事件
|
||||
# 全局实例定义
|
||||
eventmanager = EventManager()
|
||||
|
||||
@@ -156,7 +156,7 @@ class PluginManager(metaclass=Singleton):
|
||||
# 未安装的不加载
|
||||
if plugin_id not in installed_plugins:
|
||||
# 设置事件状态为不可用
|
||||
eventmanager.disable_events_hander(plugin_id)
|
||||
eventmanager.disable_event_handler(plugin)
|
||||
continue
|
||||
# 生成实例
|
||||
plugin_obj = plugin()
|
||||
@@ -167,9 +167,9 @@ class PluginManager(metaclass=Singleton):
|
||||
logger.info(f"加载插件:{plugin_id} 版本:{plugin_obj.plugin_version}")
|
||||
# 启用的插件才设置事件注册状态可用
|
||||
if plugin_obj.get_state():
|
||||
eventmanager.enable_events_hander(plugin_id)
|
||||
eventmanager.enable_event_handler(plugin)
|
||||
else:
|
||||
eventmanager.disable_events_hander(plugin_id)
|
||||
eventmanager.disable_event_handler(plugin)
|
||||
except Exception as err:
|
||||
logger.error(f"加载插件 {plugin_id} 出错:{str(err)} - {traceback.format_exc()}")
|
||||
|
||||
@@ -179,15 +179,18 @@ class PluginManager(metaclass=Singleton):
|
||||
:param plugin_id: 插件ID
|
||||
:param conf: 插件配置
|
||||
"""
|
||||
if not self._running_plugins.get(plugin_id):
|
||||
plugin = self._running_plugins.get(plugin_id)
|
||||
if not plugin:
|
||||
return
|
||||
self._running_plugins[plugin_id].init_plugin(conf)
|
||||
if self._running_plugins[plugin_id].get_state():
|
||||
# 设置启用的插件事件注册状态可用
|
||||
eventmanager.enable_events_hander(plugin_id)
|
||||
# 初始化插件
|
||||
plugin.init_plugin(conf)
|
||||
# 检查插件状态并启用/禁用事件处理器
|
||||
if plugin.get_state():
|
||||
# 启用插件类的事件处理器
|
||||
eventmanager.enable_event_handler(type(plugin))
|
||||
else:
|
||||
# 设置事件状态为不可用
|
||||
eventmanager.disable_events_hander(plugin_id)
|
||||
# 禁用插件类的事件处理器
|
||||
eventmanager.disable_event_handler(type(plugin))
|
||||
|
||||
def stop(self, pid: str = None):
|
||||
"""
|
||||
@@ -202,6 +205,7 @@ class PluginManager(metaclass=Singleton):
|
||||
for plugin_id, plugin in self._running_plugins.items():
|
||||
if pid and plugin_id != pid:
|
||||
continue
|
||||
eventmanager.disable_event_handler(type(plugin))
|
||||
self.__stop_plugin(plugin)
|
||||
# 清空对像
|
||||
if pid:
|
||||
|
||||
@@ -30,6 +30,7 @@ except ImportError as e:
|
||||
print(error_message, file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
from app.core.event import EventManager
|
||||
from app.core.plugin import PluginManager
|
||||
from app.db.init import init_db, update_db
|
||||
from app.helper.thread import ThreadHelper
|
||||
@@ -212,7 +213,7 @@ def shutdown_server():
|
||||
PluginManager().stop()
|
||||
PluginManager().stop_monitor()
|
||||
# 停止事件消费
|
||||
Command().stop()
|
||||
EventManager().stop()
|
||||
# 停止虚拟显示
|
||||
DisplayHelper().stop()
|
||||
# 停止定时服务
|
||||
@@ -237,6 +238,8 @@ def start_module():
|
||||
ResourceHelper()
|
||||
# 加载模块
|
||||
ModuleManager()
|
||||
# 启动事件消费
|
||||
EventManager().start()
|
||||
# 安装在线插件
|
||||
PluginManager().sync()
|
||||
# 加载插件
|
||||
@@ -245,7 +248,7 @@ def start_module():
|
||||
Monitor()
|
||||
# 启动定时服务
|
||||
Scheduler()
|
||||
# 启动事件消费
|
||||
# 加载命令
|
||||
Command()
|
||||
# 初始化路由
|
||||
init_routers()
|
||||
|
||||
@@ -14,7 +14,7 @@ class TorrentStatus(Enum):
|
||||
DOWNLOADING = "下载中"
|
||||
|
||||
|
||||
# 可监听事件
|
||||
# 异步广播事件
|
||||
class EventType(Enum):
|
||||
# 插件需要重载
|
||||
PluginReload = "plugin.reload"
|
||||
@@ -56,6 +56,11 @@ class EventType(Enum):
|
||||
MetadataScrape = "metadata.scrape"
|
||||
|
||||
|
||||
# 同步链式事件
|
||||
class ChainEventType(Enum):
|
||||
pass
|
||||
|
||||
|
||||
# 系统配置Key字典
|
||||
class SystemConfigKey(Enum):
|
||||
# 下载器配置
|
||||
|
||||
Reference in New Issue
Block a user