diff --git a/app/modules/filemanager/storages/alipan.py b/app/modules/filemanager/storages/alipan.py index 369209d1..14a1f115 100644 --- a/app/modules/filemanager/storages/alipan.py +++ b/app/modules/filemanager/storages/alipan.py @@ -1,17 +1,11 @@ -import base64 import hashlib -import json import secrets import threading import time from pathlib import Path from typing import List, Dict, Optional, Tuple, Union -import oss2 import requests -from oss2 import SizedFileAdapter, determine_part_size -from oss2.models import PartInfo -from requests.packages import target from app import schemas from app.core.config import settings @@ -48,7 +42,7 @@ class AliPan(StorageBase, metaclass=Singleton): base_url = "https://openapi.alipan.com" # CID和路径缓存 - _id_cache: Dict[str, int] = {} + _id_cache: Dict[str, Tuple[str, str]] = {} def __init__(self): super().__init__() @@ -158,6 +152,7 @@ class AliPan(StorageBase, metaclass=Singleton): "refresh_time": int(time.time()), **tokens }) + self.__get_drive_id() return {"status": status, "tip": _status_text.get(status, "未知错误")}, "" except Exception as e: return {}, str(e) @@ -204,6 +199,33 @@ class AliPan(StorageBase, metaclass=Singleton): logger.warn(f"【阿里云盘】刷新 access_token 失败:{result.get('code')} - {result.get('message')}!") return result + def __get_drive_id(self): + """ + 获取默认存储桶ID + """ + resp = self.session.post( + f"{self.base_url}/adrive/v1.0/user/getDriveInfo" + ) + if resp is None: + logger.error("获取默认存储桶ID失败") + return None + result = resp.json() + if result.get("code"): + logger.warn(f"获取默认存储ID失败:{result.get('code')} - {result.get('message')}!") + return None + # 保存用户参数 + """ + user_id string 是 用户ID,具有唯一性 + name string 是 昵称 + avatar string 是 头像地址 + default_drive_id string 是 默认drive + resource_drive_id string 否 资源库。用户选择了授权才会返回 + backup_drive_id string 否 备份盘。用户选择了授权才会返回 + """ + conf = self.get_conf() + conf.update(result) + self.set_config(conf) + def _request_api(self, method: str, endpoint: str, result_key: Optional[str] = None, **kwargs) -> Optional[Union[dict, list]]: """ @@ -238,87 +260,95 @@ class AliPan(StorageBase, metaclass=Singleton): return ret_data.get(result_key) return ret_data - def _path_to_id(self, path: str) -> int: + def _path_to_id(self, drive_id: str, path: str) -> Tuple[str, str]: """ - 路径转FID(带缓存机制) + 路径转drive_id, file_id(带缓存机制) """ # 根目录 if path == "/": - return 0 + return drive_id, "root" if len(path) > 1 and path.endswith("/"): path = path[:-1] # 检查缓存 if path in self._id_cache: return self._id_cache[path] # 逐级查找缓存 - current_id = 0 - parent_path = "/" + file_id = "root" + file_path = "/" for p in Path(path).parents: if str(p) in self._id_cache: - parent_path = str(p) - current_id = self._id_cache[parent_path] + file_path = str(p) + file_id = self._id_cache[file_path] break # 计算相对路径 - rel_path = Path(path).relative_to(parent_path) + rel_path = Path(path).relative_to(file_path) for part in Path(rel_path).parts: - offset = 0 find_part = False + next_marker = None while True: resp = self._request_api( "GET", - "/open/ufile/files", - "data", - params={"cid": current_id, "limit": 1000, "offset": offset, "cur": True, "show_dir": 1} + "/adrive/v1.0/openFile/list", + params={ + "drive_id": drive_id, + "limit": 100, + "marker": next_marker, + "parent_file_id": file_id, + } ) if not resp: break - for item in resp: - if item["fn"] == part: - current_id = item["fid"] + for item in resp.get("items", []): + if item["name"] == part: + file_id = item["file_id"] find_part = True break if find_part: break - if len(resp) < 1000: + if len(resp.get("items")) < 100: break - offset += len(resp) if not find_part: raise FileNotFoundError(f"【阿里云盘】{path} 不存在") - if not current_id: + if file_id == "root": raise FileNotFoundError(f"【阿里云盘】{path} 不存在") # 缓存路径 - self._id_cache[path] = current_id - return current_id + self._id_cache[path] = (drive_id, file_id) + return drive_id, file_id - def _id_to_path(self, fid: int) -> str: + def __get_fileitem(self, fileinfo: dict, parent: str = "/") -> schemas.FileItem: """ - CID转路径(带双向缓存) + 获取文件信息 """ - # 根目录特殊处理 - if fid == 0: - return "/" - # 优先从缓存读取 - if fid in self._id_cache.values(): - return next(k for k, v in self._id_cache.items() if v == fid) - # 从API获取当前节点信息 - detail = self._request_api( - "GET", - "/open/folder/get_info", - "data", - params={ - "file_id": fid - } - ) - # 处理可能的空数据(如已删除文件) - if not detail: - raise FileNotFoundError(f"【阿里云盘】{fid} 不存在") - paths = detail["paths"] - path_parts = [item["file_name"] for item in paths] - # 构建完整路径 - full_path = "/" + "/".join(reversed(path_parts)) - # 缓存新路径 - self._id_cache[full_path] = fid - return full_path + if not fileinfo: + return schemas.FileItem() + if fileinfo.get("type") == "folder": + return schemas.FileItem( + storage=self.schema.value, + fileid=fileinfo.get("file_id"), + parent_fileid=fileinfo.get("parent_file_id"), + type="dir", + path=f"{parent}{fileinfo.get('name')}" + "/", + name=fileinfo.get("name"), + basename=fileinfo.get("name"), + size=fileinfo.get("size"), + modify_time=StringUtils.str_to_timestamp(fileinfo.get("updated_at")), + drive_id=fileinfo.get("drive_id"), + ) + else: + return schemas.FileItem( + storage=self.schema.value, + fileid=fileinfo.get("file_id"), + parent_fileid=fileinfo.get("parent_file_id"), + type="file", + path=f"{parent}{fileinfo.get('name')}", + name=fileinfo.get("name"), + basename=Path(fileinfo.get("name")).stem, + size=fileinfo.get("size"), + extension=fileinfo.get("file_extension"), + modify_time=StringUtils.str_to_timestamp(fileinfo.get("updated_at")), + thumbnail=fileinfo.get("thumbnail"), + drive_id=fileinfo.get("drive_id"), + ) @staticmethod def _calc_sha1(filepath: Path, size: Optional[int] = None) -> str: @@ -343,330 +373,92 @@ class AliPan(StorageBase, metaclass=Singleton): """ 目录遍历实现 """ - if fileitem.type == "file": item = self.detail(fileitem) if item: return [item] return [] - cid = self._path_to_id(fileitem.path) + if fileitem.path == "/": + parent_file_id = "root" + else: + parent_file_id = fileitem.fileid + items = [] - offset = 0 + next_marker = None while True: resp = self._request_api( "GET", - "/open/ufile/files", - "data", - params={"cid": cid, "limit": 1000, "offset": offset, "cur": True, "show_dir": 1} + "/adrive/v1.0/openFile/list", + params={ + "drive_id": fileitem.drive_id, + "limit": 100, + "marker": next_marker, + "parent_file_id": parent_file_id, + } ) if resp is None: raise FileNotFoundError(f"【阿里云盘】{fileitem.path} 检索出错!") if not resp: break - for item in resp: + next_marker = resp.get("next_marker") + for item in resp.get("items", []): # 更新缓存 - path = f"{fileitem.path}{item['fn']}" + path = f"{fileitem.path}{item.get('name')}" self._id_cache[path] = item["fid"] - - file_path = path + ("/" if item["fc"] == "0" else "") - items.append(schemas.FileItem( - storage=self.schema.value, - fileid=item["fid"], - name=item["fn"], - basename=Path(item["fn"]).stem, - extension=item["ico"] if item["fc"] == "1" else None, - type="dir" if item["fc"] == "0" else "file", - path=file_path, - size=item["fs"] if item["fc"] == "1" else None, - modify_time=item["upt"], - pickcode=item["pc"] - )) - - if len(resp) < 1000: + items.append(self.__get_fileitem(item)) + if len(resp.get("items")) < 100: break - offset += len(resp) - return items def create_folder(self, parent_item: schemas.FileItem, name: str) -> Optional[schemas.FileItem]: """ 创建目录 """ - parent_id = self._path_to_id(parent_item.path) - new_path = Path(parent_item.path) / name resp = self._request_api( "POST", - "/open/folder/add", + "/adrive/v1.0/openFile/create", json={ - "pid": parent_id, - "file_name": name + "drive_id": parent_item.drive_id, + "parent_file_id": parent_item.fileid, + "name": name, + "type": "folder" } ) if not resp: return None - if not resp.get("state"): - if resp.get("code") == 20004: - # 目录已存在 - return self.get_item(new_path) - logger.warn(f"【阿里云盘】创建目录失败: {resp.get('error')}") + if resp.get("code"): + logger.warn(f"【阿里云盘】创建目录失败: {resp.get('message')}") return None # 缓存新目录 - self._id_cache[str(new_path)] = resp["data"]["file_id"] - return schemas.FileItem( - storage=self.schema.value, - fileid=resp["data"]["file_id"], - path=str(new_path) + "/", - name=name, - basename=name, - type="dir", - modify_time=int(time.time()) - ) + new_path = Path(parent_item.path) / name + self._id_cache[str(new_path)] = (resp.get("drive_id"), resp.get("file_id")) + return self.get_item(new_path) def upload(self, target_dir: schemas.FileItem, local_path: Path, new_name: Optional[str] = None) -> Optional[schemas.FileItem]: """ - 实现带秒传、断点续传和二次认证的文件上传 + TODO 文件上传 """ - - def progress_callback(consumed_bytes: int, total_bytes: int): - """ - 上传进度回调 - """ - progress = round(consumed_bytes / total_bytes * 100) - if round(progress, -1) != self._last_progress: - logger.info(f"【阿里云盘】已上传: {StringUtils.str_filesize(consumed_bytes)}" - f" / {StringUtils.str_filesize(total_bytes)}, 进度: {progress}%") - self._last_progress = round(progress, -1) - - def encode_callback(cb: dict): - """ - 回调参数Base64编码函数 - """ - return oss2.utils.b64encode_as_string(json.dumps(cb).strip()) - - target_name = new_name or local_path.name - target_path = str(Path(target_dir.path) / target_name) - # 计算文件特征值 - file_size = local_path.stat().st_size - file_sha1 = self._calc_sha1(local_path) - file_preid = self._calc_sha1(local_path, 128 * 1024 * 1024) - - # 获取目标目录CID - target_cid = self._path_to_id(target_dir.path) - target_param = f"U_1_{target_cid}" - - # Step 1: 初始化上传 - init_data = { - "file_name": target_name, - "file_size": file_size, - "target": target_param, - "fileid": file_sha1, - "preid": file_preid - } - init_resp = self._request_api( - "POST", - "/open/upload/init", - json=init_data - ) - if not init_resp: - return None - if not init_resp.get("state"): - logger.warn(f"【阿里云盘】初始化上传失败: {init_resp.get('error')}") - return None - # 结果 - init_result = init_resp.get("data") - logger.debug(f"【阿里云盘】上传 Step 1 初始化结果: {init_result}") - file_id = init_result.get("file_id") - # 回调信息 - bucket_name = init_result.get("bucket") - object_name = init_result.get("object") - callback = init_result.get("callback") - # 二次认证信息 - sign_check = init_result.get("sign_check") - pick_code = init_result.get("pick_code") - sign_key = init_result.get("sign_key") - - # Step 2: 处理二次认证 - if init_result.get("code") in [700, 701] and sign_check: - sign_checks = sign_check.split("-") - start = int(sign_checks[0]) - end = int(sign_checks[1]) - # 计算指定区间的SHA1 - # sign_check (用下划线隔开,截取上传文内容的sha1)(单位是byte): "2392148-2392298" - with open(local_path, "rb") as f: - # 取2392148-2392298之间的内容(包含2392148、2392298)的sha1 - f.seek(start) - chunk = f.read(end - start + 1) - sign_val = hashlib.sha1(chunk).hexdigest().upper() - # 重新初始化请求 - # sign_key,sign_val(根据sign_check计算的值大写的sha1值) - init_data.update({ - "pick_code": pick_code, - "sign_key": sign_key, - "sign_val": sign_val - }) - init_resp = self._request_api( - "POST", - "/open/upload/init", - json=init_data - ) - if not init_resp: - return None - # 二次认证结果 - init_result = init_resp.get("data") - logger.debug(f"【阿里云盘】上传 Step 2 二次认证结果: {init_result}") - if not pick_code: - pick_code = init_result.get("pick_code") - if not bucket_name: - bucket_name = init_result.get("bucket") - if not object_name: - object_name = init_result.get("object") - if not file_id: - file_id = init_result.get("file_id") - if not callback: - callback = init_result.get("callback") - - # Step 3: 秒传 - if init_result.get("status") == 2: - logger.info(f"【阿里云盘】{target_name} 秒传成功") - return schemas.FileItem( - storage=self.schema.value, - fileid=file_id, - path=target_path, - name=target_name, - basename=Path(target_name).stem, - extension=Path(target_name).suffix[1:], - size=file_size, - type="file", - pickcode=pick_code, - modify_time=int(time.time()) - ) - - # Step 4: 获取上传凭证 - token_resp = self._request_api( - "GET", - "/open/upload/get_token", - "data" - ) - if not token_resp: - logger.warn("【阿里云盘】获取上传凭证失败") - return None - logger.debug(f"【阿里云盘】上传 Step 4 获取上传凭证结果: {token_resp}") - # 上传凭证 - endpoint = token_resp.get("endpoint") - AccessKeyId = token_resp.get("AccessKeyId") - AccessKeySecret = token_resp.get("AccessKeySecret") - SecurityToken = token_resp.get("SecurityToken") - - # Step 5: 断点续传 - resume_resp = self._request_api( - "POST", - "/open/upload/resume", - "data", - json={ - "file_size": file_size, - "target": target_param, - "fileid": file_sha1, - "pick_code": pick_code - } - ) - if resume_resp: - logger.debug(f"【阿里云盘】上传 Step 5 断点续传结果: {resume_resp}") - if resume_resp.get("callback"): - callback = resume_resp["callback"] - - # Step 6: 对象存储上传 - auth = oss2.StsAuth( - access_key_id=AccessKeyId, - access_key_secret=AccessKeySecret, - security_token=SecurityToken - ) - bucket = oss2.Bucket(auth, endpoint, bucket_name) # noqa - # 处理oss请求回调 - callback_dict = json.loads(callback.get("callback")) - callback_var_dict = json.loads(callback.get("callback_var")) - # 补充参数 - logger.debug(f"【阿里云盘】上传 Step 6 回调参数:{callback_dict} {callback_var_dict}") - # 填写不能包含Bucket名称在内的Object完整路径,例如exampledir/exampleobject.txt。 - # determine_part_size方法用于确定分片大小,设置分片大小为 1GB - part_size = determine_part_size(file_size, preferred_size=1 * 1024 * 1024 * 1024) - logger.info(f"【阿里云盘】开始上传: {local_path} -> {target_path},分片大小:{StringUtils.str_filesize(part_size)}") - # 初始化分片 - upload_id = bucket.init_multipart_upload(object_name, - params={ - "encoding-type": "url", - "sequential": "" - }).upload_id - parts = [] - # 逐个上传分片 - with open(local_path, 'rb') as fileobj: - part_number = 1 - offset = 0 - while offset < file_size: - num_to_upload = min(part_size, file_size - offset) - # 调用SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。 - logger.info(f"【阿里云盘】开始上传 {target_name} 分片 {part_number}: {offset} -> {offset + num_to_upload}") - result = bucket.upload_part(object_name, upload_id, part_number, - json=SizedFileAdapter(fileobj, num_to_upload), - progress_callback=progress_callback) - parts.append(PartInfo(part_number, result.etag)) - logger.info(f"【阿里云盘】{target_name} 分片 {part_number} 上传完成") - offset += num_to_upload - part_number += 1 - # 请求头 - headers = { - 'X-oss-callback': encode_callback(callback_dict), - 'x-oss-callback-var': encode_callback(callback_var_dict), - 'x-oss-forbid-overwrite': 'false' - } - try: - result = bucket.complete_multipart_upload(object_name, upload_id, parts, - headers=headers) - if result.status == 200: - logger.debug(f"【阿里云盘】上传 Step 6 回调结果:{result.resp.response.json()}") - logger.info(f"【阿里云盘】{target_name} 上传成功") - else: - logger.warn(f"【阿里云盘】{target_name} 上传失败,错误码: {result.status}") - return None - except oss2.exceptions.OssError as e: - if e.code == "FileAlreadyExists": - logger.warn(f"【阿里云盘】{target_name} 已存在") - else: - logger.error(f"【阿里云盘】{target_name} 上传失败: {e.status}, 错误码: {e.code}, 详情: {e.message}") - return None - # 返回结果 - return schemas.FileItem( - storage=self.schema.value, - fileid=file_id, - type="file", - path=target_path, - name=target_name, - basename=Path(target_name).stem, - extension=Path(target_name).suffix[1:], - size=file_size, - pickcode=pick_code, - modify_time=int(time.time()) - ) + pass def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]: """ 带限速处理的下载 """ - detail = self.get_item(Path(fileitem.path)) - local_path = path or settings.TEMP_PATH / fileitem.name download_info = self._request_api( "POST", - "/open/ufile/downurl", - "data", + "/adrive/v1.0/openFile/getDownloadUrl", json={ - "pick_code": detail.pickcode + "drive_id": fileitem.drive_id, + "file_id": fileitem.fileid, } ) if not download_info: return None - download_url = list(download_info.values())[0].get("url", {}).get("url") + download_url = download_info.get("url") + local_path = path or settings.TEMP_PATH / fileitem.name with self.session.get(download_url, stream=True) as r: r.raise_for_status() with open(local_path, "wb") as f: @@ -684,9 +476,10 @@ class AliPan(StorageBase, metaclass=Singleton): try: self._request_api( "POST", - "/open/ufile/delete", + "/adrive/v1.0/openFile/recyclebin/trash", json={ - "file_ids": self._path_to_id(fileitem.path) + "drive_id": fileitem.drive_id, + "file_id": fileitem.fileid } ) return True @@ -697,58 +490,46 @@ class AliPan(StorageBase, metaclass=Singleton): """ 重命名文件/目录 """ - file_id = self._path_to_id(fileitem.path) resp = self._request_api( "POST", - "/open/ufile/update", + "/adrive/v1.0/openFile/update", json={ - "file_id": file_id, - "file_name": name + "drive_id": fileitem.drive_id, + "file_id": fileitem.fileid, + "name": name } ) if not resp: return False - if resp["state"]: - if fileitem.path in self._id_cache: - del self._id_cache[fileitem.path] - for key in list(self._id_cache.keys()): - if key.startswith(fileitem.path): - del self._id_cache[key] - new_path = Path(fileitem.path).parent / name - self._id_cache[str(new_path)] = file_id - return True - return False + if resp.get("code"): + logger.warn(f"【阿里云盘】重命名失败: {resp.get('message')}") + return False + if fileitem.path in self._id_cache: + del self._id_cache[fileitem.path] + for key in list(self._id_cache.keys()): + if key.startswith(fileitem.path): + del self._id_cache[key] + self._id_cache[str(Path(fileitem.path).parent / name)] = (resp.get("drive_id"), resp.get("file_id")) + return True def get_item(self, path: Path) -> Optional[schemas.FileItem]: """ 获取指定路径的文件/目录项 """ try: - file_id = self._path_to_id(str(path)) - if not file_id: - return None resp = self._request_api( "GET", - "/open/folder/get_info", - "data", + "/adrive/v1.0/openFile/get_by_path", params={ - "file_id": file_id + "file_path": str(path) } ) if not resp: return None - return schemas.FileItem( - storage=self.schema.value, - fileid=resp["file_id"], - path=str(path) + ("/" if resp["file_category"] == "1" else ""), - type="file" if resp["file_category"] == "1" else "dir", - name=resp["file_name"], - basename=Path(resp["file_name"]).stem, - extension=Path(resp["file_name"]).suffix[1:], - pickcode=resp["pick_code"], - size=StringUtils.num_filesize(resp['size']) if resp["file_category"] == "1" else None, - modify_time=resp["utime"] - ) + if resp.get("code"): + logger.debug(f"【阿里云盘】获取文件信息失败: {resp.get('message')}") + return None + return self.__get_fileitem(resp) except Exception as e: logger.debug(f"【阿里云盘】获取文件信息失败: {str(e)}") return None @@ -797,15 +578,13 @@ class AliPan(StorageBase, metaclass=Singleton): """ 企业级复制实现(支持目录递归复制) """ - src_fid = self._path_to_id(fileitem.path) - dest_cid = self._path_to_id(str(path)) - + dest_cid = self._path_to_id(fileitem.drive_id, str(path)) resp = self._request_api( "POST", "/adrive/v1.0/openFile/copy", json={ "drive_id": fileitem.drive_id, - "file_id": src_fid, + "file_id": fileitem.fileid, "to_drive_id": fileitem.drive_id, "to_parent_file_id": dest_cid } @@ -822,15 +601,15 @@ class AliPan(StorageBase, metaclass=Singleton): # 更新缓存 del self._id_cache[fileitem.path] rename_new_path = Path(path) / new_name - self._id_cache[str(rename_new_path)] = int(new_file.fileid) + self._id_cache[str(rename_new_path)] = (resp.get("drive_id"), resp.get("file_id")) return True def move(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool: """ 原子性移动操作实现 """ - src_fid = self._path_to_id(fileitem.path) - target_id = self._path_to_id(str(path)) + src_fid = fileitem.fileid + target_id = self._path_to_id(fileitem.drive_id, str(path)) resp = self._request_api( "POST", @@ -850,7 +629,7 @@ class AliPan(StorageBase, metaclass=Singleton): # 更新缓存 del self._id_cache[fileitem.path] rename_new_path = Path(path) / new_name - self._id_cache[str(rename_new_path)] = src_fid + self._id_cache[str(rename_new_path)] = (resp.get("drive_id"), resp.get("file_id")) return True def link(self, fileitem: schemas.FileItem, target_file: Path) -> bool: diff --git a/app/schemas/file.py b/app/schemas/file.py index 4e941e3d..bc662596 100644 --- a/app/schemas/file.py +++ b/app/schemas/file.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Optional, Union from pydantic import BaseModel, Field @@ -23,7 +23,7 @@ class FileItem(BaseModel): # 子节点 children: Optional[list] = Field(default_factory=list) # ID - fileid: Optional[str] = None + fileid: Optional[Union[str, int]] = None # 父ID parent_fileid: Optional[str] = None # 缩略图