Refactor plugin cache handling and update related docs

This commit is contained in:
jxxghp
2026-06-21 18:29:27 +08:00
parent 43d1abdec8
commit 6647565ec4
14 changed files with 808 additions and 456 deletions

View File

@@ -27,7 +27,10 @@ from langgraph.checkpoint.memory import InMemorySaver
from app.agent.callback import StreamingHandler
from app.agent.llm import LLMHelper
from app.agent.memory import memory_manager
from app.agent.middleware.activity_log import ActivityLogMiddleware
from app.agent.middleware.activity_log import (
ActivityLogMiddleware,
QUERY_ACTIVITY_LOG_TOOL_NAME,
)
from app.agent.middleware.jobs import (
JobsMiddleware,
filter_active_jobs,
@@ -36,7 +39,7 @@ from app.agent.middleware.jobs import (
from app.agent.middleware.memory import MemoryMiddleware
from app.agent.middleware.patch_tool_calls import PatchToolCallsMiddleware
from app.agent.middleware.runtime_config import RuntimeConfigMiddleware
from app.agent.middleware.skills import SkillsMiddleware
from app.agent.middleware.skills import SKILL_TOOL_NAME, SkillsMiddleware
from app.agent.middleware.subagents import (
SUBAGENT_CONTROL_TOOL_NAME,
SUBAGENT_TASK_TOOL_NAME,
@@ -972,6 +975,20 @@ class MoviePilotAgent:
# 工具列表
tools = self._initialize_tools()
skills_middleware = SkillsMiddleware(
sources=[str(agent_runtime_manager.skills_dir)],
bundled_skills_dir=str(settings.ROOT_PATH / "skills"),
)
skill_tools = list(getattr(skills_middleware, "tools", []) or [])
activity_log_middleware = None
activity_log_tools = []
if self.has_message_context:
activity_log_middleware = ActivityLogMiddleware(
activity_dir=str(agent_runtime_manager.activity_dir),
)
activity_log_tools = list(
getattr(activity_log_middleware, "tools", []) or []
)
subagent_middlewares, subagent_task_tools = create_subagent_middlewares(
model=non_streaming_model,
tools=self._initialize_subagent_tools(),
@@ -988,14 +1005,23 @@ class MoviePilotAgent:
if getattr(tool, "name", None)
in {SUBAGENT_TASK_TOOL_NAME, SUBAGENT_CONTROL_TOOL_NAME}
)
if skill_tools:
always_include_tools.extend(
tool.name
for tool in skill_tools
if getattr(tool, "name", None) == SKILL_TOOL_NAME
)
if activity_log_tools:
always_include_tools.extend(
tool.name
for tool in activity_log_tools
if getattr(tool, "name", None) == QUERY_ACTIVITY_LOG_TOOL_NAME
)
# 中间件
middlewares = [
# Skills
SkillsMiddleware(
sources=[str(agent_runtime_manager.skills_dir)],
bundled_skills_dir=str(settings.ROOT_PATH / "skills"),
),
skills_middleware,
# Jobs 任务管理
JobsMiddleware(
sources=[str(agent_runtime_manager.jobs_dir)],
@@ -1019,9 +1045,7 @@ class MoviePilotAgent:
if self.has_message_context:
middlewares.insert(
4,
ActivityLogMiddleware(
activity_dir=str(agent_runtime_manager.activity_dir),
),
activity_log_middleware,
)
# 工具选择
@@ -1029,7 +1053,12 @@ class MoviePilotAgent:
middlewares.append(
ToolSelectorMiddleware(
model=non_streaming_model,
selection_tools=[*tools, *subagent_task_tools],
selection_tools=[
*tools,
*skill_tools,
*activity_log_tools,
*subagent_task_tools,
],
max_tools=max_tools,
always_include=always_include_tools,
)
@@ -1037,7 +1066,7 @@ class MoviePilotAgent:
return create_agent(
model=agent_model,
tools=tools,
tools=[*tools, *skill_tools, *activity_log_tools],
system_prompt=system_prompt,
middleware=middlewares,
checkpointer=InMemorySaver(),

View File

@@ -6,6 +6,7 @@
并在每次 Agent 启动时注入轻量索引,完整日志由工具按需查询。
"""
import json
import re
from collections.abc import Awaitable, Callable
from datetime import datetime, timedelta
@@ -23,9 +24,12 @@ from langchain.agents.middleware.types import (
ResponseT,
)
from langchain_core.messages import AIMessage, HumanMessage, ToolMessage
from langchain_core.tools import StructuredTool
from langgraph.runtime import Runtime
from pydantic import BaseModel, Field
from app.agent.middleware.utils import append_to_system_message
from app.agent.tools.tags import ToolTag
from app.log import logger
# 活动日志保留天数
@@ -48,6 +52,13 @@ MAX_LOG_FILE_SIZE = 256 * 1024
MAX_CONTEXT_FOR_SUMMARY = 4000
SUMMARY_SKIP_MARKER = "SKIP"
QUERY_ACTIVITY_LOG_TOOL_NAME = "query_activity_log"
QUERY_ACTIVITY_LOG_TOOL_DESCRIPTION = (
"Query recent MoviePilot Agent activity logs on demand. Use this when the user asks what was done before, "
"asks to continue a previous task, or explicitly references recent agent activity. Supports keyword, date, "
"recent-day window, limit, and optional regex filters. If a keyword search returns no results, retry with "
"a shorter keyword, a larger days window, or no keyword to inspect recent entries."
)
# LLM 总结的提示词
SUMMARY_PROMPT = """请判断以下 AI 助手与用户的对话是否值得写入 MoviePilot 活动日志。
@@ -71,6 +82,41 @@ SUMMARY_PROMPT = """请判断以下 AI 助手与用户的对话是否值得写
ACTIVITY_ENTRY_PATTERN = re.compile(r"^-\s+\*\*(?P<time>\d{2}:\d{2})\*\*\s+(?P<summary>.+)$")
class QueryActivityLogInput(BaseModel):
"""查询活动日志工具的输入参数模型。"""
explanation: Optional[str] = Field(
None,
description="Clear explanation of why this tool is being used in the current context",
)
keyword: Optional[str] = Field(
None,
description=(
"Optional plain-text keyword to filter activity summaries. Use short title, path, site, task, "
"or status fragments; omit it to inspect latest entries."
),
)
use_regex: Optional[bool] = Field(
False,
description=(
"Whether to treat keyword as a regular expression. Defaults to false; enable only for "
"alternative or pattern matching."
),
)
date: Optional[str] = Field(
None,
description="Optional exact date in YYYY-MM-DD format. If omitted, recent days are searched.",
)
days: Optional[int] = Field(
DEFAULT_QUERY_DAYS,
description="Number of recent days to search when date is not specified.",
)
limit: Optional[int] = Field(
DEFAULT_QUERY_LIMIT,
description="Maximum number of activity entries to return.",
)
def _coerce_query_limit(limit: Optional[int]) -> int:
"""规范化活动日志查询条数。"""
if limit is None:
@@ -225,6 +271,53 @@ def query_activity_logs(
}
class _ActivityLogToolProvider:
"""活动日志工具的查询实现。"""
def __init__(self, *, activity_dir: str) -> None:
"""初始化活动日志查询目录。"""
self._activity_dir = activity_dir
async def query_activity_log(
self,
keyword: Optional[str] = None,
use_regex: Optional[bool] = False,
date: Optional[str] = None,
days: Optional[int] = DEFAULT_QUERY_DAYS,
limit: Optional[int] = DEFAULT_QUERY_LIMIT,
explanation: Optional[str] = None,
) -> str:
"""查询活动日志并返回 JSON 字符串。"""
logger.info(
"查询活动日志: keyword=%s, use_regex=%s, date=%s, days=%s, limit=%s, explanation=%s",
keyword,
use_regex,
date,
days,
limit,
explanation or "-",
)
try:
payload = query_activity_logs(
self._activity_dir,
keyword=keyword,
use_regex=bool(use_regex),
date=date,
days=days or DEFAULT_QUERY_DAYS,
limit=limit,
)
return json.dumps(payload, ensure_ascii=False, indent=2)
except Exception as err:
logger.error(f"查询活动日志失败: {err}", exc_info=True)
return json.dumps(
{
"success": False,
"message": f"查询活动日志时发生错误: {str(err)}",
},
ensure_ascii=False,
)
class ActivityLogState(AgentState):
"""ActivityLogMiddleware 的状态模型。"""
@@ -373,17 +466,9 @@ ACTIVITY_LOG_SYSTEM_PROMPT = """<activity_log>
</activity_log_index>
<activity_log_guidelines>
Activity logs are automatically maintained by the system and are available for continuity, but full log contents are not injected into context by default.
**How to use this information:**
- The <activity_log_index> above only lists which recent dates have activity records and how many entries exist.
- Use `query_activity_log` when the user asks about previous work, asks to continue a prior task, or when recent activity is clearly relevant to the current request.
- To find related logs, start with a broad search: use the exact date if known; otherwise query recent days with a short keyword, or omit `keyword` and inspect the latest entries. If there are no matches, retry with a larger `days` value or a shorter object/path fragment.
- `query_activity_log.keyword` is a plain substring by default. Set `use_regex=true` only when matching alternatives or patterns such as multiple titles, paths, or task IDs.
- Do not query activity logs for routine standalone tasks such as file organization, media recognition, downloads, subscriptions, or diagnostics unless the user explicitly references prior activity.
- Activity logs are read-only from your perspective. Do not attempt to edit or write to activity log files.
- For long-term preferences and knowledge, continue to use MEMORY.md.
- Activity logs are retained for {retention_days} days and then automatically cleaned up.
The index only shows recent dates and entry counts, not full log contents.
Use `query_activity_log` only when the user references previous work, asks to continue a prior task, or recent activity is clearly relevant.
Activity logs are read-only and retained for {retention_days} days; use MEMORY.md for durable preferences.
</activity_log_guidelines>
</activity_log>
"""
@@ -414,6 +499,16 @@ class ActivityLogMiddleware(AgentMiddleware[ActivityLogState, ContextT, Response
self.activity_dir = activity_dir
self.retention_days = retention_days
self.prompt_load_days = prompt_load_days
self._tool_provider = _ActivityLogToolProvider(activity_dir=activity_dir)
self.tools = [
StructuredTool.from_function(
coroutine=self._tool_provider.query_activity_log,
name=QUERY_ACTIVITY_LOG_TOOL_NAME,
description=QUERY_ACTIVITY_LOG_TOOL_DESCRIPTION,
args_schema=QueryActivityLogInput,
tags=[ToolTag.Read, ToolTag.System],
)
]
def _get_log_path(self, date_str: str) -> AsyncPath:
"""获取指定日期的日志文件路径。"""
@@ -584,6 +679,7 @@ class ActivityLogMiddleware(AgentMiddleware[ActivityLogState, ContextT, Response
__all__ = [
"ActivityLogMiddleware",
"QUERY_ACTIVITY_LOG_TOOL_NAME",
"load_activity_log_index",
"query_activity_logs",
]

View File

@@ -195,7 +195,7 @@ async def load_jobs_metadata(source_paths: list[str]) -> list[JobMetadata]:
JOBS_SYSTEM_PROMPT = """
<jobs_system>
You have a **scheduled jobs** system that allows you to track and execute long-running or recurring tasks.
You have a scheduled jobs system for user-requested delayed or recurring work.
**Jobs Location:** `{jobs_location}`
@@ -203,71 +203,11 @@ You have a **scheduled jobs** system that allows you to track and execute long-r
{jobs_list}
**Job File Format:**
Each job is a directory containing a `JOB.md` file with YAML frontmatter followed by task details:
```markdown
---
name: 任务名称(简短中文描述)
description: 任务的详细描述,说明要做什么
schedule: once 或 recurring
status: pending / in_progress / completed / cancelled
last_run: "YYYY-MM-DD HH:MM"(上次执行时间,可选)
---
# 任务详情
## 目标
详细描述这个任务要完成的目标。
## 执行日志
记录每次执行的情况和结果。
- **2024-01-15 10:00** - 执行了XXX操作结果成功/失败
- **2024-01-16 10:00** - 继续执行XXX...
```
**Job Lifecycle Rules:**
1. **Creating a Job**: When a user asks you to do something periodically or at a later time:
- Create a new directory under the jobs location, directory name is the `job-id` (lowercase, hyphens, 1-64 chars)
- Write a `JOB.md` file with proper frontmatter and detailed task description
- Set `schedule: once` for one-time tasks, `schedule: recurring` for repeating tasks (e.g., daily sign-in, weekly checks)
- Set initial `status: pending`
2. **Executing a Job**: When you work on a job:
- Update `status: in_progress` in the frontmatter
- Execute the required actions using your tools
- Log the execution result in the "执行日志" section with timestamp
- Update `last_run` in frontmatter to current time
3. **Completing a Job**:
- For `schedule: once` tasks: set `status: completed` after successful execution
- For `schedule: recurring` tasks: keep `status: pending` after execution, only update `last_run` time. The job stays active for the next scheduled run.
- Set `status: cancelled` if the user explicitly asks to cancel/stop a task
4. **Heartbeat Check**: You will be periodically woken up to check pending jobs. When woken up:
- Read the jobs directory to find all active jobs (status: pending or in_progress)
- Skip jobs with `status: completed` or `status: cancelled`
- For `schedule: recurring` jobs, check `last_run` to determine if it's time to run again
- Execute pending jobs and update their status/logs accordingly
**Important Notes:**
- Each job MUST have its own separate directory and JOB.md file to avoid conflicts
- Always update the frontmatter fields (status, last_run) when executing a job
- Keep execution logs concise but informative
- For recurring jobs, maintain a rolling log (keep recent entries, you can summarize/remove old entries to keep the file manageable)
- When creating jobs, make the description detailed enough that you can understand and execute the task in future sessions without additional context
**When to Create Jobs:**
- User says "每天帮我..." / "定期..." / "定时..." / "提醒我..." / "以后每次..."
- User requests a task that should be done repeatedly
- User asks for monitoring or periodic checking of something
**When NOT to Create Jobs:**
- User asks for an immediate one-time action (just do it now)
- Simple questions or conversations
- Tasks that are already handled by MoviePilot's built-in scheduler services
Rules:
- Create jobs only when the user asks for delayed, recurring, reminder, or monitoring behavior.
- Do not create jobs for immediate one-time work or work already handled by MoviePilot schedulers.
- Each job lives in its own directory with a `JOB.md`; read the listed file before executing or updating an active job.
- During heartbeat checks, act only on `pending` or `in_progress` jobs, update status/last_run/logs, and leave recurring jobs `pending` after each run.
</jobs_system>
"""

View File

@@ -1,8 +1,9 @@
import json
import re
import shutil
from collections.abc import Awaitable, Callable
from pathlib import Path
from typing import Annotated, List
from typing import Annotated, List, Optional
from typing import NotRequired, TypedDict
import yaml # noqa
@@ -17,9 +18,12 @@ from langchain.agents.middleware.types import (
)
from langchain.agents.middleware.types import PrivateStateAttr # noqa
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import StructuredTool
from langgraph.runtime import Runtime
from pydantic import BaseModel, Field
from app.agent.middleware.utils import append_to_system_message
from app.agent.tools.tags import ToolTag
from app.log import logger
# 安全提示: SKILL.md 文件最大限制为 10MB防止 DoS 攻击
@@ -84,6 +88,19 @@ class SkillsStateUpdate(TypedDict):
"""待合并的 skill 元数据列表。"""
class SkillToolInput(BaseModel):
"""Skill 加载工具的输入参数模型。"""
explanation: Optional[str] = Field(
None,
description="Clear explanation of why this skill is needed in the current context",
)
name: str = Field(
...,
description="Skill name or id from the available skills list.",
)
def _parse_skill_metadata( # noqa: C901
content: str,
skill_path: str,
@@ -248,66 +265,56 @@ async def _alist_skills(source_path: AsyncPath) -> list[SkillMetadata]:
return skills
def _list_skills(source_path: Path) -> list[SkillMetadata]:
"""同步列出指定路径下的所有技能元数据。"""
if not source_path.exists():
return []
skill_dirs = [
path
for path in source_path.iterdir()
if path.is_dir() and (path / "SKILL.md").is_file()
]
if not skill_dirs:
return []
skill_dirs.sort(key=lambda p: p.name.casefold())
skills: list[SkillMetadata] = []
for skill_path in skill_dirs:
skill_md_path = skill_path / "SKILL.md"
skill_content = skill_md_path.read_text(encoding="utf-8", errors="replace")
skill_metadata = _parse_skill_metadata(
content=skill_content,
skill_path=str(skill_md_path),
skill_id=skill_path.name,
)
if skill_metadata:
skills.append(skill_metadata)
return skills
SKILLS_SYSTEM_PROMPT = """
<skills_system>
You have access to a skills library that provides specialized capabilities and domain knowledge.
{skills_locations}
You have access to a skills library for specialized MoviePilot workflows.
**Available Skills:**
{skills_list}
**How to Use Skills (Progressive Disclosure):**
Skills follow a **progressive disclosure** pattern - you see their name and description above, but only read full instructions when needed:
1. **Recognize when a skill applies**: Check if the user's task matches a skill's description
2. **Read the skill's full instructions**: Use the path shown in the skill list above
3. **Follow the skill's instructions**: SKILL.md contains step-by-step workflows, best practices, and examples
4. **Access supporting files**: Skills may include helper scripts, configs, or reference docs - use absolute paths
**Creating New Skills:**
When you identify a repetitive complex workflow or specialized task that would benefit from being a skill, you can create one:
1. **Directory Structure**: Create a new directory in one of the skills locations. The directory name is the `skill-id`.
- Path format: `<skills_location>/<skill-id>/SKILL.md`
- `skill-id` constraints: 1-64 characters, lowercase letters, numbers, and hyphens only.
2. **SKILL.md Format**: Must start with a YAML frontmatter followed by markdown instructions.
```markdown
---
name: Brief tool name (Chinese)
description: Detailed functional description and use cases (1-1024 chars)
allowed-tools: "tool1 tool2" (optional, space-separated list of recommended tools)
compatibility: "Environment requirements" (optional, max 500 chars)
---
# Skill Instructions
Step-by-step workflows, best practices, and examples go here.
```
3. **Supporting Files**: You can add `.py` scripts, `.yaml` configs, or other files within the same skill directory. Reference them using absolute paths in `SKILL.md`.
**When to Use Skills:**
- User's request matches a skill's domain (e.g., "research X" -> web-research skill)
- You need specialized knowledge or structured workflows
- A skill provides proven patterns for complex tasks
**Executing Skill Scripts:**
Skills may contain Python scripts or other executable files. Always use absolute paths from the skill list.
**Example Workflow:**
User: "Can you research the latest developments in quantum computing?"
1. Check available skills -> See "web-research" skill with its path
2. Read the skill using the path shown
3. Follow the skill's research workflow (search -> organize -> synthesize)
4. Use any helper scripts with absolute paths
Remember: Skills make you more capable and consistent. When in doubt, check if a skill exists for the task!
When the user's request matches a skill description, call the `skill` tool with that skill name before taking task actions. Follow the loaded SKILL.md instructions, and load referenced supporting files only when needed. Do not create or rewrite skills unless the user explicitly asks for skill authoring.
</skills_system>
"""
SKILL_TOOL_NAME = "skill"
SKILL_TOOL_DESCRIPTION = """Loads the full instructions for a MoviePilot skill by name or id.
Available skills:
{skills_catalog}
Call this tool when the user's task matches one of the available skills. The tool returns the SKILL.md content and metadata so you can follow the skill's instructions. Do not use this for simple tasks that do not need a skill.
"""
def _extract_version(skill_md: Path) -> int:
"""从 SKILL.md 文件中快速提取 version 字段,无法提取时返回 0。"""
@@ -402,6 +409,108 @@ def _sync_bundled_skills(bundled_dir: Path, target_dir: Path) -> None:
logger.warning("更新内置技能 '%s' 失败: %s", skill_src.name, e)
class _SkillToolProvider:
"""Skill 工具的目录扫描和文件读取实现。"""
def __init__(self, *, sources: list[str]) -> None:
"""初始化 Skill 工具数据源。"""
self._sources = sources
@staticmethod
def _normalize_name(value: object) -> str:
"""标准化技能名称用于匹配。"""
return str(value or "").strip().casefold()
@classmethod
def _skill_matches(cls, skill: SkillMetadata, query: str) -> bool:
"""判断技能元数据是否匹配用户提供的名称。"""
normalized_query = cls._normalize_name(query)
candidates = [
skill.get("id"),
skill.get("name"),
]
return any(
cls._normalize_name(candidate) == normalized_query
for candidate in candidates
)
async def _find_skill(self, name: str) -> Optional[SkillMetadata]:
"""从中间件配置的 skills 目录中查找指定技能。"""
all_skills: dict[str, SkillMetadata] = {}
for source_path in self._sources:
skill_source_path = AsyncPath(source_path)
if not await skill_source_path.exists():
continue
for skill in await _alist_skills(skill_source_path):
all_skills[skill["name"]] = skill
for skill in all_skills.values():
if self._skill_matches(skill, name):
return skill
return None
@staticmethod
async def _read_skill_content(skill_path: str) -> tuple[str, bool]:
"""读取技能文件内容,并在超出上限时返回截断标记。"""
path = AsyncPath(skill_path)
stat = await path.stat()
truncated = stat.st_size > MAX_SKILL_FILE_SIZE
async with await path.open("rb") as handle:
raw_content = await handle.read(MAX_SKILL_FILE_SIZE)
return raw_content.decode("utf-8", errors="replace"), truncated
async def load_skill(self, name: str, explanation: Optional[str] = None) -> str:
"""加载指定 Skill 的完整说明并返回 JSON 字符串。"""
logger.info(f"加载 Skill: name={name}, explanation={explanation or '-'}")
try:
skill = await self._find_skill(name)
if not skill:
return json.dumps(
{
"success": False,
"message": f"未找到 Skill: {name}",
},
ensure_ascii=False,
)
content, truncated = await self._read_skill_content(skill["path"])
return json.dumps(
{
"success": True,
"skill": {
"id": skill.get("id"),
"name": skill.get("name"),
"description": skill.get("description"),
"path": skill.get("path"),
"allowed_tools": skill.get("allowed_tools", []),
},
"content": content,
"truncated": truncated,
},
ensure_ascii=False,
indent=2,
)
except Exception as err:
logger.error(f"加载 Skill 失败: {err}", exc_info=True)
return json.dumps(
{
"success": False,
"message": f"加载 Skill 时发生错误: {str(err)}",
},
ensure_ascii=False,
)
def _format_skill_tool_catalog(skills: list[SkillMetadata]) -> str:
"""渲染 Skill 工具描述中的可用技能目录。"""
if not skills:
return "(No skills are currently available.)"
return "\n".join(
f"- {skill['id']}: {skill['name']} - {skill['description']}"
for skill in skills
)
class SkillsMiddleware(AgentMiddleware[SkillsState, ContextT, ResponseT]): # noqa
"""加载并向系统提示词注入 Agent Skill 的中间件。
@@ -430,22 +539,55 @@ class SkillsMiddleware(AgentMiddleware[SkillsState, ContextT, ResponseT]): # no
self.sources = sources
self.bundled_skills_dir = bundled_skills_dir
self.system_prompt_template = SKILLS_SYSTEM_PROMPT
self._skill_provider = _SkillToolProvider(sources=sources)
self.tools = [
StructuredTool.from_function(
coroutine=self._skill_provider.load_skill,
name=SKILL_TOOL_NAME,
description=SKILL_TOOL_DESCRIPTION.format(
skills_catalog=_format_skill_tool_catalog(
self._load_skills_metadata()
)
),
args_schema=SkillToolInput,
tags=[ToolTag.Read, ToolTag.Skill],
)
]
def _format_skills_locations(self) -> str:
"""格式化技能位置信息用于系统提示词"""
locations = []
def _sync_bundled_skills(self) -> None:
"""将项目内置 Skill 同步到首个用户技能目录"""
if not self.bundled_skills_dir or not self.sources:
return
bundled = Path(self.bundled_skills_dir)
target = Path(self.sources[0])
try:
_sync_bundled_skills(bundled, target)
except Exception as e:
logger.warning("同步内置技能失败: %s", e)
for i, source_path in enumerate(self.sources):
suffix = " (higher priority)" if i == len(self.sources) - 1 else ""
locations.append(f"**MoviePilot Skills**: `{source_path}`{suffix}")
def _load_skills_metadata(self) -> list[SkillMetadata]:
"""同步加载当前配置目录中的 Skill 元数据。"""
self._sync_bundled_skills()
all_skills: dict[str, SkillMetadata] = {}
for source_path in self.sources:
for skill in _list_skills(Path(source_path)):
all_skills[skill["name"]] = skill
return list(all_skills.values())
return "\n".join(locations)
def _refresh_skill_tool_description(
self, skills: list[SkillMetadata]
) -> None:
"""刷新 skill 工具描述中的可用技能目录。"""
if not self.tools:
return
self.tools[0].description = SKILL_TOOL_DESCRIPTION.format(
skills_catalog=_format_skill_tool_catalog(skills)
)
def _format_skills_list(self, skills: list[SkillMetadata]) -> str:
"""格式化技能元数据列表用于系统提示词。"""
if not skills:
paths = [f"{source_path}" for source_path in self.sources]
return f"(No skills available yet. You can create skills in {' or '.join(paths)})"
return "(No skills available yet.)"
lines = []
for skill in skills:
@@ -456,18 +598,15 @@ class SkillsMiddleware(AgentMiddleware[SkillsState, ContextT, ResponseT]): # no
lines.append(desc_line)
if skill["allowed_tools"]:
lines.append(f" -> Allowed tools: {', '.join(skill['allowed_tools'])}")
lines.append(f" -> Read `{skill['path']}` for full instructions")
return "\n".join(lines)
def modify_request(self, request: ModelRequest[ContextT]) -> ModelRequest[ContextT]:
"""将技能文档注入模型请求的系统消息中。"""
skills_metadata = request.state.get("skills_metadata", []) # noqa
skills_locations = self._format_skills_locations()
skills_list = self._format_skills_list(skills_metadata)
skills_section = self.system_prompt_template.format(
skills_locations=skills_locations,
skills_list=skills_list,
)
@@ -489,14 +628,7 @@ class SkillsMiddleware(AgentMiddleware[SkillsState, ContextT, ResponseT]): # no
if "skills_metadata" in state:
return None
# 自动同步内置技能到首个用户技能目录
if self.bundled_skills_dir and self.sources:
bundled = Path(self.bundled_skills_dir)
target = Path(self.sources[0])
try:
_sync_bundled_skills(bundled, target)
except Exception as e:
logger.warning("同步内置技能失败: %s", e)
self._sync_bundled_skills()
all_skills: dict[str, SkillMetadata] = {}
@@ -511,6 +643,7 @@ class SkillsMiddleware(AgentMiddleware[SkillsState, ContextT, ResponseT]): # no
all_skills[skill["name"]] = skill
skills = list(all_skills.values())
self._refresh_skill_tool_description(skills)
return SkillsStateUpdate(skills_metadata=skills)
async def awrap_model_call(
@@ -525,4 +658,4 @@ class SkillsMiddleware(AgentMiddleware[SkillsState, ContextT, ResponseT]): # no
return await handler(modified_request)
__all__ = ["SkillMetadata", "SkillsMiddleware"]
__all__ = ["SKILL_TOOL_NAME", "SkillMetadata", "SkillsMiddleware"]

View File

@@ -1,7 +1,6 @@
"""提示词管理器"""
import shutil
import socket
from dataclasses import dataclass, field
from pathlib import Path
from string import Formatter
@@ -24,8 +23,6 @@ from app.utils.system import SystemUtils
SYSTEM_TASKS_FILE = "System Tasks.yaml"
SYSTEM_TASKS_SCHEMA_VERSION = 2
COMMON_SHELL_COMMANDS = (
# 只探测会明显改变 Agent 执行策略的可选能力。基础命令、语言运行时、
# 包管理器、服务管理器和数据库客户端默认不做启动探测,减少 which 扫描量。
"ssh",
"scp",
"sftp",
@@ -91,7 +88,7 @@ class PromptManager:
self.prompts_cache: Dict[str, str] = {}
self._system_tasks_cache: Optional[SystemTasksDefinition] = None
self._system_tasks_signature: Optional[tuple[int, int]] = None
self._available_shell_commands_cache: Optional[list[tuple[str, str]]] = None
self._available_shell_command_names_cache: Optional[list[str]] = None
def load_prompt(self, prompt_name: str) -> str:
"""
@@ -281,90 +278,62 @@ class PromptManager:
def _get_moviepilot_info(self) -> str:
"""
获取MoviePilot系统信息用于注入到系统提示词中
获取需要常驻注入的最小 MoviePilot 运行信息。
"""
# 获取主机名和IP地址
try:
hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname)
except Exception: # noqa
hostname = "localhost"
ip_address = "127.0.0.1"
# 配置文件和日志文件目录
config_path = str(settings.CONFIG_PATH)
log_path = str(settings.LOG_PATH)
# API地址构建
api_port = settings.PORT
api_path = settings.API_V1_STR
# 数据库信息只保留非敏感连接摘要,凭据由内部工具自行读取。
db_type = settings.DB_TYPE
if db_type == "sqlite":
db_info = "SQLite本地配置目录数据库路径由内部工具读取"
else:
db_info = (
f"PostgreSQL{settings.DB_POSTGRESQL_TARGET}/"
f"{settings.DB_POSTGRESQL_DATABASE},凭据由内部工具读取)"
)
# 保留日期用于提供“今天是哪天”的稳定上下文,但不再注入秒级时间,
# 避免每次请求都生成不同的 system prompt影响 provider 侧 cache 命中率。
info_lines = [
f"- 当前日期: {strftime('%Y-%m-%d')}",
f"- 运行环境: {SystemUtils.platform} {'docker' if SystemUtils.is_docker() else ''}",
f"- 主机名: {hostname}",
f"- IP地址: {ip_address}",
f"- API端口: {api_port}",
f"- API路径: {api_path}",
"- API认证: 由内部工具自动处理,不在提示词中暴露令牌",
f"- 外网域名: {settings.APP_DOMAIN or '未设置'}",
f"- 数据库类型: {db_type}",
f"- 数据库: {db_info}",
f"- 配置文件目录: {config_path}",
f"- 日志文件目录: {log_path}",
f"- 系统安装目录: {settings.ROOT_PATH}",
f"- 插件安装目录: {settings.ROOT_PATH / 'app' / 'plugins'}",
"- 详细运行状态、数据库、API 和配置值需要时通过 `query_doctor_report`、`query_system_settings` 或 `execute_command` 查询。",
]
available_commands = self._get_available_shell_commands()
if available_commands:
info_lines.append("- 可用系统命令(可通过 `execute_command` 调用):")
path_lines = self._get_runtime_path_lines()
if path_lines:
info_lines.extend(
f" - {command}: {path}" for command, path in available_commands
[
"- 关键运行路径(必要时可用文件/命令工具读取,避免扫描无关目录):",
*path_lines,
]
)
# `rg` 同时覆盖文件枚举和文本检索,且比通用 shell 查找更适合
# Agent 的代码阅读与定位场景;只有在它不可用或不适合时才退回其他工具。
if any(command == "rg" for command, _ in available_commands):
available_commands = self._get_available_shell_command_names()
if available_commands:
info_lines.append(
"- 已安装的常用系统命令(仅列命令名,可通过 `execute_command` 调用): "
+ ", ".join(f"`{command}`" for command in available_commands)
)
if "rg" in available_commands:
info_lines.append(
"- When searching files or text, prefer `rg` / `rg --files`. Only fall back to other search tools when `rg` is unavailable or unsuitable."
"- 搜索文件或文本时优先使用 `rg` / `rg --files`,不适合或不可用时再使用其他命令。"
)
return "\n".join(info_lines)
def _get_available_shell_commands(self) -> list[tuple[str, str]]:
"""
探测 PATH 中已经安装的常用命令。
@staticmethod
def _get_runtime_path_lines() -> list[str]:
"""返回基础系统提示词需要常驻注入的全局运行路径。"""
paths = {
"项目根目录": settings.ROOT_PATH,
"配置目录": settings.CONFIG_PATH,
"临时目录": settings.TEMP_PATH,
"日志目录": settings.LOG_PATH,
"主日志文件": settings.LOG_PATH / "moviepilot.log",
}
return [f" - {label}: `{path}`" for label, path in paths.items()]
这里只使用 shutil.which 做无副作用查找,不实际执行命令;执行权限、
高风险操作确认和输出限制仍由 execute_command 工具负责。探测结果
在进程内缓存,避免每次组装提示词都重复扫描 PATH。
"""
if self._available_shell_commands_cache is not None:
return self._available_shell_commands_cache
def _get_available_shell_command_names(self) -> list[str]:
"""探测 PATH 中可用的常用命令名称,不把绝对路径注入提示词。"""
if self._available_shell_command_names_cache is not None:
return self._available_shell_command_names_cache
available_commands: list[tuple[str, str]] = []
for command in COMMON_SHELL_COMMANDS:
command_path = shutil.which(command)
if command_path:
available_commands.append((command, command_path))
self._available_shell_commands_cache = available_commands
available_commands = [
command for command in COMMON_SHELL_COMMANDS if shutil.which(command)
]
self._available_shell_command_names_cache = available_commands
return available_commands
def clear_available_shell_commands_cache(self) -> None:
"""清理可用系统命令缓存,供测试或运行时手动刷新使用。"""
self._available_shell_commands_cache = None
def clear_available_shell_command_names_cache(self) -> None:
"""清理可用命令名称缓存,供测试或运行时手动刷新使用。"""
self._available_shell_command_names_cache = None
@staticmethod
def _generate_formatting_instructions(caps: ChannelCapabilities) -> str:

View File

@@ -166,21 +166,11 @@ class AgentRuntimeConfig:
def render_prompt_sections(self) -> str:
"""渲染进入系统提示词的运行时片段。"""
sections: list[str] = [
"<agent_runtime_config>",
f"- Active persona: `{self.active_persona}`",
f"- Active persona source: `{self.persona.path}`",
]
if self.available_personas:
sections.append("- Available personas:")
sections.extend(f" - {persona.summary_line()}" for persona in self.available_personas)
if self.available_subagents:
sections.append("- Available subagents:")
sections.extend(
f" - {subagent.summary_line()}"
for subagent in self.available_subagents
)
sections.append("</agent_runtime_config>")
sections: list[str] = ["<agent_runtime_config>", f"- Active persona: `{self.active_persona}`",
f"- Active persona file: `personas/{self.persona.persona_id}/{PERSONA_FILE}`",
"- Use `query_personas` before switching persona when the requested speaking style is unclear.",
"- Subagent availability is exposed by the subagent task tools; do not rely on this runtime section as a catalog.",
"</agent_runtime_config>"]
if self.warnings:
sections.extend(

View File

@@ -75,7 +75,6 @@ from app.agent.tools.impl.uninstall_plugin import UninstallPluginTool
from app.agent.tools.impl.run_slash_command import RunSlashCommandTool
from app.agent.tools.impl.list_slash_commands import ListSlashCommandsTool
from app.agent.tools.impl.query_custom_identifiers import QueryCustomIdentifiersTool
from app.agent.tools.impl.query_activity_log import QueryActivityLogTool
from app.agent.tools.impl.query_doctor_report import QueryDoctorReportTool
from app.agent.tools.impl.update_custom_identifiers import UpdateCustomIdentifiersTool
from app.agent.tools.impl.query_system_settings import QuerySystemSettingsTool
@@ -165,7 +164,6 @@ class MoviePilotToolFactory:
UninstallPluginTool,
RunSlashCommandTool,
ListSlashCommandsTool,
QueryActivityLogTool,
QueryDoctorReportTool,
QueryCustomIdentifiersTool,
UpdateCustomIdentifiersTool,
@@ -182,7 +180,6 @@ class MoviePilotToolFactory:
"read_file",
"edit_file",
"execute_command",
"query_activity_log",
"ask_user_choice",
)

View File

@@ -1,120 +0,0 @@
"""查询 Agent 活动日志工具。"""
import json
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.middleware.activity_log import (
DEFAULT_QUERY_DAYS,
DEFAULT_QUERY_LIMIT,
query_activity_logs,
)
from app.agent.runtime import agent_runtime_manager
from app.agent.tools.base import MoviePilotTool
from app.agent.tools.tags import ToolTag
from app.log import logger
class QueryActivityLogInput(BaseModel):
"""查询活动日志工具的输入参数模型。"""
explanation: Optional[str] = Field(
None,
description="Clear explanation of why this tool is being used in the current context",
)
keyword: Optional[str] = Field(
None,
description=(
"Optional plain-text keyword to filter activity summaries. Use short title, path, site, task, "
"or status fragments; omit it to inspect latest entries."
),
)
use_regex: Optional[bool] = Field(
False,
description=(
"Whether to treat keyword as a regular expression. Defaults to false; enable only for "
"alternative or pattern matching."
),
)
date: Optional[str] = Field(
None,
description="Optional exact date in YYYY-MM-DD format. If omitted, recent days are searched.",
)
days: Optional[int] = Field(
DEFAULT_QUERY_DAYS,
description="Number of recent days to search when date is not specified.",
)
limit: Optional[int] = Field(
DEFAULT_QUERY_LIMIT,
description="Maximum number of activity entries to return.",
)
class QueryActivityLogTool(MoviePilotTool):
"""
Agent 活动日志只读查询工具。
"""
name: str = "query_activity_log"
tags: list[str] = [
ToolTag.Read,
ToolTag.System,
]
description: str = (
"Query recent MoviePilot Agent activity logs on demand. Use this when the user asks what was done before, "
"asks to continue a previous task, or explicitly references recent agent activity. Supports keyword, date, "
"recent-day window, limit, and optional regex filters. If a keyword search returns no results, retry with "
"a shorter keyword, a larger days window, or no keyword to inspect recent entries."
)
args_schema: Type[BaseModel] = QueryActivityLogInput
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据查询参数生成友好的提示消息。"""
keyword = kwargs.get("keyword")
date = kwargs.get("date")
if date and keyword:
return f"查询活动日志: {date} / {keyword}"
if date:
return f"查询活动日志: {date}"
if keyword:
return f"搜索活动日志: {keyword}"
return "查询近期活动日志"
async def run(
self,
keyword: Optional[str] = None,
use_regex: Optional[bool] = False,
date: Optional[str] = None,
days: Optional[int] = DEFAULT_QUERY_DAYS,
limit: Optional[int] = DEFAULT_QUERY_LIMIT,
**kwargs,
) -> str:
"""
查询活动日志并返回 JSON 字符串。
"""
logger.info(
f"执行工具: {self.name}, keyword={keyword}, use_regex={use_regex}, date={date}, "
f"days={days}, limit={limit}"
)
try:
payload = await self.run_blocking(
"default",
query_activity_logs,
str(agent_runtime_manager.activity_dir),
keyword=keyword,
use_regex=bool(use_regex),
date=date,
days=days or DEFAULT_QUERY_DAYS,
limit=limit,
)
return json.dumps(payload, ensure_ascii=False, indent=2)
except Exception as err:
logger.error(f"查询活动日志失败: {err}", exc_info=True)
return json.dumps(
{
"success": False,
"message": f"查询活动日志时发生错误: {str(err)}",
},
ensure_ascii=False,
)

View File

@@ -34,6 +34,7 @@ class ToolTag(str, Enum):
SlashCommand = "slash_command"
Recommendation = "recommendation"
Metadata = "metadata"
Skill = "skill"
__all__ = ["ToolTag"]

View File

@@ -8,13 +8,13 @@ from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, Tool
from app.agent.middleware.activity_log import (
ActivityLogMiddleware,
QUERY_ACTIVITY_LOG_TOOL_NAME,
_summarize_with_llm,
load_activity_log_index,
query_activity_logs,
)
from app.agent.tools.factory import MoviePilotToolFactory
from app.agent.tools.impl.query_activity_log import QueryActivityLogTool
from app.agent.tools.manager import MoviePilotToolsManager
from app.agent.tools.tags import ToolTag
def _write_activity_log(activity_dir, date_str: str, lines: list[str]) -> None:
@@ -230,22 +230,29 @@ def test_query_activity_logs_reports_invalid_regex(tmp_path):
assert payload["entries"] == []
def test_query_activity_log_tool_returns_json_payload(tmp_path):
"""query_activity_log 工具应返回结构化 JSON 查询结果"""
def test_activity_log_middleware_exposes_query_tool(tmp_path):
"""ActivityLogMiddleware 应以中间件工具形式暴露活动日志查询"""
middleware = ActivityLogMiddleware(activity_dir=str(tmp_path))
assert [tool.name for tool in middleware.tools] == [QUERY_ACTIVITY_LOG_TOOL_NAME]
assert ToolTag.Read in middleware.tools[0].tags
assert ToolTag.System in middleware.tools[0].tags
assert "recent MoviePilot Agent activity logs" in middleware.tools[0].description
def test_activity_log_middleware_query_tool_returns_json_payload(tmp_path):
"""query_activity_log 中间件工具应返回结构化 JSON 查询结果。"""
_write_activity_log(
tmp_path,
"2026-06-18",
["- **10:00** 帮用户整理了电影 A"],
)
tool = QueryActivityLogTool(session_id="activity-session", user_id="10001")
middleware = ActivityLogMiddleware(activity_dir=str(tmp_path))
tool = middleware.tools[0]
with patch(
"app.agent.tools.impl.query_activity_log.agent_runtime_manager.activity_dir",
tmp_path,
):
result = asyncio.run(
tool.run(keyword="整理", date="2026-06-18", limit=5)
)
result = asyncio.run(
tool.ainvoke({"keyword": "整理", "date": "2026-06-18", "limit": 5})
)
payload = json.loads(result)
assert payload["success"] is True
@@ -253,8 +260,8 @@ def test_query_activity_log_tool_returns_json_payload(tmp_path):
assert payload["entries"][0]["summary"] == "帮用户整理了电影 A"
def test_factory_registers_activity_log_tool():
"""工具工厂应注册活动日志查询工具。"""
def test_factory_does_not_register_activity_log_tool():
"""活动日志查询工具应由中间件注册,不应进入全局工具工厂"""
with patch(
"app.agent.tools.factory.PluginManager.get_plugin_agent_tools",
return_value=[],
@@ -265,22 +272,4 @@ def test_factory_registers_activity_log_tool():
)
tool_names = {tool.name for tool in tools}
assert "query_activity_log" in tool_names
def test_mcp_tool_manager_exposes_activity_log_tool():
"""MCP 工具管理器应暴露活动日志查询工具。"""
tool = QueryActivityLogTool(session_id="activity-session", user_id="10001")
with patch(
"app.agent.tools.manager.MoviePilotToolFactory.create_tools",
return_value=[tool],
):
manager = MoviePilotToolsManager(is_admin=True)
tool_definitions = manager.list_tools()
assert [item.name for item in tool_definitions] == ["query_activity_log"]
schema = tool_definitions[0].input_schema
assert "keyword" in schema["properties"]
assert "use_regex" in schema["properties"]
assert "date" in schema["properties"]
assert QUERY_ACTIVITY_LOG_TOOL_NAME not in tool_names

View File

@@ -13,6 +13,8 @@ from app.agent import (
_MessageTask,
)
from app.agent.memory import memory_manager
from app.agent.middleware.activity_log import QUERY_ACTIVITY_LOG_TOOL_NAME
from app.agent.middleware.skills import SKILL_TOOL_NAME
from app.agent.middleware.subagents import (
SUBAGENT_CONTROL_TOOL_NAME,
SUBAGENT_TASK_TOOL_NAME,
@@ -67,6 +69,16 @@ class StreamChunkTimeoutError(RuntimeError):
"""模拟 langchain_openai 的流式分块超时异常。"""
def _fake_skills_middleware(tool=None):
"""构造带 tools 属性的 SkillsMiddleware 测试替身。"""
return SimpleNamespace(name="skills", tools=[] if tool is None else [tool])
def _fake_activity_log_middleware(tool=None):
"""构造带 tools 属性的 ActivityLogMiddleware 测试替身。"""
return SimpleNamespace(name="activity", tools=[] if tool is None else [tool])
class AgentBackgroundOutputTest(unittest.IsolatedAsyncioTestCase):
async def test_background_non_streaming_does_not_send_by_default(self):
agent = MoviePilotAgent(session_id="bg-test", user_id="system")
@@ -394,11 +406,17 @@ class AgentBackgroundOutputTest(unittest.IsolatedAsyncioTestCase):
"app.agent.MoviePilotToolFactory.get_tool_selector_always_include_names",
return_value=[],
),
patch("app.agent.SkillsMiddleware", side_effect=lambda *args, **kwargs: "skills"),
patch(
"app.agent.SkillsMiddleware",
side_effect=lambda *args, **kwargs: _fake_skills_middleware(),
),
patch("app.agent.JobsMiddleware", side_effect=lambda *args, **kwargs: "jobs"),
patch("app.agent.RuntimeConfigMiddleware", side_effect=lambda *args, **kwargs: "runtime"),
patch("app.agent.MemoryMiddleware", side_effect=lambda *args, **kwargs: "memory"),
patch("app.agent.ActivityLogMiddleware", side_effect=lambda *args, **kwargs: "activity"),
patch(
"app.agent.ActivityLogMiddleware",
side_effect=lambda *args, **kwargs: _fake_activity_log_middleware(),
),
patch("app.agent.SummarizationMiddleware", side_effect=lambda *args, **kwargs: "summary"),
patch("app.agent.PatchToolCallsMiddleware", side_effect=lambda *args, **kwargs: "patch"),
patch("app.agent.UsageMiddleware", side_effect=lambda *args, **kwargs: "usage"),
@@ -408,10 +426,73 @@ class AgentBackgroundOutputTest(unittest.IsolatedAsyncioTestCase):
created = await agent._create_agent(streaming=False)
self.assertEqual(
["skills", "jobs", "runtime", "memory", "summary", "patch", "usage"],
created["middleware"],
[
"skills",
"jobs",
"runtime",
"memory",
"summary",
"patch",
"usage",
],
[getattr(item, "name", item) for item in created["middleware"]],
)
async def test_create_agent_registers_skill_tool_from_middleware(self):
"""SkillsMiddleware 暴露的 skill 工具应进入 Agent 工具和筛选候选。"""
captured = {}
skill_tool = SimpleNamespace(name=SKILL_TOOL_NAME)
agent = MoviePilotAgent(session_id="normal-session", user_id="system")
agent._initialize_tools = lambda: []
agent._initialize_subagent_tools = lambda: []
def _tool_selector(**kwargs):
captured["selection_tools"] = kwargs["selection_tools"]
captured["always_include"] = kwargs["always_include"]
return "selector"
with (
patch.object(settings, "LLM_MAX_TOOLS", 5),
patch.object(agent, "_initialize_llm", new=AsyncMock(return_value=object())),
patch("app.agent.prompt_manager.get_agent_prompt", return_value="PROMPT"),
patch("app.agent.create_subagent_middlewares", return_value=([], [])),
patch(
"app.agent.MoviePilotToolFactory.get_tool_selector_always_include_names",
return_value=[],
),
patch(
"app.agent.SkillsMiddleware",
side_effect=lambda *args, **kwargs: _fake_skills_middleware(skill_tool),
),
patch("app.agent.JobsMiddleware", side_effect=lambda *args, **kwargs: "jobs"),
patch(
"app.agent.RuntimeConfigMiddleware",
side_effect=lambda *args, **kwargs: "runtime",
),
patch("app.agent.MemoryMiddleware", side_effect=lambda *args, **kwargs: "memory"),
patch(
"app.agent.ActivityLogMiddleware",
side_effect=lambda *args, **kwargs: _fake_activity_log_middleware(),
),
patch(
"app.agent.SummarizationMiddleware",
side_effect=lambda *args, **kwargs: "summary",
),
patch(
"app.agent.PatchToolCallsMiddleware",
side_effect=lambda *args, **kwargs: "patch",
),
patch("app.agent.UsageMiddleware", side_effect=lambda *args, **kwargs: "usage"),
patch("app.agent.ToolSelectorMiddleware", side_effect=_tool_selector),
patch("app.agent.InMemorySaver", return_value="checkpointer"),
patch("app.agent.create_agent", side_effect=lambda **kwargs: kwargs),
):
created = await agent._create_agent(streaming=False)
self.assertIn(skill_tool, created["tools"])
self.assertIn(skill_tool, captured["selection_tools"])
self.assertIn(SKILL_TOOL_NAME, captured["always_include"])
async def test_create_agent_excludes_activity_log_without_message_context(self):
"""无渠道信息的后台捕获任务不应注入活动日志。"""
agent = MoviePilotAgent(
@@ -431,13 +512,28 @@ class AgentBackgroundOutputTest(unittest.IsolatedAsyncioTestCase):
"app.agent.MoviePilotToolFactory.get_tool_selector_always_include_names",
return_value=[],
),
patch("app.agent.SkillsMiddleware", side_effect=lambda *args, **kwargs: "skills"),
patch(
"app.agent.SkillsMiddleware",
side_effect=lambda *args, **kwargs: _fake_skills_middleware(),
),
patch("app.agent.JobsMiddleware", side_effect=lambda *args, **kwargs: "jobs"),
patch("app.agent.RuntimeConfigMiddleware", side_effect=lambda *args, **kwargs: "runtime"),
patch(
"app.agent.RuntimeConfigMiddleware",
side_effect=lambda *args, **kwargs: "runtime",
),
patch("app.agent.MemoryMiddleware", side_effect=lambda *args, **kwargs: "memory"),
patch("app.agent.ActivityLogMiddleware", side_effect=lambda *args, **kwargs: "activity"),
patch("app.agent.SummarizationMiddleware", side_effect=lambda *args, **kwargs: "summary"),
patch("app.agent.PatchToolCallsMiddleware", side_effect=lambda *args, **kwargs: "patch"),
patch(
"app.agent.ActivityLogMiddleware",
side_effect=lambda *args, **kwargs: _fake_activity_log_middleware(),
),
patch(
"app.agent.SummarizationMiddleware",
side_effect=lambda *args, **kwargs: "summary",
),
patch(
"app.agent.PatchToolCallsMiddleware",
side_effect=lambda *args, **kwargs: "patch",
),
patch("app.agent.UsageMiddleware", side_effect=lambda *args, **kwargs: "usage"),
patch("app.agent.InMemorySaver", return_value="checkpointer"),
patch("app.agent.create_agent", side_effect=lambda **kwargs: kwargs),
@@ -445,8 +541,16 @@ class AgentBackgroundOutputTest(unittest.IsolatedAsyncioTestCase):
created = await agent._create_agent(streaming=False)
self.assertEqual(
["skills", "jobs", "runtime", "memory", "summary", "patch", "usage"],
created["middleware"],
[
"skills",
"jobs",
"runtime",
"memory",
"summary",
"patch",
"usage",
],
[getattr(item, "name", item) for item in created["middleware"]],
)
def test_message_tool_is_not_always_included_by_tool_selector(self):
@@ -459,15 +563,77 @@ class AgentBackgroundOutputTest(unittest.IsolatedAsyncioTestCase):
self.assertNotIn("send_message", always_include)
def test_activity_log_tool_is_always_included_by_tool_selector(self):
"""活动日志查询工具应绕过工具筛选"""
activity_log_tool = SimpleNamespace(name="query_activity_log")
def test_activity_log_tool_is_not_registered_by_tool_factory(self):
"""活动日志查询工具不应再由全局工具工厂保留"""
activity_log_tool = SimpleNamespace(name=QUERY_ACTIVITY_LOG_TOOL_NAME)
always_include = MoviePilotToolFactory.get_tool_selector_always_include_names(
[activity_log_tool]
)
self.assertIn("query_activity_log", always_include)
self.assertNotIn(QUERY_ACTIVITY_LOG_TOOL_NAME, always_include)
async def test_create_agent_registers_activity_log_tool_from_middleware(self):
"""ActivityLogMiddleware 暴露的工具应进入 Agent 工具和筛选候选。"""
captured = {}
activity_tool = SimpleNamespace(name=QUERY_ACTIVITY_LOG_TOOL_NAME)
agent = MoviePilotAgent(
session_id="normal-session",
user_id="system",
channel="Web",
source="openai",
)
agent._initialize_tools = lambda: []
agent._initialize_subagent_tools = lambda: []
def _tool_selector(**kwargs):
captured["selection_tools"] = kwargs["selection_tools"]
captured["always_include"] = kwargs["always_include"]
return "selector"
with (
patch.object(settings, "LLM_MAX_TOOLS", 5),
patch.object(agent, "_initialize_llm", new=AsyncMock(return_value=object())),
patch("app.agent.prompt_manager.get_agent_prompt", return_value="PROMPT"),
patch("app.agent.create_subagent_middlewares", return_value=([], [])),
patch(
"app.agent.MoviePilotToolFactory.get_tool_selector_always_include_names",
return_value=[],
),
patch(
"app.agent.SkillsMiddleware",
side_effect=lambda *args, **kwargs: _fake_skills_middleware(),
),
patch("app.agent.JobsMiddleware", side_effect=lambda *args, **kwargs: "jobs"),
patch(
"app.agent.RuntimeConfigMiddleware",
side_effect=lambda *args, **kwargs: "runtime",
),
patch("app.agent.MemoryMiddleware", side_effect=lambda *args, **kwargs: "memory"),
patch(
"app.agent.ActivityLogMiddleware",
side_effect=lambda *args, **kwargs: _fake_activity_log_middleware(
activity_tool
),
),
patch(
"app.agent.SummarizationMiddleware",
side_effect=lambda *args, **kwargs: "summary",
),
patch(
"app.agent.PatchToolCallsMiddleware",
side_effect=lambda *args, **kwargs: "patch",
),
patch("app.agent.UsageMiddleware", side_effect=lambda *args, **kwargs: "usage"),
patch("app.agent.ToolSelectorMiddleware", side_effect=_tool_selector),
patch("app.agent.InMemorySaver", return_value="checkpointer"),
patch("app.agent.create_agent", side_effect=lambda **kwargs: kwargs),
):
created = await agent._create_agent(streaming=False)
self.assertIn(activity_tool, created["tools"])
self.assertIn(activity_tool, captured["selection_tools"])
self.assertIn(QUERY_ACTIVITY_LOG_TOOL_NAME, captured["always_include"])
async def test_create_agent_always_includes_subagent_tools(self):
"""工具筛选开启时应保留同步和异步子代理入口。"""
@@ -498,13 +664,28 @@ class AgentBackgroundOutputTest(unittest.IsolatedAsyncioTestCase):
"app.agent.MoviePilotToolFactory.get_tool_selector_always_include_names",
return_value=[],
),
patch("app.agent.SkillsMiddleware", side_effect=lambda *args, **kwargs: "skills"),
patch(
"app.agent.SkillsMiddleware",
side_effect=lambda *args, **kwargs: _fake_skills_middleware(),
),
patch("app.agent.JobsMiddleware", side_effect=lambda *args, **kwargs: "jobs"),
patch("app.agent.RuntimeConfigMiddleware", side_effect=lambda *args, **kwargs: "runtime"),
patch(
"app.agent.RuntimeConfigMiddleware",
side_effect=lambda *args, **kwargs: "runtime",
),
patch("app.agent.MemoryMiddleware", side_effect=lambda *args, **kwargs: "memory"),
patch("app.agent.ActivityLogMiddleware", side_effect=lambda *args, **kwargs: "activity"),
patch("app.agent.SummarizationMiddleware", side_effect=lambda *args, **kwargs: "summary"),
patch("app.agent.PatchToolCallsMiddleware", side_effect=lambda *args, **kwargs: "patch"),
patch(
"app.agent.ActivityLogMiddleware",
side_effect=lambda *args, **kwargs: _fake_activity_log_middleware(),
),
patch(
"app.agent.SummarizationMiddleware",
side_effect=lambda *args, **kwargs: "summary",
),
patch(
"app.agent.PatchToolCallsMiddleware",
side_effect=lambda *args, **kwargs: "patch",
),
patch("app.agent.UsageMiddleware", side_effect=lambda *args, **kwargs: "usage"),
patch("app.agent.ToolSelectorMiddleware", side_effect=_tool_selector),
patch("app.agent.InMemorySaver", return_value="checkpointer"),
@@ -534,13 +715,28 @@ class AgentBackgroundOutputTest(unittest.IsolatedAsyncioTestCase):
"app.agent.MoviePilotToolFactory.get_tool_selector_always_include_names",
return_value=[],
),
patch("app.agent.SkillsMiddleware", side_effect=lambda *args, **kwargs: "skills"),
patch(
"app.agent.SkillsMiddleware",
side_effect=lambda *args, **kwargs: _fake_skills_middleware(),
),
patch("app.agent.JobsMiddleware", side_effect=lambda *args, **kwargs: "jobs"),
patch("app.agent.RuntimeConfigMiddleware", side_effect=lambda *args, **kwargs: "runtime"),
patch(
"app.agent.RuntimeConfigMiddleware",
side_effect=lambda *args, **kwargs: "runtime",
),
patch("app.agent.MemoryMiddleware", side_effect=lambda *args, **kwargs: "memory"),
patch("app.agent.ActivityLogMiddleware", side_effect=lambda *args, **kwargs: "activity"),
patch("app.agent.SummarizationMiddleware", side_effect=lambda *args, **kwargs: "summary"),
patch("app.agent.PatchToolCallsMiddleware", side_effect=lambda *args, **kwargs: "patch"),
patch(
"app.agent.ActivityLogMiddleware",
side_effect=lambda *args, **kwargs: _fake_activity_log_middleware(),
),
patch(
"app.agent.SummarizationMiddleware",
side_effect=lambda *args, **kwargs: "summary",
),
patch(
"app.agent.PatchToolCallsMiddleware",
side_effect=lambda *args, **kwargs: "patch",
),
patch("app.agent.UsageMiddleware", side_effect=lambda *args, **kwargs: "usage"),
patch("app.agent.InMemorySaver", return_value="checkpointer"),
patch("app.agent.create_agent", side_effect=lambda **kwargs: kwargs),
@@ -548,8 +744,17 @@ class AgentBackgroundOutputTest(unittest.IsolatedAsyncioTestCase):
created = await agent._create_agent(streaming=False)
self.assertEqual(
["skills", "jobs", "runtime", "memory", "activity", "summary", "patch", "usage"],
created["middleware"],
[
"skills",
"jobs",
"runtime",
"memory",
"activity",
"summary",
"patch",
"usage",
],
[getattr(item, "name", item) for item in created["middleware"]],
)
async def test_run_background_prompt_forces_disable_message_tools_when_capture_only(self):

View File

@@ -3,7 +3,7 @@ from app.core.config import settings
def test_moviepilot_info_does_not_expose_api_token_or_database_password(monkeypatch) -> None:
"""系统提示词中的运行信息不能暴露 API 令牌或数据库密码"""
"""系统提示词中的运行信息不能暴露敏感或细粒度部署信息"""
monkeypatch.setattr(settings, "API_TOKEN", "prompt-secret-token")
monkeypatch.setattr(settings, "DB_TYPE", "postgresql")
monkeypatch.setattr(settings, "DB_POSTGRESQL_HOST", "db.example.local")
@@ -18,5 +18,47 @@ def test_moviepilot_info_does_not_expose_api_token_or_database_password(monkeypa
assert "prompt-secret-token" not in moviepilot_info
assert "prompt-db-password" not in moviepilot_info
assert "moviepilot_user:prompt-db-password" not in moviepilot_info
assert "API认证: 由内部工具自动处理" in moviepilot_info
assert "凭据由内部工具读取" in moviepilot_info
assert "db.example.local" not in moviepilot_info
assert str(settings.CONFIG_PATH) in moviepilot_info
assert str(settings.TEMP_PATH) in moviepilot_info
assert str(settings.LOG_PATH) in moviepilot_info
assert str(settings.LOG_PATH / "moviepilot.log") in moviepilot_info
assert str(settings.LOG_PATH / "moviepilot.stdout.log") not in moviepilot_info
assert str(settings.LOG_PATH / "moviepilot.frontend.stdout.log") not in moviepilot_info
assert str(settings.CONFIG_PATH / "agent") not in moviepilot_info
assert str(settings.CONFIG_PATH / "agent" / "memory") not in moviepilot_info
assert str(settings.CONFIG_PATH / "agent" / "skills") not in moviepilot_info
assert str(settings.CONFIG_PATH / "agent" / "jobs") not in moviepilot_info
assert str(settings.CONFIG_PATH / "agent" / "activity") not in moviepilot_info
assert "主机名" not in moviepilot_info
assert "IP地址" not in moviepilot_info
assert "API端口" not in moviepilot_info
assert "数据库类型" not in moviepilot_info
assert "PostgreSQL" not in moviepilot_info
assert "query_doctor_report" in moviepilot_info
assert "query_system_settings" in moviepilot_info
def test_moviepilot_info_lists_command_names_without_paths(monkeypatch) -> None:
"""系统提示词可注入命令名称,但不应暴露命令绝对路径。"""
command_paths = {
"git": "/usr/bin/git",
"rg": "/opt/homebrew/bin/rg",
"ffmpeg": "/usr/local/bin/ffmpeg",
}
monkeypatch.setattr(
"app.agent.prompt.shutil.which",
lambda command: command_paths.get(command),
)
manager = PromptManager()
moviepilot_info = manager._get_moviepilot_info()
assert "`git`" in moviepilot_info
assert "`rg`" in moviepilot_info
assert "`ffmpeg`" in moviepilot_info
assert "/usr/bin/git" not in moviepilot_info
assert "/opt/homebrew/bin/rg" not in moviepilot_info
assert "/usr/local/bin/ffmpeg" not in moviepilot_info
assert "rg --files" in moviepilot_info

View File

@@ -124,9 +124,10 @@ class TestAgentRuntimeConfig(unittest.TestCase):
self.assertIn("<agent_persona>", sections)
self.assertIn("Active persona: `default`", sections)
self.assertIn("`guide`", sections)
self.assertIn("Available subagents:", sections)
self.assertIn("`media-researcher`", sections)
self.assertIn("query_personas", sections)
self.assertNotIn("Available personas:", sections)
self.assertNotIn("Available subagents:", sections)
self.assertNotIn("`media-researcher`", sections)
def test_set_active_persona_supports_id_and_alias(self):
manager = self._manager()

View File

@@ -1,33 +1,113 @@
import tempfile
import unittest
from pathlib import Path
import json
import pytest
from anyio import Path as AsyncPath
from langchain.agents.middleware.types import ModelRequest
from langchain_core.messages import SystemMessage
from app.agent.middleware.skills import _alist_skills
from app.agent.middleware.skills import (
SKILL_TOOL_NAME,
SkillsMiddleware,
_alist_skills,
)
from app.agent.tools.tags import ToolTag
class SkillsMiddlewareAsyncTest(unittest.IsolatedAsyncioTestCase):
async def test_alist_skills_sorts_skill_directories_by_name(self):
with tempfile.TemporaryDirectory() as tempdir:
root = Path(tempdir)
@pytest.fixture
def anyio_backend():
"""使用 asyncio 后端运行 anyio 异步测试。"""
return "asyncio"
for skill_id in ("z-skill", "a-skill", "m-skill"):
skill_dir = root / skill_id
skill_dir.mkdir()
(skill_dir / "SKILL.md").write_text(
f"""---
name: {skill_id}
description: test
def _write_skill(root, skill_id: str, name: str | None = None) -> None:
"""写入测试用 Skill 文件。"""
skill_dir = root / skill_id
skill_dir.mkdir()
(skill_dir / "SKILL.md").write_text(
f"""---
name: {name or skill_id}
description: test skill {skill_id}
allowed-tools: "read_file execute_command"
---
# {skill_id}
Use this skill carefully.
""",
encoding="utf-8",
)
encoding="utf-8",
)
skills = await _alist_skills(AsyncPath(str(root)))
self.assertEqual(
["a-skill", "m-skill", "z-skill"],
[skill["id"] for skill in skills],
)
@pytest.mark.anyio
async def test_alist_skills_sorts_skill_directories_by_name(tmp_path):
"""异步扫描技能目录时应按目录名稳定排序。"""
for skill_id in ("z-skill", "a-skill", "m-skill"):
_write_skill(tmp_path, skill_id)
skills = await _alist_skills(AsyncPath(str(tmp_path)))
assert ["a-skill", "m-skill", "z-skill"] == [
skill["id"] for skill in skills
]
def test_skills_middleware_exposes_skill_tool(tmp_path):
"""SkillsMiddleware 应以中间件工具形式暴露 skill。"""
_write_skill(tmp_path, "moviepilot-cli")
middleware = SkillsMiddleware(sources=[str(tmp_path)])
assert [tool.name for tool in middleware.tools] == [SKILL_TOOL_NAME]
assert ToolTag.Read in middleware.tools[0].tags
assert ToolTag.Skill in middleware.tools[0].tags
assert "moviepilot-cli" in middleware.tools[0].description
@pytest.mark.anyio
async def test_skill_tool_loads_skill_by_id_and_name(tmp_path):
"""skill 工具应支持按 id 或 name 加载完整 SKILL.md。"""
_write_skill(tmp_path, "moviepilot-cli", name="MoviePilot CLI")
middleware = SkillsMiddleware(sources=[str(tmp_path)])
skill_tool = middleware.tools[0]
by_id = json.loads(await skill_tool.ainvoke({"name": "moviepilot-cli"}))
by_name = json.loads(await skill_tool.ainvoke({"name": "MoviePilot CLI"}))
assert by_id["success"] is True
assert by_id["skill"]["id"] == "moviepilot-cli"
assert "# moviepilot-cli" in by_id["content"]
assert by_name["success"] is True
assert by_name["skill"]["name"] == "MoviePilot CLI"
@pytest.mark.anyio
async def test_skill_tool_returns_not_found_for_unknown_skill(tmp_path):
"""skill 工具找不到技能时应返回结构化失败信息。"""
middleware = SkillsMiddleware(sources=[str(tmp_path)])
skill_tool = middleware.tools[0]
result = json.loads(await skill_tool.ainvoke({"name": "missing-skill"}))
assert result["success"] is False
assert "missing-skill" in result["message"]
def test_modify_request_instructs_model_to_use_skill_tool_without_paths(tmp_path):
"""系统提示应要求通过 skill 工具加载,而不是直接暴露文件读取路径。"""
_write_skill(tmp_path, "moviepilot-cli")
middleware = SkillsMiddleware(sources=[str(tmp_path)])
skills_metadata = middleware._load_skills_metadata()
request = ModelRequest(
model=None,
messages=[],
system_message=SystemMessage(content="BASE"),
state={"skills_metadata": skills_metadata},
runtime=None,
)
modified = middleware.modify_request(request)
system_content = str(modified.system_message.content)
assert "`skill` tool" in system_content
assert "moviepilot-cli" in system_content
assert "Read `" not in system_content
assert str(tmp_path) not in system_content