From c2c0946423d6cd0236890bf20f969c22db11d188 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Mon, 24 Mar 2025 21:39:03 +0800 Subject: [PATCH] fix 115 upload --- app/modules/filemanager/storages/u115.py | 116 ++++++++++++++--------- requirements.in | 3 +- 2 files changed, 75 insertions(+), 44 deletions(-) diff --git a/app/modules/filemanager/storages/u115.py b/app/modules/filemanager/storages/u115.py index a1dd5cac..eae95cff 100644 --- a/app/modules/filemanager/storages/u115.py +++ b/app/modules/filemanager/storages/u115.py @@ -5,10 +5,10 @@ import time from pathlib import Path from typing import List, Dict, Optional, Tuple, Union +import oss2 import requests from app import schemas -from app.api.endpoints.dashboard import storage from app.core.config import settings from app.log import logger from app.modules.filemanager import StorageBase @@ -266,14 +266,19 @@ class U115Pan(StorageBase, metaclass=Singleton): return full_path @staticmethod - def _calc_sha1(filepath: Path) -> str: + def _calc_sha1(filepath: Path, size: Optional[int] = None) -> str: """ 计算文件SHA1(符合115规范) + size: 前多少字节 """ sha1 = hashlib.sha1() with open(filepath, 'rb') as f: - while chunk := f.read(8192): + if size: + chunk = f.read(size) sha1.update(chunk) + else: + while chunk := f.read(8192): + sha1.update(chunk) return sha1.hexdigest() def check_login(self) -> Optional[Tuple[dict, str]]: @@ -395,7 +400,8 @@ class U115Pan(StorageBase, metaclass=Singleton): modify_time=int(time.time()) ) - def upload(self, target_dir: schemas.FileItem, local_path: Path, new_name: Optional[str] = None) -> schemas.FileItem: + def upload(self, target_dir: schemas.FileItem, local_path: Path, + new_name: Optional[str] = None) -> Optional[schemas.FileItem]: """ 实现带秒传、断点续传和二次认证的文件上传 """ @@ -403,6 +409,7 @@ class U115Pan(StorageBase, metaclass=Singleton): target_name = new_name or local_path.name file_size = local_path.stat().st_size file_sha1 = self._calc_sha1(local_path) + file_preid = self._calc_sha1(local_path, 128 * 1024 * 1024) # 获取目标目录CID target_cid = self._path_to_id(target_dir.path) @@ -413,7 +420,8 @@ class U115Pan(StorageBase, metaclass=Singleton): "file_name": target_name, "file_size": file_size, "target": target_param, - "fileid": file_sha1 + "fileid": file_sha1, + "preid": file_preid } init_resp = self._request_api( "POST", @@ -421,23 +429,32 @@ class U115Pan(StorageBase, metaclass=Singleton): data=init_data ) + if not init_resp: + logger.warn("初始化上传失败") + return None + if not init_resp.get("state"): + logger.warn(f"初始化上传失败: {init_resp.get('error')}") + return None + # 处理秒传成功 - if init_resp.get("status") == 2: - return schemas.FileItem( - storage=self.schema.value, - fileid=init_resp["data"]["file_id"], - path=str(Path(target_dir.path) / target_name), - name=target_name, - basename=Path(target_dir.name).stem, - extension=Path(target_dir.name).suffix[1:], - size=file_size, - type="file", - modify_time=int(time.time()) - ) + init_result = init_resp.get("data") + if init_result: + if init_result.get("status") == 2: + return schemas.FileItem( + storage=self.schema.value, + fileid=init_result["file_id"], + path=str(Path(target_dir.path) / target_name), + name=target_name, + basename=Path(target_dir.name).stem, + extension=Path(target_dir.name).suffix[1:], + size=file_size, + type="file", + modify_time=int(time.time()) + ) # Step 2: 处理二次认证 if init_resp.get("code") in [700, 701]: - sign_check = init_resp["data"]["sign_check"].split("-") + sign_check = init_result["sign_check"].split("-") start = int(sign_check[0]) end = int(sign_check[1]) @@ -449,7 +466,8 @@ class U115Pan(StorageBase, metaclass=Singleton): # 重新初始化请求 init_data.update({ - "sign_key": init_resp["data"]["sign_key"], + "pick_code": init_result["pick_code"], + "sign_key": init_result["sign_key"], "sign_val": sign_val }) init_resp = self._request_api( @@ -457,6 +475,7 @@ class U115Pan(StorageBase, metaclass=Singleton): "/open/upload/init", data=init_data ) + init_result = init_resp.get("data") # Step 3: 获取上传凭证 token_resp = self._request_api( @@ -464,37 +483,46 @@ class U115Pan(StorageBase, metaclass=Singleton): "/open/upload/get_token", "data" ) + if not token_resp: + logger.warn("获取上传凭证失败") + return None # Step 4: 对象存储上传 - upload_url = f"https://{token_resp['endpoint']}" + endpoint = token_resp["endpoint"] + auth = oss2.StsAuth( + access_key_id=token_resp['AccessKeyId'], + access_key_secret=token_resp['AccessKeySecret'], + security_token=token_resp['SecurityToken'] + ) + bucket = oss2.Bucket(auth, endpoint, init_result['bucket']) + + # 分片上传 headers = { - "Authorization": f"Bearer {self.access_token}", - "x-oss-security-token": token_resp["SecurityToken"], - "Content-Type": "application/octet-stream" + 'x-oss-callback': init_result['callback']['callback'], + 'x-oss-callback-var': base64.b64encode( + init_result['callback']['callback_var'].encode('utf-8') + ).decode('utf-8') } - - # 断点续传处理 - uploaded = 0 - while uploaded < file_size: - # 10MB分块 - chunk_size = min(1024 * 1024 * 10, file_size - uploaded) - - # 实际上传 - with open(local_path, "rb") as f: - f.seek(uploaded) - chunk = f.read(chunk_size) - requests.put( - upload_url, - headers=headers, - data=chunk - ).raise_for_status() - - uploaded += chunk_size + upload_id = bucket.init_multipart_upload(target_name, headers=headers).upload_id + parts = [] + # 每10M分一片 + chunk_size = 10 * 1024 * 1024 + chunk_num = (file_size + chunk_size - 1) // chunk_size + with open(local_path, 'rb') as f: + for i in range(chunk_num): + part = bucket.upload_part(target_name, upload_id, i + 1, f.read(chunk_size)) + parts.append(oss2.models.PartInfo(i + 1, part.etag)) + try: + bucket.complete_multipart_upload(target_name, upload_id, parts) + except Exception as err: + if "FileAlreadyExists" not in str(err): + logger.error(f"上传文件失败: {str(err)}") + return None # 构造返回结果 return schemas.FileItem( storage=self.schema.value, - fileid=init_resp["data"].get("file_id") or self._path_to_id(str(Path(target_dir.path) / target_name)), + fileid=init_result.get("file_id"), type="file", path=str(Path(target_dir.path) / target_name), name=target_name, @@ -504,7 +532,7 @@ class U115Pan(StorageBase, metaclass=Singleton): modify_time=int(time.time()) ) - def download(self, fileitem: schemas.FileItem, path: Path = None) -> Path: + def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]: """ 带限速处理的下载 """ @@ -518,6 +546,8 @@ class U115Pan(StorageBase, metaclass=Singleton): "pick_code": detail.pickcode } ) + if not download_info: + return None download_url = list(download_info.values())[0].get("url", {}).get("url") with self.session.get(download_url, stream=True) as r: r.raise_for_status() diff --git a/requirements.in b/requirements.in index 419652f5..9896ed44 100644 --- a/requirements.in +++ b/requirements.in @@ -67,4 +67,5 @@ rsa~=4.9 redis~=5.2.1 async_timeout~=5.0.1; python_full_version < "3.11.3" packaging~=24.2 -cf_clearance~=0.31.0 \ No newline at end of file +cf_clearance~=0.31.0 +oss2~=2.19.1 \ No newline at end of file