diff --git a/app/core/config.py b/app/core/config.py index 6fb82368..036eaa14 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -109,6 +109,8 @@ class ConfigModel(BaseModel): FANART_ENABLE: bool = True # Fanart API Key FANART_API_KEY: str = "d2d31f9ecabea050fc7d68aa3146015f" + # 115 AppId + U115_APP_ID: str = "" # 元数据识别缓存过期时间(小时) META_CACHE_EXPIRE: int = 0 # 电视剧动漫的分类genre_ids diff --git a/app/modules/filemanager/storages/u115.py b/app/modules/filemanager/storages/u115.py index b61007e4..d4f5b295 100644 --- a/app/modules/filemanager/storages/u115.py +++ b/app/modules/filemanager/storages/u115.py @@ -1,13 +1,14 @@ +import base64 import hashlib -import os +import secrets import time from pathlib import Path -from typing import List, Dict, Optional, Tuple +from typing import List, Dict, Optional, Tuple, Union -import qrcode import requests from app import schemas +from app.core.config import settings from app.log import logger from app.modules.filemanager import StorageBase from app.schemas.types import StorageSchema @@ -28,14 +29,14 @@ class U115Pan(StorageBase, metaclass=Singleton): "copy": "复制" } - # 访问token - access_token = None + # 验证参数 + _auth_state = {} # 基础url - base_url = "https://api.115.com" + base_url = "https://proapi.115.com" # CID和路径缓存 - _cid_cache: Dict[str | int, str | int] = {} + _id_cache: Dict[str, int] = {} def __init__(self): super().__init__() @@ -46,84 +47,171 @@ class U115Pan(StorageBase, metaclass=Singleton): """ 初始化带速率限制的会话 """ - adapter = requests.adapters.HTTPAdapter( - max_retries=3, - pool_connections=10, - pool_maxsize=50 - ) - self.session.mount('https://', adapter) self.session.headers.update({ "User-Agent": "W115Storage/2.0", - "Accept-Encoding": "gzip, deflate" + "Accept-Encoding": "gzip, deflate", + "Content-Type": "application/x-www-form-urlencoded" }) + @property + def access_token(self) -> Optional[str]: + """ + 访问token + """ + tokens = self.get_conf() + refresh_token = tokens.get("refresh_token") + if not refresh_token: + return None + expires_in = tokens.get("expires_in", 0) + refresh_time = tokens.get("refresh_time", 0) + if expires_in and refresh_time + expires_in >= int(time.time()): + tokens = self.__refresh_access_token(refresh_token) + if tokens: + self.set_config({ + "refresh_time": int(time.time()), + **tokens + }) + return tokens.get("access_token") + def generate_qrcode(self) -> Tuple[dict, str]: """ - 生成设备授权二维码 + 实现PKCE规范的设备授权二维码生成 + """ + # 生成PKCE参数 + code_verifier = secrets.token_urlsafe(96)[:128] + code_challenge = base64.urlsafe_b64encode( + hashlib.sha256(code_verifier.encode()).digest() + ).decode().replace("=", "") + # 请求设备码 + resp = self.session.post( + "https://passportapi.115.com/open/authDeviceCode", + data={ + "client_id": settings.U115_APP_ID, + "code_challenge": code_challenge, + "code_challenge_method": "sha256" + } + ) + if resp is None: + return {}, "网络错误" + result = resp.json() + if result.get("code") != 0: + return {}, result.get("message") + # 持久化验证参数 + self._auth_state = { + "code_verifier": code_verifier, + "uid": result["data"]["uid"], + "time": result["data"]["time"], + "sign": result["data"]["sign"] + } + + # 生成二维码内容 + return { + "codeContent": result['data']['qrcode'] + }, "" + + def __get_access_token(self) -> dict: + """ + 确认登录后,获取相关token + """ + if not self._auth_state: + raise Exception("请先调用生成二维码方法") + resp = self.session.post( + "https://passportapi.115.com/open/deviceCodeToToken", + data={ + "uid": self._auth_state["uid"], + "code_verifier": self._auth_state["code_verifier"] + } + ) + if resp is None: + raise Exception("获取 access_token 失败") + result = resp.json() + if result.get("code") != 0: + raise Exception(result.get("message")) + return result["data"] + + def __refresh_access_token(self, refresh_token: str) -> dict: + """ + 刷新access_token """ resp = self.session.post( - f"{self.base_url}/oauth/device", - data={"client_id": self.get_conf().get("app_id")} - ).json() - qr = qrcode.make(f"115AUTH|{resp['device_code']}") - return resp, qr.png_as_base64_str() + "https://passportapi.115.com/open/refreshToken", + data={ + "refresh_token": refresh_token + } + ) + if resp is None: + raise Exception(f"刷新 access_token 失败:refresh_token={refresh_token}") + result = resp.json() + if result.get("code") != 0: + raise Exception(result.get("message")) + return result.get("data") - def check_login(self, device_code: str) -> Optional[Dict]: + def check_login(self) -> Optional[Dict]: """ - 检查授权状态 + 改进的带PKCE校验的登录状态检查 """ + if not self._auth_state: + return {"status": -1, "tip": "生成二维码失败"} try: - resp = self.session.post(f"{self.base_url}/oauth/token", data={ - "grant_type": "device", - "device_code": device_code, - "client_secret": self.get_conf().get("app_secret") - }, timeout=10) - if resp.status_code == 200: - token_data = resp.json() - self.access_token = token_data["access_token"] - # 持久化配置 - self.set_config({"access_token": self.access_token}) - return {"status": "success"} - return {"status": "pending"} - except requests.exceptions.RequestException: - return {"status": "error"} + resp = self.session.post( + "https://passportapi.115.com/open/checkDeviceCode", + data={ + "uid": self._auth_state["uid"], + "time": self._auth_state["time"], + "sign": self._auth_state["sign"] + } + ) + if resp is None: + return {"status": -1, "tip": "网络错误"} + result = resp.json() + if result.get("code") != 0 or not result.get("data"): + return {"status": -1, "tip": result.get("message")} + if result["data"]["status"] == 2: + tokens = self.__get_access_token() + self.set_config({ + "refresh_time": int(time.time()), + **tokens + }) + return {"status": result["data"]["status"], "tip": result["data"]["msg"]} + except requests.exceptions.RequestException as e: + return {"status": -1, "tip": str(e)} def init_storage(self): """ 初始化存储连接 """ - if conf := self.get_conf(): - self.access_token = conf.get("access_token") - self.session.headers.update({"Authorization": f"Bearer {self.access_token}"}) + self.session.headers.update({ + "Authorization": f"Bearer {self.access_token}" + }) def list(self, fileitem: schemas.FileItem) -> List[schemas.FileItem]: """ 目录遍历实现 """ - cid = self._path_to_cid(fileitem.path) + cid = self._path_to_id(fileitem.path) items = [] offset = 0 while True: resp = self._request_api( - "GET", "/files", + "GET", + "/open/ufile/files", + "data", params={"cid": cid, "limit": 1000, "offset": offset} ) - batch = resp["data"] - for item in batch: - path = self._cid_to_path(item["cid"]) - items.append(schemas.FileItem( - path=path, - name=item["name"], - type="dir" if item["is_dir"] else "file", - size=item["size"], - modify_time=item["modified"] - )) - self._cid_cache[path] = item["cid"] # 更新缓存 - - if len(batch) < 1000: + if not resp: break - offset += len(batch) + for item in resp: + path = self._id_to_path(item.get("fid")) + items.append(schemas.FileItem( + fileid=item["fid"], + )) + # 更新缓存 + self._id_cache[path] = item["cid"] + + if len(resp) < 1000: + break + offset += len(resp) return items @@ -131,17 +219,22 @@ class U115Pan(StorageBase, metaclass=Singleton): """ 创建目录 """ - parent_cid = self._path_to_cid(parent_item.path) + parent_id = self._path_to_id(parent_item.path) resp = self._request_api( - "POST", "/file/mkdir", - json={"cid": parent_cid, "name": name} + "POST", + "/open/folder/add", + "data", + data={ + "pid": parent_id, + "name": name + } ) - new_path = os.path.join(parent_item.path, name) + new_path = Path(parent_item.path) / name # 缓存新目录 - self._cid_cache[new_path] = resp["cid"] - self._cid_cache[resp["cid"]] = new_path + self._id_cache[str(new_path)] = resp["file_id"] return schemas.FileItem( - path=new_path, + fileid=resp["file_id"], + path=str(new_path), name=name, type="dir", modify_time=int(time.time()) @@ -149,7 +242,7 @@ class U115Pan(StorageBase, metaclass=Singleton): def upload(self, target_dir: schemas.FileItem, local_path: Path, new_name: str = None) -> schemas.FileItem: """ - 断点续传实现 + FIXME 断点续传实现 """ file_name = new_name or local_path.name file_size = local_path.stat().st_size @@ -157,12 +250,13 @@ class U115Pan(StorageBase, metaclass=Singleton): # 初始化上传任务 upload_info = self._request_api( - "POST", "/open/upload/init", - json={ + "POST", + "/open/upload/init", + data={ "file_name": file_name, "file_size": file_size, "file_sha1": file_hash, - "target_dir": self._path_to_cid(target_dir.path) + "target_dir": self._path_to_id(target_dir.path) } ) @@ -184,12 +278,17 @@ class U115Pan(StorageBase, metaclass=Singleton): """ 带限速处理的下载 """ - download_url = self._request_api( - "GET", "/file/download", - params={"cid": self._path_to_cid(fileitem.path)} - )["url"] - - local_path = save_path or Path("/tmp") / fileitem.name + detail = self.get_item(Path(fileitem.path)) + local_path = save_path or settings.TEMP_PATH / fileitem.name + download_info = self._request_api( + "POST", + "/open/ufile/downurl", + "data", + data={ + "pick_code": detail.pickcode + } + ) + download_url = download_info["url"] with self.session.get(download_url, stream=True) as r: r.raise_for_status() with open(local_path, "wb") as f: @@ -197,103 +296,97 @@ class U115Pan(StorageBase, metaclass=Singleton): f.write(chunk) return local_path - def _request_api(self, method: str, endpoint: str, **kwargs): + def _request_api(self, method: str, endpoint: str, + result_key: str = None, **kwargs) -> Optional[Union[dict, list]]: """ 带错误处理和速率限制的API请求 """ - if not self.access_token: - raise Exception("未授权,请先完成OAuth认证") - - headers = kwargs.pop("headers", {}) - headers["Authorization"] = f"Bearer {self.access_token}" - resp = self.session.request( method, f"{self.base_url}{endpoint}", - headers=headers, **kwargs + **kwargs ) - if resp is None: logger.error(f"请求 115 API 失败: {method} {endpoint}") - return None + return {} # 处理速率限制 if resp.status_code == 429: reset_time = int(resp.headers.get("X-RateLimit-Reset", 60)) time.sleep(reset_time + 5) - return self._request_api(method, endpoint, **kwargs) + return self._request_api(method, endpoint, result_key, **kwargs) resp.raise_for_status() + + if result_key: + result = resp.json().get(result_key) + if not result: + raise FileNotFoundError(f"请求 115 API 失败: {method} {endpoint}") + return result return resp.json() - def _path_to_cid(self, path: str) -> str: + def _path_to_id(self, path: str) -> int: """ - 路径转CID(带缓存机制) + 路径转FID(带缓存机制) """ - if path in self._cid_cache: - return self._cid_cache[path] - - # 递归解析路径 - current_cid = "0" # 根目录CID - for part in Path(path).parts[1:]: # 忽略根目录 + # 命中缓存 + if path in self._id_cache: + return self._id_cache[path] + # 逐级查找缓存 + current_id = 0 + parent_path = "/" + for p in Path(path).parents: + if str(p) in self._id_cache: + parent_path = str(p) + current_id = self._id_cache[parent_path] + break + # 计算相对路径 + rel_path = Path(path).relative_to(parent_path) + for part in Path(rel_path).parts: resp = self._request_api( - "GET", "/files", - params={"cid": current_cid, "search_value": part} + "GET", + "/open/ufile/files", + "data", + params={ + "cid": current_id + } ) - for item in resp["data"]: + for item in resp: if item["name"] == part: - current_cid = item["cid"] + current_id = item["fid"] break else: raise FileNotFoundError(f"路径不存在: {path}") - self._cid_cache[path] = current_cid - return current_cid + self._id_cache[path] = current_id + return current_id - def _cid_to_path(self, cid: str) -> str: + def _id_to_path(self, fid: int) -> str: """ CID转路径(带双向缓存) """ - # 根目录特殊处理 - if cid == "0": + if fid == 0: return "/" - # 优先从缓存读取 - if cid in self._cid_cache.values(): - return next(k for k, v in self._cid_cache.items() if v == cid) - - # 递归构建路径 - path_parts = [] - current_cid = cid - - while current_cid != "0": - # 从API获取当前节点信息 - detail = self._request_api( - "GET", "/file/detail", - params={"cid": current_cid} - ) - - # 处理可能的空数据(如已删除文件) - if not detail: - raise FileNotFoundError(f"CID {current_cid} 不存在") - - parent_cid = detail["parent_id"] - path_parts.append(detail["name"]) - - # 检查父节点缓存 - if parent_cid in self._cid_cache.values(): - parent_path = next(k for k, v in self._cid_cache.items() if v == parent_cid) - path_parts.reverse() - full_path = os.path.join(parent_path, *path_parts) - # 更新正向缓存 - self._cid_cache[full_path] = cid - return str(full_path) - - current_cid = parent_cid - + if fid in self._id_cache.values(): + return next(k for k, v in self._id_cache.items() if v == fid) + # 从API获取当前节点信息 + detail = self._request_api( + "GET", + "/open/folder/get_info", + "data", + params={ + "file_id": fid + } + ) + # 处理可能的空数据(如已删除文件) + if not detail: + raise FileNotFoundError(f"{fid} 不存在") + paths = detail["paths"] + path_parts = [item["file_name"] for item in paths] # 构建完整路径 full_path = "/" + "/".join(reversed(path_parts)) # 缓存新路径 - self._cid_cache[full_path] = cid + self._id_cache[full_path] = fid return full_path @staticmethod @@ -311,49 +404,68 @@ class U115Pan(StorageBase, metaclass=Singleton): return self.access_token is not None def delete(self, fileitem: schemas.FileItem) -> bool: + """ + 删除文件/目录 + """ try: self._request_api( - "POST", "/file/delete", - json={"cid": self._path_to_cid(fileitem.path)} + "POST", + "/open/ufile/delete", + data={ + "file_ids": self._path_to_id(fileitem.path) + } ) return True except requests.exceptions.HTTPError: return False def rename(self, fileitem: schemas.FileItem, name: str) -> bool: - new_path = Path(fileitem.path).parent / name + """ + 重命名文件/目录 + """ + file_id = self._path_to_id(fileitem.path) resp = self._request_api( - "POST", "/file/rename", - json={ - "cid": self._path_to_cid(fileitem.path), - "new_name": name + "POST", + "/open/ufile/update", + data={ + "file_id": file_id, + "file_name": name } ) if resp["state"]: - self._cid_cache[str(new_path)] = resp["cid"] - old_path = fileitem.path + if fileitem.path in self._id_cache: + del self._id_cache[fileitem.path] new_path = Path(fileitem.path).parent / name - # 删除旧路径 - del self._cid_cache[old_path] - self._cid_cache[new_path.as_posix()] = resp["cid"] - # 更新反向缓存 - self._cid_cache[resp["cid"]] = new_path.as_posix() + self._id_cache[str(new_path)] = file_id return True return False def get_item(self, path: Path) -> Optional[schemas.FileItem]: + """ + 获取指定路径的文件/目录项 + """ try: - cid = self._path_to_cid(str(path)) + file_id = self._path_to_id(str(path)) + if not file_id: + return None resp = self._request_api( - "GET", "/file/detail", - params={"cid": cid} + "GET", + "/open/folder/get_info", + "data", + params={ + "file_id": file_id + } ) return schemas.FileItem( path=str(path), - name=resp["name"], - type="dir" if resp["is_dir"] else "file", - size=resp["size"], - modify_time=resp["modified"] + fileid=resp["file_id"], + type="file" if resp["file_category"] == "1" else "dir", + name=resp["file_name"], + basename=Path(resp["file_name"]).stem, + extension=Path(resp["file_name"]).suffix[1:], + pickcode=resp["pick_code"], + size=resp["size"] if resp["file_category"] == "1" else None, + modify_time=resp["utime"] ) except Exception as e: logger.debug(f"获取文件信息失败: {str(e)}") @@ -361,54 +473,43 @@ class U115Pan(StorageBase, metaclass=Singleton): def get_folder(self, path: Path) -> Optional[schemas.FileItem]: """ - 获取指定路径的文件夹元数据 + 获取指定路径的文件夹,如不存在则创建 """ - item = self.get_item(path) - if item and item.type == "dir": - return item - return None + try: + return self.get_item(path) + except FileNotFoundError: + return self.create_folder(self.get_item(path.parent), path.name) def detail(self, fileitem: schemas.FileItem) -> Optional[schemas.FileItem]: """ 获取文件/目录详细信息 """ - try: - cid = self._path_to_cid(fileitem.path) - resp = self._request_api("GET", "/file/detail", params={"cid": cid}) - return schemas.FileItem( - path=fileitem.path, - name=resp["name"], - type="dir" if resp["is_dir"] else "file", - size=resp["size"], - modify_time=resp["modified"], - pickcode=resp.get("pick_code") - ) - except requests.exceptions.HTTPError as e: - if e.response.status_code == 404: - return None - raise + return self.get_item(Path(fileitem.path)) def copy(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool: """ 企业级复制实现(支持目录递归复制) """ - src_cid = self._path_to_cid(fileitem.path) - dest_cid = self._path_to_cid(str(path)) + src_fid = self._path_to_id(fileitem.path) + dest_cid = self._path_to_id(str(path)) resp = self._request_api( - "POST", "/file/copy", - json={ - "cid": src_cid, - "pid": dest_cid, - "name": new_name, - "overwrite": 0 # 0:不覆盖 1:覆盖 + "POST", + "/open/ufile/copy", + data={ + "file_id": src_fid, + "pid": dest_cid } ) if resp["state"]: - # 更新目标路径缓存 - new_path = str(Path(path) / new_name) - self._cid_cache[new_path] = resp["cid"] + new_path = Path(path) / fileitem.name + new_file = self.get_item(new_path) + self.rename(new_file, new_name) + # 更新缓存 + del self._id_cache[fileitem.path] + rename_new_path = Path(path) / new_name + self._id_cache[str(rename_new_path)] = int(new_file.fileid) return True return False @@ -416,25 +517,26 @@ class U115Pan(StorageBase, metaclass=Singleton): """ 原子性移动操作实现 """ - src_cid = self._path_to_cid(fileitem.path) - dest_cid = self._path_to_cid(str(path)) + src_fid = self._path_to_id(fileitem.path) + dest_cid = self._path_to_id(str(path)) resp = self._request_api( - "POST", "/file/move", - json={ - "cid": src_cid, - "pid": dest_cid, - "name": new_name, - "overwrite": 0 + "POST", + "/open/ufile/move", + data={ + "file_ids": src_fid, + "to_cid": dest_cid } ) if resp["state"]: + new_path = Path(path) / fileitem.name + new_file = self.get_item(new_path) + self.rename(new_file, new_name) # 更新缓存 - old_path = fileitem.path - new_path = str(Path(path) / new_name) - del self._cid_cache[old_path] - self._cid_cache[new_path] = src_cid + del self._id_cache[fileitem.path] + rename_new_path = Path(path) / new_name + self._id_cache[str(rename_new_path)] = src_fid return True return False @@ -445,13 +547,19 @@ class U115Pan(StorageBase, metaclass=Singleton): pass def usage(self) -> Optional[schemas.StorageUsage]: - """获取带有企业级配额信息的存储使用情况""" + """ + 获取带有企业级配额信息的存储使用情况 + """ try: - resp = self._request_api("GET", "/user/info") - space = resp["data"]["space_info"] + resp = self._request_api( + "GET", + "/open/user/info", + "data" + ) + space = resp["rt_space_info"] return schemas.StorageUsage( - total=space["total"], - available=space["free"] + total=space["all_total"]["size"], + available=space["all_remain"]["size"] ) except KeyError: return None diff --git a/requirements.in b/requirements.in index ead29f27..6027f6f1 100644 --- a/requirements.in +++ b/requirements.in @@ -67,5 +67,4 @@ rsa~=4.9 redis~=5.2.1 async_timeout~=5.0.1; python_full_version < "3.11.3" packaging~=24.2 -cf_clearance~=0.31.0 -qrcode~=8.0 \ No newline at end of file +cf_clearance~=0.31.0 \ No newline at end of file