diff --git a/app/agent/__init__.py b/app/agent/__init__.py index 70f7e4c0..ff025d7e 100644 --- a/app/agent/__init__.py +++ b/app/agent/__init__.py @@ -42,12 +42,12 @@ class MoviePilotAgent: """ def __init__( - self, - session_id: str, - user_id: str = None, - channel: str = None, - source: str = None, - username: str = None, + self, + session_id: str, + user_id: str = None, + channel: str = None, + source: str = None, + username: str = None, ): self.session_id = session_id self.user_id = user_id @@ -92,10 +92,10 @@ class MoviePilotAgent: if block.get("thought"): continue if block.get("type") in ( - "thinking", - "reasoning_content", - "reasoning", - "thought", + "thinking", + "reasoning_content", + "reasoning", + "thought", ): continue if block.get("type") == "text": @@ -209,7 +209,7 @@ class MoviePilotAgent: return error_message async def _stream_agent_tokens( - self, agent, messages: dict, config: dict, on_token: Callable[[str], None] + self, agent, messages: dict, config: dict, on_token: Callable[[str], None] ): """ 流式运行智能体,过滤工具调用token和思考内容,将模型生成的内容通过回调输出。 @@ -222,18 +222,18 @@ class MoviePilotAgent: buffer = "" async for chunk in agent.astream( - messages, - stream_mode="messages", - config=config, - subgraphs=False, - version="v2", + messages, + stream_mode="messages", + config=config, + subgraphs=False, + version="v2", ): if chunk["type"] == "messages": token, metadata = chunk["data"] if ( - token - and hasattr(token, "tool_call_chunks") - and not token.tool_call_chunks + token + and hasattr(token, "tool_call_chunks") + and not token.tool_call_chunks ): # 跳过模型思考/推理内容(如 DeepSeek R1 的 reasoning_content) additional = getattr(token, "additional_kwargs", None) @@ -251,7 +251,7 @@ class MoviePilotAgent: if start_idx > 0: on_token(buffer[:start_idx]) in_think_tag = True - buffer = buffer[start_idx + 7:] + buffer = buffer[start_idx + 7 :] else: # 检查是否以 的前缀结尾 partial_match = False @@ -269,7 +269,7 @@ class MoviePilotAgent: end_idx = buffer.find("") if end_idx != -1: in_think_tag = False - buffer = buffer[end_idx + 8:] + buffer = buffer[end_idx + 8 :] else: # 检查是否以 的前缀结尾 partial_match = False @@ -483,14 +483,14 @@ class AgentManager: self.active_agents.clear() async def process_message( - self, - session_id: str, - user_id: str, - message: str, - images: List[str] = None, - channel: str = None, - source: str = None, - username: str = None, + self, + session_id: str, + user_id: str, + message: str, + images: List[str] = None, + channel: str = None, + source: str = None, + username: str = None, ) -> str: """ 处理用户消息:将消息放入会话队列,按顺序依次处理。 @@ -515,8 +515,8 @@ class AgentManager: # 如果队列中已有等待的消息,通知用户消息已排队 if queue_size > 0 or ( - session_id in self._session_workers - and not self._session_workers[session_id].done() + session_id in self._session_workers + and not self._session_workers[session_id].done() ): logger.info( f"会话 {session_id} 有任务正在处理,消息已排队等待 " @@ -528,8 +528,8 @@ class AgentManager: # 确保该会话有一个worker在运行 if ( - session_id not in self._session_workers - or self._session_workers[session_id].done() + session_id not in self._session_workers + or self._session_workers[session_id].done() ): self._session_workers[session_id] = asyncio.create_task( self._session_worker(session_id) @@ -570,8 +570,8 @@ class AgentManager: self._session_workers.pop(session_id, None) # noqa # 如果队列为空,清理队列 if ( - session_id in self._session_queues - and self._session_queues[session_id].empty() + session_id in self._session_queues + and self._session_queues[session_id].empty() ): self._session_queues.pop(session_id, None) @@ -684,6 +684,69 @@ class AgentManager: except Exception as e: logger.error(f"智能体心跳唤醒失败: {e}") + async def retry_failed_transfer(self, history_id: int): + """ + 触发智能体重新整理失败的历史记录。 + 由文件整理模块在检测到整理失败后调用,使用独立会话执行。 + :param history_id: 失败的整理历史记录ID + """ + try: + # 每次使用唯一的 session_id,避免共享上下文 + session_id = f"__agent_retry_transfer_{history_id}_{uuid.uuid4().hex[:8]}__" + user_id = "system" + + logger.info(f"智能体重试整理:开始处理失败记录 ID={history_id} ...") + + # 英文提示词,便于大模型理解 + retry_message = ( + f"[System Task - Transfer Failed Retry] A file transfer/organization has failed. " + f"Please use the 'transfer-failed-retry' skill to retry the failed transfer.\n\n" + f"Failed transfer history record ID: {history_id}\n\n" + f"Follow these steps:\n" + f"1. Use `query_transfer_history` with status='failed' to find the record with id={history_id} " + f"and understand the failure details (source path, error message, media info)\n" + f"2. Analyze the error message to determine the best retry strategy\n" + f"3. If the source file no longer exists, skip this retry and report that the file is missing\n" + f"4. Delete the failed history record using `delete_transfer_history` with history_id={history_id}\n" + f"5. Re-identify the media using `recognize_media` with the source file path\n" + f"6. If recognition fails, try `search_media` with keywords from the filename\n" + f"7. Re-transfer using `transfer_file` with the source path and any identified media info (tmdbid, media_type)\n" + f"8. Report the final result\n\n" + f"IMPORTANT: This is a background system task, NOT a user conversation. " + f"Your final response will be broadcast as a notification. " + f"Only output a brief result summary. " + f"Do NOT include greetings, explanations, or conversational text. " + f"Respond in Chinese (中文)." + ) + + await self.process_message( + session_id=session_id, + user_id=user_id, + message=retry_message, + channel=None, + source=None, + username=settings.SUPERUSER, + ) + + # 等待消息队列处理完成 + if session_id in self._session_queues: + await self._session_queues[session_id].join() + + # 等待worker结束 + if session_id in self._session_workers: + try: + await self._session_workers[session_id] + except asyncio.CancelledError: + pass + + logger.info(f"智能体重试整理:记录 ID={history_id} 处理完成") + + # 用完即弃,清理资源 + await self.clear_session(session_id, user_id) + + except Exception as e: + logger.error(f"智能体重试整理失败 (ID={history_id}): {e}") + # 全局智能体管理器实例 agent_manager = AgentManager() diff --git a/app/agent/tools/factory.py b/app/agent/tools/factory.py index bc034f6a..f6fb1eae 100644 --- a/app/agent/tools/factory.py +++ b/app/agent/tools/factory.py @@ -37,6 +37,7 @@ from app.agent.tools.impl.run_workflow import RunWorkflowTool from app.agent.tools.impl.update_site_cookie import UpdateSiteCookieTool from app.agent.tools.impl.delete_download import DeleteDownloadTool from app.agent.tools.impl.delete_download_history import DeleteDownloadHistoryTool +from app.agent.tools.impl.delete_transfer_history import DeleteTransferHistoryTool from app.agent.tools.impl.modify_download import ModifyDownloadTool from app.agent.tools.impl.query_directory_settings import QueryDirectorySettingsTool from app.agent.tools.impl.list_directory import ListDirectoryTool @@ -97,6 +98,7 @@ class MoviePilotToolFactory: QueryDownloadTasksTool, DeleteDownloadTool, DeleteDownloadHistoryTool, + DeleteTransferHistoryTool, ModifyDownloadTool, QueryDownloadersTool, QuerySitesTool, diff --git a/app/agent/tools/impl/delete_transfer_history.py b/app/agent/tools/impl/delete_transfer_history.py new file mode 100644 index 00000000..c42b72dd --- /dev/null +++ b/app/agent/tools/impl/delete_transfer_history.py @@ -0,0 +1,57 @@ +"""删除整理历史记录工具""" + +from typing import Optional, Type + +from pydantic import BaseModel, Field + +from app.agent.tools.base import MoviePilotTool +from app.db.transferhistory_oper import TransferHistoryOper +from app.log import logger + + +class DeleteTransferHistoryInput(BaseModel): + """删除整理历史记录工具的输入参数模型""" + + explanation: str = Field( + ..., + description="Clear explanation of why this tool is being used in the current context", + ) + history_id: int = Field( + ..., description="The ID of the transfer history record to delete" + ) + + +class DeleteTransferHistoryTool(MoviePilotTool): + name: str = "delete_transfer_history" + description: str = "Delete a specific transfer history record by its ID. This is useful when you need to remove a failed transfer record before retrying the transfer, as the system skips files that already have transfer history." + args_schema: Type[BaseModel] = DeleteTransferHistoryInput + require_admin: bool = True + + def get_tool_message(self, **kwargs) -> Optional[str]: + """根据参数生成友好的提示消息""" + history_id = kwargs.get("history_id") + return f"正在删除整理历史记录: ID={history_id}" + + async def run(self, history_id: int, **kwargs) -> str: + logger.info(f"执行工具: {self.name}, 参数: history_id={history_id}") + + try: + transferhis = TransferHistoryOper() + + # 查询历史记录是否存在 + history = transferhis.get(history_id) + if not history: + return f"错误:整理历史记录不存在,ID={history_id}" + + # 保存信息用于返回 + title = history.title or "未知" + src = history.src or "未知" + status = "成功" if history.status else "失败" + + # 删除记录 + transferhis.delete(history_id) + + return f"已删除整理历史记录:ID={history_id},标题={title},源路径={src},状态={status}" + except Exception as e: + logger.error(f"删除整理历史记录失败: {e}", exc_info=True) + return f"删除整理历史记录时发生错误: {str(e)}" diff --git a/app/chain/transfer.py b/app/chain/transfer.py index 781f9ab0..48fce582 100755 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -1,3 +1,4 @@ +import asyncio import queue import re import threading @@ -27,11 +28,29 @@ from app.helper.format import FormatParser from app.helper.progress import ProgressHelper from app.log import logger from app.schemas import StorageOperSelectionEventData -from app.schemas import TransferInfo, Notification, EpisodeFormat, FileItem, TransferDirectoryConf, \ - TransferTask, TransferQueue, TransferJob, TransferJobTask +from app.schemas import ( + TransferInfo, + Notification, + EpisodeFormat, + FileItem, + TransferDirectoryConf, + TransferTask, + TransferQueue, + TransferJob, + TransferJobTask, +) from app.schemas.exception import OperationInterrupted -from app.schemas.types import TorrentStatus, EventType, MediaType, ProgressKey, NotificationType, MessageChannel, \ - SystemConfigKey, ChainEventType, ContentType +from app.schemas.types import ( + TorrentStatus, + EventType, + MediaType, + ProgressKey, + NotificationType, + MessageChannel, + SystemConfigKey, + ChainEventType, + ContentType, +) from app.utils.mixins import ConfigReloadMixin from app.utils.singleton import Singleton from app.utils.string import StringUtils @@ -81,7 +100,9 @@ class JobManager: 获取作业ID """ if task.mediainfo: - return self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season) + return self.__get_media_id( + media=task.mediainfo, season=task.meta.begin_season + ) else: return self.__get_meta_id(meta=task.meta, season=task.meta.begin_season) @@ -102,7 +123,7 @@ class JobManager: title=meta.name, year=meta.year, title_year=f"{meta.name} ({meta.year})", - type=meta.type.value if meta.type else None + type=meta.type.value if meta.type else None, ) @staticmethod @@ -125,17 +146,24 @@ class JobManager: self._job_view[__mediaid__] = TransferJob( media=self.__get_media(task), season=task.meta.begin_season, - tasks=[TransferJobTask( - fileitem=task.fileitem, - meta=self.__get_meta(task), - downloader=task.downloader, - download_hash=task.download_hash, - state=state - )] + tasks=[ + TransferJobTask( + fileitem=task.fileitem, + meta=self.__get_meta(task), + downloader=task.downloader, + download_hash=task.download_hash, + state=state, + ) + ], ) else: # 不重复添加任务 - if any([t.fileitem == task.fileitem for t in self._job_view[__mediaid__].tasks]): + if any( + [ + t.fileitem == task.fileitem + for t in self._job_view[__mediaid__].tasks + ] + ): logger.debug(f"任务 {task.fileitem.name} 已存在,跳过重复添加") return False self._job_view[__mediaid__].tasks.append( @@ -144,13 +172,15 @@ class JobManager: meta=self.__get_meta(task), downloader=task.downloader, download_hash=task.download_hash, - state=state + state=state, ) ) # 添加季集信息 if self._season_episodes.get(__mediaid__): self._season_episodes[__mediaid__].extend(task.meta.episode_list) - self._season_episodes[__mediaid__] = list(set(self._season_episodes[__mediaid__])) + self._season_episodes[__mediaid__] = list( + set(self._season_episodes[__mediaid__]) + ) else: self._season_episodes[__mediaid__] = task.meta.episode_list return True @@ -199,7 +229,8 @@ class JobManager: # 移除剧集信息 if __mediaid__ in self._season_episodes: self._season_episodes[__mediaid__] = list( - set(self._season_episodes[__mediaid__]) - set(task.meta.episode_list) + set(self._season_episodes[__mediaid__]) + - set(task.meta.episode_list) ) def remove_task(self, fileitem: FileItem) -> Optional[TransferJobTask]: @@ -218,7 +249,8 @@ class JobManager: # 移除季集信息 if mediaid in self._season_episodes: self._season_episodes[mediaid] = list( - set(self._season_episodes[mediaid]) - set(task.meta.episode_list) + set(self._season_episodes[mediaid]) + - set(task.meta.episode_list) ) return task return None @@ -241,19 +273,25 @@ class JobManager: 尝试移除任务对应的作业(严格检查未完成作业,线程安全) """ with job_lock: - __metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season) - __mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season) + __metaid__ = self.__get_meta_id( + meta=task.meta, season=task.meta.begin_season + ) + __mediaid__ = self.__get_media_id( + media=task.mediainfo, season=task.meta.begin_season + ) meta_done = True if __metaid__ in self._job_view: meta_done = all( - t.state in ["completed", "failed"] for t in self._job_view[__metaid__].tasks + t.state in ["completed", "failed"] + for t in self._job_view[__metaid__].tasks ) media_done = True if __mediaid__ in self._job_view: media_done = all( - t.state in ["completed", "failed"] for t in self._job_view[__mediaid__].tasks + t.state in ["completed", "failed"] + for t in self._job_view[__mediaid__].tasks ) if meta_done and media_done: @@ -269,17 +307,23 @@ class JobManager: 检查任务对应的作业是否整理完成(不管成功还是失败) """ with job_lock: - __metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season) - __mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season) + __metaid__ = self.__get_meta_id( + meta=task.meta, season=task.meta.begin_season + ) + __mediaid__ = self.__get_media_id( + media=task.mediainfo, season=task.meta.begin_season + ) if __metaid__ in self._job_view: meta_done = all( - task.state in ["completed", "failed"] for task in self._job_view[__metaid__].tasks + task.state in ["completed", "failed"] + for task in self._job_view[__metaid__].tasks ) else: meta_done = True if __mediaid__ in self._job_view: media_done = all( - task.state in ["completed", "failed"] for task in self._job_view[__mediaid__].tasks + task.state in ["completed", "failed"] + for task in self._job_view[__mediaid__].tasks ) else: media_done = True @@ -290,11 +334,16 @@ class JobManager: 检查任务对应的作业是否已完成且有成功的记录 """ with job_lock: - __metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season) - __mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season) + __metaid__ = self.__get_meta_id( + meta=task.meta, season=task.meta.begin_season + ) + __mediaid__ = self.__get_media_id( + media=task.mediainfo, season=task.meta.begin_season + ) if __metaid__ in self._job_view: meta_finished = all( - task.state in ["completed", "failed"] for task in self._job_view[__metaid__].tasks + task.state in ["completed", "failed"] + for task in self._job_view[__metaid__].tasks ) else: meta_finished = True @@ -302,9 +351,7 @@ class JobManager: tasks = self._job_view[__mediaid__].tasks media_finished = all( task.state in ["completed", "failed"] for task in tasks - ) and any( - task.state == "completed" for task in tasks - ) + ) and any(task.state == "completed" for task in tasks) else: media_finished = True return meta_finished and media_finished @@ -314,17 +361,23 @@ class JobManager: 检查任务对应的作业是否全部成功 """ with job_lock: - __metaid__ = self.__get_meta_id(meta=task.meta, season=task.meta.begin_season) - __mediaid__ = self.__get_media_id(media=task.mediainfo, season=task.meta.begin_season) + __metaid__ = self.__get_meta_id( + meta=task.meta, season=task.meta.begin_season + ) + __mediaid__ = self.__get_media_id( + media=task.mediainfo, season=task.meta.begin_season + ) if __metaid__ in self._job_view: meta_success = all( - task.state in ["completed"] for task in self._job_view[__metaid__].tasks + task.state in ["completed"] + for task in self._job_view[__metaid__].tasks ) else: meta_success = True if __mediaid__ in self._job_view: media_success = all( - task.state in ["completed"] for task in self._job_view[__mediaid__].tasks + task.state in ["completed"] + for task in self._job_view[__mediaid__].tasks ) else: media_success = True @@ -369,7 +422,12 @@ class JobManager: return False return True - def has_tasks(self, meta: MetaBase, mediainfo: Optional[MediaInfo] = None, season: Optional[int] = None) -> bool: + def has_tasks( + self, + meta: MetaBase, + mediainfo: Optional[MediaInfo] = None, + season: Optional[int] = None, + ) -> bool: """ 判断作业是否还有任务正在处理 """ @@ -380,9 +438,14 @@ class JobManager: return True __metaid__ = self.__get_meta_id(meta=meta, season=season) - return __metaid__ in self._job_view and len(self._job_view[__metaid__].tasks) > 0 + return ( + __metaid__ in self._job_view + and len(self._job_view[__metaid__].tasks) > 0 + ) - def success_tasks(self, media: MediaInfo, season: Optional[int] = None) -> List[TransferJobTask]: + def success_tasks( + self, media: MediaInfo, season: Optional[int] = None + ) -> List[TransferJobTask]: """ 获取作业中所有成功的任务 """ @@ -390,9 +453,15 @@ class JobManager: __mediaid__ = self.__get_media_id(media=media, season=season) if __mediaid__ not in self._job_view: return [] - return [task for task in self._job_view[__mediaid__].tasks if task.state == "completed"] + return [ + task + for task in self._job_view[__mediaid__].tasks + if task.state == "completed" + ] - def all_tasks(self, media: MediaInfo, season: Optional[int] = None) -> List[TransferJobTask]: + def all_tasks( + self, media: MediaInfo, season: Optional[int] = None + ) -> List[TransferJobTask]: """ 获取作业中全部任务 """ @@ -410,7 +479,13 @@ class JobManager: __mediaid__ = self.__get_media_id(media=media, season=season) if __mediaid__ not in self._job_view: return 0 - return len([task for task in self._job_view[__mediaid__].tasks if task.state == "completed"]) + return len( + [ + task + for task in self._job_view[__mediaid__].tasks + if task.state == "completed" + ] + ) def size(self, media: MediaInfo, season: Optional[int] = None) -> int: """ @@ -420,13 +495,19 @@ class JobManager: __mediaid__ = self.__get_media_id(media=media, season=season) if __mediaid__ not in self._job_view: return 0 - return sum([ - task.fileitem.size if task.fileitem.size is not None - else ( - SystemUtils.get_directory_size(Path(task.fileitem.path)) if task.fileitem.storage == "local" else 0) - for task in self._job_view[__mediaid__].tasks - if task.state == "completed" - ]) + return sum( + [ + task.fileitem.size + if task.fileitem.size is not None + else ( + SystemUtils.get_directory_size(Path(task.fileitem.path)) + if task.fileitem.storage == "local" + else 0 + ) + for task in self._job_view[__mediaid__].tasks + if task.state == "completed" + ] + ) def total(self) -> int: """ @@ -442,7 +523,9 @@ class JobManager: with job_lock: return list(self._job_view.values()) - def season_episodes(self, media: MediaInfo, season: Optional[int] = None) -> List[int]: + def season_episodes( + self, media: MediaInfo, season: Optional[int] = None + ) -> List[int]: """ 获取作业的季集清单 """ @@ -499,9 +582,9 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): self._queue_active = True for i in range(settings.TRANSFER_THREADS): logger.info(f"启动文件整理线程 {i + 1} ...") - thread = threading.Thread(target=self.__start_transfer, - name=f"transfer-{i}", - daemon=True) + thread = threading.Thread( + target=self.__start_transfer, name=f"transfer-{i}", daemon=True + ) self._threads.append(thread) thread.start() @@ -525,7 +608,9 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): """ if not fileitem.extension: return False - return True if f".{fileitem.extension.lower()}" in self._subtitle_exts else False + return ( + True if f".{fileitem.extension.lower()}" in self._subtitle_exts else False + ) def __is_audio_file(self, fileitem: FileItem) -> bool: """ @@ -559,10 +644,15 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): """ 判断是否满足最小文件大小 """ - return True if not min_filesize or (fileitem.size or 0) > min_filesize * 1024 * 1024 else False + return ( + True + if not min_filesize or (fileitem.size or 0) > min_filesize * 1024 * 1024 + else False + ) - def __default_callback(self, task: TransferTask, - transferinfo: TransferInfo, /) -> Tuple[bool, str]: + def __default_callback( + self, task: TransferTask, transferinfo: TransferInfo, / + ) -> Tuple[bool, str]: """ 整理完成后处理 """ @@ -577,39 +667,52 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): 完成时发送消息、刮削事件、移除任务等 """ # 更新文件数量 - transferinfo.file_count = self.jobview.count(task.mediainfo, task.meta.begin_season) or 1 + transferinfo.file_count = ( + self.jobview.count(task.mediainfo, task.meta.begin_season) or 1 + ) # 更新文件大小 - transferinfo.total_size = self.jobview.size(task.mediainfo, - task.meta.begin_season) or task.fileitem.size + transferinfo.total_size = ( + self.jobview.size(task.mediainfo, task.meta.begin_season) + or task.fileitem.size + ) # 更新文件清单 with job_lock: - transferinfo.file_list_new = self._success_target_files.pop(transferinfo.target_diritem.path, []) + transferinfo.file_list_new = self._success_target_files.pop( + transferinfo.target_diritem.path, [] + ) # 发送通知,实时手动整理时不发 if transferinfo.need_notify and (task.background or not task.manual): se_str = None if task.mediainfo.type == MediaType.TV: - season_episodes = self.jobview.season_episodes(task.mediainfo, task.meta.begin_season) + season_episodes = self.jobview.season_episodes( + task.mediainfo, task.meta.begin_season + ) if season_episodes: se_str = f"{task.meta.season} {StringUtils.format_ep(season_episodes)}" else: se_str = f"{task.meta.season}" # 发送入库成功消息 - self.send_transfer_message(meta=task.meta, - mediainfo=task.mediainfo, - transferinfo=transferinfo, - season_episode=se_str, - username=task.username) + self.send_transfer_message( + meta=task.meta, + mediainfo=task.mediainfo, + transferinfo=transferinfo, + season_episode=se_str, + username=task.username, + ) # 刮削事件 if transferinfo.need_scrape and self.__is_media_file(task.fileitem): - self.eventmanager.send_event(EventType.MetadataScrape, { - 'meta': task.meta, - 'mediainfo': task.mediainfo, - 'fileitem': transferinfo.target_diritem, - 'file_list': transferinfo.file_list_new, - 'overwrite': False - }) + self.eventmanager.send_event( + EventType.MetadataScrape, + { + "meta": task.meta, + "mediainfo": task.mediainfo, + "fileitem": transferinfo.target_diritem, + "file_list": transferinfo.file_list_new, + "overwrite": False, + }, + ) transferhis = TransferHistoryOper() @@ -620,115 +723,154 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 新增转移失败历史记录 history = transferhis.add_fail( fileitem=task.fileitem, - mode=transferinfo.transfer_type if transferinfo else '', + mode=transferinfo.transfer_type if transferinfo else "", downloader=task.downloader, download_hash=task.download_hash, meta=task.meta, mediainfo=task.mediainfo, - transferinfo=transferinfo + transferinfo=transferinfo, ) # 整理失败事件 if self.__is_media_file(task.fileitem): # 主要媒体文件整理失败事件 - self.eventmanager.send_event(EventType.TransferFailed, { - 'fileitem': task.fileitem, - 'meta': task.meta, - 'mediainfo': task.mediainfo, - 'transferinfo': transferinfo, - 'downloader': task.downloader, - 'download_hash': task.download_hash, - 'transfer_history_id': history.id if history else None, - }) + self.eventmanager.send_event( + EventType.TransferFailed, + { + "fileitem": task.fileitem, + "meta": task.meta, + "mediainfo": task.mediainfo, + "transferinfo": transferinfo, + "downloader": task.downloader, + "download_hash": task.download_hash, + "transfer_history_id": history.id if history else None, + }, + ) elif self.__is_subtitle_file(task.fileitem): # 字幕整理失败事件 - self.eventmanager.send_event(EventType.SubtitleTransferFailed, { - 'fileitem': task.fileitem, - 'meta': task.meta, - 'mediainfo': task.mediainfo, - 'transferinfo': transferinfo, - 'downloader': task.downloader, - 'download_hash': task.download_hash, - 'transfer_history_id': history.id if history else None, - }) + self.eventmanager.send_event( + EventType.SubtitleTransferFailed, + { + "fileitem": task.fileitem, + "meta": task.meta, + "mediainfo": task.mediainfo, + "transferinfo": transferinfo, + "downloader": task.downloader, + "download_hash": task.download_hash, + "transfer_history_id": history.id if history else None, + }, + ) elif self.__is_audio_file(task.fileitem): # 音频文件整理失败事件 - self.eventmanager.send_event(EventType.AudioTransferFailed, { - 'fileitem': task.fileitem, - 'meta': task.meta, - 'mediainfo': task.mediainfo, - 'transferinfo': transferinfo, - 'downloader': task.downloader, - 'download_hash': task.download_hash, - 'transfer_history_id': history.id if history else None, - }) + self.eventmanager.send_event( + EventType.AudioTransferFailed, + { + "fileitem": task.fileitem, + "meta": task.meta, + "mediainfo": task.mediainfo, + "transferinfo": transferinfo, + "downloader": task.downloader, + "download_hash": task.download_hash, + "transfer_history_id": history.id if history else None, + }, + ) # 发送失败消息 - self.post_message(Notification( - mtype=NotificationType.Manual, - title=f"{task.mediainfo.title_year} {task.meta.season_episode} 入库失败!", - text=f"原因:{transferinfo.message or '未知'}", - image=task.mediainfo.get_message_image(), - username=task.username, - link=settings.MP_DOMAIN('#/history') - )) + self.post_message( + Notification( + mtype=NotificationType.Manual, + title=f"{task.mediainfo.title_year} {task.meta.season_episode} 入库失败!", + text=f"原因:{transferinfo.message or '未知'}", + image=task.mediainfo.get_message_image(), + username=task.username, + link=settings.MP_DOMAIN("#/history"), + ) + ) # 设置任务失败 self.jobview.fail_task(task) + # AI智能体自动重试整理 + if ( + history + and settings.AI_AGENT_ENABLE + and settings.AI_AGENT_RETRY_TRANSFER + ): + try: + from app.agent import agent_manager + + asyncio.run_coroutine_threadsafe( + agent_manager.retry_failed_transfer(history.id), + global_vars.loop, + ) + logger.info(f"已触发AI智能体重试整理历史记录 #{history.id}") + except Exception as e: + logger.error(f"触发AI智能体重试整理失败: {e}") + # 返回失败 ret_status = False ret_message = transferinfo.message else: # 转移成功 - logger.info(f"{task.fileitem.name} 入库成功:{transferinfo.target_diritem.path}") + logger.info( + f"{task.fileitem.name} 入库成功:{transferinfo.target_diritem.path}" + ) # 新增task转移成功历史记录 history = transferhis.add_success( fileitem=task.fileitem, - mode=transferinfo.transfer_type if transferinfo else '', + mode=transferinfo.transfer_type if transferinfo else "", downloader=task.downloader, download_hash=task.download_hash, meta=task.meta, mediainfo=task.mediainfo, - transferinfo=transferinfo + transferinfo=transferinfo, ) # task整理完成事件 if self.__is_media_file(task.fileitem): # 主要媒体文件整理完成事件 - self.eventmanager.send_event(EventType.TransferComplete, { - 'fileitem': task.fileitem, - 'meta': task.meta, - 'mediainfo': task.mediainfo, - 'transferinfo': transferinfo, - 'downloader': task.downloader, - 'download_hash': task.download_hash, - 'transfer_history_id': history.id if history else None, - }) + self.eventmanager.send_event( + EventType.TransferComplete, + { + "fileitem": task.fileitem, + "meta": task.meta, + "mediainfo": task.mediainfo, + "transferinfo": transferinfo, + "downloader": task.downloader, + "download_hash": task.download_hash, + "transfer_history_id": history.id if history else None, + }, + ) elif self.__is_subtitle_file(task.fileitem): # 字幕整理完成事件 - self.eventmanager.send_event(EventType.SubtitleTransferComplete, { - 'fileitem': task.fileitem, - 'meta': task.meta, - 'mediainfo': task.mediainfo, - 'transferinfo': transferinfo, - 'downloader': task.downloader, - 'download_hash': task.download_hash, - 'transfer_history_id': history.id if history else None, - }) + self.eventmanager.send_event( + EventType.SubtitleTransferComplete, + { + "fileitem": task.fileitem, + "meta": task.meta, + "mediainfo": task.mediainfo, + "transferinfo": transferinfo, + "downloader": task.downloader, + "download_hash": task.download_hash, + "transfer_history_id": history.id if history else None, + }, + ) elif self.__is_audio_file(task.fileitem): # 音频文件整理完成事件 - self.eventmanager.send_event(EventType.AudioTransferComplete, { - 'fileitem': task.fileitem, - 'meta': task.meta, - 'mediainfo': task.mediainfo, - 'transferinfo': transferinfo, - 'downloader': task.downloader, - 'download_hash': task.download_hash, - 'transfer_history_id': history.id if history else None, - }) + self.eventmanager.send_event( + EventType.AudioTransferComplete, + { + "fileitem": task.fileitem, + "meta": task.meta, + "mediainfo": task.mediainfo, + "transferinfo": transferinfo, + "downloader": task.downloader, + "download_hash": task.download_hash, + "transfer_history_id": history.id if history else None, + }, + ) # task登记转移成功文件清单 target_dir_path = transferinfo.target_diritem.path @@ -748,26 +890,38 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 只要该种子的所有任务都已整理完成,则设置种子状态为已整理 if task.download_hash and self.jobview.is_torrent_done(task.download_hash): - self.transfer_completed(hashs=task.download_hash, downloader=task.downloader) + self.transfer_completed( + hashs=task.download_hash, downloader=task.downloader + ) # 移动模式,全部成功时删除空目录和种子文件 if transferinfo.transfer_type in ["move"]: # 全部整理成功时 if self.jobview.is_success(task): # 所有成功的业务 - tasks = self.jobview.success_tasks(task.mediainfo, task.meta.begin_season) + tasks = self.jobview.success_tasks( + task.mediainfo, task.meta.begin_season + ) # 获取整理屏蔽词 - transfer_exclude_words = SystemConfigOper().get(SystemConfigKey.TransferExcludeWords) + transfer_exclude_words = SystemConfigOper().get( + SystemConfigKey.TransferExcludeWords + ) processed_hashes = set() for t in tasks: if t.download_hash and t.download_hash not in processed_hashes: # 检查该种子的所有任务(跨作业)是否都已成功 if self.jobview.is_torrent_success(t.download_hash): processed_hashes.add(t.download_hash) - if self._can_delete_torrent(t.download_hash, t.downloader, transfer_exclude_words): + if self._can_delete_torrent( + t.download_hash, t.downloader, transfer_exclude_words + ): # 移除种子及文件 - if self.remove_torrents(t.download_hash, downloader=t.downloader): - logger.info(f"移动模式删除种子成功:{t.download_hash}") + if self.remove_torrents( + t.download_hash, downloader=t.downloader + ): + logger.info( + f"移动模式删除种子成功:{t.download_hash}" + ) if not t.download_hash and t.fileitem: # 删除剩余空目录 StorageChain().delete_media_file(t.fileitem, delete_self=False) @@ -786,10 +940,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): if not self.__put_to_jobview(task): return False # 添加到队列 - self._queue.put(TransferQueue( - task=task, - callback=self.__default_callback - )) + self._queue.put(TransferQueue(task=task, callback=self.__default_callback)) return True def __put_to_jobview(self, task: TransferTask) -> bool: @@ -813,7 +964,9 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): """ while not global_vars.is_system_stopped and self._queue_active: try: - item: TransferQueue = self._queue.get(block=True, timeout=self._transfer_interval) + item: TransferQueue = self._queue.get( + block=True, timeout=self._transfer_interval + ) if not item: continue @@ -839,10 +992,11 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 重置计数 self._processed_num = 0 self._fail_num = 0 - __process_msg = f"开始整理队列处理,当前共 {self._total_num} 个文件 ..." + __process_msg = ( + f"开始整理队列处理,当前共 {self._total_num} 个文件 ..." + ) logger.info(__process_msg) - self._progress.update(value=0, - text=__process_msg) + self._progress.update(value=0, text=__process_msg) # 增加运行中的任务数 self._active_tasks += 1 @@ -852,10 +1006,15 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): logger.info(__process_msg) with task_lock: self._progress.update( - value=(self._processed_num / self._total_num * 100) if self._total_num else 0, - text=__process_msg) + value=(self._processed_num / self._total_num * 100) + if self._total_num + else 0, + text=__process_msg, + ) # 整理 - state, err_msg = self.__handle_transfer(task=task, callback=item.callback) + state, err_msg = self.__handle_transfer( + task=task, callback=item.callback + ) with task_lock: if not state: @@ -866,10 +1025,15 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): __process_msg = f"{fileitem.name} 整理完成" logger.info(__process_msg) self._progress.update( - value=(self._processed_num / self._total_num * 100) if self._total_num else 100, - text=__process_msg) + value=(self._processed_num / self._total_num * 100) + if self._total_num + else 100, + text=__process_msg, + ) except Exception as e: - logger.error(f"{fileitem.name} 整理任务处理出现错误:{e} - {traceback.format_exc()}") + logger.error( + f"{fileitem.name} 整理任务处理出现错误:{e} - {traceback.format_exc()}" + ) with task_lock: self._processed_num += 1 self._fail_num += 1 @@ -883,8 +1047,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 结束进度 __end_msg = f"整理队列处理完成,共整理 {self._processed_num} 个文件,失败 {self._fail_num} 个" logger.info(__end_msg) - self._progress.update(value=100, - text=__end_msg) + self._progress.update(value=100, text=__end_msg) self._progress.end() # 重置计数 self._processed_num = 0 @@ -897,8 +1060,9 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): except Exception as e: logger.error(f"整理队列处理出现错误:{e} - {traceback.format_exc()}") - def __handle_transfer(self, task: TransferTask, - callback: Optional[Callable] = None) -> Optional[Tuple[bool, str]]: + def __handle_transfer( + self, task: TransferTask, callback: Optional[Callable] = None + ) -> Optional[Tuple[bool, str]]: """ 处理整理任务 """ @@ -915,10 +1079,12 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 识别媒体信息 if download_history.tmdbid or download_history.doubanid: # 下载记录中已存在识别信息 - mediainfo: Optional[MediaInfo] = self.recognize_media(mtype=MediaType(download_history.type), - tmdbid=download_history.tmdbid, - doubanid=download_history.doubanid, - episode_group=download_history.episode_group) + mediainfo: Optional[MediaInfo] = self.recognize_media( + mtype=MediaType(download_history.type), + tmdbid=download_history.tmdbid, + doubanid=download_history.doubanid, + episode_group=download_history.episode_group, + ) if mediainfo: # 更新自定义媒体类别 if download_history.media_category: @@ -938,25 +1104,46 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): mode=task.transfer_type, meta=task.meta, downloader=task.downloader, - download_hash=task.download_hash + download_hash=task.download_hash, + ) + self.post_message( + Notification( + mtype=NotificationType.Manual, + title=f"{task.fileitem.name} 未识别到媒体信息,无法入库!", + text=f"回复:\n```\n/redo {his.id} [tmdbid]|[类型]\n```\n手动识别整理。", + username=task.username, + link=settings.MP_DOMAIN("#/history"), + ) ) - self.post_message(Notification( - mtype=NotificationType.Manual, - title=f"{task.fileitem.name} 未识别到媒体信息,无法入库!", - text=f"回复:\n```\n/redo {his.id} [tmdbid]|[类型]\n```\n手动识别整理。", - username=task.username, - link=settings.MP_DOMAIN('#/history') - )) # 任务失败,直接移除task self.jobview.remove_task(task.fileitem) + + # AI智能体自动重试整理 + if ( + his + and settings.AI_AGENT_ENABLE + and settings.AI_AGENT_RETRY_TRANSFER + ): + try: + from app.agent import agent_manager + + asyncio.run_coroutine_threadsafe( + agent_manager.retry_failed_transfer(his.id), + global_vars.loop, + ) + logger.info(f"已触发AI智能体重试整理历史记录 #{his.id}") + except Exception as e: + logger.error(f"触发AI智能体重试整理失败: {e}") + return False, "未识别到媒体信息" mediainfo_changed = True # 如果未开启新增已入库媒体是否跟随TMDB信息变化则根据tmdbid查询之前的title if not settings.SCRAP_FOLLOW_TMDB: - transfer_history = transferhis.get_by_type_tmdbid(tmdbid=mediainfo.tmdb_id, - mtype=mediainfo.type.value) + transfer_history = transferhis.get_by_type_tmdbid( + tmdbid=mediainfo.tmdb_id, mtype=mediainfo.type.value + ) if transfer_history and mediainfo.title != transfer_history.title: mediainfo.title = transfer_history.title mediainfo_changed = True @@ -966,7 +1153,9 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): task.mediainfo = mediainfo # 更新队列任务 curr_task = self.jobview.remove_task(task.fileitem) - self.jobview.add_task(task, state=curr_task.state if curr_task else "waiting") + self.jobview.add_task( + task, state=curr_task.state if curr_task else "waiting" + ) # 获取集数据 if task.mediainfo.type == MediaType.TV and not task.episodes_info: @@ -981,22 +1170,26 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): task.episodes_info = TmdbChain().tmdb_episodes( tmdbid=task.mediainfo.tmdb_id, season=season_num, - episode_group=task.mediainfo.episode_group + episode_group=task.mediainfo.episode_group, ) # 查询整理目标目录 if not task.target_directory: if task.target_path: # 指定目标路径,`手动整理`场景下使用,忽略源目录匹配,使用指定目录匹配 - task.target_directory = DirectoryHelper().get_dir(media=task.mediainfo, - dest_path=task.target_path, - target_storage=task.target_storage) + task.target_directory = DirectoryHelper().get_dir( + media=task.mediainfo, + dest_path=task.target_path, + target_storage=task.target_storage, + ) else: # 启用源目录匹配时,根据源目录匹配下载目录,否则按源目录同盘优先原则,如无源目录,则根据媒体信息获取目标目录 - task.target_directory = DirectoryHelper().get_dir(media=task.mediainfo, - storage=task.fileitem.storage, - src_path=Path(task.fileitem.path), - target_storage=task.target_storage) + task.target_directory = DirectoryHelper().get_dir( + media=task.mediainfo, + storage=task.fileitem.storage, + src_path=Path(task.fileitem.path), + target_storage=task.target_storage, + ) if not task.target_storage and task.target_directory: task.target_storage = task.target_directory.library_storage @@ -1008,10 +1201,14 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): source_event_data = StorageOperSelectionEventData( storage=task.fileitem.storage, ) - source_event = eventmanager.send_event(ChainEventType.StorageOperSelection, source_event_data) + source_event = eventmanager.send_event( + ChainEventType.StorageOperSelection, source_event_data + ) # 使用事件返回的上下文数据 if source_event and source_event.event_data: - source_event_data: StorageOperSelectionEventData = source_event.event_data + source_event_data: StorageOperSelectionEventData = ( + source_event.event_data + ) if source_event_data.storage_oper: source_oper = source_event_data.storage_oper @@ -1020,27 +1217,33 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): target_event_data = StorageOperSelectionEventData( storage=task.target_storage, ) - target_event = eventmanager.send_event(ChainEventType.StorageOperSelection, target_event_data) + target_event = eventmanager.send_event( + ChainEventType.StorageOperSelection, target_event_data + ) # 使用事件返回的上下文数据 if target_event and target_event.event_data: - target_event_data: StorageOperSelectionEventData = target_event.event_data + target_event_data: StorageOperSelectionEventData = ( + target_event.event_data + ) if target_event_data.storage_oper: target_oper = target_event_data.storage_oper # 执行整理 - transferinfo: TransferInfo = self.transfer(fileitem=task.fileitem, - meta=task.meta, - mediainfo=task.mediainfo, - target_directory=task.target_directory, - target_storage=task.target_storage, - target_path=task.target_path, - transfer_type=task.transfer_type, - episodes_info=task.episodes_info, - scrape=task.scrape, - library_type_folder=task.library_type_folder, - library_category_folder=task.library_category_folder, - source_oper=source_oper, - target_oper=target_oper) + transferinfo: TransferInfo = self.transfer( + fileitem=task.fileitem, + meta=task.meta, + mediainfo=task.mediainfo, + target_directory=task.target_directory, + target_storage=task.target_storage, + target_path=task.target_path, + transfer_type=task.transfer_type, + episodes_info=task.episodes_info, + scrape=task.scrape, + library_type_folder=task.library_type_folder, + library_category_folder=task.library_category_folder, + source_oper=source_oper, + target_oper=target_oper, + ) if not transferinfo: logger.error("文件整理模块运行失败") return False, "文件整理模块运行失败" @@ -1080,8 +1283,10 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): download_dirs = DirectoryHelper().get_download_dirs() # 如果没有下载器监控的目录则不处理 - if not any(dir_info.monitor_type == "downloader" and dir_info.storage == "local" - for dir_info in download_dirs): + if not any( + dir_info.monitor_type == "downloader" and dir_info.storage == "local" + for dir_info in download_dirs + ): return True logger.info("开始整理下载器中已经完成下载的文件 ...") @@ -1094,8 +1299,8 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): torrent for torrent in torrents_list if (h := torrent.hash) not in existing_hashes - # 排除多下载器返回的重复种子 - and (h not in seen and (seen.add(h) or True)) + # 排除多下载器返回的重复种子 + and (h not in seen and (seen.add(h) or True)) ] else: torrents = [] @@ -1128,11 +1333,15 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): is_downloader_monitor = True break if not is_downloader_monitor: - logger.debug(f"文件 {file_path} 不在下载器监控目录中,不通过下载器进行整理") + logger.debug( + f"文件 {file_path} 不在下载器监控目录中,不通过下载器进行整理" + ) continue # 查询下载记录识别情况 - downloadhis: DownloadHistory = DownloadHistoryOper().get_by_hash(torrent.hash) + downloadhis: DownloadHistory = DownloadHistoryOper().get_by_hash( + torrent.hash + ) if downloadhis: # 类型 try: @@ -1140,10 +1349,12 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): except ValueError: mtype = MediaType.TV # 识别媒体信息 - mediainfo = self.recognize_media(mtype=mtype, - tmdbid=downloadhis.tmdbid, - doubanid=downloadhis.doubanid, - episode_group=downloadhis.episode_group) + mediainfo = self.recognize_media( + mtype=mtype, + tmdbid=downloadhis.tmdbid, + doubanid=downloadhis.doubanid, + episode_group=downloadhis.episode_group, + ) if mediainfo: # 补充图片 self.obtain_images(mediainfo) @@ -1159,15 +1370,16 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): self.do_transfer( fileitem=FileItem( storage="local", - path=file_path.as_posix() + ("/" if file_path.is_dir() else ""), + path=file_path.as_posix() + + ("/" if file_path.is_dir() else ""), type="dir" if not file_path.is_file() else "file", name=file_path.name, size=file_path.stat().st_size, - extension=file_path.suffix.lstrip('.'), + extension=file_path.suffix.lstrip("."), ), mediainfo=mediainfo, downloader=torrent.downloader, - download_hash=torrent.hash + download_hash=torrent.hash, ) finally: @@ -1206,7 +1418,9 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): """ 判断是否蓝光原盘目录内的子目录或文件 """ - return True if re.search(r"BDMV[/\\]STREAM", _path, re.IGNORECASE) else False + return ( + True if re.search(r"BDMV[/\\]STREAM", _path, re.IGNORECASE) else False + ) def __get_bluray_dir(_storage: str, _path: Path) -> Optional[FileItem]: """ @@ -1217,7 +1431,9 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): return storagechain.get_file_item(storage=_storage, path=p.parent) return None - def _apply_predicate(file_item: FileItem, is_bluray_dir: bool) -> List[Tuple[FileItem, bool]]: + def _apply_predicate( + file_item: FileItem, is_bluray_dir: bool + ) -> List[Tuple[FileItem, bool]]: if predicate is None or predicate(file_item, is_bluray_dir): return [(file_item, is_bluray_dir)] return [] @@ -1259,16 +1475,28 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): ) ] - def do_transfer(self, fileitem: FileItem, - meta: MetaBase = None, mediainfo: MediaInfo = None, - target_directory: TransferDirectoryConf = None, - target_storage: Optional[str] = None, target_path: Path = None, - transfer_type: Optional[str] = None, scrape: Optional[bool] = None, - library_type_folder: Optional[bool] = None, library_category_folder: Optional[bool] = None, - season: Optional[int] = None, epformat: EpisodeFormat = None, min_filesize: Optional[int] = 0, - downloader: Optional[str] = None, download_hash: Optional[str] = None, - force: Optional[bool] = False, background: Optional[bool] = True, - manual: Optional[bool] = False, continue_callback: Callable = None) -> Tuple[bool, str]: + def do_transfer( + self, + fileitem: FileItem, + meta: MetaBase = None, + mediainfo: MediaInfo = None, + target_directory: TransferDirectoryConf = None, + target_storage: Optional[str] = None, + target_path: Path = None, + transfer_type: Optional[str] = None, + scrape: Optional[bool] = None, + library_type_folder: Optional[bool] = None, + library_category_folder: Optional[bool] = None, + season: Optional[int] = None, + epformat: EpisodeFormat = None, + min_filesize: Optional[int] = 0, + downloader: Optional[str] = None, + download_hash: Optional[str] = None, + force: Optional[bool] = False, + background: Optional[bool] = True, + manual: Optional[bool] = False, + continue_callback: Callable = None, + ) -> Tuple[bool, str]: """ 执行一个复杂目录的整理操作 :param fileitem: 文件项 @@ -1296,13 +1524,21 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): all_success = True # 自定义格式 - formaterHandler = FormatParser(eformat=epformat.format, - details=epformat.detail, - part=epformat.part, - offset=epformat.offset) if epformat else None + formaterHandler = ( + FormatParser( + eformat=epformat.format, + details=epformat.detail, + part=epformat.part, + offset=epformat.offset, + ) + if epformat + else None + ) # 整理屏蔽词 - transfer_exclude_words = SystemConfigOper().get(SystemConfigKey.TransferExcludeWords) + transfer_exclude_words = SystemConfigOper().get( + SystemConfigKey.TransferExcludeWords + ) # 汇总错误信息 err_msgs: List[str] = [] @@ -1337,7 +1573,9 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): logger.debug(f"{file_item.path} 是回收站或隐藏的文件") return False # 整理屏蔽词不处理 - if self._is_blocked_by_exclude_words(file_item.path, transfer_exclude_words): + if self._is_blocked_by_exclude_words( + file_item.path, transfer_exclude_words + ): return False return True @@ -1365,11 +1603,15 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 整理成功的不再处理 if not force: - transferd = TransferHistoryOper().get_by_src(file_item.path, storage=file_item.storage) + transferd = TransferHistoryOper().get_by_src( + file_item.path, storage=file_item.storage + ) if transferd: if not transferd.status: all_success = False - logger.info(f"{file_item.path} 已整理过,如需重新处理,请删除整理记录。") + logger.info( + f"{file_item.path} 已整理过,如需重新处理,请删除整理记录。" + ) err_msgs.append(f"{file_item.name} 已整理过") continue @@ -1384,19 +1626,30 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): download_history = downloadhis.get_by_path(file_path.as_posix()) else: # 按文件全路径查询 - download_file = downloadhis.get_file_by_fullpath(file_path.as_posix()) + download_file = downloadhis.get_file_by_fullpath( + file_path.as_posix() + ) if download_file: - download_history = downloadhis.get_by_hash(download_file.download_hash) + download_history = downloadhis.get_by_hash( + download_file.download_hash + ) if not meta: subscribe_custom_words = None if download_history and isinstance(download_history.note, dict): # 使用source动态获取订阅 - subscribe = SubscribeChain().get_subscribe_by_source(download_history.note.get("source")) - subscribe_custom_words = subscribe.custom_words.split( - "\n") if subscribe and subscribe.custom_words else None + subscribe = SubscribeChain().get_subscribe_by_source( + download_history.note.get("source") + ) + subscribe_custom_words = ( + subscribe.custom_words.split("\n") + if subscribe and subscribe.custom_words + else None + ) # 文件元数据(优先使用订阅识别词) - file_meta = MetaInfoPath(file_path, custom_words=subscribe_custom_words) + file_meta = MetaInfoPath( + file_path, custom_words=subscribe_custom_words + ) else: file_meta = meta @@ -1413,8 +1666,9 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 自定义识别 if formaterHandler: # 开始集、结束集、PART - begin_ep, end_ep, part = formaterHandler.split_episode(file_name=file_path.name, - file_meta=file_meta) + begin_ep, end_ep, part = formaterHandler.split_episode( + file_name=file_path.name, file_meta=file_meta + ) if begin_ep is not None: file_meta.begin_episode = begin_ep if part is not None: @@ -1446,7 +1700,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): download_hash=_download_hash, download_history=download_history, manual=manual, - background=background + background=background, ) if background: if self.put_to_queue(task=transfer_task): @@ -1481,8 +1735,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): progress.start() __process_msg = f"开始整理,共 {total_num} 个文件 ..." logger.info(__process_msg) - progress.update(value=0, - text=__process_msg) + progress.update(value=0, text=__process_msg) try: for transfer_task in transfer_tasks: if global_vars.is_system_stopped: @@ -1492,15 +1745,16 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 更新进度 __process_msg = f"正在整理 ({processed_num + fail_num + 1}/{total_num}){transfer_task.fileitem.name} ..." logger.info(__process_msg) - progress.update(value=(processed_num + fail_num) / total_num * 100, - text=__process_msg, - data={ - "current": Path(transfer_task.fileitem.path).as_posix(), - "finished": finished_files, - }) + progress.update( + value=(processed_num + fail_num) / total_num * 100, + text=__process_msg, + data={ + "current": Path(transfer_task.fileitem.path).as_posix(), + "finished": finished_files, + }, + ) state, err_msg = self.__handle_transfer( - task=transfer_task, - callback=self.__default_callback + task=transfer_task, callback=self.__default_callback ) if not state: all_success = False @@ -1516,26 +1770,39 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): del transfer_tasks # 整理结束 - __end_msg = f"整理队列处理完成,共整理 {total_num} 个文件,失败 {fail_num} 个" + __end_msg = ( + f"整理队列处理完成,共整理 {total_num} 个文件,失败 {fail_num} 个" + ) logger.info(__end_msg) - progress.update(value=100, - text=__end_msg, - data={}) + progress.update(value=100, text=__end_msg, data={}) progress.end() - error_msg = "、".join(err_msgs[:2]) + (f",等{len(err_msgs)}个文件错误!" if len(err_msgs) > 2 else "") + error_msg = "、".join(err_msgs[:2]) + ( + f",等{len(err_msgs)}个文件错误!" if len(err_msgs) > 2 else "" + ) return all_success, error_msg - def remote_transfer(self, arg_str: str, channel: MessageChannel, - userid: Union[str, int] = None, source: Optional[str] = None): + def remote_transfer( + self, + arg_str: str, + channel: MessageChannel, + userid: Union[str, int] = None, + source: Optional[str] = None, + ): """ 远程重新整理,参数 历史记录ID TMDBID|类型 """ def args_error(): - self.post_message(Notification(channel=channel, source=source, - title="请输入正确的命令格式:/redo [id] [tmdbid/豆瓣id]|[类型]," - "[id]整理记录编号", userid=userid)) + self.post_message( + Notification( + channel=channel, + source=source, + title="请输入正确的命令格式:/redo [id] [tmdbid/豆瓣id]|[类型]," + "[id]整理记录编号", + userid=userid, + ) + ) if not arg_str: args_error() @@ -1550,7 +1817,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): args_error() return # TMDBID/豆瓣ID - id_strs = arg_strs[1].split('|') + id_strs = arg_strs[1].split("|") media_id = id_strs[0] if not logid.isdigit(): args_error() @@ -1560,16 +1827,25 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): if not type_str or type_str not in [MediaType.MOVIE.value, MediaType.TV.value]: args_error() return - state, errmsg = self.__re_transfer(logid=int(logid), - mtype=MediaType(type_str), - mediaid=media_id) + state, errmsg = self.__re_transfer( + logid=int(logid), mtype=MediaType(type_str), mediaid=media_id + ) if not state: - self.post_message(Notification(channel=channel, title="手动整理失败", source=source, - text=errmsg, userid=userid, link=settings.MP_DOMAIN('#/history'))) + self.post_message( + Notification( + channel=channel, + title="手动整理失败", + source=source, + text=errmsg, + userid=userid, + link=settings.MP_DOMAIN("#/history"), + ) + ) return - def __re_transfer(self, logid: int, mtype: MediaType = None, - mediaid: Optional[str] = None) -> Tuple[bool, str]: + def __re_transfer( + self, logid: int, mtype: MediaType = None, mediaid: Optional[str] = None + ) -> Tuple[bool, str]: """ 根据历史记录,重新识别整理,只支持简单条件 :param logid: 历史记录ID @@ -1587,13 +1863,19 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): return False, f"源目录不存在:{src_path}" # 查询媒体信息 if mtype and mediaid: - mediainfo = self.recognize_media(mtype=mtype, tmdbid=int(mediaid) if str(mediaid).isdigit() else None, - doubanid=mediaid, episode_group=history.episode_group) + mediainfo = self.recognize_media( + mtype=mtype, + tmdbid=int(mediaid) if str(mediaid).isdigit() else None, + doubanid=mediaid, + episode_group=history.episode_group, + ) if mediainfo: # 更新媒体图片 self.obtain_images(mediainfo=mediainfo) else: - mediainfo = MediaChain().recognize_by_path(str(src_path), episode_group=history.episode_group) + mediainfo = MediaChain().recognize_by_path( + str(src_path), episode_group=history.episode_group + ) if not mediainfo: return False, f"未识别到媒体信息,类型:{mtype.value},id:{mediaid}" # 重新执行整理 @@ -1607,36 +1889,40 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): # 强制整理 if history.src_fileitem: - state, errmsg = self.do_transfer(fileitem=FileItem(**history.src_fileitem), - mediainfo=mediainfo, - download_hash=history.download_hash, - force=True, - background=False, - manual=True) + state, errmsg = self.do_transfer( + fileitem=FileItem(**history.src_fileitem), + mediainfo=mediainfo, + download_hash=history.download_hash, + force=True, + background=False, + manual=True, + ) if not state: return False, errmsg return True, "" - def manual_transfer(self, - fileitem: FileItem, - target_storage: Optional[str] = None, - target_path: Path = None, - tmdbid: Optional[int] = None, - doubanid: Optional[str] = None, - mtype: MediaType = None, - season: Optional[int] = None, - episode_group: Optional[str] = None, - transfer_type: Optional[str] = None, - epformat: EpisodeFormat = None, - min_filesize: Optional[int] = 0, - scrape: Optional[bool] = None, - library_type_folder: Optional[bool] = None, - library_category_folder: Optional[bool] = None, - force: Optional[bool] = False, - background: Optional[bool] = False, - downloader: Optional[str] = None, - download_hash: Optional[str] = None) -> Tuple[bool, Union[str, list]]: + def manual_transfer( + self, + fileitem: FileItem, + target_storage: Optional[str] = None, + target_path: Path = None, + tmdbid: Optional[int] = None, + doubanid: Optional[str] = None, + mtype: MediaType = None, + season: Optional[int] = None, + episode_group: Optional[str] = None, + transfer_type: Optional[str] = None, + epformat: EpisodeFormat = None, + min_filesize: Optional[int] = 0, + scrape: Optional[bool] = None, + library_type_folder: Optional[bool] = None, + library_category_folder: Optional[bool] = None, + force: Optional[bool] = False, + background: Optional[bool] = False, + downloader: Optional[str] = None, + download_hash: Optional[str] = None, + ) -> Tuple[bool, Union[str, list]]: """ 手动整理,支持复杂条件,带进度显示 :param fileitem: 文件项 @@ -1662,11 +1948,17 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): if tmdbid or doubanid: # 有输入TMDBID时单个识别 # 识别媒体信息 - mediainfo: MediaInfo = MediaChain().recognize_media(tmdbid=tmdbid, doubanid=doubanid, - mtype=mtype, episode_group=episode_group) + mediainfo: MediaInfo = MediaChain().recognize_media( + tmdbid=tmdbid, + doubanid=doubanid, + mtype=mtype, + episode_group=episode_group, + ) if not mediainfo: - return (False, - f"媒体信息识别失败,tmdbid:{tmdbid},doubanid:{doubanid},type: {mtype.value if mtype else None}") + return ( + False, + f"媒体信息识别失败,tmdbid:{tmdbid},doubanid:{doubanid},type: {mtype.value if mtype else None}", + ) else: # 更新媒体图片 self.obtain_images(mediainfo=mediainfo) @@ -1688,7 +1980,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): background=background, manual=True, downloader=downloader, - download_hash=download_hash + download_hash=download_hash, ) if not state: return False, errmsg @@ -1697,26 +1989,33 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): return True, "" else: # 没有输入TMDBID时,按文件识别 - state, errmsg = self.do_transfer(fileitem=fileitem, - target_storage=target_storage, - target_path=target_path, - transfer_type=transfer_type, - season=season, - epformat=epformat, - min_filesize=min_filesize, - scrape=scrape, - library_type_folder=library_type_folder, - library_category_folder=library_category_folder, - force=force, - background=background, - manual=True, - downloader=downloader, - download_hash=download_hash) + state, errmsg = self.do_transfer( + fileitem=fileitem, + target_storage=target_storage, + target_path=target_path, + transfer_type=transfer_type, + season=season, + epformat=epformat, + min_filesize=min_filesize, + scrape=scrape, + library_type_folder=library_type_folder, + library_category_folder=library_category_folder, + force=force, + background=background, + manual=True, + downloader=downloader, + download_hash=download_hash, + ) return state, errmsg - def send_transfer_message(self, meta: MetaBase, mediainfo: MediaInfo, - transferinfo: TransferInfo, season_episode: Optional[str] = None, - username: Optional[str] = None): + def send_transfer_message( + self, + meta: MetaBase, + mediainfo: MediaInfo, + transferinfo: TransferInfo, + season_episode: Optional[str] = None, + username: Optional[str] = None, + ): """ 发送入库成功的消息 """ @@ -1726,13 +2025,13 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): ctype=ContentType.OrganizeSuccess, image=mediainfo.get_message_image(), username=username, - link=settings.MP_DOMAIN('#/history') + link=settings.MP_DOMAIN("#/history"), ), meta=meta, mediainfo=mediainfo, transferinfo=transferinfo, season_episode=season_episode, - username=username + username=username, ) @staticmethod @@ -1752,7 +2051,9 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): return True return False - def _can_delete_torrent(self, download_hash: str, downloader: str, transfer_exclude_words) -> bool: + def _can_delete_torrent( + self, download_hash: str, downloader: str, transfer_exclude_words + ) -> bool: """ 检查是否可以删除种子文件 :param download_hash: 种子Hash @@ -1783,9 +2084,13 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): for file in torrent_files: file_path = save_path / file.name # 如果存在未被屏蔽的媒体文件,则不删除种子 - if (file_path.suffix in self._allowed_exts - and not self._is_blocked_by_exclude_words(file_path.as_posix(), transfer_exclude_words) - and file_path.exists()): + if ( + file_path.suffix in self._allowed_exts + and not self._is_blocked_by_exclude_words( + file_path.as_posix(), transfer_exclude_words + ) + and file_path.exists() + ): return False # 所有媒体文件都被屏蔽或不存在,可以删除种子 diff --git a/app/core/config.py b/app/core/config.py index 3faafa70..fcfa79e0 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -535,6 +535,8 @@ class ConfigModel(BaseModel): AI_AGENT_JOB_INTERVAL: int = 0 # AI智能体啰嗦模式,开启后会回复工具调用过程 AI_AGENT_VERBOSE: bool = False + # AI智能体自动重试整理失败记录开关 + AI_AGENT_RETRY_TRANSFER: bool = False class Settings(BaseSettings, ConfigModel, LogConfigModel): diff --git a/skills/transfer-failed-retry/SKILL.md b/skills/transfer-failed-retry/SKILL.md new file mode 100644 index 00000000..3cd84b11 --- /dev/null +++ b/skills/transfer-failed-retry/SKILL.md @@ -0,0 +1,137 @@ +--- +name: transfer-failed-retry +description: Use this skill when you need to retry a failed file transfer/organization. Given a failed transfer history record ID, this skill guides you through querying the failure details, deleting the old record, and re-identifying and re-organizing the file. This skill is automatically triggered when the system detects a transfer failure and the AI agent retry feature is enabled. +allowed-tools: query_transfer_history delete_transfer_history recognize_media transfer_file search_media +--- + +# Transfer Failed Retry (整理失败重试) + +This skill handles retrying failed file transfers/organizations. When a file transfer fails, you can use this skill to analyze the failure, remove the stale history record, and attempt to re-identify and re-organize the file. + +## Prerequisites + +You need the following tools: +- `query_transfer_history` - Query transfer history records +- `delete_transfer_history` - Delete a transfer history record +- `recognize_media` - Recognize media info from file path or title +- `transfer_file` - Transfer/organize files to the media library +- `search_media` - Search TMDB for media information + +## Workflow + +### Step 1: Query the Failed Transfer History + +Use `query_transfer_history` to get details about the failed record. Filter by status `failed` to find the specific record. + +If you are given a specific history record ID, query with that ID to understand the failure context: + +``` +query_transfer_history(status="failed") +``` + +From the record, extract the following key information: +- **id**: The history record ID +- **src**: Source file path +- **title**: The recognized title (may be incorrect) +- **errmsg**: The error message explaining why the transfer failed +- **type**: Media type (movie/tv) +- **tmdbid**: TMDB ID (if available) +- **seasons/episodes**: Season/episode info (if TV show) +- **downloader**: Which downloader was used +- **download_hash**: The torrent hash + +### Step 2: Analyze the Failure Reason + +Common failure reasons and how to handle them: + +| Error Message | Cause | Solution | +|---------------|-------|----------| +| 未识别到媒体信息 | File name couldn't be matched to any media | Use `search_media` to find the correct TMDB ID, then use `transfer_file` with explicit `tmdbid` | +| 源目录不存在 | Source file was moved or deleted | Cannot retry - skip this record | +| 目标路径不存在 | Target directory issue | Retry transfer - the directory config may have been fixed | +| 文件已存在 | Target file already exists | May need to use `force` mode or skip | +| 未找到有效的集数信息 | Episode number not recognized | Use `recognize_media` with the file path to get better metadata, or specify season/episode in `transfer_file` | +| 未获取到转移目录设置 | No transfer directory configured for this media type | Cannot auto-fix - notify user about directory configuration | + +### Step 3: Delete the Failed History Record + +Before retrying, you **must** delete the old failed history record. The system skips files that already have a transfer history entry (even failed ones). + +``` +delete_transfer_history(history_id=) +``` + +### Step 4: Re-identify and Re-organize + +Based on the failure analysis in Step 2: + +#### Case A: Unrecognized Media (未识别到媒体信息) + +1. Try recognizing the media from file path: + ``` + recognize_media(path="") + ``` + +2. If recognition fails, try searching TMDB with keywords extracted from the filename: + ``` + search_media(title="", media_type="movie" or "tv") + ``` + +3. Once you have the correct TMDB ID, re-transfer with explicit identification: + ``` + transfer_file(file_path="", tmdbid=, media_type="movie" or "tv") + ``` + +#### Case B: Transfer Error (file operation failed) + +Simply retry the transfer: +``` +transfer_file(file_path="") +``` + +#### Case C: Episode Recognition Issue + +For TV shows where episode info couldn't be determined: +1. Use `recognize_media` to get better metadata +2. Re-transfer with explicit season info: + ``` + transfer_file(file_path="", tmdbid=, media_type="tv", season=) + ``` + +### Step 5: Report Result + +After the retry attempt, report the result: +- If successful: Confirm the file has been organized correctly +- If failed again: Report the new error and suggest manual intervention + +## Important Notes + +- **Always delete the old history record first** before retrying. The system will skip files with existing history. +- **Do not retry** if the source file no longer exists (源目录不存在). +- **Do not retry** if the error is about missing directory configuration - this requires user intervention. +- **For unrecognized media**, always try `recognize_media` with the file path first before falling back to `search_media`. +- **Be cautious with TV shows** - ensure the correct season and episode information is used. +- When this skill is triggered automatically by the system, it provides the `history_id` directly. Start from Step 1 with that specific ID. + +## Example: Complete Retry Flow + +``` +# 1. Query the failed record +query_transfer_history(status="failed", page=1) +# Found: id=42, src="/downloads/Movie.Name.2024.1080p.mkv", errmsg="未识别到媒体信息" + +# 2. Try to recognize the media from path +recognize_media(path="/downloads/Movie.Name.2024.1080p.mkv") +# Recognition failed + +# 3. Search TMDB +search_media(title="Movie Name", year="2024", media_type="movie") +# Found: tmdb_id=123456 + +# 4. Delete old history record +delete_transfer_history(history_id=42) + +# 5. Re-transfer with correct identification +transfer_file(file_path="/downloads/Movie.Name.2024.1080p.mkv", tmdbid=123456, media_type="movie") +# Success! +```