mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-05-08 14:23:27 +08:00
2664 lines
93 KiB
Python
2664 lines
93 KiB
Python
import asyncio
|
||
import base64
|
||
import math
|
||
import mimetypes
|
||
import re
|
||
import time
|
||
import uuid
|
||
from datetime import datetime, timedelta
|
||
from pathlib import Path
|
||
from typing import Any, Optional, Dict, Union, List, Tuple
|
||
from urllib.parse import unquote, urlparse
|
||
|
||
from app.agent import ReplyMode, agent_manager, prompt_manager
|
||
from app.agent.llm import LLMHelper
|
||
from app.chain import ChainBase
|
||
from app.chain.download import DownloadChain
|
||
from app.chain.media import MediaChain
|
||
from app.chain.search import SearchChain
|
||
from app.chain.site import SiteChain, site_interaction_manager
|
||
from app.chain.skills import SkillsChain, skills_interaction_manager
|
||
from app.chain.subscribe import SubscribeChain, subscribe_interaction_manager
|
||
from app.chain.transfer import TransferChain
|
||
from app.core.config import settings, global_vars
|
||
from app.core.context import MediaInfo, Context
|
||
from app.core.meta import MetaBase
|
||
from app.db.models import TransferHistory
|
||
from app.db.transferhistory_oper import TransferHistoryOper
|
||
from app.db.user_oper import UserOper
|
||
from app.helper.interaction import agent_interaction_manager, media_interaction_manager, PendingMediaInteraction
|
||
from app.helper.torrent import TorrentHelper
|
||
from app.helper.voice import VoiceHelper
|
||
from app.log import logger
|
||
from app.schemas import Notification, CommingMessage, NotExistMediaInfo
|
||
from app.schemas.message import ChannelCapabilityManager
|
||
from app.schemas.types import EventType, MessageChannel, MediaType
|
||
from app.utils.http import RequestUtils
|
||
from app.utils.string import StringUtils
|
||
|
||
|
||
class MessageChain(ChainBase):
|
||
"""
|
||
外来消息处理链
|
||
"""
|
||
|
||
# 用户会话信息 {userid: (session_id, last_time)}
|
||
_user_sessions: Dict[Union[str, int], tuple] = {}
|
||
# 会话超时时间(分钟)
|
||
_session_timeout_minutes: int = 24 * 60
|
||
|
||
def process(self, body: Any, form: Any, args: Any) -> None:
|
||
"""
|
||
调用模块识别消息内容
|
||
"""
|
||
# 消息来源
|
||
source = args.get("source")
|
||
# 获取消息内容
|
||
info = self.message_parser(source=source, body=body, form=form, args=args)
|
||
if not info:
|
||
logger.info("消息链路未识别到有效消息: source=%s", source)
|
||
return
|
||
# 更新消息来源
|
||
source = info.source
|
||
# 渠道
|
||
channel = info.channel
|
||
# 用户ID
|
||
userid = info.userid
|
||
# 用户名(当渠道未提供公开用户名时,回退为 userid 的字符串,避免后续类型校验异常)
|
||
username = (
|
||
str(info.username) if info.username not in (None, "") else str(userid)
|
||
)
|
||
if userid is None or userid == "":
|
||
logger.debug(f"未识别到用户ID:{body}{form}{args}")
|
||
return
|
||
|
||
# 消息内容
|
||
text = str(info.text).strip() if info.text else ""
|
||
images = info.images
|
||
audio_refs = info.audio_refs
|
||
files = info.files
|
||
if not text and not images and not audio_refs and not files:
|
||
logger.debug(f"未识别到消息内容::{body}{form}{args}")
|
||
return
|
||
|
||
# 获取原消息ID信息
|
||
original_message_id = info.message_id
|
||
original_chat_id = info.chat_id
|
||
|
||
# 处理消息
|
||
self.handle_message(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
text=text,
|
||
original_message_id=original_message_id,
|
||
original_chat_id=original_chat_id,
|
||
images=images,
|
||
audio_refs=audio_refs,
|
||
files=files,
|
||
)
|
||
|
||
def handle_message(
|
||
self,
|
||
channel: MessageChannel,
|
||
source: str,
|
||
userid: Union[str, int],
|
||
username: str,
|
||
text: str,
|
||
original_message_id: Optional[Union[str, int]] = None,
|
||
original_chat_id: Optional[str] = None,
|
||
images: Optional[List[CommingMessage.MessageImage]] = None,
|
||
audio_refs: Optional[List[str]] = None,
|
||
files: Optional[List[CommingMessage.MessageAttachment]] = None,
|
||
) -> None:
|
||
"""
|
||
识别消息内容,执行操作
|
||
"""
|
||
images = CommingMessage.MessageImage.normalize_list(images)
|
||
|
||
# 语音输入只用于转写为文本,不默认改变回复形式。
|
||
has_audio_input = bool(audio_refs)
|
||
if audio_refs:
|
||
transcript = self._transcribe_audio_refs(audio_refs, channel, source)
|
||
merged_parts = []
|
||
seen_parts = set()
|
||
for item in [text.strip() if text else "", transcript or ""]:
|
||
normalized = item.strip()
|
||
if not normalized or normalized in seen_parts:
|
||
continue
|
||
seen_parts.add(normalized)
|
||
merged_parts.append(normalized)
|
||
text = "\n".join(merged_parts).strip()
|
||
if not text:
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title="语音识别失败,请稍后重试",
|
||
)
|
||
)
|
||
return
|
||
|
||
if not text.startswith("CALLBACK:"):
|
||
self._record_user_message(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
text=text,
|
||
)
|
||
|
||
if text.startswith("CALLBACK:"):
|
||
if ChannelCapabilityManager.supports_callbacks(channel):
|
||
self._handle_callback(
|
||
text=text,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
original_message_id=original_message_id,
|
||
original_chat_id=original_chat_id,
|
||
)
|
||
else:
|
||
logger.warning(
|
||
"渠道 %s 不支持回调,但收到了回调消息:%s",
|
||
channel.value,
|
||
text,
|
||
)
|
||
return
|
||
|
||
if text.startswith("/") and not text.lower().startswith("/ai"):
|
||
self.eventmanager.send_event(
|
||
EventType.CommandExcute,
|
||
{"cmd": text, "user": userid, "channel": channel, "source": source},
|
||
)
|
||
return
|
||
|
||
latest_slash_interaction = self._get_latest_slash_interaction(userid)
|
||
if latest_slash_interaction == "sites":
|
||
if SiteChain().handle_text_interaction(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
text=text,
|
||
):
|
||
return
|
||
|
||
if latest_slash_interaction == "subscribes":
|
||
if SubscribeChain().handle_text_interaction(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
text=text,
|
||
):
|
||
return
|
||
|
||
if latest_slash_interaction == "skills":
|
||
if SkillsChain().handle_text_interaction(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
text=text,
|
||
):
|
||
return
|
||
|
||
if media_interaction_manager.get_by_user(userid):
|
||
if MediaInteractionChain().handle_text_interaction(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
text=text,
|
||
):
|
||
return
|
||
|
||
if text.lower().startswith("/ai"):
|
||
self._handle_ai_message(
|
||
text=text,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
images=images,
|
||
files=files,
|
||
)
|
||
return
|
||
|
||
if (
|
||
settings.AI_AGENT_ENABLE
|
||
and (settings.AI_AGENT_GLOBAL or images or files or has_audio_input)
|
||
):
|
||
self._handle_ai_message(
|
||
text=text,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
images=images,
|
||
files=files,
|
||
)
|
||
return
|
||
|
||
if MediaInteractionChain().handle_text_interaction(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
text=text,
|
||
):
|
||
return
|
||
|
||
self.eventmanager.send_event(
|
||
EventType.UserMessage,
|
||
{
|
||
"text": text,
|
||
"userid": userid,
|
||
"channel": channel,
|
||
"source": source,
|
||
},
|
||
)
|
||
|
||
def _handle_callback(
|
||
self,
|
||
text: str,
|
||
channel: MessageChannel,
|
||
source: str,
|
||
userid: Union[str, int],
|
||
username: str,
|
||
original_message_id: Optional[Union[str, int]] = None,
|
||
original_chat_id: Optional[str] = None,
|
||
) -> None:
|
||
"""
|
||
处理按钮回调
|
||
"""
|
||
|
||
# 提取回调数据
|
||
callback_data = text[9:] # 去掉 "CALLBACK:" 前缀
|
||
logger.info(f"处理按钮回调:{callback_data}")
|
||
|
||
if self._handle_transfer_callback(
|
||
callback_data=callback_data,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
):
|
||
return
|
||
|
||
if SkillsChain().handle_callback_interaction(
|
||
callback_data=callback_data,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
original_message_id=original_message_id,
|
||
original_chat_id=original_chat_id,
|
||
):
|
||
return
|
||
|
||
if SiteChain().handle_callback_interaction(
|
||
callback_data=callback_data,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
original_message_id=original_message_id,
|
||
original_chat_id=original_chat_id,
|
||
):
|
||
return
|
||
|
||
if SubscribeChain().handle_callback_interaction(
|
||
callback_data=callback_data,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
original_message_id=original_message_id,
|
||
original_chat_id=original_chat_id,
|
||
):
|
||
return
|
||
|
||
if MediaInteractionChain().handle_callback_interaction(
|
||
callback_data=callback_data,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
original_message_id=original_message_id,
|
||
original_chat_id=original_chat_id,
|
||
):
|
||
return
|
||
|
||
if self._handle_agent_choice_callback(
|
||
callback_data=callback_data,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
original_message_id=original_message_id,
|
||
original_chat_id=original_chat_id,
|
||
):
|
||
return
|
||
|
||
# 插件消息的事件回调 [PLUGIN]插件ID|内容
|
||
if callback_data.startswith("[PLUGIN]"):
|
||
# 提取插件ID和内容
|
||
plugin_id, content = callback_data.split("|", 1)
|
||
# 广播给插件处理
|
||
self.eventmanager.send_event(
|
||
EventType.MessageAction,
|
||
{
|
||
"plugin_id": plugin_id.replace("[PLUGIN]", ""),
|
||
"text": content,
|
||
"userid": userid,
|
||
"channel": channel,
|
||
"source": source,
|
||
"original_message_id": original_message_id,
|
||
"original_chat_id": original_chat_id,
|
||
},
|
||
)
|
||
return
|
||
|
||
logger.error(f"回调数据格式错误:{callback_data}")
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title="回调数据格式错误,请检查!",
|
||
)
|
||
)
|
||
|
||
@staticmethod
|
||
def _get_latest_slash_interaction(userid: Union[str, int]) -> Optional[str]:
|
||
"""
|
||
返回当前用户最近一次激活的 slash 交互类型。
|
||
"""
|
||
candidates = []
|
||
for name, manager in (
|
||
("sites", site_interaction_manager),
|
||
("subscribes", subscribe_interaction_manager),
|
||
("skills", skills_interaction_manager),
|
||
):
|
||
request = manager.get_by_user(userid)
|
||
if request:
|
||
candidates.append((request.created_at, name))
|
||
if not candidates:
|
||
return None
|
||
return max(candidates, key=lambda item: item[0])[1]
|
||
|
||
@staticmethod
|
||
def _parse_transfer_callback(
|
||
callback_data: str,
|
||
) -> Optional[tuple[str, int]]:
|
||
"""
|
||
解析整理失败通知按钮回调。
|
||
"""
|
||
for prefix, action in (
|
||
("transfer_retry_", "retry"),
|
||
("transfer_ai_retry_", "ai_retry"),
|
||
):
|
||
if callback_data.startswith(prefix):
|
||
history_id = callback_data.replace(prefix, "", 1)
|
||
if history_id.isdigit():
|
||
return action, int(history_id)
|
||
return None
|
||
|
||
def _handle_transfer_callback(
|
||
self,
|
||
callback_data: str,
|
||
channel: MessageChannel,
|
||
source: str,
|
||
userid: Union[str, int],
|
||
username: str,
|
||
) -> bool:
|
||
"""
|
||
处理整理失败通知中的重试类按钮。
|
||
"""
|
||
callback = self._parse_transfer_callback(callback_data)
|
||
if not callback:
|
||
return False
|
||
|
||
action, history_id = callback
|
||
if action == "retry":
|
||
self._retry_transfer_history(
|
||
history_id=history_id,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
)
|
||
else:
|
||
self._take_over_transfer_history_by_ai(
|
||
history_id=history_id,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
)
|
||
return True
|
||
|
||
@staticmethod
|
||
def _parse_agent_choice_callback(
|
||
callback_data: str,
|
||
) -> Optional[tuple[str, int]]:
|
||
"""
|
||
解析 Agent 按钮选择回调。
|
||
"""
|
||
if callback_data.startswith("agent_interaction:choice:"):
|
||
try:
|
||
_, _, request_id, option_index = callback_data.split(":", 3)
|
||
except ValueError:
|
||
return None
|
||
elif callback_data.startswith("agent_choice:"):
|
||
# 兼容旧格式,避免已发送的按钮失效
|
||
try:
|
||
_, request_id, option_index = callback_data.split(":", 2)
|
||
except ValueError:
|
||
return None
|
||
else:
|
||
return None
|
||
if not request_id or not option_index.isdigit():
|
||
return None
|
||
return request_id, int(option_index)
|
||
|
||
def _handle_agent_choice_callback(
|
||
self,
|
||
callback_data: str,
|
||
channel: MessageChannel,
|
||
source: str,
|
||
userid: Union[str, int],
|
||
username: str,
|
||
original_message_id: Optional[Union[str, int]] = None,
|
||
original_chat_id: Optional[str] = None,
|
||
) -> bool:
|
||
"""
|
||
将 Agent 按钮选择回传为同一会话中的下一条用户消息。
|
||
"""
|
||
callback = self._parse_agent_choice_callback(callback_data)
|
||
if not callback:
|
||
return False
|
||
|
||
request_id, option_index = callback
|
||
resolved = agent_interaction_manager.resolve(
|
||
request_id=request_id,
|
||
option_index=option_index,
|
||
user_id=str(userid),
|
||
)
|
||
if not resolved:
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title="该选择已失效,请重新发起选择",
|
||
)
|
||
)
|
||
return True
|
||
|
||
request, option = resolved
|
||
selected_text = option.value
|
||
self._update_interaction_message_feedback(
|
||
channel=channel,
|
||
source=source,
|
||
original_message_id=original_message_id,
|
||
original_chat_id=original_chat_id,
|
||
title=request.title,
|
||
prompt=request.prompt,
|
||
selected_label=option.label,
|
||
)
|
||
self._bind_session_id(userid, request.session_id)
|
||
self._record_user_message(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
text=selected_text,
|
||
)
|
||
self._handle_ai_message(
|
||
text=selected_text,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
session_id=request.session_id,
|
||
)
|
||
return True
|
||
|
||
def _update_interaction_message_feedback(
|
||
self,
|
||
channel: MessageChannel,
|
||
source: str,
|
||
original_message_id: Optional[Union[str, int]],
|
||
original_chat_id: Optional[str],
|
||
prompt: str,
|
||
selected_label: str,
|
||
title: Optional[str] = None,
|
||
) -> None:
|
||
"""
|
||
在用户点击交互按钮后,立即更新原消息,明确显示已选择的内容。
|
||
"""
|
||
if not original_message_id or not original_chat_id:
|
||
return
|
||
|
||
lines = [prompt.strip()]
|
||
if selected_label:
|
||
lines.append(f"已选择:{selected_label}")
|
||
feedback_text = "\n\n".join(line for line in lines if line)
|
||
self.edit_message(
|
||
channel=channel,
|
||
source=source,
|
||
message_id=original_message_id,
|
||
chat_id=original_chat_id,
|
||
title=title,
|
||
text=feedback_text,
|
||
)
|
||
|
||
def _retry_transfer_history(
|
||
self,
|
||
history_id: int,
|
||
channel: MessageChannel,
|
||
source: str,
|
||
userid: Union[str, int],
|
||
username: str,
|
||
) -> None:
|
||
"""
|
||
立即重新整理一条失败的整理记录。
|
||
"""
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title=f"开始重新整理记录 #{history_id} ...",
|
||
)
|
||
)
|
||
|
||
state, errmsg = TransferChain().redo_transfer_history(history_id)
|
||
if state:
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title=f"整理记录 #{history_id} 已重新整理",
|
||
link=settings.MP_DOMAIN("#/history"),
|
||
)
|
||
)
|
||
return
|
||
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title="重新整理失败",
|
||
text=errmsg,
|
||
link=settings.MP_DOMAIN("#/history"),
|
||
)
|
||
)
|
||
|
||
def _take_over_transfer_history_by_ai(
|
||
self,
|
||
history_id: int,
|
||
channel: MessageChannel,
|
||
source: str,
|
||
userid: Union[str, int],
|
||
username: str,
|
||
) -> None:
|
||
"""
|
||
由智能助手接管一条失败的整理记录。
|
||
"""
|
||
|
||
def __build_manual_redo_prompt(his: TransferHistory) -> str:
|
||
"""构建手动 AI 整理提示词。"""
|
||
|
||
src_fileitem = his.src_fileitem or {}
|
||
source_path = src_fileitem.get("path") if isinstance(src_fileitem, dict) else ""
|
||
source_path = source_path or his.src or ""
|
||
season_episode = f"{his.seasons or ''}{his.episodes or ''}".strip()
|
||
template_context = {
|
||
"his_id": his.id,
|
||
"current_status": "success" if his.status else "failed",
|
||
"recognized_title": his.title or "unknown",
|
||
"media_type": his.type or "unknown",
|
||
"category": his.category or "unknown",
|
||
"year": his.year or "unknown",
|
||
"season_episode": season_episode or "unknown",
|
||
"source_path": source_path or "unknown",
|
||
"source_storage": his.src_storage or "local",
|
||
"destination_path": his.dest or "unknown",
|
||
"destination_storage": his.dest_storage or "unknown",
|
||
"transfer_mode": his.mode or "unknown",
|
||
"tmdbid": his.tmdbid or "none",
|
||
"doubanid": his.doubanid or "none",
|
||
"error_message": his.errmsg or "none",
|
||
}
|
||
return prompt_manager.render_system_task_message(
|
||
"manual_transfer_redo",
|
||
template_context=template_context,
|
||
)
|
||
|
||
if not settings.AI_AGENT_ENABLE:
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title="MoviePilot智能助手未启用,请在系统设置中启用",
|
||
)
|
||
)
|
||
return
|
||
|
||
history = TransferHistoryOper().get(history_id)
|
||
if not history:
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title="重新整理失败",
|
||
text=f"整理记录 #{history_id} 不存在",
|
||
link=settings.MP_DOMAIN("#/history"),
|
||
)
|
||
)
|
||
return
|
||
|
||
redo_prompt = __build_manual_redo_prompt(history)
|
||
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title=f"已将整理记录 #{history_id} 交给智能助手处理",
|
||
text="处理完成后会在这里回复结果。",
|
||
link=settings.MP_DOMAIN("#/history"),
|
||
)
|
||
)
|
||
|
||
async def _run_ai_takeover():
|
||
final_output = ""
|
||
|
||
def _capture_output(text_output: str):
|
||
nonlocal final_output
|
||
final_output = text_output or ""
|
||
|
||
try:
|
||
await agent_manager.run_background_prompt(
|
||
message=redo_prompt,
|
||
session_prefix=f"__agent_manual_redo_{history_id}",
|
||
output_callback=_capture_output,
|
||
reply_mode=ReplyMode.CAPTURE_ONLY,
|
||
persist_output_message=False,
|
||
allow_message_tools=False,
|
||
)
|
||
await self.async_post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title="智能助手整理完成",
|
||
text=final_output.strip()
|
||
or f"整理记录 #{history_id} 已由智能助手处理完成。",
|
||
link=settings.MP_DOMAIN("#/history"),
|
||
)
|
||
)
|
||
except Exception as e:
|
||
await self.async_post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title="智能助手整理失败",
|
||
text=str(e),
|
||
link=settings.MP_DOMAIN("#/history"),
|
||
)
|
||
)
|
||
|
||
asyncio.run_coroutine_threadsafe(_run_ai_takeover(), global_vars.loop)
|
||
|
||
def _get_or_create_session_id(self, userid: Union[str, int]) -> str:
|
||
"""
|
||
获取或创建会话ID
|
||
如果用户上次会话在15分钟内,则复用相同的会话ID;否则创建新的会话ID
|
||
"""
|
||
current_time = datetime.now()
|
||
|
||
# 检查用户是否有已存在的会话
|
||
if userid in self._user_sessions:
|
||
session_id, last_time = self._user_sessions[userid]
|
||
|
||
# 计算时间差
|
||
time_diff = current_time - last_time
|
||
|
||
# 如果时间差小于等于xx分钟,复用会话ID
|
||
if time_diff <= timedelta(minutes=self._session_timeout_minutes):
|
||
# 更新最后使用时间
|
||
self._user_sessions[userid] = (session_id, current_time)
|
||
logger.info(
|
||
f"复用会话ID: {session_id}, 用户: {userid}, 距离上次会话: {time_diff.total_seconds() / 60:.1f}分钟"
|
||
)
|
||
return session_id
|
||
|
||
# 创建新的会话ID
|
||
new_session_id = f"user_{userid}_{int(time.time())}"
|
||
self._user_sessions[userid] = (new_session_id, current_time)
|
||
logger.info(f"创建新会话ID: {new_session_id}, 用户: {userid}")
|
||
return new_session_id
|
||
|
||
def _bind_session_id(self, userid: Union[str, int], session_id: str) -> None:
|
||
"""
|
||
将用户会话绑定到指定的 session_id,并刷新最后活动时间。
|
||
"""
|
||
self._user_sessions[userid] = (session_id, datetime.now())
|
||
|
||
def _record_user_message(
|
||
self,
|
||
channel: MessageChannel,
|
||
source: str,
|
||
userid: Union[str, int],
|
||
username: str,
|
||
text: str,
|
||
) -> None:
|
||
"""
|
||
保存一条用户消息到消息历史与数据库。
|
||
"""
|
||
self.messagehelper.put(
|
||
CommingMessage(
|
||
userid=userid,
|
||
username=username,
|
||
channel=channel,
|
||
source=source,
|
||
text=text,
|
||
),
|
||
role="user",
|
||
)
|
||
self.messageoper.add(
|
||
channel=channel,
|
||
source=source,
|
||
userid=username or userid,
|
||
text=text,
|
||
action=0,
|
||
)
|
||
|
||
def clear_user_session(self, userid: Union[str, int]) -> bool:
|
||
"""
|
||
清除指定用户的会话信息
|
||
返回是否成功清除
|
||
"""
|
||
if userid in self._user_sessions:
|
||
session_id, _ = self._user_sessions.pop(userid)
|
||
logger.info(f"已清除用户 {userid} 的会话: {session_id}")
|
||
return True
|
||
return False
|
||
|
||
def remote_clear_session(
|
||
self,
|
||
channel: MessageChannel,
|
||
userid: Union[str, int],
|
||
source: Optional[str] = None,
|
||
):
|
||
"""
|
||
清除用户会话(远程命令接口)
|
||
"""
|
||
# 获取并清除会话信息
|
||
session_id = None
|
||
if userid in self._user_sessions:
|
||
session_id, _ = self._user_sessions.pop(userid)
|
||
logger.info(f"已清除用户 {userid} 的会话: {session_id}")
|
||
|
||
# 如果有会话ID,同时清除智能体的会话记忆
|
||
if session_id:
|
||
try:
|
||
asyncio.run_coroutine_threadsafe(
|
||
agent_manager.clear_session(
|
||
session_id=session_id, user_id=str(userid)
|
||
),
|
||
global_vars.loop,
|
||
)
|
||
except Exception as e:
|
||
logger.warning(f"清除智能体会话记忆失败: {e}")
|
||
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
title="智能体会话已清除,下次将创建新的会话",
|
||
userid=userid,
|
||
)
|
||
)
|
||
else:
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
title="您当前没有活跃的智能体会话",
|
||
userid=userid,
|
||
)
|
||
)
|
||
|
||
def remote_stop_agent(
|
||
self,
|
||
channel: MessageChannel,
|
||
userid: Union[str, int],
|
||
source: Optional[str] = None,
|
||
):
|
||
"""
|
||
应急停止当前正在执行的Agent推理(远程命令接口)。
|
||
与 /clear_session 不同,此命令不会清除会话和记忆,
|
||
停止后用户仍可继续对话。
|
||
"""
|
||
# 查找用户的会话ID(不弹出,保留会话)
|
||
session_info = self._user_sessions.get(userid)
|
||
if session_info:
|
||
session_id, _ = session_info
|
||
try:
|
||
future = asyncio.run_coroutine_threadsafe(
|
||
agent_manager.stop_current_task(session_id=session_id),
|
||
global_vars.loop,
|
||
)
|
||
stopped = future.result(timeout=10)
|
||
except Exception as e:
|
||
logger.warning(f"停止Agent推理失败: {e}")
|
||
stopped = False
|
||
|
||
if stopped:
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
title="智能体推理已应急停止,会话记忆已保留,您可以继续对话",
|
||
userid=userid,
|
||
)
|
||
)
|
||
else:
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
title="当前没有正在执行的智能体任务",
|
||
userid=userid,
|
||
)
|
||
)
|
||
else:
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
title="您当前没有活跃的智能体会话",
|
||
userid=userid,
|
||
)
|
||
)
|
||
|
||
@staticmethod
|
||
def _format_token_count(value: Optional[int]) -> str:
|
||
return f"{value:,}" if value is not None else "未知"
|
||
|
||
@classmethod
|
||
def _format_session_status_text(cls, status: Dict[str, Any]) -> str:
|
||
context_window_tokens = status.get("context_window_tokens")
|
||
last_input_tokens = status.get("last_input_tokens")
|
||
if context_window_tokens and status.get("model_call_count"):
|
||
context_ratio = status.get("last_context_usage_ratio")
|
||
if context_ratio is None and last_input_tokens is not None:
|
||
context_ratio = last_input_tokens / context_window_tokens
|
||
context_usage_text = (
|
||
f"{cls._format_token_count(last_input_tokens)} / "
|
||
f"{cls._format_token_count(context_window_tokens)} "
|
||
f"({context_ratio * 100:.2f}%)"
|
||
if context_ratio is not None
|
||
else f"{cls._format_token_count(last_input_tokens)} / "
|
||
f"{cls._format_token_count(context_window_tokens)}"
|
||
)
|
||
else:
|
||
context_usage_text = "暂无模型调用数据"
|
||
|
||
lines = [
|
||
f"会话ID: {status.get('session_id') or '未知'}",
|
||
f"执行状态: {'运行中' if status.get('is_processing') else '空闲'}",
|
||
f"当前模型: {status.get('model') or '未知'}",
|
||
f"上下文窗口: {cls._format_token_count(context_window_tokens)} tokens",
|
||
f"最近一次上下文占用: {context_usage_text}",
|
||
f"最近一次 tokens: 输入 {cls._format_token_count(status.get('last_input_tokens'))} / 输出 {cls._format_token_count(status.get('last_output_tokens'))} / 总计 {cls._format_token_count(status.get('last_total_tokens'))}",
|
||
f"当前会话累计 tokens: 输入 {cls._format_token_count(status.get('total_input_tokens'))} / 输出 {cls._format_token_count(status.get('total_output_tokens'))} / 总计 {cls._format_token_count(status.get('total_tokens'))}",
|
||
f"模型调用次数: {status.get('model_call_count', 0)}",
|
||
f"排队消息数: {status.get('pending_messages', 0)}",
|
||
f"最后更新: {status.get('last_updated_at') or '暂无'}",
|
||
]
|
||
return "\n".join(lines)
|
||
|
||
def remote_session_status(
|
||
self,
|
||
channel: MessageChannel,
|
||
userid: Union[str, int],
|
||
source: Optional[str] = None,
|
||
):
|
||
"""查询当前用户的智能体会话状态。"""
|
||
session_info = self._user_sessions.get(userid)
|
||
if not session_info:
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
title="您当前没有活跃的智能体会话",
|
||
userid=userid,
|
||
)
|
||
)
|
||
return
|
||
|
||
session_id, _ = session_info
|
||
status = agent_manager.get_session_status(session_id=session_id)
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
title="当前智能体会话状态",
|
||
text=self._format_session_status_text(status),
|
||
userid=userid,
|
||
)
|
||
)
|
||
|
||
def _handle_ai_message(
|
||
self,
|
||
text: str,
|
||
channel: MessageChannel,
|
||
source: str,
|
||
userid: Union[str, int],
|
||
username: str,
|
||
images: Optional[List[CommingMessage.MessageImage]] = None,
|
||
files: Optional[List[CommingMessage.MessageAttachment]] = None,
|
||
session_id: Optional[str] = None,
|
||
) -> None:
|
||
"""
|
||
处理AI智能体消息
|
||
"""
|
||
try:
|
||
# 检查AI智能体是否启用
|
||
if not settings.AI_AGENT_ENABLE:
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title="MoviePilot智能助手未启用,请在系统设置中启用",
|
||
)
|
||
)
|
||
return
|
||
|
||
images = CommingMessage.MessageImage.normalize_list(images)
|
||
|
||
# 提取用户消息
|
||
if text.lower().startswith("/ai"):
|
||
user_message = text[3:].strip() # 移除 "/ai" 前缀(大小写不敏感)
|
||
else:
|
||
user_message = text.strip() # 按原消息处理
|
||
|
||
if not user_message and not images and not files:
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title="请输入您的问题或需求",
|
||
)
|
||
)
|
||
return
|
||
|
||
# 生成或复用会话ID
|
||
session_id = session_id or self._get_or_create_session_id(userid)
|
||
self._bind_session_id(userid, session_id)
|
||
|
||
# 将可直接输入给 LLM 的附件统一转换为 data URL
|
||
original_images = images
|
||
all_files = list(files or [])
|
||
if images and LLMHelper.supports_image_input():
|
||
images = self._download_attachments_to_data_urls(
|
||
images, channel, source
|
||
)
|
||
if original_images and not images and not user_message and not files:
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title="附件读取失败,请稍后重试",
|
||
)
|
||
)
|
||
return
|
||
elif images:
|
||
image_attachments = self._build_image_attachments(images)
|
||
if (
|
||
original_images
|
||
and not image_attachments
|
||
and not user_message
|
||
and not files
|
||
):
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title="附件读取失败,请稍后重试",
|
||
)
|
||
)
|
||
return
|
||
all_files.extend(image_attachments)
|
||
images = None
|
||
|
||
prepared_files = self._prepare_agent_files(
|
||
session_id=session_id,
|
||
files=all_files,
|
||
channel=channel,
|
||
source=source,
|
||
)
|
||
if all_files and not prepared_files and not user_message and not images:
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title="文件读取失败,请稍后重试",
|
||
)
|
||
)
|
||
return
|
||
|
||
# 在事件循环中处理
|
||
asyncio.run_coroutine_threadsafe(
|
||
agent_manager.process_message(
|
||
session_id=session_id,
|
||
user_id=str(userid),
|
||
message=user_message,
|
||
images=images,
|
||
files=prepared_files,
|
||
channel=channel.value if channel else None,
|
||
source=source,
|
||
username=username,
|
||
),
|
||
global_vars.loop,
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理AI智能体消息失败: {e}")
|
||
self.messagehelper.put(
|
||
f"AI智能体处理失败: {str(e)}", role="system", title="MoviePilot助手"
|
||
)
|
||
|
||
def _transcribe_audio_refs(
|
||
self, audio_refs: List[str], channel: MessageChannel, source: str
|
||
) -> Optional[str]:
|
||
"""
|
||
下载并识别语音消息,仅处理当前已接入的渠道。
|
||
"""
|
||
if not audio_refs:
|
||
return None
|
||
if not VoiceHelper.is_available("stt"):
|
||
logger.warning("语音能力未配置,跳过语音识别")
|
||
return None
|
||
|
||
transcripts = []
|
||
for audio_ref in audio_refs:
|
||
try:
|
||
if audio_ref.startswith("tg://voice_file_id/"):
|
||
file_id = audio_ref.replace("tg://voice_file_id/", "", 1)
|
||
content = self.run_module(
|
||
"download_telegram_file_bytes", file_id=file_id, source=source
|
||
)
|
||
filename = "input.ogg"
|
||
elif audio_ref.startswith("tg://audio_file_id/"):
|
||
file_id = audio_ref.replace("tg://audio_file_id/", "", 1)
|
||
content = self.run_module(
|
||
"download_telegram_file_bytes", file_id=file_id, source=source
|
||
)
|
||
filename = "input.mp3"
|
||
elif audio_ref.startswith("wxwork://voice_media_id/"):
|
||
content = self.run_module(
|
||
"download_wechat_media_bytes",
|
||
media_ref=audio_ref,
|
||
source=source,
|
||
)
|
||
filename = "input.amr"
|
||
elif audio_ref.startswith("slack://file/"):
|
||
content = self.run_module(
|
||
"download_slack_file_bytes", file_ref=audio_ref, source=source
|
||
)
|
||
filename = self._guess_audio_filename(
|
||
audio_ref, default="input.ogg"
|
||
)
|
||
elif audio_ref.startswith("discord://file/"):
|
||
content = self.run_module(
|
||
"download_discord_file_bytes", file_ref=audio_ref, source=source
|
||
)
|
||
filename = self._guess_audio_filename(
|
||
audio_ref, default="input.ogg"
|
||
)
|
||
elif audio_ref.startswith("qq://file/"):
|
||
content = self.run_module(
|
||
"download_qq_file_bytes", file_ref=audio_ref, source=source
|
||
)
|
||
filename = self._guess_audio_filename(
|
||
audio_ref, default="input.ogg"
|
||
)
|
||
elif audio_ref.startswith("vocechat://file/"):
|
||
content = self.run_module(
|
||
"download_vocechat_file_bytes",
|
||
file_ref=audio_ref,
|
||
source=source,
|
||
)
|
||
filename = self._guess_audio_filename(
|
||
audio_ref, default="input.ogg"
|
||
)
|
||
elif audio_ref.startswith("synology://file/"):
|
||
content = self.run_module(
|
||
"download_synologychat_file_bytes",
|
||
file_ref=audio_ref,
|
||
source=source,
|
||
)
|
||
filename = self._guess_audio_filename(
|
||
audio_ref, default="input.ogg"
|
||
)
|
||
elif audio_ref.startswith("wxbot://voice"):
|
||
continue
|
||
elif audio_ref.startswith("http"):
|
||
resp = RequestUtils(timeout=30).get_res(audio_ref)
|
||
content = resp.content if resp and resp.content else None
|
||
filename = self._guess_audio_filename(
|
||
audio_ref, default="input.ogg"
|
||
)
|
||
else:
|
||
logger.debug(
|
||
"暂不支持的语音引用: channel=%s, source=%s, ref=%s",
|
||
channel.value if channel else None,
|
||
source,
|
||
audio_ref,
|
||
)
|
||
continue
|
||
|
||
if not content:
|
||
logger.warning(
|
||
"语音下载失败,跳过识别: channel=%s, source=%s, ref=%s",
|
||
channel.value if channel else None,
|
||
source,
|
||
audio_ref,
|
||
)
|
||
continue
|
||
|
||
transcript = VoiceHelper.transcribe_bytes(
|
||
content=content, filename=filename
|
||
)
|
||
if transcript:
|
||
transcripts.append(transcript)
|
||
logger.info(
|
||
"语音识别成功: channel=%s, source=%s, ref=%s, text_len=%s",
|
||
channel.value if channel else None,
|
||
source,
|
||
audio_ref,
|
||
len(transcript),
|
||
)
|
||
except Exception as err:
|
||
logger.error(f"语音识别失败: {err}")
|
||
|
||
return "\n".join(transcripts).strip() if transcripts else None
|
||
|
||
@staticmethod
|
||
def _guess_audio_filename(audio_ref: str, default: str = "input.ogg") -> str:
|
||
"""
|
||
根据引用中的扩展名推测音频文件名,便于 STT 服务识别格式。
|
||
"""
|
||
if not audio_ref:
|
||
return default
|
||
raw_ref = unquote(audio_ref).split("?", 1)[0].split("#", 1)[0]
|
||
match = re.search(
|
||
r"([^/]+\.(mp3|m4a|wav|ogg|oga|opus|aac|amr|flac|mpga|mpeg|webm))$",
|
||
raw_ref,
|
||
flags=re.IGNORECASE,
|
||
)
|
||
if match:
|
||
return match.group(1)
|
||
return default
|
||
|
||
def _download_attachments_to_data_urls(
|
||
self,
|
||
attachments: List[CommingMessage.MessageImage],
|
||
channel: MessageChannel,
|
||
source: str,
|
||
) -> Optional[List[str]]:
|
||
"""
|
||
下载可直接提供给 LLM 的附件内容,并统一转换为 data URL。
|
||
"""
|
||
attachments = CommingMessage.MessageImage.normalize_list(attachments)
|
||
if not attachments:
|
||
return None
|
||
data_urls = []
|
||
for attachment in attachments:
|
||
attachment_ref = attachment.ref
|
||
try:
|
||
before_count = len(data_urls)
|
||
if attachment_ref.startswith("data:"):
|
||
data_urls.append(attachment_ref)
|
||
elif attachment_ref.startswith("tg://file_id/"):
|
||
file_id = attachment_ref.replace("tg://file_id/", "")
|
||
base64_data = self.run_module(
|
||
"download_telegram_file_to_base64",
|
||
file_id=file_id,
|
||
source=source,
|
||
)
|
||
if base64_data:
|
||
data_urls.append(f"data:image/jpeg;base64,{base64_data}")
|
||
elif attachment_ref.startswith(
|
||
"wxwork://media_id/"
|
||
) or attachment_ref.startswith(
|
||
"wxbot://image/"
|
||
):
|
||
data_url = self.run_module(
|
||
"download_wechat_image_to_data_url",
|
||
image_ref=attachment_ref,
|
||
source=source,
|
||
)
|
||
if data_url:
|
||
data_urls.append(data_url)
|
||
elif channel == MessageChannel.Slack:
|
||
data_url = self.run_module(
|
||
"download_slack_file_to_data_url",
|
||
file_url=attachment_ref,
|
||
source=source,
|
||
)
|
||
if data_url:
|
||
data_urls.append(data_url)
|
||
elif attachment_ref.startswith("vocechat://file/"):
|
||
data_url = self.run_module(
|
||
"download_vocechat_image_to_data_url",
|
||
image_ref=attachment_ref,
|
||
source=source,
|
||
)
|
||
if data_url:
|
||
data_urls.append(data_url)
|
||
elif attachment_ref.startswith("http"):
|
||
resp = RequestUtils(timeout=30).get_res(attachment_ref)
|
||
if resp and resp.content:
|
||
base64_data = base64.b64encode(resp.content).decode()
|
||
mime_type = resp.headers.get("Content-Type", "image/jpeg")
|
||
data_urls.append(f"data:{mime_type};base64,{base64_data}")
|
||
else:
|
||
logger.debug(
|
||
"暂不支持直接转换为 data URL 的附件引用: channel=%s, source=%s, ref=%s",
|
||
channel.value if channel else None,
|
||
source,
|
||
attachment_ref,
|
||
)
|
||
continue
|
||
|
||
if len(data_urls) > before_count:
|
||
logger.info(
|
||
"附件读取成功并已转换为 data URL: channel=%s, source=%s, ref=%s, mime_type=%s",
|
||
channel.value if channel else None,
|
||
source,
|
||
attachment_ref,
|
||
attachment.mime_type,
|
||
)
|
||
except Exception as err:
|
||
logger.error(
|
||
"附件读取失败,无法转换为 data URL: channel=%s, source=%s, ref=%s, error=%s",
|
||
channel.value if channel else None,
|
||
source,
|
||
attachment_ref,
|
||
err,
|
||
)
|
||
return data_urls if data_urls else None
|
||
|
||
def _build_image_attachments(
|
||
self, images: List[CommingMessage.MessageImage]
|
||
) -> List[CommingMessage.MessageAttachment]:
|
||
"""
|
||
将图片引用转换为附件描述,以便按文件方式交给 Agent 处理。
|
||
"""
|
||
images = CommingMessage.MessageImage.normalize_list(images)
|
||
if not images:
|
||
return []
|
||
|
||
attachments = []
|
||
for index, image in enumerate(images, start=1):
|
||
image_ref = image.ref
|
||
if not image_ref:
|
||
continue
|
||
name = image.name or self._guess_image_attachment_name(image_ref, index)
|
||
mime_type = image.mime_type or self._guess_image_mime_type(image_ref, name)
|
||
attachments.append(
|
||
CommingMessage.MessageAttachment(
|
||
ref=image_ref,
|
||
name=name,
|
||
mime_type=mime_type,
|
||
size=image.size,
|
||
)
|
||
)
|
||
return attachments
|
||
|
||
def _prepare_agent_files(
|
||
self,
|
||
session_id: str,
|
||
files: Optional[List[CommingMessage.MessageAttachment]],
|
||
channel: MessageChannel,
|
||
source: str,
|
||
) -> Optional[List[dict]]:
|
||
"""
|
||
下载用户上传的附件,落盘到临时目录,并生成 Agent 可消费的文件描述。
|
||
"""
|
||
if not files:
|
||
return None
|
||
|
||
prepared_files = []
|
||
for attachment in files:
|
||
payload = {
|
||
"name": attachment.name,
|
||
"mime_type": attachment.mime_type,
|
||
"size": attachment.size,
|
||
"ref": attachment.ref,
|
||
"status": "download_failed",
|
||
}
|
||
try:
|
||
content = self._download_message_file_bytes(
|
||
file_ref=attachment.ref,
|
||
channel=channel,
|
||
source=source,
|
||
)
|
||
if not content:
|
||
prepared_files.append(payload)
|
||
continue
|
||
|
||
local_path = self._save_agent_attachment(
|
||
session_id=session_id,
|
||
filename=attachment.name,
|
||
content=content,
|
||
mime_type=attachment.mime_type,
|
||
)
|
||
payload.update(
|
||
{
|
||
"local_path": str(local_path),
|
||
"status": "ready",
|
||
}
|
||
)
|
||
except Exception as err:
|
||
logger.error(f"准备附件上下文失败: {attachment.ref}, error: {err}")
|
||
payload["error"] = str(err)
|
||
prepared_files.append(payload)
|
||
|
||
return prepared_files or None
|
||
|
||
def _download_message_file_bytes(
|
||
self, file_ref: str, channel: MessageChannel, source: str
|
||
) -> Optional[bytes]:
|
||
"""
|
||
下载消息附件的原始字节内容。
|
||
"""
|
||
if not file_ref:
|
||
return None
|
||
if file_ref.startswith("data:"):
|
||
return self._decode_data_url_bytes(file_ref)
|
||
if file_ref.startswith("tg://file_id/"):
|
||
file_id = file_ref.replace("tg://file_id/", "", 1)
|
||
return self.run_module(
|
||
"download_telegram_file_bytes", file_id=file_id, source=source
|
||
)
|
||
if file_ref.startswith("tg://document_file_id/"):
|
||
file_id = file_ref.replace("tg://document_file_id/", "", 1)
|
||
return self.run_module(
|
||
"download_telegram_file_bytes", file_id=file_id, source=source
|
||
)
|
||
if file_ref.startswith("wxwork://media_id/"):
|
||
return self.run_module(
|
||
"download_wechat_media_bytes", media_ref=file_ref, source=source
|
||
)
|
||
if file_ref.startswith("wxwork://file_media_id/"):
|
||
return self.run_module(
|
||
"download_wechat_media_bytes", media_ref=file_ref, source=source
|
||
)
|
||
if file_ref.startswith("wxbot://image/"):
|
||
data_url = self.run_module(
|
||
"download_wechat_image_to_data_url", image_ref=file_ref, source=source
|
||
)
|
||
return self._decode_data_url_bytes(data_url) if data_url else None
|
||
if file_ref.startswith("wxbot://file/"):
|
||
file_url = unquote(file_ref.replace("wxbot://file/", "", 1))
|
||
resp = RequestUtils(timeout=30).get_res(file_url)
|
||
return resp.content if resp and resp.content else None
|
||
if file_ref.startswith("slack://file/"):
|
||
return self.run_module(
|
||
"download_slack_file_bytes", file_ref=file_ref, source=source
|
||
)
|
||
if file_ref.startswith("discord://file/"):
|
||
return self.run_module(
|
||
"download_discord_file_bytes", file_ref=file_ref, source=source
|
||
)
|
||
if file_ref.startswith("qq://file/"):
|
||
return self.run_module(
|
||
"download_qq_file_bytes", file_ref=file_ref, source=source
|
||
)
|
||
if file_ref.startswith("vocechat://file/"):
|
||
return self.run_module(
|
||
"download_vocechat_file_bytes", file_ref=file_ref, source=source
|
||
)
|
||
if file_ref.startswith("synology://file/"):
|
||
return self.run_module(
|
||
"download_synologychat_file_bytes", file_ref=file_ref, source=source
|
||
)
|
||
if file_ref.startswith("http"):
|
||
if channel == MessageChannel.Slack:
|
||
data_url = self.run_module(
|
||
"download_slack_file_to_data_url", file_url=file_ref, source=source
|
||
)
|
||
return self._decode_data_url_bytes(data_url) if data_url else None
|
||
resp = RequestUtils(timeout=30).get_res(file_ref)
|
||
return resp.content if resp and resp.content else None
|
||
logger.debug(
|
||
"暂不支持的附件引用: channel=%s, source=%s, ref=%s",
|
||
channel.value if channel else None,
|
||
source,
|
||
file_ref,
|
||
)
|
||
return None
|
||
|
||
def _save_agent_attachment(
|
||
self,
|
||
session_id: str,
|
||
filename: Optional[str],
|
||
content: bytes,
|
||
mime_type: Optional[str] = None,
|
||
) -> Path:
|
||
"""
|
||
将用户上传文件写入临时目录,并返回本地路径。
|
||
"""
|
||
safe_name = self._sanitize_attachment_name(filename, mime_type)
|
||
base_dir = settings.TEMP_PATH / "agent_uploads" / session_id
|
||
base_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
file_id = uuid.uuid4().hex[:8]
|
||
local_path = base_dir / f"{file_id}_{safe_name}"
|
||
local_path.write_bytes(content or b"")
|
||
return local_path
|
||
|
||
@staticmethod
|
||
def _sanitize_attachment_name(
|
||
filename: Optional[str], mime_type: Optional[str] = None
|
||
) -> str:
|
||
"""
|
||
规范化附件文件名,避免路径穿越和非法字符。
|
||
"""
|
||
name = Path(filename or "attachment").name
|
||
name = re.sub(r"[^\w.\-]+", "_", name, flags=re.ASCII).strip("._")
|
||
if not name:
|
||
name = "attachment"
|
||
if "." not in name:
|
||
mime = (mime_type or "").split(";", 1)[0].strip().lower()
|
||
default_ext = {
|
||
"image/jpeg": ".jpg",
|
||
"image/png": ".png",
|
||
"image/gif": ".gif",
|
||
"image/webp": ".webp",
|
||
"image/bmp": ".bmp",
|
||
"application/json": ".json",
|
||
"text/plain": ".txt",
|
||
"text/markdown": ".md",
|
||
"text/csv": ".csv",
|
||
}.get(mime)
|
||
if default_ext:
|
||
name = f"{name}{default_ext}"
|
||
return name
|
||
|
||
@staticmethod
|
||
def _guess_image_attachment_name(image_ref: str, index: int) -> str:
|
||
"""
|
||
根据图片引用推测附件名。
|
||
"""
|
||
if not image_ref:
|
||
return f"image_{index}.jpg"
|
||
if image_ref.startswith("data:"):
|
||
mime_part = image_ref[5:].split(";", 1)[0].strip().lower()
|
||
ext = mimetypes.guess_extension(mime_part) or ".jpg"
|
||
return f"image_{index}{ext}"
|
||
|
||
parsed = urlparse(unquote(image_ref))
|
||
name = Path(parsed.path).name if parsed.path else ""
|
||
if name and "." in name:
|
||
return name
|
||
return f"image_{index}.jpg"
|
||
|
||
@staticmethod
|
||
def _guess_image_mime_type(image_ref: str, filename: Optional[str]) -> str:
|
||
"""
|
||
根据图片引用或文件名推测 MIME 类型。
|
||
"""
|
||
if image_ref and image_ref.startswith("data:"):
|
||
mime = image_ref[5:].split(";", 1)[0].strip().lower()
|
||
return mime or "image/jpeg"
|
||
guessed, _ = mimetypes.guess_type(filename or "")
|
||
if guessed and guessed.startswith("image/"):
|
||
return guessed
|
||
return "image/jpeg"
|
||
|
||
@staticmethod
|
||
def _decode_data_url_bytes(data_url: Optional[str]) -> Optional[bytes]:
|
||
"""
|
||
将 data URL 解码为原始字节。
|
||
"""
|
||
if not data_url or not data_url.startswith("data:"):
|
||
return None
|
||
try:
|
||
_, payload = data_url.split(",", 1)
|
||
except ValueError:
|
||
return None
|
||
try:
|
||
return base64.b64decode(payload)
|
||
except Exception as e:
|
||
logger.error(e)
|
||
return None
|
||
|
||
|
||
class MediaInteractionChain(ChainBase):
|
||
"""
|
||
处理媒体搜索、订阅、资源选择和翻页等交互流程。
|
||
"""
|
||
|
||
_button_page_size = 8
|
||
_text_page_size = 8
|
||
|
||
@staticmethod
|
||
def has_pending_interaction(user_id: Union[str, int]) -> bool:
|
||
"""
|
||
判断用户当前是否存在未结束的媒体交互。
|
||
"""
|
||
return media_interaction_manager.get_by_user(user_id) is not None
|
||
|
||
@staticmethod
|
||
def _get_noexits_info(
|
||
meta: MetaBase, mediainfo: MediaInfo
|
||
) -> Dict[Union[int, str], Dict[int, NotExistMediaInfo]]:
|
||
"""
|
||
构造媒体缺失集信息,用于全量重搜或自动下载补全集数。
|
||
"""
|
||
if mediainfo.type == MediaType.TV:
|
||
if not mediainfo.seasons:
|
||
mediainfo = MediaChain().recognize_media(
|
||
mtype=mediainfo.type,
|
||
tmdbid=mediainfo.tmdb_id,
|
||
doubanid=mediainfo.douban_id,
|
||
cache=False,
|
||
)
|
||
if not mediainfo:
|
||
logger.warn("媒体信息识别失败,无法补充季集信息")
|
||
return {}
|
||
if not mediainfo.seasons:
|
||
logger.warn(
|
||
"媒体信息中没有季集信息,标题:%s,tmdbid:%s,doubanid:%s",
|
||
mediainfo.title,
|
||
mediainfo.tmdb_id,
|
||
mediainfo.douban_id,
|
||
)
|
||
return {}
|
||
|
||
mediakey = mediainfo.tmdb_id or mediainfo.douban_id
|
||
no_exists = {mediakey: {}}
|
||
if meta.begin_season:
|
||
episodes = mediainfo.seasons.get(meta.begin_season)
|
||
if not episodes:
|
||
return {}
|
||
no_exists[mediakey][meta.begin_season] = NotExistMediaInfo(
|
||
season=meta.begin_season,
|
||
episodes=[],
|
||
total_episode=len(episodes),
|
||
start_episode=episodes[0],
|
||
)
|
||
else:
|
||
for sea, eps in mediainfo.seasons.items():
|
||
if not eps:
|
||
continue
|
||
no_exists[mediakey][sea] = NotExistMediaInfo(
|
||
season=sea,
|
||
episodes=[],
|
||
total_episode=len(eps),
|
||
start_episode=eps[0],
|
||
)
|
||
return no_exists
|
||
return {}
|
||
|
||
@staticmethod
|
||
def parse_callback(
|
||
callback_data: str,
|
||
) -> Optional[Tuple[Optional[str], str, Optional[int]]]:
|
||
"""
|
||
解析新旧两种媒体交互按钮格式。
|
||
"""
|
||
if callback_data.startswith("media:"):
|
||
parts = callback_data.split(":")
|
||
if len(parts) < 3:
|
||
return None
|
||
request_id = parts[1]
|
||
action = parts[2]
|
||
index = None
|
||
if len(parts) >= 4 and parts[3].isdigit():
|
||
index = int(parts[3])
|
||
return request_id, action, index
|
||
|
||
match = re.match(r"^(select|download)_(\d+)$", callback_data)
|
||
if match:
|
||
return None, match.group(1), int(match.group(2))
|
||
if callback_data == "page_p":
|
||
return None, "page-prev", None
|
||
if callback_data == "page_n":
|
||
return None, "page-next", None
|
||
return None
|
||
|
||
def handle_callback_interaction(
|
||
self,
|
||
callback_data: str,
|
||
channel: MessageChannel,
|
||
source: str,
|
||
userid: Union[str, int],
|
||
username: str,
|
||
original_message_id: Optional[Union[str, int]] = None,
|
||
original_chat_id: Optional[str] = None,
|
||
) -> bool:
|
||
"""
|
||
处理按钮回调,并将当前视图刷新到原消息上。
|
||
"""
|
||
parsed = self.parse_callback(callback_data)
|
||
if not parsed:
|
||
return False
|
||
|
||
request_id, action, index = parsed
|
||
if request_id:
|
||
request = media_interaction_manager.get_by_id(request_id, userid)
|
||
else:
|
||
request = media_interaction_manager.get_by_user(userid)
|
||
|
||
if not request:
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title="交互已失效,请重新搜索或订阅",
|
||
)
|
||
)
|
||
return True
|
||
|
||
request.channel = channel
|
||
request.source = source
|
||
request.username = username
|
||
|
||
if action == "page-prev":
|
||
if request.page <= 0:
|
||
self._post_invalid_input(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title="已经是第一页了!",
|
||
)
|
||
return True
|
||
request.page -= 1
|
||
self._render_interaction(
|
||
request=request,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
original_message_id=original_message_id,
|
||
original_chat_id=original_chat_id,
|
||
)
|
||
return True
|
||
|
||
if action == "page-next":
|
||
if not self._has_next_page(request):
|
||
self._post_invalid_input(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title="已经是最后一页了!",
|
||
)
|
||
return True
|
||
request.page += 1
|
||
self._render_interaction(
|
||
request=request,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
original_message_id=original_message_id,
|
||
original_chat_id=original_chat_id,
|
||
)
|
||
return True
|
||
|
||
if action == "select":
|
||
self._handle_media_selection(
|
||
request=request,
|
||
page_index=index,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
original_message_id=original_message_id,
|
||
original_chat_id=original_chat_id,
|
||
)
|
||
return True
|
||
|
||
if action == "download":
|
||
self._handle_torrent_selection(
|
||
request=request,
|
||
page_index=index,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
)
|
||
return True
|
||
|
||
return False
|
||
|
||
def handle_text_interaction(
|
||
self,
|
||
channel: MessageChannel,
|
||
source: str,
|
||
userid: Union[str, int],
|
||
username: str,
|
||
text: str,
|
||
) -> bool:
|
||
"""
|
||
处理文本式交互。
|
||
|
||
有会话时优先处理数字选择和翻页;无会话时负责识别搜索/订阅类入口。
|
||
"""
|
||
request = media_interaction_manager.get_by_user(userid)
|
||
normalized = (text or "").strip()
|
||
lowered = normalized.lower()
|
||
|
||
if request and lowered in {"退出", "关闭", "q", "quit", "exit"}:
|
||
media_interaction_manager.remove(request.request_id)
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title="媒体交互已结束",
|
||
)
|
||
)
|
||
return True
|
||
|
||
if normalized.isdigit():
|
||
if not request:
|
||
self._post_invalid_input(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
)
|
||
return True
|
||
request.channel = channel
|
||
request.source = source
|
||
request.username = username
|
||
index = int(normalized)
|
||
if request.phase == "torrent":
|
||
self._handle_torrent_selection(
|
||
request=request,
|
||
page_index=index,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
)
|
||
else:
|
||
self._handle_media_selection(
|
||
request=request,
|
||
page_index=index,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
)
|
||
return True
|
||
|
||
if lowered in {"p", "prev", "上一页"}:
|
||
if not request:
|
||
self._post_invalid_input(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
)
|
||
return True
|
||
if request.page <= 0:
|
||
self._post_invalid_input(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title="已经是第一页了!",
|
||
)
|
||
return True
|
||
request.page -= 1
|
||
request.channel = channel
|
||
request.source = source
|
||
request.username = username
|
||
self._render_interaction(
|
||
request=request,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
)
|
||
return True
|
||
|
||
if lowered in {"n", "next", "下一页"}:
|
||
if not request:
|
||
self._post_invalid_input(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
)
|
||
return True
|
||
if not self._has_next_page(request):
|
||
self._post_invalid_input(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title="已经是最后一页了!",
|
||
)
|
||
return True
|
||
request.page += 1
|
||
request.channel = channel
|
||
request.source = source
|
||
request.username = username
|
||
self._render_interaction(
|
||
request=request,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
)
|
||
return True
|
||
|
||
action, content = self._resolve_action(normalized)
|
||
if not action:
|
||
return False
|
||
|
||
self._start_media_interaction(
|
||
action=action,
|
||
content=content,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
)
|
||
return True
|
||
|
||
@staticmethod
|
||
def _resolve_action(text: str) -> Tuple[Optional[str], str]:
|
||
"""
|
||
将用户输入归类为搜索、订阅或普通聊天。
|
||
"""
|
||
if text.startswith("订阅"):
|
||
return "Subscribe", re.sub(r"订阅[::\s]*", "", text)
|
||
if text.startswith("洗版"):
|
||
return "ReSubscribe", re.sub(r"洗版[::\s]*", "", text)
|
||
if text.startswith("搜索") or text.startswith("下载"):
|
||
return "ReSearch", re.sub(r"(搜索|下载)[::\s]*", "", text)
|
||
if StringUtils.is_link(text):
|
||
return None, text
|
||
if not StringUtils.is_media_title_like(text):
|
||
return None, text
|
||
return "Search", text
|
||
|
||
def _start_media_interaction(
|
||
self,
|
||
action: str,
|
||
content: str,
|
||
channel: MessageChannel,
|
||
source: str,
|
||
userid: Union[str, int],
|
||
username: str,
|
||
) -> None:
|
||
"""
|
||
根据用户输入搜索媒体,并进入媒体选择阶段。
|
||
"""
|
||
meta, medias = MediaChain().search(content)
|
||
if not meta.name:
|
||
self._post_invalid_input(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title="无法识别输入内容!",
|
||
)
|
||
return
|
||
if not medias:
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title=f"{meta.name} 没有找到对应的媒体信息!",
|
||
)
|
||
)
|
||
return
|
||
|
||
logger.info("搜索到 %s 条相关媒体信息", len(medias))
|
||
request = media_interaction_manager.create_or_replace(
|
||
user_id=userid,
|
||
channel=channel,
|
||
source=source,
|
||
username=username,
|
||
action=action,
|
||
keyword=content,
|
||
title=meta.name,
|
||
meta=meta,
|
||
items=medias,
|
||
)
|
||
self._render_interaction(
|
||
request=request,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
)
|
||
|
||
def _handle_media_selection(
|
||
self,
|
||
request: PendingMediaInteraction,
|
||
page_index: Optional[int],
|
||
channel: MessageChannel,
|
||
source: str,
|
||
userid: Union[str, int],
|
||
username: str,
|
||
original_message_id: Optional[Union[str, int]] = None,
|
||
original_chat_id: Optional[str] = None,
|
||
) -> None:
|
||
"""
|
||
处理媒体选择阶段的序号输入。
|
||
"""
|
||
page_items, page, _ = self._page_items(
|
||
items=request.items,
|
||
page=request.page,
|
||
page_size=self._page_size(request.channel),
|
||
)
|
||
request.page = page
|
||
if not page_index or page_index < 1 or page_index > len(page_items):
|
||
self._post_invalid_input(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
)
|
||
return
|
||
|
||
mediainfo: MediaInfo = page_items[page_index - 1]
|
||
request.current_media = mediainfo
|
||
|
||
if request.action in {"Search", "ReSearch"}:
|
||
self._search_media_resources(
|
||
request=request,
|
||
mediainfo=mediainfo,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
original_message_id=original_message_id,
|
||
original_chat_id=original_chat_id,
|
||
)
|
||
return
|
||
|
||
if request.action in {"Subscribe", "ReSubscribe"}:
|
||
self._subscribe_media(
|
||
request=request,
|
||
mediainfo=mediainfo,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
)
|
||
|
||
def _search_media_resources(
|
||
self,
|
||
request: PendingMediaInteraction,
|
||
mediainfo: MediaInfo,
|
||
channel: MessageChannel,
|
||
source: str,
|
||
userid: Union[str, int],
|
||
username: str,
|
||
original_message_id: Optional[Union[str, int]] = None,
|
||
original_chat_id: Optional[str] = None,
|
||
) -> None:
|
||
"""
|
||
根据已选媒体搜索资源,并切换到资源选择阶段。
|
||
"""
|
||
exist_flag, no_exists = DownloadChain().get_no_exists_info(
|
||
meta=request.meta,
|
||
mediainfo=mediainfo,
|
||
)
|
||
if exist_flag and request.action == "Search":
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title=f"【{mediainfo.title_year}{request.meta.sea} 媒体库中已存在,如需重新下载请发送:搜索 名称 或 下载 名称】",
|
||
)
|
||
)
|
||
return
|
||
if exist_flag:
|
||
no_exists = self._get_noexits_info(request.meta, mediainfo)
|
||
|
||
messages = self._build_no_exists_messages(
|
||
mediainfo=mediainfo,
|
||
no_exists=no_exists,
|
||
show_missing_only=request.action == "Search",
|
||
)
|
||
if messages:
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title=f"{mediainfo.title_year}:\n" + "\n".join(messages),
|
||
)
|
||
)
|
||
|
||
logger.info("开始搜索 %s ...", mediainfo.title_year)
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title=f"开始搜索 {mediainfo.type.value} {mediainfo.title_year} ...",
|
||
)
|
||
)
|
||
|
||
contexts = SearchChain().process(mediainfo=mediainfo, no_exists=no_exists)
|
||
if not contexts:
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title=f"{mediainfo.title}{request.meta.sea} 未搜索到需要的资源!",
|
||
)
|
||
)
|
||
return
|
||
|
||
contexts = TorrentHelper().sort_torrents(contexts)
|
||
if self._should_auto_download(userid):
|
||
logger.info("用户 %s 在自动下载用户中,开始自动择优下载 ...", userid)
|
||
self._auto_download(
|
||
request=request,
|
||
cache_list=contexts,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
no_exists=no_exists,
|
||
)
|
||
return
|
||
|
||
request.phase = "torrent"
|
||
request.page = 0
|
||
request.title = mediainfo.title
|
||
request.items = list(contexts)
|
||
self._render_interaction(
|
||
request=request,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
original_message_id=original_message_id,
|
||
original_chat_id=original_chat_id,
|
||
)
|
||
|
||
def _subscribe_media(
|
||
self,
|
||
request: PendingMediaInteraction,
|
||
mediainfo: MediaInfo,
|
||
channel: MessageChannel,
|
||
source: str,
|
||
userid: Union[str, int],
|
||
username: str,
|
||
) -> None:
|
||
"""
|
||
根据已选媒体创建订阅或洗版订阅。
|
||
"""
|
||
best_version = request.action == "ReSubscribe"
|
||
if not best_version:
|
||
exist_flag, _ = DownloadChain().get_no_exists_info(
|
||
meta=request.meta,
|
||
mediainfo=mediainfo,
|
||
)
|
||
if exist_flag:
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title=f"【{mediainfo.title_year}{request.meta.sea} 媒体库中已存在,如需洗版请发送:洗版 XXX】",
|
||
)
|
||
)
|
||
return
|
||
|
||
mp_name = (
|
||
UserOper().get_name(**{f"{channel.name.lower()}_userid": userid})
|
||
if channel
|
||
else None
|
||
)
|
||
SubscribeChain().add(
|
||
title=mediainfo.title,
|
||
year=mediainfo.year,
|
||
mtype=mediainfo.type,
|
||
tmdbid=mediainfo.tmdb_id,
|
||
season=request.meta.begin_season,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=mp_name or username,
|
||
best_version=best_version,
|
||
)
|
||
|
||
def _handle_torrent_selection(
|
||
self,
|
||
request: PendingMediaInteraction,
|
||
page_index: Optional[int],
|
||
channel: MessageChannel,
|
||
source: str,
|
||
userid: Union[str, int],
|
||
username: str,
|
||
) -> None:
|
||
"""
|
||
处理资源选择阶段的下载操作。
|
||
"""
|
||
if request.phase != "torrent":
|
||
self._post_invalid_input(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
)
|
||
return
|
||
|
||
if page_index == 0:
|
||
self._auto_download(
|
||
request=request,
|
||
cache_list=request.items,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
)
|
||
return
|
||
|
||
page_items, page, _ = self._page_items(
|
||
items=request.items,
|
||
page=request.page,
|
||
page_size=self._page_size(request.channel),
|
||
)
|
||
request.page = page
|
||
if not page_index or page_index < 1 or page_index > len(page_items):
|
||
self._post_invalid_input(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
)
|
||
return
|
||
|
||
context: Context = page_items[page_index - 1]
|
||
DownloadChain().download_single(
|
||
context,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
)
|
||
|
||
def _auto_download(
|
||
self,
|
||
request: PendingMediaInteraction,
|
||
cache_list: List[Context],
|
||
channel: MessageChannel,
|
||
source: str,
|
||
userid: Union[str, int],
|
||
username: str,
|
||
no_exists: Optional[Dict[Union[int, str], Dict[int, NotExistMediaInfo]]] = None,
|
||
) -> None:
|
||
"""
|
||
自动择优下载当前资源列表,并在未完成时补建订阅。
|
||
"""
|
||
downloadchain = DownloadChain()
|
||
if no_exists is None:
|
||
exist_flag, no_exists = downloadchain.get_no_exists_info(
|
||
meta=request.meta,
|
||
mediainfo=request.current_media,
|
||
)
|
||
if exist_flag:
|
||
no_exists = self._get_noexits_info(request.meta, request.current_media)
|
||
|
||
downloads, lefts = downloadchain.batch_download(
|
||
contexts=cache_list,
|
||
no_exists=no_exists,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
)
|
||
if downloads and not lefts:
|
||
logger.info("%s 下载完成", request.current_media.title_year)
|
||
return
|
||
|
||
logger.info("%s 未下载未完整,添加订阅 ...", request.current_media.title_year)
|
||
if downloads and request.current_media.type == MediaType.TV:
|
||
note = [
|
||
download.meta_info.begin_episode
|
||
for download in downloads
|
||
if download.meta_info.begin_episode
|
||
]
|
||
else:
|
||
note = None
|
||
|
||
mp_name = (
|
||
UserOper().get_name(**{f"{channel.name.lower()}_userid": userid})
|
||
if channel
|
||
else None
|
||
)
|
||
SubscribeChain().add(
|
||
title=request.current_media.title,
|
||
year=request.current_media.year,
|
||
mtype=request.current_media.type,
|
||
tmdbid=request.current_media.tmdb_id,
|
||
season=request.meta.begin_season,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=mp_name or username,
|
||
state="R",
|
||
note=note,
|
||
)
|
||
|
||
def _render_interaction(
|
||
self,
|
||
request: PendingMediaInteraction,
|
||
channel: MessageChannel,
|
||
source: str,
|
||
userid: Union[str, int],
|
||
original_message_id: Optional[Union[str, int]] = None,
|
||
original_chat_id: Optional[str] = None,
|
||
) -> None:
|
||
"""
|
||
按当前阶段渲染媒体列表或资源列表。
|
||
"""
|
||
if request.phase == "torrent":
|
||
self._post_torrents_message(
|
||
request=request,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
original_message_id=original_message_id,
|
||
original_chat_id=original_chat_id,
|
||
)
|
||
else:
|
||
self._post_medias_message(
|
||
request=request,
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
original_message_id=original_message_id,
|
||
original_chat_id=original_chat_id,
|
||
)
|
||
|
||
def _post_medias_message(
|
||
self,
|
||
request: PendingMediaInteraction,
|
||
channel: MessageChannel,
|
||
source: str,
|
||
userid: Union[str, int],
|
||
original_message_id: Optional[Union[str, int]] = None,
|
||
original_chat_id: Optional[str] = None,
|
||
) -> None:
|
||
"""
|
||
发送或更新媒体选择列表。
|
||
"""
|
||
page_items, page, total_pages = self._page_items(
|
||
items=request.items,
|
||
page=request.page,
|
||
page_size=self._page_size(channel),
|
||
)
|
||
request.page = page
|
||
total = len(request.items)
|
||
if self._supports_interactive_buttons(channel):
|
||
title = f"【{request.title}】共找到{total}条相关信息,请选择操作"
|
||
buttons = self._create_media_buttons(
|
||
channel=channel,
|
||
request=request,
|
||
items=page_items,
|
||
total=total,
|
||
total_pages=total_pages,
|
||
)
|
||
else:
|
||
if total > self._page_size(channel):
|
||
title = f"【{request.title}】共找到{total}条相关信息,请回复对应数字选择(p: 上一页 n: 下一页)"
|
||
else:
|
||
title = f"【{request.title}】共找到{total}条相关信息,请回复对应数字选择"
|
||
buttons = None
|
||
|
||
self.post_medias_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
title=title,
|
||
userid=userid,
|
||
buttons=buttons,
|
||
original_message_id=original_message_id,
|
||
original_chat_id=original_chat_id,
|
||
),
|
||
medias=page_items,
|
||
)
|
||
|
||
def _post_torrents_message(
|
||
self,
|
||
request: PendingMediaInteraction,
|
||
channel: MessageChannel,
|
||
source: str,
|
||
userid: Union[str, int],
|
||
original_message_id: Optional[Union[str, int]] = None,
|
||
original_chat_id: Optional[str] = None,
|
||
) -> None:
|
||
"""
|
||
发送或更新资源选择列表。
|
||
"""
|
||
page_items, page, total_pages = self._page_items(
|
||
items=request.items,
|
||
page=request.page,
|
||
page_size=self._page_size(channel),
|
||
)
|
||
request.page = page
|
||
total = len(request.items)
|
||
if self._supports_interactive_buttons(channel):
|
||
title = f"【{request.title}】共找到{total}条相关资源,请选择下载"
|
||
buttons = self._create_torrent_buttons(
|
||
channel=channel,
|
||
request=request,
|
||
items=page_items,
|
||
total=total,
|
||
total_pages=total_pages,
|
||
)
|
||
else:
|
||
if total > self._page_size(channel):
|
||
title = f"【{request.title}】共找到{total}条相关资源,请回复对应数字下载(0: 自动选择 p: 上一页 n: 下一页)"
|
||
else:
|
||
title = f"【{request.title}】共找到{total}条相关资源,请回复对应数字下载(0: 自动选择)"
|
||
buttons = None
|
||
|
||
self.post_torrents_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
title=title,
|
||
userid=userid,
|
||
link=settings.MP_DOMAIN("#/resource"),
|
||
buttons=buttons,
|
||
original_message_id=original_message_id,
|
||
original_chat_id=original_chat_id,
|
||
),
|
||
torrents=page_items,
|
||
)
|
||
|
||
def _create_media_buttons(
|
||
self,
|
||
channel: MessageChannel,
|
||
request: PendingMediaInteraction,
|
||
items: List[MediaInfo],
|
||
total: int,
|
||
total_pages: int,
|
||
) -> List[List[Dict[str, str]]]:
|
||
"""
|
||
为媒体列表生成选择和翻页按钮。
|
||
"""
|
||
buttons: List[List[Dict[str, str]]] = []
|
||
max_text_length = ChannelCapabilityManager.get_max_button_text_length(channel)
|
||
max_per_row = ChannelCapabilityManager.get_max_buttons_per_row(channel)
|
||
|
||
current_row: List[Dict[str, str]] = []
|
||
for index, media in enumerate(items, start=1):
|
||
if max_per_row == 1:
|
||
button_text = f"{index}. {media.title_year}"
|
||
if len(button_text) > max_text_length:
|
||
button_text = button_text[: max_text_length - 3] + "..."
|
||
buttons.append(
|
||
[
|
||
{
|
||
"text": button_text,
|
||
"callback_data": f"media:{request.request_id}:select:{index}",
|
||
}
|
||
]
|
||
)
|
||
continue
|
||
|
||
current_row.append(
|
||
{
|
||
"text": f"{index}",
|
||
"callback_data": f"media:{request.request_id}:select:{index}",
|
||
}
|
||
)
|
||
if len(current_row) == max_per_row or index == len(items):
|
||
buttons.append(current_row)
|
||
current_row = []
|
||
|
||
if total > self._page_size(channel):
|
||
buttons.extend(self._navigation_buttons(request, total_pages))
|
||
return buttons
|
||
|
||
def _create_torrent_buttons(
|
||
self,
|
||
channel: MessageChannel,
|
||
request: PendingMediaInteraction,
|
||
items: List[Context],
|
||
total: int,
|
||
total_pages: int,
|
||
) -> List[List[Dict[str, str]]]:
|
||
"""
|
||
为资源列表生成下载和翻页按钮。
|
||
"""
|
||
buttons: List[List[Dict[str, str]]] = [
|
||
[
|
||
{
|
||
"text": "🤖 自动选择下载",
|
||
"callback_data": f"media:{request.request_id}:download:0",
|
||
}
|
||
]
|
||
]
|
||
max_text_length = ChannelCapabilityManager.get_max_button_text_length(channel)
|
||
max_per_row = ChannelCapabilityManager.get_max_buttons_per_row(channel)
|
||
|
||
current_row: List[Dict[str, str]] = []
|
||
for index, context in enumerate(items, start=1):
|
||
torrent = context.torrent_info
|
||
if max_per_row == 1:
|
||
button_text = f"{index}. {torrent.site_name} - {torrent.seeders}↑"
|
||
if len(button_text) > max_text_length:
|
||
button_text = button_text[: max_text_length - 3] + "..."
|
||
buttons.append(
|
||
[
|
||
{
|
||
"text": button_text,
|
||
"callback_data": f"media:{request.request_id}:download:{index}",
|
||
}
|
||
]
|
||
)
|
||
continue
|
||
|
||
current_row.append(
|
||
{
|
||
"text": f"{index}",
|
||
"callback_data": f"media:{request.request_id}:download:{index}",
|
||
}
|
||
)
|
||
if len(current_row) == max_per_row or index == len(items):
|
||
buttons.append(current_row)
|
||
current_row = []
|
||
|
||
if total > self._page_size(channel):
|
||
buttons.extend(self._navigation_buttons(request, total_pages))
|
||
return buttons
|
||
|
||
def _has_next_page(self, request: PendingMediaInteraction) -> bool:
|
||
"""
|
||
判断当前视图是否还有下一页。
|
||
"""
|
||
_, page, total_pages = self._page_items(
|
||
items=request.items,
|
||
page=request.page,
|
||
page_size=self._page_size(request.channel),
|
||
)
|
||
return page < total_pages - 1
|
||
|
||
@staticmethod
|
||
def _navigation_buttons(
|
||
request: PendingMediaInteraction,
|
||
total_pages: int,
|
||
) -> List[List[Dict[str, str]]]:
|
||
"""
|
||
按当前页状态生成上一页和下一页按钮。
|
||
"""
|
||
buttons: List[List[Dict[str, str]]] = []
|
||
nav_row: List[Dict[str, str]] = []
|
||
if request.page > 0:
|
||
nav_row.append(
|
||
{
|
||
"text": "⬅️ 上一页",
|
||
"callback_data": f"media:{request.request_id}:page-prev",
|
||
}
|
||
)
|
||
if request.page < total_pages - 1:
|
||
nav_row.append(
|
||
{
|
||
"text": "下一页 ➡️",
|
||
"callback_data": f"media:{request.request_id}:page-next",
|
||
}
|
||
)
|
||
if nav_row:
|
||
buttons.append(nav_row)
|
||
return buttons
|
||
|
||
@staticmethod
|
||
def _page_items(
|
||
items: List[Any],
|
||
page: int,
|
||
page_size: int,
|
||
) -> Tuple[List[Any], int, int]:
|
||
"""
|
||
返回当前页数据,并把页码限制在有效范围内。
|
||
"""
|
||
total_pages = max(1, math.ceil(len(items) / page_size)) if page_size else 1
|
||
page = min(max(0, page), total_pages - 1)
|
||
start = page * page_size
|
||
end = start + page_size
|
||
return items[start:end], page, total_pages
|
||
|
||
def _page_size(self, channel: Optional[MessageChannel]) -> int:
|
||
"""
|
||
按渠道交互能力选择分页大小。
|
||
"""
|
||
return (
|
||
self._button_page_size
|
||
if self._supports_interactive_buttons(channel)
|
||
else self._text_page_size
|
||
)
|
||
|
||
@staticmethod
|
||
def _supports_interactive_buttons(channel: Optional[MessageChannel]) -> bool:
|
||
"""
|
||
判断渠道是否同时支持按钮展示与按钮回调。
|
||
"""
|
||
return bool(
|
||
channel
|
||
and ChannelCapabilityManager.supports_buttons(channel)
|
||
and ChannelCapabilityManager.supports_callbacks(channel)
|
||
)
|
||
|
||
@staticmethod
|
||
def _build_no_exists_messages(
|
||
mediainfo: MediaInfo,
|
||
no_exists: Optional[Dict[Union[int, str], Dict[int, NotExistMediaInfo]]],
|
||
show_missing_only: bool,
|
||
) -> List[str]:
|
||
"""
|
||
将缺失集信息转换为可发送的文案。
|
||
"""
|
||
if not no_exists:
|
||
return []
|
||
mediakey = mediainfo.tmdb_id or mediainfo.douban_id
|
||
season_map = no_exists.get(mediakey) or {}
|
||
if show_missing_only:
|
||
return [
|
||
f"第 {sea} 季缺失 {StringUtils.str_series(no_exist.episodes) if no_exist.episodes else no_exist.total_episode} 集"
|
||
for sea, no_exist in season_map.items()
|
||
]
|
||
return [
|
||
f"第 {sea} 季总 {no_exist.total_episode} 集"
|
||
for sea, no_exist in season_map.items()
|
||
]
|
||
|
||
@staticmethod
|
||
def _should_auto_download(userid: Union[str, int]) -> bool:
|
||
"""
|
||
判断当前用户是否命中自动下载名单。
|
||
"""
|
||
auto_download_user = settings.AUTO_DOWNLOAD_USER
|
||
return bool(
|
||
auto_download_user
|
||
and (
|
||
auto_download_user == "all"
|
||
or any(userid == user for user in auto_download_user.split(","))
|
||
)
|
||
)
|
||
|
||
def _post_invalid_input(
|
||
self,
|
||
channel: MessageChannel,
|
||
source: str,
|
||
userid: Union[str, int],
|
||
username: Optional[str],
|
||
title: str = "输入有误!",
|
||
) -> None:
|
||
"""
|
||
发送统一的非法输入提示。
|
||
"""
|
||
self.post_message(
|
||
Notification(
|
||
channel=channel,
|
||
source=source,
|
||
userid=userid,
|
||
username=username,
|
||
title=title,
|
||
)
|
||
)
|