From 011535fbc31b09101a150ebba7e85be938b5062f Mon Sep 17 00:00:00 2001 From: jxxghp Date: Thu, 16 Apr 2026 22:07:21 +0800 Subject: [PATCH] feat: add retry actions for failed transfers --- app/chain/message.py | 182 ++++++++++++++++++++ app/chain/transfer.py | 69 +++++++- tests/test_transfer_failed_retry_buttons.py | 98 +++++++++++ 3 files changed, 344 insertions(+), 5 deletions(-) create mode 100644 tests/test_transfer_failed_retry_buttons.py diff --git a/app/chain/message.py b/app/chain/message.py index 27e9bb47..695d0cd0 100644 --- a/app/chain/message.py +++ b/app/chain/message.py @@ -16,6 +16,7 @@ from app.chain.download import DownloadChain from app.chain.media import MediaChain from app.chain.search import SearchChain from app.chain.subscribe import SubscribeChain +from app.chain.transfer import TransferChain from app.core.config import settings, global_vars from app.core.context import MediaInfo, Context from app.core.meta import MetaBase @@ -784,6 +785,15 @@ class MessageChain(ChainBase): callback_data = text[9:] # 去掉 "CALLBACK:" 前缀 logger.info(f"处理按钮回调:{callback_data}") + if self._handle_transfer_callback( + callback_data=callback_data, + channel=channel, + source=source, + userid=userid, + username=username, + ): + return + # 插件消息的事件回调 [PLUGIN]插件ID|内容 if callback_data.startswith("[PLUGIN]"): # 提取插件ID和内容 @@ -827,6 +837,178 @@ class MessageChain(ChainBase): ) ) + @staticmethod + def _parse_transfer_callback( + callback_data: str, + ) -> Optional[tuple[str, int]]: + """ + 解析整理失败通知按钮回调。 + """ + for prefix, action in ( + ("transfer_retry_", "retry"), + ("transfer_ai_retry_", "ai_retry"), + ): + if callback_data.startswith(prefix): + history_id = callback_data.replace(prefix, "", 1) + if history_id.isdigit(): + return action, int(history_id) + return None + + def _handle_transfer_callback( + self, + callback_data: str, + channel: MessageChannel, + source: str, + userid: Union[str, int], + username: str, + ) -> bool: + """ + 处理整理失败通知中的重试类按钮。 + """ + callback = self._parse_transfer_callback(callback_data) + if not callback: + return False + + action, history_id = callback + if action == "retry": + self._retry_transfer_history( + history_id=history_id, + channel=channel, + source=source, + userid=userid, + username=username, + ) + else: + self._take_over_transfer_history_by_ai( + history_id=history_id, + channel=channel, + source=source, + userid=userid, + username=username, + ) + return True + + def _retry_transfer_history( + self, + history_id: int, + channel: MessageChannel, + source: str, + userid: Union[str, int], + username: str, + ) -> None: + """ + 立即重新整理一条失败的整理记录。 + """ + self.post_message( + Notification( + channel=channel, + source=source, + userid=userid, + username=username, + title=f"开始重新整理记录 #{history_id} ...", + ) + ) + + state, errmsg = TransferChain().redo_transfer_history(history_id) + if state: + self.post_message( + Notification( + channel=channel, + source=source, + userid=userid, + username=username, + title=f"整理记录 #{history_id} 已重新整理", + link=settings.MP_DOMAIN("#/history"), + ) + ) + return + + self.post_message( + Notification( + channel=channel, + source=source, + userid=userid, + username=username, + title="重新整理失败", + text=errmsg, + link=settings.MP_DOMAIN("#/history"), + ) + ) + + def _take_over_transfer_history_by_ai( + self, + history_id: int, + channel: MessageChannel, + source: str, + userid: Union[str, int], + username: str, + ) -> None: + """ + 由智能助手接管一条失败的整理记录。 + """ + if not settings.AI_AGENT_ENABLE: + self.post_message( + Notification( + channel=channel, + source=source, + userid=userid, + username=username, + title="MoviePilot智能助手未启用,请在系统设置中启用", + ) + ) + return + + self.post_message( + Notification( + channel=channel, + source=source, + userid=userid, + username=username, + title=f"已将整理记录 #{history_id} 交给智能助手处理", + text="处理完成后会在这里回复结果。", + link=settings.MP_DOMAIN("#/history"), + ) + ) + + async def _run_ai_takeover(): + final_output = "" + + def _capture_output(text_output: str): + nonlocal final_output + final_output = text_output or "" + + try: + await agent_manager.manual_redo_transfer( + history_id=history_id, + output_callback=_capture_output, + ) + await self.async_post_message( + Notification( + channel=channel, + source=source, + userid=userid, + username=username, + title="智能助手整理完成", + text=final_output.strip() + or f"整理记录 #{history_id} 已由智能助手处理完成。", + link=settings.MP_DOMAIN("#/history"), + ) + ) + except Exception as e: + await self.async_post_message( + Notification( + channel=channel, + source=source, + userid=userid, + username=username, + title="智能助手整理失败", + text=str(e), + link=settings.MP_DOMAIN("#/history"), + ) + ) + + asyncio.run_coroutine_threadsafe(_run_ai_takeover(), global_vars.loop) + def __auto_download( self, channel: MessageChannel, diff --git a/app/chain/transfer.py b/app/chain/transfer.py index eae07853..b50be309 100755 --- a/app/chain/transfer.py +++ b/app/chain/transfer.py @@ -842,10 +842,22 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): Notification( mtype=NotificationType.Manual, title=f"{task.mediainfo.title_year} {task.meta.season_episode} 入库失败!", - text=f"原因:{transferinfo.message or '未知'}", + text="\n".join( + [ + f"原因:{transferinfo.message or '未知'}", + ( + f"如果按钮不可用,可回复:\n```\n/redo {history.id}\n```" + if history + else "" + ), + ] + ).strip(), image=task.mediainfo.get_message_image(), username=task.username, link=settings.MP_DOMAIN("#/history"), + buttons=self.build_failed_transfer_buttons( + history.id if history else None + ), ) ) @@ -1193,9 +1205,17 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): Notification( mtype=NotificationType.Manual, title=f"{task.fileitem.name} 未识别到媒体信息,无法入库!", - text=f"回复:\n```\n/redo {his.id} [tmdbid]|[类型]\n```\n手动识别整理。", + text=( + "原因:未识别到媒体信息\n" + "如果按钮不可用,可回复:\n" + f"```\n/redo {his.id}\n/redo {his.id} [tmdbid]|[类型]\n```\n" + "自动重试或手动识别整理。" + ), username=task.username, link=settings.MP_DOMAIN("#/history"), + buttons=self.build_failed_transfer_buttons( + his.id if his else None + ), ) ) # 任务失败,直接移除task @@ -1895,8 +1915,8 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): Notification( channel=channel, source=source, - title="请输入正确的命令格式:/redo [id] [tmdbid/豆瓣id]|[类型]," - "[id]整理记录编号", + title="请输入正确的命令格式:/redo [id] 或 /redo [id] [tmdbid/豆瓣id]|[类型]," + "[id] 为整理记录编号", userid=userid, ) ) @@ -1905,7 +1925,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): args_error() return arg_strs = str(arg_str).split() - if len(arg_strs) != 2: + if len(arg_strs) not in (1, 2): args_error() return # 历史记录ID @@ -1913,6 +1933,20 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): if not logid.isdigit(): args_error() return + if len(arg_strs) == 1: + state, errmsg = self.redo_transfer_history(int(logid)) + if not state: + self.post_message( + Notification( + channel=channel, + title="手动整理失败", + source=source, + text=errmsg, + userid=userid, + link=settings.MP_DOMAIN("#/history"), + ) + ) + return # TMDBID/豆瓣ID id_strs = arg_strs[1].split("|") media_id = id_strs[0] @@ -1940,6 +1974,31 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton): ) return + @staticmethod + def build_failed_transfer_buttons( + history_id: Optional[int], + ) -> Optional[List[List[dict]]]: + """ + 构建整理失败通知的操作按钮。 + """ + if not history_id: + return None + return [ + [ + {"text": "重试", "callback_data": f"transfer_retry_{history_id}"}, + { + "text": "智能助手接管", + "callback_data": f"transfer_ai_retry_{history_id}", + }, + ] + ] + + def redo_transfer_history(self, history_id: int) -> Tuple[bool, str]: + """ + 按历史记录直接重新整理,自动重新识别媒体信息。 + """ + return self.__re_transfer(logid=history_id) + def __re_transfer( self, logid: int, mtype: MediaType = None, mediaid: Optional[str] = None ) -> Tuple[bool, str]: diff --git a/tests/test_transfer_failed_retry_buttons.py b/tests/test_transfer_failed_retry_buttons.py new file mode 100644 index 00000000..74d9a611 --- /dev/null +++ b/tests/test_transfer_failed_retry_buttons.py @@ -0,0 +1,98 @@ +import unittest +import sys +from types import ModuleType +from unittest.mock import patch + +sys.modules.setdefault("qbittorrentapi", ModuleType("qbittorrentapi")) +setattr(sys.modules["qbittorrentapi"], "TorrentFilesList", list) +sys.modules.setdefault("transmission_rpc", ModuleType("transmission_rpc")) +setattr(sys.modules["transmission_rpc"], "File", object) +sys.modules.setdefault("psutil", ModuleType("psutil")) + +from app.chain.message import MessageChain +from app.chain.transfer import TransferChain +from app.core.config import settings +from app.schemas.types import MessageChannel + + +class TestTransferFailedRetryButtons(unittest.TestCase): + def test_build_failed_transfer_buttons(self): + buttons = TransferChain.build_failed_transfer_buttons(12) + + self.assertEqual( + buttons, + [ + [ + {"text": "重试", "callback_data": "transfer_retry_12"}, + { + "text": "智能助手接管", + "callback_data": "transfer_ai_retry_12", + }, + ] + ], + ) + + def test_remote_transfer_supports_history_only_retry(self): + chain = TransferChain() + + with patch.object(chain, "redo_transfer_history", return_value=(True, "")) as redo: + with patch.object(chain, "post_message") as post_message: + chain.remote_transfer( + "12", + channel=MessageChannel.Telegram, + userid="10001", + source="telegram-test", + ) + + redo.assert_called_once_with(12) + post_message.assert_not_called() + + def test_transfer_retry_callback_retries_history(self): + chain = MessageChain() + + with patch("app.chain.message.TransferChain") as transfer_cls: + transfer_cls.return_value.redo_transfer_history.return_value = (True, "") + with patch.object(chain, "post_message") as post_message: + chain._handle_callback( + text="CALLBACK:transfer_retry_12", + channel=MessageChannel.Telegram, + source="telegram-test", + userid="10001", + username="tester", + ) + + transfer_cls.return_value.redo_transfer_history.assert_called_once_with(12) + self.assertEqual(post_message.call_count, 2) + self.assertEqual( + post_message.call_args_list[0].args[0].title, + "开始重新整理记录 #12 ...", + ) + self.assertEqual( + post_message.call_args_list[1].args[0].title, + "整理记录 #12 已重新整理", + ) + + def test_transfer_ai_retry_callback_schedules_agent_takeover(self): + chain = MessageChain() + + with patch.object(settings, "AI_AGENT_ENABLE", True): + with patch("app.chain.message.asyncio.run_coroutine_threadsafe") as run_task: + with patch.object(chain, "post_message") as post_message: + chain._handle_callback( + text="CALLBACK:transfer_ai_retry_34", + channel=MessageChannel.Telegram, + source="telegram-test", + userid="10001", + username="tester", + ) + + run_task.assert_called_once() + self.assertEqual(post_message.call_count, 1) + self.assertEqual( + post_message.call_args_list[0].args[0].title, + "已将整理记录 #34 交给智能助手处理", + ) + + +if __name__ == "__main__": + unittest.main()