Files
MoviePilot/app/command.py
jxxghp b8ef5d1efc feat: add session management to MessageChain
- Implemented session ID creation and reuse based on user activity, with a timeout of 15 minutes.
- Added remote command to clear user sessions, enhancing user session management capabilities.
- Updated Command class to include a new command for clearing sessions.
2025-11-18 08:41:05 +08:00

428 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import copy
import threading
import traceback
from typing import Any, Union, Dict, Optional
from app.chain import ChainBase
from app.chain.download import DownloadChain
from app.chain.message import MessageChain
from app.chain.site import SiteChain
from app.chain.subscribe import SubscribeChain
from app.chain.system import SystemChain
from app.chain.transfer import TransferChain
from app.core.event import Event as ManagerEvent, eventmanager, Event
from app.core.plugin import PluginManager
from app.helper.message import MessageHelper
from app.helper.thread import ThreadHelper
from app.log import logger
from app.scheduler import Scheduler
from app.schemas import Notification, CommandRegisterEventData
from app.schemas.types import EventType, MessageChannel, ChainEventType
from app.utils.object import ObjectUtils
from app.utils.singleton import Singleton
from app.utils.structures import DictUtils
class CommandChain(ChainBase):
pass
class Command(metaclass=Singleton):
"""
全局命令管理,消费事件
"""
def __init__(self):
# 插件管理器
super().__init__()
# 注册的命令集合
self._registered_commands = {}
# 所有命令集合
self._commands = {}
# 内建命令集合
self._preset_commands = {
"/cookiecloud": {
"id": "cookiecloud",
"type": "scheduler",
"description": "同步站点",
"category": "站点"
},
"/sites": {
"func": SiteChain().remote_list,
"description": "查询站点",
"category": "站点",
"data": {}
},
"/site_cookie": {
"func": SiteChain().remote_cookie,
"description": "更新站点Cookie",
"data": {}
},
"/site_statistic": {
"func": SiteChain().remote_refresh_userdatas,
"description": "站点数据统计",
"data": {}
},
"/site_enable": {
"func": SiteChain().remote_enable,
"description": "启用站点",
"data": {}
},
"/site_disable": {
"func": SiteChain().remote_disable,
"description": "禁用站点",
"data": {}
},
"/mediaserver_sync": {
"id": "mediaserver_sync",
"type": "scheduler",
"description": "同步媒体服务器",
"category": "管理"
},
"/subscribes": {
"func": SubscribeChain().remote_list,
"description": "查询订阅",
"category": "订阅",
"data": {}
},
"/subscribe_refresh": {
"id": "subscribe_refresh",
"type": "scheduler",
"description": "刷新订阅",
"category": "订阅"
},
"/subscribe_search": {
"id": "subscribe_search",
"type": "scheduler",
"description": "搜索订阅",
"category": "订阅"
},
"/subscribe_delete": {
"func": SubscribeChain().remote_delete,
"description": "删除订阅",
"data": {}
},
"/subscribe_tmdb": {
"id": "subscribe_tmdb",
"type": "scheduler",
"description": "订阅元数据更新"
},
"/downloading": {
"func": DownloadChain().remote_downloading,
"description": "正在下载",
"category": "管理",
"data": {}
},
"/transfer": {
"id": "transfer",
"type": "scheduler",
"description": "下载文件整理",
"category": "管理"
},
"/redo": {
"func": TransferChain().remote_transfer,
"description": "手动整理",
"data": {}
},
"/clear_cache": {
"func": SystemChain().remote_clear_cache,
"description": "清理缓存",
"category": "管理",
"data": {}
},
"/restart": {
"func": SystemChain().restart,
"description": "重启系统",
"category": "管理",
"data": {}
},
"/version": {
"func": SystemChain().version,
"description": "当前版本",
"category": "管理",
"data": {}
},
"/clear_session": {
"func": MessageChain().remote_clear_session,
"description": "清除会话",
"category": "管理",
"data": {}
}
}
# 插件命令集合
self._plugin_commands = {}
# 其他命令集合
self._other_commands = {}
# 初始化锁
self._rlock = threading.RLock()
# 插件管理
self.pluginmanager = PluginManager()
# 定时服务管理
self.scheduler = Scheduler()
# 消息管理器
self.messagehelper = MessageHelper()
# 初始化命令
self.init_commands()
def init_commands(self, pid: Optional[str] = None) -> None:
"""
初始化菜单命令
"""
# 使用线程池提交后台任务,避免引起阻塞
ThreadHelper().submit(self.__init_commands_background, pid)
def __init_commands_background(self, pid: Optional[str] = None) -> None:
"""
后台初始化菜单命令
"""
try:
with self._rlock:
logger.debug("Acquired lock for initializing commands in background.")
self._plugin_commands = self.__build_plugin_commands(pid)
self._commands = {
**self._preset_commands,
**self._plugin_commands,
**self._other_commands
}
# 强制触发注册
force_register = False
# 触发事件允许可以拦截和调整命令
event, initial_commands = self.__trigger_register_commands_event()
if event and event.event_data:
# 如果事件返回有效的 event_data使用事件中调整后的命令
event_data: CommandRegisterEventData = event.event_data
# 如果事件被取消,跳过命令注册
if event_data.cancel:
logger.debug(f"Command initialization canceled by event: {event_data.source}")
return
# 如果拦截源与插件标识一致时,这里认为需要强制触发注册
if pid is not None and pid == event_data.source:
force_register = True
initial_commands = event_data.commands or {}
logger.debug(f"Registering command count from event: {len(initial_commands)}")
else:
logger.debug(f"Registering initial command count: {len(initial_commands)}")
# initial_commands 必须是 self._commands 的子集
filtered_initial_commands = DictUtils.filter_keys_to_subset(initial_commands, self._commands)
# 如果 filtered_initial_commands 为空,则跳过注册
if not filtered_initial_commands and not force_register:
logger.debug("Filtered commands are empty, skipping registration.")
return
# 对比调整后的命令与当前命令
if filtered_initial_commands != self._registered_commands or force_register:
logger.debug("Command set has changed or force registration is enabled.")
self._registered_commands = filtered_initial_commands
CommandChain().register_commands(commands=filtered_initial_commands)
else:
logger.debug("Command set unchanged, skipping broadcast registration.")
except Exception as e:
logger.error(f"Error occurred during command initialization in background: {e}", exc_info=True)
def __trigger_register_commands_event(self) -> tuple[Optional[Event], dict]:
"""
触发事件,允许调整命令数据
"""
def add_commands(source, command_type):
"""
添加命令集合
"""
for cmd, command in source.items():
if not command.get("show", True):
continue
command_data = {
"type": command_type,
"description": command.get("description"),
"category": command.get("category")
}
# 如果有 pid则添加到命令数据中
plugin_id = command.get("pid")
if plugin_id:
command_data["pid"] = plugin_id
commands[cmd] = command_data
# 初始化命令字典
commands: Dict[str, dict] = {}
add_commands(self._preset_commands, "preset")
add_commands(self._plugin_commands, "plugin")
add_commands(self._other_commands, "other")
# 触发事件允许可以拦截和调整命令
event_data = CommandRegisterEventData(commands=commands, origin="CommandChain", service=None)
event = eventmanager.send_event(ChainEventType.CommandRegister, event_data)
return event, commands
def __build_plugin_commands(self, _: Optional[str] = None) -> Dict[str, dict]:
"""
构建插件命令
"""
# 为了保证命令顺序的一致性,目前这里没有直接使用 pid 获取单一插件命令,后续如果存在性能问题,可以考虑优化这里的逻辑
plugin_commands = {}
for command in self.pluginmanager.get_plugin_commands():
cmd = command.get("cmd")
if cmd:
plugin_commands[cmd] = {
"pid": command.get("pid"),
"func": self.send_plugin_event,
"description": command.get("desc"),
"category": command.get("category"),
"show": command.get("show", True),
"data": {
"etype": command.get("event"),
"data": command.get("data")
}
}
return plugin_commands
def __run_command(self, command: Dict[str, any], data_str: Optional[str] = "",
channel: MessageChannel = None, source: Optional[str] = None, userid: Union[str, int] = None):
"""
运行定时服务
"""
if command.get("type") == "scheduler":
# 定时服务
if userid:
CommandChain().post_message(
Notification(
channel=channel,
source=source,
title=f"开始执行 {command.get('description')} ...",
userid=userid
)
)
# 执行定时任务
self.scheduler.start(job_id=command.get("id"))
if userid:
CommandChain().post_message(
Notification(
channel=channel,
source=source,
title=f"{command.get('description')} 执行完成",
userid=userid
)
)
else:
# 命令
cmd_data = copy.deepcopy(command['data']) if command.get('data') else {}
args_num = ObjectUtils.arguments(command['func'])
if args_num > 0:
if cmd_data:
# 有内置参数直接使用内置参数
data = cmd_data.get("data") or {}
data['channel'] = channel
data['source'] = source
data['user'] = userid
if data_str:
data['arg_str'] = data_str
cmd_data['data'] = data
command['func'](**cmd_data)
elif args_num == 3:
# 没有输入参数只输入渠道来源、用户ID和消息来源
command['func'](channel, userid, source)
elif args_num > 3:
# 多个输入参数用户输入、用户ID
command['func'](data_str, channel, userid, source)
else:
# 没有参数
command['func']()
def get_commands(self):
"""
获取命令列表
"""
return self._commands
def get(self, cmd: str) -> Any:
"""
获取命令
"""
return self._commands.get(cmd, {})
def register(self, cmd: str, func: Any, data: Optional[dict] = None,
desc: Optional[str] = None, category: Optional[str] = None,
show: bool = True) -> None:
"""
注册单个命令
"""
# 单独调用的,统一注册到其他
self._other_commands[cmd] = {
"func": func,
"description": desc,
"category": category,
"data": data or {},
"show": show
}
def execute(self, cmd: str, data_str: Optional[str] = "",
channel: MessageChannel = None, source: Optional[str] = None,
userid: Union[str, int] = None) -> None:
"""
执行命令
"""
command = self.get(cmd)
if command:
try:
if userid:
logger.info(f"用户 {userid} 开始执行:{command.get('description')} ...")
else:
logger.info(f"开始执行:{command.get('description')} ...")
# 执行命令
self.__run_command(command, data_str=data_str,
channel=channel, source=source, userid=userid)
if userid:
logger.info(f"用户 {userid} {command.get('description')} 执行完成")
else:
logger.info(f"{command.get('description')} 执行完成")
except Exception as err:
logger.error(f"执行命令 {cmd} 出错:{str(err)} - {traceback.format_exc()}")
self.messagehelper.put(title=f"执行命令 {cmd} 出错",
message=str(err),
role="system")
@staticmethod
def send_plugin_event(etype: EventType, data: dict) -> None:
"""
发送插件命令
"""
eventmanager.send_event(etype, data)
@eventmanager.register(EventType.CommandExcute)
def command_event(self, event: ManagerEvent) -> None:
"""
注册命令执行事件
event_data: {
"cmd": "/xxx args"
}
"""
# 命令参数
event_str = event.event_data.get('cmd')
# 消息渠道
event_channel = event.event_data.get('channel')
# 消息来源
event_source = event.event_data.get('source')
# 消息用户
event_user = event.event_data.get('user')
if event_str:
cmd = event_str.split()[0]
args = " ".join(event_str.split()[1:])
if self.get(cmd):
self.execute(cmd=cmd, data_str=args,
channel=event_channel, source=event_source, userid=event_user)
@eventmanager.register(EventType.ModuleReload)
def module_reload_event(self, _: ManagerEvent) -> None:
"""
注册模块重载事件
"""
# 发生模块重载时,重新注册命令
self.init_commands()