mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-05-08 22:33:16 +08:00
fix(agent): 基于langgraph_step过滤中间步骤思考文本,抽离ThinkTagStripper类
- 利用metadata中的langgraph_step检测工具调用前的中间步骤,非VERBOSE模式下 自动reset清除模型输出的计划/推理文本(如NEXT STEPS、tool call描述等) - 将<think>标签流式剥离逻辑抽离为独立的_ThinkTagStripper类,简化主流程
This commit is contained in:
@@ -36,6 +36,79 @@ class AgentChain(ChainBase):
|
||||
pass
|
||||
|
||||
|
||||
class _ThinkTagStripper:
|
||||
"""
|
||||
流式剥离 <think>...</think> 标签的辅助类。
|
||||
维护内部缓冲区,处理标签跨 token 边界被截断的情况。
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.buffer = ""
|
||||
self.in_think_tag = False
|
||||
|
||||
def reset(self):
|
||||
"""重置状态"""
|
||||
self.buffer = ""
|
||||
self.in_think_tag = False
|
||||
|
||||
def process(self, text: str, on_output: Callable[[str], None]):
|
||||
"""
|
||||
将新文本送入处理,剥离 <think> 标签后通过 on_output 回调输出。
|
||||
:param text: 新增的文本片段
|
||||
:param on_output: 输出回调,接收过滤后的文本
|
||||
:return: 本次调用是否通过 on_output 输出了内容
|
||||
"""
|
||||
self.buffer += text
|
||||
emitted = False
|
||||
while self.buffer:
|
||||
if not self.in_think_tag:
|
||||
start_idx = self.buffer.find("<think>")
|
||||
if start_idx != -1:
|
||||
if start_idx > 0:
|
||||
on_output(self.buffer[:start_idx])
|
||||
emitted = True
|
||||
self.in_think_tag = True
|
||||
self.buffer = self.buffer[start_idx + 7:]
|
||||
else:
|
||||
# 检查是否以 <think> 的不完整前缀结尾
|
||||
partial_match = False
|
||||
for i in range(6, 0, -1):
|
||||
if self.buffer.endswith("<think>"[:i]):
|
||||
if len(self.buffer) > i:
|
||||
on_output(self.buffer[:-i])
|
||||
emitted = True
|
||||
self.buffer = self.buffer[-i:]
|
||||
partial_match = True
|
||||
break
|
||||
if not partial_match:
|
||||
on_output(self.buffer)
|
||||
emitted = True
|
||||
self.buffer = ""
|
||||
else:
|
||||
end_idx = self.buffer.find("</think>")
|
||||
if end_idx != -1:
|
||||
self.in_think_tag = False
|
||||
self.buffer = self.buffer[end_idx + 8:]
|
||||
else:
|
||||
# 检查是否以 </think> 的不完整前缀结尾
|
||||
partial_match = False
|
||||
for i in range(7, 0, -1):
|
||||
if self.buffer.endswith("</think>"[:i]):
|
||||
self.buffer = self.buffer[-i:]
|
||||
partial_match = True
|
||||
break
|
||||
if not partial_match:
|
||||
self.buffer = ""
|
||||
break
|
||||
return emitted
|
||||
|
||||
def flush(self, on_output: Callable[[str], None]):
|
||||
"""流式结束时,输出缓冲区中剩余的非思考内容"""
|
||||
if self.buffer and not self.in_think_tag:
|
||||
on_output(self.buffer)
|
||||
self.buffer = ""
|
||||
|
||||
|
||||
class MoviePilotAgent:
|
||||
"""
|
||||
MoviePilot AI智能体(基于 LangChain v1 + LangGraph)
|
||||
@@ -218,8 +291,11 @@ class MoviePilotAgent:
|
||||
:param config: Agent 运行配置
|
||||
:param on_token: 收到有效 token 时的回调
|
||||
"""
|
||||
in_think_tag = False
|
||||
buffer = ""
|
||||
stripper = _ThinkTagStripper()
|
||||
# 非VERBOSE模式下,跟踪当前langgraph_step以检测中间步骤的模型输出
|
||||
# 当模型在工具调用之前输出的"计划/思考"文本,会在检测到tool_call时被清除
|
||||
current_model_step = -1
|
||||
has_emitted_in_step = False
|
||||
|
||||
async for chunk in agent.astream(
|
||||
messages,
|
||||
@@ -230,59 +306,41 @@ class MoviePilotAgent:
|
||||
):
|
||||
if chunk["type"] == "messages":
|
||||
token, metadata = chunk["data"]
|
||||
if (
|
||||
token
|
||||
and hasattr(token, "tool_call_chunks")
|
||||
and not token.tool_call_chunks
|
||||
):
|
||||
# 跳过模型思考/推理内容(如 DeepSeek R1 的 reasoning_content)
|
||||
additional = getattr(token, "additional_kwargs", None)
|
||||
if additional and additional.get("reasoning_content"):
|
||||
continue
|
||||
if token.content:
|
||||
# content 可能是字符串或内容块列表,过滤掉思考类型的块
|
||||
content = self._extract_text_content(token.content)
|
||||
if content:
|
||||
buffer += content
|
||||
while buffer:
|
||||
if not in_think_tag:
|
||||
start_idx = buffer.find("<think>")
|
||||
if start_idx != -1:
|
||||
if start_idx > 0:
|
||||
on_token(buffer[:start_idx])
|
||||
in_think_tag = True
|
||||
buffer = buffer[start_idx + 7:]
|
||||
else:
|
||||
# 检查是否以 <think> 的前缀结尾
|
||||
partial_match = False
|
||||
for i in range(6, 0, -1):
|
||||
if buffer.endswith("<think>"[:i]):
|
||||
if len(buffer) > i:
|
||||
on_token(buffer[:-i])
|
||||
buffer = buffer[-i:]
|
||||
partial_match = True
|
||||
break
|
||||
if not partial_match:
|
||||
on_token(buffer)
|
||||
buffer = ""
|
||||
else:
|
||||
end_idx = buffer.find("</think>")
|
||||
if end_idx != -1:
|
||||
in_think_tag = False
|
||||
buffer = buffer[end_idx + 8:]
|
||||
else:
|
||||
# 检查是否以 </think> 的前缀结尾
|
||||
partial_match = False
|
||||
for i in range(7, 0, -1):
|
||||
if buffer.endswith("</think>"[:i]):
|
||||
buffer = buffer[-i:]
|
||||
partial_match = True
|
||||
break
|
||||
if not partial_match:
|
||||
buffer = ""
|
||||
if not token or not hasattr(token, "tool_call_chunks"):
|
||||
continue
|
||||
|
||||
if buffer and not in_think_tag:
|
||||
on_token(buffer)
|
||||
# 获取当前步骤信息
|
||||
step = metadata.get("langgraph_step", -1) if metadata else -1
|
||||
|
||||
if token.tool_call_chunks:
|
||||
# 检测到工具调用token:说明当前步骤是中间步骤
|
||||
# 非VERBOSE模式下,清除该步骤之前输出的"计划/思考"文本
|
||||
if not settings.AI_AGENT_VERBOSE and has_emitted_in_step:
|
||||
self.stream_handler.reset()
|
||||
stripper.reset()
|
||||
has_emitted_in_step = False
|
||||
continue
|
||||
|
||||
# 以下处理纯文本token(tool_call_chunks为空)
|
||||
|
||||
# 检测步骤变化,重置步骤内emit跟踪
|
||||
if step != current_model_step:
|
||||
current_model_step = step
|
||||
has_emitted_in_step = False
|
||||
|
||||
# 跳过模型思考/推理内容(如 DeepSeek R1 的 reasoning_content)
|
||||
additional = getattr(token, "additional_kwargs", None)
|
||||
if additional and additional.get("reasoning_content"):
|
||||
continue
|
||||
|
||||
if token.content:
|
||||
# content 可能是字符串或内容块列表,过滤掉思考类型的块
|
||||
content = self._extract_text_content(token.content)
|
||||
if content:
|
||||
if stripper.process(content, on_token):
|
||||
has_emitted_in_step = True
|
||||
|
||||
stripper.flush(on_token)
|
||||
|
||||
async def _execute_agent(self, messages: List[BaseMessage]):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user