From b8cd1c46c1d8c5f1291f76d967ea1eeeeae3949d Mon Sep 17 00:00:00 2001 From: jxxghp Date: Fri, 28 Mar 2025 13:40:29 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9AAlipan=20Open=20Api?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/core/config.py | 2 + app/modules/filemanager/storages/alipan.py | 1108 ++++++++++++++------ app/modules/filemanager/storages/u115.py | 4 +- 3 files changed, 766 insertions(+), 348 deletions(-) diff --git a/app/core/config.py b/app/core/config.py index 96b34da1..96258a7b 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -111,6 +111,8 @@ class ConfigModel(BaseModel): FANART_API_KEY: str = "d2d31f9ecabea050fc7d68aa3146015f" # 115 AppId U115_APP_ID: str = "100196807" + # Alipan AppId + ALIPAN_APP_ID: str = "ac1bf04dc9fd4d9aaabb65b4a668d403" # 元数据识别缓存过期时间(小时) META_CACHE_EXPIRE: int = 0 # 电视剧动漫的分类genre_ids diff --git a/app/modules/filemanager/storages/alipan.py b/app/modules/filemanager/storages/alipan.py index 68f75656..8da438b0 100644 --- a/app/modules/filemanager/storages/alipan.py +++ b/app/modules/filemanager/storages/alipan.py @@ -1,28 +1,31 @@ import base64 +import hashlib import json -import logging -import subprocess +import secrets +import threading import time from pathlib import Path -from typing import Optional, Tuple, List +from typing import List, Dict, Optional, Tuple, Union -from aligo.response import CreateFileResponse +import oss2 +import requests +from oss2 import SizedFileAdapter, determine_part_size +from oss2.models import PartInfo from app import schemas from app.core.config import settings from app.log import logger -from app.modules.filemanager.storages import StorageBase +from app.modules.filemanager import StorageBase from app.schemas.types import StorageSchema -from app.utils.http import RequestUtils -from aligo import Aligo, BaseFile - from app.utils.singleton import Singleton from app.utils.string import StringUtils +lock = threading.Lock() + class AliPan(StorageBase, metaclass=Singleton): """ - 阿里云相关操作 + 阿里云盘相关操作 """ # 存储类型 @@ -30,264 +33,728 @@ class AliPan(StorageBase, metaclass=Singleton): # 支持的整理方式 transtype = { - "copy": "复制", "move": "移动", + "copy": "复制" } - # 是否有aria2c - _has_aria2c: bool = False + # 验证参数 + _auth_state = {} - # aligo - aligo: Aligo = None + # 上传进度值 + _last_progress = 0 - # 生成二维码 - qrcode_url = ("https://passport.aliyundrive.com/newlogin/qrcode/generate.do?" - "appName=aliyun_drive&fromSite=52&appEntrance=web&isMobile=false" - "&lang=zh_CN&returnUrl=&bizParams=&_bx-v=2.0.31") - # 二维码登录确认 - check_url = "https://passport.aliyundrive.com/newlogin/qrcode/query.do?appName=aliyun_drive&fromSite=52&_bx-v=2.0.31" + # 基础url + base_url = "https://openapi.alipan.com" + + # CID和路径缓存 + _id_cache: Dict[str, int] = {} def __init__(self): super().__init__() - try: - subprocess.run(['aria2c', '-h'], capture_output=True) - self._has_aria2c = True - logger.debug('【alipan】发现 aria2c, 将使用 aria2c 下载文件') - except FileNotFoundError: - logger.debug('【alipan】未发现 aria2c') - self._has_aria2c = False - self.init_storage() + self.session = requests.Session() + self._init_session() - def init_storage(self): + def _init_session(self): """ - 初始化 aligo + 初始化带速率限制的会话 """ + self.session.headers.update({ + "Content-Type": "application/json" + }) - def show_qrcode(qr_link: str): - """ - 显示二维码 - """ - logger.info(f"【alipan】请用阿里云盘 App 扫码登录:{qr_link}") - - refresh_token = self.__auth_params.get("refreshToken") - if refresh_token: - try: - self.aligo = Aligo(refresh_token=refresh_token, show=show_qrcode, use_aria2=self._has_aria2c, # noqa - name="MoviePilot V2", level=logging.ERROR, re_login=False) - except Exception as err: - logger.error(f"【alipan】初始化阿里云盘失败:{str(err)}") - self.__clear_params() + def _check_session(self): + """ + 检查会话是否过期 + """ + if not self.access_token: + raise Exception("【阿里云盘】请先扫码登录!") @property - def __auth_params(self): + def access_token(self) -> Optional[str]: """ - 获取阿里云盘认证参数并初始化参数格式 + 访问token """ - conf = self.get_config() - return conf.config if conf else {} + with lock: + tokens = self.get_conf() + refresh_token = tokens.get("refresh_token") + if not refresh_token: + return None + expires_in = tokens.get("expires_in", 0) + refresh_time = tokens.get("refresh_time", 0) + if expires_in and refresh_time + expires_in < int(time.time()): + tokens = self.__refresh_access_token(refresh_token) + if tokens: + self.set_config({ + "refresh_time": int(time.time()), + **tokens + }) + access_token = tokens.get("access_token") + if access_token: + self.session.headers.update({"Authorization": f"Bearer {access_token}"}) + return access_token - def __update_params(self, params: dict): + def generate_qrcode(self) -> Tuple[dict, str]: """ - 设置阿里云盘认证参数 + 实现PKCE规范的设备授权二维码生成 """ - current_params = self.__auth_params - current_params.update(params) - self.set_config(current_params) - def __clear_params(self): - """ - 清除阿里云盘认证参数 - """ - self.set_config({}) + # 生成PKCE参数 + code_verifier = secrets.token_urlsafe(96)[:128] + # 请求设备码 + resp = self.session.post( + f"{self.base_url}/oauth/authorize/qrcode", + json={ + "client_id": settings.ALIPAN_APP_ID, + "scopes": ["user:base", "file:all:read", "file:all:write", "file:share:write"], + "code_challenge": code_verifier, + "code_challenge_method": "plain" + } + ) + if resp is None: + return {}, "网络错误" + result = resp.json() + if result.get("code"): + return {}, result.get("message") + # 持久化验证参数 + self._auth_state = { + "sid": result.get("sid"), + "code_verifier": code_verifier + } + # 生成二维码内容 + return { + "codeUrl": result.get("qrCodeUrl") + }, "" - def generate_qrcode(self) -> Optional[Tuple[dict, str]]: + def check_login(self) -> Optional[Tuple[dict, str]]: """ - 生成二维码 + 改进的带PKCE校验的登录状态检查 """ - res = RequestUtils(timeout=10).get_res(self.qrcode_url) - if res: - data = res.json().get("content", {}).get("data") - return { - "codeContent": data.get("codeContent"), - "ck": data.get("ck"), - "t": data.get("t") - }, "" - elif res is not None: - return {}, f"请求阿里云盘二维码失败:{res.status_code} - {res.reason}" - return {}, f"请求阿里云盘二维码失败:无法连接!" - def check_login(self, ck: str, t: str) -> Optional[Tuple[dict, str]]: - """ - 二维码登录确认 - """ - params = { - "t": t, - "ck": ck, - "appName": "aliyun_drive", - "appEntrance": "web", - "isMobile": "false", - "lang": "zh_CN", - "returnUrl": "", - "fromSite": "52", - "bizParams": "", - "navlanguage": "zh-CN", - "navPlatform": "MacIntel", + _status_text = { + "WaitLogin": "等待登录", + "ScanSuccess": "扫码成功", + "LoginSuccess": "登录成功", + "QRCodeExpired": "二维码过期" } - body = "&".join([f"{key}={value}" for key, value in params.items()]) - - status = { - "NEW": "请用阿里云盘 App 扫码", - "SCANED": "请在手机上确认", - "EXPIRED": "二维码已过期", - "CANCELED": "已取消", - "CONFIRMED": "已确认", - } - - headers = { - "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", - } - - res = RequestUtils(headers=headers, timeout=5).post_res(self.check_url, data=body) - if res: - data = res.json().get("content", {}).get("data") or {} - qrCodeStatus = data.get("qrCodeStatus") - data["tip"] = status.get(qrCodeStatus) or "未知" - if data.get("bizExt"): - try: - bizExt = json.loads(base64.b64decode(data["bizExt"]).decode('GBK')) - pds_login_result = bizExt.get("pds_login_result") - if pds_login_result: - data.pop('bizExt') - data.update({ - 'userId': pds_login_result.get('userId'), - 'expiresIn': pds_login_result.get('expiresIn'), - 'nickName': pds_login_result.get('nickName'), - 'avatar': pds_login_result.get('avatar'), - 'tokenType': pds_login_result.get('tokenType'), - "refreshToken": pds_login_result.get('refreshToken'), - "accessToken": pds_login_result.get('accessToken'), - "defaultDriveId": pds_login_result.get('defaultDriveId'), - "updateTime": time.time(), - }) - self.__update_params(data) - self.__update_drives() - self.init_storage() - except Exception as e: - return {}, f"bizExt 解码失败:{str(e)}" - return data, "" - elif res is not None: - return {}, f"阿里云盘登录确认失败:{res.status_code} - {res.reason}" - return {}, "阿里云盘登录确认失败:无法连接!" - - def check(self) -> bool: - """ - 检查存储是否可用 - """ - if not self.aligo: - return False - return True if self.aligo.get_user() else False - - def user_info(self) -> dict: - """ - 获取用户信息(drive_id等) - """ - if not self.aligo: - return {} - return self.aligo.get_user() - - def __update_drives(self): - """ - 更新用户存储根目录 - """ - if not self.aligo: - return - drivers = self.aligo.list_my_drives() - for driver in drivers: - if driver.category == "resource": - self.__update_params({"resourceDriveId": driver.drive_id}) - elif driver.category == "backup": - self.__update_params({"backDriveId": driver.drive_id}) - - def __get_fileitem(self, fileinfo: BaseFile, parent: Optional[str] = "/") -> schemas.FileItem: - """ - 获取文件信息 - """ - if not fileinfo: - return schemas.FileItem() - if fileinfo.type == "folder": - return schemas.FileItem( - storage=self.schema.value, - fileid=fileinfo.file_id, - parent_fileid=fileinfo.parent_file_id, - type="dir", - path=f"{parent}{fileinfo.name}" + "/", - name=fileinfo.name, - basename=fileinfo.name, - size=fileinfo.size, - modify_time=StringUtils.str_to_timestamp(fileinfo.updated_at), - drive_id=fileinfo.drive_id, - ) - else: - return schemas.FileItem( - storage=self.schema.value, - fileid=fileinfo.file_id, - parent_fileid=fileinfo.parent_file_id, - type="file", - path=f"{parent}{fileinfo.name}", - name=fileinfo.name, - basename=Path(fileinfo.name).stem, - size=fileinfo.size, - extension=fileinfo.file_extension, - modify_time=StringUtils.str_to_timestamp(fileinfo.updated_at), - thumbnail=fileinfo.thumbnail, - drive_id=fileinfo.drive_id, + if not self._auth_state: + return {}, "生成二维码失败" + try: + resp = self.session.get( + f"{self.base_url}/oauth/qrcode/{self._auth_state['sid']}/status" ) + if resp is None: + return {}, "网络错误" + result = resp.json() + # 扫码结果 + status = result.get("status") + if status == "LoginSuccess": + authCode = result.get("authCode") + self._auth_state["authCode"] = authCode + tokens = self.__get_access_token() + if tokens: + self.set_config({ + "refresh_time": int(time.time()), + **tokens + }) + return {"status": status, "tip": _status_text.get(status, "未知错误")}, "" + except Exception as e: + return {}, str(e) - def list(self, fileitem: schemas.FileItem = None) -> List[schemas.FileItem]: + def __get_access_token(self) -> dict: """ - 浏览文件 - limit 返回文件数量,默认 50,最大 100 - order_by created_at/updated_at/name/size - parent_file_id 根目录为root - type all | file | folder + 确认登录后,获取相关token """ - if not self.aligo: + if not self._auth_state: + raise Exception("请先生成二维码") + resp = self.session.post( + f"{self.base_url}/oauth/access_token", + json={ + "client_id": settings.ALIPAN_APP_ID, + "grant_type": "authorization_code", + "code": self._auth_state["authCode"], + "code_verifier": self._auth_state["code_verifier"] + } + ) + if resp is None: + raise Exception("获取 access_token 失败") + result = resp.json() + if result.get("code"): + raise Exception(f"{result.get('code')} - {result.get('message')}!") + return result + + def __refresh_access_token(self, refresh_token: str) -> Optional[dict]: + """ + 刷新access_token + """ + resp = self.session.post( + f"{self.base_url}/oauth/access_token", + json={ + "client_id": settings.ALIPAN_APP_ID, + "grant_type": "refresh_token", + "refresh_token": refresh_token + } + ) + if resp is None: + logger.error(f"【阿里云盘】刷新 access_token 失败:refresh_token={refresh_token}") + return None + result = resp.json() + if result.get("code"): + logger.warn(f"【阿里云盘】刷新 access_token 失败:{result.get('code')} - {result.get('message')}!") + return result + + def _request_api(self, method: str, endpoint: str, + result_key: Optional[str] = None, **kwargs) -> Optional[Union[dict, list]]: + """ + 带错误处理和速率限制的API请求 + """ + # 检查会话 + self._check_session() + + resp = self.session.request( + method, f"{self.base_url}{endpoint}", + **kwargs + ) + if resp is None: + logger.warn(f"【阿里云盘】{method} 请求 {endpoint} 失败!") + return None + + # 处理速率限制 + if resp.status_code == 429: + reset_time = int(resp.headers.get("X-RateLimit-Reset", 60)) + time.sleep(reset_time + 5) + return self._request_api(method, endpoint, result_key, **kwargs) + + # 处理请求错误 + resp.raise_for_status() + + # 返回数据 + ret_data = resp.json() + if ret_data.get("code"): + logger.warn(f"【阿里云盘】{method} 请求 {endpoint} 出错:{ret_data.get('message')}!") + + if result_key: + return ret_data.get(result_key) + return ret_data + + def _path_to_id(self, path: str) -> int: + """ + 路径转FID(带缓存机制) + """ + # 根目录 + if path == "/": + return 0 + if len(path) > 1 and path.endswith("/"): + path = path[:-1] + # 检查缓存 + if path in self._id_cache: + return self._id_cache[path] + # 逐级查找缓存 + current_id = 0 + parent_path = "/" + for p in Path(path).parents: + if str(p) in self._id_cache: + parent_path = str(p) + current_id = self._id_cache[parent_path] + break + # 计算相对路径 + rel_path = Path(path).relative_to(parent_path) + for part in Path(rel_path).parts: + offset = 0 + find_part = False + while True: + resp = self._request_api( + "GET", + "/open/ufile/files", + "data", + params={"cid": current_id, "limit": 1000, "offset": offset, "cur": True, "show_dir": 1} + ) + if not resp: + break + for item in resp: + if item["fn"] == part: + current_id = item["fid"] + find_part = True + break + if find_part: + break + if len(resp) < 1000: + break + offset += len(resp) + if not find_part: + raise FileNotFoundError(f"【阿里云盘】{path} 不存在") + if not current_id: + raise FileNotFoundError(f"【阿里云盘】{path} 不存在") + # 缓存路径 + self._id_cache[path] = current_id + return current_id + + def _id_to_path(self, fid: int) -> str: + """ + CID转路径(带双向缓存) + """ + # 根目录特殊处理 + if fid == 0: + return "/" + # 优先从缓存读取 + if fid in self._id_cache.values(): + return next(k for k, v in self._id_cache.items() if v == fid) + # 从API获取当前节点信息 + detail = self._request_api( + "GET", + "/open/folder/get_info", + "data", + params={ + "file_id": fid + } + ) + # 处理可能的空数据(如已删除文件) + if not detail: + raise FileNotFoundError(f"【阿里云盘】{fid} 不存在") + paths = detail["paths"] + path_parts = [item["file_name"] for item in paths] + # 构建完整路径 + full_path = "/" + "/".join(reversed(path_parts)) + # 缓存新路径 + self._id_cache[full_path] = fid + return full_path + + @staticmethod + def _calc_sha1(filepath: Path, size: Optional[int] = None) -> str: + """ + 计算文件SHA1(符合阿里云盘规范) + size: 前多少字节 + """ + sha1 = hashlib.sha1() + with open(filepath, 'rb') as f: + if size: + chunk = f.read(size) + sha1.update(chunk) + else: + while chunk := f.read(8192): + sha1.update(chunk) + return sha1.hexdigest() + + def init_storage(self): + pass + + def list(self, fileitem: schemas.FileItem) -> List[schemas.FileItem]: + """ + 目录遍历实现 + """ + + if fileitem.type == "file": + item = self.detail(fileitem) + if item: + return [item] return [] - # 根目录处理 - if not fileitem or not fileitem.drive_id: - items = self.aligo.get_file_list() - if items: - return [self.__get_fileitem(item) for item in items] - elif fileitem.type == "file": - # 文件处理 - file = self.detail(fileitem) - if file: - return [file] - else: - items = self.aligo.get_file_list(parent_file_id=fileitem.fileid, drive_id=fileitem.drive_id) - if items: - return [self.__get_fileitem(item, parent=fileitem.path) for item in items] - return [] - def create_folder(self, fileitem: schemas.FileItem, name: str) -> Optional[schemas.FileItem]: + cid = self._path_to_id(fileitem.path) + items = [] + offset = 0 + + while True: + resp = self._request_api( + "GET", + "/open/ufile/files", + "data", + params={"cid": cid, "limit": 1000, "offset": offset, "cur": True, "show_dir": 1} + ) + if resp is None: + raise FileNotFoundError(f"【阿里云盘】{fileitem.path} 检索出错!") + if not resp: + break + for item in resp: + # 更新缓存 + path = f"{fileitem.path}{item['fn']}" + self._id_cache[path] = item["fid"] + + file_path = path + ("/" if item["fc"] == "0" else "") + items.append(schemas.FileItem( + storage=self.schema.value, + fileid=item["fid"], + name=item["fn"], + basename=Path(item["fn"]).stem, + extension=item["ico"] if item["fc"] == "1" else None, + type="dir" if item["fc"] == "0" else "file", + path=file_path, + size=item["fs"] if item["fc"] == "1" else None, + modify_time=item["upt"], + pickcode=item["pc"] + )) + + if len(resp) < 1000: + break + offset += len(resp) + + return items + + def create_folder(self, parent_item: schemas.FileItem, name: str) -> Optional[schemas.FileItem]: """ 创建目录 - :param fileitem: 父目录 - :param name: 目录名 """ - if not self.aligo: + parent_id = self._path_to_id(parent_item.path) + new_path = Path(parent_item.path) / name + resp = self._request_api( + "POST", + "/open/folder/add", + json={ + "pid": parent_id, + "file_name": name + } + ) + if not resp: + return None + if not resp.get("state"): + if resp.get("code") == 20004: + # 目录已存在 + return self.get_item(new_path) + logger.warn(f"【阿里云盘】创建目录失败: {resp.get('error')}") + return None + # 缓存新目录 + self._id_cache[str(new_path)] = resp["data"]["file_id"] + return schemas.FileItem( + storage=self.schema.value, + fileid=resp["data"]["file_id"], + path=str(new_path) + "/", + name=name, + basename=name, + type="dir", + modify_time=int(time.time()) + ) + + def upload(self, target_dir: schemas.FileItem, local_path: Path, + new_name: Optional[str] = None) -> Optional[schemas.FileItem]: + """ + 实现带秒传、断点续传和二次认证的文件上传 + """ + + def progress_callback(consumed_bytes: int, total_bytes: int): + """ + 上传进度回调 + """ + progress = round(consumed_bytes / total_bytes * 100) + if round(progress, -1) != self._last_progress: + logger.info(f"【阿里云盘】已上传: {StringUtils.str_filesize(consumed_bytes)}" + f" / {StringUtils.str_filesize(total_bytes)}, 进度: {progress}%") + self._last_progress = round(progress, -1) + + def encode_callback(cb: dict): + """ + 回调参数Base64编码函数 + """ + return oss2.utils.b64encode_as_string(json.dumps(cb).strip()) + + target_name = new_name or local_path.name + target_path = str(Path(target_dir.path) / target_name) + # 计算文件特征值 + file_size = local_path.stat().st_size + file_sha1 = self._calc_sha1(local_path) + file_preid = self._calc_sha1(local_path, 128 * 1024 * 1024) + + # 获取目标目录CID + target_cid = self._path_to_id(target_dir.path) + target_param = f"U_1_{target_cid}" + + # Step 1: 初始化上传 + init_data = { + "file_name": target_name, + "file_size": file_size, + "target": target_param, + "fileid": file_sha1, + "preid": file_preid + } + init_resp = self._request_api( + "POST", + "/open/upload/init", + json=init_data + ) + if not init_resp: + return None + if not init_resp.get("state"): + logger.warn(f"【阿里云盘】初始化上传失败: {init_resp.get('error')}") + return None + # 结果 + init_result = init_resp.get("data") + logger.debug(f"【阿里云盘】上传 Step 1 初始化结果: {init_result}") + file_id = init_result.get("file_id") + # 回调信息 + bucket_name = init_result.get("bucket") + object_name = init_result.get("object") + callback = init_result.get("callback") + # 二次认证信息 + sign_check = init_result.get("sign_check") + pick_code = init_result.get("pick_code") + sign_key = init_result.get("sign_key") + + # Step 2: 处理二次认证 + if init_result.get("code") in [700, 701] and sign_check: + sign_checks = sign_check.split("-") + start = int(sign_checks[0]) + end = int(sign_checks[1]) + # 计算指定区间的SHA1 + # sign_check (用下划线隔开,截取上传文内容的sha1)(单位是byte): "2392148-2392298" + with open(local_path, "rb") as f: + # 取2392148-2392298之间的内容(包含2392148、2392298)的sha1 + f.seek(start) + chunk = f.read(end - start + 1) + sign_val = hashlib.sha1(chunk).hexdigest().upper() + # 重新初始化请求 + # sign_key,sign_val(根据sign_check计算的值大写的sha1值) + init_data.update({ + "pick_code": pick_code, + "sign_key": sign_key, + "sign_val": sign_val + }) + init_resp = self._request_api( + "POST", + "/open/upload/init", + json=init_data + ) + if not init_resp: + return None + # 二次认证结果 + init_result = init_resp.get("data") + logger.debug(f"【阿里云盘】上传 Step 2 二次认证结果: {init_result}") + if not pick_code: + pick_code = init_result.get("pick_code") + if not bucket_name: + bucket_name = init_result.get("bucket") + if not object_name: + object_name = init_result.get("object") + if not file_id: + file_id = init_result.get("file_id") + if not callback: + callback = init_result.get("callback") + + # Step 3: 秒传 + if init_result.get("status") == 2: + logger.info(f"【阿里云盘】{target_name} 秒传成功") + return schemas.FileItem( + storage=self.schema.value, + fileid=file_id, + path=target_path, + name=target_name, + basename=Path(target_name).stem, + extension=Path(target_name).suffix[1:], + size=file_size, + type="file", + pickcode=pick_code, + modify_time=int(time.time()) + ) + + # Step 4: 获取上传凭证 + token_resp = self._request_api( + "GET", + "/open/upload/get_token", + "data" + ) + if not token_resp: + logger.warn("【阿里云盘】获取上传凭证失败") + return None + logger.debug(f"【阿里云盘】上传 Step 4 获取上传凭证结果: {token_resp}") + # 上传凭证 + endpoint = token_resp.get("endpoint") + AccessKeyId = token_resp.get("AccessKeyId") + AccessKeySecret = token_resp.get("AccessKeySecret") + SecurityToken = token_resp.get("SecurityToken") + + # Step 5: 断点续传 + resume_resp = self._request_api( + "POST", + "/open/upload/resume", + "data", + json={ + "file_size": file_size, + "target": target_param, + "fileid": file_sha1, + "pick_code": pick_code + } + ) + if resume_resp: + logger.debug(f"【阿里云盘】上传 Step 5 断点续传结果: {resume_resp}") + if resume_resp.get("callback"): + callback = resume_resp["callback"] + + # Step 6: 对象存储上传 + auth = oss2.StsAuth( + access_key_id=AccessKeyId, + access_key_secret=AccessKeySecret, + security_token=SecurityToken + ) + bucket = oss2.Bucket(auth, endpoint, bucket_name) # noqa + # 处理oss请求回调 + callback_dict = json.loads(callback.get("callback")) + callback_var_dict = json.loads(callback.get("callback_var")) + # 补充参数 + logger.debug(f"【阿里云盘】上传 Step 6 回调参数:{callback_dict} {callback_var_dict}") + # 填写不能包含Bucket名称在内的Object完整路径,例如exampledir/exampleobject.txt。 + # determine_part_size方法用于确定分片大小,设置分片大小为 1GB + part_size = determine_part_size(file_size, preferred_size=1 * 1024 * 1024 * 1024) + logger.info(f"【阿里云盘】开始上传: {local_path} -> {target_path},分片大小:{StringUtils.str_filesize(part_size)}") + # 初始化分片 + upload_id = bucket.init_multipart_upload(object_name, + params={ + "encoding-type": "url", + "sequential": "" + }).upload_id + parts = [] + # 逐个上传分片 + with open(local_path, 'rb') as fileobj: + part_number = 1 + offset = 0 + while offset < file_size: + num_to_upload = min(part_size, file_size - offset) + # 调用SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。 + logger.info(f"【阿里云盘】开始上传 {target_name} 分片 {part_number}: {offset} -> {offset + num_to_upload}") + result = bucket.upload_part(object_name, upload_id, part_number, + json=SizedFileAdapter(fileobj, num_to_upload), + progress_callback=progress_callback) + parts.append(PartInfo(part_number, result.etag)) + logger.info(f"【阿里云盘】{target_name} 分片 {part_number} 上传完成") + offset += num_to_upload + part_number += 1 + # 请求头 + headers = { + 'X-oss-callback': encode_callback(callback_dict), + 'x-oss-callback-var': encode_callback(callback_var_dict), + 'x-oss-forbid-overwrite': 'false' + } + try: + result = bucket.complete_multipart_upload(object_name, upload_id, parts, + headers=headers) + if result.status == 200: + logger.debug(f"【阿里云盘】上传 Step 6 回调结果:{result.resp.response.json()}") + logger.info(f"【阿里云盘】{target_name} 上传成功") + else: + logger.warn(f"【阿里云盘】{target_name} 上传失败,错误码: {result.status}") + return None + except oss2.exceptions.OssError as e: + if e.code == "FileAlreadyExists": + logger.warn(f"【阿里云盘】{target_name} 已存在") + else: + logger.error(f"【阿里云盘】{target_name} 上传失败: {e.status}, 错误码: {e.code}, 详情: {e.message}") + return None + # 返回结果 + return schemas.FileItem( + storage=self.schema.value, + fileid=file_id, + type="file", + path=target_path, + name=target_name, + basename=Path(target_name).stem, + extension=Path(target_name).suffix[1:], + size=file_size, + pickcode=pick_code, + modify_time=int(time.time()) + ) + + def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]: + """ + 带限速处理的下载 + """ + detail = self.get_item(Path(fileitem.path)) + local_path = path or settings.TEMP_PATH / fileitem.name + download_info = self._request_api( + "POST", + "/open/ufile/downurl", + "data", + json={ + "pick_code": detail.pickcode + } + ) + if not download_info: + return None + download_url = list(download_info.values())[0].get("url", {}).get("url") + with self.session.get(download_url, stream=True) as r: + r.raise_for_status() + with open(local_path, "wb") as f: + for chunk in r.iter_content(chunk_size=8192): + f.write(chunk) + return local_path + + def check(self) -> bool: + return self.access_token is not None + + def delete(self, fileitem: schemas.FileItem) -> bool: + """ + 删除文件/目录 + """ + try: + self._request_api( + "POST", + "/open/ufile/delete", + json={ + "file_ids": self._path_to_id(fileitem.path) + } + ) + return True + except requests.exceptions.HTTPError: + return False + + def rename(self, fileitem: schemas.FileItem, name: str) -> bool: + """ + 重命名文件/目录 + """ + file_id = self._path_to_id(fileitem.path) + resp = self._request_api( + "POST", + "/open/ufile/update", + json={ + "file_id": file_id, + "file_name": name + } + ) + if not resp: + return False + if resp["state"]: + if fileitem.path in self._id_cache: + del self._id_cache[fileitem.path] + for key in list(self._id_cache.keys()): + if key.startswith(fileitem.path): + del self._id_cache[key] + new_path = Path(fileitem.path).parent / name + self._id_cache[str(new_path)] = file_id + return True + return False + + def get_item(self, path: Path) -> Optional[schemas.FileItem]: + """ + 获取指定路径的文件/目录项 + """ + try: + file_id = self._path_to_id(str(path)) + if not file_id: + return None + resp = self._request_api( + "GET", + "/open/folder/get_info", + "data", + params={ + "file_id": file_id + } + ) + if not resp: + return None + return schemas.FileItem( + storage=self.schema.value, + fileid=resp["file_id"], + path=str(path) + ("/" if resp["file_category"] == "1" else ""), + type="file" if resp["file_category"] == "1" else "dir", + name=resp["file_name"], + basename=Path(resp["file_name"]).stem, + extension=Path(resp["file_name"]).suffix[1:], + pickcode=resp["pick_code"], + size=StringUtils.num_filesize(resp['size']) if resp["file_category"] == "1" else None, + modify_time=resp["utime"] + ) + except Exception as e: + logger.debug(f"【阿里云盘】获取文件信息失败: {str(e)}") return None - item = self.aligo.create_folder(name=name, parent_file_id=fileitem.fileid, drive_id=fileitem.drive_id) - if item: - if isinstance(item, CreateFileResponse): - item = self.aligo.get_file(file_id=item.file_id, drive_id=item.drive_id) - return self.__get_fileitem(item, parent=fileitem.path) - return None def get_folder(self, path: Path) -> Optional[schemas.FileItem]: """ - 根据文件路程获取目录,不存在则创建 + 获取指定路径的文件夹,如不存在则创建 """ def __find_dir(_fileitem: schemas.FileItem, _name: str) -> Optional[schemas.FileItem]: @@ -307,153 +774,102 @@ class AliPan(StorageBase, metaclass=Singleton): return folder # 逐级查找和创建目录 fileitem = schemas.FileItem(storage=self.schema.value, path="/") - for part in path.parts: - if part == "/": - continue + for part in path.parts[1:]: dir_file = __find_dir(fileitem, part) if dir_file: fileitem = dir_file else: dir_file = self.create_folder(fileitem, part) if not dir_file: + logger.warn(f"【阿里云盘】创建目录 {fileitem.path}{part} 失败!") return None fileitem = dir_file return fileitem - def get_item(self, path: Path) -> Optional[schemas.FileItem]: - """ - 获取文件或目录,不存在返回None - """ - if not self.aligo: - return None - item = self.aligo.get_file_by_path(path=str(path)) - if item: - return self.__get_fileitem(item, parent=str(path.parent)) - return None - - def delete(self, fileitem: schemas.FileItem) -> bool: - """ - 删除文件 - """ - if not self.aligo: - return False - if self.aligo.move_file_to_trash(file_id=fileitem.fileid, drive_id=fileitem.drive_id): - return True - return False - def detail(self, fileitem: schemas.FileItem) -> Optional[schemas.FileItem]: """ - 获取文件详情 + 获取文件/目录详细信息 """ - if not self.aligo: - return None - item = self.aligo.get_file(file_id=fileitem.fileid, drive_id=fileitem.drive_id) - if item: - return self.__get_fileitem(item, parent=fileitem.path) - return None - - def rename(self, fileitem: schemas.FileItem, name: str) -> bool: - """ - 重命名文件 - """ - if not self.aligo: - return False - if self.aligo.rename_file(file_id=fileitem.fileid, name=name, drive_id=fileitem.drive_id): - return True - return False - - def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]: - """ - 下载文件,保存到本地 - """ - if not self.aligo: - return None - local_path = self.aligo.download_file(file_id=fileitem.fileid, drive_id=fileitem.drive_id, # noqa - local_folder=str(path or settings.TEMP_PATH)) - if local_path: - return Path(local_path) - return None - - def upload(self, fileitem: schemas.FileItem, path: Path, - new_name: Optional[str] = None) -> Optional[schemas.FileItem]: - """ - 上传文件,并标记完成 - :param fileitem: 上传目录项 - :param path: 本地文件路径 - :param new_name: 上传后文件名 - """ - if not self.aligo: - return None - # 上传文件 - result = self.aligo.upload_file(file_path=str(path), parent_file_id=fileitem.fileid, - drive_id=fileitem.drive_id, name=new_name or path.name, - check_name_mode="refuse") - if result: - item = self.aligo.get_file(file_id=result.file_id, drive_id=result.drive_id) - if item: - return self.__get_fileitem(item, parent=fileitem.path) - return None - - def move(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool: - """ - 移动文件 - :param fileitem: 文件项 - :param path: 目标目录 - :param new_name: 新文件名 - """ - if not self.aligo: - return False - target = self.get_folder(path) - if not target: - return False - if self.aligo.move_file(file_id=fileitem.fileid, drive_id=fileitem.drive_id, - to_parent_file_id=target.fileid, to_drive_id=target.drive_id, - new_name=new_name): - return True - return False + return self.get_item(Path(fileitem.path)) def copy(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool: """ - 复制文件 - :param fileitem: 文件项 - :param path: 目标目录 - :param new_name: 新文件名 + 企业级复制实现(支持目录递归复制) """ - if not self.aligo: + src_fid = self._path_to_id(fileitem.path) + dest_cid = self._path_to_id(str(path)) + + resp = self._request_api( + "POST", + "/open/ufile/copy", + json={ + "file_id": src_fid, + "pid": dest_cid + } + ) + if not resp: return False - target = self.get_folder(path) - if not target: + if resp["state"]: + new_path = Path(path) / fileitem.name + new_file = self.get_item(new_path) + self.rename(new_file, new_name) + # 更新缓存 + del self._id_cache[fileitem.path] + rename_new_path = Path(path) / new_name + self._id_cache[str(rename_new_path)] = int(new_file.fileid) + return True + return False + + def move(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool: + """ + 原子性移动操作实现 + """ + src_fid = self._path_to_id(fileitem.path) + dest_cid = self._path_to_id(str(path)) + + resp = self._request_api( + "POST", + "/open/ufile/move", + json={ + "file_ids": src_fid, + "to_cid": dest_cid + } + ) + if not resp: return False - if self.aligo.copy_file(file_id=fileitem.fileid, drive_id=fileitem.drive_id, - to_parent_file_id=target.fileid, to_drive_id=target.drive_id, - new_name=new_name): + if resp["state"]: + new_path = Path(path) / fileitem.name + new_file = self.get_item(new_path) + self.rename(new_file, new_name) + # 更新缓存 + del self._id_cache[fileitem.path] + rename_new_path = Path(path) / new_name + self._id_cache[str(rename_new_path)] = src_fid return True return False def link(self, fileitem: schemas.FileItem, target_file: Path) -> bool: - """ - 硬链接文件 - """ pass def softlink(self, fileitem: schemas.FileItem, target_file: Path) -> bool: - """ - 软链接文件 - """ pass def usage(self) -> Optional[schemas.StorageUsage]: """ - 存储使用情况 + 获取带有企业级配额信息的存储使用情况 """ - if not self.aligo: + try: + resp = self._request_api( + "GET", + "/open/user/info", + "data" + ) + if not resp: + return None + space = resp["rt_space_info"] + return schemas.StorageUsage( + total=space["all_total"]["size"], + available=space["all_remain"]["size"] + ) + except KeyError: return None - user_capacity = self.aligo.get_user_capacity_info() - if user_capacity: - drive_capacity = user_capacity.drive_capacity_details - if drive_capacity: - return schemas.StorageUsage( - total=drive_capacity.drive_total_size, - available=drive_capacity.drive_total_size - drive_capacity.drive_used_size - ) - return None diff --git a/app/modules/filemanager/storages/u115.py b/app/modules/filemanager/storages/u115.py index c77ec664..bd73d56a 100644 --- a/app/modules/filemanager/storages/u115.py +++ b/app/modules/filemanager/storages/u115.py @@ -166,7 +166,7 @@ class U115Pan(StorageBase, metaclass=Singleton): 确认登录后,获取相关token """ if not self._auth_state: - raise Exception("【115】请先调用生成二维码方法") + raise Exception("请先生成二维码") resp = self.session.post( "https://passportapi.115.com/open/deviceCodeToToken", data={ @@ -175,7 +175,7 @@ class U115Pan(StorageBase, metaclass=Singleton): } ) if resp is None: - raise Exception("【115】获取 access_token 失败") + raise Exception("获取 access_token 失败") result = resp.json() if result.get("code") != 0: raise Exception(result.get("message"))