diff --git a/app/api/endpoints/mcp.py b/app/api/endpoints/mcp.py index be238fc2..889b9a3f 100644 --- a/app/api/endpoints/mcp.py +++ b/app/api/endpoints/mcp.py @@ -2,7 +2,6 @@ 通过HTTP API暴露MoviePilot的智能体工具功能 """ -import uuid from typing import List, Any, Dict, Annotated, Optional, Union from fastapi import APIRouter, Depends, HTTPException, Request, Header @@ -25,64 +24,18 @@ router = APIRouter() MCP_PROTOCOL_VERSIONS = ["2025-11-25", "2025-06-18", "2024-11-05"] MCP_PROTOCOL_VERSION = MCP_PROTOCOL_VERSIONS[0] # 默认使用最新版本 -# 全局会话管理器 -_sessions: Dict[str, Dict[str, Any]] = {} -# 全局工具管理器实例(单例模式,按用户ID缓存) -_tools_managers: Dict[str, MoviePilotToolsManager] = {} - - -def get_tools_manager(user_id: str = "mcp_user", session_id: str = "mcp_session") -> MoviePilotToolsManager: +def get_tools_manager() -> MoviePilotToolsManager: """ - 获取工具管理器实例(按用户ID缓存) - + 获取工具管理器实例 + Args: user_id: 用户ID - session_id: 会话ID - + Returns: MoviePilotToolsManager实例 """ - global _tools_managers - # 使用用户ID作为缓存键 - cache_key = f"{user_id}_{session_id}" - if cache_key not in _tools_managers: - _tools_managers[cache_key] = MoviePilotToolsManager( - user_id=user_id, - session_id=session_id - ) - return _tools_managers[cache_key] - - -def get_session(session_id: Optional[str]) -> Optional[Dict[str, Any]]: - """获取会话""" - if not session_id: - return None - return _sessions.get(session_id) - - -def create_session(user_id: str) -> Dict[str, Any]: - """创建新会话""" - session_id = str(uuid.uuid4()) - session = { - "id": session_id, - "user_id": user_id, - "initialized": False, - "protocol_version": None, - "capabilities": {} - } - _sessions[session_id] = session - return session - - -def delete_session(session_id: str): - """删除会话""" - if session_id in _sessions: - del _sessions[session_id] - # 同时清理工具管理器缓存 - cache_key = f"{_sessions.get(session_id, {}).get('user_id', 'mcp_user')}_{session_id}" - if cache_key in _tools_managers: - del _tools_managers[cache_key] + return MoviePilotToolsManager() def create_jsonrpc_response(request_id: Union[str, int, None], result: Any) -> Dict[str, Any]: @@ -112,13 +65,11 @@ def create_jsonrpc_error(request_id: Union[str, int, None], code: int, message: # ==================== MCP JSON-RPC 端点 ==================== -@router.post("", summary="MCP JSON-RPC 端点") +@router.post("", summary="MCP JSON-RPC 端点", response_model=None) async def mcp_jsonrpc( request: Request, - mcp_session_id: Optional[str] = Header(None, alias="MCP-Session-Id"), - mcp_protocol_version: Optional[str] = Header(None, alias="MCP-Protocol-Version"), _: Annotated[str, Depends(verify_apikey)] = None -) -> JSONResponse: +) -> Union[JSONResponse, Response]: """ MCP 标准 JSON-RPC 2.0 端点 @@ -150,48 +101,27 @@ async def mcp_jsonrpc( try: # 处理初始化请求 if method == "initialize": - result = await handle_initialize(params, mcp_session_id) - response = create_jsonrpc_response(request_id, result["result"]) - # 如果创建了新会话,在响应头中返回 - if "session_id" in result: - headers = {"MCP-Session-Id": result["session_id"]} - return JSONResponse(content=response, headers=headers) - return JSONResponse(content=response) + result = await handle_initialize(params) + return JSONResponse(content=create_jsonrpc_response(request_id, result)) # 处理已初始化通知 elif method == "notifications/initialized": if is_notification: - session = get_session(mcp_session_id) - if session: - session["initialized"] = True - # 通知不需要响应 - return Response(status_code=202) + return Response(status_code=204) else: return JSONResponse( - content=create_jsonrpc_error(request_id, -32600, "initialized must be a notification") - ) - - # 验证会话(除了 initialize 和 ping) - if method not in ["initialize", "ping"]: - session = get_session(mcp_session_id) - if not session: - return JSONResponse( - status_code=404, - content=create_jsonrpc_error(request_id, -32002, "Session not found") - ) - if not session.get("initialized") and method != "notifications/initialized": - return JSONResponse( - content=create_jsonrpc_error(request_id, -32003, "Not initialized") + status_code=400, + content={"error": "initialized must be a notification"} ) # 处理工具列表请求 if method == "tools/list": - result = await handle_tools_list(mcp_session_id) + result = await handle_tools_list() return JSONResponse(content=create_jsonrpc_response(request_id, result)) # 处理工具调用请求 elif method == "tools/call": - result = await handle_tools_call(params, mcp_session_id) + result = await handle_tools_call(params) return JSONResponse(content=create_jsonrpc_response(request_id, result)) # 处理 ping 请求 @@ -204,6 +134,12 @@ async def mcp_jsonrpc( content=create_jsonrpc_error(request_id, -32601, f"Method not found: {method}") ) + except ValueError as e: + logger.warning(f"MCP 请求参数错误: {e}") + return JSONResponse( + status_code=400, + content=create_jsonrpc_error(request_id, -32602, "Invalid params", str(e)) + ) except Exception as e: logger.error(f"处理 MCP 请求失败: {e}", exc_info=True) return JSONResponse( @@ -212,24 +148,13 @@ async def mcp_jsonrpc( ) -async def handle_initialize(params: Dict[str, Any], session_id: Optional[str]) -> Dict[str, Any]: +async def handle_initialize(params: Dict[str, Any]) -> Dict[str, Any]: """处理初始化请求""" protocol_version = params.get("protocolVersion") client_info = params.get("clientInfo", {}) - client_capabilities = params.get("capabilities", {}) logger.info(f"MCP 初始化请求: 客户端={client_info.get('name')}, 协议版本={protocol_version}") - # 如果没有提供会话ID,创建新会话 - new_session = None - if not session_id: - new_session = create_session(user_id="mcp_user") - session_id = new_session["id"] - - session = get_session(session_id) or new_session - if not session: - raise ValueError("Failed to create session") - # 版本协商:选择客户端和服务器都支持的版本 negotiated_version = MCP_PROTOCOL_VERSION if protocol_version in MCP_PROTOCOL_VERSIONS: @@ -240,40 +165,26 @@ async def handle_initialize(params: Dict[str, Any], session_id: Optional[str]) - # 客户端版本不支持,使用服务器默认版本 logger.warning(f"协议版本不匹配: 客户端={protocol_version}, 使用服务器版本={negotiated_version}") - session["protocol_version"] = negotiated_version - session["capabilities"] = client_capabilities - - result = { - "result": { - "protocolVersion": negotiated_version, - "capabilities": { - "tools": { - "listChanged": False # 暂不支持工具列表变更通知 - }, - "logging": {} + return { + "protocolVersion": negotiated_version, + "capabilities": { + "tools": { + "listChanged": False # 暂不支持工具列表变更通知 }, - "serverInfo": { - "name": "MoviePilot", - "version": APP_VERSION, - "description": "MoviePilot MCP Server - 电影自动化管理工具", - }, - "instructions": "MoviePilot MCP 服务器,提供媒体管理、订阅、下载等工具。使用 tools/list 查看所有可用工具。" - } + "logging": {} + }, + "serverInfo": { + "name": "MoviePilot", + "version": APP_VERSION, + "description": "MoviePilot MCP Server - 电影自动化管理工具", + }, + "instructions": "MoviePilot MCP 服务器,提供媒体管理、订阅、下载等工具。" } - # 如果是新创建的会话,返回会话ID - if new_session: - result["session_id"] = session_id - return result - - -async def handle_tools_list(session_id: Optional[str]) -> Dict[str, Any]: +async def handle_tools_list() -> Dict[str, Any]: """处理工具列表请求""" - session = get_session(session_id) - user_id = session.get("user_id", "mcp_user") if session else "mcp_user" - - manager = get_tools_manager(user_id=user_id, session_id=session_id or "default") + manager = get_tools_manager() tools = manager.list_tools() # 转换为 MCP 工具格式 @@ -291,7 +202,7 @@ async def handle_tools_list(session_id: Optional[str]) -> Dict[str, Any]: } -async def handle_tools_call(params: Dict[str, Any], session_id: Optional[str]) -> Dict[str, Any]: +async def handle_tools_call(params: Dict[str, Any]) -> Dict[str, Any]: """处理工具调用请求""" tool_name = params.get("name") arguments = params.get("arguments", {}) @@ -299,10 +210,7 @@ async def handle_tools_call(params: Dict[str, Any], session_id: Optional[str]) - if not tool_name: raise ValueError("Missing tool name") - session = get_session(session_id) - user_id = session.get("user_id", "mcp_user") if session else "mcp_user" - - manager = get_tools_manager(user_id=user_id, session_id=session_id or "default") + manager = get_tools_manager() try: result_text = await manager.call_tool(tool_name, arguments) @@ -328,26 +236,18 @@ async def handle_tools_call(params: Dict[str, Any], session_id: Optional[str]) - } -@router.delete("", summary="终止 MCP 会话") +@router.delete("", summary="终止 MCP 会话", response_model=None) async def delete_mcp_session( - mcp_session_id: Optional[str] = Header(None, alias="MCP-Session-Id"), _: Annotated[str, Depends(verify_apikey)] = None -) -> JSONResponse: +) -> Union[JSONResponse, Response]: """ - 终止 MCP 会话(可选实现) - - 客户端可以主动调用此接口终止会话 + 终止 MCP 会话(无状态模式下仅返回成功) """ - if not mcp_session_id: - return JSONResponse( - status_code=400, - content={"detail": "Missing MCP-Session-Id header"} - ) - - delete_session(mcp_session_id) return Response(status_code=204) + + # ==================== 兼容的 RESTful API 端点 ==================== @router.get("/tools", summary="列出所有可用工具", response_model=List[Dict[str, Any]]) diff --git a/docs/mcp-api.md b/docs/mcp-api.md index 01d0cea6..b5f27ba6 100644 --- a/docs/mcp-api.md +++ b/docs/mcp-api.md @@ -16,11 +16,12 @@ MoviePilot 实现了标准的 **Model Context Protocol (MCP)**,允许 AI 智 ### 端点 **POST** `/api/v1/mcp` -## 3. 会话管理 - -* **会话维持**: 在标准 MCP 流程中,通过 HTTP Header `MCP-Session-Id` 识别会话。 -* **主动终止**: - **DELETE** `/api/v1/mcp` (携带 `MCP-Session-Id` Header) +### 支持的方法 +- `initialize`: 初始化会话,协商协议版本和能力。 +- `notifications/initialized`: 客户端确认初始化完成。 +- `tools/list`: 获取可用工具列表。 +- `tools/call`: 调用特定工具。 +- `ping`: 连接存活检测。 --- @@ -65,6 +66,7 @@ MoviePilot 实现了标准的 **Model Context Protocol (MCP)**,允许 AI 智 | -32700 | Parse error | JSON 格式错误 | | -32600 | Invalid Request | 无效的 JSON-RPC 请求 | | -32601 | Method not found | 方法不存在 | +| -32602 | Invalid params | 参数验证失败 | | -32002 | Session not found | 会话不存在或已过期 | | -32003 | Not initialized | 会话未完成初始化流程 | | -32603 | Internal error | 服务器内部错误 | @@ -203,10 +205,3 @@ MoviePilot 实现了标准的 **Model Context Protocol (MCP)**,允许 AI 智 "required": ["title", "year", "media_type"] } ``` - -## 7. 注意事项 - -1. **用户上下文**: API调用会使用当前认证用户的ID作为工具执行的用户上下文 -2. **会话隔离**: 每个API请求使用独立的会话ID -3. **参数验证**: 工具参数会根据JSON Schema进行验证 -4. **错误日志**: 所有工具调用错误都会记录到MoviePilot日志系统