From 1e338e48ab9fc7270a4d565099b3e5deefacfc71 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Tue, 7 Apr 2026 12:42:46 +0800 Subject: [PATCH] =?UTF-8?q?fix(agent):=20=E5=9F=BA=E4=BA=8Elanggraph=5Fste?= =?UTF-8?q?p=E8=BF=87=E6=BB=A4=E4=B8=AD=E9=97=B4=E6=AD=A5=E9=AA=A4?= =?UTF-8?q?=E6=80=9D=E8=80=83=E6=96=87=E6=9C=AC=EF=BC=8C=E6=8A=BD=E7=A6=BB?= =?UTF-8?q?ThinkTagStripper=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 利用metadata中的langgraph_step检测工具调用前的中间步骤,非VERBOSE模式下 自动reset清除模型输出的计划/推理文本(如NEXT STEPS、tool call描述等) - 将标签流式剥离逻辑抽离为独立的_ThinkTagStripper类,简化主流程 --- app/agent/__init__.py | 166 ++++++++++++++++++++++++++++-------------- 1 file changed, 112 insertions(+), 54 deletions(-) diff --git a/app/agent/__init__.py b/app/agent/__init__.py index dc90c9cb..dfbab507 100644 --- a/app/agent/__init__.py +++ b/app/agent/__init__.py @@ -36,6 +36,79 @@ class AgentChain(ChainBase): pass +class _ThinkTagStripper: + """ + 流式剥离 ... 标签的辅助类。 + 维护内部缓冲区,处理标签跨 token 边界被截断的情况。 + """ + + def __init__(self): + self.buffer = "" + self.in_think_tag = False + + def reset(self): + """重置状态""" + self.buffer = "" + self.in_think_tag = False + + def process(self, text: str, on_output: Callable[[str], None]): + """ + 将新文本送入处理,剥离 标签后通过 on_output 回调输出。 + :param text: 新增的文本片段 + :param on_output: 输出回调,接收过滤后的文本 + :return: 本次调用是否通过 on_output 输出了内容 + """ + self.buffer += text + emitted = False + while self.buffer: + if not self.in_think_tag: + start_idx = self.buffer.find("") + if start_idx != -1: + if start_idx > 0: + on_output(self.buffer[:start_idx]) + emitted = True + self.in_think_tag = True + self.buffer = self.buffer[start_idx + 7:] + else: + # 检查是否以 的不完整前缀结尾 + partial_match = False + for i in range(6, 0, -1): + if self.buffer.endswith(""[:i]): + if len(self.buffer) > i: + on_output(self.buffer[:-i]) + emitted = True + self.buffer = self.buffer[-i:] + partial_match = True + break + if not partial_match: + on_output(self.buffer) + emitted = True + self.buffer = "" + else: + end_idx = self.buffer.find("") + if end_idx != -1: + self.in_think_tag = False + self.buffer = self.buffer[end_idx + 8:] + else: + # 检查是否以 的不完整前缀结尾 + partial_match = False + for i in range(7, 0, -1): + if self.buffer.endswith(""[:i]): + self.buffer = self.buffer[-i:] + partial_match = True + break + if not partial_match: + self.buffer = "" + break + return emitted + + def flush(self, on_output: Callable[[str], None]): + """流式结束时,输出缓冲区中剩余的非思考内容""" + if self.buffer and not self.in_think_tag: + on_output(self.buffer) + self.buffer = "" + + class MoviePilotAgent: """ MoviePilot AI智能体(基于 LangChain v1 + LangGraph) @@ -218,8 +291,11 @@ class MoviePilotAgent: :param config: Agent 运行配置 :param on_token: 收到有效 token 时的回调 """ - in_think_tag = False - buffer = "" + stripper = _ThinkTagStripper() + # 非VERBOSE模式下,跟踪当前langgraph_step以检测中间步骤的模型输出 + # 当模型在工具调用之前输出的"计划/思考"文本,会在检测到tool_call时被清除 + current_model_step = -1 + has_emitted_in_step = False async for chunk in agent.astream( messages, @@ -230,59 +306,41 @@ class MoviePilotAgent: ): if chunk["type"] == "messages": token, metadata = chunk["data"] - if ( - token - and hasattr(token, "tool_call_chunks") - and not token.tool_call_chunks - ): - # 跳过模型思考/推理内容(如 DeepSeek R1 的 reasoning_content) - additional = getattr(token, "additional_kwargs", None) - if additional and additional.get("reasoning_content"): - continue - if token.content: - # content 可能是字符串或内容块列表,过滤掉思考类型的块 - content = self._extract_text_content(token.content) - if content: - buffer += content - while buffer: - if not in_think_tag: - start_idx = buffer.find("") - if start_idx != -1: - if start_idx > 0: - on_token(buffer[:start_idx]) - in_think_tag = True - buffer = buffer[start_idx + 7:] - else: - # 检查是否以 的前缀结尾 - partial_match = False - for i in range(6, 0, -1): - if buffer.endswith(""[:i]): - if len(buffer) > i: - on_token(buffer[:-i]) - buffer = buffer[-i:] - partial_match = True - break - if not partial_match: - on_token(buffer) - buffer = "" - else: - end_idx = buffer.find("") - if end_idx != -1: - in_think_tag = False - buffer = buffer[end_idx + 8:] - else: - # 检查是否以 的前缀结尾 - partial_match = False - for i in range(7, 0, -1): - if buffer.endswith(""[:i]): - buffer = buffer[-i:] - partial_match = True - break - if not partial_match: - buffer = "" + if not token or not hasattr(token, "tool_call_chunks"): + continue - if buffer and not in_think_tag: - on_token(buffer) + # 获取当前步骤信息 + step = metadata.get("langgraph_step", -1) if metadata else -1 + + if token.tool_call_chunks: + # 检测到工具调用token:说明当前步骤是中间步骤 + # 非VERBOSE模式下,清除该步骤之前输出的"计划/思考"文本 + if not settings.AI_AGENT_VERBOSE and has_emitted_in_step: + self.stream_handler.reset() + stripper.reset() + has_emitted_in_step = False + continue + + # 以下处理纯文本token(tool_call_chunks为空) + + # 检测步骤变化,重置步骤内emit跟踪 + if step != current_model_step: + current_model_step = step + has_emitted_in_step = False + + # 跳过模型思考/推理内容(如 DeepSeek R1 的 reasoning_content) + additional = getattr(token, "additional_kwargs", None) + if additional and additional.get("reasoning_content"): + continue + + if token.content: + # content 可能是字符串或内容块列表,过滤掉思考类型的块 + content = self._extract_text_content(token.content) + if content: + if stripper.process(content, on_token): + has_emitted_in_step = True + + stripper.flush(on_token) async def _execute_agent(self, messages: List[BaseMessage]): """