diff --git a/README.md b/README.md index d7f51e65..104aa5bd 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,8 @@ API文档:https://api.movie-pilot.org +MCP工具API文档:详见 [docs/mcp-api.md](docs/mcp-api.md) + 本地运行需要 `Python 3.12`、`Node JS v20.12.1` - 克隆主项目 [MoviePilot](https://github.com/jxxghp/MoviePilot) diff --git a/app/agent/tools/impl/query_sites.py b/app/agent/tools/impl/query_sites.py index 5992e6b0..028f75b1 100644 --- a/app/agent/tools/impl/query_sites.py +++ b/app/agent/tools/impl/query_sites.py @@ -21,7 +21,7 @@ class QuerySitesInput(BaseModel): class QuerySitesTool(MoviePilotTool): name: str = "query_sites" - description: str = "Query site status and list all configured sites. Shows site name, domain, status, priority, and basic configuration." + description: str = "Query site status and list all configured sites. Shows site name, domain, status, priority, and basic configuration. Site priority (pri): smaller values have higher priority (e.g., pri=1 has higher priority than pri=10)." args_schema: Type[BaseModel] = QuerySitesInput def get_tool_message(self, **kwargs) -> Optional[str]: diff --git a/app/agent/tools/impl/update_site.py b/app/agent/tools/impl/update_site.py index a3b18ead..59d5349b 100644 --- a/app/agent/tools/impl/update_site.py +++ b/app/agent/tools/impl/update_site.py @@ -20,7 +20,7 @@ class UpdateSiteInput(BaseModel): site_id: int = Field(..., description="The ID of the site to update") name: Optional[str] = Field(None, description="Site name (optional)") url: Optional[str] = Field(None, description="Site URL (optional, will be automatically formatted)") - pri: Optional[int] = Field(None, description="Site priority (optional, higher number = higher priority)") + pri: Optional[int] = Field(None, description="Site priority (optional, smaller value = higher priority, e.g., pri=1 has higher priority than pri=10)") rss: Optional[str] = Field(None, description="RSS feed URL (optional)") cookie: Optional[str] = Field(None, description="Site cookie (optional)") ua: Optional[str] = Field(None, description="User-Agent string (optional)") @@ -39,7 +39,7 @@ class UpdateSiteInput(BaseModel): class UpdateSiteTool(MoviePilotTool): name: str = "update_site" - description: str = "Update site configuration including URL, priority, authentication credentials (cookie, UA, API key), proxy settings, rate limits, and other site properties. Supports updating multiple site attributes at once." + description: str = "Update site configuration including URL, priority, authentication credentials (cookie, UA, API key), proxy settings, rate limits, and other site properties. Supports updating multiple site attributes at once. Site priority (pri): smaller values have higher priority (e.g., pri=1 has higher priority than pri=10)." args_schema: Type[BaseModel] = UpdateSiteInput def get_tool_message(self, **kwargs) -> Optional[str]: diff --git a/app/modules/filemanager/storages/u115.py b/app/modules/filemanager/storages/u115.py index 844b903b..a1bb4e27 100644 --- a/app/modules/filemanager/storages/u115.py +++ b/app/modules/filemanager/storages/u115.py @@ -1,15 +1,16 @@ import base64 -import hashlib import secrets -import threading import time from pathlib import Path -from typing import List, Optional, Tuple, Union +from threading import Lock +from typing import List, Optional, Tuple, Union, Dict +from hashlib import sha256 import oss2 -import requests +import httpx from oss2 import SizedFileAdapter, determine_part_size from oss2.models import PartInfo +from cryptography.hazmat.primitives import hashes from app import schemas from app.core.config import settings, global_vars @@ -19,8 +20,10 @@ from app.modules.filemanager.storages import transfer_process from app.schemas.types import StorageSchema from app.utils.singleton import WeakSingleton from app.utils.string import StringUtils +from app.utils.limit import QpsRateLimiter -lock = threading.Lock() + +lock = Lock() class NoCheckInException(Exception): @@ -36,10 +39,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): schema = StorageSchema.U115 # 支持的整理方式 - transtype = { - "move": "移动", - "copy": "复制" - } + transtype = {"move": "移动", "copy": "复制"} # 基础url base_url = "https://proapi.115.com" @@ -52,18 +52,28 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): def __init__(self): super().__init__() self._auth_state = {} - self.session = requests.Session() + self.session = httpx.Client(follow_redirects=True, timeout=20.0) self._init_session() + self.qps_limiter: Dict[str, QpsRateLimiter] = { + "/open/ufile/files": QpsRateLimiter(4), + "/open/folder/get_info": QpsRateLimiter(3), + "/open/ufile/move": QpsRateLimiter(2), + "/open/ufile/copy": QpsRateLimiter(2), + "/open/ufile/update": QpsRateLimiter(2), + "/open/ufile/delete": QpsRateLimiter(2), + } def _init_session(self): """ 初始化带速率限制的会话 """ - self.session.headers.update({ - "User-Agent": "W115Storage/2.0", - "Accept-Encoding": "gzip, deflate", - "Content-Type": "application/x-www-form-urlencoded" - }) + self.session.headers.update( + { + "User-Agent": "W115Storage/2.0", + "Accept-Encoding": "gzip, deflate", + "Content-Type": "application/x-www-form-urlencoded", + } + ) def _check_session(self): """ @@ -87,10 +97,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): if expires_in and refresh_time + expires_in < int(time.time()): tokens = self.__refresh_access_token(refresh_token) if tokens: - self.set_config({ - "refresh_time": int(time.time()), - **tokens - }) + self.set_config({"refresh_time": int(time.time()), **tokens}) else: return None access_token = tokens.get("access_token") @@ -105,7 +112,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): # 生成PKCE参数 code_verifier = secrets.token_urlsafe(96)[:128] code_challenge = base64.b64encode( - hashlib.sha256(code_verifier.encode("utf-8")).digest() + sha256(code_verifier.encode("utf-8")).digest() ).decode("utf-8") # 请求设备码 resp = self.session.post( @@ -113,8 +120,8 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): data={ "client_id": settings.U115_APP_ID, "code_challenge": code_challenge, - "code_challenge_method": "sha256" - } + "code_challenge_method": "sha256", + }, ) if resp is None: return {}, "网络错误" @@ -126,13 +133,11 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): "code_verifier": code_verifier, "uid": result["data"]["uid"], "time": result["data"]["time"], - "sign": result["data"]["sign"] + "sign": result["data"]["sign"], } # 生成二维码内容 - return { - "codeContent": result['data']['qrcode'] - }, "" + return {"codeContent": result["data"]["qrcode"]}, "" def check_login(self) -> Optional[Tuple[dict, str]]: """ @@ -146,8 +151,8 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): params={ "uid": self._auth_state["uid"], "time": self._auth_state["time"], - "sign": self._auth_state["sign"] - } + "sign": self._auth_state["sign"], + }, ) if resp is None: return {}, "网络错误" @@ -156,11 +161,11 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): return {}, result.get("message") if result["data"]["status"] == 2: tokens = self.__get_access_token() - self.set_config({ - "refresh_time": int(time.time()), - **tokens - }) - return {"status": result["data"]["status"], "tip": result["data"]["msg"]}, "" + self.set_config({"refresh_time": int(time.time()), **tokens}) + return { + "status": result["data"]["status"], + "tip": result["data"]["msg"], + }, "" except Exception as e: return {}, str(e) @@ -174,8 +179,8 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): "https://passportapi.115.com/open/deviceCodeToToken", data={ "uid": self._auth_state["uid"], - "code_verifier": self._auth_state["code_verifier"] - } + "code_verifier": self._auth_state["code_verifier"], + }, ) if resp is None: raise Exception("获取 access_token 失败") @@ -190,21 +195,24 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): """ resp = self.session.post( "https://passportapi.115.com/open/refreshToken", - data={ - "refresh_token": refresh_token - } + data={"refresh_token": refresh_token}, ) if resp is None: - logger.error(f"【115】刷新 access_token 失败:refresh_token={refresh_token}") + logger.error( + f"【115】刷新 access_token 失败:refresh_token={refresh_token}" + ) return None result = resp.json() if result.get("code") != 0: - logger.warn(f"【115】刷新 access_token 失败:{result.get('code')} - {result.get('message')}!") + logger.warn( + f"【115】刷新 access_token 失败:{result.get('code')} - {result.get('message')}!" + ) return None return result.get("data") - def _request_api(self, method: str, endpoint: str, - result_key: Optional[str] = None, **kwargs) -> Optional[Union[dict, list]]: + def _request_api( + self, method: str, endpoint: str, result_key: Optional[str] = None, **kwargs + ) -> Optional[Union[dict, list]]: """ 带错误处理和速率限制的API请求 """ @@ -216,12 +224,13 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): # 重试次数 retry_times = kwargs.pop("retry_limit", 5) + # qps 速率限制 + if endpoint in self.qps_limiter: + self.qps_limiter[endpoint].acquire() + try: - resp = self.session.request( - method, f"{self.base_url}{endpoint}", - **kwargs - ) - except requests.exceptions.RequestException as e: + resp = self.session.request(method, f"{self.base_url}{endpoint}", **kwargs) + except httpx.RequestError as e: logger.error(f"【115】{method} 请求 {endpoint} 网络错误: {str(e)}") return None @@ -241,7 +250,21 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): return self._request_api(method, endpoint, result_key, **kwargs) # 处理请求错误 - resp.raise_for_status() + try: + resp.raise_for_status() + except httpx.HTTPStatusError as e: + if retry_times <= 0: + logger.error( + f"【115】{method} 请求 {endpoint} 错误 {e},重试次数用尽!" + ) + return None + kwargs["retry_limit"] = retry_times - 1 + sleep_duration = 2 ** (5 - retry_times + 1) + logger.info( + f"【115】{method} 请求 {endpoint} 错误 {e},等待 {sleep_duration} 秒后重试..." + ) + time.sleep(sleep_duration) + return self._request_api(method, endpoint, result_key, **kwargs) # 返回数据 ret_data = resp.json() @@ -251,10 +274,14 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): logger.warn(f"【115】{method} 请求 {endpoint} 出错:{error_msg}") if "已达到当前访问上限" in error_msg: if retry_times <= 0: - logger.error(f"【115】{method} 请求 {endpoint} 达到访问上限,重试次数用尽!") + logger.error( + f"【115】{method} 请求 {endpoint} 达到访问上限,重试次数用尽!" + ) return None kwargs["retry_limit"] = retry_times - 1 - logger.info(f"【115】{method} 请求 {endpoint} 达到访问上限,等待 {self.retry_delay} 秒后重试...") + logger.info( + f"【115】{method} 请求 {endpoint} 达到访问上限,等待 {self.retry_delay} 秒后重试..." + ) time.sleep(self.retry_delay) return self._request_api(method, endpoint, result_key, **kwargs) return None @@ -269,26 +296,15 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): 计算文件SHA1(符合115规范) size: 前多少字节 """ - sha1 = hashlib.sha1() - with open(filepath, 'rb') as f: + sha1 = hashes.Hash(hashes.SHA1()) + with open(filepath, "rb") as f: if size: chunk = f.read(size) sha1.update(chunk) else: while chunk := f.read(8192): sha1.update(chunk) - return sha1.hexdigest() - - def _delay_get_item(self, path: Path) -> Optional[schemas.FileItem]: - """ - 自动延迟重试 get_item 模块 - """ - for i in range(1, 4): - time.sleep(2 ** i) - fileitem = self.get_item(path) - if fileitem: - return fileitem - return None + return sha1.finalize().hex() def init_storage(self): pass @@ -304,7 +320,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): return [item] return [] if fileitem.path == "/": - cid = '0' + cid = "0" else: cid = fileitem.fileid if not cid: @@ -322,29 +338,37 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): "GET", "/open/ufile/files", "data", - params={"cid": int(cid), "limit": 1000, "offset": offset, "cur": True, "show_dir": 1} + params={ + "cid": int(cid), + "limit": 1000, + "offset": offset, + "cur": True, + "show_dir": 1, + }, ) if resp is None: raise FileNotFoundError(f"【115】{fileitem.path} 检索出错!") if not resp: break for item in resp: - # 更新缓存 - path = f"{fileitem.path}{item['fn']}" - file_path = path + ("/" if item["fc"] == "0" else "") - items.append(schemas.FileItem( - storage=self.schema.value, - fileid=str(item["fid"]), - parent_fileid=cid, - name=item["fn"], - basename=Path(item["fn"]).stem, - extension=item["ico"] if item["fc"] == "1" else None, - type="dir" if item["fc"] == "0" else "file", - path=file_path, - size=item["fs"] if item["fc"] == "1" else None, - modify_time=item["upt"], - pickcode=item["pc"] - )) + parent_path = Path(fileitem.path) # noqa + item_name = item["fn"] + full_path = parent_path / item_name + items.append( + schemas.FileItem( + storage=self.schema.value, + fileid=str(item["fid"]), + parent_fileid=cid, + name=item["fn"], + basename=Path(item["fn"]).stem, + extension=item["ico"] if item["fc"] == "1" else None, + type="dir" if item["fc"] == "0" else "file", + path=full_path.as_posix() + ("/" if item["fc"] == "0" else ""), + size=item["fs"] if item["fc"] == "1" else None, + modify_time=item["upt"], + pickcode=item["pc"], + ) + ) if len(resp) < 1000: break @@ -352,7 +376,9 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): return items - def create_folder(self, parent_item: schemas.FileItem, name: str) -> Optional[schemas.FileItem]: + def create_folder( + self, parent_item: schemas.FileItem, name: str + ) -> Optional[schemas.FileItem]: """ 创建目录 """ @@ -360,10 +386,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): resp = self._request_api( "POST", "/open/folder/add", - data={ - "pid": int(parent_item.fileid or "0"), - "file_name": name - } + data={"pid": int(parent_item.fileid or "0"), "file_name": name}, ) if not resp: return None @@ -376,15 +399,19 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): return schemas.FileItem( storage=self.schema.value, fileid=str(resp["data"]["file_id"]), - path=str(new_path) + "/", + path=new_path.as_posix() + "/", name=name, basename=name, type="dir", - modify_time=int(time.time()) + modify_time=int(time.time()), ) - def upload(self, target_dir: schemas.FileItem, local_path: Path, - new_name: Optional[str] = None) -> Optional[schemas.FileItem]: + def upload( + self, + target_dir: schemas.FileItem, + local_path: Path, + new_name: Optional[str] = None, + ) -> Optional[schemas.FileItem]: """ 实现带秒传、断点续传和二次认证的文件上传 """ @@ -409,13 +436,9 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): "file_size": file_size, "target": target_param, "fileid": file_sha1, - "preid": file_preid + "preid": file_preid, } - init_resp = self._request_api( - "POST", - "/open/upload/init", - data=init_data - ) + init_resp = self._request_api("POST", "/open/upload/init", data=init_data) if not init_resp: return None if not init_resp.get("state"): @@ -444,19 +467,15 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): # 取2392148-2392298之间的内容(包含2392148、2392298)的sha1 f.seek(start) chunk = f.read(end - start + 1) - sign_val = hashlib.sha1(chunk).hexdigest().upper() + sha1 = hashes.Hash(hashes.SHA1()) + sha1.update(chunk) + sign_val = sha1.finalize().hex().upper() # 重新初始化请求 # sign_key,sign_val(根据sign_check计算的值大写的sha1值) - init_data.update({ - "pick_code": pick_code, - "sign_key": sign_key, - "sign_val": sign_val - }) - init_resp = self._request_api( - "POST", - "/open/upload/init", - data=init_data + init_data.update( + {"pick_code": pick_code, "sign_key": sign_key, "sign_val": sign_val} ) + init_resp = self._request_api("POST", "/open/upload/init", data=init_data) if not init_resp: return None if not init_resp.get("state"): @@ -485,32 +504,30 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): "GET", "/open/folder/get_info", "data", - params={ - "file_id": int(file_id) - } + params={"file_id": int(file_id)}, ) if info_resp: return schemas.FileItem( storage=self.schema.value, fileid=str(info_resp["file_id"]), - path=str(target_path) + ("/" if info_resp["file_category"] == "0" else ""), + path=target_path.as_posix() + + ("/" if info_resp["file_category"] == "0" else ""), type="file" if info_resp["file_category"] == "1" else "dir", name=info_resp["file_name"], basename=Path(info_resp["file_name"]).stem, - extension=Path(info_resp["file_name"]).suffix[1:] if info_resp[ - "file_category"] == "1" else None, + extension=Path(info_resp["file_name"]).suffix[1:] + if info_resp["file_category"] == "1" + else None, pickcode=info_resp["pick_code"], - size=StringUtils.num_filesize(info_resp['size']) if info_resp["file_category"] == "1" else None, - modify_time=info_resp["utime"] + size=StringUtils.num_filesize(info_resp["size"]) + if info_resp["file_category"] == "1" + else None, + modify_time=info_resp["utime"], ) - return self._delay_get_item(target_path) + return self.get_item(target_path) # Step 4: 获取上传凭证 - token_resp = self._request_api( - "GET", - "/open/upload/get_token", - "data" - ) + token_resp = self._request_api("GET", "/open/upload/get_token", "data") if not token_resp: logger.warn("【115】获取上传凭证失败") return None @@ -530,8 +547,8 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): "file_size": file_size, "target": target_param, "fileid": file_sha1, - "pick_code": pick_code - } + "pick_code": pick_code, + }, ) if resume_resp: logger.debug(f"【115】上传 Step 5 断点续传结果: {resume_resp}") @@ -542,25 +559,25 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): auth = oss2.StsAuth( access_key_id=AccessKeyId, access_key_secret=AccessKeySecret, - security_token=SecurityToken + security_token=SecurityToken, ) bucket = oss2.Bucket(auth, endpoint, bucket_name) # noqa # determine_part_size方法用于确定分片大小,设置分片大小为 10M part_size = determine_part_size(file_size, preferred_size=10 * 1024 * 1024) # 初始化进度条 - logger.info(f"【115】开始上传: {local_path} -> {target_path},分片大小:{StringUtils.str_filesize(part_size)}") + logger.info( + f"【115】开始上传: {local_path} -> {target_path},分片大小:{StringUtils.str_filesize(part_size)}" + ) progress_callback = transfer_process(local_path.as_posix()) # 初始化分片 - upload_id = bucket.init_multipart_upload(object_name, - params={ - "encoding-type": "url", - "sequential": "" - }).upload_id + upload_id = bucket.init_multipart_upload( + object_name, params={"encoding-type": "url", "sequential": ""} + ).upload_id parts = [] # 逐个上传分片 - with open(local_path, 'rb') as fileobj: + with open(local_path, "rb") as fileobj: part_number = 1 offset = 0 while offset < file_size: @@ -569,9 +586,15 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): return None num_to_upload = min(part_size, file_size - offset) # 调用SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。 - logger.info(f"【115】开始上传 {target_name} 分片 {part_number}: {offset} -> {offset + num_to_upload}") - result = bucket.upload_part(object_name, upload_id, part_number, - data=SizedFileAdapter(fileobj, num_to_upload)) + logger.info( + f"【115】开始上传 {target_name} 分片 {part_number}: {offset} -> {offset + num_to_upload}" + ) + result = bucket.upload_part( + object_name, + upload_id, + part_number, + data=SizedFileAdapter(fileobj, num_to_upload), + ) parts.append(PartInfo(part_number, result.etag)) logger.info(f"【115】{target_name} 分片 {part_number} 上传完成") offset += num_to_upload @@ -585,15 +608,18 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): # 请求头 headers = { - 'X-oss-callback': encode_callback(callback["callback"]), - 'x-oss-callback-var': encode_callback(callback["callback_var"]), - 'x-oss-forbid-overwrite': 'false' + "X-oss-callback": encode_callback(callback["callback"]), + "x-oss-callback-var": encode_callback(callback["callback_var"]), + "x-oss-forbid-overwrite": "false", } try: - result = bucket.complete_multipart_upload(object_name, upload_id, parts, - headers=headers) + result = bucket.complete_multipart_upload( + object_name, upload_id, parts, headers=headers + ) if result.status == 200: - logger.debug(f"【115】上传 Step 6 回调结果:{result.resp.response.json()}") + logger.debug( + f"【115】上传 Step 6 回调结果:{result.resp.response.json()}" + ) logger.info(f"【115】{target_name} 上传成功") else: logger.warn(f"【115】{target_name} 上传失败,错误码: {result.status}") @@ -602,10 +628,12 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): if e.code == "FileAlreadyExists": logger.warn(f"【115】{target_name} 已存在") else: - logger.error(f"【115】{target_name} 上传失败: {e.status}, 错误码: {e.code}, 详情: {e.message}") + logger.error( + f"【115】{target_name} 上传失败: {e.status}, 错误码: {e.code}, 详情: {e.message}" + ) return None # 返回结果 - return self._delay_get_item(target_path) + return self.get_item(target_path) def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]: """ @@ -617,12 +645,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): return None download_info = self._request_api( - "POST", - "/open/ufile/downurl", - "data", - data={ - "pick_code": detail.pickcode - } + "POST", "/open/ufile/downurl", "data", data={"pick_code": detail.pickcode} ) if not download_info: logger.error(f"【115】获取下载链接失败: {fileitem.name}") @@ -643,28 +666,26 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): progress_callback = transfer_process(Path(fileitem.path).as_posix()) try: - with self.session.get(download_url, stream=True) as r: + with self.session.stream("GET", download_url) as r: r.raise_for_status() downloaded_size = 0 with open(local_path, "wb") as f: - for chunk in r.iter_content(chunk_size=self.chunk_size): + for chunk in r.iter_bytes(chunk_size=self.chunk_size): if global_vars.is_transfer_stopped(fileitem.path): logger.info(f"【115】{fileitem.path} 下载已取消!") + r.close() return None - if chunk: - f.write(chunk) - downloaded_size += len(chunk) - # 更新进度 - if file_size: - progress = (downloaded_size * 100) / file_size - progress_callback(progress) + f.write(chunk) + downloaded_size += len(chunk) + if file_size: + progress = (downloaded_size * 100) / file_size + progress_callback(progress) # 完成下载 progress_callback(100) logger.info(f"【115】下载完成: {fileitem.name}") - - except requests.exceptions.RequestException as e: + except httpx.RequestError as e: logger.error(f"【115】下载网络错误: {fileitem.name} - {str(e)}") # 删除可能部分下载的文件 if local_path.exists(): @@ -688,14 +709,10 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): """ try: self._request_api( - "POST", - "/open/ufile/delete", - data={ - "file_ids": int(fileitem.fileid) - } + "POST", "/open/ufile/delete", data={"file_ids": int(fileitem.fileid)} ) return True - except requests.exceptions.HTTPError: + except httpx.HTTPError: return False def rename(self, fileitem: schemas.FileItem, name: str) -> bool: @@ -705,10 +722,7 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): resp = self._request_api( "POST", "/open/ufile/update", - data={ - "file_id": int(fileitem.fileid), - "file_name": name - } + data={"file_id": int(fileitem.fileid), "file_name": name}, ) if not resp: return False @@ -725,10 +739,8 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): "POST", "/open/folder/get_info", "data", - data={ - "path": path.as_posix() - }, - no_error_log=True + data={"path": path.as_posix()}, + no_error_log=True, ) if not resp: return None @@ -739,10 +751,12 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): type="file" if resp["file_category"] == "1" else "dir", name=resp["file_name"], basename=Path(resp["file_name"]).stem, - extension=Path(resp["file_name"]).suffix[1:] if resp["file_category"] == "1" else None, + extension=Path(resp["file_name"]).suffix[1:] + if resp["file_category"] == "1" + else None, pickcode=resp["pick_code"], - size=resp['size_byte'] if resp["file_category"] == "1" else None, - modify_time=resp["utime"] + size=resp["size_byte"] if resp["file_category"] == "1" else None, + modify_time=resp["utime"], ) except Exception as e: logger.debug(f"【115】获取文件信息失败: {str(e)}") @@ -753,7 +767,9 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): 获取指定路径的文件夹,如不存在则创建 """ - def __find_dir(_fileitem: schemas.FileItem, _name: str) -> Optional[schemas.FileItem]: + def __find_dir( + _fileitem: schemas.FileItem, _name: str + ) -> Optional[schemas.FileItem]: """ 查找下级目录中匹配名称的目录 """ @@ -808,13 +824,13 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): data={ "file_id": int(fileitem.fileid), "pid": int(dest_fileitem.fileid), - } + }, ) if not resp: return False if resp["state"]: new_path = Path(path) / fileitem.name - new_item = self._delay_get_item(new_path) + new_item = self.get_item(new_path) if not new_item: return False if self.rename(new_item, new_name): @@ -840,13 +856,13 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): data={ "file_ids": int(fileitem.fileid), "to_cid": int(dest_fileitem.fileid), - } + }, ) if not resp: return False if resp["state"]: new_path = Path(path) / fileitem.name - new_file = self._delay_get_item(new_path) + new_file = self.get_item(new_path) if not new_file: return False if self.rename(new_file, new_name): @@ -864,17 +880,12 @@ class U115Pan(StorageBase, metaclass=WeakSingleton): 获取带有企业级配额信息的存储使用情况 """ try: - resp = self._request_api( - "GET", - "/open/user/info", - "data" - ) + resp = self._request_api("GET", "/open/user/info", "data") if not resp: return None space = resp["rt_space_info"] return schemas.StorageUsage( - total=space["all_total"]["size"], - available=space["all_remain"]["size"] + total=space["all_total"]["size"], available=space["all_remain"]["size"] ) except NoCheckInException: return None diff --git a/app/utils/limit.py b/app/utils/limit.py index 6205d1fc..e9a90acd 100644 --- a/app/utils/limit.py +++ b/app/utils/limit.py @@ -382,3 +382,28 @@ def rate_limit_window(max_calls: int, window_seconds: float, limiter = WindowRateLimiter(max_calls, window_seconds, source, enable_logging) # 使用通用装饰器逻辑包装该限流器 return rate_limit_handler(limiter, raise_on_limit) + + +class QpsRateLimiter: + """ + 速率控制器,精确控制 QPS + """ + + def __init__(self, qps: float | int): + if qps <= 0: + qps = float("inf") + self.interval = 1.0 / qps + self.lock = threading.Lock() + self.next_call_time = time.monotonic() + + def acquire(self) -> None: + """ + 获取调用许可,阻塞直到满足速率限制 + """ + sleep_duration = 0 + with self.lock: + now = time.monotonic() + sleep_duration = self.next_call_time - now + self.next_call_time = max(now, self.next_call_time) + self.interval + if sleep_duration > 0: + time.sleep(sleep_duration) diff --git a/docs/mcp-api.md b/docs/mcp-api.md index d96b849d..03622028 100644 --- a/docs/mcp-api.md +++ b/docs/mcp-api.md @@ -12,7 +12,7 @@ MoviePilot的智能体工具已通过HTTP API暴露,可以通过RESTful API调 获取所有可用的MCP工具列表。 -**认证**: 需要API KEY,从 URL 查询参数中获取 `apikey=xxx`,或请求头中获取 `X-API-KEY` +**认证**: 需要API KEY,在请求头中添加 `X-API-KEY: ` 或在查询参数中添加 `apikey=` **响应示例**: ```json @@ -46,7 +46,7 @@ MoviePilot的智能体工具已通过HTTP API暴露,可以通过RESTful API调 调用指定的MCP工具。 -**认证**: 需要Bearer Token +**认证**: 需要API KEY,在请求头中添加 `X-API-KEY: ` 或在查询参数中添加 `apikey=` **请求体**: ```json @@ -84,7 +84,7 @@ MoviePilot的智能体工具已通过HTTP API暴露,可以通过RESTful API调 获取指定工具的详细信息。 -**认证**: 需要Bearer Token +**认证**: 需要API KEY,在请求头中添加 `X-API-KEY: ` 或在查询参数中添加 `apikey=` **路径参数**: - `tool_name`: 工具名称 @@ -114,7 +114,7 @@ MoviePilot的智能体工具已通过HTTP API暴露,可以通过RESTful API调 获取指定工具的参数Schema(JSON Schema格式)。 -**认证**: 需要Bearer Token +**认证**: 需要API KEY,在请求头中添加 `X-API-KEY: ` 或在查询参数中添加 `apikey=` **路径参数**: - `tool_name`: 工具名称 @@ -138,128 +138,115 @@ MoviePilot的智能体工具已通过HTTP API暴露,可以通过RESTful API调 } ``` -## 使用示例 +## MCP客户端配置 -### 使用curl调用工具 +MoviePilot的MCP工具可以通过HTTP协议在支持MCP的客户端中使用。以下是常见MCP客户端的配置方法: -```bash -# 1. 获取访问令牌(通过登录API) -TOKEN=$(curl -X POST "http://localhost:3001/api/v1/login/access-token" \ - -H "Content-Type: application/x-www-form-urlencoded" \ - -d "username=admin&password=your_password" | jq -r '.access_token') +### Claude Desktop (Anthropic) -# 2. 列出所有工具 -curl -X GET "http://localhost:3001/api/v1/mcp/tools" \ - -H "Authorization: Bearer $TOKEN" +在Claude Desktop的配置文件中添加MoviePilot的MCP服务器配置: -# 3. 调用工具 -curl -X POST "http://localhost:3001/api/v1/mcp/tools/call" \ - -H "Authorization: Bearer $TOKEN" \ - -H "Content-Type: application/json" \ - -d '{ - "tool_name": "query_subscribes", - "arguments": { - "status": "all", - "media_type": "all" +**macOS**: `~/Library/Application Support/Claude/claude_desktop_config.json` +**Windows**: `%APPDATA%\Claude\claude_desktop_config.json` + +```json +{ + "mcpServers": { + "moviepilot": { + "command": "npx", + "args": [ + "-y", + "@modelcontextprotocol/server-http", + "http://localhost:3001/api/v1/mcp" + ], + "env": { + "X-API-KEY": "your_api_key_here" + } } - }' - -# 4. 获取工具详情 -curl -X GET "http://localhost:3001/api/v1/mcp/tools/add_subscribe" \ - -H "Authorization: Bearer $TOKEN" + } +} ``` -### 使用Python调用 +**注意**: 如果MCP HTTP服务器不支持环境变量传递API Key,可以使用查询参数方式: -```python -import requests - -# 配置 -BASE_URL = "http://localhost:3001/api/v1" -TOKEN = "your_access_token" -HEADERS = {"Authorization": f"Bearer {TOKEN}"} - -# 1. 列出所有工具 -response = requests.get(f"{BASE_URL}/mcp/tools", headers=HEADERS) -tools = response.json() -print(f"可用工具数量: {len(tools)}") - -# 2. 调用工具 -tool_call = { - "tool_name": "add_subscribe", - "arguments": { - "title": "流浪地球", - "year": "2019", - "media_type": "电影" +```json +{ + "mcpServers": { + "moviepilot": { + "command": "npx", + "args": [ + "-y", + "@modelcontextprotocol/server-http", + "http://localhost:3001/api/v1/mcp?apikey=your_api_key_here" + ] } + } } -response = requests.post( - f"{BASE_URL}/mcp/tools/call", - headers=HEADERS, - json=tool_call -) -result = response.json() -print(f"执行结果: {result['result']}") - -# 3. 获取工具Schema -response = requests.get( - f"{BASE_URL}/mcp/tools/add_subscribe/schema", - headers=HEADERS -) -schema = response.json() -print(f"工具Schema: {schema}") ``` -### 使用JavaScript/TypeScript调用 +### 其他支持MCP的聊天客户端 -```typescript -const BASE_URL = 'http://localhost:3001/api/v1'; -const TOKEN = 'your_access_token'; +对于其他支持MCP协议的聊天客户端(如其他AI聊天助手、对话机器人等),通常可以通过配置文件或设置界面添加HTTP协议的MCP服务器。配置格式可能因客户端而异,但通常需要以下信息: -// 列出所有工具 -async function listTools() { - const response = await fetch(`${BASE_URL}/mcp/tools`, { - headers: { - 'Authorization': `Bearer ${TOKEN}` +**配置参数**: +1. **服务器类型**: HTTP +2. **服务器地址**: `http://your-moviepilot-host:3001/api/v1/mcp` +3. **认证方式**: + - 在HTTP请求头中添加 `X-API-KEY: ` + - 或在URL查询参数中添加 `apikey=` + +**示例配置**(通用格式): + +使用请求头方式: +```json +{ + "mcpServers": { + "moviepilot": { + "url": "http://localhost:3001/api/v1/mcp", + "headers": { + "X-API-KEY": "your_api_key_here" + } } - }); - return await response.json(); + } } - -// 调用工具 -async function callTool(toolName: string, arguments: Record) { - const response = await fetch(`${BASE_URL}/mcp/tools/call`, { - method: 'POST', - headers: { - 'Authorization': `Bearer ${TOKEN}`, - 'Content-Type': 'application/json' - }, - body: JSON.stringify({ - tool_name: toolName, - arguments: arguments - }) - }); - return await response.json(); -} - -// 使用示例 -const result = await callTool('query_subscribes', { - status: 'all', - media_type: 'all' -}); -console.log(result); ``` +或使用查询参数方式: +```json +{ + "mcpServers": { + "moviepilot": { + "url": "http://localhost:3001/api/v1/mcp?apikey=your_api_key_here" + } + } +} +``` + +**支持的端点**: +- `GET /tools` - 列出所有工具 +- `POST /tools/call` - 调用工具 +- `GET /tools/{tool_name}` - 获取工具详情 +- `GET /tools/{tool_name}/schema` - 获取工具参数Schema + +配置完成后,您就可以在聊天对话中使用MoviePilot的各种工具,例如: +- 添加媒体订阅 +- 查询下载历史 +- 搜索媒体资源 +- 管理媒体服务器 +- 等等... + +### 获取API Key + +API Key可以在MoviePilot的系统设置中生成和查看。请妥善保管您的API Key,不要泄露给他人。 + ## 认证 -所有MCP API端点都需要认证。支持以下认证方式: +所有MCP API端点都需要认证。**仅支持API Key认证方式**: -1. **Bearer Token**: 在请求头中添加 `Authorization: Bearer ` -2. **API Key**: 在请求头中添加 `X-API-KEY: ` 或在查询参数中添加 `apikey=` +- **请求头方式**: 在请求头中添加 `X-API-KEY: ` +- **查询参数方式**: 在URL查询参数中添加 `apikey=` -获取Token的方式: -- 通过登录API: `POST /api/v1/login/access-token` -- 通过API Key: 在系统设置中生成API Key +**获取API Key**: 在MoviePilot系统设置中生成和查看API Key。请妥善保管您的API Key,不要泄露给他人。 ## 错误处理 @@ -267,7 +254,7 @@ API会返回标准的HTTP状态码: - `200 OK`: 请求成功 - `400 Bad Request`: 请求参数错误 -- `401 Unauthorized`: 未认证或Token无效 +- `401 Unauthorized`: 未认证或API Key无效 - `404 Not Found`: 工具不存在 - `500 Internal Server Error`: 服务器内部错误 diff --git a/requirements.in b/requirements.in index b10dd956..c8f56f38 100644 --- a/requirements.in +++ b/requirements.in @@ -80,11 +80,11 @@ pympler~=1.1 smbprotocol~=1.15.0 setproctitle~=1.3.6 httpx[socks]~=0.28.1 -langchain==0.3.27 -langchain-core==0.3.76 -langchain-community==0.3.29 -langchain-openai==0.3.33 -langchain-google-genai==2.0.10 -langchain-deepseek==0.1.4 -langchain-experimental==0.3.4 -openai~=2.8.1 \ No newline at end of file +langchain~=0.3.27 +langchain-core~=0.3.76 +langchain-community~=0.3.29 +langchain-openai~=0.3.33 +langchain-google-genai~=2.0.10 +langchain-deepseek~=0.1.4 +langchain-experimental~=0.3.4 +openai~=1.108.2 \ No newline at end of file diff --git a/version.py b/version.py index 6c859fc6..a4579fe9 100644 --- a/version.py +++ b/version.py @@ -1,2 +1,2 @@ -APP_VERSION = 'v2.8.3' +APP_VERSION = 'v2.8.4' FRONTEND_VERSION = 'v2.8.3'