feat: 后台任务(定时唤醒)跳过流式输出,仅广播最终结果

This commit is contained in:
jxxghp
2026-03-25 17:10:48 +08:00
parent 940cececf4
commit 0399ab73cf
2 changed files with 109 additions and 63 deletions

View File

@@ -2,7 +2,7 @@ import asyncio
import traceback
import uuid
from time import strftime
from typing import Dict, List
from typing import Callable, Dict, List
from langchain.agents import create_agent
from langchain.agents.middleware import (
@@ -56,6 +56,13 @@ class MoviePilotAgent:
# 流式token管理
self.stream_handler = StreamingHandler()
@property
def is_background(self) -> bool:
"""
是否为后台任务模式(无渠道信息,如定时唤醒)
"""
return not self.channel and not self.source
@staticmethod
def _initialize_llm():
"""
@@ -155,11 +162,39 @@ class MoviePilotAgent:
await self.send_agent_message(error_message)
return error_message
@staticmethod
async def _stream_agent_tokens(
agent, messages: dict, config: dict, on_token: Callable[[str], None]
):
"""
流式运行智能体过滤工具调用token将模型生成的内容通过回调输出。
:param agent: LangGraph Agent 实例
:param messages: Agent 输入消息
:param config: Agent 运行配置
:param on_token: 收到有效 token 时的回调
"""
async for chunk in agent.astream(
messages,
stream_mode="messages",
config=config,
subgraphs=False,
version="v2",
):
if chunk["type"] == "messages":
token, metadata = chunk["data"]
if (
token
and hasattr(token, "tool_call_chunks")
and not token.tool_call_chunks
):
if token.content:
on_token(token.content)
async def _execute_agent(self, messages: List[BaseMessage]):
"""
调用 LangGraph Agent通过 astream_events 流式获取 token
同时用 UsageMetadataCallbackHandler 统计 token 用量。
调用 LangGraph Agent通过 astream 流式获取 token
支持流式输出:在支持消息编辑的渠道上实时推送 token。
后台任务模式(无渠道信息):不进行流式输出,仅广播最终结果。
"""
try:
# Agent运行配置
@@ -172,48 +207,54 @@ class MoviePilotAgent:
# 创建智能体
agent = self._create_agent()
# 启动流式输出(内部会检查渠道是否支持消息编辑)
await self.stream_handler.start_streaming(
channel=self.channel,
source=self.source,
user_id=self.user_id,
username=self.username,
)
if self.is_background:
# 后台任务模式:不需要流式输出,只收集最终结果
collected: List[str] = []
# 流式运行智能体
async for chunk in agent.astream(
{"messages": messages},
stream_mode="messages",
config=agent_config,
subgraphs=False,
version="v2",
):
# 处理流式token过滤工具调用token只保留模型生成的内容
if chunk["type"] == "messages":
token, metadata = chunk["data"]
if (
token
and hasattr(token, "tool_call_chunks")
and not token.tool_call_chunks
):
if token.content:
self.stream_handler.emit(token.content)
await self._stream_agent_tokens(
agent=agent,
messages={"messages": messages},
config=agent_config,
on_token=lambda t: collected.append(t),
)
# 停止流式输出,返回是否已通过流式编辑发送了所有内容及最终文本
(
all_sent_via_stream,
streamed_text,
) = await self.stream_handler.stop_streaming()
# 后台任务仅广播最终结果,带标题
final_text = "".join(collected)
if final_text:
await self.send_agent_message(final_text, title="MoviePilot助手")
if not all_sent_via_stream:
# 流式输出未能发送全部内容(渠道不支持编辑,或发送失败)
# 通过常规方式发送剩余内容
remaining_text = await self.stream_handler.take()
if remaining_text:
await self.send_agent_message(remaining_text)
elif streamed_text:
# 流式输出已发送全部内容,但未记录到数据库,补充保存消息记录
await self._save_agent_message_to_db(streamed_text)
else:
# 正常渠道模式:启动流式输出
await self.stream_handler.start_streaming(
channel=self.channel,
source=self.source,
user_id=self.user_id,
username=self.username,
)
# 流式运行智能体token 直接推送到 stream_handler
await self._stream_agent_tokens(
agent=agent,
messages={"messages": messages},
config=agent_config,
on_token=self.stream_handler.emit,
)
# 停止流式输出,返回是否已通过流式编辑发送了所有内容及最终文本
(
all_sent_via_stream,
streamed_text,
) = await self.stream_handler.stop_streaming()
if not all_sent_via_stream:
# 流式输出未能发送全部内容(渠道不支持编辑,或发送失败)
# 通过常规方式发送剩余内容
remaining_text = await self.stream_handler.take()
if remaining_text:
await self.send_agent_message(remaining_text)
elif streamed_text:
# 流式输出已发送全部内容,但未记录到数据库,补充保存消息记录
await self._save_agent_message_to_db(streamed_text)
# 保存消息
memory_manager.save_agent_messages(
@@ -223,15 +264,15 @@ class MoviePilotAgent:
)
except asyncio.CancelledError:
# 确保取消时也停止流式输出
await self.stream_handler.stop_streaming()
logger.info(f"Agent执行被取消: session_id={self.session_id}")
return "任务已取消", {}
except Exception as e:
# 确保异常时也停止流式输出
await self.stream_handler.stop_streaming()
logger.error(f"Agent执行失败: {e} - {traceback.format_exc()}")
return str(e), {}
finally:
# 确保停止流式输出
if not self.is_background:
await self.stream_handler.stop_streaming()
async def send_agent_message(self, message: str, title: str = ""):
"""

View File

@@ -43,6 +43,9 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta):
3. 调用具体工具逻辑(子类实现的 execute 方法)
4. 持久化工具结果到会话记忆
"""
# 判断是否为后台任务模式(无渠道信息,如定时唤醒)
is_background = not self._channel and not self._source
# 获取工具执行提示消息
tool_message = self.get_tool_message(**kwargs)
if not tool_message:
@@ -50,25 +53,27 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta):
if explanation:
tool_message = explanation
if self._stream_handler and self._stream_handler.is_streaming:
# 流式渠道:工具消息直接追加到 buffer 中,与 Agent 文字合并为同一条流式消息
if tool_message:
self._stream_handler.emit(f"\n\n⚙️ => {tool_message}\n\n")
else:
# 非流式渠道:保持原有行为,取出 Agent 文字 + 工具消息合并独立发送
agent_message = (
await self._stream_handler.take() if self._stream_handler else ""
)
if not is_background:
# 非后台模式:发送工具执行过程消息
if self._stream_handler and self._stream_handler.is_streaming:
# 流式渠道:工具消息直接追加到 buffer 中,与 Agent 文字合并为同一条流式消息
if tool_message:
self._stream_handler.emit(f"\n\n⚙️ => {tool_message}\n\n")
else:
# 非流式渠道:保持原有行为,取出 Agent 文字 + 工具消息合并独立发送
agent_message = (
await self._stream_handler.take() if self._stream_handler else ""
)
messages = []
if agent_message:
messages.append(agent_message)
if tool_message:
messages.append(f"⚙️ => {tool_message}")
messages = []
if agent_message:
messages.append(agent_message)
if tool_message:
messages.append(f"⚙️ => {tool_message}")
if messages:
merged_message = "\n\n".join(messages)
await self.send_tool_message(merged_message)
if messages:
merged_message = "\n\n".join(messages)
await self.send_tool_message(merged_message)
logger.debug(f"Executing tool {self.name} with args: {kwargs}")