From 9522888a609352f68cf7a1c2e7ab8c325f7ff5c9 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Wed, 26 Mar 2025 08:30:30 +0800 Subject: [PATCH] fix 115 --- app/modules/filemanager/storages/u115.py | 98 ++++++++++-------------- 1 file changed, 39 insertions(+), 59 deletions(-) diff --git a/app/modules/filemanager/storages/u115.py b/app/modules/filemanager/storages/u115.py index aa9ebb28..c0925907 100644 --- a/app/modules/filemanager/storages/u115.py +++ b/app/modules/filemanager/storages/u115.py @@ -9,7 +9,6 @@ from typing import List, Dict, Optional, Tuple, Union import oss2 import requests from oss2 import SizedFileAdapter, determine_part_size -from oss2.credentials import EnvironmentVariableCredentialsProvider from oss2.models import PartInfo from app import schemas @@ -413,10 +412,10 @@ class U115Pan(StorageBase, metaclass=Singleton): 上传进度回调 """ progress = round(consumed_bytes / total_bytes * 100) - if progress != self._last_progress: + if round(progress, -1) != self._last_progress: logger.info(f"【115】已上传: {StringUtils.str_filesize(consumed_bytes)}" - f" / {StringUtils.str_filesize(total_bytes)} 字节, 进度: {progress}%") - self._last_progress = progress + f" / {StringUtils.str_filesize(total_bytes)}, 进度: {progress}%") + self._last_progress = round(progress, -1) def encode_callback(cb_str: str): """ @@ -459,8 +458,7 @@ class U115Pan(StorageBase, metaclass=Singleton): file_id = init_result.get("file_id") # 回调信息 bucket_name = init_result.get("bucket") - callback_str = init_result.get("callback", {}).get("callback") - callback_var_str = init_result.get("callback", {}).get("callback_var") + callback = init_result.get("callback") # 二次认证信息 sign_check = init_result.get("sign_check") pick_code = init_result.get("pick_code") @@ -501,6 +499,8 @@ class U115Pan(StorageBase, metaclass=Singleton): bucket_name = init_result.get("bucket") if not file_id: file_id = init_result.get("file_id") + if not callback: + callback = init_result.get("callback") # Step 3: 秒传 if init_result.get("status") == 2: @@ -549,8 +549,7 @@ class U115Pan(StorageBase, metaclass=Singleton): if resume_resp: logger.debug(f"【115】上传 Step 5 断点续传结果: {resume_resp}") if resume_resp.get("callback"): - callback_str = resume_resp["callback"].get("callback") - callback_var_str = resume_resp["callback"].get("callback_var") + callback = resume_resp["callback"] # Step 6: 对象存储上传 auth = oss2.StsAuth( @@ -560,52 +559,31 @@ class U115Pan(StorageBase, metaclass=Singleton): ) bucket = oss2.Bucket(auth, endpoint, bucket_name) # noqa headers = { - 'x-oss-callback': encode_callback(callback_str), - 'x-oss-callback-var': encode_callback(callback_var_str) + 'x-oss-callback': encode_callback(callback.get("callback")), + 'x-oss-callback-var': encode_callback(callback.get("callback_var")) } logger.info(f"【115】开始上传: {local_path} -> {target_name}") # 填写不能包含Bucket名称在内的Object完整路径,例如exampledir/exampleobject.txt。 key = target_path[1:] - # determine_part_size方法用于确定分片大小。分片最小值为100 KB,最大值为5 GB。最后一个分片的大小允许小于100 KB。设置分片大小为 1GB。 - preferred_size = 1 * 1024 * 1024 * 1024 - part_size = determine_part_size(file_size, preferred_size=preferred_size) - # 初始化分片。 - # 如需在初始化分片时设置文件存储类型,请在init_multipart_upload中设置相关Headers,参考如下。 - # headers = dict() - # 指定该Object的网页缓存行为。 - # headers['Cache-Control'] = 'no-cache' - # 指定该Object被下载时的名称。 - # headers['Content-Disposition'] = 'oss_MultipartUpload.txt' - # 指定过期时间,单位为毫秒。 - # headers['Expires'] = '1000' - # 指定初始化分片上传时是否覆盖同名Object。此处设置为true,表示禁止覆盖同名Object。 - # headers['x-oss-forbid-overwrite'] = 'true' - # 指定上传该Object的每个Part时使用的服务器端加密方式。 - # headers[OSS_SERVER_SIDE_ENCRYPTION] = SERVER_SIDE_ENCRYPTION_KMS - # 指定Object的加密算法。如果未指定此选项,表明Object使用AES256加密算法。 - # headers[OSS_SERVER_SIDE_DATA_ENCRYPTION] = SERVER_SIDE_ENCRYPTION_KMS - # 表示KMS托管的用户主密钥。 - # headers[OSS_SERVER_SIDE_ENCRYPTION_KEY_ID] = '9468da86-3509-4f8d-a61e-6eab1eac****' - # 指定Object的存储类型。 - # headers['x-oss-storage-class'] = oss2.BUCKET_STORAGE_CLASS_STANDARD - # 指定Object的对象标签,可同时设置多个标签。 - # headers[OSS_OBJECT_TAGGING] = 'k1=v1&k2=v2&k3=v3' - # upload_id = bucket.init_multipart_upload(key, headers=headers).upload_id - upload_id = bucket.init_multipart_upload(key).upload_id - # 根据upload_id执行取消分片上传事件或者列举已上传分片的操作。 - # 如果您需要根据您需要uploadId执行取消分片上传事件的操作,您需要在调用InitiateMultipartUpload完成初始化分片之后获取uploadId。 - # 如果您需要根据您需要uploadId执行列举已上传分片的操作,您需要在调用InitiateMultipartUpload完成初始化分片之后,且在调用CompleteMultipartUpload完成分片上传之前获取uploadId。 - # print("UploadID:", upload_id) + # determine_part_size方法用于确定分片大小,设置分片大小为 1GB + part_size = determine_part_size(file_size, preferred_size=1 * 1024 * 1024 * 1024) + # 初始化分片 + upload_id = bucket.init_multipart_upload(key, headers=headers).upload_id parts = [] - # 逐个上传分片。 + # 逐个上传分片 with open(local_path, 'rb') as fileobj: part_number = 1 offset = 0 while offset < file_size: num_to_upload = min(part_size, file_size - offset) # 调用SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。 - result = bucket.upload_part(key, upload_id, part_number, SizedFileAdapter(fileobj, num_to_upload)) + logger.info(f"【115】开始上传 {target_name} 分片 {part_number}: {offset} -> {offset + num_to_upload}") + result = bucket.upload_part(key, upload_id, part_number, + data=SizedFileAdapter(fileobj, num_to_upload), + progress_callback=progress_callback, + headers=headers) parts.append(PartInfo(part_number, result.etag)) + logger.info(f"【115】{target_name} 分片 {part_number} 上传完成") offset += num_to_upload part_number += 1 # 完成分片上传。 @@ -613,30 +591,32 @@ class U115Pan(StorageBase, metaclass=Singleton): # headers["x-oss-object-acl"] = oss2.OBJECT_ACL_PRIVATE try: result = bucket.complete_multipart_upload(key, upload_id, parts, - headers=headers, - progress_callback=progress_callback) - logger.debug(f"【115】上传 Step 6 对象存储结果: {result}") + headers=headers) if result.status == 200: # 构造返回结果 logger.info(f"【115】{target_name} 上传成功") - return schemas.FileItem( - storage=self.schema.value, - fileid=file_id, - type="file", - path=target_path, - name=target_name, - basename=Path(target_name).stem, - extension=Path(target_name).suffix[1:], - size=file_size, - pickcode=pick_code, - modify_time=int(time.time()) - ) else: logger.warn(f"【115】{target_name} 上传失败,错误码: {result.status}") return None except oss2.exceptions.OssError as e: - logger.error(f"【115】{target_name} 上传失败: {e.status}, 错误码: {e.code}, 详情: {e.message}") - return None + if e.code == "FileAlreadyExists": + logger.warn(f"【115】{target_name} 已存在") + else: + logger.error(f"【115】{target_name} 上传失败: {e.status}, 错误码: {e.code}, 详情: {e.message}") + return None + # 返回结果 + return schemas.FileItem( + storage=self.schema.value, + fileid=file_id, + type="file", + path=target_path, + name=target_name, + basename=Path(target_name).stem, + extension=Path(target_name).suffix[1:], + size=file_size, + pickcode=pick_code, + modify_time=int(time.time()) + ) def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]: """