feat: 同一会话消息排队顺序处理,不同会话互不影响

This commit is contained in:
jxxghp
2026-03-25 22:01:35 +08:00
parent c822782910
commit a6f16dcf8f

View File

@@ -2,7 +2,8 @@ import asyncio
import traceback
import uuid
from time import strftime
from typing import Callable, Dict, List
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional
from langchain.agents import create_agent
from langchain.agents.middleware import (
@@ -321,13 +322,32 @@ class MoviePilotAgent:
logger.info(f"MoviePilot智能体已清理: session_id={self.session_id}")
@dataclass
class _MessageTask:
"""
待处理的消息任务
"""
session_id: str
user_id: str
message: str
channel: Optional[str] = None
source: Optional[str] = None
username: Optional[str] = None
class AgentManager:
"""
AI智能体管理器
同一会话的消息按顺序排队处理,不同会话之间互不影响。
"""
def __init__(self):
self.active_agents: Dict[str, MoviePilotAgent] = {}
# 每个会话的消息队列
self._session_queues: Dict[str, asyncio.Queue] = {}
# 每个会话的worker任务
self._session_workers: Dict[str, asyncio.Task] = {}
@staticmethod
async def initialize():
@@ -341,6 +361,17 @@ class AgentManager:
关闭管理器
"""
await memory_manager.close()
# 取消所有会话worker
for task in self._session_workers.values():
task.cancel()
# 等待所有worker结束
for session_id, task in self._session_workers.items():
try:
await task
except asyncio.CancelledError:
pass
self._session_workers.clear()
self._session_queues.clear()
for agent in self.active_agents.values():
await agent.cleanup()
self.active_agents.clear()
@@ -355,36 +386,133 @@ class AgentManager:
username: str = None,
) -> str:
"""
处理用户消息
处理用户消息:将消息放入会话队列,按顺序依次处理。
同一会话的消息排队等待,不同会话之间互不影响。
"""
task = _MessageTask(
session_id=session_id,
user_id=user_id,
message=message,
channel=channel,
source=source,
username=username,
)
# 获取或创建会话队列
if session_id not in self._session_queues:
self._session_queues[session_id] = asyncio.Queue()
queue = self._session_queues[session_id]
queue_size = queue.qsize()
# 如果队列中已有等待的消息,通知用户消息已排队
if queue_size > 0 or (
session_id in self._session_workers
and not self._session_workers[session_id].done()
):
logger.info(
f"会话 {session_id} 有任务正在处理,消息已排队等待 "
f"(队列中待处理: {queue_size} 条)"
)
# 放入队列
await queue.put(task)
# 确保该会话有一个worker在运行
if (
session_id not in self._session_workers
or self._session_workers[session_id].done()
):
self._session_workers[session_id] = asyncio.create_task(
self._session_worker(session_id)
)
return ""
async def _session_worker(self, session_id: str):
"""
会话消息处理worker从队列中逐条取出消息并处理。
处理完当前消息后才会处理下一条,确保同一会话的消息顺序执行。
"""
queue = self._session_queues.get(session_id)
if not queue:
return
try:
while True:
try:
# 等待消息超时后自动退出worker
task = await asyncio.wait_for(queue.get(), timeout=60.0)
except asyncio.TimeoutError:
# 队列空闲超时退出worker
logger.debug(f"会话 {session_id} 的消息队列空闲worker退出")
break
try:
await self._process_message_internal(task)
except Exception as e:
logger.error(f"处理会话 {session_id} 的消息失败: {e}")
finally:
queue.task_done()
except asyncio.CancelledError:
logger.info(f"会话 {session_id} 的worker被取消")
finally:
# 清理已完成的worker记录
self._session_workers.pop(session_id, None)
# 如果队列为空,清理队列
if (
session_id in self._session_queues
and self._session_queues[session_id].empty()
):
self._session_queues.pop(session_id, None)
async def _process_message_internal(self, task: _MessageTask):
"""
实际处理单条消息
"""
session_id = task.session_id
if session_id not in self.active_agents:
logger.info(
f"创建新的AI智能体实例session_id: {session_id}, user_id: {user_id}"
f"创建新的AI智能体实例session_id: {session_id}, user_id: {task.user_id}"
)
agent = MoviePilotAgent(
session_id=session_id,
user_id=user_id,
channel=channel,
source=source,
username=username,
user_id=task.user_id,
channel=task.channel,
source=task.source,
username=task.username,
)
self.active_agents[session_id] = agent
else:
agent = self.active_agents[session_id]
agent.user_id = user_id
if channel:
agent.channel = channel
if source:
agent.source = source
if username:
agent.username = username
agent.user_id = task.user_id
if task.channel:
agent.channel = task.channel
if task.source:
agent.source = task.source
if task.username:
agent.username = task.username
return await agent.process(message)
return await agent.process(task.message)
async def clear_session(self, session_id: str, user_id: str):
"""
清空会话
"""
# 取消该会话的worker
if session_id in self._session_workers:
self._session_workers[session_id].cancel()
try:
await self._session_workers[session_id]
except asyncio.CancelledError:
pass
self._session_workers.pop(session_id, None)
# 清理队列
self._session_queues.pop(session_id, None)
# 清理agent
if session_id in self.active_agents:
agent = self.active_agents[session_id]
await agent.cleanup()
@@ -429,6 +557,17 @@ class AgentManager:
username=settings.SUPERUSER,
)
# 等待消息队列处理完成
if session_id in self._session_queues:
await self._session_queues[session_id].join()
# 等待worker结束
if session_id in self._session_workers:
try:
await self._session_workers[session_id]
except asyncio.CancelledError:
pass
logger.info("智能体心跳唤醒:任务检查完成")
# 心跳会话用完即弃,清理资源