From ea615995890d3c2c008bf06abb4eecb80bf89327 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Fri, 21 Mar 2025 13:27:31 +0800 Subject: [PATCH] add 115 open api --- app/modules/filemanager/storages/u115.py | 721 ++++++++++++----------- requirements.in | 5 +- 2 files changed, 385 insertions(+), 341 deletions(-) diff --git a/app/modules/filemanager/storages/u115.py b/app/modules/filemanager/storages/u115.py index 7cdee6b7..b61007e4 100644 --- a/app/modules/filemanager/storages/u115.py +++ b/app/modules/filemanager/storages/u115.py @@ -1,14 +1,16 @@ +import hashlib +import os +import time from pathlib import Path -from typing import Optional, Tuple, List +from typing import List, Dict, Optional, Tuple -from p115 import P115Client, P115Path +import qrcode +import requests from app import schemas -from app.core.config import settings from app.log import logger -from app.modules.filemanager.storages import StorageBase +from app.modules.filemanager import StorageBase from app.schemas.types import StorageSchema -from app.utils.http import RequestUtils from app.utils.singleton import Singleton @@ -26,374 +28,414 @@ class U115Pan(StorageBase, metaclass=Singleton): "copy": "复制" } - # 115二维码登录地址 - qrcode_url = "https://qrcodeapi.115.com/api/1.0/web/1.0/token/" - # 115登录状态检查 - login_check_url = "https://qrcodeapi.115.com/get/status/" - # 115登录完成 alipaymini - login_done_api = f"https://passportapi.115.com/app/1.0/alipaymini/1.0/login/qrcode/" + # 访问token + access_token = None - client: P115Client = None - session_info: dict = None + # 基础url + base_url = "https://api.115.com" + + # CID和路径缓存 + _cid_cache: Dict[str | int, str | int] = {} def __init__(self): super().__init__() - self.init_storage() + self.session = requests.Session() + self._init_session() + + def _init_session(self): + """ + 初始化带速率限制的会话 + """ + adapter = requests.adapters.HTTPAdapter( + max_retries=3, + pool_connections=10, + pool_maxsize=50 + ) + self.session.mount('https://', adapter) + self.session.headers.update({ + "User-Agent": "W115Storage/2.0", + "Accept-Encoding": "gzip, deflate" + }) + + def generate_qrcode(self) -> Tuple[dict, str]: + """ + 生成设备授权二维码 + """ + resp = self.session.post( + f"{self.base_url}/oauth/device", + data={"client_id": self.get_conf().get("app_id")} + ).json() + qr = qrcode.make(f"115AUTH|{resp['device_code']}") + return resp, qr.png_as_base64_str() + + def check_login(self, device_code: str) -> Optional[Dict]: + """ + 检查授权状态 + """ + try: + resp = self.session.post(f"{self.base_url}/oauth/token", data={ + "grant_type": "device", + "device_code": device_code, + "client_secret": self.get_conf().get("app_secret") + }, timeout=10) + if resp.status_code == 200: + token_data = resp.json() + self.access_token = token_data["access_token"] + # 持久化配置 + self.set_config({"access_token": self.access_token}) + return {"status": "success"} + return {"status": "pending"} + except requests.exceptions.RequestException: + return {"status": "error"} def init_storage(self): """ - 初始化Cloud + 初始化存储连接 """ - if not self.__credential: - return - try: - self.client = P115Client(self.__credential, app="alipaymini", - check_for_relogin=False, console_qrcode=False) - except Exception as err: - logger.error(f"115连接失败,请重新登录:{str(err)}") - self.__clear_credential() + if conf := self.get_conf(): + self.access_token = conf.get("access_token") + self.session.headers.update({"Authorization": f"Bearer {self.access_token}"}) - @property - def __credential(self) -> Optional[str]: + def list(self, fileitem: schemas.FileItem) -> List[schemas.FileItem]: """ - 获取已保存的115 Cookie + 目录遍历实现 """ - conf = self.get_config() - if not conf: - return None - if not conf.config: - return None - return conf.config.get("cookie") + cid = self._path_to_cid(fileitem.path) + items = [] + offset = 0 - def __save_credential(self, credential: dict): - """ - 设置115认证参数 - """ - self.set_config(credential) + while True: + resp = self._request_api( + "GET", "/files", + params={"cid": cid, "limit": 1000, "offset": offset} + ) + batch = resp["data"] + for item in batch: + path = self._cid_to_path(item["cid"]) + items.append(schemas.FileItem( + path=path, + name=item["name"], + type="dir" if item["is_dir"] else "file", + size=item["size"], + modify_time=item["modified"] + )) + self._cid_cache[path] = item["cid"] # 更新缓存 - def __clear_credential(self): - """ - 清除115认证参数 - """ - self.set_config({}) + if len(batch) < 1000: + break + offset += len(batch) - def generate_qrcode(self) -> Optional[Tuple[dict, str]]: - """ - 生成二维码 - """ - res = RequestUtils(timeout=10).get_res(self.qrcode_url) - if res: - self.session_info = res.json().get("data") - qrcode_content = self.session_info.pop("qrcode") - if not qrcode_content: - logger.warn("115生成二维码失败:未获取到二维码数据!") - return {}, "" - return { - "codeContent": qrcode_content - }, "" - elif res is not None: - return {}, f"115生成二维码失败:{res.status_code} - {res.reason}" - return {}, f"115生成二维码失败:无法连接!" + return items - def check_login(self) -> Optional[Tuple[dict, str]]: - """ - 二维码登录确认 - """ - if not self.session_info: - return {}, "请先生成二维码!" - try: - resp = RequestUtils(timeout=10).get_res(self.login_check_url, params=self.session_info) - if not resp: - return {}, "115登录确认失败:无法连接!" - result = resp.json() - match result["data"].get("status"): - case 0: - result = { - "status": 0, - "tip": "请使用微信或115客户端扫码" - } - case 1: - result = { - "status": 1, - "tip": "已扫码" - } - case 2: - # 确认完成,保存认证信息 - resp = RequestUtils(timeout=10).post_res(self.login_done_api, - data={"account": self.session_info.get("uid")}) - if not resp: - return {}, "115登录确认失败:无法连接!" - if resp: - # 保存认证信息 - result = resp.json() - cookie_dict = result["data"]["cookie"] - cookie_str = "; ".join([f"{k}={v}" for k, v in cookie_dict.items()]) - cookie_dict.update({"cookie": cookie_str}) - self.__save_credential(cookie_dict) - self.init_storage() - result = { - "status": 2, - "tip": "登录成功!" - } - case -1: - result = { - "status": -1, - "tip": "二维码已过期,请重新刷新!" - } - case -2: - result = { - "status": -2, - "tip": "登录失败,请重试!" - } - case _: - result = { - "status": -3, - "tip": "未知错误,请重试!" - } - return result, "" - except Exception as e: - return {}, f"115登录确认失败:{str(e)}" - - def storage(self) -> Optional[Tuple[int, int]]: - """ - 获取存储空间 - """ - if not self.client: - return None - try: - usage = self.client.fs.space_summury() - if usage: - return usage['rt_space_info']['all_total']['size'], usage['rt_space_info']['all_remain']['size'] - except Exception as e: - logger.error(f"115获取存储空间失败:{str(e)}") - return None - - def check(self) -> bool: - """ - 检查存储是否可用 - """ - return True if self.list(schemas.FileItem()) else False - - def list(self, fileitem: schemas.FileItem) -> Optional[List[schemas.FileItem]]: - """ - 浏览文件 - """ - if not self.client: - return [] - try: - if fileitem.type == "file": - return [fileitem] - items: List[P115Path] = self.client.fs.list(fileitem.path) - return [schemas.FileItem( - storage=self.schema.value, - type="dir" if item.is_dir() else "file", - path=item.path + ("/" if item.is_dir() else ""), - name=item.name, - basename=item.stem, - size=item.stat().st_size, - extension=item.suffix[1:] if not item.is_dir() else None, - modify_time=item.stat().st_mtime - ) for item in items if item] - except Exception as e: - logger.error(f"115浏览文件失败:{str(e)}") - return [] - - def create_folder(self, fileitem: schemas.FileItem, name: str) -> Optional[schemas.FileItem]: + def create_folder(self, parent_item: schemas.FileItem, name: str) -> schemas.FileItem: """ 创建目录 """ - if not self.client: - return None - try: - result = self.client.fs.makedirs(Path(fileitem.path) / name, exist_ok=True) - if result: - return schemas.FileItem( - storage=self.schema.value, - type="dir", - path=f"{result.path}/", - name=name, - basename=Path(result.name).stem, - modify_time=result.mtime + parent_cid = self._path_to_cid(parent_item.path) + resp = self._request_api( + "POST", "/file/mkdir", + json={"cid": parent_cid, "name": name} + ) + new_path = os.path.join(parent_item.path, name) + # 缓存新目录 + self._cid_cache[new_path] = resp["cid"] + self._cid_cache[resp["cid"]] = new_path + return schemas.FileItem( + path=new_path, + name=name, + type="dir", + modify_time=int(time.time()) + ) + + def upload(self, target_dir: schemas.FileItem, local_path: Path, new_name: str = None) -> schemas.FileItem: + """ + 断点续传实现 + """ + file_name = new_name or local_path.name + file_size = local_path.stat().st_size + file_hash = self._calc_sha1(local_path) + + # 初始化上传任务 + upload_info = self._request_api( + "POST", "/open/upload/init", + json={ + "file_name": file_name, + "file_size": file_size, + "file_sha1": file_hash, + "target_dir": self._path_to_cid(target_dir.path) + } + ) + + # 分片上传 + with open(local_path, "rb") as f: + offset = 0 + # 4MB分片 + while chunk := f.read(4 * 1024 * 1024): + self.session.put( + f"{self.base_url}/open/upload/{upload_info['upload_id']}", + data=chunk, + headers={"Content-Range": f"bytes {offset}-{offset + len(chunk) - 1}/{file_size}"} ) + offset += len(chunk) + + return self.get_item(Path(target_dir.path) / file_name) + + def download(self, fileitem: schemas.FileItem, save_path: Path = None) -> Path: + """ + 带限速处理的下载 + """ + download_url = self._request_api( + "GET", "/file/download", + params={"cid": self._path_to_cid(fileitem.path)} + )["url"] + + local_path = save_path or Path("/tmp") / fileitem.name + with self.session.get(download_url, stream=True) as r: + r.raise_for_status() + with open(local_path, "wb") as f: + for chunk in r.iter_content(chunk_size=8192): + f.write(chunk) + return local_path + + def _request_api(self, method: str, endpoint: str, **kwargs): + """ + 带错误处理和速率限制的API请求 + """ + if not self.access_token: + raise Exception("未授权,请先完成OAuth认证") + + headers = kwargs.pop("headers", {}) + headers["Authorization"] = f"Bearer {self.access_token}" + + resp = self.session.request( + method, f"{self.base_url}{endpoint}", + headers=headers, **kwargs + ) + + if resp is None: + logger.error(f"请求 115 API 失败: {method} {endpoint}") + return None + + # 处理速率限制 + if resp.status_code == 429: + reset_time = int(resp.headers.get("X-RateLimit-Reset", 60)) + time.sleep(reset_time + 5) + return self._request_api(method, endpoint, **kwargs) + + resp.raise_for_status() + return resp.json() + + def _path_to_cid(self, path: str) -> str: + """ + 路径转CID(带缓存机制) + """ + if path in self._cid_cache: + return self._cid_cache[path] + + # 递归解析路径 + current_cid = "0" # 根目录CID + for part in Path(path).parts[1:]: # 忽略根目录 + resp = self._request_api( + "GET", "/files", + params={"cid": current_cid, "search_value": part} + ) + for item in resp["data"]: + if item["name"] == part: + current_cid = item["cid"] + break + else: + raise FileNotFoundError(f"路径不存在: {path}") + self._cid_cache[path] = current_cid + return current_cid + + def _cid_to_path(self, cid: str) -> str: + """ + CID转路径(带双向缓存) + """ + + # 根目录特殊处理 + if cid == "0": + return "/" + + # 优先从缓存读取 + if cid in self._cid_cache.values(): + return next(k for k, v in self._cid_cache.items() if v == cid) + + # 递归构建路径 + path_parts = [] + current_cid = cid + + while current_cid != "0": + # 从API获取当前节点信息 + detail = self._request_api( + "GET", "/file/detail", + params={"cid": current_cid} + ) + + # 处理可能的空数据(如已删除文件) + if not detail: + raise FileNotFoundError(f"CID {current_cid} 不存在") + + parent_cid = detail["parent_id"] + path_parts.append(detail["name"]) + + # 检查父节点缓存 + if parent_cid in self._cid_cache.values(): + parent_path = next(k for k, v in self._cid_cache.items() if v == parent_cid) + path_parts.reverse() + full_path = os.path.join(parent_path, *path_parts) + # 更新正向缓存 + self._cid_cache[full_path] = cid + return str(full_path) + + current_cid = parent_cid + + # 构建完整路径 + full_path = "/" + "/".join(reversed(path_parts)) + # 缓存新路径 + self._cid_cache[full_path] = cid + return full_path + + @staticmethod + def _calc_sha1(filepath: Path) -> str: + """ + 计算文件SHA1(符合115规范) + """ + sha1 = hashlib.sha1() + with open(filepath, 'rb') as f: + while chunk := f.read(8192): + sha1.update(chunk) + return sha1.hexdigest() + + def check(self) -> bool: + return self.access_token is not None + + def delete(self, fileitem: schemas.FileItem) -> bool: + try: + self._request_api( + "POST", "/file/delete", + json={"cid": self._path_to_cid(fileitem.path)} + ) + return True + except requests.exceptions.HTTPError: + return False + + def rename(self, fileitem: schemas.FileItem, name: str) -> bool: + new_path = Path(fileitem.path).parent / name + resp = self._request_api( + "POST", "/file/rename", + json={ + "cid": self._path_to_cid(fileitem.path), + "new_name": name + } + ) + if resp["state"]: + self._cid_cache[str(new_path)] = resp["cid"] + old_path = fileitem.path + new_path = Path(fileitem.path).parent / name + # 删除旧路径 + del self._cid_cache[old_path] + self._cid_cache[new_path.as_posix()] = resp["cid"] + # 更新反向缓存 + self._cid_cache[resp["cid"]] = new_path.as_posix() + return True + return False + + def get_item(self, path: Path) -> Optional[schemas.FileItem]: + try: + cid = self._path_to_cid(str(path)) + resp = self._request_api( + "GET", "/file/detail", + params={"cid": cid} + ) + return schemas.FileItem( + path=str(path), + name=resp["name"], + type="dir" if resp["is_dir"] else "file", + size=resp["size"], + modify_time=resp["modified"] + ) except Exception as e: - logger.error(f"115创建目录失败:{str(e)}") - return None + logger.debug(f"获取文件信息失败: {str(e)}") + return None def get_folder(self, path: Path) -> Optional[schemas.FileItem]: """ - 根据文件路程获取目录,不存在则创建 + 获取指定路径的文件夹元数据 """ - if not self.client: - return None - folder = self.get_item(path) - if folder: - return folder - try: - result = self.client.fs.makedirs(path, exist_ok=True) - if result: - return schemas.FileItem( - storage=self.schema.value, - type="dir", - path=result.path + "/", - name=result.name, - basename=Path(result.name).stem, - modify_time=result.mtime - ) - except Exception as e: - logger.error(f"115获取目录失败:{str(e)}") - return None - - def get_item(self, path: Path) -> Optional[schemas.FileItem]: - """ - 获取文件或目录,不存在返回None - """ - if not self.client: - return None - try: - try: - item = self.client.fs.attr(path) - except FileNotFoundError: - return None - if item: - return schemas.FileItem( - storage=self.schema.value, - type="dir" if item.is_directory else "file", - path=item.path + ("/" if item.is_directory else ""), - name=item.name, - size=item.size, - extension=Path(item.name).suffix[1:] if not item.is_directory else None, - modify_time=item.mtime, - thumbnail=item.get("thumb") - ) - except Exception as e: - logger.info(f"115获取文件失败:{str(e)}") + item = self.get_item(path) + if item and item.type == "dir": + return item return None def detail(self, fileitem: schemas.FileItem) -> Optional[schemas.FileItem]: """ - 获取文件详情 + 获取文件/目录详细信息 """ - if not self.client: - return None try: - try: - item = self.client.fs.attr(fileitem.path) - except FileNotFoundError: + cid = self._path_to_cid(fileitem.path) + resp = self._request_api("GET", "/file/detail", params={"cid": cid}) + return schemas.FileItem( + path=fileitem.path, + name=resp["name"], + type="dir" if resp["is_dir"] else "file", + size=resp["size"], + modify_time=resp["modified"], + pickcode=resp.get("pick_code") + ) + except requests.exceptions.HTTPError as e: + if e.response.status_code == 404: return None - if item: - return schemas.FileItem( - storage=self.schema.value, - type="dir" if item.is_directory else "file", - path=item.path + ("/" if item.is_directory else ""), - name=item.name, - size=item.size, - extension=Path(item.name).suffix[1:] if not item.is_directory else None, - modify_time=item.mtime, - thumbnail=item.get("thumb") - ) - except Exception as e: - logger.error(f"115获取文件详情失败:{str(e)}") - return None - - def delete(self, fileitem: schemas.FileItem) -> bool: - """ - 删除文件 - """ - if not self.client: - return False - try: - self.client.fs.remove(fileitem.path) - return True - except Exception as e: - logger.error(f"115删除文件失败:{str(e)}") - return False - - def rename(self, fileitem: schemas.FileItem, name: str) -> bool: - """ - 重命名文件 - """ - if not self.client: - return False - try: - self.client.fs.rename(fileitem.path, Path(fileitem.path).with_name(name)) - return True - except Exception as e: - logger.error(f"115重命名文件失败:{str(e)}") - return False - - def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]: - """ - 获取下载链接 - """ - if not self.client: - return None - local_file = (path or settings.TEMP_PATH) / fileitem.name - try: - task = self.client.fs.download(fileitem.path, file=local_file) - if task: - return local_file - except Exception as e: - logger.error(f"115下载文件失败:{str(e)}") - return None - - def upload(self, fileitem: schemas.FileItem, path: Path, new_name: str = None) -> Optional[schemas.FileItem]: - """ - 上传文件 - :param fileitem: 上传目录项 - :param path: 本地文件路径 - :param new_name: 上传后文件名 - """ - if not self.client: - return None - try: - new_path = Path(fileitem.path) / (new_name or path.name) - with open(path, "rb") as f: - result = self.client.fs.upload(f, new_path) - if result: - return schemas.FileItem( - storage=self.schema.value, - type="file", - path=str(path), - name=result.name, - basename=Path(result.name).stem, - size=result.size, - extension=Path(result.name).suffix[1:], - modify_time=result.mtime - ) - except Exception as e: - logger.error(f"115上传文件失败:{str(e)}") - return None + raise def copy(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool: """ - 复制文件 - :param fileitem: 文件项 - :param path: 目标目录 - :param new_name: 新文件名 + 企业级复制实现(支持目录递归复制) """ - if not self.client: - return False - try: - self.client.fs.copy(fileitem.path, path / new_name) + src_cid = self._path_to_cid(fileitem.path) + dest_cid = self._path_to_cid(str(path)) + + resp = self._request_api( + "POST", "/file/copy", + json={ + "cid": src_cid, + "pid": dest_cid, + "name": new_name, + "overwrite": 0 # 0:不覆盖 1:覆盖 + } + ) + + if resp["state"]: + # 更新目标路径缓存 + new_path = str(Path(path) / new_name) + self._cid_cache[new_path] = resp["cid"] return True - except Exception as e: - logger.error(f"115复制文件失败:{str(e)}") return False def move(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool: """ - 移动文件 - :param fileitem: 文件项 - :param path: 目标目录 - :param new_name: 新文件名 + 原子性移动操作实现 """ - if not self.client: - return False - try: - self.client.fs.move(fileitem.path, path / new_name) + src_cid = self._path_to_cid(fileitem.path) + dest_cid = self._path_to_cid(str(path)) + + resp = self._request_api( + "POST", "/file/move", + json={ + "cid": src_cid, + "pid": dest_cid, + "name": new_name, + "overwrite": 0 + } + ) + + if resp["state"]: + # 更新缓存 + old_path = fileitem.path + new_path = str(Path(path) / new_name) + del self._cid_cache[old_path] + self._cid_cache[new_path] = src_cid return True - except Exception as e: - logger.error(f"115移动文件失败:{str(e)}") return False def link(self, fileitem: schemas.FileItem, target_file: Path) -> bool: @@ -403,14 +445,13 @@ class U115Pan(StorageBase, metaclass=Singleton): pass def usage(self) -> Optional[schemas.StorageUsage]: - """ - 存储使用情况 - """ - info = self.storage() - if info: - total, free = info + """获取带有企业级配额信息的存储使用情况""" + try: + resp = self._request_api("GET", "/user/info") + space = resp["data"]["space_info"] return schemas.StorageUsage( - total=total, - available=free + total=space["total"], + available=space["free"] ) - return schemas.StorageUsage() + except KeyError: + return None diff --git a/requirements.in b/requirements.in index 47941ae8..ead29f27 100644 --- a/requirements.in +++ b/requirements.in @@ -65,4 +65,7 @@ aiofiles~=24.1.0 jieba~=0.42.1 rsa~=4.9 redis~=5.2.1 -async_timeout~=5.0.1; python_full_version < "3.11.3" \ No newline at end of file +async_timeout~=5.0.1; python_full_version < "3.11.3" +packaging~=24.2 +cf_clearance~=0.31.0 +qrcode~=8.0 \ No newline at end of file