diff --git a/app/agent/__init__.py b/app/agent/__init__.py index 2342e43a..cb0e3e00 100644 --- a/app/agent/__init__.py +++ b/app/agent/__init__.py @@ -30,6 +30,8 @@ from app.core.config import settings from app.helper.llm import LLMHelper from app.log import logger from app.schemas import Notification, NotificationType +from app.schemas.message import ChannelCapabilityManager, ChannelCapability +from app.schemas.types import MessageChannel class AgentChain(ChainBase): @@ -138,12 +140,37 @@ class MoviePilotAgent: """ return not self.channel and not self.source + def _should_stream(self) -> bool: + """ + 判断是否应启用流式输出: + - 后台模式不启用流式输出 + - 渠道支持消息编辑:启用流式输出(实时推送 token) + - 渠道不支持消息编辑但开启了啰嗦模式:也需要启用流式输出, + 以便在工具调用前捕获 Agent 的中间文字并随工具消息一起发送 + - 其他情况不启用流式输出 + """ + if self.is_background: + return False + if not self.channel: + return False + # 啰嗦模式下始终需要流式输出来捕获工具调用前的 Agent 文字 + if settings.AI_AGENT_VERBOSE: + return True + try: + channel_enum = MessageChannel(self.channel) + return ChannelCapabilityManager.supports_capability( + channel_enum, ChannelCapability.MESSAGE_EDITING + ) + except (ValueError, KeyError): + return False + @staticmethod - def _initialize_llm(): + def _initialize_llm(streaming: bool = False): """ - 初始化 LLM(带流式回调) + 初始化 LLM + :param streaming: 是否启用流式输出 """ - return LLMHelper.get_llm(streaming=True) + return LLMHelper.get_llm(streaming=streaming) @staticmethod def _extract_text_content(content) -> str: @@ -191,16 +218,17 @@ class MoviePilotAgent: stream_handler=self.stream_handler, ) - def _create_agent(self): + def _create_agent(self, streaming: bool = False): """ 创建 LangGraph Agent(使用 create_agent + SummarizationMiddleware) + :param streaming: 是否启用流式输出 """ try: # 系统提示词 system_prompt = prompt_manager.get_agent_prompt(channel=self.channel) # LLM 模型(用于 agent 执行) - llm = self._initialize_llm() + llm = self._initialize_llm(streaming=streaming) # 工具列表 tools = self._initialize_tools() @@ -344,9 +372,11 @@ class MoviePilotAgent: async def _execute_agent(self, messages: List[BaseMessage]): """ - 调用 LangGraph Agent,通过 astream 流式获取 token。 - 支持流式输出:在支持消息编辑的渠道上实时推送 token。 - 后台任务模式(无渠道信息):不进行流式输出,仅广播最终结果。 + 调用 LangGraph Agent 执行推理。 + 根据运行环境选择不同的执行模式: + - 后台任务模式(无渠道信息):非流式 LLM + ainvoke,仅广播最终结果 + - 渠道不支持消息编辑:非流式 LLM + ainvoke,完成后发送最终回复 + - 渠道支持消息编辑:流式 LLM + astream,实时推送 token """ try: # Agent运行配置 @@ -356,39 +386,14 @@ class MoviePilotAgent: } } - # 创建智能体 - agent = self._create_agent() + # 判断是否启用流式输出 + use_streaming = self._should_stream() - if self.is_background: - # 后台任务模式:非流式执行,等待完成后只取最后一条AI回复 - await agent.ainvoke( - {"messages": messages}, - config=agent_config, - ) + # 创建智能体(根据是否流式传入不同 LLM) + agent = self._create_agent(streaming=use_streaming) - # 从最终状态中提取最后一条AI回复内容 - final_messages = agent.get_state(agent_config).values.get( - "messages", [] - ) - final_text = "" - for msg in reversed(final_messages): - if hasattr(msg, "type") and msg.type == "ai" and msg.content: - # 过滤掉思考/推理内容,只提取纯文本 - text = self._extract_text_content(msg.content) - if text: - # 过滤掉包含在 标签中的内容 - text = re.sub( - r".*?(?:|$)", "", text, flags=re.DOTALL - ) - final_text = text.strip() - break - - # 后台任务仅广播最终回复,带标题 - if final_text: - await self.send_agent_message(final_text, title="MoviePilot助手") - - else: - # 正常渠道模式:启动流式输出 + if use_streaming: + # 流式模式:渠道支持消息编辑,启动流式输出实时推送 token await self.stream_handler.start_streaming( channel=self.channel, source=self.source, @@ -411,7 +416,7 @@ class MoviePilotAgent: ) = await self.stream_handler.stop_streaming() if not all_sent_via_stream: - # 流式输出未能发送全部内容(渠道不支持编辑,或发送失败) + # 流式输出未能发送全部内容(发送失败等) # 通过常规方式发送剩余内容 remaining_text = await self.stream_handler.take() if remaining_text: @@ -420,6 +425,40 @@ class MoviePilotAgent: # 流式输出已发送全部内容,但未记录到数据库,补充保存消息记录 await self._save_agent_message_to_db(streamed_text) + else: + # 非流式模式:后台任务或渠道不支持消息编辑 + await agent.ainvoke( + {"messages": messages}, + config=agent_config, + ) + + # 从最终状态中提取最后一条AI回复内容 + final_messages = agent.get_state(agent_config).values.get( + "messages", [] + ) + final_text = "" + for msg in reversed(final_messages): + if hasattr(msg, "type") and msg.type == "ai" and msg.content: + # 过滤掉思考/推理内容,只提取纯文本 + text = self._extract_text_content(msg.content) + if text: + # 过滤掉包含在 标签中的内容 + text = re.sub( + r".*?(?:|$)", "", text, flags=re.DOTALL + ) + final_text = text.strip() + break + + if final_text: + if self.is_background: + # 后台任务仅广播最终回复,带标题 + await self.send_agent_message( + final_text, title="MoviePilot助手" + ) + else: + # 非流式渠道:发送最终回复 + await self.send_agent_message(final_text) + # 保存消息 memory_manager.save_agent_messages( session_id=self.session_id, @@ -435,8 +474,7 @@ class MoviePilotAgent: return str(e), {} finally: # 确保停止流式输出 - if not self.is_background: - await self.stream_handler.stop_streaming() + await self.stream_handler.stop_streaming() async def send_agent_message(self, message: str, title: str = ""): """ diff --git a/app/agent/tools/base.py b/app/agent/tools/base.py index 691c0676..a9a48eca 100644 --- a/app/agent/tools/base.py +++ b/app/agent/tools/base.py @@ -72,22 +72,27 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta): # 非VERBOSE,重置缓冲区从头更新,保持消息编辑能力 self._stream_handler.reset() else: - # 后台模式(无渠道信息)不发送工具调用消息 + # 非流式模式(后台任务或渠道不支持消息编辑) if self._channel: - # 非流式渠道:保持原有行为,取出 Agent 文字 + 工具消息合并独立发送 - agent_message = ( - await self._stream_handler.take() if self._stream_handler else "" - ) - - messages = [] - if agent_message: - messages.append(agent_message) - if tool_message: - messages.append(f"⚙️ => {tool_message}") - - if messages: - merged_message = "\n\n".join(messages) - await self.send_tool_message(merged_message) + if settings.AI_AGENT_VERBOSE: + # 啰嗦模式:取出 Agent 文字 + 工具消息合并发送 + agent_message = ( + await self._stream_handler.take() + if self._stream_handler + else "" + ) + messages = [] + if agent_message: + messages.append(agent_message) + if tool_message: + messages.append(f"⚙️ => {tool_message}") + if messages: + merged_message = "\n\n".join(messages) + await self.send_tool_message(merged_message) + else: + # 非啰嗦模式:不发送中间消息,清掉缓冲区 + if self._stream_handler: + await self._stream_handler.take() logger.debug(f"Executing tool {self.name} with args: {kwargs}")