diff --git a/app/agent/__init__.py b/app/agent/__init__.py index 6223d9de..03967a57 100644 --- a/app/agent/__init__.py +++ b/app/agent/__init__.py @@ -2,7 +2,8 @@ import asyncio import traceback import uuid from time import strftime -from typing import Callable, Dict, List +from dataclasses import dataclass +from typing import Callable, Dict, List, Optional from langchain.agents import create_agent from langchain.agents.middleware import ( @@ -321,13 +322,32 @@ class MoviePilotAgent: logger.info(f"MoviePilot智能体已清理: session_id={self.session_id}") +@dataclass +class _MessageTask: + """ + 待处理的消息任务 + """ + + session_id: str + user_id: str + message: str + channel: Optional[str] = None + source: Optional[str] = None + username: Optional[str] = None + + class AgentManager: """ AI智能体管理器 + 同一会话的消息按顺序排队处理,不同会话之间互不影响。 """ def __init__(self): self.active_agents: Dict[str, MoviePilotAgent] = {} + # 每个会话的消息队列 + self._session_queues: Dict[str, asyncio.Queue] = {} + # 每个会话的worker任务 + self._session_workers: Dict[str, asyncio.Task] = {} @staticmethod async def initialize(): @@ -341,6 +361,17 @@ class AgentManager: 关闭管理器 """ await memory_manager.close() + # 取消所有会话worker + for task in self._session_workers.values(): + task.cancel() + # 等待所有worker结束 + for session_id, task in self._session_workers.items(): + try: + await task + except asyncio.CancelledError: + pass + self._session_workers.clear() + self._session_queues.clear() for agent in self.active_agents.values(): await agent.cleanup() self.active_agents.clear() @@ -355,36 +386,133 @@ class AgentManager: username: str = None, ) -> str: """ - 处理用户消息 + 处理用户消息:将消息放入会话队列,按顺序依次处理。 + 同一会话的消息排队等待,不同会话之间互不影响。 """ + task = _MessageTask( + session_id=session_id, + user_id=user_id, + message=message, + channel=channel, + source=source, + username=username, + ) + + # 获取或创建会话队列 + if session_id not in self._session_queues: + self._session_queues[session_id] = asyncio.Queue() + + queue = self._session_queues[session_id] + queue_size = queue.qsize() + + # 如果队列中已有等待的消息,通知用户消息已排队 + if queue_size > 0 or ( + session_id in self._session_workers + and not self._session_workers[session_id].done() + ): + logger.info( + f"会话 {session_id} 有任务正在处理,消息已排队等待 " + f"(队列中待处理: {queue_size} 条)" + ) + + # 放入队列 + await queue.put(task) + + # 确保该会话有一个worker在运行 + if ( + session_id not in self._session_workers + or self._session_workers[session_id].done() + ): + self._session_workers[session_id] = asyncio.create_task( + self._session_worker(session_id) + ) + + return "" + + async def _session_worker(self, session_id: str): + """ + 会话消息处理worker:从队列中逐条取出消息并处理。 + 处理完当前消息后才会处理下一条,确保同一会话的消息顺序执行。 + """ + queue = self._session_queues.get(session_id) + if not queue: + return + + try: + while True: + try: + # 等待消息,超时后自动退出worker + task = await asyncio.wait_for(queue.get(), timeout=60.0) + except asyncio.TimeoutError: + # 队列空闲超时,退出worker + logger.debug(f"会话 {session_id} 的消息队列空闲,worker退出") + break + + try: + await self._process_message_internal(task) + except Exception as e: + logger.error(f"处理会话 {session_id} 的消息失败: {e}") + finally: + queue.task_done() + + except asyncio.CancelledError: + logger.info(f"会话 {session_id} 的worker被取消") + finally: + # 清理已完成的worker记录 + self._session_workers.pop(session_id, None) + # 如果队列为空,清理队列 + if ( + session_id in self._session_queues + and self._session_queues[session_id].empty() + ): + self._session_queues.pop(session_id, None) + + async def _process_message_internal(self, task: _MessageTask): + """ + 实际处理单条消息 + """ + session_id = task.session_id if session_id not in self.active_agents: logger.info( - f"创建新的AI智能体实例,session_id: {session_id}, user_id: {user_id}" + f"创建新的AI智能体实例,session_id: {session_id}, user_id: {task.user_id}" ) agent = MoviePilotAgent( session_id=session_id, - user_id=user_id, - channel=channel, - source=source, - username=username, + user_id=task.user_id, + channel=task.channel, + source=task.source, + username=task.username, ) self.active_agents[session_id] = agent else: agent = self.active_agents[session_id] - agent.user_id = user_id - if channel: - agent.channel = channel - if source: - agent.source = source - if username: - agent.username = username + agent.user_id = task.user_id + if task.channel: + agent.channel = task.channel + if task.source: + agent.source = task.source + if task.username: + agent.username = task.username - return await agent.process(message) + return await agent.process(task.message) async def clear_session(self, session_id: str, user_id: str): """ 清空会话 """ + # 取消该会话的worker + if session_id in self._session_workers: + self._session_workers[session_id].cancel() + try: + await self._session_workers[session_id] + except asyncio.CancelledError: + pass + self._session_workers.pop(session_id, None) + + # 清理队列 + self._session_queues.pop(session_id, None) + + # 清理agent if session_id in self.active_agents: agent = self.active_agents[session_id] await agent.cleanup() @@ -429,6 +557,17 @@ class AgentManager: username=settings.SUPERUSER, ) + # 等待消息队列处理完成 + if session_id in self._session_queues: + await self._session_queues[session_id].join() + + # 等待worker结束 + if session_id in self._session_workers: + try: + await self._session_workers[session_id] + except asyncio.CancelledError: + pass + logger.info("智能体心跳唤醒:任务检查完成") # 心跳会话用完即弃,清理资源