fix: 修复Agent流式输出时回复消息未记录到数据库的问题

This commit is contained in:
jxxghp
2026-03-25 07:01:17 +08:00
parent 94ed065344
commit e6e50d7f0a
2 changed files with 59 additions and 29 deletions

View File

@@ -38,12 +38,12 @@ class MoviePilotAgent:
"""
def __init__(
self,
session_id: str,
user_id: str = None,
channel: str = None,
source: str = None,
username: str = None,
self,
session_id: str,
user_id: str = None,
channel: str = None,
source: str = None,
username: str = None,
):
self.session_id = session_id
self.user_id = user_id
@@ -176,25 +176,28 @@ class MoviePilotAgent:
# 流式运行智能体
async for chunk in agent.astream(
{"messages": messages},
stream_mode="messages",
config=agent_config,
subgraphs=False,
version="v2",
{"messages": messages},
stream_mode="messages",
config=agent_config,
subgraphs=False,
version="v2",
):
# 处理流式token过滤工具调用token只保留模型生成的内容
if chunk["type"] == "messages":
token, metadata = chunk["data"]
if (
token
and hasattr(token, "tool_call_chunks")
and not token.tool_call_chunks
token
and hasattr(token, "tool_call_chunks")
and not token.tool_call_chunks
):
if token.content:
self.stream_handler.emit(token.content)
# 停止流式输出,返回是否已通过流式编辑发送了所有内容
all_sent_via_stream = await self.stream_handler.stop_streaming()
# 停止流式输出,返回是否已通过流式编辑发送了所有内容及最终文本
(
all_sent_via_stream,
streamed_text,
) = await self.stream_handler.stop_streaming()
if not all_sent_via_stream:
# 流式输出未能发送全部内容(渠道不支持编辑,或发送失败)
@@ -202,6 +205,9 @@ class MoviePilotAgent:
remaining_text = await self.stream_handler.take()
if remaining_text:
await self.send_agent_message(remaining_text)
elif streamed_text:
# 流式输出已发送全部内容,但未记录到数据库,补充保存消息记录
await self._save_agent_message_to_db(streamed_text)
# 保存消息
memory_manager.save_agent_messages(
@@ -236,6 +242,26 @@ class MoviePilotAgent:
)
)
async def _save_agent_message_to_db(self, message: str, title: str = ""):
"""
仅保存Agent回复消息到数据库和SSE队列不重新发送到渠道
用于流式输出场景:消息已通过 send_direct_message/edit_message 发送给用户,
但未记录到数据库中,此方法补充保存消息历史记录。
"""
chain = AgentChain()
notification = Notification(
channel=self.channel,
source=self.source,
userid=self.user_id,
username=self.username,
title=title,
text=message,
)
# 保存到SSE消息队列供前端展示
chain.messagehelper.put(notification, role="user", title=title)
# 保存到数据库
await chain.messageoper.async_add(**notification.model_dump())
async def cleanup(self):
"""
清理智能体资源
@@ -268,13 +294,13 @@ class AgentManager:
self.active_agents.clear()
async def process_message(
self,
session_id: str,
user_id: str,
message: str,
channel: str = None,
source: str = None,
username: str = None,
self,
session_id: str,
user_id: str,
message: str,
channel: str = None,
source: str = None,
username: str = None,
) -> str:
"""
处理用户消息

View File

@@ -1,6 +1,6 @@
import asyncio
import threading
from typing import Optional
from typing import Optional, Tuple
from app.chain import ChainBase
from app.log import logger
@@ -127,14 +127,16 @@ class StreamingHandler:
self._flush_task = asyncio.create_task(self._flush_loop())
logger.debug("流式输出已启动")
async def stop_streaming(self) -> bool:
async def stop_streaming(self) -> Tuple[bool, str]:
"""
停止流式输出。执行最后一次刷新确保所有内容都已发送。
:return: 是否已经通过流式编辑将最终完整内容发送给了用户
True 表示调用方无需再额外发送消息)
:return: (all_sent, final_text)
all_sent: 是否已经通过流式编辑将最终完整内容发送给了用户
True 表示调用方无需再额外发送消息)
final_text: 流式发送的完整文本内容(用于调用方保存消息记录)
"""
if not self._streaming_enabled:
return False
return False, ""
self._streaming_enabled = False
@@ -151,13 +153,15 @@ class StreamingHandler:
and self._sent_text
and self._buffer == self._sent_text
)
# 保留最终文本用于返回
final_text = self._sent_text if all_sent else ""
# 重置状态
self._sent_text = ""
self._message_response = None
if all_sent:
# 所有内容已通过流式发送,清空缓冲区
self._buffer = ""
return all_sent
return all_sent, final_text
def _can_stream(self) -> bool:
"""