From 0399ab73cf58f9b95cd6d9d27334fcda87e19c1b Mon Sep 17 00:00:00 2001 From: jxxghp Date: Wed, 25 Mar 2026 17:10:48 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=90=8E=E5=8F=B0=E4=BB=BB=E5=8A=A1(?= =?UTF-8?q?=E5=AE=9A=E6=97=B6=E5=94=A4=E9=86=92)=E8=B7=B3=E8=BF=87?= =?UTF-8?q?=E6=B5=81=E5=BC=8F=E8=BE=93=E5=87=BA=EF=BC=8C=E4=BB=85=E5=B9=BF?= =?UTF-8?q?=E6=92=AD=E6=9C=80=E7=BB=88=E7=BB=93=E6=9E=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/agent/__init__.py | 133 ++++++++++++++++++++++++++-------------- app/agent/tools/base.py | 39 +++++++----- 2 files changed, 109 insertions(+), 63 deletions(-) diff --git a/app/agent/__init__.py b/app/agent/__init__.py index 49bfe2ad..9b6153cd 100644 --- a/app/agent/__init__.py +++ b/app/agent/__init__.py @@ -2,7 +2,7 @@ import asyncio import traceback import uuid from time import strftime -from typing import Dict, List +from typing import Callable, Dict, List from langchain.agents import create_agent from langchain.agents.middleware import ( @@ -56,6 +56,13 @@ class MoviePilotAgent: # 流式token管理 self.stream_handler = StreamingHandler() + @property + def is_background(self) -> bool: + """ + 是否为后台任务模式(无渠道信息,如定时唤醒) + """ + return not self.channel and not self.source + @staticmethod def _initialize_llm(): """ @@ -155,11 +162,39 @@ class MoviePilotAgent: await self.send_agent_message(error_message) return error_message + @staticmethod + async def _stream_agent_tokens( + agent, messages: dict, config: dict, on_token: Callable[[str], None] + ): + """ + 流式运行智能体,过滤工具调用token,将模型生成的内容通过回调输出。 + :param agent: LangGraph Agent 实例 + :param messages: Agent 输入消息 + :param config: Agent 运行配置 + :param on_token: 收到有效 token 时的回调 + """ + async for chunk in agent.astream( + messages, + stream_mode="messages", + config=config, + subgraphs=False, + version="v2", + ): + if chunk["type"] == "messages": + token, metadata = chunk["data"] + if ( + token + and hasattr(token, "tool_call_chunks") + and not token.tool_call_chunks + ): + if token.content: + on_token(token.content) + async def _execute_agent(self, messages: List[BaseMessage]): """ - 调用 LangGraph Agent,通过 astream_events 流式获取 token, - 同时用 UsageMetadataCallbackHandler 统计 token 用量。 + 调用 LangGraph Agent,通过 astream 流式获取 token。 支持流式输出:在支持消息编辑的渠道上实时推送 token。 + 后台任务模式(无渠道信息):不进行流式输出,仅广播最终结果。 """ try: # Agent运行配置 @@ -172,48 +207,54 @@ class MoviePilotAgent: # 创建智能体 agent = self._create_agent() - # 启动流式输出(内部会检查渠道是否支持消息编辑) - await self.stream_handler.start_streaming( - channel=self.channel, - source=self.source, - user_id=self.user_id, - username=self.username, - ) + if self.is_background: + # 后台任务模式:不需要流式输出,只收集最终结果 + collected: List[str] = [] - # 流式运行智能体 - async for chunk in agent.astream( - {"messages": messages}, - stream_mode="messages", - config=agent_config, - subgraphs=False, - version="v2", - ): - # 处理流式token(过滤工具调用token,只保留模型生成的内容) - if chunk["type"] == "messages": - token, metadata = chunk["data"] - if ( - token - and hasattr(token, "tool_call_chunks") - and not token.tool_call_chunks - ): - if token.content: - self.stream_handler.emit(token.content) + await self._stream_agent_tokens( + agent=agent, + messages={"messages": messages}, + config=agent_config, + on_token=lambda t: collected.append(t), + ) - # 停止流式输出,返回是否已通过流式编辑发送了所有内容及最终文本 - ( - all_sent_via_stream, - streamed_text, - ) = await self.stream_handler.stop_streaming() + # 后台任务仅广播最终结果,带标题 + final_text = "".join(collected) + if final_text: + await self.send_agent_message(final_text, title="MoviePilot助手") - if not all_sent_via_stream: - # 流式输出未能发送全部内容(渠道不支持编辑,或发送失败) - # 通过常规方式发送剩余内容 - remaining_text = await self.stream_handler.take() - if remaining_text: - await self.send_agent_message(remaining_text) - elif streamed_text: - # 流式输出已发送全部内容,但未记录到数据库,补充保存消息记录 - await self._save_agent_message_to_db(streamed_text) + else: + # 正常渠道模式:启动流式输出 + await self.stream_handler.start_streaming( + channel=self.channel, + source=self.source, + user_id=self.user_id, + username=self.username, + ) + + # 流式运行智能体,token 直接推送到 stream_handler + await self._stream_agent_tokens( + agent=agent, + messages={"messages": messages}, + config=agent_config, + on_token=self.stream_handler.emit, + ) + + # 停止流式输出,返回是否已通过流式编辑发送了所有内容及最终文本 + ( + all_sent_via_stream, + streamed_text, + ) = await self.stream_handler.stop_streaming() + + if not all_sent_via_stream: + # 流式输出未能发送全部内容(渠道不支持编辑,或发送失败) + # 通过常规方式发送剩余内容 + remaining_text = await self.stream_handler.take() + if remaining_text: + await self.send_agent_message(remaining_text) + elif streamed_text: + # 流式输出已发送全部内容,但未记录到数据库,补充保存消息记录 + await self._save_agent_message_to_db(streamed_text) # 保存消息 memory_manager.save_agent_messages( @@ -223,15 +264,15 @@ class MoviePilotAgent: ) except asyncio.CancelledError: - # 确保取消时也停止流式输出 - await self.stream_handler.stop_streaming() logger.info(f"Agent执行被取消: session_id={self.session_id}") return "任务已取消", {} except Exception as e: - # 确保异常时也停止流式输出 - await self.stream_handler.stop_streaming() logger.error(f"Agent执行失败: {e} - {traceback.format_exc()}") return str(e), {} + finally: + # 确保停止流式输出 + if not self.is_background: + await self.stream_handler.stop_streaming() async def send_agent_message(self, message: str, title: str = ""): """ diff --git a/app/agent/tools/base.py b/app/agent/tools/base.py index 66713879..6413f2ef 100644 --- a/app/agent/tools/base.py +++ b/app/agent/tools/base.py @@ -43,6 +43,9 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta): 3. 调用具体工具逻辑(子类实现的 execute 方法) 4. 持久化工具结果到会话记忆 """ + # 判断是否为后台任务模式(无渠道信息,如定时唤醒) + is_background = not self._channel and not self._source + # 获取工具执行提示消息 tool_message = self.get_tool_message(**kwargs) if not tool_message: @@ -50,25 +53,27 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta): if explanation: tool_message = explanation - if self._stream_handler and self._stream_handler.is_streaming: - # 流式渠道:工具消息直接追加到 buffer 中,与 Agent 文字合并为同一条流式消息 - if tool_message: - self._stream_handler.emit(f"\n\n⚙️ => {tool_message}\n\n") - else: - # 非流式渠道:保持原有行为,取出 Agent 文字 + 工具消息合并独立发送 - agent_message = ( - await self._stream_handler.take() if self._stream_handler else "" - ) + if not is_background: + # 非后台模式:发送工具执行过程消息 + if self._stream_handler and self._stream_handler.is_streaming: + # 流式渠道:工具消息直接追加到 buffer 中,与 Agent 文字合并为同一条流式消息 + if tool_message: + self._stream_handler.emit(f"\n\n⚙️ => {tool_message}\n\n") + else: + # 非流式渠道:保持原有行为,取出 Agent 文字 + 工具消息合并独立发送 + agent_message = ( + await self._stream_handler.take() if self._stream_handler else "" + ) - messages = [] - if agent_message: - messages.append(agent_message) - if tool_message: - messages.append(f"⚙️ => {tool_message}") + messages = [] + if agent_message: + messages.append(agent_message) + if tool_message: + messages.append(f"⚙️ => {tool_message}") - if messages: - merged_message = "\n\n".join(messages) - await self.send_tool_message(merged_message) + if messages: + merged_message = "\n\n".join(messages) + await self.send_tool_message(merged_message) logger.debug(f"Executing tool {self.name} with args: {kwargs}")