mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-04-29 21:23:14 +08:00
feat: 整理失败时AI智能体自动重试
- 新增 delete_transfer_history 工具供智能体删除失败历史记录 - 新增 transfer-failed-retry 技能引导智能体执行重试流程 - 新增 AI_AGENT_RETRY_TRANSFER 配置项控制是否启用 - AgentManager 新增 retry_failed_transfer() 方法创建独立会话执行重试 - 整理失败和媒体未识别时自动触发智能体重试
This commit is contained in:
@@ -42,12 +42,12 @@ class MoviePilotAgent:
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
session_id: str,
|
||||
user_id: str = None,
|
||||
channel: str = None,
|
||||
source: str = None,
|
||||
username: str = None,
|
||||
self,
|
||||
session_id: str,
|
||||
user_id: str = None,
|
||||
channel: str = None,
|
||||
source: str = None,
|
||||
username: str = None,
|
||||
):
|
||||
self.session_id = session_id
|
||||
self.user_id = user_id
|
||||
@@ -92,10 +92,10 @@ class MoviePilotAgent:
|
||||
if block.get("thought"):
|
||||
continue
|
||||
if block.get("type") in (
|
||||
"thinking",
|
||||
"reasoning_content",
|
||||
"reasoning",
|
||||
"thought",
|
||||
"thinking",
|
||||
"reasoning_content",
|
||||
"reasoning",
|
||||
"thought",
|
||||
):
|
||||
continue
|
||||
if block.get("type") == "text":
|
||||
@@ -209,7 +209,7 @@ class MoviePilotAgent:
|
||||
return error_message
|
||||
|
||||
async def _stream_agent_tokens(
|
||||
self, agent, messages: dict, config: dict, on_token: Callable[[str], None]
|
||||
self, agent, messages: dict, config: dict, on_token: Callable[[str], None]
|
||||
):
|
||||
"""
|
||||
流式运行智能体,过滤工具调用token和思考内容,将模型生成的内容通过回调输出。
|
||||
@@ -222,18 +222,18 @@ class MoviePilotAgent:
|
||||
buffer = ""
|
||||
|
||||
async for chunk in agent.astream(
|
||||
messages,
|
||||
stream_mode="messages",
|
||||
config=config,
|
||||
subgraphs=False,
|
||||
version="v2",
|
||||
messages,
|
||||
stream_mode="messages",
|
||||
config=config,
|
||||
subgraphs=False,
|
||||
version="v2",
|
||||
):
|
||||
if chunk["type"] == "messages":
|
||||
token, metadata = chunk["data"]
|
||||
if (
|
||||
token
|
||||
and hasattr(token, "tool_call_chunks")
|
||||
and not token.tool_call_chunks
|
||||
token
|
||||
and hasattr(token, "tool_call_chunks")
|
||||
and not token.tool_call_chunks
|
||||
):
|
||||
# 跳过模型思考/推理内容(如 DeepSeek R1 的 reasoning_content)
|
||||
additional = getattr(token, "additional_kwargs", None)
|
||||
@@ -251,7 +251,7 @@ class MoviePilotAgent:
|
||||
if start_idx > 0:
|
||||
on_token(buffer[:start_idx])
|
||||
in_think_tag = True
|
||||
buffer = buffer[start_idx + 7:]
|
||||
buffer = buffer[start_idx + 7 :]
|
||||
else:
|
||||
# 检查是否以 <think> 的前缀结尾
|
||||
partial_match = False
|
||||
@@ -269,7 +269,7 @@ class MoviePilotAgent:
|
||||
end_idx = buffer.find("</think>")
|
||||
if end_idx != -1:
|
||||
in_think_tag = False
|
||||
buffer = buffer[end_idx + 8:]
|
||||
buffer = buffer[end_idx + 8 :]
|
||||
else:
|
||||
# 检查是否以 </think> 的前缀结尾
|
||||
partial_match = False
|
||||
@@ -483,14 +483,14 @@ class AgentManager:
|
||||
self.active_agents.clear()
|
||||
|
||||
async def process_message(
|
||||
self,
|
||||
session_id: str,
|
||||
user_id: str,
|
||||
message: str,
|
||||
images: List[str] = None,
|
||||
channel: str = None,
|
||||
source: str = None,
|
||||
username: str = None,
|
||||
self,
|
||||
session_id: str,
|
||||
user_id: str,
|
||||
message: str,
|
||||
images: List[str] = None,
|
||||
channel: str = None,
|
||||
source: str = None,
|
||||
username: str = None,
|
||||
) -> str:
|
||||
"""
|
||||
处理用户消息:将消息放入会话队列,按顺序依次处理。
|
||||
@@ -515,8 +515,8 @@ class AgentManager:
|
||||
|
||||
# 如果队列中已有等待的消息,通知用户消息已排队
|
||||
if queue_size > 0 or (
|
||||
session_id in self._session_workers
|
||||
and not self._session_workers[session_id].done()
|
||||
session_id in self._session_workers
|
||||
and not self._session_workers[session_id].done()
|
||||
):
|
||||
logger.info(
|
||||
f"会话 {session_id} 有任务正在处理,消息已排队等待 "
|
||||
@@ -528,8 +528,8 @@ class AgentManager:
|
||||
|
||||
# 确保该会话有一个worker在运行
|
||||
if (
|
||||
session_id not in self._session_workers
|
||||
or self._session_workers[session_id].done()
|
||||
session_id not in self._session_workers
|
||||
or self._session_workers[session_id].done()
|
||||
):
|
||||
self._session_workers[session_id] = asyncio.create_task(
|
||||
self._session_worker(session_id)
|
||||
@@ -570,8 +570,8 @@ class AgentManager:
|
||||
self._session_workers.pop(session_id, None) # noqa
|
||||
# 如果队列为空,清理队列
|
||||
if (
|
||||
session_id in self._session_queues
|
||||
and self._session_queues[session_id].empty()
|
||||
session_id in self._session_queues
|
||||
and self._session_queues[session_id].empty()
|
||||
):
|
||||
self._session_queues.pop(session_id, None)
|
||||
|
||||
@@ -684,6 +684,69 @@ class AgentManager:
|
||||
except Exception as e:
|
||||
logger.error(f"智能体心跳唤醒失败: {e}")
|
||||
|
||||
async def retry_failed_transfer(self, history_id: int):
|
||||
"""
|
||||
触发智能体重新整理失败的历史记录。
|
||||
由文件整理模块在检测到整理失败后调用,使用独立会话执行。
|
||||
:param history_id: 失败的整理历史记录ID
|
||||
"""
|
||||
try:
|
||||
# 每次使用唯一的 session_id,避免共享上下文
|
||||
session_id = f"__agent_retry_transfer_{history_id}_{uuid.uuid4().hex[:8]}__"
|
||||
user_id = "system"
|
||||
|
||||
logger.info(f"智能体重试整理:开始处理失败记录 ID={history_id} ...")
|
||||
|
||||
# 英文提示词,便于大模型理解
|
||||
retry_message = (
|
||||
f"[System Task - Transfer Failed Retry] A file transfer/organization has failed. "
|
||||
f"Please use the 'transfer-failed-retry' skill to retry the failed transfer.\n\n"
|
||||
f"Failed transfer history record ID: {history_id}\n\n"
|
||||
f"Follow these steps:\n"
|
||||
f"1. Use `query_transfer_history` with status='failed' to find the record with id={history_id} "
|
||||
f"and understand the failure details (source path, error message, media info)\n"
|
||||
f"2. Analyze the error message to determine the best retry strategy\n"
|
||||
f"3. If the source file no longer exists, skip this retry and report that the file is missing\n"
|
||||
f"4. Delete the failed history record using `delete_transfer_history` with history_id={history_id}\n"
|
||||
f"5. Re-identify the media using `recognize_media` with the source file path\n"
|
||||
f"6. If recognition fails, try `search_media` with keywords from the filename\n"
|
||||
f"7. Re-transfer using `transfer_file` with the source path and any identified media info (tmdbid, media_type)\n"
|
||||
f"8. Report the final result\n\n"
|
||||
f"IMPORTANT: This is a background system task, NOT a user conversation. "
|
||||
f"Your final response will be broadcast as a notification. "
|
||||
f"Only output a brief result summary. "
|
||||
f"Do NOT include greetings, explanations, or conversational text. "
|
||||
f"Respond in Chinese (中文)."
|
||||
)
|
||||
|
||||
await self.process_message(
|
||||
session_id=session_id,
|
||||
user_id=user_id,
|
||||
message=retry_message,
|
||||
channel=None,
|
||||
source=None,
|
||||
username=settings.SUPERUSER,
|
||||
)
|
||||
|
||||
# 等待消息队列处理完成
|
||||
if session_id in self._session_queues:
|
||||
await self._session_queues[session_id].join()
|
||||
|
||||
# 等待worker结束
|
||||
if session_id in self._session_workers:
|
||||
try:
|
||||
await self._session_workers[session_id]
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
logger.info(f"智能体重试整理:记录 ID={history_id} 处理完成")
|
||||
|
||||
# 用完即弃,清理资源
|
||||
await self.clear_session(session_id, user_id)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"智能体重试整理失败 (ID={history_id}): {e}")
|
||||
|
||||
|
||||
# 全局智能体管理器实例
|
||||
agent_manager = AgentManager()
|
||||
|
||||
@@ -37,6 +37,7 @@ from app.agent.tools.impl.run_workflow import RunWorkflowTool
|
||||
from app.agent.tools.impl.update_site_cookie import UpdateSiteCookieTool
|
||||
from app.agent.tools.impl.delete_download import DeleteDownloadTool
|
||||
from app.agent.tools.impl.delete_download_history import DeleteDownloadHistoryTool
|
||||
from app.agent.tools.impl.delete_transfer_history import DeleteTransferHistoryTool
|
||||
from app.agent.tools.impl.modify_download import ModifyDownloadTool
|
||||
from app.agent.tools.impl.query_directory_settings import QueryDirectorySettingsTool
|
||||
from app.agent.tools.impl.list_directory import ListDirectoryTool
|
||||
@@ -97,6 +98,7 @@ class MoviePilotToolFactory:
|
||||
QueryDownloadTasksTool,
|
||||
DeleteDownloadTool,
|
||||
DeleteDownloadHistoryTool,
|
||||
DeleteTransferHistoryTool,
|
||||
ModifyDownloadTool,
|
||||
QueryDownloadersTool,
|
||||
QuerySitesTool,
|
||||
|
||||
57
app/agent/tools/impl/delete_transfer_history.py
Normal file
57
app/agent/tools/impl/delete_transfer_history.py
Normal file
@@ -0,0 +1,57 @@
|
||||
"""删除整理历史记录工具"""
|
||||
|
||||
from typing import Optional, Type
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from app.agent.tools.base import MoviePilotTool
|
||||
from app.db.transferhistory_oper import TransferHistoryOper
|
||||
from app.log import logger
|
||||
|
||||
|
||||
class DeleteTransferHistoryInput(BaseModel):
|
||||
"""删除整理历史记录工具的输入参数模型"""
|
||||
|
||||
explanation: str = Field(
|
||||
...,
|
||||
description="Clear explanation of why this tool is being used in the current context",
|
||||
)
|
||||
history_id: int = Field(
|
||||
..., description="The ID of the transfer history record to delete"
|
||||
)
|
||||
|
||||
|
||||
class DeleteTransferHistoryTool(MoviePilotTool):
|
||||
name: str = "delete_transfer_history"
|
||||
description: str = "Delete a specific transfer history record by its ID. This is useful when you need to remove a failed transfer record before retrying the transfer, as the system skips files that already have transfer history."
|
||||
args_schema: Type[BaseModel] = DeleteTransferHistoryInput
|
||||
require_admin: bool = True
|
||||
|
||||
def get_tool_message(self, **kwargs) -> Optional[str]:
|
||||
"""根据参数生成友好的提示消息"""
|
||||
history_id = kwargs.get("history_id")
|
||||
return f"正在删除整理历史记录: ID={history_id}"
|
||||
|
||||
async def run(self, history_id: int, **kwargs) -> str:
|
||||
logger.info(f"执行工具: {self.name}, 参数: history_id={history_id}")
|
||||
|
||||
try:
|
||||
transferhis = TransferHistoryOper()
|
||||
|
||||
# 查询历史记录是否存在
|
||||
history = transferhis.get(history_id)
|
||||
if not history:
|
||||
return f"错误:整理历史记录不存在,ID={history_id}"
|
||||
|
||||
# 保存信息用于返回
|
||||
title = history.title or "未知"
|
||||
src = history.src or "未知"
|
||||
status = "成功" if history.status else "失败"
|
||||
|
||||
# 删除记录
|
||||
transferhis.delete(history_id)
|
||||
|
||||
return f"已删除整理历史记录:ID={history_id},标题={title},源路径={src},状态={status}"
|
||||
except Exception as e:
|
||||
logger.error(f"删除整理历史记录失败: {e}", exc_info=True)
|
||||
return f"删除整理历史记录时发生错误: {str(e)}"
|
||||
File diff suppressed because it is too large
Load Diff
@@ -535,6 +535,8 @@ class ConfigModel(BaseModel):
|
||||
AI_AGENT_JOB_INTERVAL: int = 0
|
||||
# AI智能体啰嗦模式,开启后会回复工具调用过程
|
||||
AI_AGENT_VERBOSE: bool = False
|
||||
# AI智能体自动重试整理失败记录开关
|
||||
AI_AGENT_RETRY_TRANSFER: bool = False
|
||||
|
||||
|
||||
class Settings(BaseSettings, ConfigModel, LogConfigModel):
|
||||
|
||||
137
skills/transfer-failed-retry/SKILL.md
Normal file
137
skills/transfer-failed-retry/SKILL.md
Normal file
@@ -0,0 +1,137 @@
|
||||
---
|
||||
name: transfer-failed-retry
|
||||
description: Use this skill when you need to retry a failed file transfer/organization. Given a failed transfer history record ID, this skill guides you through querying the failure details, deleting the old record, and re-identifying and re-organizing the file. This skill is automatically triggered when the system detects a transfer failure and the AI agent retry feature is enabled.
|
||||
allowed-tools: query_transfer_history delete_transfer_history recognize_media transfer_file search_media
|
||||
---
|
||||
|
||||
# Transfer Failed Retry (整理失败重试)
|
||||
|
||||
This skill handles retrying failed file transfers/organizations. When a file transfer fails, you can use this skill to analyze the failure, remove the stale history record, and attempt to re-identify and re-organize the file.
|
||||
|
||||
## Prerequisites
|
||||
|
||||
You need the following tools:
|
||||
- `query_transfer_history` - Query transfer history records
|
||||
- `delete_transfer_history` - Delete a transfer history record
|
||||
- `recognize_media` - Recognize media info from file path or title
|
||||
- `transfer_file` - Transfer/organize files to the media library
|
||||
- `search_media` - Search TMDB for media information
|
||||
|
||||
## Workflow
|
||||
|
||||
### Step 1: Query the Failed Transfer History
|
||||
|
||||
Use `query_transfer_history` to get details about the failed record. Filter by status `failed` to find the specific record.
|
||||
|
||||
If you are given a specific history record ID, query with that ID to understand the failure context:
|
||||
|
||||
```
|
||||
query_transfer_history(status="failed")
|
||||
```
|
||||
|
||||
From the record, extract the following key information:
|
||||
- **id**: The history record ID
|
||||
- **src**: Source file path
|
||||
- **title**: The recognized title (may be incorrect)
|
||||
- **errmsg**: The error message explaining why the transfer failed
|
||||
- **type**: Media type (movie/tv)
|
||||
- **tmdbid**: TMDB ID (if available)
|
||||
- **seasons/episodes**: Season/episode info (if TV show)
|
||||
- **downloader**: Which downloader was used
|
||||
- **download_hash**: The torrent hash
|
||||
|
||||
### Step 2: Analyze the Failure Reason
|
||||
|
||||
Common failure reasons and how to handle them:
|
||||
|
||||
| Error Message | Cause | Solution |
|
||||
|---------------|-------|----------|
|
||||
| 未识别到媒体信息 | File name couldn't be matched to any media | Use `search_media` to find the correct TMDB ID, then use `transfer_file` with explicit `tmdbid` |
|
||||
| 源目录不存在 | Source file was moved or deleted | Cannot retry - skip this record |
|
||||
| 目标路径不存在 | Target directory issue | Retry transfer - the directory config may have been fixed |
|
||||
| 文件已存在 | Target file already exists | May need to use `force` mode or skip |
|
||||
| 未找到有效的集数信息 | Episode number not recognized | Use `recognize_media` with the file path to get better metadata, or specify season/episode in `transfer_file` |
|
||||
| 未获取到转移目录设置 | No transfer directory configured for this media type | Cannot auto-fix - notify user about directory configuration |
|
||||
|
||||
### Step 3: Delete the Failed History Record
|
||||
|
||||
Before retrying, you **must** delete the old failed history record. The system skips files that already have a transfer history entry (even failed ones).
|
||||
|
||||
```
|
||||
delete_transfer_history(history_id=<record_id>)
|
||||
```
|
||||
|
||||
### Step 4: Re-identify and Re-organize
|
||||
|
||||
Based on the failure analysis in Step 2:
|
||||
|
||||
#### Case A: Unrecognized Media (未识别到媒体信息)
|
||||
|
||||
1. Try recognizing the media from file path:
|
||||
```
|
||||
recognize_media(path="<source_file_path>")
|
||||
```
|
||||
|
||||
2. If recognition fails, try searching TMDB with keywords extracted from the filename:
|
||||
```
|
||||
search_media(title="<extracted_title>", media_type="movie" or "tv")
|
||||
```
|
||||
|
||||
3. Once you have the correct TMDB ID, re-transfer with explicit identification:
|
||||
```
|
||||
transfer_file(file_path="<source_path>", tmdbid=<tmdb_id>, media_type="movie" or "tv")
|
||||
```
|
||||
|
||||
#### Case B: Transfer Error (file operation failed)
|
||||
|
||||
Simply retry the transfer:
|
||||
```
|
||||
transfer_file(file_path="<source_path>")
|
||||
```
|
||||
|
||||
#### Case C: Episode Recognition Issue
|
||||
|
||||
For TV shows where episode info couldn't be determined:
|
||||
1. Use `recognize_media` to get better metadata
|
||||
2. Re-transfer with explicit season info:
|
||||
```
|
||||
transfer_file(file_path="<source_path>", tmdbid=<tmdb_id>, media_type="tv", season=<season_number>)
|
||||
```
|
||||
|
||||
### Step 5: Report Result
|
||||
|
||||
After the retry attempt, report the result:
|
||||
- If successful: Confirm the file has been organized correctly
|
||||
- If failed again: Report the new error and suggest manual intervention
|
||||
|
||||
## Important Notes
|
||||
|
||||
- **Always delete the old history record first** before retrying. The system will skip files with existing history.
|
||||
- **Do not retry** if the source file no longer exists (源目录不存在).
|
||||
- **Do not retry** if the error is about missing directory configuration - this requires user intervention.
|
||||
- **For unrecognized media**, always try `recognize_media` with the file path first before falling back to `search_media`.
|
||||
- **Be cautious with TV shows** - ensure the correct season and episode information is used.
|
||||
- When this skill is triggered automatically by the system, it provides the `history_id` directly. Start from Step 1 with that specific ID.
|
||||
|
||||
## Example: Complete Retry Flow
|
||||
|
||||
```
|
||||
# 1. Query the failed record
|
||||
query_transfer_history(status="failed", page=1)
|
||||
# Found: id=42, src="/downloads/Movie.Name.2024.1080p.mkv", errmsg="未识别到媒体信息"
|
||||
|
||||
# 2. Try to recognize the media from path
|
||||
recognize_media(path="/downloads/Movie.Name.2024.1080p.mkv")
|
||||
# Recognition failed
|
||||
|
||||
# 3. Search TMDB
|
||||
search_media(title="Movie Name", year="2024", media_type="movie")
|
||||
# Found: tmdb_id=123456
|
||||
|
||||
# 4. Delete old history record
|
||||
delete_transfer_history(history_id=42)
|
||||
|
||||
# 5. Re-transfer with correct identification
|
||||
transfer_file(file_path="/downloads/Movie.Name.2024.1080p.mkv", tmdbid=123456, media_type="movie")
|
||||
# Success!
|
||||
```
|
||||
Reference in New Issue
Block a user