feat: 为工具管理器添加参数类型规范化处理,并基于渠道能力动态生成提示词中的格式要求

This commit is contained in:
jxxghp
2026-01-15 20:55:35 +08:00
parent 1bf9862e47
commit 1641d432dd
10 changed files with 324 additions and 282 deletions

View File

@@ -12,7 +12,9 @@ from app.schemas.agent import ConversationMemory
class ConversationMemoryManager:
"""对话记忆管理器"""
"""
对话记忆管理器
"""
def __init__(self):
# 内存中的会话记忆缓存
@@ -23,7 +25,9 @@ class ConversationMemoryManager:
self.cleanup_task: Optional[asyncio.Task] = None
async def initialize(self):
"""初始化记忆管理器"""
"""
初始化记忆管理器
"""
try:
# 启动内存缓存清理任务Redis通过TTL自动过期
self.cleanup_task = asyncio.create_task(self._cleanup_expired_memories())
@@ -33,7 +37,9 @@ class ConversationMemoryManager:
logger.warning(f"Redis连接失败将使用内存存储: {e}")
async def close(self):
"""关闭记忆管理器"""
"""
关闭记忆管理器
"""
if self.cleanup_task:
self.cleanup_task.cancel()
try:
@@ -46,56 +52,83 @@ class ConversationMemoryManager:
logger.info("对话记忆管理器已关闭")
@staticmethod
def get_memory_key(session_id: str, user_id: str):
"""计算内存Key"""
def _get_memory_key(session_id: str, user_id: str):
"""
计算内存Key
"""
return f"{user_id}:{session_id}" if user_id else session_id
@staticmethod
def get_redis_key(session_id: str, user_id: str):
"""计算Redis Key"""
def _get_redis_key(session_id: str, user_id: str):
"""
计算Redis Key
"""
return f"agent_memory:{user_id}:{session_id}" if user_id else f"agent_memory:{session_id}"
async def get_memory(self, session_id: str, user_id: str) -> ConversationMemory:
"""获取会话记忆"""
# 首先检查缓存
cache_key = self.get_memory_key(session_id, user_id)
if cache_key in self.memory_cache:
return self.memory_cache[cache_key]
# 尝试从Redis加载
def _get_memory(self, session_id: str, user_id: str):
"""
获取内存中的记忆
"""
cache_key = self._get_memory_key(session_id, user_id)
return self.memory_cache.get(cache_key)
async def _get_redis(self, session_id: str, user_id: str) -> Optional[ConversationMemory]:
"""
从Redis获取记忆
"""
if settings.CACHE_BACKEND_TYPE == "redis":
try:
redis_key = self.get_redis_key(session_id, user_id)
redis_key = self._get_redis_key(session_id, user_id)
memory_data = await self.redis_helper.get(redis_key, region="AI_AGENT")
if memory_data:
memory_dict = json.loads(memory_data) if isinstance(memory_data, str) else memory_data
memory = ConversationMemory(**memory_dict)
self.memory_cache[cache_key] = memory
return memory
except Exception as e:
logger.warning(f"从Redis加载记忆失败: {e}")
return None
async def get_conversation(self, session_id: str, user_id: str) -> ConversationMemory:
"""
获取会话记忆
"""
# 首先检查缓存
conversion = self._get_memory(session_id, user_id)
if conversion:
return conversion
# 尝试从Redis加载
memory = await self._get_redis(session_id, user_id)
if memory:
# 加载到内存缓存
self._save_memory(memory)
return memory
# 创建新的记忆
memory = ConversationMemory(session_id=session_id, user_id=user_id)
self.memory_cache[cache_key] = memory
await self._save_memory(memory)
await self._save_conversation(memory)
return memory
async def set_title(self, session_id: str, user_id: str, title: str):
"""设置会话标题"""
memory = await self.get_memory(session_id=session_id, user_id=user_id)
"""
设置会话标题
"""
memory = await self.get_conversation(session_id=session_id, user_id=user_id)
memory.title = title
memory.updated_at = datetime.now()
await self._save_memory(memory)
await self._save_conversation(memory)
async def get_title(self, session_id: str, user_id: str) -> Optional[str]:
"""获取会话标题"""
memory = await self.get_memory(session_id=session_id, user_id=user_id)
"""
获取会话标题
"""
memory = await self.get_conversation(session_id=session_id, user_id=user_id)
return memory.title
async def list_sessions(self, user_id: str, limit: int = 100) -> List[Dict[str, Any]]:
"""列出历史会话摘要(按更新时间倒序)
"""
列出历史会话摘要(按更新时间倒序)
- 当启用Redis时遍历 `agent_memory:*` 键并读取摘要
- 当未启用Redis时基于内存缓存返回
@@ -148,7 +181,7 @@ class ConversationMemoryManager:
for m in sorted_list
]
async def add_memory(
async def add_conversation(
self,
session_id: str,
user_id: str,
@@ -156,8 +189,10 @@ class ConversationMemoryManager:
content: str,
metadata: Optional[Dict[str, Any]] = None
):
"""添加消息到记忆"""
memory = await self.get_memory(session_id=session_id, user_id=user_id)
"""
添加消息到记忆
"""
memory = await self.get_conversation(session_id=session_id, user_id=user_id)
message = {
"role": role,
@@ -177,7 +212,7 @@ class ConversationMemoryManager:
recent_messages = memory.messages[-(max_messages - len(system_messages)):]
memory.messages = system_messages + recent_messages
await self._save_memory(memory)
await self._save_conversation(memory)
logger.debug(f"消息已添加到记忆: session_id={session_id}, user_id={user_id}, role={role}")
@@ -186,11 +221,12 @@ class ConversationMemoryManager:
session_id: str,
user_id: str
) -> List[Dict[str, Any]]:
"""为Agent获取最近的消息仅内存缓存
"""
为Agent获取最近的消息仅内存缓存
如果消息Token数量超过模型最大上下文长度的阀值会自动进行摘要裁剪
"""
cache_key = self.get_memory_key(session_id, user_id)
cache_key = self._get_memory_key(session_id, user_id)
memory = self.memory_cache.get(cache_key)
if not memory:
return []
@@ -205,8 +241,10 @@ class ConversationMemoryManager:
limit: int = 10,
role_filter: Optional[list] = None
) -> List[Dict[str, Any]]:
"""获取最近的消息"""
memory = await self.get_memory(session_id=session_id, user_id=user_id)
"""
获取最近的消息
"""
memory = await self.get_conversation(session_id=session_id, user_id=user_id)
messages = memory.messages
if role_filter:
@@ -215,36 +253,41 @@ class ConversationMemoryManager:
return messages[-limit:] if messages else []
async def get_context(self, session_id: str, user_id: str) -> Dict[str, Any]:
"""获取会话上下文"""
memory = await self.get_memory(session_id=session_id, user_id=user_id)
"""
获取会话上下文
"""
memory = await self.get_conversation(session_id=session_id, user_id=user_id)
return memory.context
async def clear_memory(self, session_id: str, user_id: str):
"""清空会话记忆"""
"""
清空会话记忆
"""
cache_key = f"{user_id}:{session_id}" if user_id else session_id
if cache_key in self.memory_cache:
del self.memory_cache[cache_key]
if settings.CACHE_BACKEND_TYPE == "redis":
redis_key = self.get_redis_key(session_id, user_id)
redis_key = self._get_redis_key(session_id, user_id)
await self.redis_helper.delete(redis_key, region="AI_AGENT")
logger.info(f"会话记忆已清空: session_id={session_id}, user_id={user_id}")
async def _save_memory(self, memory: ConversationMemory):
"""保存记忆到存储
Redis中的记忆会自动通过TTL机制过期无需手动清理
def _save_memory(self, memory: ConversationMemory):
"""
# 更新内存缓存
cache_key = self.get_memory_key(memory.session_id, memory.user_id)
保存记忆到内存
"""
cache_key = self._get_memory_key(memory.session_id, memory.user_id)
self.memory_cache[cache_key] = memory
# 保存到Redis设置TTL自动过期
async def _save_redis(self, memory: ConversationMemory):
"""
保存记忆到Redis
"""
if settings.CACHE_BACKEND_TYPE == "redis":
try:
memory_dict = memory.model_dump()
redis_key = self.get_redis_key(memory.session_id, memory.user_id)
redis_key = self._get_redis_key(memory.session_id, memory.user_id)
ttl = int(timedelta(days=settings.LLM_REDIS_MEMORY_RETENTION_DAYS).total_seconds())
await self.redis_helper.set(
redis_key,
@@ -255,8 +298,22 @@ class ConversationMemoryManager:
except Exception as e:
logger.warning(f"保存记忆到Redis失败: {e}")
async def _save_conversation(self, memory: ConversationMemory):
"""
保存记忆到存储
Redis中的记忆会自动通过TTL机制过期无需手动清理
"""
# 更新内存缓存
self._save_memory(memory)
# 保存到Redis设置TTL自动过期
await self._save_redis(memory)
async def _cleanup_expired_memories(self):
"""清理内存中过期记忆的后台任务
"""
清理内存中过期记忆的后台任务
注意Redis中的记忆通过TTL机制自动过期这里只清理内存缓存
"""
@@ -286,3 +343,5 @@ class ConversationMemoryManager:
break
except Exception as e:
logger.error(f"清理记忆时发生错误: {e}")
conversation_manager = ConversationMemoryManager()