From 244112be5c2f637fe5289ee77bc56a22dff56de9 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Thu, 2 Apr 2026 19:23:40 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E6=99=BA=E8=83=BD=E4=BD=93=E5=94=A4?= =?UTF-8?q?=E9=86=92=E5=90=8E=E6=B6=88=E6=81=AF=E5=8F=91=E9=80=81=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/agent/__init__.py | 74 +++++++++++++++++++++++-------------------- app/chain/message.py | 2 +- 2 files changed, 40 insertions(+), 36 deletions(-) diff --git a/app/agent/__init__.py b/app/agent/__init__.py index b50dd400..70f7e4c0 100644 --- a/app/agent/__init__.py +++ b/app/agent/__init__.py @@ -42,12 +42,12 @@ class MoviePilotAgent: """ def __init__( - self, - session_id: str, - user_id: str = None, - channel: str = None, - source: str = None, - username: str = None, + self, + session_id: str, + user_id: str = None, + channel: str = None, + source: str = None, + username: str = None, ): self.session_id = session_id self.user_id = user_id @@ -92,10 +92,10 @@ class MoviePilotAgent: if block.get("thought"): continue if block.get("type") in ( - "thinking", - "reasoning_content", - "reasoning", - "thought", + "thinking", + "reasoning_content", + "reasoning", + "thought", ): continue if block.get("type") == "text": @@ -209,7 +209,7 @@ class MoviePilotAgent: return error_message async def _stream_agent_tokens( - self, agent, messages: dict, config: dict, on_token: Callable[[str], None] + self, agent, messages: dict, config: dict, on_token: Callable[[str], None] ): """ 流式运行智能体,过滤工具调用token和思考内容,将模型生成的内容通过回调输出。 @@ -222,18 +222,18 @@ class MoviePilotAgent: buffer = "" async for chunk in agent.astream( - messages, - stream_mode="messages", - config=config, - subgraphs=False, - version="v2", + messages, + stream_mode="messages", + config=config, + subgraphs=False, + version="v2", ): if chunk["type"] == "messages": token, metadata = chunk["data"] if ( - token - and hasattr(token, "tool_call_chunks") - and not token.tool_call_chunks + token + and hasattr(token, "tool_call_chunks") + and not token.tool_call_chunks ): # 跳过模型思考/推理内容(如 DeepSeek R1 的 reasoning_content) additional = getattr(token, "additional_kwargs", None) @@ -384,12 +384,16 @@ class MoviePilotAgent: """ 通过原渠道发送消息给用户 """ + user_id = self.user_id + if self.user_id == "system": + user_id = None + await AgentChain().async_post_message( Notification( channel=self.channel, source=self.source, mtype=NotificationType.Agent, - userid=self.user_id, + userid=user_id, username=self.username, title=title, text=message, @@ -479,14 +483,14 @@ class AgentManager: self.active_agents.clear() async def process_message( - self, - session_id: str, - user_id: str, - message: str, - images: List[str] = None, - channel: str = None, - source: str = None, - username: str = None, + self, + session_id: str, + user_id: str, + message: str, + images: List[str] = None, + channel: str = None, + source: str = None, + username: str = None, ) -> str: """ 处理用户消息:将消息放入会话队列,按顺序依次处理。 @@ -511,8 +515,8 @@ class AgentManager: # 如果队列中已有等待的消息,通知用户消息已排队 if queue_size > 0 or ( - session_id in self._session_workers - and not self._session_workers[session_id].done() + session_id in self._session_workers + and not self._session_workers[session_id].done() ): logger.info( f"会话 {session_id} 有任务正在处理,消息已排队等待 " @@ -524,8 +528,8 @@ class AgentManager: # 确保该会话有一个worker在运行 if ( - session_id not in self._session_workers - or self._session_workers[session_id].done() + session_id not in self._session_workers + or self._session_workers[session_id].done() ): self._session_workers[session_id] = asyncio.create_task( self._session_worker(session_id) @@ -566,8 +570,8 @@ class AgentManager: self._session_workers.pop(session_id, None) # noqa # 如果队列为空,清理队列 if ( - session_id in self._session_queues - and self._session_queues[session_id].empty() + session_id in self._session_queues + and self._session_queues[session_id].empty() ): self._session_queues.pop(session_id, None) @@ -632,7 +636,7 @@ class AgentManager: try: # 每次使用唯一的 session_id,避免共享上下文 session_id = f"__agent_heartbeat_{uuid.uuid4().hex[:12]}__" - user_id = settings.SUPERUSER + user_id = "system" logger.info("智能体心跳唤醒:开始检查待处理任务...") diff --git a/app/chain/message.py b/app/chain/message.py index 2900972f..7c755621 100644 --- a/app/chain/message.py +++ b/app/chain/message.py @@ -580,7 +580,7 @@ class MessageChain(ChainBase): total = len(cache_list) # 加一页 cache_list = cache_list[ - (_current_page + 1) * self._page_size : (_current_page + 2) + (_current_page + 1) * self._page_size: (_current_page + 2) * self._page_size ] if not cache_list: