diff --git a/.gitignore b/.gitignore
index 77746a82..045a2f1c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,6 +19,7 @@ config/cookies/**
config/app.env
config/user.db*
config/sites/**
+config/agent/
config/logs/
config/temp/
config/cache/
diff --git a/app/agent/__init__.py b/app/agent/__init__.py
index d7efdb5f..ee5f7c73 100644
--- a/app/agent/__init__.py
+++ b/app/agent/__init__.py
@@ -21,12 +21,14 @@ from langgraph.checkpoint.memory import InMemorySaver
from app.agent.callback import StreamingHandler
from app.agent.memory import memory_manager
from app.agent.middleware.activity_log import ActivityLogMiddleware
+from app.agent.middleware.hooks import AgentHooksMiddleware
from app.agent.middleware.jobs import JobsMiddleware
from app.agent.middleware.memory import MemoryMiddleware
from app.agent.middleware.patch_tool_calls import PatchToolCallsMiddleware
from app.agent.middleware.skills import SkillsMiddleware
from app.agent.middleware.usage import UsageMiddleware
from app.agent.prompt import prompt_manager
+from app.agent.runtime import agent_runtime_manager
from app.agent.tools.factory import MoviePilotToolFactory
from app.chain import ChainBase
from app.core.config import settings
@@ -389,18 +391,20 @@ class MoviePilotAgent:
middlewares = [
# Skills
SkillsMiddleware(
- sources=[str(settings.CONFIG_PATH / "agent" / "skills")],
+ sources=[str(agent_runtime_manager.skills_dir)],
bundled_skills_dir=str(settings.ROOT_PATH / "skills"),
),
# Jobs 任务管理
JobsMiddleware(
- sources=[str(settings.CONFIG_PATH / "agent" / "jobs")],
+ sources=[str(agent_runtime_manager.jobs_dir)],
),
- # 记忆管理(自动扫描 agent 目录下所有 .md 文件)
- MemoryMiddleware(memory_dir=str(settings.CONFIG_PATH / "agent")),
+ # 结构化 hooks
+ AgentHooksMiddleware(),
+ # 记忆管理(仅扫描 memory 目录,避免与根层 persona/workflow 配置混写)
+ MemoryMiddleware(memory_dir=str(agent_runtime_manager.memory_dir)),
# 活动日志
ActivityLogMiddleware(
- activity_dir=str(settings.CONFIG_PATH / "agent" / "activity"),
+ activity_dir=str(agent_runtime_manager.activity_dir),
),
# 用量统计
UsageMiddleware(on_usage=self._record_usage),
@@ -987,6 +991,69 @@ class AgentManager:
memory_manager.clear_memory(session_id, user_id)
logger.info(f"会话 {session_id} 的记忆已清空")
+ @staticmethod
+ def _build_heartbeat_prompt() -> str:
+ """使用统一 wake 模板源构建心跳任务提示词。"""
+ runtime_config = agent_runtime_manager.load_runtime_config()
+ return runtime_config.render_system_task_message("heartbeat")
+
+ @staticmethod
+ def _build_retry_transfer_template_context(
+ history_ids: list[int],
+ ) -> tuple[str, dict[str, int | str]]:
+ """仅负责把失败重试任务的动态数据映射成模板变量。"""
+ is_batch = len(history_ids) > 1
+ task_type = (
+ "batch_transfer_failed_retry" if is_batch else "transfer_failed_retry"
+ )
+ template_context: dict[str, int | str] = {
+ "history_ids_csv": ", ".join(str(item) for item in history_ids),
+ "history_count": len(history_ids),
+ }
+ if not is_batch:
+ template_context["history_id"] = history_ids[0]
+ return task_type, template_context
+
+ @staticmethod
+ def _build_retry_transfer_prompt(
+ history_ids: list[int],
+ ) -> str:
+ """根据失败记录数量构建统一的重试整理后台任务提示词。"""
+ runtime_config = agent_runtime_manager.load_runtime_config()
+ task_type, template_context = AgentManager._build_retry_transfer_template_context(
+ history_ids
+ )
+ return runtime_config.render_system_task_message(
+ task_type,
+ template_context=template_context,
+ )
+
+ @staticmethod
+ def _build_manual_redo_template_context(history) -> dict[str, int | str]:
+ """仅负责把整理历史对象映射成 SYSTEM_TASKS 需要的模板变量。"""
+ src_fileitem = history.src_fileitem or {}
+ source_path = src_fileitem.get("path") if isinstance(src_fileitem, dict) else ""
+ source_path = source_path or history.src or ""
+ season_episode = f"{history.seasons or ''}{history.episodes or ''}".strip()
+ # 这里故意只做数据整形,具体行为定义全部交给 SYSTEM_TASKS。
+ return {
+ "history_id": history.id,
+ "current_status": "success" if history.status else "failed",
+ "recognized_title": history.title or "unknown",
+ "media_type": history.type or "unknown",
+ "category": history.category or "unknown",
+ "year": history.year or "unknown",
+ "season_episode": season_episode or "unknown",
+ "source_path": source_path or "unknown",
+ "source_storage": history.src_storage or "local",
+ "destination_path": history.dest or "unknown",
+ "destination_storage": history.dest_storage or "unknown",
+ "transfer_mode": history.mode or "unknown",
+ "tmdbid": history.tmdbid or "none",
+ "doubanid": history.doubanid or "none",
+ "error_message": history.errmsg or "none",
+ }
+
async def heartbeat_check_jobs(self):
"""
心跳唤醒:检查并执行待处理的定时任务(Jobs)。
@@ -998,22 +1065,7 @@ class AgentManager:
user_id = SYSTEM_INTERNAL_USER_ID
logger.info("智能体心跳唤醒:开始检查待处理任务...")
-
- # 英文提示词,便于大模型理解
- heartbeat_message = (
- "[System Heartbeat] Check all jobs in your jobs directory and process pending tasks:\n"
- "1. List all jobs with status 'pending' or 'in_progress'\n"
- "2. For 'recurring' jobs, check 'last_run' to determine if it's time to run again\n"
- "3. For 'once' jobs with status 'pending', execute them now\n"
- "4. After executing each job, update its status, 'last_run' time, and execution log in the JOB.md file\n"
- "5. If there are no pending jobs, do NOT generate any response\n\n"
- "IMPORTANT: This is a background system task, NOT a user conversation. "
- "Your final response will be broadcast as a notification. "
- "Only output a brief completion summary listing each executed job and its result. "
- "Do NOT include greetings, explanations, or conversational text. "
- "If no jobs were executed, output nothing. "
- "Respond in Chinese (中文)."
- )
+ heartbeat_message = self._build_heartbeat_prompt()
await self.process_message(
session_id=session_id,
@@ -1096,57 +1148,7 @@ class AgentManager:
logger.info(
f"智能体重试整理:开始批量处理失败记录 IDs=[{ids_str}] (group={group_key})"
)
-
- if len(history_ids) == 1:
- # 单条记录,使用原有逻辑
- retry_message = (
- f"[System Task - Transfer Failed Retry] A file transfer/organization has failed. "
- f"Please use the 'transfer-failed-retry' skill to retry the failed transfer.\n\n"
- f"Failed transfer history record ID: {history_ids[0]}\n\n"
- f"Follow these steps:\n"
- f"1. Use `query_transfer_history` with status='failed' to find the record with id={history_ids[0]} "
- f"and understand the failure details (source path, error message, media info)\n"
- f"2. Analyze the error message to determine the best retry strategy\n"
- f"3. If the source file no longer exists, skip this retry and report that the file is missing\n"
- f"4. Delete the failed history record using `delete_transfer_history` with history_id={history_ids[0]}\n"
- f"5. Re-identify the media using `recognize_media` with the source file path\n"
- f"6. If recognition fails, try `search_media` with keywords from the filename\n"
- f"7. Re-transfer using `transfer_file` with the source path and any identified media info (tmdbid, media_type)\n"
- f"8. Report the final result\n\n"
- f"IMPORTANT: This is a background system task, NOT a user conversation. "
- f"Your final response will be broadcast as a notification. "
- f"Only output a brief result summary. "
- f"Do NOT include greetings, explanations, or conversational text. "
- f"Respond in Chinese (中文)."
- )
- else:
- # 多条记录,使用批量处理逻辑
- retry_message = (
- f"[System Task - Batch Transfer Failed Retry] Multiple file transfers from the same source "
- f"have failed. These files likely belong to the SAME media (e.g., multiple episodes of the same TV show). "
- f"Please use the 'transfer-failed-retry' skill to retry them efficiently.\n\n"
- f"Failed transfer history record IDs: {ids_str}\n"
- f"Total failed records: {len(history_ids)}\n\n"
- f"Follow these steps:\n"
- f"1. Use `query_transfer_history` with status='failed' to find ALL records with these IDs "
- f"and understand the failure details\n"
- f"2. Since these files are likely from the same media, analyze the FIRST record to determine "
- f"the media identity and the best retry strategy. The root cause is usually the same for all files.\n"
- f"3. If the error is about media recognition (e.g., '未识别到媒体信息'), identify the media ONCE "
- f"using `recognize_media` or `search_media`, then reuse that result (tmdbid, media_type) for all files\n"
- f"4. For EACH failed record:\n"
- f" a. Delete the failed history record using `delete_transfer_history`\n"
- f" b. Re-transfer using `transfer_file` with the source path and the identified media info\n"
- f"5. Report a summary of results (how many succeeded, how many failed)\n\n"
- f"IMPORTANT OPTIMIZATION: These files share the same media identity. "
- f"Do NOT call `recognize_media` or `search_media` repeatedly for each file. "
- f"Identify the media ONCE, then apply to all files.\n\n"
- f"IMPORTANT: This is a background system task, NOT a user conversation. "
- f"Your final response will be broadcast as a notification. "
- f"Only output a brief result summary. "
- f"Do NOT include greetings, explanations, or conversational text. "
- f"Respond in Chinese (中文)."
- )
+ retry_message = self._build_retry_transfer_prompt(history_ids)
try:
await self.process_message(
@@ -1186,57 +1188,10 @@ class AgentManager:
"""
构建手动 AI 整理提示词。
"""
- src_fileitem = history.src_fileitem or {}
- source_path = src_fileitem.get("path") if isinstance(src_fileitem, dict) else ""
- source_path = source_path or history.src or ""
- season_episode = f"{history.seasons or ''}{history.episodes or ''}".strip()
-
- return "\n".join(
- [
- "[System Task - Manual Transfer Re-Organize]",
- "A user manually triggered an AI re-organize task from the transfer history page.",
- "Your goal is to directly fix ONE transfer history record by using MoviePilot tools to analyze, clean up the old history entry if necessary, and organize the source file again.",
- "",
- "IMPORTANT:",
- "1. This is NOT a normal conversation. It is a background execution task.",
- "2. Do NOT rely on previous chat context. Work only from the record below.",
- "3. You should complete the re-organize by directly using tools such as `query_transfer_history`, `recognize_media`, `search_media`, `delete_transfer_history`, and `transfer_file`.",
- "4. Your final response must be a brief Chinese result summary only.",
- "",
- "Transfer history record:",
- f"- History ID: {history.id}",
- f"- Current status: {'success' if history.status else 'failed'}",
- f"- Current recognized title: {history.title or 'unknown'}",
- f"- Media type: {history.type or 'unknown'}",
- f"- Category: {history.category or 'unknown'}",
- f"- Year: {history.year or 'unknown'}",
- f"- Season/Episode: {season_episode or 'unknown'}",
- f"- Source path: {source_path or 'unknown'}",
- f"- Source storage: {history.src_storage or 'local'}",
- f"- Destination path: {history.dest or 'unknown'}",
- f"- Destination storage: {history.dest_storage or 'unknown'}",
- f"- Transfer mode: {history.mode or 'unknown'}",
- f"- Current TMDB ID: {history.tmdbid or 'none'}",
- f"- Current Douban ID: {history.doubanid or 'none'}",
- f"- Error message: {history.errmsg or 'none'}",
- "",
- "Required workflow:",
- f"1. Use `query_transfer_history` to locate and inspect the record with id={history.id}, and verify the source path, status, media info, and failure context.",
- "2. Decide whether the current recognition is trustworthy.",
- "3. If the source file no longer exists or cannot be safely processed, stop and report the reason.",
- "4. If the current recognition is wrong or the record should be reorganized, determine the correct media identity first.",
- "5. Prefer `recognize_media` with the source path. If recognition is not reliable, use `search_media` with keywords from filename/title/year.",
- "6. Only continue when you have high confidence in the target media.",
- "7. Before re-organizing, delete the old transfer history record with `delete_transfer_history` so the system will not skip the source file.",
- "8. Then use `transfer_file` to organize the source path directly.",
- "9. When calling `transfer_file`, reuse known context when appropriate: source storage, target path, target storage, transfer mode, season, tmdbid/doubanid, and media_type.",
- "10. If this record is already correct and no re-organize is needed, do not perform destructive actions; simply report that no change is necessary.",
- "",
- "Important execution rules:",
- "- Do NOT reorganize blindly when media identity is uncertain.",
- "- If the previous record was successful but obviously identified as the wrong media, still use the tool-based flow above instead of `/redo`.",
- "- Keep the final response short, in Chinese, and focused on outcome.",
- ]
+ runtime_config = agent_runtime_manager.load_runtime_config()
+ return runtime_config.render_system_task_message(
+ "manual_transfer_redo",
+ template_context=AgentManager._build_manual_redo_template_context(history),
)
async def manual_redo_transfer(
diff --git a/app/agent/middleware/hooks.py b/app/agent/middleware/hooks.py
new file mode 100644
index 00000000..57c7fce2
--- /dev/null
+++ b/app/agent/middleware/hooks.py
@@ -0,0 +1,68 @@
+"""结构化 Agent hooks 中间件。"""
+
+from collections.abc import Awaitable, Callable
+from typing import Annotated, NotRequired, TypedDict
+
+from langchain.agents.middleware.types import (
+ AgentMiddleware,
+ AgentState,
+ ContextT,
+ ModelRequest,
+ ModelResponse,
+ PrivateStateAttr, # noqa
+ ResponseT,
+)
+from langchain_core.runnables import RunnableConfig
+from langgraph.runtime import Runtime
+
+from app.agent.middleware.utils import append_to_system_message
+from app.agent.runtime import agent_runtime_manager
+
+
+class HooksState(AgentState):
+ """hooks 中间件状态。"""
+
+ hooks_prompt: NotRequired[Annotated[str, PrivateStateAttr]]
+
+
+class HooksStateUpdate(TypedDict):
+ """hooks 状态更新。"""
+
+ hooks_prompt: str
+
+
+class AgentHooksMiddleware(AgentMiddleware[HooksState, ContextT, ResponseT]): # noqa
+ """在固定生命周期点注入结构化 pre/in/post hooks。"""
+
+ state_schema = HooksState
+
+ async def abefore_agent( # noqa
+ self, state: HooksState, runtime: Runtime, config: RunnableConfig
+ ) -> HooksStateUpdate | None:
+ if "hooks_prompt" in state:
+ return None
+
+ runtime_config = agent_runtime_manager.load_runtime_config()
+ return HooksStateUpdate(hooks_prompt=runtime_config.render_hooks_prompt())
+
+ def modify_request(self, request: ModelRequest[ContextT]) -> ModelRequest[ContextT]: # noqa
+ hooks_prompt = request.state.get("hooks_prompt", "") # noqa
+ if not hooks_prompt:
+ return request
+
+ new_system_message = append_to_system_message(
+ request.system_message, hooks_prompt
+ )
+ return request.override(system_message=new_system_message)
+
+ async def awrap_model_call(
+ self,
+ request: ModelRequest[ContextT],
+ handler: Callable[
+ [ModelRequest[ContextT]], Awaitable[ModelResponse[ResponseT]]
+ ],
+ ) -> ModelResponse[ResponseT]:
+ return await handler(self.modify_request(request))
+
+
+__all__ = ["AgentHooksMiddleware"]
diff --git a/app/agent/middleware/memory.py b/app/agent/middleware/memory.py
index 6f0dcd37..3a768d4b 100644
--- a/app/agent/middleware/memory.py
+++ b/app/agent/middleware/memory.py
@@ -188,7 +188,8 @@ class MemoryMiddleware(AgentMiddleware[MemoryState, ContextT, ResponseT]): # no
支持多文件记忆组织:用户可以创建多个 `.md` 文件来按主题组织知识。
参数:
- memory_dir: 记忆文件目录路径。
+ memory_dir: 记忆文件目录路径。建议使用独立的 `config/agent/memory`
+ 目录,避免与 persona/workflow 等根层配置混写。
"""
state_schema = MemoryState
@@ -201,7 +202,7 @@ class MemoryMiddleware(AgentMiddleware[MemoryState, ContextT, ResponseT]): # no
"""初始化记忆中间件。
参数:
- memory_dir: 记忆文件目录路径(例如,`"/config/agent"`)。
+ memory_dir: 记忆文件目录路径(例如,`"/config/agent/memory"`)。
该目录下所有 `.md` 文件都会被自动加载为记忆。
"""
self.memory_dir = memory_dir
diff --git a/app/agent/prompt/Agent Prompt.txt b/app/agent/prompt/Agent Prompt.txt
deleted file mode 100644
index 1351c0e4..00000000
--- a/app/agent/prompt/Agent Prompt.txt
+++ /dev/null
@@ -1,73 +0,0 @@
-You are an AI media assistant powered by MoviePilot. You specialize in managing home media ecosystems: searching for movies/TV shows, managing subscriptions, overseeing downloads, and organizing media libraries.
-
-All your responses must be in **Chinese (中文)**.
-
-You act as a proactive agent. Your goal is to fully resolve the user's media-related requests autonomously. Do not end your turn until the task is complete or you are blocked and require user feedback.
-
-Core Capabilities:
-1. Media Search & Recognition — Identify movies, TV shows, and anime; recognize media from fuzzy filenames or incomplete titles.
-2. Subscription Management — Create rules for automated downloading; monitor trending content.
-3. Download Control — Search torrents across trackers; filter by quality, codec, and release group.
-4. System Status & Organization — Monitor downloads, server health, file transfers, renaming, and library cleanup.
-5. Visual Input Handling — Users may attach images from supported channels; analyze them together with the text when relevant.
-6. File Context Handling — User messages may arrive as structured JSON. Treat the `message` field as the user's text. Attachments appear in `files`; when `local_path` is present, use local file tools to inspect the uploaded file directly. When image input is disabled for the current model, user images may also be delivered through `files`.
-
-
-{verbose_spec}
-
-- Tone: professional, concise, restrained.
-- Be direct. NO unnecessary preamble, NO repeating user's words, NO explaining your thinking.
-- Prioritize task progress over conversation. Answer only what is necessary to move the task forward.
-- Do NOT flatter the user, praise the question, or use overly eager/service-oriented phrases.
-- Do NOT use emojis, exclamation marks, cute language, or excessive apology.
-- Prefer short declarative sentences. Default to one or two short paragraphs; use lists only when they improve scanability.
-- Use Markdown for structured data. Use `inline code` for media titles/paths.
-- Include key details (year, rating, resolution) but do NOT over-explain.
-- Do not stop for approval on read-only operations. Only confirm before critical actions (starting downloads, deleting subscriptions).
-- If the current channel supports image sending and an image would materially help, you may use the `send_message` tool with `image_url` to send it.
-- If the current channel supports file sending and you need to return a local image/file for the user to download, use `send_local_file`.
-{button_choice_spec}
-- Voice replies: {voice_reply_spec}
-- NOT a coding assistant. Do not offer code snippets.
-- If user has set preferred communication style in memory, follow that strictly.
-
-
-
-- Responses MUST be short and punchy: one sentence for confirmations, brief list for search results.
-- NO filler phrases like "Let me help you", "Here are the results", "I found..." — skip all unnecessary preamble.
-- NO repeating what user said.
-- NO narrating your internal reasoning.
-- NO praise, emotional cushioning, or unnecessary politeness padding.
-- After task completion: one line summary only.
-- When error occurs: brief acknowledgment + suggestion, then move on.
-
-
-
-1. Media Discovery: Identify exact media metadata (TMDB ID, Season/Episode) using search tools.
-2. Context Checking: Verify current status (already in library? already subscribed?).
-3. Action Execution: Perform the task with a brief status update only if the operation takes time.
-4. Final Confirmation: State the result concisely.
-
-
-
-- Call independent tools in parallel whenever possible.
-- If search results are ambiguous, use `query_media_detail` or `recognize_media` to clarify before proceeding.
-- If `search_media` fails, fall back to `search_web` or `recognize_media`. Only ask the user when all automated methods are exhausted.
-
-
-
-1. Download Safety: Present found torrents (size, seeds, quality) and get explicit consent before downloading.
-2. Subscription Logic: Check for the best matching quality profile based on user history or defaults.
-3. Library Awareness: Check if content already exists in the library to avoid duplicates.
-4. Error Handling: If a tool or site fails, briefly explain what went wrong and suggest an alternative.
-5. TV Subscription Rule: When calling `add_subscribe` for a TV show, omitting `season` means subscribe to season 1 only. To subscribe multiple seasons or the full series, call `add_subscribe` separately for each season.
-
-
-
-Specific markdown rules:
-{markdown_spec}
-
-
-
-{moviepilot_info}
-
diff --git a/app/agent/prompt/System Core Prompt.txt b/app/agent/prompt/System Core Prompt.txt
new file mode 100644
index 00000000..5e980abc
--- /dev/null
+++ b/app/agent/prompt/System Core Prompt.txt
@@ -0,0 +1,37 @@
+You are the MoviePilot agent runtime. Follow the injected root configuration to determine the active persona, workflow, and operator preferences.
+
+All your responses must be in **Chinese (中文)**.
+
+You act as a proactive agent. Your goal is to fully resolve the user's media-related requests autonomously. Do not end your turn until the task is complete or you are blocked and require user feedback.
+
+
+{runtime_sections}
+
+
+
+{verbose_spec}
+
+- Channel-aware formatting: Follow the capability rules below for Markdown, plain text, buttons, and voice replies.
+{button_choice_spec}
+- Voice replies: {voice_reply_spec}
+- If the current channel supports image sending and an image would materially help, you may use the `send_message` tool with `image_url` to send it.
+- If the current channel supports file sending and you need to return a local image or file for the user to download, use `send_local_file`.
+
+
+
+1. Media Search and Recognition - Identify movies, TV shows, and anime; recognize media from fuzzy filenames or incomplete titles.
+2. Subscription Management - Create rules for automated downloading and monitor trending content.
+3. Download Control - Search torrents across trackers and filter by quality, codec, and release group.
+4. System Status and Organization - Monitor downloads, server health, file transfers, renaming, and library cleanup.
+5. Visual Input Handling - Users may attach images from supported channels; analyze them together with the text when relevant.
+6. File Context Handling - User messages may arrive as structured JSON. Treat the `message` field as the user's text. Attachments appear in `files`; when `local_path` is present, use local file tools to inspect the uploaded file directly. When image input is disabled for the current model, user images may also be delivered through `files`.
+
+
+
+Specific markdown rules:
+{markdown_spec}
+
+
+
+{moviepilot_info}
+
diff --git a/app/agent/prompt/__init__.py b/app/agent/prompt/__init__.py
index e1ea7b19..f8e4d7c8 100644
--- a/app/agent/prompt/__init__.py
+++ b/app/agent/prompt/__init__.py
@@ -7,6 +7,7 @@ from typing import Dict
from app.core.config import settings
from app.log import logger
+from app.agent.runtime import agent_runtime_manager
from app.schemas import (
ChannelCapability,
ChannelCapabilities,
@@ -59,8 +60,12 @@ class PromptManager:
:param prefer_voice_reply: 是否优先使用语音回复
:return: 提示词内容
"""
- # 基础提示词
- base_prompt = self.load_prompt("Agent Prompt.txt")
+ # 根层运行时配置由独立装配器负责,避免人格/工作流继续硬编码在单文件 prompt 中。
+ runtime_config = agent_runtime_manager.load_runtime_config()
+ runtime_sections = runtime_config.render_prompt_sections()
+
+ # 基础提示词只保留 MoviePilot 运行时和渠道能力相关约束。
+ base_prompt = self.load_prompt("System Core Prompt.txt")
# 识别渠道
markdown_spec = ""
@@ -104,6 +109,7 @@ class PromptManager:
moviepilot_info=moviepilot_info,
voice_reply_spec=voice_reply_spec,
button_choice_spec=button_choice_spec,
+ runtime_sections=runtime_sections,
)
return base_prompt
diff --git a/app/agent/runtime.py b/app/agent/runtime.py
new file mode 100644
index 00000000..a3ae4ffc
--- /dev/null
+++ b/app/agent/runtime.py
@@ -0,0 +1,721 @@
+"""Agent 根层运行时配置管理。"""
+
+from __future__ import annotations
+
+import re
+import shutil
+import threading
+from dataclasses import dataclass, field
+from pathlib import Path
+from string import Formatter
+from typing import Any, Iterable, Optional
+
+import yaml
+
+from app.core.config import settings
+from app.log import logger
+
+CURRENT_PERSONA_FILE = "CURRENT_PERSONA.md"
+USER_PREFERENCES_FILE = "USER_PREFERENCES.md"
+SYSTEM_TASKS_FILE = "SYSTEM_TASKS.md"
+LEGACY_WAKE_FORMAT_FILE = "WAKE_FORMAT.md"
+SYSTEM_RUNTIME_DIR = "runtime"
+MEMORY_DIR = "memory"
+SKILLS_DIR = "skills"
+JOBS_DIR = "jobs"
+ACTIVITY_DIR = "activity"
+SYSTEM_TASKS_SCHEMA_VERSION = 2
+
+ROOT_LEVEL_RUNTIME_FILES = {
+ CURRENT_PERSONA_FILE,
+ "AGENT_PROFILE.md",
+ "AGENT_WORKFLOW.md",
+ "AGENT_HOOKS.md",
+ USER_PREFERENCES_FILE,
+ SYSTEM_TASKS_FILE,
+ LEGACY_WAKE_FORMAT_FILE,
+}
+
+FRONTMATTER_PATTERN = re.compile(r"^---\s*\n(.*?)\n---\s*\n?", re.DOTALL)
+
+
+class AgentRuntimeConfigError(ValueError):
+ """根层配置加载异常。"""
+
+
+@dataclass
+class ParsedMarkdownDocument:
+ """解析后的 Markdown 文档。"""
+
+ metadata: dict[str, Any]
+ body: str
+
+
+@dataclass
+class HookDefinition:
+ """结构化执行钩子定义。"""
+
+ path: Path
+ pre_task: list[str]
+ in_task: list[str]
+ post_task: list[str]
+
+
+@dataclass
+class SystemTaskTypeDefinition:
+ """单个后台系统任务定义。"""
+
+ header: str
+ objective: str
+ context_title: Optional[str] = None
+ context_lines: list[str] = field(default_factory=list)
+ steps_title: Optional[str] = None
+ steps: list[str] = field(default_factory=list)
+ task_rules: list[str] = field(default_factory=list)
+ empty_result: Optional[str] = None
+
+
+@dataclass
+class SystemTasksDefinition:
+ """统一的后台系统任务定义源。"""
+
+ path: Path
+ version: int
+ shared_rules: list[str]
+ task_types: dict[str, SystemTaskTypeDefinition]
+
+
+@dataclass
+class AgentRuntimeConfig:
+ """一次加载后的根层配置快照。"""
+
+ source_root: Path
+ active_persona: str
+ current_persona_path: Path
+ profile_path: Path
+ workflow_path: Path
+ hooks_path: Path
+ user_preferences_path: Optional[Path]
+ system_tasks_path: Path
+ extra_context_paths: list[Path]
+ profile_text: str
+ workflow_text: str
+ user_preferences_text: str
+ extra_contexts: list[tuple[Path, str]]
+ hooks: HookDefinition
+ system_tasks: SystemTasksDefinition
+ warnings: list[str] = field(default_factory=list)
+ used_fallback: bool = False
+
+ def render_prompt_sections(self) -> str:
+ """渲染进入系统提示词的根层配置片段。"""
+ sections: list[str] = [
+ "",
+ f"- Active persona: `{self.active_persona}`",
+ f"- Profile source: `{self.profile_path}`",
+ f"- Workflow source: `{self.workflow_path}`",
+ ]
+ if self.user_preferences_path:
+ sections.append(f"- Root preferences source: `{self.user_preferences_path}`")
+ sections.append(f"- System task source: `{self.system_tasks_path}`")
+ sections.append("")
+ sections.append("")
+ sections.append("")
+ sections.append(self.profile_text.strip() or "(No agent profile configured.)")
+ sections.append("")
+ sections.append("")
+ sections.append("")
+ sections.append(self.workflow_text.strip() or "(No agent workflow configured.)")
+ sections.append("")
+ if self.user_preferences_text.strip():
+ sections.append("")
+ sections.append("")
+ sections.append(self.user_preferences_text.strip())
+ sections.append("")
+ for path, text in self.extra_contexts:
+ if not text.strip():
+ continue
+ sections.append("")
+ sections.append(f'')
+ sections.append(text.strip())
+ sections.append("")
+ return "\n".join(sections).strip()
+
+ def render_hooks_prompt(self) -> str:
+ """渲染结构化 hooks 提示词。"""
+ blocks = [
+ "",
+ f"- Hook source: `{self.hooks.path}`",
+ "- These hooks are loaded structurally by the runtime and must be followed at the matching lifecycle stage.",
+ "",
+ "Pre-Task Hooks:",
+ self._format_hook_list(self.hooks.pre_task),
+ "",
+ "In-Task Hooks:",
+ self._format_hook_list(self.hooks.in_task),
+ "",
+ "Post-Task Hooks:",
+ self._format_hook_list(self.hooks.post_task),
+ "",
+ ]
+ return "\n".join(blocks)
+
+ def render_system_task_message(
+ self,
+ task_type: str,
+ *,
+ template_context: Optional[dict[str, Any]] = None,
+ extra_rules: Optional[list[str]] = None,
+ ) -> str:
+ """根据统一的后台系统任务定义渲染提示词。"""
+ task_definition = self.system_tasks.task_types.get(task_type)
+ if not task_definition:
+ raise AgentRuntimeConfigError(f"未定义的后台系统任务类型: {task_type}")
+
+ rendered_context = self._render_template_lines(
+ task_definition.context_lines,
+ template_context,
+ task_type,
+ "context_lines",
+ )
+ rendered_steps = self._render_template_lines(
+ task_definition.steps,
+ template_context,
+ task_type,
+ "steps",
+ )
+ rendered_task_rules = self._render_template_lines(
+ task_definition.task_rules,
+ template_context,
+ task_type,
+ "task_rules",
+ )
+
+ sections = [
+ self._render_template_text(
+ task_definition.header,
+ template_context,
+ task_type,
+ "header",
+ ).strip(),
+ self._render_template_text(
+ task_definition.objective,
+ template_context,
+ task_type,
+ "objective",
+ ).strip(),
+ ]
+ if rendered_context:
+ sections.append(
+ self._format_titled_lines(
+ task_definition.context_title or "Task context",
+ rendered_context,
+ )
+ )
+ if rendered_steps:
+ sections.append(
+ self._format_titled_lines(
+ task_definition.steps_title or "Follow these steps",
+ rendered_steps,
+ )
+ )
+
+ rules = list(self.system_tasks.shared_rules)
+ if task_definition.empty_result:
+ rules.append(task_definition.empty_result)
+ rules.extend(rendered_task_rules)
+ if extra_rules:
+ rules.extend(rule.strip() for rule in extra_rules if rule and rule.strip())
+ if rules:
+ sections.append(self._format_numbered_rules("IMPORTANT", rules))
+ return "\n\n".join(section for section in sections if section).strip()
+
+ @classmethod
+ def _render_template_text(
+ cls,
+ text: str,
+ template_context: Optional[dict[str, Any]],
+ task_type: str,
+ field_name: str,
+ ) -> str:
+ if not text:
+ return ""
+
+ formatter = Formatter()
+ required_fields = {
+ placeholder_name
+ for _, placeholder_name, _, _ in formatter.parse(text)
+ if placeholder_name
+ }
+ if not required_fields:
+ return text
+
+ context = cls._normalize_template_context(template_context)
+ missing_fields = sorted(field for field in required_fields if field not in context)
+ if missing_fields:
+ raise AgentRuntimeConfigError(
+ f"系统任务定义 `{task_type}` 的 `{field_name}` 缺少变量: "
+ + ", ".join(f"`{field}`" for field in missing_fields)
+ )
+
+ # 这里统一做字符串替换,让模板文件成为后台任务文案的唯一行为来源。
+ return text.format_map(context)
+
+ @classmethod
+ def _render_template_lines(
+ cls,
+ items: list[str],
+ template_context: Optional[dict[str, Any]],
+ task_type: str,
+ field_name: str,
+ ) -> list[str]:
+ return [
+ cls._render_template_text(
+ item,
+ template_context,
+ task_type,
+ f"{field_name}[{index}]",
+ ).rstrip()
+ for index, item in enumerate(items, start=1)
+ if item and item.rstrip()
+ ]
+
+ @staticmethod
+ def _normalize_template_context(
+ template_context: Optional[dict[str, Any]],
+ ) -> dict[str, str]:
+ if not template_context:
+ return {}
+ return {
+ str(key): "" if value is None else str(value)
+ for key, value in template_context.items()
+ }
+
+ @staticmethod
+ def _format_hook_list(items: list[str]) -> str:
+ if not items:
+ return "(No hooks configured.)"
+ return "\n".join(f"{index}. {item}" for index, item in enumerate(items, start=1))
+
+ @staticmethod
+ def _format_numbered_rules(title: str, items: list[str]) -> str:
+ return "\n".join(
+ [f"{title}:"]
+ + [f"{index}. {item}" for index, item in enumerate(items, start=1)]
+ )
+
+ @staticmethod
+ def _format_titled_lines(title: str, items: list[str]) -> str:
+ cleaned = [item.rstrip() for item in items if item and item.rstrip()]
+ return "\n".join([f"{title}:"] + cleaned)
+
+
+class AgentRuntimeManager:
+ """统一管理 agent 根层配置目录、迁移、校验与模板渲染。"""
+
+ def __init__(
+ self,
+ *,
+ agent_root_dir: Optional[Path] = None,
+ bundled_runtime_dir: Optional[Path] = None,
+ ) -> None:
+ self.agent_root_dir = agent_root_dir or (settings.CONFIG_PATH / "agent")
+ self.runtime_dir = self.agent_root_dir / SYSTEM_RUNTIME_DIR
+ self.memory_dir = self.agent_root_dir / MEMORY_DIR
+ self.skills_dir = self.agent_root_dir / SKILLS_DIR
+ self.jobs_dir = self.agent_root_dir / JOBS_DIR
+ self.activity_dir = self.agent_root_dir / ACTIVITY_DIR
+ self.bundled_runtime_dir = bundled_runtime_dir or (
+ Path(__file__).parent / "runtime_defaults"
+ )
+ self._cache_lock = threading.Lock()
+ self._cached_signature: Optional[tuple[tuple[str, int, int], ...]] = None
+ self._cached_config: Optional[AgentRuntimeConfig] = None
+
+ def ensure_layout(self) -> None:
+ """创建目录、同步默认文件,并迁移旧版 memory/runtime 文件。"""
+ self.agent_root_dir.mkdir(parents=True, exist_ok=True)
+ self.runtime_dir.mkdir(parents=True, exist_ok=True)
+ self.memory_dir.mkdir(parents=True, exist_ok=True)
+ self.skills_dir.mkdir(parents=True, exist_ok=True)
+ self.jobs_dir.mkdir(parents=True, exist_ok=True)
+ self.activity_dir.mkdir(parents=True, exist_ok=True)
+ self._migrate_root_runtime_files()
+ self._sync_bundled_runtime_defaults()
+ self._migrate_root_memory_files()
+
+ def load_runtime_config(self) -> AgentRuntimeConfig:
+ """加载配置。用户目录损坏时自动回退到内置默认配置。"""
+ self.ensure_layout()
+ signature = self._build_signature()
+ with self._cache_lock:
+ if self._cached_signature == signature and self._cached_config:
+ return self._cached_config
+
+ try:
+ config = self._load_from_root(self.runtime_dir)
+ except AgentRuntimeConfigError as err:
+ logger.warning("Agent 根层配置无效,回退到内置默认配置: %s", err)
+ config = self._load_from_root(self.bundled_runtime_dir)
+ config.used_fallback = True
+ config.warnings.insert(
+ 0, f"用户运行时配置加载失败,已回退到内置默认配置: {err}"
+ )
+
+ self._cached_signature = signature
+ self._cached_config = config
+ return config
+
+ def invalidate_cache(self) -> None:
+ """供测试或手动刷新时清理缓存。"""
+ with self._cache_lock:
+ self._cached_signature = None
+ self._cached_config = None
+
+ def _build_signature(self) -> tuple[tuple[str, int, int], ...]:
+ """基于运行时配置和内置默认配置生成文件签名。"""
+ entries: list[tuple[str, int, int]] = []
+ for prefix, root in (("runtime", self.runtime_dir), ("bundled", self.bundled_runtime_dir)):
+ if not root.exists():
+ continue
+ for path in sorted(root.rglob("*")):
+ if not path.is_file():
+ continue
+ stat = path.stat()
+ relative = path.relative_to(root).as_posix()
+ entries.append((f"{prefix}:{relative}", stat.st_mtime_ns, stat.st_size))
+ return tuple(entries)
+
+ def _sync_bundled_runtime_defaults(self) -> None:
+ """仅复制缺失的默认根层配置,避免覆盖用户自定义。"""
+ if not self.bundled_runtime_dir.exists():
+ return
+ for path in sorted(self.bundled_runtime_dir.rglob("*")):
+ relative = path.relative_to(self.bundled_runtime_dir)
+ target = self.runtime_dir / relative
+ if path.is_dir():
+ target.mkdir(parents=True, exist_ok=True)
+ continue
+ if target.exists():
+ continue
+ target.parent.mkdir(parents=True, exist_ok=True)
+ shutil.copy2(path, target)
+ logger.info("已同步默认 Agent 运行时文件: %s", target)
+
+ def _migrate_root_runtime_files(self) -> None:
+ """兼容早期直接放在 `config/agent` 根目录的 RFC 文件。"""
+ migration_targets = {
+ CURRENT_PERSONA_FILE: self.runtime_dir / CURRENT_PERSONA_FILE,
+ USER_PREFERENCES_FILE: self.runtime_dir / USER_PREFERENCES_FILE,
+ SYSTEM_TASKS_FILE: self.runtime_dir / "system_tasks" / SYSTEM_TASKS_FILE,
+ LEGACY_WAKE_FORMAT_FILE: self.runtime_dir / "system_tasks" / SYSTEM_TASKS_FILE,
+ "AGENT_PROFILE.md": self.runtime_dir / "personas" / "default" / "AGENT_PROFILE.md",
+ "AGENT_WORKFLOW.md": self.runtime_dir / "personas" / "default" / "AGENT_WORKFLOW.md",
+ "AGENT_HOOKS.md": self.runtime_dir / "personas" / "default" / "AGENT_HOOKS.md",
+ }
+ for filename, target in migration_targets.items():
+ source = self.agent_root_dir / filename
+ if not source.exists() or target.exists():
+ continue
+ target.parent.mkdir(parents=True, exist_ok=True)
+ source.rename(target)
+ logger.info("已迁移旧版 Agent 根配置文件: %s -> %s", source, target)
+
+ def _migrate_root_memory_files(self) -> None:
+ """将旧版根目录 memory 文件移入 `config/agent/memory`。"""
+ for path in sorted(self.agent_root_dir.glob("*.md")):
+ if path.name in ROOT_LEVEL_RUNTIME_FILES:
+ continue
+ target = self.memory_dir / path.name
+ if target.exists():
+ continue
+ path.rename(target)
+ logger.info("已迁移旧版 Agent memory 文件: %s -> %s", path, target)
+
+ def _load_from_root(self, root: Path) -> AgentRuntimeConfig:
+ current_persona_path = root / CURRENT_PERSONA_FILE
+ current_doc = self._read_markdown(current_persona_path)
+ current_meta = current_doc.metadata
+
+ active_persona = str(current_meta.get("active_persona") or "default").strip()
+ if not active_persona:
+ raise AgentRuntimeConfigError("CURRENT_PERSONA.md 缺少 active_persona")
+
+ profile_path = self._resolve_required_path(root, current_meta, "profile")
+ workflow_path = self._resolve_required_path(root, current_meta, "workflow")
+ hooks_path = self._resolve_required_path(root, current_meta, "hooks")
+ system_tasks_path = self._resolve_required_path(root, current_meta, "system_tasks")
+ user_preferences_path = self._resolve_optional_path(
+ root, current_meta.get("user_preferences")
+ )
+ extra_context_paths = self._resolve_optional_paths(
+ root, current_meta.get("extra_context_files", [])
+ )
+
+ profile_doc = self._read_markdown(profile_path)
+ workflow_doc = self._read_markdown(workflow_path)
+ hooks_doc = self._read_markdown(hooks_path)
+ system_tasks_doc = self._read_markdown(system_tasks_path)
+ preferences_doc = (
+ self._read_markdown(user_preferences_path)
+ if user_preferences_path and user_preferences_path.exists()
+ else ParsedMarkdownDocument(metadata={}, body="")
+ )
+ extra_contexts = [
+ (path, self._read_markdown(path).body)
+ for path in extra_context_paths
+ ]
+
+ hooks = self._parse_hooks_document(hooks_path, hooks_doc)
+ system_tasks = self._parse_system_tasks_document(
+ system_tasks_path,
+ system_tasks_doc,
+ )
+
+ warnings = self._validate_runtime_config(
+ current_meta=current_meta,
+ profile_path=profile_path,
+ workflow_path=workflow_path,
+ hooks_path=hooks_path,
+ user_preferences_path=user_preferences_path,
+ system_tasks_path=system_tasks_path,
+ extra_context_paths=extra_context_paths,
+ profile_text=profile_doc.body,
+ workflow_text=workflow_doc.body,
+ preferences_text=preferences_doc.body,
+ )
+ return AgentRuntimeConfig(
+ source_root=root,
+ active_persona=active_persona,
+ current_persona_path=current_persona_path,
+ profile_path=profile_path,
+ workflow_path=workflow_path,
+ hooks_path=hooks_path,
+ user_preferences_path=user_preferences_path,
+ system_tasks_path=system_tasks_path,
+ extra_context_paths=extra_context_paths,
+ profile_text=profile_doc.body,
+ workflow_text=workflow_doc.body,
+ user_preferences_text=preferences_doc.body,
+ extra_contexts=extra_contexts,
+ hooks=hooks,
+ system_tasks=system_tasks,
+ warnings=warnings,
+ )
+
+ @staticmethod
+ def _read_markdown(path: Path) -> ParsedMarkdownDocument:
+ if not path.exists():
+ raise AgentRuntimeConfigError(f"缺少配置文件: {path}")
+ try:
+ content = path.read_text(encoding="utf-8")
+ except Exception as err: # noqa: BLE001
+ raise AgentRuntimeConfigError(f"读取配置文件失败 {path}: {err}") from err
+
+ metadata: dict[str, Any] = {}
+ body = content
+ match = FRONTMATTER_PATTERN.match(content)
+ if match:
+ try:
+ metadata = yaml.safe_load(match.group(1)) or {}
+ except yaml.YAMLError as err:
+ raise AgentRuntimeConfigError(f"YAML frontmatter 解析失败 {path}: {err}") from err
+ if not isinstance(metadata, dict):
+ raise AgentRuntimeConfigError(f"frontmatter 必须是映射类型: {path}")
+ body = content[match.end():]
+ return ParsedMarkdownDocument(metadata=metadata, body=body.strip())
+
+ @staticmethod
+ def _resolve_required_path(root: Path, metadata: dict[str, Any], field_name: str) -> Path:
+ raw = metadata.get(field_name)
+ if not raw or not str(raw).strip():
+ raise AgentRuntimeConfigError(f"CURRENT_PERSONA.md 缺少必填字段 `{field_name}`")
+ return AgentRuntimeManager._resolve_relative_path(root, str(raw))
+
+ @staticmethod
+ def _resolve_optional_path(root: Path, raw: Any) -> Optional[Path]:
+ if not raw or not str(raw).strip():
+ return None
+ return AgentRuntimeManager._resolve_relative_path(root, str(raw))
+
+ @staticmethod
+ def _resolve_optional_paths(root: Path, values: Any) -> list[Path]:
+ if not values:
+ return []
+ if not isinstance(values, list):
+ raise AgentRuntimeConfigError("extra_context_files 必须是数组")
+ return [AgentRuntimeManager._resolve_relative_path(root, str(value)) for value in values]
+
+ @staticmethod
+ def _resolve_relative_path(root: Path, value: str) -> Path:
+ candidate = Path(value)
+ return candidate if candidate.is_absolute() else (root / candidate).resolve()
+
+ @staticmethod
+ def _normalize_string_list(values: Any, field_name: str) -> list[str]:
+ if values is None:
+ return []
+ if not isinstance(values, list):
+ raise AgentRuntimeConfigError(f"{field_name} 必须是字符串数组")
+ normalized: list[str] = []
+ for value in values:
+ text = str(value).strip()
+ if text:
+ normalized.append(text)
+ return normalized
+
+ def _parse_hooks_document(
+ self, path: Path, document: ParsedMarkdownDocument
+ ) -> HookDefinition:
+ pre_task = self._normalize_string_list(document.metadata.get("pre_task"), "pre_task")
+ in_task = self._normalize_string_list(document.metadata.get("in_task"), "in_task")
+ post_task = self._normalize_string_list(
+ document.metadata.get("post_task"), "post_task"
+ )
+ if not (pre_task or in_task or post_task):
+ raise AgentRuntimeConfigError(f"{path} 未定义任何结构化 hooks")
+ return HookDefinition(
+ path=path,
+ pre_task=pre_task,
+ in_task=in_task,
+ post_task=post_task,
+ )
+
+ def _parse_system_tasks_document(
+ self, path: Path, document: ParsedMarkdownDocument
+ ) -> SystemTasksDefinition:
+ """解析后台系统任务定义文件。"""
+ version = self._normalize_positive_int(
+ document.metadata.get("version"),
+ "version",
+ default=1,
+ )
+ if version < SYSTEM_TASKS_SCHEMA_VERSION:
+ raise AgentRuntimeConfigError(
+ f"{path} 的 version={version} 过旧,"
+ f"当前要求 SYSTEM_TASKS schema v{SYSTEM_TASKS_SCHEMA_VERSION} 或更高版本"
+ )
+ shared_rules = self._normalize_string_list(
+ document.metadata.get("shared_rules"), "shared_rules"
+ )
+ if not shared_rules:
+ raise AgentRuntimeConfigError(f"{path} 缺少 shared_rules")
+
+ raw_task_types = document.metadata.get("task_types")
+ if not isinstance(raw_task_types, dict) or not raw_task_types:
+ raise AgentRuntimeConfigError(f"{path} 缺少 task_types 映射")
+
+ task_types: dict[str, SystemTaskTypeDefinition] = {}
+ for key, raw in raw_task_types.items():
+ if not isinstance(raw, dict):
+ raise AgentRuntimeConfigError(f"task_types.{key} 必须是映射")
+ header = str(raw.get("header") or "").strip()
+ objective = str(raw.get("objective") or "").strip()
+ if not header or not objective:
+ raise AgentRuntimeConfigError(
+ f"task_types.{key} 缺少 header 或 objective"
+ )
+ context_lines = self._normalize_string_list(
+ raw.get("context_lines"),
+ f"task_types.{key}.context_lines",
+ )
+ steps = self._normalize_string_list(
+ raw.get("steps"),
+ f"task_types.{key}.steps",
+ )
+ task_rules = self._normalize_string_list(
+ raw.get("task_rules"),
+ f"task_types.{key}.task_rules",
+ )
+ empty_result = str(raw.get("empty_result") or "").strip() or None
+ context_title = str(raw.get("context_title") or "").strip() or None
+ steps_title = str(raw.get("steps_title") or "").strip() or None
+ task_types[str(key)] = SystemTaskTypeDefinition(
+ header=header,
+ objective=objective,
+ context_title=context_title,
+ context_lines=context_lines,
+ steps_title=steps_title,
+ steps=steps,
+ task_rules=task_rules,
+ empty_result=empty_result,
+ )
+ return SystemTasksDefinition(
+ path=path,
+ version=version,
+ shared_rules=shared_rules,
+ task_types=task_types,
+ )
+
+ @staticmethod
+ def _normalize_positive_int(
+ value: Any,
+ field_name: str,
+ *,
+ default: int,
+ ) -> int:
+ if value in (None, ""):
+ return default
+ try:
+ normalized = int(value)
+ except (TypeError, ValueError) as err:
+ raise AgentRuntimeConfigError(f"{field_name} 必须是正整数") from err
+ if normalized <= 0:
+ raise AgentRuntimeConfigError(f"{field_name} 必须是正整数")
+ return normalized
+
+ def _validate_runtime_config(
+ self,
+ *,
+ current_meta: dict[str, Any],
+ profile_path: Path,
+ workflow_path: Path,
+ hooks_path: Path,
+ user_preferences_path: Optional[Path],
+ system_tasks_path: Path,
+ extra_context_paths: list[Path],
+ profile_text: str,
+ workflow_text: str,
+ preferences_text: str,
+ ) -> list[str]:
+ warnings: list[str] = []
+ required_paths = [profile_path, workflow_path, hooks_path, system_tasks_path]
+ if user_preferences_path:
+ required_paths.append(user_preferences_path)
+ duplicates = self._find_duplicate_paths(required_paths + extra_context_paths)
+ if duplicates:
+ warnings.append(
+ "检测到重复引用的根层配置文件: "
+ + ", ".join(path.as_posix() for path in duplicates)
+ )
+
+ deprecated_phrases = self._normalize_string_list(
+ current_meta.get("deprecated_phrases"), "deprecated_phrases"
+ )
+ if deprecated_phrases:
+ scan_targets = {
+ "profile": profile_text,
+ "workflow": workflow_text,
+ "user_preferences": preferences_text,
+ }
+ for phrase in deprecated_phrases:
+ for target_name, text in scan_targets.items():
+ if phrase and phrase in text:
+ warnings.append(
+ f"检测到已废弃短语 `{phrase}` 仍出现在 {target_name} 中"
+ )
+ return warnings
+
+ @staticmethod
+ def _find_duplicate_paths(paths: Iterable[Path]) -> list[Path]:
+ seen: set[Path] = set()
+ duplicates: list[Path] = []
+ for path in paths:
+ resolved = path.resolve()
+ if resolved in seen and resolved not in duplicates:
+ duplicates.append(resolved)
+ seen.add(resolved)
+ return duplicates
+
+
+agent_runtime_manager = AgentRuntimeManager()
diff --git a/app/agent/runtime_defaults/CURRENT_PERSONA.md b/app/agent/runtime_defaults/CURRENT_PERSONA.md
new file mode 100644
index 00000000..843b8a15
--- /dev/null
+++ b/app/agent/runtime_defaults/CURRENT_PERSONA.md
@@ -0,0 +1,24 @@
+---
+version: 1
+active_persona: default
+profile: personas/default/AGENT_PROFILE.md
+workflow: personas/default/AGENT_WORKFLOW.md
+hooks: personas/default/AGENT_HOOKS.md
+user_preferences: USER_PREFERENCES.md
+system_tasks: system_tasks/SYSTEM_TASKS.md
+extra_context_files: []
+deprecated_phrases: []
+---
+# CURRENT_PERSONA
+
+当前激活人格:`default`
+
+加载顺序固定如下:
+
+1. `AGENT_PROFILE.md`
+2. `AGENT_WORKFLOW.md`
+3. `AGENT_HOOKS.md`
+4. `USER_PREFERENCES.md`
+5. `SYSTEM_TASKS.md`
+
+如果需要扩展额外上下文,请使用 `extra_context_files` 显式声明,而不是把额外规则散落到 memory 中。
diff --git a/app/agent/runtime_defaults/USER_PREFERENCES.md b/app/agent/runtime_defaults/USER_PREFERENCES.md
new file mode 100644
index 00000000..adec6b67
--- /dev/null
+++ b/app/agent/runtime_defaults/USER_PREFERENCES.md
@@ -0,0 +1,10 @@
+---
+version: 1
+---
+# USER_PREFERENCES
+
+这是根层的运维偏好文件,不是用户长期记忆。
+
+- 这里只放稳定的系统级输出规则或部署方偏好。
+- 用户在对话中形成的长期习惯,仍应写入 `config/agent/memory/*.md`。
+- 默认保持精简,避免与 `AGENT_PROFILE.md` 或 `AGENT_WORKFLOW.md` 重复。
diff --git a/app/agent/runtime_defaults/personas/default/AGENT_HOOKS.md b/app/agent/runtime_defaults/personas/default/AGENT_HOOKS.md
new file mode 100644
index 00000000..8c3a957b
--- /dev/null
+++ b/app/agent/runtime_defaults/personas/default/AGENT_HOOKS.md
@@ -0,0 +1,26 @@
+---
+version: 1
+pre_task:
+ - Identify whether the request is a normal user conversation or a background system task before choosing a workflow.
+ - Classify intent before acting, then prefer an existing skill or dedicated workflow over ad-hoc prompting.
+ - Check read-only context first so the final action is based on current library, subscription, or history state.
+ - Only stop for confirmation when the next action is destructive, high-impact, or user-facing.
+ - Keep the final delivery target explicit before calling tools.
+in_task:
+ - Execute in small, outcome-oriented steps and prefer tool calls over long explanations when the task is actionable.
+ - Reuse known media identity, prior tool results, and shared context instead of repeating expensive recognition or search calls.
+ - When a tool fails, try one narrower fallback path before escalating to the user.
+ - Keep intermediate user-facing output minimal; when verbose mode is disabled, stay silent until the final result.
+ - Treat progress reporting as task-specific glue, not a shared abstraction to leak into every tool.
+post_task:
+ - Perform the minimum validation needed to confirm the result actually landed.
+ - Summarize only the outcome, key media facts, and the remaining blocker if something still failed.
+ - If the task established a reusable workflow, prefer encoding it in skills or root config instead of relying on prompt residue.
+---
+# AGENT_HOOKS
+
+这些 hooks 由运行时结构化加载,不依赖自由文本约定。
+
+- `pre_task` 对应开始执行前的统一检查点。
+- `in_task` 对应工具调用和失败降级阶段。
+- `post_task` 对应最小验证与收口阶段。
diff --git a/app/agent/runtime_defaults/personas/default/AGENT_PROFILE.md b/app/agent/runtime_defaults/personas/default/AGENT_PROFILE.md
new file mode 100644
index 00000000..c15d94b7
--- /dev/null
+++ b/app/agent/runtime_defaults/personas/default/AGENT_PROFILE.md
@@ -0,0 +1,27 @@
+---
+version: 1
+---
+# AGENT_PROFILE
+
+- Identity: You are an AI media assistant powered by MoviePilot. You specialize in managing home media ecosystems: searching for movies and TV shows, managing subscriptions, overseeing downloads, and organizing media libraries.
+- Tone: professional, concise, restrained.
+- Be direct. NO unnecessary preamble, NO repeating user's words, NO explaining your thinking.
+- Prioritize task progress over conversation. Answer only what is necessary to move the task forward.
+- Do NOT flatter the user, praise the question, or use overly eager service phrases.
+- Do NOT use emojis, exclamation marks, cute language, or excessive apology.
+- Prefer short declarative sentences. Default to one or two short paragraphs; use lists only when they improve scanability.
+- Use Markdown for structured data. Use `inline code` for media titles and paths.
+- Include key details such as year, rating, and resolution, but do NOT over-explain.
+- Do not stop for approval on read-only operations. Only confirm before critical actions such as starting downloads or deleting subscriptions.
+- NOT a coding assistant. Do not offer code snippets.
+- If user has set preferred communication style in memory, follow that strictly.
+
+# RESPONSE_FORMAT
+
+- Responses MUST be short and punchy: one sentence for confirmations, brief list for search results.
+- NO filler phrases like "Let me help you", "Here are the results", "I found..." - skip all unnecessary preamble.
+- NO repeating what user said.
+- NO narrating your internal reasoning.
+- NO praise, emotional cushioning, or unnecessary politeness padding.
+- After task completion: one line summary only.
+- When error occurs: brief acknowledgment plus suggestion, then move on.
diff --git a/app/agent/runtime_defaults/personas/default/AGENT_WORKFLOW.md b/app/agent/runtime_defaults/personas/default/AGENT_WORKFLOW.md
new file mode 100644
index 00000000..1eb8c685
--- /dev/null
+++ b/app/agent/runtime_defaults/personas/default/AGENT_WORKFLOW.md
@@ -0,0 +1,25 @@
+---
+version: 1
+---
+# AGENT_WORKFLOW
+
+## FLOW
+
+1. Media Discovery: Identify exact media metadata such as TMDB ID and Season or Episode using search tools.
+2. Context Checking: Verify current status such as whether the media is already in the library or already subscribed.
+3. Action Execution: Perform the task with a brief status update only if the operation takes time.
+4. Final Confirmation: State the result concisely.
+
+## TOOL_CALLING_STRATEGY
+
+- Call independent tools in parallel whenever possible.
+- If search results are ambiguous, use `query_media_detail` or `recognize_media` to clarify before proceeding.
+- If `search_media` fails, fall back to `search_web` or `recognize_media`. Only ask the user when all automated methods are exhausted.
+
+## MEDIA_MANAGEMENT_RULES
+
+1. Download Safety: Present found torrents with size, seeds, and quality, then get explicit consent before downloading.
+2. Subscription Logic: Check for the best matching quality profile based on user history or defaults.
+3. Library Awareness: Check if content already exists in the library to avoid duplicates.
+4. Error Handling: If a tool or site fails, briefly explain what went wrong and suggest an alternative.
+5. TV Subscription Rule: When calling `add_subscribe` for a TV show, omitting `season` means subscribe to season 1 only. To subscribe multiple seasons or the full series, call `add_subscribe` separately for each season.
diff --git a/app/agent/runtime_defaults/system_tasks/SYSTEM_TASKS.md b/app/agent/runtime_defaults/system_tasks/SYSTEM_TASKS.md
new file mode 100644
index 00000000..a1bc0b31
--- /dev/null
+++ b/app/agent/runtime_defaults/system_tasks/SYSTEM_TASKS.md
@@ -0,0 +1,108 @@
+---
+version: 2
+shared_rules:
+ - This is a background system task, NOT a user conversation.
+ - Your final response will be broadcast as a notification.
+ - Do NOT include greetings, explanations, or conversational text.
+ - Respond in Chinese (中文).
+task_types:
+ heartbeat:
+ header: "[System Heartbeat]"
+ objective: "Check all jobs in your jobs directory and process pending tasks."
+ steps_title: "Follow these steps"
+ steps:
+ - "List all jobs with status 'pending' or 'in_progress'."
+ - "For 'recurring' jobs, check 'last_run' to determine if it's time to run again."
+ - "For 'once' jobs with status 'pending', execute them now."
+ - "After executing each job, update its status, 'last_run' time, and execution log in the JOB.md file."
+ empty_result: "If no jobs were executed, output nothing."
+ health_check:
+ header: "[System Health Check]"
+ objective: "Verify that the agent execution pipeline is alive."
+ steps_title: "Follow these steps"
+ steps:
+ - "Verify that runtime config, tools, and jobs can all be accessed normally."
+ - "If a real issue is detected, report the failing subsystem and the immediate blocking reason."
+ empty_result: "If there is nothing meaningful to report, output OK only."
+ transfer_failed_retry:
+ header: "[System Task - Transfer Failed Retry]"
+ objective: "A file transfer or organization has failed. Please use the `transfer-failed-retry` skill to retry the failed transfer."
+ context_title: "Task context"
+ context_lines:
+ - "Failed transfer history record IDs: {history_ids_csv}"
+ - "Total failed records: {history_count}"
+ steps_title: "Follow these steps"
+ steps:
+ - "Use `query_transfer_history` with status='failed' to find the record with id={history_id} and understand the failure details such as source path, error message, and media info."
+ - "Analyze the error message to determine the best retry strategy."
+ - "If the source file no longer exists, skip this retry and report that the file is missing."
+ - "Delete the failed history record using `delete_transfer_history` with history_id={history_id}."
+ - "Re-identify the media using `recognize_media` with the source file path."
+ - "If recognition fails, try `search_media` with keywords from the filename."
+ - "Re-transfer using `transfer_file` with the source path and any identified media info such as tmdbid and media_type."
+ - "Report the final result."
+ batch_transfer_failed_retry:
+ header: "[System Task - Batch Transfer Failed Retry]"
+ objective: "Multiple file transfers from the same source have failed. These files likely belong to the same media. Please use the `transfer-failed-retry` skill to retry them efficiently."
+ context_title: "Task context"
+ context_lines:
+ - "Failed transfer history record IDs: {history_ids_csv}"
+ - "Total failed records: {history_count}"
+ steps_title: "Follow these steps"
+ steps:
+ - "Use `query_transfer_history` with status='failed' to find all records with these IDs and understand the failure details."
+ - "Analyze the first record to determine the shared media identity and the best retry strategy because the root cause is usually the same for all files."
+ - "If the error is about media recognition, identify the media once using `recognize_media` or `search_media`, then reuse that result for all files."
+ - "For each failed record, delete the old history entry with `delete_transfer_history` and re-transfer using `transfer_file`."
+ - "Report how many retries succeeded and how many still failed."
+ task_rules:
+ - "These files share the same media identity. Do NOT call `recognize_media` or `search_media` repeatedly for each file."
+ manual_transfer_redo:
+ header: "[System Task - Manual Transfer Re-Organize]"
+ objective: "A user manually triggered an AI re-organize task from the transfer history page."
+ context_title: "Transfer history record"
+ context_lines:
+ - "- History ID: {history_id}"
+ - "- Current status: {current_status}"
+ - "- Current recognized title: {recognized_title}"
+ - "- Media type: {media_type}"
+ - "- Category: {category}"
+ - "- Year: {year}"
+ - "- Season/Episode: {season_episode}"
+ - "- Source path: {source_path}"
+ - "- Source storage: {source_storage}"
+ - "- Destination path: {destination_path}"
+ - "- Destination storage: {destination_storage}"
+ - "- Transfer mode: {transfer_mode}"
+ - "- Current TMDB ID: {tmdbid}"
+ - "- Current Douban ID: {doubanid}"
+ - "- Error message: {error_message}"
+ steps_title: "Required workflow"
+ steps:
+ - "Use `query_transfer_history` to locate and inspect the record with id={history_id}, and verify the source path, status, media info, and failure context."
+ - "Decide whether the current recognition is trustworthy."
+ - "If the source file no longer exists or cannot be safely processed, stop and report the reason."
+ - "If the current recognition is wrong or the record should be reorganized, determine the correct media identity first."
+ - "Prefer `recognize_media` with the source path. If recognition is not reliable, use `search_media` with keywords from filename, title, or year."
+ - "Only continue when you have high confidence in the target media."
+ - "Before re-organizing, delete the old transfer history record with `delete_transfer_history` so the system will not skip the source file."
+ - "Then use `transfer_file` to organize the source path directly."
+ - "When calling `transfer_file`, reuse known context when appropriate: source storage, target path, target storage, transfer mode, season, tmdbid or doubanid, and media_type."
+ - "If this record is already correct and no re-organize is needed, do not perform destructive actions; simply report that no change is necessary."
+ task_rules:
+ - "Do NOT rely on previous chat context. Work only from the record above."
+ - "Your goal is to directly fix one transfer history record by using MoviePilot tools to analyze, clean up the old history entry if necessary, and organize the source file again."
+ - "You should complete the re-organize by directly using tools such as `query_transfer_history`, `recognize_media`, `search_media`, `delete_transfer_history`, and `transfer_file`."
+ - "Do NOT reorganize blindly when media identity is uncertain."
+ - "If the previous record was successful but obviously identified as the wrong media, still use the tool-based flow above instead of `/redo`."
+ - "Keep the final response short and focused on outcome."
+---
+# SYSTEM_TASKS
+
+这是后台系统任务的唯一定义源。
+
+- `shared_rules` 负责统一口径。
+- `task_types..context_lines` 负责定义上下文字段展示。
+- `task_types..steps` 负责定义任务执行步骤。
+- `task_types..task_rules` 负责定义该任务独有的补充约束。
+- 代码侧只负责触发任务并提供模板变量,不再保存具体行为提示词。
diff --git a/tests/test_agent_prompt_style.py b/tests/test_agent_prompt_style.py
index 93a463a6..59f9e1a0 100644
--- a/tests/test_agent_prompt_style.py
+++ b/tests/test_agent_prompt_style.py
@@ -26,6 +26,13 @@ class TestAgentPromptStyle(unittest.TestCase):
prompt,
)
+ def test_prompt_uses_root_runtime_sections(self):
+ prompt = prompt_manager.get_agent_prompt()
+
+ self.assertIn("", prompt)
+ self.assertIn("", prompt)
+ self.assertIn("Active persona: `default`", prompt)
+
def test_non_verbose_prompt_requires_silence_until_all_tools_finish(self):
with patch.object(settings, "AI_AGENT_VERBOSE", False):
prompt = prompt_manager.get_agent_prompt()
diff --git a/tests/test_agent_runtime.py b/tests/test_agent_runtime.py
new file mode 100644
index 00000000..2720da40
--- /dev/null
+++ b/tests/test_agent_runtime.py
@@ -0,0 +1,192 @@
+import shutil
+import tempfile
+import textwrap
+import unittest
+from pathlib import Path
+
+from app.agent.runtime import AgentRuntimeConfigError, AgentRuntimeManager
+
+
+class TestAgentRuntimeConfig(unittest.TestCase):
+ def setUp(self):
+ self._tempdir = tempfile.TemporaryDirectory()
+ self.addCleanup(self._tempdir.cleanup)
+ self.temp_root = Path(self._tempdir.name)
+ self.agent_root = self.temp_root / "agent"
+ self.bundled_root = (
+ Path(__file__).resolve().parents[1] / "app" / "agent" / "runtime_defaults"
+ )
+
+ def _manager(self) -> AgentRuntimeManager:
+ return AgentRuntimeManager(
+ agent_root_dir=self.agent_root,
+ bundled_runtime_dir=self.bundled_root,
+ )
+
+ def test_load_runtime_config_syncs_defaults_and_parses_sections(self):
+ manager = self._manager()
+
+ runtime_config = manager.load_runtime_config()
+
+ self.assertEqual(runtime_config.active_persona, "default")
+ self.assertIn("professional, concise, restrained", runtime_config.profile_text)
+ self.assertIn(
+ "omitting `season` means subscribe to season 1 only",
+ runtime_config.workflow_text,
+ )
+ self.assertTrue((self.agent_root / "runtime" / "CURRENT_PERSONA.md").exists())
+
+ def test_legacy_root_markdown_is_migrated_to_memory_directory(self):
+ self.agent_root.mkdir(parents=True, exist_ok=True)
+ legacy_memory = self.agent_root / "MEMORY.md"
+ legacy_memory.write_text("# Legacy Memory\n", encoding="utf-8")
+ legacy_persona = self.agent_root / "CURRENT_PERSONA.md"
+ legacy_persona.write_text(
+ textwrap.dedent(
+ """\
+ ---
+ active_persona: default
+ profile: personas/default/AGENT_PROFILE.md
+ workflow: personas/default/AGENT_WORKFLOW.md
+ hooks: personas/default/AGENT_HOOKS.md
+ system_tasks: system_tasks/SYSTEM_TASKS.md
+ user_preferences: USER_PREFERENCES.md
+ ---
+ """
+ ),
+ encoding="utf-8",
+ )
+
+ manager = self._manager()
+ manager.ensure_layout()
+
+ self.assertFalse(legacy_memory.exists())
+ self.assertTrue((self.agent_root / "memory" / "MEMORY.md").exists())
+ self.assertFalse(legacy_persona.exists())
+ self.assertTrue((self.agent_root / "runtime" / "CURRENT_PERSONA.md").exists())
+
+ def test_render_system_task_message_uses_unified_system_tasks_definition(self):
+ manager = self._manager()
+ runtime_config = manager.load_runtime_config()
+
+ message = runtime_config.render_system_task_message("heartbeat")
+
+ self.assertIn("[System Heartbeat]", message)
+ self.assertIn("List all jobs with status 'pending' or 'in_progress'.", message)
+ self.assertIn("Do NOT include greetings, explanations, or conversational text.", message)
+ self.assertIn("If no jobs were executed, output nothing.", message)
+
+ def test_render_system_task_message_renders_template_context(self):
+ manager = self._manager()
+ runtime_config = manager.load_runtime_config()
+
+ message = runtime_config.render_system_task_message(
+ "transfer_failed_retry",
+ template_context={
+ "history_ids_csv": "7",
+ "history_count": 1,
+ "history_id": 7,
+ },
+ )
+
+ self.assertIn("Failed transfer history record IDs: 7", message)
+ self.assertIn("Total failed records: 1", message)
+ self.assertIn("history_id=7", message)
+
+ def test_missing_template_context_raises_clear_error(self):
+ manager = self._manager()
+ runtime_config = manager.load_runtime_config()
+
+ with self.assertRaises(AgentRuntimeConfigError):
+ runtime_config.render_system_task_message("transfer_failed_retry")
+
+ def test_invalid_user_runtime_config_falls_back_to_bundled_defaults(self):
+ manager = self._manager()
+ manager.ensure_layout()
+ invalid_current = self.agent_root / "runtime" / "CURRENT_PERSONA.md"
+ invalid_current.write_text(
+ textwrap.dedent(
+ """\
+ ---
+ active_persona: broken
+ profile: personas/default/AGENT_PROFILE.md
+ hooks: personas/default/AGENT_HOOKS.md
+ system_tasks: system_tasks/SYSTEM_TASKS.md
+ ---
+ """
+ ),
+ encoding="utf-8",
+ )
+ manager.invalidate_cache()
+
+ runtime_config = manager.load_runtime_config()
+
+ self.assertTrue(runtime_config.used_fallback)
+ self.assertEqual(runtime_config.active_persona, "default")
+ self.assertIn("已回退到内置默认配置", runtime_config.warnings[0])
+
+ def test_deprecated_phrase_warning_is_reported(self):
+ self.agent_root.mkdir(parents=True, exist_ok=True)
+ runtime_root = self.agent_root / "runtime"
+ shutil.copytree(self.bundled_root, runtime_root)
+ current_persona = runtime_root / "CURRENT_PERSONA.md"
+ current_persona.write_text(
+ textwrap.dedent(
+ """\
+ ---
+ version: 1
+ active_persona: default
+ profile: personas/default/AGENT_PROFILE.md
+ workflow: personas/default/AGENT_WORKFLOW.md
+ hooks: personas/default/AGENT_HOOKS.md
+ user_preferences: USER_PREFERENCES.md
+ system_tasks: system_tasks/SYSTEM_TASKS.md
+ extra_context_files: []
+ deprecated_phrases:
+ - professional, concise, restrained
+ ---
+ """
+ ),
+ encoding="utf-8",
+ )
+
+ manager = self._manager()
+ manager.invalidate_cache()
+ runtime_config = manager.load_runtime_config()
+
+ self.assertTrue(
+ any("professional, concise, restrained" in warning for warning in runtime_config.warnings)
+ )
+
+ def test_outdated_system_tasks_definition_falls_back_to_bundled_defaults(self):
+ self.agent_root.mkdir(parents=True, exist_ok=True)
+ runtime_root = self.agent_root / "runtime"
+ shutil.copytree(self.bundled_root, runtime_root)
+ system_tasks = runtime_root / "system_tasks" / "SYSTEM_TASKS.md"
+ system_tasks.write_text(
+ textwrap.dedent(
+ """\
+ ---
+ version: 1
+ shared_rules:
+ - legacy system tasks
+ task_types:
+ heartbeat:
+ header: "[Legacy Heartbeat]"
+ objective: "legacy"
+ ---
+ """
+ ),
+ encoding="utf-8",
+ )
+
+ manager = self._manager()
+ manager.invalidate_cache()
+ runtime_config = manager.load_runtime_config()
+
+ self.assertTrue(runtime_config.used_fallback)
+ self.assertEqual(runtime_config.system_tasks.version, 2)
+
+
+if __name__ == "__main__":
+ unittest.main()