diff --git a/app/modules/filemanager/storages/u115.py b/app/modules/filemanager/storages/u115.py index f8115260..aa9ebb28 100644 --- a/app/modules/filemanager/storages/u115.py +++ b/app/modules/filemanager/storages/u115.py @@ -8,6 +8,9 @@ from typing import List, Dict, Optional, Tuple, Union import oss2 import requests +from oss2 import SizedFileAdapter, determine_part_size +from oss2.credentials import EnvironmentVariableCredentialsProvider +from oss2.models import PartInfo from app import schemas from app.core.config import settings @@ -421,8 +424,9 @@ class U115Pan(StorageBase, metaclass=Singleton): """ return oss2.compat.to_string(base64.b64encode(oss2.compat.to_bytes(cb_str))) - # 计算文件特征值 target_name = new_name or local_path.name + target_path = str(Path(target_dir.path) / target_name) + # 计算文件特征值 file_size = local_path.stat().st_size file_sha1 = self._calc_sha1(local_path) file_preid = self._calc_sha1(local_path, 128 * 1024 * 1024) @@ -451,7 +455,7 @@ class U115Pan(StorageBase, metaclass=Singleton): return None # 结果 init_result = init_resp.get("data") - logger.debug(f"【115】上传 Step 1 结果: {init_result}") + logger.debug(f"【115】上传 Step 1 初始化结果: {init_result}") file_id = init_result.get("file_id") # 回调信息 bucket_name = init_result.get("bucket") @@ -490,7 +494,7 @@ class U115Pan(StorageBase, metaclass=Singleton): return None # 二次认证结果 init_result = init_resp.get("data") - logger.debug(f"【115】上传 Step 2 结果: {init_result}") + logger.debug(f"【115】上传 Step 2 二次认证结果: {init_result}") if not pick_code: pick_code = init_result.get("pick_code") if not bucket_name: @@ -504,10 +508,10 @@ class U115Pan(StorageBase, metaclass=Singleton): return schemas.FileItem( storage=self.schema.value, fileid=file_id, - path=str(Path(target_dir.path) / target_name), + path=target_path, name=target_name, - basename=Path(target_dir.name).stem, - extension=Path(target_dir.name).suffix[1:], + basename=Path(target_name).stem, + extension=Path(target_name).suffix[1:], size=file_size, type="file", pickcode=pick_code, @@ -523,14 +527,36 @@ class U115Pan(StorageBase, metaclass=Singleton): if not token_resp: logger.warn("【115】获取上传凭证失败") return None - logger.debug(f"【115】上传 Step 4 结果: {token_resp}") + logger.debug(f"【115】上传 Step 4 获取上传凭证结果: {token_resp}") + # 上传凭证 + endpoint = token_resp.get("endpoint") + AccessKeyId = token_resp.get("AccessKeyId") + AccessKeySecret = token_resp.get("AccessKeySecret") + SecurityToken = token_resp.get("SecurityToken") - # Step 5: 对象存储上传 - endpoint = token_resp["endpoint"] + # Step 5: 断点续传 + resume_resp = self._request_api( + "POST", + "/open/upload/resume", + "data", + data={ + "file_size": file_size, + "target": target_param, + "fileid": file_sha1, + "pick_code": pick_code + } + ) + if resume_resp: + logger.debug(f"【115】上传 Step 5 断点续传结果: {resume_resp}") + if resume_resp.get("callback"): + callback_str = resume_resp["callback"].get("callback") + callback_var_str = resume_resp["callback"].get("callback_var") + + # Step 6: 对象存储上传 auth = oss2.StsAuth( - access_key_id=token_resp['AccessKeyId'], - access_key_secret=token_resp['AccessKeySecret'], - security_token=token_resp['SecurityToken'] + access_key_id=AccessKeyId, + access_key_secret=AccessKeySecret, + security_token=SecurityToken ) bucket = oss2.Bucket(auth, endpoint, bucket_name) # noqa headers = { @@ -538,35 +564,79 @@ class U115Pan(StorageBase, metaclass=Singleton): 'x-oss-callback-var': encode_callback(callback_var_str) } logger.info(f"【115】开始上传: {local_path} -> {target_name}") - with open(local_path, "rb") as f: - try: - result = bucket.put_object( - target_name, - data=f, - headers=headers, - progress_callback=progress_callback + # 填写不能包含Bucket名称在内的Object完整路径,例如exampledir/exampleobject.txt。 + key = target_path[1:] + # determine_part_size方法用于确定分片大小。分片最小值为100 KB,最大值为5 GB。最后一个分片的大小允许小于100 KB。设置分片大小为 1GB。 + preferred_size = 1 * 1024 * 1024 * 1024 + part_size = determine_part_size(file_size, preferred_size=preferred_size) + # 初始化分片。 + # 如需在初始化分片时设置文件存储类型,请在init_multipart_upload中设置相关Headers,参考如下。 + # headers = dict() + # 指定该Object的网页缓存行为。 + # headers['Cache-Control'] = 'no-cache' + # 指定该Object被下载时的名称。 + # headers['Content-Disposition'] = 'oss_MultipartUpload.txt' + # 指定过期时间,单位为毫秒。 + # headers['Expires'] = '1000' + # 指定初始化分片上传时是否覆盖同名Object。此处设置为true,表示禁止覆盖同名Object。 + # headers['x-oss-forbid-overwrite'] = 'true' + # 指定上传该Object的每个Part时使用的服务器端加密方式。 + # headers[OSS_SERVER_SIDE_ENCRYPTION] = SERVER_SIDE_ENCRYPTION_KMS + # 指定Object的加密算法。如果未指定此选项,表明Object使用AES256加密算法。 + # headers[OSS_SERVER_SIDE_DATA_ENCRYPTION] = SERVER_SIDE_ENCRYPTION_KMS + # 表示KMS托管的用户主密钥。 + # headers[OSS_SERVER_SIDE_ENCRYPTION_KEY_ID] = '9468da86-3509-4f8d-a61e-6eab1eac****' + # 指定Object的存储类型。 + # headers['x-oss-storage-class'] = oss2.BUCKET_STORAGE_CLASS_STANDARD + # 指定Object的对象标签,可同时设置多个标签。 + # headers[OSS_OBJECT_TAGGING] = 'k1=v1&k2=v2&k3=v3' + # upload_id = bucket.init_multipart_upload(key, headers=headers).upload_id + upload_id = bucket.init_multipart_upload(key).upload_id + # 根据upload_id执行取消分片上传事件或者列举已上传分片的操作。 + # 如果您需要根据您需要uploadId执行取消分片上传事件的操作,您需要在调用InitiateMultipartUpload完成初始化分片之后获取uploadId。 + # 如果您需要根据您需要uploadId执行列举已上传分片的操作,您需要在调用InitiateMultipartUpload完成初始化分片之后,且在调用CompleteMultipartUpload完成分片上传之前获取uploadId。 + # print("UploadID:", upload_id) + parts = [] + # 逐个上传分片。 + with open(local_path, 'rb') as fileobj: + part_number = 1 + offset = 0 + while offset < file_size: + num_to_upload = min(part_size, file_size - offset) + # 调用SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。 + result = bucket.upload_part(key, upload_id, part_number, SizedFileAdapter(fileobj, num_to_upload)) + parts.append(PartInfo(part_number, result.etag)) + offset += num_to_upload + part_number += 1 + # 完成分片上传。 + # 设置文件访问权限ACL。此处设置为OBJECT_ACL_PRIVATE,表示私有权限。 + # headers["x-oss-object-acl"] = oss2.OBJECT_ACL_PRIVATE + try: + result = bucket.complete_multipart_upload(key, upload_id, parts, + headers=headers, + progress_callback=progress_callback) + logger.debug(f"【115】上传 Step 6 对象存储结果: {result}") + if result.status == 200: + # 构造返回结果 + logger.info(f"【115】{target_name} 上传成功") + return schemas.FileItem( + storage=self.schema.value, + fileid=file_id, + type="file", + path=target_path, + name=target_name, + basename=Path(target_name).stem, + extension=Path(target_name).suffix[1:], + size=file_size, + pickcode=pick_code, + modify_time=int(time.time()) ) - if result.status == 200: - # 构造返回结果 - logger.info(f"【115】{target_name} 上传成功") - return schemas.FileItem( - storage=self.schema.value, - fileid=file_id, - type="file", - path=str(Path(target_dir.path) / target_name), - name=target_name, - basename=Path(target_name).stem, - extension=Path(target_name).suffix[1:], - size=file_size, - pickcode=pick_code, - modify_time=int(time.time()) - ) - else: - logger.warn(f"【115】{target_name} 上传失败,错误码: {result.status}") - return None - except oss2.exceptions.OssError as e: - logger.error(f"【115】{target_name} 上传失败: {e.status}, 错误码: {e.code}, 详情: {e.message}") + else: + logger.warn(f"【115】{target_name} 上传失败,错误码: {result.status}") return None + except oss2.exceptions.OssError as e: + logger.error(f"【115】{target_name} 上传失败: {e.status}, 错误码: {e.code}, 详情: {e.message}") + return None def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]: """