Files
MoviePilot/tests/test_web_agent_stream.py
2026-06-27 15:09:41 +08:00

703 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import time
from queue import Queue
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch
from app import schemas
from app.agent import ReplyMode, agent_manager
from app.api.endpoints.agent import (
_WebAgentMoviePilotAgent,
_WEB_AGENT_FILE_REGISTRY,
_WEB_AGENT_NOTICE_QUEUES,
_apply_web_agent_display_event,
_build_web_agent_input_attachments,
_build_web_agent_notification_events,
_build_web_agent_command_items,
_build_web_agent_session_id,
_build_web_agent_traditional_callback_payload,
_build_web_agent_display_message_from_events,
_collect_web_agent_traditional_events,
_dispatch_web_agent_notice_event,
_extract_web_agent_notification_from_event_data,
_has_web_agent_traditional_interaction,
_prepare_web_agent_audio_attachment_path,
_transcribe_web_agent_audio_refs,
web_agent_stream,
_resolve_web_agent_choice_payload,
_split_web_agent_output,
)
from app.core.event import Event
from app.db.agentchat_oper import AgentChatOper
from app.helper.agent import build_web_agent_message_update_event
from app.helper.interaction import AgentInteractionOption, agent_interaction_manager, skills_interaction_manager
from app.chain.message import MessageChain
from app.schemas.message import ChannelCapability, ChannelCapabilityManager
from app.schemas.types import EventType, MessageChannel, NotificationType
def test_split_web_agent_output_extracts_verbose_tool_message():
"""应将啰嗦模式工具提示拆成独立工具事件,并保留渠道展示文案。"""
events = _split_web_agent_output("准备查询。\n\n⚙️ => 查询站点\n\n已完成")
assert events == [
{"type": "delta", "content": "准备查询。\n\n"},
{"type": "tool", "message": "⚙️ => 查询站点"},
{"type": "delta", "content": "已完成"},
]
def test_split_web_agent_output_extracts_summary_tool_message():
"""应将非啰嗦模式工具汇总行拆成独立工具事件,并保留渠道展示文案。"""
events = _split_web_agent_output("(查询了 2 次数据)\n\n这里是结果")
assert events == [
{"type": "tool", "message": "(查询了 2 次数据)"},
{"type": "delta", "content": "\n这里是结果"},
]
def test_split_web_agent_output_preserves_standalone_newline_delta():
"""独立换行增量应保留,避免流式 Markdown 列表被拼成同一行。"""
chunks = [
"可以这样操作:",
"\n",
"- **搜索资源**:搜索电影",
"\n",
"- **下载管理**:添加任务",
]
content = ""
for chunk in chunks:
for event in _split_web_agent_output(chunk):
if event["type"] == "delta":
content += event["content"]
assert content == "可以这样操作:\n- **搜索资源**:搜索电影\n- **下载管理**:添加任务"
def test_build_web_agent_session_id_is_stable_per_user_and_seed():
"""同一用户和前端会话标识应生成稳定的服务端会话 ID。"""
user = SimpleNamespace(id=1, name="admin")
first = _build_web_agent_session_id(user, "browser-session")
second = _build_web_agent_session_id(user, "browser-session")
other = _build_web_agent_session_id(user, "other-session")
assert first == second
assert first != other
assert first.startswith("web-agent:")
def test_build_web_agent_session_id_reuses_accessible_history():
"""传入已有历史会话 ID 时应直接复用,避免跨渠道继续对话丢上下文。"""
user = SimpleNamespace(id=1, name="admin", is_superuser=True)
AgentChatOper().save_display_messages(
session_id="telegram-session",
user_id="telegram-user",
username="tester",
channel=MessageChannel.Telegram.value,
source="telegram-main",
messages=[],
title="Telegram 会话",
)
assert _build_web_agent_session_id(user, "telegram-session") == "telegram-session"
def test_apply_web_agent_display_event_updates_snapshot():
"""WebAgent SSE 事件应可聚合为服务端展示快照。"""
message = {
"id": "assistant-1",
"role": "assistant",
"content": "",
"createdAt": 1,
"status": "streaming",
"tools": [],
"attachments": [],
"choices": [],
}
_apply_web_agent_display_event({"type": "delta", "content": "你好"}, message)
_apply_web_agent_display_event({"type": "tool", "message": "查询订阅"}, message)
_apply_web_agent_display_event(
{
"type": "attachment",
"attachment": {"kind": "file", "url": "message/agent/file/a"},
},
message,
)
_apply_web_agent_display_event({"type": "done"}, message)
assert message["content"] == "你好"
assert message["status"] == "done"
assert len(message["tools"]) == 1
assert message["tools"][0]["message"] == "查询订阅"
assert message["tools"][0]["status"] == "done"
assert message["attachments"] == [{"kind": "file", "url": "message/agent/file/a"}]
def test_build_web_agent_input_attachments_marks_kinds():
"""WebAgent 用户输入附件应转换为可展示的附件记录。"""
attachments = _build_web_agent_input_attachments(
images=["data:image/png;base64,abc"],
files=[
{
"ref": "message/agent/file/file-1",
"name": "report.txt",
"mime_type": "text/plain",
"size": 5,
}
],
audio_refs=["message/agent/file/audio-1"],
)
assert [item["kind"] for item in attachments] == ["image", "file", "audio"]
assert attachments[1]["name"] == "report.txt"
def test_build_web_agent_command_items_returns_slash_commands():
"""WebAgent 命令建议应返回可展示的斜杠命令。"""
with patch(
"app.api.endpoints.agent.Command",
return_value=SimpleNamespace(
get_commands=lambda: {
"/sites": {"description": "管理站点", "category": "站点"},
"hidden": {"description": "忽略", "category": "其他"},
"/hidden": {"description": "隐藏", "category": "其他", "show": False},
}
),
):
commands = _build_web_agent_command_items()
assert commands == [
{
"command": "/sites",
"description": "管理站点",
"category": "站点",
"type": "",
"pid": None,
}
]
def test_build_web_agent_command_items_includes_sites_command():
"""WebAgent 命令建议应包含内建站点管理命令。"""
commands = _build_web_agent_command_items()
assert any(command["command"] == "/sites" for command in commands)
def test_build_web_agent_traditional_callback_payload_wraps_callback():
"""传统按钮回调应包装为可继续提交给 MessageChain 的消息。"""
payload = _build_web_agent_traditional_callback_payload(
"skills:req-1:root",
original_message_id="assistant-1",
original_chat_id="web-session",
)
assert payload["message"] == "CALLBACK:skills:req-1:root"
assert payload["traditional"] is True
assert payload["original_message_id"] == "assistant-1"
assert payload["original_chat_id"] == "web-session"
def test_web_agent_stream_returns_error_for_unknown_command():
"""不存在的 WebAgent 斜杠命令应立即返回错误,不进入等待队列。"""
payload = schemas.AgentWebChatRequest(
text="/missing_command 参数",
session_id="browser-session",
)
request = SimpleNamespace()
user = SimpleNamespace(id=1, name="admin", is_superuser=True)
with patch(
"app.api.endpoints.agent.Command",
return_value=SimpleNamespace(get=lambda _: {}),
), patch("app.api.endpoints.agent.MessageChain.handle_message") as handle_message:
response = asyncio.run(web_agent_stream(payload, request, user))
body = "".join(asyncio.run(_collect_streaming_response(response)))
assert "error" in body
assert "命令不存在:/missing_command" in body
handle_message.assert_not_called()
def test_build_web_agent_message_update_event_converts_buttons():
"""WebAgent 编辑消息应转换为可原地更新卡片的事件。"""
event = build_web_agent_message_update_event(
message_id="assistant-1",
title="技能管理",
text="请选择操作",
buttons=[[{"text": "返回", "callback_data": "skills:req-1:root"}]],
)
assert event["type"] == "message_update"
assert event["target_message"]["id"] == "assistant-1"
assert event["target_message"]["choices"][0]["title"] == "技能管理"
assert event["target_message"]["choices"][0]["prompt"] == "请选择操作"
assert event["target_message"]["choices"][0]["buttons"][0]["label"] == "返回"
def test_build_web_agent_display_message_from_events_marks_done():
"""传统消息事件应聚合为完成态助手展示消息。"""
message = _build_web_agent_display_message_from_events([
{"type": "delta", "content": "菜单"},
{
"type": "choice",
"choice": {
"id": "choice-1",
"prompt": "请选择",
"buttons": [{"label": "返回", "callback_data": "back"}],
},
},
])
assert message["content"] == "菜单"
assert message["status"] == "done"
assert message["choices"][0]["prompt"] == "请选择"
def test_has_web_agent_traditional_interaction_detects_pending_skills():
"""WebAgent 应能识别命令后的传统交互上下文。"""
skills_interaction_manager.clear()
try:
skills_interaction_manager.create_or_replace(
user_id="1",
channel=MessageChannel.WebAgent,
source="web-agent",
username="admin",
)
assert _has_web_agent_traditional_interaction("1") is True
assert _has_web_agent_traditional_interaction("2") is False
finally:
skills_interaction_manager.clear()
def test_web_agent_admin_context_uses_current_user_id():
"""Web Agent 工具权限应按当前登录用户 ID 判断管理员身份。"""
agent = _WebAgentMoviePilotAgent(
session_id="web-agent:session",
user_id="7",
channel=MessageChannel.WebAgent.value,
source="web-agent",
username="normal-user",
replay_mode=ReplyMode.CAPTURE_ONLY,
)
with patch("app.api.endpoints.agent.UserOper") as user_oper:
user_oper.return_value.async_get_by_id = AsyncMock(
return_value=SimpleNamespace(is_superuser=True)
)
assert asyncio.run(agent._is_system_admin_context()) is True
user_oper.return_value.async_get_by_id.assert_awaited_once_with(7)
def test_web_agent_channel_supports_streaming_and_attachments():
"""WebAgent 渠道应声明流式、多媒体和文件发送能力。"""
assert ChannelCapabilityManager.supports_capability(
MessageChannel.WebAgent, ChannelCapability.INLINE_BUTTONS
)
assert ChannelCapabilityManager.supports_capability(
MessageChannel.WebAgent, ChannelCapability.CALLBACK_QUERIES
)
assert ChannelCapabilityManager.supports_capability(
MessageChannel.WebAgent, ChannelCapability.MESSAGE_EDITING
)
assert ChannelCapabilityManager.supports_capability(
MessageChannel.WebAgent, ChannelCapability.IMAGES
)
assert ChannelCapabilityManager.supports_capability(
MessageChannel.WebAgent, ChannelCapability.AUDIO_OUTPUT
)
assert ChannelCapabilityManager.supports_capability(
MessageChannel.WebAgent, ChannelCapability.FILE_SENDING
)
def test_build_web_agent_notification_events_extracts_image():
"""Agent 工具发送图片消息时应转换为图片附件事件。"""
events = _build_web_agent_notification_events(
schemas.Notification(
channel=MessageChannel.WebAgent,
mtype=NotificationType.Agent,
title="海报",
text="已找到图片",
image="https://example.com/poster.jpg",
)
)
assert events == [
{"type": "delta", "content": "海报\n\n已找到图片"},
{
"type": "attachment",
"attachment": {
"kind": "image",
"url": "https://example.com/poster.jpg",
"download_url": "https://example.com/poster.jpg",
"name": "海报",
"mime_type": None,
},
},
]
def test_extract_web_agent_notification_supports_wrapped_message_event():
"""NoticeMessage 包装 Notification 时应仍能解析为 WebAgent 通知。"""
notification = schemas.Notification(
channel=MessageChannel.WebAgent,
source="web-agent",
title="会话状态",
userid="1",
)
extracted = _extract_web_agent_notification_from_event_data(
{"message": notification, "current_time": "2026-06-26 09:18:38"}
)
assert extracted == notification
def test_dispatch_web_agent_notice_event_accepts_wrapped_message_event():
"""WebAgent 等待队列应接收 message 包装格式的 NoticeMessage 事件。"""
notice_queue = Queue()
_WEB_AGENT_NOTICE_QUEUES["1"] = [notice_queue]
notification = schemas.Notification(
channel=MessageChannel.WebAgent,
source="web-agent",
title="会话状态",
userid="1",
)
try:
_dispatch_web_agent_notice_event(
Event(
EventType.NoticeMessage,
{"message": notification, "current_time": "2026-06-26 09:18:38"},
)
)
finally:
_WEB_AGENT_NOTICE_QUEUES.pop("1", None)
assert notice_queue.get_nowait() == notification
def test_collect_web_agent_traditional_events_does_not_emit_submit_hint():
"""传统命令未产生通知时不应返回“命令已提交”的兜底提示。"""
user = SimpleNamespace(id=1, name="admin")
with patch(
"app.api.endpoints.agent.MessageChain.handle_message",
), patch(
"app.api.endpoints.agent.WEB_AGENT_TRADITIONAL_IDLE_TIMEOUT_SECONDS",
0.01,
), patch(
"app.api.endpoints.agent.WEB_AGENT_TRADITIONAL_MAX_WAIT_SECONDS",
0.05,
):
events = asyncio.run(
_collect_web_agent_traditional_events(
text="/session_status",
current_user=user,
)
)
assert events == []
def test_build_web_agent_notification_events_registers_local_file(tmp_path):
"""Agent 工具发送本地文件时应生成可下载附件事件。"""
file_path = tmp_path / "report.txt"
file_path.write_text("hello", encoding="utf-8")
events = _build_web_agent_notification_events(
schemas.Notification(
channel=MessageChannel.WebAgent,
mtype=NotificationType.Agent,
file_path=str(file_path),
file_name="report.txt",
)
)
assert len(events) == 1
attachment = events[0]["attachment"]
assert events[0]["type"] == "attachment"
assert attachment["kind"] == "file"
assert attachment["name"] == "report.txt"
assert attachment["mime_type"] == "text/plain"
assert attachment["size"] == 5
assert attachment["url"].startswith("message/agent/file/")
def test_build_web_agent_notification_events_registers_voice_attachment(tmp_path):
"""Agent 工具发送语音时应转换为可播放的音频附件事件。"""
voice_path = tmp_path / "reply.wav"
voice_path.write_bytes(b"wav-bytes")
events = _build_web_agent_notification_events(
schemas.Notification(
channel=MessageChannel.WebAgent,
mtype=NotificationType.Agent,
text="你好",
voice_path=str(voice_path),
)
)
assert len(events) == 2
assert events[0] == {"type": "delta", "content": "你好"}
attachment = events[1]["attachment"]
assert events[1]["type"] == "attachment"
assert attachment["kind"] == "audio"
assert attachment["name"] == "reply.wav"
assert attachment["mime_type"] == "audio/wav"
assert attachment["size"] == len(b"wav-bytes")
assert attachment["url"].startswith("message/agent/file/")
def test_prepare_web_agent_audio_attachment_converts_unsupported_audio(tmp_path):
"""WebAgent 会把浏览器不稳定支持的语音格式转为 WAV 供面板播放。"""
source_path = tmp_path / "reply.opus"
source_path.write_bytes(b"opus-bytes")
converted_path = tmp_path / "voice" / "reply_web_abcdef12.wav"
with patch("app.api.endpoints.agent.shutil.which", return_value="/usr/bin/ffmpeg"), patch(
"app.api.endpoints.agent.uuid.uuid4",
return_value=SimpleNamespace(hex="abcdef1234567890"),
), patch("app.api.endpoints.agent.subprocess.run") as run:
def write_converted_file(*args, **kwargs):
converted_path.write_bytes(b"wav-bytes")
return SimpleNamespace(returncode=0, stderr="")
run.side_effect = write_converted_file
with patch("app.api.endpoints.agent.settings", SimpleNamespace(TEMP_PATH=tmp_path)):
output_path = _prepare_web_agent_audio_attachment_path(str(source_path))
assert output_path == converted_path
assert output_path.read_bytes() == b"wav-bytes"
def test_transcribe_web_agent_audio_refs_reads_registered_upload(tmp_path):
"""WebAgent 上传录音应从临时附件登记表读取并转写为文本。"""
voice_path = tmp_path / "recording.webm"
voice_path.write_bytes(b"webm-bytes")
_WEB_AGENT_FILE_REGISTRY["audio-test"] = {
"path": voice_path,
"name": "recording.webm",
"mime_type": "audio/webm",
"created_at": time.time(),
}
try:
with patch(
"app.api.endpoints.agent.AgentCapabilityManager.is_audio_input_available",
return_value=True,
), patch(
"app.api.endpoints.agent.AgentCapabilityManager.transcribe_audio",
return_value="帮我推荐一部电影",
) as transcribe_audio:
transcript = _transcribe_web_agent_audio_refs(["message/agent/file/audio-test"])
finally:
_WEB_AGENT_FILE_REGISTRY.pop("audio-test", None)
assert transcript == "帮我推荐一部电影"
transcribe_audio.assert_called_once_with(
content=b"webm-bytes",
filename="recording.webm",
)
def test_web_agent_stream_returns_error_when_voice_transcription_fails():
"""仅发送语音且转写失败时应直接返回错误事件。"""
payload = schemas.AgentWebChatRequest(
text="",
session_id="browser-session",
audio_refs=["message/agent/file/missing"],
)
request = SimpleNamespace()
user = SimpleNamespace(id=1, name="admin")
with patch("app.api.endpoints.agent.settings.AI_AGENT_ENABLE", True), patch(
"app.api.endpoints.agent._transcribe_web_agent_audio_refs",
return_value=None,
):
response = asyncio.run(web_agent_stream(payload, request, user))
body = "".join(asyncio.run(_collect_streaming_response(response)))
assert "error" in body
assert "语音识别失败" in body
def test_web_agent_stream_binds_session_to_agent_manager():
"""WebAgent 普通对话应统一进入 AgentManager 并绑定远程命令会话。"""
payload = schemas.AgentWebChatRequest(
text="查看会话",
session_id="browser-session",
)
request = SimpleNamespace(is_disconnected=AsyncMock(return_value=False))
user = SimpleNamespace(id=1, name="admin", is_superuser=True)
class FakeWebAgent:
"""测试用 WebAgent模拟 AgentManager 内部的持久实例。"""
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
self.processed = []
def set_output_callback(self, output_callback):
"""更新当前 SSE 输出回调。"""
self.output_callback = output_callback
def set_notification_callback(self, notification_callback):
"""更新当前 SSE 通知回调。"""
self.notification_callback = notification_callback
async def process(self, message, **kwargs):
"""模拟一次 WebAgent 推理输出。"""
self.processed.append((message, kwargs))
self.output_callback("状态正常")
return "状态正常"
async def cleanup(self):
"""模拟 Agent 资源清理。"""
return None
session_id = _build_web_agent_session_id(user, payload.session_id)
MessageChain._user_sessions.clear()
agent_manager.active_agents.pop(session_id, None)
agent_manager._session_queues.pop(session_id, None)
worker = agent_manager._session_workers.pop(session_id, None)
if worker:
worker.cancel()
try:
with patch("app.api.endpoints.agent.settings.AI_AGENT_ENABLE", True), patch(
"app.api.endpoints.agent._WebAgentMoviePilotAgent",
FakeWebAgent,
):
response = asyncio.run(web_agent_stream(payload, request, user))
body = "".join(asyncio.run(_collect_streaming_response(response)))
assert "状态正常" in body
assert MessageChain._user_sessions["1"][0] == session_id
assert isinstance(agent_manager.active_agents[session_id], FakeWebAgent)
finally:
MessageChain._user_sessions.clear()
agent = agent_manager.active_agents.pop(session_id, None)
if agent:
asyncio.run(agent.cleanup())
agent_manager._session_queues.pop(session_id, None)
worker = agent_manager._session_workers.pop(session_id, None)
if worker:
worker.cancel()
async def _collect_streaming_response(response):
"""读取 StreamingResponse便于断言 SSE 内容。"""
chunks = []
async for chunk in response.body_iterator:
chunks.append(chunk.decode("utf-8") if isinstance(chunk, bytes) else chunk)
return chunks
def test_build_web_agent_notification_events_extracts_choice_card():
"""Agent 按钮通知应转换为 Web 选择卡片事件而非普通文本。"""
events = _build_web_agent_notification_events(
schemas.Notification(
channel=MessageChannel.WebAgent,
mtype=NotificationType.Agent,
title="需要你的选择",
text="请选择要执行的操作",
buttons=[
[
{
"text": "继续下载",
"callback_data": "agent_interaction:choice:req-1:1",
"description": "继续当前下载任务",
}
],
[
{
"text": "查看详情",
"callback_data": "agent_interaction:choice:req-1:2",
}
],
],
)
)
assert events == [
{
"type": "choice",
"choice": {
"id": "req-1",
"title": "需要你的选择",
"prompt": "请选择要执行的操作",
"buttons": [
{
"label": "继续下载",
"callback_data": "agent_interaction:choice:req-1:1",
"description": "继续当前下载任务",
},
{
"label": "查看详情",
"callback_data": "agent_interaction:choice:req-1:2",
},
],
"button_rows": [
[
{
"label": "继续下载",
"callback_data": "agent_interaction:choice:req-1:1",
"description": "继续当前下载任务",
}
],
[
{
"label": "查看详情",
"callback_data": "agent_interaction:choice:req-1:2",
}
],
],
},
}
]
def test_resolve_web_agent_choice_payload_returns_next_message():
"""Web 按钮回调应解析为下一条用户消息并返回卡片反馈。"""
agent_interaction_manager.clear()
request = agent_interaction_manager.create_request(
session_id="web-agent:session",
user_id="1",
channel=MessageChannel.WebAgent.value,
source="web-agent",
username="admin",
title="需要你的选择",
prompt="请选择",
options=[
AgentInteractionOption(label="电影", value="我选择电影"),
AgentInteractionOption(label="电视剧", value="我选择电视剧", description="选择电视剧并继续清理日志"),
],
)
try:
result = _resolve_web_agent_choice_payload(
callback_data=f"agent_interaction:choice:{request.request_id}:2",
user_id="1",
)
finally:
agent_interaction_manager.clear()
assert result["message"] == "我选择电视剧"
assert result["display_message"] == "选择电视剧并继续清理日志"
assert result["session_id"] == "web-agent:session"
assert result["feedback"]["prompt"] == "请选择"
assert result["feedback"]["selected_label"] == "电视剧"
assert result["feedback"]["selected_value"] == "我选择电视剧"
assert result["feedback"]["selected_description"] == "选择电视剧并继续清理日志"
assert result["choice_selection"]["prompt"] == "请选择"
assert result["choice_selection"]["selected_description"] == "选择电视剧并继续清理日志"
assert result["choice_selection"]["button_rows"][1][0]["description"] == "选择电视剧并继续清理日志"