Files
MoviePilot/app/helper/interaction.py
2026-05-01 09:53:04 +08:00

627 lines
18 KiB
Python

import math
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from threading import Lock
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union
from app.core.context import MediaInfo
from app.core.meta import MetaBase
from app.schemas import Notification
from app.schemas.message import ChannelCapabilityManager
from app.schemas.types import MessageChannel
@dataclass
class PendingSlashInteraction:
    """
    Interaction context for a generic slash command.
    """
    # Session identifier; SlashInteractionManager derives it from a truncated
    # uuid4 hex string.
    request_id: str
    # Owning user; the manager stores it as str(user_id).
    user_id: str
    # Message channel associated with the interaction, if any.
    channel: Optional[MessageChannel]
    # presumably the name of the message source/config — confirm with callers
    source: Optional[str]
    username: Optional[str]
    # Slash command this session was created for.
    command: str
    # presumably the zero-based page currently shown — confirm with callers
    page: int = 0
    # presumably names the free-text input the command is waiting for, None
    # when nothing is awaited — confirm with callers
    awaiting_input: Optional[str] = None
    # Creation time (naive datetime.now()); the manager compares it against
    # its TTL to expire the session.
    created_at: datetime = field(default_factory=datetime.now)
class SlashInteractionManager:
    """
    Manage the interaction sessions of a single slash command.

    Every user owns at most one live session: creating a new session replaces
    the previous one. Sessions older than ``_ttl`` are purged lazily whenever
    ``create_or_replace``, ``get_by_user`` or ``get_by_id`` is called. All
    index access happens while holding ``self._lock``. Returned objects are
    the stored instances themselves, not copies.
    """

    _ttl = timedelta(hours=24)

    def __init__(self):
        # request_id -> session
        self._by_id: Dict[str, PendingSlashInteraction] = {}
        # str(user_id) -> request_id of that user's current session
        self._by_user: Dict[str, str] = {}
        self._lock = Lock()

    def _discard_locked(self, request_id: str) -> None:
        """
        Drop one session together with its user index entry.

        The caller must already hold ``self._lock``.
        """
        request = self._by_id.pop(request_id, None)
        if request is None:
            return
        user_key = str(request.user_id)
        # Only unlink the user when the index still points at this session,
        # so a newer session of the same user is never orphaned.
        if self._by_user.get(user_key) == request_id:
            del self._by_user[user_key]

    def _new_request_id_locked(self) -> str:
        """
        Return a 12-character id that is not used by any live session.

        The id is a truncated uuid4, so a collision is unlikely but possible;
        reusing a live id would overwrite another user's session.
        """
        request_id = uuid.uuid4().hex[:12]
        while request_id in self._by_id:
            request_id = uuid.uuid4().hex[:12]
        return request_id

    def _cleanup_locked(self) -> None:
        """
        Remove every session created before ``now - _ttl``.

        The caller must already hold ``self._lock``.
        """
        expire_before = datetime.now() - self._ttl
        # Collect first: the dict must not change size while being iterated.
        expired = [
            request_id
            for request_id, request in self._by_id.items()
            if request.created_at < expire_before
        ]
        for request_id in expired:
            self._discard_locked(request_id)

    def create_or_replace(
        self,
        user_id: Union[str, int],
        command: str,
        channel: Optional[MessageChannel],
        source: Optional[str],
        username: Optional[str],
    ) -> PendingSlashInteraction:
        """
        Create a fresh session for the user, replacing any existing one.

        :param user_id: owner of the session; normalised with ``str()``
        :param command: slash command the session belongs to
        :param channel: message channel of the interaction
        :param source: message source passed through to the session
        :param username: user name passed through to the session
        :return: the newly stored session
        """
        with self._lock:
            self._cleanup_locked()
            user_key = str(user_id)
            old_request_id = self._by_user.get(user_key)
            if old_request_id:
                self._by_id.pop(old_request_id, None)
            request = PendingSlashInteraction(
                request_id=self._new_request_id_locked(),
                user_id=user_key,
                command=command,
                channel=channel,
                source=source,
                username=username,
            )
            self._by_id[request.request_id] = request
            self._by_user[user_key] = request.request_id
            return request

    def get_by_user(
        self, user_id: Union[str, int]
    ) -> Optional[PendingSlashInteraction]:
        """
        Return the user's current session, or None when there is none.
        """
        with self._lock:
            self._cleanup_locked()
            request_id = self._by_user.get(str(user_id))
            if not request_id:
                return None
            return self._by_id.get(request_id)

    def get_by_id(
        self, request_id: str, user_id: Union[str, int]
    ) -> Optional[PendingSlashInteraction]:
        """
        Return the session with this id, but only if it belongs to the user.
        """
        with self._lock:
            self._cleanup_locked()
            request = self._by_id.get(request_id)
            if not request or str(request.user_id) != str(user_id):
                return None
            return request

    def remove(self, request_id: str) -> None:
        """
        End one session explicitly; unknown ids are ignored.
        """
        with self._lock:
            self._discard_locked(request_id)

    def clear(self) -> None:
        """
        Drop every session and index entry.
        """
        with self._lock:
            self._by_id.clear()
            self._by_user.clear()
def supports_interaction_buttons(channel: Optional[MessageChannel]) -> bool:
    """
    Tell whether button-driven interaction can be used on the channel.

    This requires a channel that supports both buttons and callbacks; a
    missing channel never qualifies.
    """
    if not channel:
        return False
    if not ChannelCapabilityManager.supports_buttons(channel):
        return False
    return bool(ChannelCapabilityManager.supports_callbacks(channel))
def supports_markdown(channel: Optional[MessageChannel]) -> bool:
    """
    Tell whether Markdown content may be emitted on the channel.

    A missing channel is treated as not supporting Markdown.
    """
    if not channel:
        return False
    return bool(ChannelCapabilityManager.supports_markdown(channel))
def page_items(
    items: Sequence[Any],
    page: int,
    page_size: int,
) -> Tuple[List[Any], int, int]:
    """
    Slice one page out of a sequence and normalise the page index.

    :param items: full sequence to paginate
    :param page: requested zero-based page index; clamped into the valid range
    :param page_size: items per page; values below 1 are treated as 1
    :return: ``(page_items, normalised_page, total_pages)``; an empty input
        yields ``([], 0, 1)``
    """
    total = len(items)
    if total == 0:
        return [], 0, 1
    # Clamp once and reuse it for both the page count and the slice. Slicing
    # with the raw value returned an empty page for 0 and a wrong slice for
    # negative sizes.
    size = max(1, page_size)
    total_pages = max(1, math.ceil(total / size))
    page = min(max(0, page), total_pages - 1)
    start = page * size
    return list(items[start:start + size]), page, total_pages
def build_navigation_buttons(
    prefix: str,
    request: Any,
    page: int,
    total_pages: int,
) -> List[List[dict]]:
    """
    Build the standard previous-page / next-page button row.

    :param prefix: callback prefix placed before the request id
    :param request: object whose ``request_id`` is embedded in the callback
        data; it is only read when at least one button is produced
    :param page: zero-based index of the page being shown
    :param total_pages: total number of pages
    :return: a list holding one row of buttons, or an empty list when neither
        direction is available
    """
    # (enabled, label, callback action) for each direction, in display order.
    candidates = (
        (page > 0, "⬅️ 上一页", "page-prev"),
        (page < total_pages - 1, "下一页 ➡️", "page-next"),
    )
    nav_row = [
        {
            "text": label,
            "callback_data": f"{prefix}:{request.request_id}:{action}",
        }
        for enabled, label, action in candidates
        if enabled
    ]
    return [nav_row] if nav_row else []
def update_or_post_message(
    chain,
    channel: MessageChannel,
    source: Optional[str],
    userid: Union[str, int],
    username: Optional[str],
    title: str,
    text: str,
    buttons: Optional[List[List[dict]]] = None,
    original_message_id: Optional[Union[str, int]] = None,
    original_chat_id: Optional[str] = None,
) -> None:
    """
    Edit the original message when possible, otherwise post a new one.

    An edit is attempted only when both the original message id and chat id
    are given and the channel supports editing. If no edit is attempted, or
    ``chain.edit_message`` returns a falsy result, a new ``Notification`` is
    sent through ``chain.post_message``.
    """
    can_edit = bool(
        original_message_id
        and original_chat_id
        and ChannelCapabilityManager.supports_editing(channel)
    )
    if can_edit and chain.edit_message(
        channel=channel,
        source=source,
        message_id=original_message_id,
        chat_id=original_chat_id,
        title=title,
        text=text,
        buttons=buttons,
    ):
        # Edited in place; nothing new to send.
        return

    notification = Notification(
        channel=channel,
        source=source,
        userid=userid,
        username=username,
        title=title,
        text=text,
        buttons=buttons,
    )
    chain.post_message(notification)
def escape_markdown_table_cell(value: object) -> str:
    """
    Minimally escape a value for use inside a Markdown table cell.

    ``None`` becomes an empty cell. Every other value is rendered with
    ``str()``, so falsy values such as ``0`` or ``False`` stay visible
    instead of being blanked. Line breaks (``\\n``, ``\\r\\n``, ``\\r``)
    become ``<br>`` and ``|`` is backslash-escaped so it cannot split the
    cell.

    :param value: cell content of any type
    :return: text that is safe to place between table pipes
    """
    if value is None:
        return ""
    text = str(value)
    # Normalise Windows / old-Mac line endings so no stray "\r" stays in the cell.
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    return text.replace("\n", "<br>").replace("|", "\\|")
def format_markdown_table(
    headers: Sequence[str],
    rows: Sequence[Sequence[object]],
) -> str:
    """
    Render headers and rows as Markdown table text.

    Every cell goes through ``escape_markdown_table_cell``. The output is the
    header line, a ``---`` separator with one column per header, then one
    line per row, joined with newlines.
    """

    def render(cells: Sequence[object]) -> str:
        # One table line: cells escaped and wrapped in pipes.
        escaped = (escape_markdown_table_cell(cell) for cell in cells)
        return "| " + " | ".join(escaped) + " |"

    lines = [render(headers)]
    lines.append("| " + " | ".join("---" for _ in headers) + " |")
    for row in rows:
        lines.append(render(row))
    return "\n".join(lines)
@dataclass
class PendingMediaInteraction:
    """
    Current context of one search / download / subscribe interaction.
    """
    # Session identifier; MediaInteractionManager derives it from a truncated
    # uuid4 hex string.
    request_id: str
    # Owning user; the manager stores it as str(user_id).
    user_id: str
    # Message channel associated with the interaction, if any.
    channel: Optional[MessageChannel]
    # presumably the name of the message source/config — confirm with callers
    source: Optional[str]
    username: Optional[str]
    # presumably which flow is running (search/download/subscribe) — the set
    # of valid values is not visible here; confirm with callers
    action: str
    # presumably the text the user searched for — confirm with callers
    keyword: str
    # Current step of the flow; starts as "media". Other values are not
    # visible in this file.
    phase: str = "media"
    # presumably the zero-based page currently shown — confirm with callers
    page: int = 0
    title: str = ""
    meta: Optional[MetaBase] = None
    # presumably the media entry the user picked — confirm with callers
    current_media: Optional[MediaInfo] = None
    # Candidate entries of the session; the manager stores a copy of the list
    # it is given.
    items: List[Any] = field(default_factory=list)
    # Creation time (naive datetime.now()); the manager compares it against
    # its TTL to expire the session.
    created_at: datetime = field(default_factory=datetime.now)
class MediaInteractionManager:
    """
    Manage each user's currently active media interaction.

    Only one live session is kept per user, so buttons from an older search
    cannot be mixed with a newer one. Sessions older than ``_ttl`` are purged
    lazily whenever ``create_or_replace``, ``get_by_user`` or ``get_by_id``
    is called. All index access happens while holding ``self._lock``.
    Returned objects are the stored instances themselves, not copies.
    """

    _ttl = timedelta(hours=24)

    def __init__(self):
        # request_id -> session
        self._by_id: Dict[str, PendingMediaInteraction] = {}
        # str(user_id) -> request_id of that user's current session
        self._by_user: Dict[str, str] = {}
        self._lock = Lock()

    def _discard_locked(self, request_id: str) -> None:
        """
        Drop one session together with its user index entry.

        The caller must already hold ``self._lock``.
        """
        request = self._by_id.pop(request_id, None)
        if request is None:
            return
        user_key = str(request.user_id)
        # Only unlink the user when the index still points at this session,
        # so a newer session of the same user is never orphaned.
        if self._by_user.get(user_key) == request_id:
            del self._by_user[user_key]

    def _new_request_id_locked(self) -> str:
        """
        Return a 12-character id that is not used by any live session.

        The id is a truncated uuid4, so a collision is unlikely but possible;
        reusing a live id would overwrite another user's session.
        """
        request_id = uuid.uuid4().hex[:12]
        while request_id in self._by_id:
            request_id = uuid.uuid4().hex[:12]
        return request_id

    def _cleanup_locked(self) -> None:
        """
        Purge sessions created before ``now - _ttl`` so stale interaction
        state does not linger in memory.

        The caller must already hold ``self._lock``.
        """
        expire_before = datetime.now() - self._ttl
        # Collect first: the dict must not change size while being iterated.
        expired = [
            request_id
            for request_id, request in self._by_id.items()
            if request.created_at < expire_before
        ]
        for request_id in expired:
            self._discard_locked(request_id)

    def create_or_replace(
        self,
        user_id: Union[str, int],
        channel: Optional[MessageChannel],
        source: Optional[str],
        username: Optional[str],
        action: str,
        keyword: str,
        title: str = "",
        meta: Optional[MetaBase] = None,
        items: Optional[List[Any]] = None,
    ) -> PendingMediaInteraction:
        """
        Create a new interaction state for the user, replacing the old one.

        :param user_id: owner of the session; normalised with ``str()``
        :param channel: message channel of the interaction
        :param source: message source passed through to the session
        :param username: user name passed through to the session
        :param action: action name stored on the session
        :param keyword: keyword stored on the session
        :param title: optional title stored on the session
        :param meta: optional metadata stored on the session
        :param items: optional initial items; copied, ``None`` means empty
        :return: the newly stored session
        """
        with self._lock:
            self._cleanup_locked()
            user_key = str(user_id)
            old_request_id = self._by_user.get(user_key)
            if old_request_id:
                self._by_id.pop(old_request_id, None)
            request = PendingMediaInteraction(
                request_id=self._new_request_id_locked(),
                user_id=user_key,
                channel=channel,
                source=source,
                username=username,
                action=action,
                keyword=keyword,
                title=title,
                meta=meta,
                # Copy so later changes to the caller's list do not leak in.
                items=list(items or []),
            )
            self._by_id[request.request_id] = request
            self._by_user[user_key] = request.request_id
            return request

    def get_by_user(
        self, user_id: Union[str, int]
    ) -> Optional[PendingMediaInteraction]:
        """
        Return the user's current session, used for text replies and for
        compatibility with older buttons; None when there is none.
        """
        with self._lock:
            self._cleanup_locked()
            request_id = self._by_user.get(str(user_id))
            if not request_id:
                return None
            return self._by_id.get(request_id)

    def get_by_id(
        self, request_id: str, user_id: Union[str, int]
    ) -> Optional[PendingMediaInteraction]:
        """
        Return the session with this id, but only if it belongs to the user.
        """
        with self._lock:
            self._cleanup_locked()
            request = self._by_id.get(request_id)
            if not request or str(request.user_id) != str(user_id):
                return None
            return request

    def remove(self, request_id: str) -> None:
        """
        End one session explicitly; unknown ids are ignored.
        """
        with self._lock:
            self._discard_locked(request_id)

    def clear(self) -> None:
        """
        Drop all interaction state; mainly intended for tests.
        """
        with self._lock:
            self._by_id.clear()
            self._by_user.clear()
# Module-level MediaInteractionManager instance, created at import time.
media_interaction_manager = MediaInteractionManager()
@dataclass(frozen=True)
class AgentInteractionOption:
    """
    One selectable option of an Agent interaction.

    Instances are immutable (frozen dataclass).
    """
    # presumably the text shown to the user — confirm with callers
    label: str
    # presumably the value handed back when the option is chosen — confirm
    # with callers
    value: str
@dataclass
class PendingAgentInteraction:
    """
    A pending client-side interaction request issued by the Agent.
    """
    # Request identifier; AgentInteractionManager derives it from a truncated
    # uuid4 hex string.
    request_id: str
    # presumably the Agent session that raised the request — confirm with
    # callers
    session_id: str
    # Owning user; the manager stores it as str(user_id).
    user_id: str
    # Channel as a plain string here (not a MessageChannel).
    channel: Optional[str]
    # presumably the name of the message source/config — confirm with callers
    source: Optional[str]
    username: Optional[str]
    title: Optional[str]
    # presumably the question shown to the user — confirm with callers
    prompt: str
    # Options to choose from; AgentInteractionManager.resolve picks one by
    # 1-based index.
    options: List[AgentInteractionOption]
    # Creation time (naive datetime.now()); the manager compares it against
    # its TTL to expire the request.
    created_at: datetime = field(default_factory=datetime.now)
class AgentInteractionManager:
    """
    Manage client-side interaction requests issued by the Agent.

    Requests are keyed by request id only, so one user may have several
    pending at once. Requests older than ``_ttl`` are purged lazily whenever
    ``create_request`` or ``resolve`` is called. All access to the pending
    map happens while holding ``self._lock``.
    """

    _ttl = timedelta(hours=24)

    def __init__(self):
        # request_id -> pending request
        self._pending_interactions: Dict[str, PendingAgentInteraction] = {}
        self._lock = Lock()

    def _cleanup_locked(self) -> None:
        """
        Remove every request created before ``now - _ttl``.

        The caller must already hold ``self._lock``.
        """
        expire_before = datetime.now() - self._ttl
        # Collect first: the dict must not change size while being iterated.
        expired_ids = [
            request_id
            for request_id, request in self._pending_interactions.items()
            if request.created_at < expire_before
        ]
        for request_id in expired_ids:
            self._pending_interactions.pop(request_id, None)

    def create_request(
        self,
        session_id: str,
        user_id: str,
        channel: Optional[str],
        source: Optional[str],
        username: Optional[str],
        title: Optional[str],
        prompt: str,
        options: List[AgentInteractionOption],
    ) -> PendingAgentInteraction:
        """
        Create an Agent interaction request that awaits the user's choice.

        :param session_id: session id stored on the request
        :param user_id: owner of the request; normalised with ``str()``
        :param channel: channel stored on the request
        :param source: message source stored on the request
        :param username: user name stored on the request
        :param title: optional title stored on the request
        :param prompt: prompt stored on the request
        :param options: selectable options; the list is copied
        :return: the newly stored request
        """
        with self._lock:
            self._cleanup_locked()
            # Truncated uuid4: retry on the unlikely collision with a live id.
            request_id = uuid.uuid4().hex[:12]
            while request_id in self._pending_interactions:
                request_id = uuid.uuid4().hex[:12]
            request = PendingAgentInteraction(
                request_id=request_id,
                session_id=session_id,
                user_id=str(user_id),
                channel=channel,
                source=source,
                username=username,
                title=title,
                prompt=prompt,
                # Copy so that later changes to the caller's list cannot alter
                # which option a 1-based index resolves to.
                options=list(options or []),
            )
            self._pending_interactions[request_id] = request
            return request

    def resolve(
        self,
        request_id: str,
        option_index: int,
        user_id: Optional[str] = None,
    ) -> Optional[tuple[PendingAgentInteraction, AgentInteractionOption]]:
        """
        Consume an Agent interaction request and return the chosen option.

        :param request_id: id of the request to resolve
        :param option_index: 1-based index into the request's options
        :param user_id: when given, must match the request's owner
        :return: ``(request, option)`` on success; None when the request is
            unknown or expired, belongs to another user, or the index is out
            of range. The request is removed only on success.
        """
        with self._lock:
            self._cleanup_locked()
            request = self._pending_interactions.get(request_id)
            if not request:
                return None
            if user_id is not None and str(request.user_id) != str(user_id):
                return None
            if option_index < 1 or option_index > len(request.options):
                return None
            option = request.options[option_index - 1]
            # Validation passed: the request is consumed exactly once.
            self._pending_interactions.pop(request_id, None)
            return request, option

    def clear(self) -> None:
        """
        Drop every pending Agent interaction request.
        """
        with self._lock:
            self._pending_interactions.clear()
# Module-level AgentInteractionManager instance, created at import time.
agent_interaction_manager = AgentInteractionManager()
@dataclass
class PendingSkillsInteraction:
    """
    Context of one /skills session, so that button callbacks and text replies
    can share the same state.
    """
    # Session identifier; SkillsInteractionManager derives it from a truncated
    # uuid4 hex string.
    request_id: str
    # Owning user; the manager stores it as str(user_id).
    user_id: str
    # Message channel associated with the interaction, if any.
    channel: Optional[MessageChannel]
    # presumably the name of the message source/config — confirm with callers
    source: Optional[str]
    username: Optional[str]
    # Current view of the session; starts as "root". Other values are not
    # visible in this file.
    view: str = "root"
    # presumably zero-based page indexes of the local and market skill lists —
    # confirm with callers
    local_page: int = 0
    market_page: int = 0
    # presumably the search text applied to the market list — confirm with
    # callers
    market_query: str = ""
    # presumably names the free-text input the session is waiting for, None
    # when nothing is awaited — confirm with callers
    awaiting_input: Optional[str] = None
    # Creation time (naive datetime.now()); the manager compares it against
    # its TTL to expire the session.
    created_at: datetime = field(default_factory=datetime.now)
class SkillsInteractionManager:
    """
    Manage each user's current skills interaction state.

    Only one live session is kept per user at a time, so buttons of an older
    session stop working once a new one starts. Sessions older than ``_ttl``
    are purged lazily whenever ``create_or_replace``, ``get_by_user`` or
    ``get_by_id`` is called. All index access happens while holding
    ``self._lock``. Returned objects are the stored instances themselves,
    not copies.
    """

    _ttl = timedelta(hours=24)

    def __init__(self):
        # request_id -> session
        self._by_id: Dict[str, PendingSkillsInteraction] = {}
        # str(user_id) -> request_id of that user's current session
        self._by_user: Dict[str, str] = {}
        self._lock = Lock()

    def _discard_locked(self, request_id: str) -> None:
        """
        Drop one session together with its user index entry.

        The caller must already hold ``self._lock``.
        """
        request = self._by_id.pop(request_id, None)
        if request is None:
            return
        user_key = str(request.user_id)
        # Only unlink the user when the index still points at this session,
        # so a newer session of the same user is never orphaned.
        if self._by_user.get(user_key) == request_id:
            del self._by_user[user_key]

    def _new_request_id_locked(self) -> str:
        """
        Return a 12-character id that is not used by any live session.

        The id is a truncated uuid4, so a collision is unlikely but possible;
        reusing a live id would overwrite another user's session.
        """
        request_id = uuid.uuid4().hex[:12]
        while request_id in self._by_id:
            request_id = uuid.uuid4().hex[:12]
        return request_id

    def _cleanup_locked(self) -> None:
        """
        Purge sessions created before ``now - _ttl`` so button callbacks do
        not pile up without bound.

        The caller must already hold ``self._lock``.
        """
        expire_before = datetime.now() - self._ttl
        # Collect first: the dict must not change size while being iterated.
        expired = [
            request_id
            for request_id, request in self._by_id.items()
            if request.created_at < expire_before
        ]
        for request_id in expired:
            self._discard_locked(request_id)

    def create_or_replace(
        self,
        user_id: Union[str, int],
        channel: Optional[MessageChannel],
        source: Optional[str],
        username: Optional[str],
    ) -> PendingSkillsInteraction:
        """
        Create a new session for the user, replacing the previous skills
        interaction state.

        :param user_id: owner of the session; normalised with ``str()``
        :param channel: message channel of the interaction
        :param source: message source passed through to the session
        :param username: user name passed through to the session
        :return: the newly stored session
        """
        with self._lock:
            self._cleanup_locked()
            user_key = str(user_id)
            old_request_id = self._by_user.get(user_key)
            if old_request_id:
                self._by_id.pop(old_request_id, None)
            request_id = self._new_request_id_locked()
            request = PendingSkillsInteraction(
                request_id=request_id,
                user_id=user_key,
                channel=channel,
                source=source,
                username=username,
            )
            self._by_id[request_id] = request
            self._by_user[user_key] = request_id
            return request

    def get_by_user(
        self, user_id: Union[str, int]
    ) -> Optional[PendingSkillsInteraction]:
        """
        Return the user's current live session, used to route plain-text
        replies; None when there is none.
        """
        with self._lock:
            self._cleanup_locked()
            request_id = self._by_user.get(str(user_id))
            if not request_id:
                return None
            return self._by_id.get(request_id)

    def get_by_id(
        self, request_id: str, user_id: Union[str, int]
    ) -> Optional[PendingSkillsInteraction]:
        """
        Return the session with this id, but only if it belongs to the user.
        """
        with self._lock:
            self._cleanup_locked()
            request = self._by_id.get(request_id)
            if not request or str(request.user_id) != str(user_id):
                return None
            return request

    def remove(self, request_id: str) -> None:
        """
        End one session explicitly, releasing both the request-id and the
        user index entry; unknown ids are ignored.
        """
        with self._lock:
            self._discard_locked(request_id)

    def clear(self) -> None:
        """
        Drop all sessions; mainly intended for test scenarios.
        """
        with self._lock:
            self._by_id.clear()
            self._by_user.clear()
# Module-level SkillsInteractionManager instance, created at import time.
skills_interaction_manager = SkillsInteractionManager()