refactor(mcp): 精简会话管理逻辑并更新API文档

This commit is contained in:
PKC278
2025-12-23 17:06:17 +08:00
parent 7b99f09810
commit 2de83c44ab
2 changed files with 50 additions and 155 deletions

View File

@@ -2,7 +2,6 @@
通过HTTP API暴露MoviePilot的智能体工具功能
"""
import uuid
from typing import List, Any, Dict, Annotated, Optional, Union
from fastapi import APIRouter, Depends, HTTPException, Request, Header
@@ -25,64 +24,18 @@ router = APIRouter()
MCP_PROTOCOL_VERSIONS = ["2025-11-25", "2025-06-18", "2024-11-05"]
MCP_PROTOCOL_VERSION = MCP_PROTOCOL_VERSIONS[0] # 默认使用最新版本
# 全局会话管理器
_sessions: Dict[str, Dict[str, Any]] = {}
# 全局工具管理器实例单例模式按用户ID缓存
_tools_managers: Dict[str, MoviePilotToolsManager] = {}
def get_tools_manager(user_id: str = "mcp_user", session_id: str = "mcp_session") -> MoviePilotToolsManager:
def get_tools_manager() -> MoviePilotToolsManager:
"""
获取工具管理器实例按用户ID缓存
获取工具管理器实例
Args:
user_id: 用户ID
session_id: 会话ID
Returns:
MoviePilotToolsManager实例
"""
global _tools_managers
# 使用用户ID作为缓存键
cache_key = f"{user_id}_{session_id}"
if cache_key not in _tools_managers:
_tools_managers[cache_key] = MoviePilotToolsManager(
user_id=user_id,
session_id=session_id
)
return _tools_managers[cache_key]
def get_session(session_id: Optional[str]) -> Optional[Dict[str, Any]]:
"""获取会话"""
if not session_id:
return None
return _sessions.get(session_id)
def create_session(user_id: str) -> Dict[str, Any]:
"""创建新会话"""
session_id = str(uuid.uuid4())
session = {
"id": session_id,
"user_id": user_id,
"initialized": False,
"protocol_version": None,
"capabilities": {}
}
_sessions[session_id] = session
return session
def delete_session(session_id: str):
"""删除会话"""
if session_id in _sessions:
del _sessions[session_id]
# 同时清理工具管理器缓存
cache_key = f"{_sessions.get(session_id, {}).get('user_id', 'mcp_user')}_{session_id}"
if cache_key in _tools_managers:
del _tools_managers[cache_key]
return MoviePilotToolsManager()
def create_jsonrpc_response(request_id: Union[str, int, None], result: Any) -> Dict[str, Any]:
@@ -112,13 +65,11 @@ def create_jsonrpc_error(request_id: Union[str, int, None], code: int, message:
# ==================== MCP JSON-RPC 端点 ====================
@router.post("", summary="MCP JSON-RPC 端点")
@router.post("", summary="MCP JSON-RPC 端点", response_model=None)
async def mcp_jsonrpc(
request: Request,
mcp_session_id: Optional[str] = Header(None, alias="MCP-Session-Id"),
mcp_protocol_version: Optional[str] = Header(None, alias="MCP-Protocol-Version"),
_: Annotated[str, Depends(verify_apikey)] = None
) -> JSONResponse:
) -> Union[JSONResponse, Response]:
"""
MCP 标准 JSON-RPC 2.0 端点
@@ -150,48 +101,27 @@ async def mcp_jsonrpc(
try:
# 处理初始化请求
if method == "initialize":
result = await handle_initialize(params, mcp_session_id)
response = create_jsonrpc_response(request_id, result["result"])
# 如果创建了新会话,在响应头中返回
if "session_id" in result:
headers = {"MCP-Session-Id": result["session_id"]}
return JSONResponse(content=response, headers=headers)
return JSONResponse(content=response)
result = await handle_initialize(params)
return JSONResponse(content=create_jsonrpc_response(request_id, result))
# 处理已初始化通知
elif method == "notifications/initialized":
if is_notification:
session = get_session(mcp_session_id)
if session:
session["initialized"] = True
# 通知不需要响应
return Response(status_code=202)
return Response(status_code=204)
else:
return JSONResponse(
content=create_jsonrpc_error(request_id, -32600, "initialized must be a notification")
)
# 验证会话(除了 initialize 和 ping
if method not in ["initialize", "ping"]:
session = get_session(mcp_session_id)
if not session:
return JSONResponse(
status_code=404,
content=create_jsonrpc_error(request_id, -32002, "Session not found")
)
if not session.get("initialized") and method != "notifications/initialized":
return JSONResponse(
content=create_jsonrpc_error(request_id, -32003, "Not initialized")
status_code=400,
content={"error": "initialized must be a notification"}
)
# 处理工具列表请求
if method == "tools/list":
result = await handle_tools_list(mcp_session_id)
result = await handle_tools_list()
return JSONResponse(content=create_jsonrpc_response(request_id, result))
# 处理工具调用请求
elif method == "tools/call":
result = await handle_tools_call(params, mcp_session_id)
result = await handle_tools_call(params)
return JSONResponse(content=create_jsonrpc_response(request_id, result))
# 处理 ping 请求
@@ -204,6 +134,12 @@ async def mcp_jsonrpc(
content=create_jsonrpc_error(request_id, -32601, f"Method not found: {method}")
)
except ValueError as e:
logger.warning(f"MCP 请求参数错误: {e}")
return JSONResponse(
status_code=400,
content=create_jsonrpc_error(request_id, -32602, "Invalid params", str(e))
)
except Exception as e:
logger.error(f"处理 MCP 请求失败: {e}", exc_info=True)
return JSONResponse(
@@ -212,24 +148,13 @@ async def mcp_jsonrpc(
)
async def handle_initialize(params: Dict[str, Any], session_id: Optional[str]) -> Dict[str, Any]:
async def handle_initialize(params: Dict[str, Any]) -> Dict[str, Any]:
"""处理初始化请求"""
protocol_version = params.get("protocolVersion")
client_info = params.get("clientInfo", {})
client_capabilities = params.get("capabilities", {})
logger.info(f"MCP 初始化请求: 客户端={client_info.get('name')}, 协议版本={protocol_version}")
# 如果没有提供会话ID创建新会话
new_session = None
if not session_id:
new_session = create_session(user_id="mcp_user")
session_id = new_session["id"]
session = get_session(session_id) or new_session
if not session:
raise ValueError("Failed to create session")
# 版本协商:选择客户端和服务器都支持的版本
negotiated_version = MCP_PROTOCOL_VERSION
if protocol_version in MCP_PROTOCOL_VERSIONS:
@@ -240,40 +165,26 @@ async def handle_initialize(params: Dict[str, Any], session_id: Optional[str]) -
# 客户端版本不支持,使用服务器默认版本
logger.warning(f"协议版本不匹配: 客户端={protocol_version}, 使用服务器版本={negotiated_version}")
session["protocol_version"] = negotiated_version
session["capabilities"] = client_capabilities
result = {
"result": {
"protocolVersion": negotiated_version,
"capabilities": {
"tools": {
"listChanged": False # 暂不支持工具列表变更通知
},
"logging": {}
return {
"protocolVersion": negotiated_version,
"capabilities": {
"tools": {
"listChanged": False # 暂不支持工具列表变更通知
},
"serverInfo": {
"name": "MoviePilot",
"version": APP_VERSION,
"description": "MoviePilot MCP Server - 电影自动化管理工具",
},
"instructions": "MoviePilot MCP 服务器,提供媒体管理、订阅、下载等工具。使用 tools/list 查看所有可用工具。"
}
"logging": {}
},
"serverInfo": {
"name": "MoviePilot",
"version": APP_VERSION,
"description": "MoviePilot MCP Server - 电影自动化管理工具",
},
"instructions": "MoviePilot MCP 服务器,提供媒体管理、订阅、下载等工具。"
}
# 如果是新创建的会话返回会话ID
if new_session:
result["session_id"] = session_id
return result
async def handle_tools_list(session_id: Optional[str]) -> Dict[str, Any]:
async def handle_tools_list() -> Dict[str, Any]:
"""处理工具列表请求"""
session = get_session(session_id)
user_id = session.get("user_id", "mcp_user") if session else "mcp_user"
manager = get_tools_manager(user_id=user_id, session_id=session_id or "default")
manager = get_tools_manager()
tools = manager.list_tools()
# 转换为 MCP 工具格式
@@ -291,7 +202,7 @@ async def handle_tools_list(session_id: Optional[str]) -> Dict[str, Any]:
}
async def handle_tools_call(params: Dict[str, Any], session_id: Optional[str]) -> Dict[str, Any]:
async def handle_tools_call(params: Dict[str, Any]) -> Dict[str, Any]:
"""处理工具调用请求"""
tool_name = params.get("name")
arguments = params.get("arguments", {})
@@ -299,10 +210,7 @@ async def handle_tools_call(params: Dict[str, Any], session_id: Optional[str]) -
if not tool_name:
raise ValueError("Missing tool name")
session = get_session(session_id)
user_id = session.get("user_id", "mcp_user") if session else "mcp_user"
manager = get_tools_manager(user_id=user_id, session_id=session_id or "default")
manager = get_tools_manager()
try:
result_text = await manager.call_tool(tool_name, arguments)
@@ -328,26 +236,18 @@ async def handle_tools_call(params: Dict[str, Any], session_id: Optional[str]) -
}
@router.delete("", summary="终止 MCP 会话")
@router.delete("", summary="终止 MCP 会话", response_model=None)
async def delete_mcp_session(
mcp_session_id: Optional[str] = Header(None, alias="MCP-Session-Id"),
_: Annotated[str, Depends(verify_apikey)] = None
) -> JSONResponse:
) -> Union[JSONResponse, Response]:
"""
终止 MCP 会话(可选实现
客户端可以主动调用此接口终止会话
终止 MCP 会话(无状态模式下仅返回成功
"""
if not mcp_session_id:
return JSONResponse(
status_code=400,
content={"detail": "Missing MCP-Session-Id header"}
)
delete_session(mcp_session_id)
return Response(status_code=204)
# ==================== 兼容的 RESTful API 端点 ====================
@router.get("/tools", summary="列出所有可用工具", response_model=List[Dict[str, Any]])

View File

@@ -16,11 +16,12 @@ MoviePilot 实现了标准的 **Model Context Protocol (MCP)**,允许 AI 智
### 端点
**POST** `/api/v1/mcp`
## 3. 会话管理
* **会话维持**: 在标准 MCP 流程中,通过 HTTP Header `MCP-Session-Id` 识别会话
* **主动终止**:
**DELETE** `/api/v1/mcp` (携带 `MCP-Session-Id` Header)
### 支持的方法
- `initialize`: 初始化会话,协商协议版本和能力。
- `notifications/initialized`: 客户端确认初始化完成
- `tools/list`: 获取可用工具列表。
- `tools/call`: 调用特定工具。
- `ping`: 连接存活检测。
---
@@ -65,6 +66,7 @@ MoviePilot 实现了标准的 **Model Context Protocol (MCP)**,允许 AI 智
| -32700 | Parse error | JSON 格式错误 |
| -32600 | Invalid Request | 无效的 JSON-RPC 请求 |
| -32601 | Method not found | 方法不存在 |
| -32602 | Invalid params | 参数验证失败 |
| -32002 | Session not found | 会话不存在或已过期 |
| -32003 | Not initialized | 会话未完成初始化流程 |
| -32603 | Internal error | 服务器内部错误 |
@@ -203,10 +205,3 @@ MoviePilot 实现了标准的 **Model Context Protocol (MCP)**,允许 AI 智
"required": ["title", "year", "media_type"]
}
```
## 7. 注意事项
1. **用户上下文**: API调用会使用当前认证用户的ID作为工具执行的用户上下文
2. **会话隔离**: 每个API请求使用独立的会话ID
3. **参数验证**: 工具参数会根据JSON Schema进行验证
4. **错误日志**: 所有工具调用错误都会记录到MoviePilot日志系统