feat: add retry actions for failed transfers

This commit is contained in:
jxxghp
2026-04-16 22:07:21 +08:00
parent 77b95d11fb
commit 011535fbc3
3 changed files with 344 additions and 5 deletions

View File

@@ -16,6 +16,7 @@ from app.chain.download import DownloadChain
from app.chain.media import MediaChain
from app.chain.search import SearchChain
from app.chain.subscribe import SubscribeChain
from app.chain.transfer import TransferChain
from app.core.config import settings, global_vars
from app.core.context import MediaInfo, Context
from app.core.meta import MetaBase
@@ -784,6 +785,15 @@ class MessageChain(ChainBase):
callback_data = text[9:] # 去掉 "CALLBACK:" 前缀
logger.info(f"处理按钮回调:{callback_data}")
if self._handle_transfer_callback(
callback_data=callback_data,
channel=channel,
source=source,
userid=userid,
username=username,
):
return
# 插件消息的事件回调 [PLUGIN]插件ID|内容
if callback_data.startswith("[PLUGIN]"):
# 提取插件ID和内容
@@ -827,6 +837,178 @@ class MessageChain(ChainBase):
)
)
@staticmethod
def _parse_transfer_callback(
callback_data: str,
) -> Optional[tuple[str, int]]:
"""
解析整理失败通知按钮回调。
"""
for prefix, action in (
("transfer_retry_", "retry"),
("transfer_ai_retry_", "ai_retry"),
):
if callback_data.startswith(prefix):
history_id = callback_data.replace(prefix, "", 1)
if history_id.isdigit():
return action, int(history_id)
return None
def _handle_transfer_callback(
self,
callback_data: str,
channel: MessageChannel,
source: str,
userid: Union[str, int],
username: str,
) -> bool:
"""
处理整理失败通知中的重试类按钮。
"""
callback = self._parse_transfer_callback(callback_data)
if not callback:
return False
action, history_id = callback
if action == "retry":
self._retry_transfer_history(
history_id=history_id,
channel=channel,
source=source,
userid=userid,
username=username,
)
else:
self._take_over_transfer_history_by_ai(
history_id=history_id,
channel=channel,
source=source,
userid=userid,
username=username,
)
return True
def _retry_transfer_history(
self,
history_id: int,
channel: MessageChannel,
source: str,
userid: Union[str, int],
username: str,
) -> None:
"""
立即重新整理一条失败的整理记录。
"""
self.post_message(
Notification(
channel=channel,
source=source,
userid=userid,
username=username,
title=f"开始重新整理记录 #{history_id} ...",
)
)
state, errmsg = TransferChain().redo_transfer_history(history_id)
if state:
self.post_message(
Notification(
channel=channel,
source=source,
userid=userid,
username=username,
title=f"整理记录 #{history_id} 已重新整理",
link=settings.MP_DOMAIN("#/history"),
)
)
return
self.post_message(
Notification(
channel=channel,
source=source,
userid=userid,
username=username,
title="重新整理失败",
text=errmsg,
link=settings.MP_DOMAIN("#/history"),
)
)
def _take_over_transfer_history_by_ai(
self,
history_id: int,
channel: MessageChannel,
source: str,
userid: Union[str, int],
username: str,
) -> None:
"""
由智能助手接管一条失败的整理记录。
"""
if not settings.AI_AGENT_ENABLE:
self.post_message(
Notification(
channel=channel,
source=source,
userid=userid,
username=username,
title="MoviePilot智能助手未启用请在系统设置中启用",
)
)
return
self.post_message(
Notification(
channel=channel,
source=source,
userid=userid,
username=username,
title=f"已将整理记录 #{history_id} 交给智能助手处理",
text="处理完成后会在这里回复结果。",
link=settings.MP_DOMAIN("#/history"),
)
)
async def _run_ai_takeover():
final_output = ""
def _capture_output(text_output: str):
nonlocal final_output
final_output = text_output or ""
try:
await agent_manager.manual_redo_transfer(
history_id=history_id,
output_callback=_capture_output,
)
await self.async_post_message(
Notification(
channel=channel,
source=source,
userid=userid,
username=username,
title="智能助手整理完成",
text=final_output.strip()
or f"整理记录 #{history_id} 已由智能助手处理完成。",
link=settings.MP_DOMAIN("#/history"),
)
)
except Exception as e:
await self.async_post_message(
Notification(
channel=channel,
source=source,
userid=userid,
username=username,
title="智能助手整理失败",
text=str(e),
link=settings.MP_DOMAIN("#/history"),
)
)
asyncio.run_coroutine_threadsafe(_run_ai_takeover(), global_vars.loop)
def __auto_download(
self,
channel: MessageChannel,

View File

@@ -842,10 +842,22 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
Notification(
mtype=NotificationType.Manual,
title=f"{task.mediainfo.title_year} {task.meta.season_episode} 入库失败!",
text=f"原因:{transferinfo.message or '未知'}",
text="\n".join(
[
f"原因:{transferinfo.message or '未知'}",
(
f"如果按钮不可用,可回复:\n```\n/redo {history.id}\n```"
if history
else ""
),
]
).strip(),
image=task.mediainfo.get_message_image(),
username=task.username,
link=settings.MP_DOMAIN("#/history"),
buttons=self.build_failed_transfer_buttons(
history.id if history else None
),
)
)
@@ -1193,9 +1205,17 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
Notification(
mtype=NotificationType.Manual,
title=f"{task.fileitem.name} 未识别到媒体信息,无法入库!",
text=f"回复:\n```\n/redo {his.id} [tmdbid]|[类型]\n```\n手动识别整理。",
text=(
"原因:未识别到媒体信息\n"
"如果按钮不可用,可回复:\n"
f"```\n/redo {his.id}\n/redo {his.id} [tmdbid]|[类型]\n```\n"
"自动重试或手动识别整理。"
),
username=task.username,
link=settings.MP_DOMAIN("#/history"),
buttons=self.build_failed_transfer_buttons(
his.id if his else None
),
)
)
# 任务失败直接移除task
@@ -1895,8 +1915,8 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
Notification(
channel=channel,
source=source,
title="请输入正确的命令格式:/redo [id] [tmdbid/豆瓣id]|[类型]"
"[id]整理记录编号",
title="请输入正确的命令格式:/redo [id] 或 /redo [id] [tmdbid/豆瓣id]|[类型]"
"[id]整理记录编号",
userid=userid,
)
)
@@ -1905,7 +1925,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
args_error()
return
arg_strs = str(arg_str).split()
if len(arg_strs) != 2:
if len(arg_strs) not in (1, 2):
args_error()
return
# 历史记录ID
@@ -1913,6 +1933,20 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
if not logid.isdigit():
args_error()
return
if len(arg_strs) == 1:
state, errmsg = self.redo_transfer_history(int(logid))
if not state:
self.post_message(
Notification(
channel=channel,
title="手动整理失败",
source=source,
text=errmsg,
userid=userid,
link=settings.MP_DOMAIN("#/history"),
)
)
return
# TMDBID/豆瓣ID
id_strs = arg_strs[1].split("|")
media_id = id_strs[0]
@@ -1940,6 +1974,31 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
)
return
@staticmethod
def build_failed_transfer_buttons(
history_id: Optional[int],
) -> Optional[List[List[dict]]]:
"""
构建整理失败通知的操作按钮。
"""
if not history_id:
return None
return [
[
{"text": "重试", "callback_data": f"transfer_retry_{history_id}"},
{
"text": "智能助手接管",
"callback_data": f"transfer_ai_retry_{history_id}",
},
]
]
def redo_transfer_history(self, history_id: int) -> Tuple[bool, str]:
"""
按历史记录直接重新整理,自动重新识别媒体信息。
"""
return self.__re_transfer(logid=history_id)
def __re_transfer(
self, logid: int, mtype: MediaType = None, mediaid: Optional[str] = None
) -> Tuple[bool, str]:

View File

@@ -0,0 +1,98 @@
import unittest
import sys
from types import ModuleType
from unittest.mock import patch
sys.modules.setdefault("qbittorrentapi", ModuleType("qbittorrentapi"))
setattr(sys.modules["qbittorrentapi"], "TorrentFilesList", list)
sys.modules.setdefault("transmission_rpc", ModuleType("transmission_rpc"))
setattr(sys.modules["transmission_rpc"], "File", object)
sys.modules.setdefault("psutil", ModuleType("psutil"))
from app.chain.message import MessageChain
from app.chain.transfer import TransferChain
from app.core.config import settings
from app.schemas.types import MessageChannel
class TestTransferFailedRetryButtons(unittest.TestCase):
def test_build_failed_transfer_buttons(self):
buttons = TransferChain.build_failed_transfer_buttons(12)
self.assertEqual(
buttons,
[
[
{"text": "重试", "callback_data": "transfer_retry_12"},
{
"text": "智能助手接管",
"callback_data": "transfer_ai_retry_12",
},
]
],
)
def test_remote_transfer_supports_history_only_retry(self):
chain = TransferChain()
with patch.object(chain, "redo_transfer_history", return_value=(True, "")) as redo:
with patch.object(chain, "post_message") as post_message:
chain.remote_transfer(
"12",
channel=MessageChannel.Telegram,
userid="10001",
source="telegram-test",
)
redo.assert_called_once_with(12)
post_message.assert_not_called()
def test_transfer_retry_callback_retries_history(self):
chain = MessageChain()
with patch("app.chain.message.TransferChain") as transfer_cls:
transfer_cls.return_value.redo_transfer_history.return_value = (True, "")
with patch.object(chain, "post_message") as post_message:
chain._handle_callback(
text="CALLBACK:transfer_retry_12",
channel=MessageChannel.Telegram,
source="telegram-test",
userid="10001",
username="tester",
)
transfer_cls.return_value.redo_transfer_history.assert_called_once_with(12)
self.assertEqual(post_message.call_count, 2)
self.assertEqual(
post_message.call_args_list[0].args[0].title,
"开始重新整理记录 #12 ...",
)
self.assertEqual(
post_message.call_args_list[1].args[0].title,
"整理记录 #12 已重新整理",
)
def test_transfer_ai_retry_callback_schedules_agent_takeover(self):
chain = MessageChain()
with patch.object(settings, "AI_AGENT_ENABLE", True):
with patch("app.chain.message.asyncio.run_coroutine_threadsafe") as run_task:
with patch.object(chain, "post_message") as post_message:
chain._handle_callback(
text="CALLBACK:transfer_ai_retry_34",
channel=MessageChannel.Telegram,
source="telegram-test",
userid="10001",
username="tester",
)
run_task.assert_called_once()
self.assertEqual(post_message.call_count, 1)
self.assertEqual(
post_message.call_args_list[0].args[0].title,
"已将整理记录 #34 交给智能助手处理",
)
if __name__ == "__main__":
unittest.main()