diff --git a/app/agent/__init__.py b/app/agent/__init__.py index 7291b43c..d2369380 100644 --- a/app/agent/__init__.py +++ b/app/agent/__init__.py @@ -38,12 +38,12 @@ class MoviePilotAgent: """ def __init__( - self, - session_id: str, - user_id: str = None, - channel: str = None, - source: str = None, - username: str = None, + self, + session_id: str, + user_id: str = None, + channel: str = None, + source: str = None, + username: str = None, ): self.session_id = session_id self.user_id = user_id @@ -176,25 +176,28 @@ class MoviePilotAgent: # 流式运行智能体 async for chunk in agent.astream( - {"messages": messages}, - stream_mode="messages", - config=agent_config, - subgraphs=False, - version="v2", + {"messages": messages}, + stream_mode="messages", + config=agent_config, + subgraphs=False, + version="v2", ): # 处理流式token(过滤工具调用token,只保留模型生成的内容) if chunk["type"] == "messages": token, metadata = chunk["data"] if ( - token - and hasattr(token, "tool_call_chunks") - and not token.tool_call_chunks + token + and hasattr(token, "tool_call_chunks") + and not token.tool_call_chunks ): if token.content: self.stream_handler.emit(token.content) - # 停止流式输出,返回是否已通过流式编辑发送了所有内容 - all_sent_via_stream = await self.stream_handler.stop_streaming() + # 停止流式输出,返回是否已通过流式编辑发送了所有内容及最终文本 + ( + all_sent_via_stream, + streamed_text, + ) = await self.stream_handler.stop_streaming() if not all_sent_via_stream: # 流式输出未能发送全部内容(渠道不支持编辑,或发送失败) @@ -202,6 +205,9 @@ class MoviePilotAgent: remaining_text = await self.stream_handler.take() if remaining_text: await self.send_agent_message(remaining_text) + elif streamed_text: + # 流式输出已发送全部内容,但未记录到数据库,补充保存消息记录 + await self._save_agent_message_to_db(streamed_text) # 保存消息 memory_manager.save_agent_messages( @@ -236,6 +242,26 @@ class MoviePilotAgent: ) ) + async def _save_agent_message_to_db(self, message: str, title: str = ""): + """ + 仅保存Agent回复消息到数据库和SSE队列(不重新发送到渠道) + 用于流式输出场景:消息已通过 send_direct_message/edit_message 发送给用户, + 但未记录到数据库中,此方法补充保存消息历史记录。 + """ + chain = AgentChain() + notification = Notification( + channel=self.channel, + source=self.source, + userid=self.user_id, + username=self.username, + title=title, + text=message, + ) + # 保存到SSE消息队列(供前端展示) + chain.messagehelper.put(notification, role="user", title=title) + # 保存到数据库 + await chain.messageoper.async_add(**notification.model_dump()) + async def cleanup(self): """ 清理智能体资源 @@ -268,13 +294,13 @@ class AgentManager: self.active_agents.clear() async def process_message( - self, - session_id: str, - user_id: str, - message: str, - channel: str = None, - source: str = None, - username: str = None, + self, + session_id: str, + user_id: str, + message: str, + channel: str = None, + source: str = None, + username: str = None, ) -> str: """ 处理用户消息 diff --git a/app/agent/callback/__init__.py b/app/agent/callback/__init__.py index b2ef68a1..c8d837e0 100644 --- a/app/agent/callback/__init__.py +++ b/app/agent/callback/__init__.py @@ -1,6 +1,6 @@ import asyncio import threading -from typing import Optional +from typing import Optional, Tuple from app.chain import ChainBase from app.log import logger @@ -127,14 +127,16 @@ class StreamingHandler: self._flush_task = asyncio.create_task(self._flush_loop()) logger.debug("流式输出已启动") - async def stop_streaming(self) -> bool: + async def stop_streaming(self) -> Tuple[bool, str]: """ 停止流式输出。执行最后一次刷新确保所有内容都已发送。 - :return: 是否已经通过流式编辑将最终完整内容发送给了用户 - (True 表示调用方无需再额外发送消息) + :return: (all_sent, final_text) + all_sent: 是否已经通过流式编辑将最终完整内容发送给了用户 + (True 表示调用方无需再额外发送消息) + final_text: 流式发送的完整文本内容(用于调用方保存消息记录) """ if not self._streaming_enabled: - return False + return False, "" self._streaming_enabled = False @@ -151,13 +153,15 @@ class StreamingHandler: and self._sent_text and self._buffer == self._sent_text ) + # 保留最终文本用于返回 + final_text = self._sent_text if all_sent else "" # 重置状态 self._sent_text = "" self._message_response = None if all_sent: # 所有内容已通过流式发送,清空缓冲区 self._buffer = "" - return all_sent + return all_sent, final_text def _can_stream(self) -> bool: """