This commit is contained in:
jxxghp
2025-03-26 08:30:30 +08:00
parent 70c183ae2b
commit 9522888a60

View File

@@ -9,7 +9,6 @@ from typing import List, Dict, Optional, Tuple, Union
import oss2
import requests
from oss2 import SizedFileAdapter, determine_part_size
from oss2.credentials import EnvironmentVariableCredentialsProvider
from oss2.models import PartInfo
from app import schemas
@@ -413,10 +412,10 @@ class U115Pan(StorageBase, metaclass=Singleton):
上传进度回调
"""
progress = round(consumed_bytes / total_bytes * 100)
if progress != self._last_progress:
if round(progress, -1) != self._last_progress:
logger.info(f"【115】已上传: {StringUtils.str_filesize(consumed_bytes)}"
f" / {StringUtils.str_filesize(total_bytes)} 字节, 进度: {progress}%")
self._last_progress = progress
f" / {StringUtils.str_filesize(total_bytes)}, 进度: {progress}%")
self._last_progress = round(progress, -1)
def encode_callback(cb_str: str):
"""
@@ -459,8 +458,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
file_id = init_result.get("file_id")
# 回调信息
bucket_name = init_result.get("bucket")
callback_str = init_result.get("callback", {}).get("callback")
callback_var_str = init_result.get("callback", {}).get("callback_var")
callback = init_result.get("callback")
# 二次认证信息
sign_check = init_result.get("sign_check")
pick_code = init_result.get("pick_code")
@@ -501,6 +499,8 @@ class U115Pan(StorageBase, metaclass=Singleton):
bucket_name = init_result.get("bucket")
if not file_id:
file_id = init_result.get("file_id")
if not callback:
callback = init_result.get("callback")
# Step 3: 秒传
if init_result.get("status") == 2:
@@ -549,8 +549,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
if resume_resp:
logger.debug(f"【115】上传 Step 5 断点续传结果: {resume_resp}")
if resume_resp.get("callback"):
callback_str = resume_resp["callback"].get("callback")
callback_var_str = resume_resp["callback"].get("callback_var")
callback = resume_resp["callback"]
# Step 6: 对象存储上传
auth = oss2.StsAuth(
@@ -560,52 +559,31 @@ class U115Pan(StorageBase, metaclass=Singleton):
)
bucket = oss2.Bucket(auth, endpoint, bucket_name) # noqa
headers = {
'x-oss-callback': encode_callback(callback_str),
'x-oss-callback-var': encode_callback(callback_var_str)
'x-oss-callback': encode_callback(callback.get("callback")),
'x-oss-callback-var': encode_callback(callback.get("callback_var"))
}
logger.info(f"【115】开始上传: {local_path} -> {target_name}")
# 填写不能包含Bucket名称在内的Object完整路径例如exampledir/exampleobject.txt。
key = target_path[1:]
# determine_part_size方法用于确定分片大小。分片最小值为100 KB最大值为5 GB。最后一个分片的大小允许小于100 KB。设置分片大小为 1GB
preferred_size = 1 * 1024 * 1024 * 1024
part_size = determine_part_size(file_size, preferred_size=preferred_size)
# 初始化分片。
# 如需在初始化分片时设置文件存储类型请在init_multipart_upload中设置相关Headers参考如下。
# headers = dict()
# 指定该Object的网页缓存行为。
# headers['Cache-Control'] = 'no-cache'
# 指定该Object被下载时的名称。
# headers['Content-Disposition'] = 'oss_MultipartUpload.txt'
# 指定过期时间,单位为毫秒。
# headers['Expires'] = '1000'
# 指定初始化分片上传时是否覆盖同名Object。此处设置为true表示禁止覆盖同名Object。
# headers['x-oss-forbid-overwrite'] = 'true'
# 指定上传该Object的每个Part时使用的服务器端加密方式。
# headers[OSS_SERVER_SIDE_ENCRYPTION] = SERVER_SIDE_ENCRYPTION_KMS
# 指定Object的加密算法。如果未指定此选项表明Object使用AES256加密算法。
# headers[OSS_SERVER_SIDE_DATA_ENCRYPTION] = SERVER_SIDE_ENCRYPTION_KMS
# 表示KMS托管的用户主密钥。
# headers[OSS_SERVER_SIDE_ENCRYPTION_KEY_ID] = '9468da86-3509-4f8d-a61e-6eab1eac****'
# 指定Object的存储类型。
# headers['x-oss-storage-class'] = oss2.BUCKET_STORAGE_CLASS_STANDARD
# 指定Object的对象标签可同时设置多个标签。
# headers[OSS_OBJECT_TAGGING] = 'k1=v1&k2=v2&k3=v3'
# upload_id = bucket.init_multipart_upload(key, headers=headers).upload_id
upload_id = bucket.init_multipart_upload(key).upload_id
# 根据upload_id执行取消分片上传事件或者列举已上传分片的操作。
# 如果您需要根据您需要uploadId执行取消分片上传事件的操作您需要在调用InitiateMultipartUpload完成初始化分片之后获取uploadId。
# 如果您需要根据您需要uploadId执行列举已上传分片的操作您需要在调用InitiateMultipartUpload完成初始化分片之后且在调用CompleteMultipartUpload完成分片上传之前获取uploadId。
# print("UploadID:", upload_id)
# determine_part_size方法用于确定分片大小设置分片大小为 1GB
part_size = determine_part_size(file_size, preferred_size=1 * 1024 * 1024 * 1024)
# 初始化分片
upload_id = bucket.init_multipart_upload(key, headers=headers).upload_id
parts = []
# 逐个上传分片
# 逐个上传分片
with open(local_path, 'rb') as fileobj:
part_number = 1
offset = 0
while offset < file_size:
num_to_upload = min(part_size, file_size - offset)
# 调用SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。
result = bucket.upload_part(key, upload_id, part_number, SizedFileAdapter(fileobj, num_to_upload))
logger.info(f"【115】开始上传 {target_name} 分片 {part_number}: {offset} -> {offset + num_to_upload}")
result = bucket.upload_part(key, upload_id, part_number,
data=SizedFileAdapter(fileobj, num_to_upload),
progress_callback=progress_callback,
headers=headers)
parts.append(PartInfo(part_number, result.etag))
logger.info(f"【115】{target_name} 分片 {part_number} 上传完成")
offset += num_to_upload
part_number += 1
# 完成分片上传。
@@ -613,30 +591,32 @@ class U115Pan(StorageBase, metaclass=Singleton):
# headers["x-oss-object-acl"] = oss2.OBJECT_ACL_PRIVATE
try:
result = bucket.complete_multipart_upload(key, upload_id, parts,
headers=headers,
progress_callback=progress_callback)
logger.debug(f"【115】上传 Step 6 对象存储结果: {result}")
headers=headers)
if result.status == 200:
# 构造返回结果
logger.info(f"【115】{target_name} 上传成功")
return schemas.FileItem(
storage=self.schema.value,
fileid=file_id,
type="file",
path=target_path,
name=target_name,
basename=Path(target_name).stem,
extension=Path(target_name).suffix[1:],
size=file_size,
pickcode=pick_code,
modify_time=int(time.time())
)
else:
logger.warn(f"【115】{target_name} 上传失败,错误码: {result.status}")
return None
except oss2.exceptions.OssError as e:
logger.error(f"【115】{target_name} 上传失败: {e.status}, 错误码: {e.code}, 详情: {e.message}")
return None
if e.code == "FileAlreadyExists":
logger.warn(f"【115】{target_name} 已存在")
else:
logger.error(f"【115】{target_name} 上传失败: {e.status}, 错误码: {e.code}, 详情: {e.message}")
return None
# 返回结果
return schemas.FileItem(
storage=self.schema.value,
fileid=file_id,
type="file",
path=target_path,
name=target_name,
basename=Path(target_name).stem,
extension=Path(target_name).suffix[1:],
size=file_size,
pickcode=pick_code,
modify_time=int(time.time())
)
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
"""