feat: introduce unified agent runtime config and system task prompt framework

- Add structured runtime config files (AGENT_PROFILE.md, AGENT_WORKFLOW.md, AGENT_HOOKS.md, USER_PREFERENCES.md, SYSTEM_TASKS.md, CURRENT_PERSONA.md) for persona, workflow, hooks, and system tasks
- Implement agent_runtime_manager to load, validate, and render runtime config and system task prompts
- Refactor agent initialization to use runtime-managed directories for skills, jobs, memory, and activity logs
- Add AgentHooksMiddleware for structured pre/in/post hooks injection
- Replace hardcoded system task prompts with template-driven rendering from SYSTEM_TASKS.md
- Update tests to cover runtime config loading, migration, and system task prompt rendering
- Update .gitignore to exclude config/agent/
This commit is contained in:
jxxghp
2026-04-28 13:04:28 +08:00
parent 483fe55372
commit c5b716c231
16 changed files with 1335 additions and 200 deletions

1
.gitignore vendored
View File

@@ -19,6 +19,7 @@ config/cookies/**
config/app.env
config/user.db*
config/sites/**
config/agent/
config/logs/
config/temp/
config/cache/

View File

@@ -21,12 +21,14 @@ from langgraph.checkpoint.memory import InMemorySaver
from app.agent.callback import StreamingHandler
from app.agent.memory import memory_manager
from app.agent.middleware.activity_log import ActivityLogMiddleware
from app.agent.middleware.hooks import AgentHooksMiddleware
from app.agent.middleware.jobs import JobsMiddleware
from app.agent.middleware.memory import MemoryMiddleware
from app.agent.middleware.patch_tool_calls import PatchToolCallsMiddleware
from app.agent.middleware.skills import SkillsMiddleware
from app.agent.middleware.usage import UsageMiddleware
from app.agent.prompt import prompt_manager
from app.agent.runtime import agent_runtime_manager
from app.agent.tools.factory import MoviePilotToolFactory
from app.chain import ChainBase
from app.core.config import settings
@@ -389,18 +391,20 @@ class MoviePilotAgent:
middlewares = [
# Skills
SkillsMiddleware(
sources=[str(settings.CONFIG_PATH / "agent" / "skills")],
sources=[str(agent_runtime_manager.skills_dir)],
bundled_skills_dir=str(settings.ROOT_PATH / "skills"),
),
# Jobs 任务管理
JobsMiddleware(
sources=[str(settings.CONFIG_PATH / "agent" / "jobs")],
sources=[str(agent_runtime_manager.jobs_dir)],
),
# 记忆管理(自动扫描 agent 目录下所有 .md 文件)
MemoryMiddleware(memory_dir=str(settings.CONFIG_PATH / "agent")),
# 结构化 hooks
AgentHooksMiddleware(),
# 记忆管理(仅扫描 memory 目录,避免与根层 persona/workflow 配置混写)
MemoryMiddleware(memory_dir=str(agent_runtime_manager.memory_dir)),
# 活动日志
ActivityLogMiddleware(
activity_dir=str(settings.CONFIG_PATH / "agent" / "activity"),
activity_dir=str(agent_runtime_manager.activity_dir),
),
# 用量统计
UsageMiddleware(on_usage=self._record_usage),
@@ -987,6 +991,69 @@ class AgentManager:
memory_manager.clear_memory(session_id, user_id)
logger.info(f"会话 {session_id} 的记忆已清空")
@staticmethod
def _build_heartbeat_prompt() -> str:
"""使用统一 wake 模板源构建心跳任务提示词。"""
runtime_config = agent_runtime_manager.load_runtime_config()
return runtime_config.render_system_task_message("heartbeat")
@staticmethod
def _build_retry_transfer_template_context(
history_ids: list[int],
) -> tuple[str, dict[str, int | str]]:
"""仅负责把失败重试任务的动态数据映射成模板变量。"""
is_batch = len(history_ids) > 1
task_type = (
"batch_transfer_failed_retry" if is_batch else "transfer_failed_retry"
)
template_context: dict[str, int | str] = {
"history_ids_csv": ", ".join(str(item) for item in history_ids),
"history_count": len(history_ids),
}
if not is_batch:
template_context["history_id"] = history_ids[0]
return task_type, template_context
@staticmethod
def _build_retry_transfer_prompt(
history_ids: list[int],
) -> str:
"""根据失败记录数量构建统一的重试整理后台任务提示词。"""
runtime_config = agent_runtime_manager.load_runtime_config()
task_type, template_context = AgentManager._build_retry_transfer_template_context(
history_ids
)
return runtime_config.render_system_task_message(
task_type,
template_context=template_context,
)
@staticmethod
def _build_manual_redo_template_context(history) -> dict[str, int | str]:
"""仅负责把整理历史对象映射成 SYSTEM_TASKS 需要的模板变量。"""
src_fileitem = history.src_fileitem or {}
source_path = src_fileitem.get("path") if isinstance(src_fileitem, dict) else ""
source_path = source_path or history.src or ""
season_episode = f"{history.seasons or ''}{history.episodes or ''}".strip()
# 这里故意只做数据整形,具体行为定义全部交给 SYSTEM_TASKS。
return {
"history_id": history.id,
"current_status": "success" if history.status else "failed",
"recognized_title": history.title or "unknown",
"media_type": history.type or "unknown",
"category": history.category or "unknown",
"year": history.year or "unknown",
"season_episode": season_episode or "unknown",
"source_path": source_path or "unknown",
"source_storage": history.src_storage or "local",
"destination_path": history.dest or "unknown",
"destination_storage": history.dest_storage or "unknown",
"transfer_mode": history.mode or "unknown",
"tmdbid": history.tmdbid or "none",
"doubanid": history.doubanid or "none",
"error_message": history.errmsg or "none",
}
async def heartbeat_check_jobs(self):
"""
心跳唤醒检查并执行待处理的定时任务Jobs
@@ -998,22 +1065,7 @@ class AgentManager:
user_id = SYSTEM_INTERNAL_USER_ID
logger.info("智能体心跳唤醒:开始检查待处理任务...")
# 英文提示词,便于大模型理解
heartbeat_message = (
"[System Heartbeat] Check all jobs in your jobs directory and process pending tasks:\n"
"1. List all jobs with status 'pending' or 'in_progress'\n"
"2. For 'recurring' jobs, check 'last_run' to determine if it's time to run again\n"
"3. For 'once' jobs with status 'pending', execute them now\n"
"4. After executing each job, update its status, 'last_run' time, and execution log in the JOB.md file\n"
"5. If there are no pending jobs, do NOT generate any response\n\n"
"IMPORTANT: This is a background system task, NOT a user conversation. "
"Your final response will be broadcast as a notification. "
"Only output a brief completion summary listing each executed job and its result. "
"Do NOT include greetings, explanations, or conversational text. "
"If no jobs were executed, output nothing. "
"Respond in Chinese (中文)."
)
heartbeat_message = self._build_heartbeat_prompt()
await self.process_message(
session_id=session_id,
@@ -1096,57 +1148,7 @@ class AgentManager:
logger.info(
f"智能体重试整理:开始批量处理失败记录 IDs=[{ids_str}] (group={group_key})"
)
if len(history_ids) == 1:
# 单条记录,使用原有逻辑
retry_message = (
f"[System Task - Transfer Failed Retry] A file transfer/organization has failed. "
f"Please use the 'transfer-failed-retry' skill to retry the failed transfer.\n\n"
f"Failed transfer history record ID: {history_ids[0]}\n\n"
f"Follow these steps:\n"
f"1. Use `query_transfer_history` with status='failed' to find the record with id={history_ids[0]} "
f"and understand the failure details (source path, error message, media info)\n"
f"2. Analyze the error message to determine the best retry strategy\n"
f"3. If the source file no longer exists, skip this retry and report that the file is missing\n"
f"4. Delete the failed history record using `delete_transfer_history` with history_id={history_ids[0]}\n"
f"5. Re-identify the media using `recognize_media` with the source file path\n"
f"6. If recognition fails, try `search_media` with keywords from the filename\n"
f"7. Re-transfer using `transfer_file` with the source path and any identified media info (tmdbid, media_type)\n"
f"8. Report the final result\n\n"
f"IMPORTANT: This is a background system task, NOT a user conversation. "
f"Your final response will be broadcast as a notification. "
f"Only output a brief result summary. "
f"Do NOT include greetings, explanations, or conversational text. "
f"Respond in Chinese (中文)."
)
else:
# 多条记录,使用批量处理逻辑
retry_message = (
f"[System Task - Batch Transfer Failed Retry] Multiple file transfers from the same source "
f"have failed. These files likely belong to the SAME media (e.g., multiple episodes of the same TV show). "
f"Please use the 'transfer-failed-retry' skill to retry them efficiently.\n\n"
f"Failed transfer history record IDs: {ids_str}\n"
f"Total failed records: {len(history_ids)}\n\n"
f"Follow these steps:\n"
f"1. Use `query_transfer_history` with status='failed' to find ALL records with these IDs "
f"and understand the failure details\n"
f"2. Since these files are likely from the same media, analyze the FIRST record to determine "
f"the media identity and the best retry strategy. The root cause is usually the same for all files.\n"
f"3. If the error is about media recognition (e.g., '未识别到媒体信息'), identify the media ONCE "
f"using `recognize_media` or `search_media`, then reuse that result (tmdbid, media_type) for all files\n"
f"4. For EACH failed record:\n"
f" a. Delete the failed history record using `delete_transfer_history`\n"
f" b. Re-transfer using `transfer_file` with the source path and the identified media info\n"
f"5. Report a summary of results (how many succeeded, how many failed)\n\n"
f"IMPORTANT OPTIMIZATION: These files share the same media identity. "
f"Do NOT call `recognize_media` or `search_media` repeatedly for each file. "
f"Identify the media ONCE, then apply to all files.\n\n"
f"IMPORTANT: This is a background system task, NOT a user conversation. "
f"Your final response will be broadcast as a notification. "
f"Only output a brief result summary. "
f"Do NOT include greetings, explanations, or conversational text. "
f"Respond in Chinese (中文)."
)
retry_message = self._build_retry_transfer_prompt(history_ids)
try:
await self.process_message(
@@ -1186,57 +1188,10 @@ class AgentManager:
"""
构建手动 AI 整理提示词。
"""
src_fileitem = history.src_fileitem or {}
source_path = src_fileitem.get("path") if isinstance(src_fileitem, dict) else ""
source_path = source_path or history.src or ""
season_episode = f"{history.seasons or ''}{history.episodes or ''}".strip()
return "\n".join(
[
"[System Task - Manual Transfer Re-Organize]",
"A user manually triggered an AI re-organize task from the transfer history page.",
"Your goal is to directly fix ONE transfer history record by using MoviePilot tools to analyze, clean up the old history entry if necessary, and organize the source file again.",
"",
"IMPORTANT:",
"1. This is NOT a normal conversation. It is a background execution task.",
"2. Do NOT rely on previous chat context. Work only from the record below.",
"3. You should complete the re-organize by directly using tools such as `query_transfer_history`, `recognize_media`, `search_media`, `delete_transfer_history`, and `transfer_file`.",
"4. Your final response must be a brief Chinese result summary only.",
"",
"Transfer history record:",
f"- History ID: {history.id}",
f"- Current status: {'success' if history.status else 'failed'}",
f"- Current recognized title: {history.title or 'unknown'}",
f"- Media type: {history.type or 'unknown'}",
f"- Category: {history.category or 'unknown'}",
f"- Year: {history.year or 'unknown'}",
f"- Season/Episode: {season_episode or 'unknown'}",
f"- Source path: {source_path or 'unknown'}",
f"- Source storage: {history.src_storage or 'local'}",
f"- Destination path: {history.dest or 'unknown'}",
f"- Destination storage: {history.dest_storage or 'unknown'}",
f"- Transfer mode: {history.mode or 'unknown'}",
f"- Current TMDB ID: {history.tmdbid or 'none'}",
f"- Current Douban ID: {history.doubanid or 'none'}",
f"- Error message: {history.errmsg or 'none'}",
"",
"Required workflow:",
f"1. Use `query_transfer_history` to locate and inspect the record with id={history.id}, and verify the source path, status, media info, and failure context.",
"2. Decide whether the current recognition is trustworthy.",
"3. If the source file no longer exists or cannot be safely processed, stop and report the reason.",
"4. If the current recognition is wrong or the record should be reorganized, determine the correct media identity first.",
"5. Prefer `recognize_media` with the source path. If recognition is not reliable, use `search_media` with keywords from filename/title/year.",
"6. Only continue when you have high confidence in the target media.",
"7. Before re-organizing, delete the old transfer history record with `delete_transfer_history` so the system will not skip the source file.",
"8. Then use `transfer_file` to organize the source path directly.",
"9. When calling `transfer_file`, reuse known context when appropriate: source storage, target path, target storage, transfer mode, season, tmdbid/doubanid, and media_type.",
"10. If this record is already correct and no re-organize is needed, do not perform destructive actions; simply report that no change is necessary.",
"",
"Important execution rules:",
"- Do NOT reorganize blindly when media identity is uncertain.",
"- If the previous record was successful but obviously identified as the wrong media, still use the tool-based flow above instead of `/redo`.",
"- Keep the final response short, in Chinese, and focused on outcome.",
]
runtime_config = agent_runtime_manager.load_runtime_config()
return runtime_config.render_system_task_message(
"manual_transfer_redo",
template_context=AgentManager._build_manual_redo_template_context(history),
)
async def manual_redo_transfer(

View File

@@ -0,0 +1,68 @@
"""结构化 Agent hooks 中间件。"""
from collections.abc import Awaitable, Callable
from typing import Annotated, NotRequired, TypedDict
from langchain.agents.middleware.types import (
AgentMiddleware,
AgentState,
ContextT,
ModelRequest,
ModelResponse,
PrivateStateAttr, # noqa
ResponseT,
)
from langchain_core.runnables import RunnableConfig
from langgraph.runtime import Runtime
from app.agent.middleware.utils import append_to_system_message
from app.agent.runtime import agent_runtime_manager
class HooksState(AgentState):
"""hooks 中间件状态。"""
hooks_prompt: NotRequired[Annotated[str, PrivateStateAttr]]
class HooksStateUpdate(TypedDict):
"""hooks 状态更新。"""
hooks_prompt: str
class AgentHooksMiddleware(AgentMiddleware[HooksState, ContextT, ResponseT]): # noqa
"""在固定生命周期点注入结构化 pre/in/post hooks。"""
state_schema = HooksState
async def abefore_agent( # noqa
self, state: HooksState, runtime: Runtime, config: RunnableConfig
) -> HooksStateUpdate | None:
if "hooks_prompt" in state:
return None
runtime_config = agent_runtime_manager.load_runtime_config()
return HooksStateUpdate(hooks_prompt=runtime_config.render_hooks_prompt())
def modify_request(self, request: ModelRequest[ContextT]) -> ModelRequest[ContextT]: # noqa
hooks_prompt = request.state.get("hooks_prompt", "") # noqa
if not hooks_prompt:
return request
new_system_message = append_to_system_message(
request.system_message, hooks_prompt
)
return request.override(system_message=new_system_message)
async def awrap_model_call(
self,
request: ModelRequest[ContextT],
handler: Callable[
[ModelRequest[ContextT]], Awaitable[ModelResponse[ResponseT]]
],
) -> ModelResponse[ResponseT]:
return await handler(self.modify_request(request))
__all__ = ["AgentHooksMiddleware"]

View File

@@ -188,7 +188,8 @@ class MemoryMiddleware(AgentMiddleware[MemoryState, ContextT, ResponseT]): # no
支持多文件记忆组织:用户可以创建多个 `.md` 文件来按主题组织知识。
参数:
memory_dir: 记忆文件目录路径。
memory_dir: 记忆文件目录路径。建议使用独立的 `config/agent/memory`
目录,避免与 persona/workflow 等根层配置混写。
"""
state_schema = MemoryState
@@ -201,7 +202,7 @@ class MemoryMiddleware(AgentMiddleware[MemoryState, ContextT, ResponseT]): # no
"""初始化记忆中间件。
参数:
memory_dir: 记忆文件目录路径(例如,`"/config/agent"`)。
memory_dir: 记忆文件目录路径(例如,`"/config/agent/memory"`)。
该目录下所有 `.md` 文件都会被自动加载为记忆。
"""
self.memory_dir = memory_dir

View File

@@ -1,73 +0,0 @@
You are an AI media assistant powered by MoviePilot. You specialize in managing home media ecosystems: searching for movies/TV shows, managing subscriptions, overseeing downloads, and organizing media libraries.
All your responses must be in **Chinese (中文)**.
You act as a proactive agent. Your goal is to fully resolve the user's media-related requests autonomously. Do not end your turn until the task is complete or you are blocked and require user feedback.
Core Capabilities:
1. Media Search & Recognition — Identify movies, TV shows, and anime; recognize media from fuzzy filenames or incomplete titles.
2. Subscription Management — Create rules for automated downloading; monitor trending content.
3. Download Control — Search torrents across trackers; filter by quality, codec, and release group.
4. System Status & Organization — Monitor downloads, server health, file transfers, renaming, and library cleanup.
5. Visual Input Handling — Users may attach images from supported channels; analyze them together with the text when relevant.
6. File Context Handling — User messages may arrive as structured JSON. Treat the `message` field as the user's text. Attachments appear in `files`; when `local_path` is present, use local file tools to inspect the uploaded file directly. When image input is disabled for the current model, user images may also be delivered through `files`.
<communication>
{verbose_spec}
- Tone: professional, concise, restrained.
- Be direct. NO unnecessary preamble, NO repeating user's words, NO explaining your thinking.
- Prioritize task progress over conversation. Answer only what is necessary to move the task forward.
- Do NOT flatter the user, praise the question, or use overly eager/service-oriented phrases.
- Do NOT use emojis, exclamation marks, cute language, or excessive apology.
- Prefer short declarative sentences. Default to one or two short paragraphs; use lists only when they improve scanability.
- Use Markdown for structured data. Use `inline code` for media titles/paths.
- Include key details (year, rating, resolution) but do NOT over-explain.
- Do not stop for approval on read-only operations. Only confirm before critical actions (starting downloads, deleting subscriptions).
- If the current channel supports image sending and an image would materially help, you may use the `send_message` tool with `image_url` to send it.
- If the current channel supports file sending and you need to return a local image/file for the user to download, use `send_local_file`.
{button_choice_spec}
- Voice replies: {voice_reply_spec}
- NOT a coding assistant. Do not offer code snippets.
- If user has set preferred communication style in memory, follow that strictly.
</communication>
<response_format>
- Responses MUST be short and punchy: one sentence for confirmations, brief list for search results.
- NO filler phrases like "Let me help you", "Here are the results", "I found..." — skip all unnecessary preamble.
- NO repeating what user said.
- NO narrating your internal reasoning.
- NO praise, emotional cushioning, or unnecessary politeness padding.
- After task completion: one line summary only.
- When error occurs: brief acknowledgment + suggestion, then move on.
</response_format>
<flow>
1. Media Discovery: Identify exact media metadata (TMDB ID, Season/Episode) using search tools.
2. Context Checking: Verify current status (already in library? already subscribed?).
3. Action Execution: Perform the task with a brief status update only if the operation takes time.
4. Final Confirmation: State the result concisely.
</flow>
<tool_calling_strategy>
- Call independent tools in parallel whenever possible.
- If search results are ambiguous, use `query_media_detail` or `recognize_media` to clarify before proceeding.
- If `search_media` fails, fall back to `search_web` or `recognize_media`. Only ask the user when all automated methods are exhausted.
</tool_calling_strategy>
<media_management_rules>
1. Download Safety: Present found torrents (size, seeds, quality) and get explicit consent before downloading.
2. Subscription Logic: Check for the best matching quality profile based on user history or defaults.
3. Library Awareness: Check if content already exists in the library to avoid duplicates.
4. Error Handling: If a tool or site fails, briefly explain what went wrong and suggest an alternative.
5. TV Subscription Rule: When calling `add_subscribe` for a TV show, omitting `season` means subscribe to season 1 only. To subscribe multiple seasons or the full series, call `add_subscribe` separately for each season.
</media_management_rules>
<markdown_spec>
Specific markdown rules:
{markdown_spec}
</markdown_spec>
<system_info>
{moviepilot_info}
</system_info>

View File

@@ -0,0 +1,37 @@
You are the MoviePilot agent runtime. Follow the injected root configuration to determine the active persona, workflow, and operator preferences.
All your responses must be in **Chinese (中文)**.
You act as a proactive agent. Your goal is to fully resolve the user's media-related requests autonomously. Do not end your turn until the task is complete or you are blocked and require user feedback.
<agent_runtime>
{runtime_sections}
</agent_runtime>
<communication_runtime>
{verbose_spec}
- Channel-aware formatting: Follow the capability rules below for Markdown, plain text, buttons, and voice replies.
{button_choice_spec}
- Voice replies: {voice_reply_spec}
- If the current channel supports image sending and an image would materially help, you may use the `send_message` tool with `image_url` to send it.
- If the current channel supports file sending and you need to return a local image or file for the user to download, use `send_local_file`.
</communication_runtime>
<core_capabilities>
1. Media Search and Recognition - Identify movies, TV shows, and anime; recognize media from fuzzy filenames or incomplete titles.
2. Subscription Management - Create rules for automated downloading and monitor trending content.
3. Download Control - Search torrents across trackers and filter by quality, codec, and release group.
4. System Status and Organization - Monitor downloads, server health, file transfers, renaming, and library cleanup.
5. Visual Input Handling - Users may attach images from supported channels; analyze them together with the text when relevant.
6. File Context Handling - User messages may arrive as structured JSON. Treat the `message` field as the user's text. Attachments appear in `files`; when `local_path` is present, use local file tools to inspect the uploaded file directly. When image input is disabled for the current model, user images may also be delivered through `files`.
</core_capabilities>
<markdown_spec>
Specific markdown rules:
{markdown_spec}
</markdown_spec>
<system_info>
{moviepilot_info}
</system_info>

View File

@@ -7,6 +7,7 @@ from typing import Dict
from app.core.config import settings
from app.log import logger
from app.agent.runtime import agent_runtime_manager
from app.schemas import (
ChannelCapability,
ChannelCapabilities,
@@ -59,8 +60,12 @@ class PromptManager:
:param prefer_voice_reply: 是否优先使用语音回复
:return: 提示词内容
"""
# 基础提示词
base_prompt = self.load_prompt("Agent Prompt.txt")
# 根层运行时配置由独立装配器负责,避免人格/工作流继续硬编码在单文件 prompt 中。
runtime_config = agent_runtime_manager.load_runtime_config()
runtime_sections = runtime_config.render_prompt_sections()
# 基础提示词只保留 MoviePilot 运行时和渠道能力相关约束。
base_prompt = self.load_prompt("System Core Prompt.txt")
# 识别渠道
markdown_spec = ""
@@ -104,6 +109,7 @@ class PromptManager:
moviepilot_info=moviepilot_info,
voice_reply_spec=voice_reply_spec,
button_choice_spec=button_choice_spec,
runtime_sections=runtime_sections,
)
return base_prompt

721
app/agent/runtime.py Normal file
View File

@@ -0,0 +1,721 @@
"""Agent 根层运行时配置管理。"""
from __future__ import annotations
import re
import shutil
import threading
from dataclasses import dataclass, field
from pathlib import Path
from string import Formatter
from typing import Any, Iterable, Optional
import yaml
from app.core.config import settings
from app.log import logger
CURRENT_PERSONA_FILE = "CURRENT_PERSONA.md"
USER_PREFERENCES_FILE = "USER_PREFERENCES.md"
SYSTEM_TASKS_FILE = "SYSTEM_TASKS.md"
LEGACY_WAKE_FORMAT_FILE = "WAKE_FORMAT.md"
SYSTEM_RUNTIME_DIR = "runtime"
MEMORY_DIR = "memory"
SKILLS_DIR = "skills"
JOBS_DIR = "jobs"
ACTIVITY_DIR = "activity"
SYSTEM_TASKS_SCHEMA_VERSION = 2
ROOT_LEVEL_RUNTIME_FILES = {
CURRENT_PERSONA_FILE,
"AGENT_PROFILE.md",
"AGENT_WORKFLOW.md",
"AGENT_HOOKS.md",
USER_PREFERENCES_FILE,
SYSTEM_TASKS_FILE,
LEGACY_WAKE_FORMAT_FILE,
}
FRONTMATTER_PATTERN = re.compile(r"^---\s*\n(.*?)\n---\s*\n?", re.DOTALL)
class AgentRuntimeConfigError(ValueError):
"""根层配置加载异常。"""
@dataclass
class ParsedMarkdownDocument:
"""解析后的 Markdown 文档。"""
metadata: dict[str, Any]
body: str
@dataclass
class HookDefinition:
"""结构化执行钩子定义。"""
path: Path
pre_task: list[str]
in_task: list[str]
post_task: list[str]
@dataclass
class SystemTaskTypeDefinition:
"""单个后台系统任务定义。"""
header: str
objective: str
context_title: Optional[str] = None
context_lines: list[str] = field(default_factory=list)
steps_title: Optional[str] = None
steps: list[str] = field(default_factory=list)
task_rules: list[str] = field(default_factory=list)
empty_result: Optional[str] = None
@dataclass
class SystemTasksDefinition:
"""统一的后台系统任务定义源。"""
path: Path
version: int
shared_rules: list[str]
task_types: dict[str, SystemTaskTypeDefinition]
@dataclass
class AgentRuntimeConfig:
"""一次加载后的根层配置快照。"""
source_root: Path
active_persona: str
current_persona_path: Path
profile_path: Path
workflow_path: Path
hooks_path: Path
user_preferences_path: Optional[Path]
system_tasks_path: Path
extra_context_paths: list[Path]
profile_text: str
workflow_text: str
user_preferences_text: str
extra_contexts: list[tuple[Path, str]]
hooks: HookDefinition
system_tasks: SystemTasksDefinition
warnings: list[str] = field(default_factory=list)
used_fallback: bool = False
def render_prompt_sections(self) -> str:
"""渲染进入系统提示词的根层配置片段。"""
sections: list[str] = [
"<agent_root_config>",
f"- Active persona: `{self.active_persona}`",
f"- Profile source: `{self.profile_path}`",
f"- Workflow source: `{self.workflow_path}`",
]
if self.user_preferences_path:
sections.append(f"- Root preferences source: `{self.user_preferences_path}`")
sections.append(f"- System task source: `{self.system_tasks_path}`")
sections.append("</agent_root_config>")
sections.append("")
sections.append("<agent_profile>")
sections.append(self.profile_text.strip() or "(No agent profile configured.)")
sections.append("</agent_profile>")
sections.append("")
sections.append("<agent_workflow>")
sections.append(self.workflow_text.strip() or "(No agent workflow configured.)")
sections.append("</agent_workflow>")
if self.user_preferences_text.strip():
sections.append("")
sections.append("<agent_user_preferences>")
sections.append(self.user_preferences_text.strip())
sections.append("</agent_user_preferences>")
for path, text in self.extra_contexts:
if not text.strip():
continue
sections.append("")
sections.append(f'<agent_extra_context source="{path.name}">')
sections.append(text.strip())
sections.append("</agent_extra_context>")
return "\n".join(sections).strip()
def render_hooks_prompt(self) -> str:
"""渲染结构化 hooks 提示词。"""
blocks = [
"<agent_execution_hooks>",
f"- Hook source: `{self.hooks.path}`",
"- These hooks are loaded structurally by the runtime and must be followed at the matching lifecycle stage.",
"",
"Pre-Task Hooks:",
self._format_hook_list(self.hooks.pre_task),
"",
"In-Task Hooks:",
self._format_hook_list(self.hooks.in_task),
"",
"Post-Task Hooks:",
self._format_hook_list(self.hooks.post_task),
"</agent_execution_hooks>",
]
return "\n".join(blocks)
def render_system_task_message(
self,
task_type: str,
*,
template_context: Optional[dict[str, Any]] = None,
extra_rules: Optional[list[str]] = None,
) -> str:
"""根据统一的后台系统任务定义渲染提示词。"""
task_definition = self.system_tasks.task_types.get(task_type)
if not task_definition:
raise AgentRuntimeConfigError(f"未定义的后台系统任务类型: {task_type}")
rendered_context = self._render_template_lines(
task_definition.context_lines,
template_context,
task_type,
"context_lines",
)
rendered_steps = self._render_template_lines(
task_definition.steps,
template_context,
task_type,
"steps",
)
rendered_task_rules = self._render_template_lines(
task_definition.task_rules,
template_context,
task_type,
"task_rules",
)
sections = [
self._render_template_text(
task_definition.header,
template_context,
task_type,
"header",
).strip(),
self._render_template_text(
task_definition.objective,
template_context,
task_type,
"objective",
).strip(),
]
if rendered_context:
sections.append(
self._format_titled_lines(
task_definition.context_title or "Task context",
rendered_context,
)
)
if rendered_steps:
sections.append(
self._format_titled_lines(
task_definition.steps_title or "Follow these steps",
rendered_steps,
)
)
rules = list(self.system_tasks.shared_rules)
if task_definition.empty_result:
rules.append(task_definition.empty_result)
rules.extend(rendered_task_rules)
if extra_rules:
rules.extend(rule.strip() for rule in extra_rules if rule and rule.strip())
if rules:
sections.append(self._format_numbered_rules("IMPORTANT", rules))
return "\n\n".join(section for section in sections if section).strip()
@classmethod
def _render_template_text(
cls,
text: str,
template_context: Optional[dict[str, Any]],
task_type: str,
field_name: str,
) -> str:
if not text:
return ""
formatter = Formatter()
required_fields = {
placeholder_name
for _, placeholder_name, _, _ in formatter.parse(text)
if placeholder_name
}
if not required_fields:
return text
context = cls._normalize_template_context(template_context)
missing_fields = sorted(field for field in required_fields if field not in context)
if missing_fields:
raise AgentRuntimeConfigError(
f"系统任务定义 `{task_type}` 的 `{field_name}` 缺少变量: "
+ ", ".join(f"`{field}`" for field in missing_fields)
)
# 这里统一做字符串替换,让模板文件成为后台任务文案的唯一行为来源。
return text.format_map(context)
@classmethod
def _render_template_lines(
cls,
items: list[str],
template_context: Optional[dict[str, Any]],
task_type: str,
field_name: str,
) -> list[str]:
return [
cls._render_template_text(
item,
template_context,
task_type,
f"{field_name}[{index}]",
).rstrip()
for index, item in enumerate(items, start=1)
if item and item.rstrip()
]
@staticmethod
def _normalize_template_context(
template_context: Optional[dict[str, Any]],
) -> dict[str, str]:
if not template_context:
return {}
return {
str(key): "" if value is None else str(value)
for key, value in template_context.items()
}
@staticmethod
def _format_hook_list(items: list[str]) -> str:
if not items:
return "(No hooks configured.)"
return "\n".join(f"{index}. {item}" for index, item in enumerate(items, start=1))
@staticmethod
def _format_numbered_rules(title: str, items: list[str]) -> str:
return "\n".join(
[f"{title}:"]
+ [f"{index}. {item}" for index, item in enumerate(items, start=1)]
)
@staticmethod
def _format_titled_lines(title: str, items: list[str]) -> str:
cleaned = [item.rstrip() for item in items if item and item.rstrip()]
return "\n".join([f"{title}:"] + cleaned)
class AgentRuntimeManager:
"""统一管理 agent 根层配置目录、迁移、校验与模板渲染。"""
def __init__(
self,
*,
agent_root_dir: Optional[Path] = None,
bundled_runtime_dir: Optional[Path] = None,
) -> None:
self.agent_root_dir = agent_root_dir or (settings.CONFIG_PATH / "agent")
self.runtime_dir = self.agent_root_dir / SYSTEM_RUNTIME_DIR
self.memory_dir = self.agent_root_dir / MEMORY_DIR
self.skills_dir = self.agent_root_dir / SKILLS_DIR
self.jobs_dir = self.agent_root_dir / JOBS_DIR
self.activity_dir = self.agent_root_dir / ACTIVITY_DIR
self.bundled_runtime_dir = bundled_runtime_dir or (
Path(__file__).parent / "runtime_defaults"
)
self._cache_lock = threading.Lock()
self._cached_signature: Optional[tuple[tuple[str, int, int], ...]] = None
self._cached_config: Optional[AgentRuntimeConfig] = None
def ensure_layout(self) -> None:
"""创建目录、同步默认文件,并迁移旧版 memory/runtime 文件。"""
self.agent_root_dir.mkdir(parents=True, exist_ok=True)
self.runtime_dir.mkdir(parents=True, exist_ok=True)
self.memory_dir.mkdir(parents=True, exist_ok=True)
self.skills_dir.mkdir(parents=True, exist_ok=True)
self.jobs_dir.mkdir(parents=True, exist_ok=True)
self.activity_dir.mkdir(parents=True, exist_ok=True)
self._migrate_root_runtime_files()
self._sync_bundled_runtime_defaults()
self._migrate_root_memory_files()
def load_runtime_config(self) -> AgentRuntimeConfig:
"""加载配置。用户目录损坏时自动回退到内置默认配置。"""
self.ensure_layout()
signature = self._build_signature()
with self._cache_lock:
if self._cached_signature == signature and self._cached_config:
return self._cached_config
try:
config = self._load_from_root(self.runtime_dir)
except AgentRuntimeConfigError as err:
logger.warning("Agent 根层配置无效,回退到内置默认配置: %s", err)
config = self._load_from_root(self.bundled_runtime_dir)
config.used_fallback = True
config.warnings.insert(
0, f"用户运行时配置加载失败,已回退到内置默认配置: {err}"
)
self._cached_signature = signature
self._cached_config = config
return config
def invalidate_cache(self) -> None:
"""供测试或手动刷新时清理缓存。"""
with self._cache_lock:
self._cached_signature = None
self._cached_config = None
def _build_signature(self) -> tuple[tuple[str, int, int], ...]:
"""基于运行时配置和内置默认配置生成文件签名。"""
entries: list[tuple[str, int, int]] = []
for prefix, root in (("runtime", self.runtime_dir), ("bundled", self.bundled_runtime_dir)):
if not root.exists():
continue
for path in sorted(root.rglob("*")):
if not path.is_file():
continue
stat = path.stat()
relative = path.relative_to(root).as_posix()
entries.append((f"{prefix}:{relative}", stat.st_mtime_ns, stat.st_size))
return tuple(entries)
def _sync_bundled_runtime_defaults(self) -> None:
"""仅复制缺失的默认根层配置,避免覆盖用户自定义。"""
if not self.bundled_runtime_dir.exists():
return
for path in sorted(self.bundled_runtime_dir.rglob("*")):
relative = path.relative_to(self.bundled_runtime_dir)
target = self.runtime_dir / relative
if path.is_dir():
target.mkdir(parents=True, exist_ok=True)
continue
if target.exists():
continue
target.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(path, target)
logger.info("已同步默认 Agent 运行时文件: %s", target)
def _migrate_root_runtime_files(self) -> None:
"""兼容早期直接放在 `config/agent` 根目录的 RFC 文件。"""
migration_targets = {
CURRENT_PERSONA_FILE: self.runtime_dir / CURRENT_PERSONA_FILE,
USER_PREFERENCES_FILE: self.runtime_dir / USER_PREFERENCES_FILE,
SYSTEM_TASKS_FILE: self.runtime_dir / "system_tasks" / SYSTEM_TASKS_FILE,
LEGACY_WAKE_FORMAT_FILE: self.runtime_dir / "system_tasks" / SYSTEM_TASKS_FILE,
"AGENT_PROFILE.md": self.runtime_dir / "personas" / "default" / "AGENT_PROFILE.md",
"AGENT_WORKFLOW.md": self.runtime_dir / "personas" / "default" / "AGENT_WORKFLOW.md",
"AGENT_HOOKS.md": self.runtime_dir / "personas" / "default" / "AGENT_HOOKS.md",
}
for filename, target in migration_targets.items():
source = self.agent_root_dir / filename
if not source.exists() or target.exists():
continue
target.parent.mkdir(parents=True, exist_ok=True)
source.rename(target)
logger.info("已迁移旧版 Agent 根配置文件: %s -> %s", source, target)
def _migrate_root_memory_files(self) -> None:
"""将旧版根目录 memory 文件移入 `config/agent/memory`。"""
for path in sorted(self.agent_root_dir.glob("*.md")):
if path.name in ROOT_LEVEL_RUNTIME_FILES:
continue
target = self.memory_dir / path.name
if target.exists():
continue
path.rename(target)
logger.info("已迁移旧版 Agent memory 文件: %s -> %s", path, target)
def _load_from_root(self, root: Path) -> AgentRuntimeConfig:
current_persona_path = root / CURRENT_PERSONA_FILE
current_doc = self._read_markdown(current_persona_path)
current_meta = current_doc.metadata
active_persona = str(current_meta.get("active_persona") or "default").strip()
if not active_persona:
raise AgentRuntimeConfigError("CURRENT_PERSONA.md 缺少 active_persona")
profile_path = self._resolve_required_path(root, current_meta, "profile")
workflow_path = self._resolve_required_path(root, current_meta, "workflow")
hooks_path = self._resolve_required_path(root, current_meta, "hooks")
system_tasks_path = self._resolve_required_path(root, current_meta, "system_tasks")
user_preferences_path = self._resolve_optional_path(
root, current_meta.get("user_preferences")
)
extra_context_paths = self._resolve_optional_paths(
root, current_meta.get("extra_context_files", [])
)
profile_doc = self._read_markdown(profile_path)
workflow_doc = self._read_markdown(workflow_path)
hooks_doc = self._read_markdown(hooks_path)
system_tasks_doc = self._read_markdown(system_tasks_path)
preferences_doc = (
self._read_markdown(user_preferences_path)
if user_preferences_path and user_preferences_path.exists()
else ParsedMarkdownDocument(metadata={}, body="")
)
extra_contexts = [
(path, self._read_markdown(path).body)
for path in extra_context_paths
]
hooks = self._parse_hooks_document(hooks_path, hooks_doc)
system_tasks = self._parse_system_tasks_document(
system_tasks_path,
system_tasks_doc,
)
warnings = self._validate_runtime_config(
current_meta=current_meta,
profile_path=profile_path,
workflow_path=workflow_path,
hooks_path=hooks_path,
user_preferences_path=user_preferences_path,
system_tasks_path=system_tasks_path,
extra_context_paths=extra_context_paths,
profile_text=profile_doc.body,
workflow_text=workflow_doc.body,
preferences_text=preferences_doc.body,
)
return AgentRuntimeConfig(
source_root=root,
active_persona=active_persona,
current_persona_path=current_persona_path,
profile_path=profile_path,
workflow_path=workflow_path,
hooks_path=hooks_path,
user_preferences_path=user_preferences_path,
system_tasks_path=system_tasks_path,
extra_context_paths=extra_context_paths,
profile_text=profile_doc.body,
workflow_text=workflow_doc.body,
user_preferences_text=preferences_doc.body,
extra_contexts=extra_contexts,
hooks=hooks,
system_tasks=system_tasks,
warnings=warnings,
)
@staticmethod
def _read_markdown(path: Path) -> ParsedMarkdownDocument:
if not path.exists():
raise AgentRuntimeConfigError(f"缺少配置文件: {path}")
try:
content = path.read_text(encoding="utf-8")
except Exception as err: # noqa: BLE001
raise AgentRuntimeConfigError(f"读取配置文件失败 {path}: {err}") from err
metadata: dict[str, Any] = {}
body = content
match = FRONTMATTER_PATTERN.match(content)
if match:
try:
metadata = yaml.safe_load(match.group(1)) or {}
except yaml.YAMLError as err:
raise AgentRuntimeConfigError(f"YAML frontmatter 解析失败 {path}: {err}") from err
if not isinstance(metadata, dict):
raise AgentRuntimeConfigError(f"frontmatter 必须是映射类型: {path}")
body = content[match.end():]
return ParsedMarkdownDocument(metadata=metadata, body=body.strip())
@staticmethod
def _resolve_required_path(root: Path, metadata: dict[str, Any], field_name: str) -> Path:
raw = metadata.get(field_name)
if not raw or not str(raw).strip():
raise AgentRuntimeConfigError(f"CURRENT_PERSONA.md 缺少必填字段 `{field_name}`")
return AgentRuntimeManager._resolve_relative_path(root, str(raw))
@staticmethod
def _resolve_optional_path(root: Path, raw: Any) -> Optional[Path]:
if not raw or not str(raw).strip():
return None
return AgentRuntimeManager._resolve_relative_path(root, str(raw))
@staticmethod
def _resolve_optional_paths(root: Path, values: Any) -> list[Path]:
if not values:
return []
if not isinstance(values, list):
raise AgentRuntimeConfigError("extra_context_files 必须是数组")
return [AgentRuntimeManager._resolve_relative_path(root, str(value)) for value in values]
@staticmethod
def _resolve_relative_path(root: Path, value: str) -> Path:
candidate = Path(value)
return candidate if candidate.is_absolute() else (root / candidate).resolve()
@staticmethod
def _normalize_string_list(values: Any, field_name: str) -> list[str]:
if values is None:
return []
if not isinstance(values, list):
raise AgentRuntimeConfigError(f"{field_name} 必须是字符串数组")
normalized: list[str] = []
for value in values:
text = str(value).strip()
if text:
normalized.append(text)
return normalized
def _parse_hooks_document(
self, path: Path, document: ParsedMarkdownDocument
) -> HookDefinition:
pre_task = self._normalize_string_list(document.metadata.get("pre_task"), "pre_task")
in_task = self._normalize_string_list(document.metadata.get("in_task"), "in_task")
post_task = self._normalize_string_list(
document.metadata.get("post_task"), "post_task"
)
if not (pre_task or in_task or post_task):
raise AgentRuntimeConfigError(f"{path} 未定义任何结构化 hooks")
return HookDefinition(
path=path,
pre_task=pre_task,
in_task=in_task,
post_task=post_task,
)
def _parse_system_tasks_document(
self, path: Path, document: ParsedMarkdownDocument
) -> SystemTasksDefinition:
"""解析后台系统任务定义文件。"""
version = self._normalize_positive_int(
document.metadata.get("version"),
"version",
default=1,
)
if version < SYSTEM_TASKS_SCHEMA_VERSION:
raise AgentRuntimeConfigError(
f"{path} 的 version={version} 过旧,"
f"当前要求 SYSTEM_TASKS schema v{SYSTEM_TASKS_SCHEMA_VERSION} 或更高版本"
)
shared_rules = self._normalize_string_list(
document.metadata.get("shared_rules"), "shared_rules"
)
if not shared_rules:
raise AgentRuntimeConfigError(f"{path} 缺少 shared_rules")
raw_task_types = document.metadata.get("task_types")
if not isinstance(raw_task_types, dict) or not raw_task_types:
raise AgentRuntimeConfigError(f"{path} 缺少 task_types 映射")
task_types: dict[str, SystemTaskTypeDefinition] = {}
for key, raw in raw_task_types.items():
if not isinstance(raw, dict):
raise AgentRuntimeConfigError(f"task_types.{key} 必须是映射")
header = str(raw.get("header") or "").strip()
objective = str(raw.get("objective") or "").strip()
if not header or not objective:
raise AgentRuntimeConfigError(
f"task_types.{key} 缺少 header 或 objective"
)
context_lines = self._normalize_string_list(
raw.get("context_lines"),
f"task_types.{key}.context_lines",
)
steps = self._normalize_string_list(
raw.get("steps"),
f"task_types.{key}.steps",
)
task_rules = self._normalize_string_list(
raw.get("task_rules"),
f"task_types.{key}.task_rules",
)
empty_result = str(raw.get("empty_result") or "").strip() or None
context_title = str(raw.get("context_title") or "").strip() or None
steps_title = str(raw.get("steps_title") or "").strip() or None
task_types[str(key)] = SystemTaskTypeDefinition(
header=header,
objective=objective,
context_title=context_title,
context_lines=context_lines,
steps_title=steps_title,
steps=steps,
task_rules=task_rules,
empty_result=empty_result,
)
return SystemTasksDefinition(
path=path,
version=version,
shared_rules=shared_rules,
task_types=task_types,
)
@staticmethod
def _normalize_positive_int(
value: Any,
field_name: str,
*,
default: int,
) -> int:
if value in (None, ""):
return default
try:
normalized = int(value)
except (TypeError, ValueError) as err:
raise AgentRuntimeConfigError(f"{field_name} 必须是正整数") from err
if normalized <= 0:
raise AgentRuntimeConfigError(f"{field_name} 必须是正整数")
return normalized
def _validate_runtime_config(
self,
*,
current_meta: dict[str, Any],
profile_path: Path,
workflow_path: Path,
hooks_path: Path,
user_preferences_path: Optional[Path],
system_tasks_path: Path,
extra_context_paths: list[Path],
profile_text: str,
workflow_text: str,
preferences_text: str,
) -> list[str]:
warnings: list[str] = []
required_paths = [profile_path, workflow_path, hooks_path, system_tasks_path]
if user_preferences_path:
required_paths.append(user_preferences_path)
duplicates = self._find_duplicate_paths(required_paths + extra_context_paths)
if duplicates:
warnings.append(
"检测到重复引用的根层配置文件: "
+ ", ".join(path.as_posix() for path in duplicates)
)
deprecated_phrases = self._normalize_string_list(
current_meta.get("deprecated_phrases"), "deprecated_phrases"
)
if deprecated_phrases:
scan_targets = {
"profile": profile_text,
"workflow": workflow_text,
"user_preferences": preferences_text,
}
for phrase in deprecated_phrases:
for target_name, text in scan_targets.items():
if phrase and phrase in text:
warnings.append(
f"检测到已废弃短语 `{phrase}` 仍出现在 {target_name}"
)
return warnings
@staticmethod
def _find_duplicate_paths(paths: Iterable[Path]) -> list[Path]:
seen: set[Path] = set()
duplicates: list[Path] = []
for path in paths:
resolved = path.resolve()
if resolved in seen and resolved not in duplicates:
duplicates.append(resolved)
seen.add(resolved)
return duplicates
agent_runtime_manager = AgentRuntimeManager()

View File

@@ -0,0 +1,24 @@
---
version: 1
active_persona: default
profile: personas/default/AGENT_PROFILE.md
workflow: personas/default/AGENT_WORKFLOW.md
hooks: personas/default/AGENT_HOOKS.md
user_preferences: USER_PREFERENCES.md
system_tasks: system_tasks/SYSTEM_TASKS.md
extra_context_files: []
deprecated_phrases: []
---
# CURRENT_PERSONA
当前激活人格:`default`
加载顺序固定如下:
1. `AGENT_PROFILE.md`
2. `AGENT_WORKFLOW.md`
3. `AGENT_HOOKS.md`
4. `USER_PREFERENCES.md`
5. `SYSTEM_TASKS.md`
如果需要扩展额外上下文,请使用 `extra_context_files` 显式声明,而不是把额外规则散落到 memory 中。

View File

@@ -0,0 +1,10 @@
---
version: 1
---
# USER_PREFERENCES
这是根层的运维偏好文件,不是用户长期记忆。
- 这里只放稳定的系统级输出规则或部署方偏好。
- 用户在对话中形成的长期习惯,仍应写入 `config/agent/memory/*.md`
- 默认保持精简,避免与 `AGENT_PROFILE.md``AGENT_WORKFLOW.md` 重复。

View File

@@ -0,0 +1,26 @@
---
version: 1
pre_task:
- Identify whether the request is a normal user conversation or a background system task before choosing a workflow.
- Classify intent before acting, then prefer an existing skill or dedicated workflow over ad-hoc prompting.
- Check read-only context first so the final action is based on current library, subscription, or history state.
- Only stop for confirmation when the next action is destructive, high-impact, or user-facing.
- Keep the final delivery target explicit before calling tools.
in_task:
- Execute in small, outcome-oriented steps and prefer tool calls over long explanations when the task is actionable.
- Reuse known media identity, prior tool results, and shared context instead of repeating expensive recognition or search calls.
- When a tool fails, try one narrower fallback path before escalating to the user.
- Keep intermediate user-facing output minimal; when verbose mode is disabled, stay silent until the final result.
- Treat progress reporting as task-specific glue, not a shared abstraction to leak into every tool.
post_task:
- Perform the minimum validation needed to confirm the result actually landed.
- Summarize only the outcome, key media facts, and the remaining blocker if something still failed.
- If the task established a reusable workflow, prefer encoding it in skills or root config instead of relying on prompt residue.
---
# AGENT_HOOKS
这些 hooks 由运行时结构化加载,不依赖自由文本约定。
- `pre_task` 对应开始执行前的统一检查点。
- `in_task` 对应工具调用和失败降级阶段。
- `post_task` 对应最小验证与收口阶段。

View File

@@ -0,0 +1,27 @@
---
version: 1
---
# AGENT_PROFILE
- Identity: You are an AI media assistant powered by MoviePilot. You specialize in managing home media ecosystems: searching for movies and TV shows, managing subscriptions, overseeing downloads, and organizing media libraries.
- Tone: professional, concise, restrained.
- Be direct. NO unnecessary preamble, NO repeating user's words, NO explaining your thinking.
- Prioritize task progress over conversation. Answer only what is necessary to move the task forward.
- Do NOT flatter the user, praise the question, or use overly eager service phrases.
- Do NOT use emojis, exclamation marks, cute language, or excessive apology.
- Prefer short declarative sentences. Default to one or two short paragraphs; use lists only when they improve scanability.
- Use Markdown for structured data. Use `inline code` for media titles and paths.
- Include key details such as year, rating, and resolution, but do NOT over-explain.
- Do not stop for approval on read-only operations. Only confirm before critical actions such as starting downloads or deleting subscriptions.
- NOT a coding assistant. Do not offer code snippets.
- If user has set preferred communication style in memory, follow that strictly.
# RESPONSE_FORMAT
- Responses MUST be short and punchy: one sentence for confirmations, brief list for search results.
- NO filler phrases like "Let me help you", "Here are the results", "I found..." - skip all unnecessary preamble.
- NO repeating what user said.
- NO narrating your internal reasoning.
- NO praise, emotional cushioning, or unnecessary politeness padding.
- After task completion: one line summary only.
- When error occurs: brief acknowledgment plus suggestion, then move on.

View File

@@ -0,0 +1,25 @@
---
version: 1
---
# AGENT_WORKFLOW
## FLOW
1. Media Discovery: Identify exact media metadata such as TMDB ID and Season or Episode using search tools.
2. Context Checking: Verify current status such as whether the media is already in the library or already subscribed.
3. Action Execution: Perform the task with a brief status update only if the operation takes time.
4. Final Confirmation: State the result concisely.
## TOOL_CALLING_STRATEGY
- Call independent tools in parallel whenever possible.
- If search results are ambiguous, use `query_media_detail` or `recognize_media` to clarify before proceeding.
- If `search_media` fails, fall back to `search_web` or `recognize_media`. Only ask the user when all automated methods are exhausted.
## MEDIA_MANAGEMENT_RULES
1. Download Safety: Present found torrents with size, seeds, and quality, then get explicit consent before downloading.
2. Subscription Logic: Check for the best matching quality profile based on user history or defaults.
3. Library Awareness: Check if content already exists in the library to avoid duplicates.
4. Error Handling: If a tool or site fails, briefly explain what went wrong and suggest an alternative.
5. TV Subscription Rule: When calling `add_subscribe` for a TV show, omitting `season` means subscribe to season 1 only. To subscribe multiple seasons or the full series, call `add_subscribe` separately for each season.

View File

@@ -0,0 +1,108 @@
---
version: 2
shared_rules:
- This is a background system task, NOT a user conversation.
- Your final response will be broadcast as a notification.
- Do NOT include greetings, explanations, or conversational text.
- Respond in Chinese (中文).
task_types:
heartbeat:
header: "[System Heartbeat]"
objective: "Check all jobs in your jobs directory and process pending tasks."
steps_title: "Follow these steps"
steps:
- "List all jobs with status 'pending' or 'in_progress'."
- "For 'recurring' jobs, check 'last_run' to determine if it's time to run again."
- "For 'once' jobs with status 'pending', execute them now."
- "After executing each job, update its status, 'last_run' time, and execution log in the JOB.md file."
empty_result: "If no jobs were executed, output nothing."
health_check:
header: "[System Health Check]"
objective: "Verify that the agent execution pipeline is alive."
steps_title: "Follow these steps"
steps:
- "Verify that runtime config, tools, and jobs can all be accessed normally."
- "If a real issue is detected, report the failing subsystem and the immediate blocking reason."
empty_result: "If there is nothing meaningful to report, output OK only."
transfer_failed_retry:
header: "[System Task - Transfer Failed Retry]"
objective: "A file transfer or organization has failed. Please use the `transfer-failed-retry` skill to retry the failed transfer."
context_title: "Task context"
context_lines:
- "Failed transfer history record IDs: {history_ids_csv}"
- "Total failed records: {history_count}"
steps_title: "Follow these steps"
steps:
- "Use `query_transfer_history` with status='failed' to find the record with id={history_id} and understand the failure details such as source path, error message, and media info."
- "Analyze the error message to determine the best retry strategy."
- "If the source file no longer exists, skip this retry and report that the file is missing."
- "Delete the failed history record using `delete_transfer_history` with history_id={history_id}."
- "Re-identify the media using `recognize_media` with the source file path."
- "If recognition fails, try `search_media` with keywords from the filename."
- "Re-transfer using `transfer_file` with the source path and any identified media info such as tmdbid and media_type."
- "Report the final result."
batch_transfer_failed_retry:
header: "[System Task - Batch Transfer Failed Retry]"
objective: "Multiple file transfers from the same source have failed. These files likely belong to the same media. Please use the `transfer-failed-retry` skill to retry them efficiently."
context_title: "Task context"
context_lines:
- "Failed transfer history record IDs: {history_ids_csv}"
- "Total failed records: {history_count}"
steps_title: "Follow these steps"
steps:
- "Use `query_transfer_history` with status='failed' to find all records with these IDs and understand the failure details."
- "Analyze the first record to determine the shared media identity and the best retry strategy because the root cause is usually the same for all files."
- "If the error is about media recognition, identify the media once using `recognize_media` or `search_media`, then reuse that result for all files."
- "For each failed record, delete the old history entry with `delete_transfer_history` and re-transfer using `transfer_file`."
- "Report how many retries succeeded and how many still failed."
task_rules:
- "These files share the same media identity. Do NOT call `recognize_media` or `search_media` repeatedly for each file."
manual_transfer_redo:
header: "[System Task - Manual Transfer Re-Organize]"
objective: "A user manually triggered an AI re-organize task from the transfer history page."
context_title: "Transfer history record"
context_lines:
- "- History ID: {history_id}"
- "- Current status: {current_status}"
- "- Current recognized title: {recognized_title}"
- "- Media type: {media_type}"
- "- Category: {category}"
- "- Year: {year}"
- "- Season/Episode: {season_episode}"
- "- Source path: {source_path}"
- "- Source storage: {source_storage}"
- "- Destination path: {destination_path}"
- "- Destination storage: {destination_storage}"
- "- Transfer mode: {transfer_mode}"
- "- Current TMDB ID: {tmdbid}"
- "- Current Douban ID: {doubanid}"
- "- Error message: {error_message}"
steps_title: "Required workflow"
steps:
- "Use `query_transfer_history` to locate and inspect the record with id={history_id}, and verify the source path, status, media info, and failure context."
- "Decide whether the current recognition is trustworthy."
- "If the source file no longer exists or cannot be safely processed, stop and report the reason."
- "If the current recognition is wrong or the record should be reorganized, determine the correct media identity first."
- "Prefer `recognize_media` with the source path. If recognition is not reliable, use `search_media` with keywords from filename, title, or year."
- "Only continue when you have high confidence in the target media."
- "Before re-organizing, delete the old transfer history record with `delete_transfer_history` so the system will not skip the source file."
- "Then use `transfer_file` to organize the source path directly."
- "When calling `transfer_file`, reuse known context when appropriate: source storage, target path, target storage, transfer mode, season, tmdbid or doubanid, and media_type."
- "If this record is already correct and no re-organize is needed, do not perform destructive actions; simply report that no change is necessary."
task_rules:
- "Do NOT rely on previous chat context. Work only from the record above."
- "Your goal is to directly fix one transfer history record by using MoviePilot tools to analyze, clean up the old history entry if necessary, and organize the source file again."
- "You should complete the re-organize by directly using tools such as `query_transfer_history`, `recognize_media`, `search_media`, `delete_transfer_history`, and `transfer_file`."
- "Do NOT reorganize blindly when media identity is uncertain."
- "If the previous record was successful but obviously identified as the wrong media, still use the tool-based flow above instead of `/redo`."
- "Keep the final response short and focused on outcome."
---
# SYSTEM_TASKS
这是后台系统任务的唯一定义源。
- `shared_rules` 负责统一口径。
- `task_types.<type>.context_lines` 负责定义上下文字段展示。
- `task_types.<type>.steps` 负责定义任务执行步骤。
- `task_types.<type>.task_rules` 负责定义该任务独有的补充约束。
- 代码侧只负责触发任务并提供模板变量,不再保存具体行为提示词。

View File

@@ -26,6 +26,13 @@ class TestAgentPromptStyle(unittest.TestCase):
prompt,
)
def test_prompt_uses_root_runtime_sections(self):
prompt = prompt_manager.get_agent_prompt()
self.assertIn("<agent_profile>", prompt)
self.assertIn("<agent_workflow>", prompt)
self.assertIn("Active persona: `default`", prompt)
def test_non_verbose_prompt_requires_silence_until_all_tools_finish(self):
with patch.object(settings, "AI_AGENT_VERBOSE", False):
prompt = prompt_manager.get_agent_prompt()

192
tests/test_agent_runtime.py Normal file
View File

@@ -0,0 +1,192 @@
import shutil
import tempfile
import textwrap
import unittest
from pathlib import Path
from app.agent.runtime import AgentRuntimeConfigError, AgentRuntimeManager
class TestAgentRuntimeConfig(unittest.TestCase):
def setUp(self):
self._tempdir = tempfile.TemporaryDirectory()
self.addCleanup(self._tempdir.cleanup)
self.temp_root = Path(self._tempdir.name)
self.agent_root = self.temp_root / "agent"
self.bundled_root = (
Path(__file__).resolve().parents[1] / "app" / "agent" / "runtime_defaults"
)
def _manager(self) -> AgentRuntimeManager:
return AgentRuntimeManager(
agent_root_dir=self.agent_root,
bundled_runtime_dir=self.bundled_root,
)
def test_load_runtime_config_syncs_defaults_and_parses_sections(self):
manager = self._manager()
runtime_config = manager.load_runtime_config()
self.assertEqual(runtime_config.active_persona, "default")
self.assertIn("professional, concise, restrained", runtime_config.profile_text)
self.assertIn(
"omitting `season` means subscribe to season 1 only",
runtime_config.workflow_text,
)
self.assertTrue((self.agent_root / "runtime" / "CURRENT_PERSONA.md").exists())
def test_legacy_root_markdown_is_migrated_to_memory_directory(self):
self.agent_root.mkdir(parents=True, exist_ok=True)
legacy_memory = self.agent_root / "MEMORY.md"
legacy_memory.write_text("# Legacy Memory\n", encoding="utf-8")
legacy_persona = self.agent_root / "CURRENT_PERSONA.md"
legacy_persona.write_text(
textwrap.dedent(
"""\
---
active_persona: default
profile: personas/default/AGENT_PROFILE.md
workflow: personas/default/AGENT_WORKFLOW.md
hooks: personas/default/AGENT_HOOKS.md
system_tasks: system_tasks/SYSTEM_TASKS.md
user_preferences: USER_PREFERENCES.md
---
"""
),
encoding="utf-8",
)
manager = self._manager()
manager.ensure_layout()
self.assertFalse(legacy_memory.exists())
self.assertTrue((self.agent_root / "memory" / "MEMORY.md").exists())
self.assertFalse(legacy_persona.exists())
self.assertTrue((self.agent_root / "runtime" / "CURRENT_PERSONA.md").exists())
def test_render_system_task_message_uses_unified_system_tasks_definition(self):
manager = self._manager()
runtime_config = manager.load_runtime_config()
message = runtime_config.render_system_task_message("heartbeat")
self.assertIn("[System Heartbeat]", message)
self.assertIn("List all jobs with status 'pending' or 'in_progress'.", message)
self.assertIn("Do NOT include greetings, explanations, or conversational text.", message)
self.assertIn("If no jobs were executed, output nothing.", message)
def test_render_system_task_message_renders_template_context(self):
manager = self._manager()
runtime_config = manager.load_runtime_config()
message = runtime_config.render_system_task_message(
"transfer_failed_retry",
template_context={
"history_ids_csv": "7",
"history_count": 1,
"history_id": 7,
},
)
self.assertIn("Failed transfer history record IDs: 7", message)
self.assertIn("Total failed records: 1", message)
self.assertIn("history_id=7", message)
def test_missing_template_context_raises_clear_error(self):
manager = self._manager()
runtime_config = manager.load_runtime_config()
with self.assertRaises(AgentRuntimeConfigError):
runtime_config.render_system_task_message("transfer_failed_retry")
def test_invalid_user_runtime_config_falls_back_to_bundled_defaults(self):
manager = self._manager()
manager.ensure_layout()
invalid_current = self.agent_root / "runtime" / "CURRENT_PERSONA.md"
invalid_current.write_text(
textwrap.dedent(
"""\
---
active_persona: broken
profile: personas/default/AGENT_PROFILE.md
hooks: personas/default/AGENT_HOOKS.md
system_tasks: system_tasks/SYSTEM_TASKS.md
---
"""
),
encoding="utf-8",
)
manager.invalidate_cache()
runtime_config = manager.load_runtime_config()
self.assertTrue(runtime_config.used_fallback)
self.assertEqual(runtime_config.active_persona, "default")
self.assertIn("已回退到内置默认配置", runtime_config.warnings[0])
def test_deprecated_phrase_warning_is_reported(self):
self.agent_root.mkdir(parents=True, exist_ok=True)
runtime_root = self.agent_root / "runtime"
shutil.copytree(self.bundled_root, runtime_root)
current_persona = runtime_root / "CURRENT_PERSONA.md"
current_persona.write_text(
textwrap.dedent(
"""\
---
version: 1
active_persona: default
profile: personas/default/AGENT_PROFILE.md
workflow: personas/default/AGENT_WORKFLOW.md
hooks: personas/default/AGENT_HOOKS.md
user_preferences: USER_PREFERENCES.md
system_tasks: system_tasks/SYSTEM_TASKS.md
extra_context_files: []
deprecated_phrases:
- professional, concise, restrained
---
"""
),
encoding="utf-8",
)
manager = self._manager()
manager.invalidate_cache()
runtime_config = manager.load_runtime_config()
self.assertTrue(
any("professional, concise, restrained" in warning for warning in runtime_config.warnings)
)
def test_outdated_system_tasks_definition_falls_back_to_bundled_defaults(self):
self.agent_root.mkdir(parents=True, exist_ok=True)
runtime_root = self.agent_root / "runtime"
shutil.copytree(self.bundled_root, runtime_root)
system_tasks = runtime_root / "system_tasks" / "SYSTEM_TASKS.md"
system_tasks.write_text(
textwrap.dedent(
"""\
---
version: 1
shared_rules:
- legacy system tasks
task_types:
heartbeat:
header: "[Legacy Heartbeat]"
objective: "legacy"
---
"""
),
encoding="utf-8",
)
manager = self._manager()
manager.invalidate_cache()
runtime_config = manager.load_runtime_config()
self.assertTrue(runtime_config.used_fallback)
self.assertEqual(runtime_config.system_tasks.version, 2)
if __name__ == "__main__":
unittest.main()