mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-05-08 22:33:16 +08:00
119 lines
4.0 KiB
Python
119 lines
4.0 KiB
Python
"""安装插件工具"""
|
|
|
|
import json
|
|
from typing import Optional, Type
|
|
|
|
from pydantic import BaseModel, Field
|
|
|
|
from app.agent.tools.base import MoviePilotTool
|
|
from app.agent.tools.impl._plugin_tool_utils import (
|
|
get_plugin_snapshot,
|
|
install_plugin_runtime,
|
|
load_market_plugins,
|
|
summarize_plugin,
|
|
)
|
|
from app.log import logger
|
|
|
|
|
|
class InstallPluginInput(BaseModel):
|
|
"""安装插件工具的输入参数模型"""
|
|
|
|
explanation: str = Field(
|
|
...,
|
|
description="Clear explanation of why this tool is being used in the current context",
|
|
)
|
|
plugin_id: str = Field(
|
|
...,
|
|
description="Exact plugin ID to install. Use query_market_plugins first to find the correct plugin_id.",
|
|
)
|
|
force: bool = Field(
|
|
False,
|
|
description="Whether to force reinstall or upgrade the specified plugin.",
|
|
)
|
|
force_refresh_market: bool = Field(
|
|
False,
|
|
description="Whether to refresh plugin market caches before reading the market list.",
|
|
)
|
|
|
|
|
|
class InstallPluginTool(MoviePilotTool):
|
|
name: str = "install_plugin"
|
|
description: str = (
|
|
"Install a plugin by exact plugin_id from the plugin market or local plugin repositories. "
|
|
"Use query_market_plugins first when you need filtering or discovery."
|
|
)
|
|
require_admin: bool = True
|
|
args_schema: Type[BaseModel] = InstallPluginInput
|
|
|
|
def get_tool_message(self, **kwargs) -> Optional[str]:
|
|
plugin_id = kwargs.get("plugin_id")
|
|
return f"安装插件: {plugin_id or '未知插件'}"
|
|
|
|
async def run(
|
|
self,
|
|
plugin_id: str,
|
|
force: bool = False,
|
|
force_refresh_market: bool = False,
|
|
**kwargs,
|
|
) -> str:
|
|
logger.info(
|
|
f"执行工具: {self.name}, 参数: plugin_id={plugin_id}, force={force}"
|
|
)
|
|
|
|
try:
|
|
plugins = await load_market_plugins(force_refresh=force_refresh_market)
|
|
if not plugins:
|
|
return json.dumps(
|
|
{"success": False, "message": "当前插件市场没有可用插件"},
|
|
ensure_ascii=False,
|
|
)
|
|
|
|
candidate = next((plugin for plugin in plugins if plugin.id == plugin_id), None)
|
|
if not candidate:
|
|
return json.dumps(
|
|
{
|
|
"success": False,
|
|
"message": f"未在插件市场中找到插件: {plugin_id}。请先调用 query_market_plugins 确认 plugin_id。",
|
|
},
|
|
ensure_ascii=False,
|
|
)
|
|
|
|
success, message, refreshed_only = await install_plugin_runtime(
|
|
candidate.id,
|
|
getattr(candidate, "repo_url", None),
|
|
force=force,
|
|
)
|
|
if not success:
|
|
return json.dumps(
|
|
{
|
|
"success": False,
|
|
"plugin": summarize_plugin(candidate),
|
|
"message": message,
|
|
},
|
|
ensure_ascii=False,
|
|
indent=2,
|
|
)
|
|
|
|
plugin_snapshot = get_plugin_snapshot(candidate.id)
|
|
if refreshed_only and getattr(candidate, "has_update", False) and not force:
|
|
message = "插件已安装,当前仅刷新加载;如需升级到市场新版本,请设置 force=true"
|
|
|
|
return json.dumps(
|
|
{
|
|
"success": True,
|
|
"message": message,
|
|
"force": force,
|
|
"refreshed_only": refreshed_only,
|
|
"plugin": summarize_plugin(candidate),
|
|
"runtime": plugin_snapshot,
|
|
},
|
|
ensure_ascii=False,
|
|
indent=2,
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"安装插件失败: {e}", exc_info=True)
|
|
return json.dumps(
|
|
{"success": False, "message": f"安装插件时发生错误: {str(e)}"},
|
|
ensure_ascii=False,
|
|
)
|