try fix 115 upload

This commit is contained in:
jxxghp
2025-03-26 07:15:31 +08:00
parent 5d56eb9bef
commit 70c183ae2b

View File

@@ -8,6 +8,9 @@ from typing import List, Dict, Optional, Tuple, Union
import oss2
import requests
from oss2 import SizedFileAdapter, determine_part_size
from oss2.credentials import EnvironmentVariableCredentialsProvider
from oss2.models import PartInfo
from app import schemas
from app.core.config import settings
@@ -421,8 +424,9 @@ class U115Pan(StorageBase, metaclass=Singleton):
"""
return oss2.compat.to_string(base64.b64encode(oss2.compat.to_bytes(cb_str)))
# 计算文件特征值
target_name = new_name or local_path.name
target_path = str(Path(target_dir.path) / target_name)
# 计算文件特征值
file_size = local_path.stat().st_size
file_sha1 = self._calc_sha1(local_path)
file_preid = self._calc_sha1(local_path, 128 * 1024 * 1024)
@@ -451,7 +455,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
return None
# 结果
init_result = init_resp.get("data")
logger.debug(f"【115】上传 Step 1 结果: {init_result}")
logger.debug(f"【115】上传 Step 1 初始化结果: {init_result}")
file_id = init_result.get("file_id")
# 回调信息
bucket_name = init_result.get("bucket")
@@ -490,7 +494,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
return None
# 二次认证结果
init_result = init_resp.get("data")
logger.debug(f"【115】上传 Step 2 结果: {init_result}")
logger.debug(f"【115】上传 Step 2 二次认证结果: {init_result}")
if not pick_code:
pick_code = init_result.get("pick_code")
if not bucket_name:
@@ -504,10 +508,10 @@ class U115Pan(StorageBase, metaclass=Singleton):
return schemas.FileItem(
storage=self.schema.value,
fileid=file_id,
path=str(Path(target_dir.path) / target_name),
path=target_path,
name=target_name,
basename=Path(target_dir.name).stem,
extension=Path(target_dir.name).suffix[1:],
basename=Path(target_name).stem,
extension=Path(target_name).suffix[1:],
size=file_size,
type="file",
pickcode=pick_code,
@@ -523,14 +527,36 @@ class U115Pan(StorageBase, metaclass=Singleton):
if not token_resp:
logger.warn("【115】获取上传凭证失败")
return None
logger.debug(f"【115】上传 Step 4 结果: {token_resp}")
logger.debug(f"【115】上传 Step 4 获取上传凭证结果: {token_resp}")
# 上传凭证
endpoint = token_resp.get("endpoint")
AccessKeyId = token_resp.get("AccessKeyId")
AccessKeySecret = token_resp.get("AccessKeySecret")
SecurityToken = token_resp.get("SecurityToken")
# Step 5: 对象存储上
endpoint = token_resp["endpoint"]
# Step 5: 断点续
resume_resp = self._request_api(
"POST",
"/open/upload/resume",
"data",
data={
"file_size": file_size,
"target": target_param,
"fileid": file_sha1,
"pick_code": pick_code
}
)
if resume_resp:
logger.debug(f"【115】上传 Step 5 断点续传结果: {resume_resp}")
if resume_resp.get("callback"):
callback_str = resume_resp["callback"].get("callback")
callback_var_str = resume_resp["callback"].get("callback_var")
# Step 6: 对象存储上传
auth = oss2.StsAuth(
access_key_id=token_resp['AccessKeyId'],
access_key_secret=token_resp['AccessKeySecret'],
security_token=token_resp['SecurityToken']
access_key_id=AccessKeyId,
access_key_secret=AccessKeySecret,
security_token=SecurityToken
)
bucket = oss2.Bucket(auth, endpoint, bucket_name) # noqa
headers = {
@@ -538,35 +564,79 @@ class U115Pan(StorageBase, metaclass=Singleton):
'x-oss-callback-var': encode_callback(callback_var_str)
}
logger.info(f"【115】开始上传: {local_path} -> {target_name}")
with open(local_path, "rb") as f:
try:
result = bucket.put_object(
target_name,
data=f,
headers=headers,
progress_callback=progress_callback
# 填写不能包含Bucket名称在内的Object完整路径例如exampledir/exampleobject.txt。
key = target_path[1:]
# determine_part_size方法用于确定分片大小。分片最小值为100 KB最大值为5 GB。最后一个分片的大小允许小于100 KB。设置分片大小为 1GB。
preferred_size = 1 * 1024 * 1024 * 1024
part_size = determine_part_size(file_size, preferred_size=preferred_size)
# 初始化分片。
# 如需在初始化分片时设置文件存储类型请在init_multipart_upload中设置相关Headers参考如下。
# headers = dict()
# 指定该Object的网页缓存行为。
# headers['Cache-Control'] = 'no-cache'
# 指定该Object被下载时的名称。
# headers['Content-Disposition'] = 'oss_MultipartUpload.txt'
# 指定过期时间,单位为毫秒。
# headers['Expires'] = '1000'
# 指定初始化分片上传时是否覆盖同名Object。此处设置为true表示禁止覆盖同名Object。
# headers['x-oss-forbid-overwrite'] = 'true'
# 指定上传该Object的每个Part时使用的服务器端加密方式。
# headers[OSS_SERVER_SIDE_ENCRYPTION] = SERVER_SIDE_ENCRYPTION_KMS
# 指定Object的加密算法。如果未指定此选项表明Object使用AES256加密算法。
# headers[OSS_SERVER_SIDE_DATA_ENCRYPTION] = SERVER_SIDE_ENCRYPTION_KMS
# 表示KMS托管的用户主密钥。
# headers[OSS_SERVER_SIDE_ENCRYPTION_KEY_ID] = '9468da86-3509-4f8d-a61e-6eab1eac****'
# 指定Object的存储类型。
# headers['x-oss-storage-class'] = oss2.BUCKET_STORAGE_CLASS_STANDARD
# 指定Object的对象标签可同时设置多个标签。
# headers[OSS_OBJECT_TAGGING] = 'k1=v1&k2=v2&k3=v3'
# upload_id = bucket.init_multipart_upload(key, headers=headers).upload_id
upload_id = bucket.init_multipart_upload(key).upload_id
# 根据upload_id执行取消分片上传事件或者列举已上传分片的操作。
# 如果您需要根据您需要uploadId执行取消分片上传事件的操作您需要在调用InitiateMultipartUpload完成初始化分片之后获取uploadId。
# 如果您需要根据您需要uploadId执行列举已上传分片的操作您需要在调用InitiateMultipartUpload完成初始化分片之后且在调用CompleteMultipartUpload完成分片上传之前获取uploadId。
# print("UploadID:", upload_id)
parts = []
# 逐个上传分片。
with open(local_path, 'rb') as fileobj:
part_number = 1
offset = 0
while offset < file_size:
num_to_upload = min(part_size, file_size - offset)
# 调用SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。
result = bucket.upload_part(key, upload_id, part_number, SizedFileAdapter(fileobj, num_to_upload))
parts.append(PartInfo(part_number, result.etag))
offset += num_to_upload
part_number += 1
# 完成分片上传。
# 设置文件访问权限ACL。此处设置为OBJECT_ACL_PRIVATE表示私有权限。
# headers["x-oss-object-acl"] = oss2.OBJECT_ACL_PRIVATE
try:
result = bucket.complete_multipart_upload(key, upload_id, parts,
headers=headers,
progress_callback=progress_callback)
logger.debug(f"【115】上传 Step 6 对象存储结果: {result}")
if result.status == 200:
# 构造返回结果
logger.info(f"【115】{target_name} 上传成功")
return schemas.FileItem(
storage=self.schema.value,
fileid=file_id,
type="file",
path=target_path,
name=target_name,
basename=Path(target_name).stem,
extension=Path(target_name).suffix[1:],
size=file_size,
pickcode=pick_code,
modify_time=int(time.time())
)
if result.status == 200:
# 构造返回结果
logger.info(f"【115】{target_name} 上传成功")
return schemas.FileItem(
storage=self.schema.value,
fileid=file_id,
type="file",
path=str(Path(target_dir.path) / target_name),
name=target_name,
basename=Path(target_name).stem,
extension=Path(target_name).suffix[1:],
size=file_size,
pickcode=pick_code,
modify_time=int(time.time())
)
else:
logger.warn(f"【115】{target_name} 上传失败,错误码: {result.status}")
return None
except oss2.exceptions.OssError as e:
logger.error(f"【115】{target_name} 上传失败: {e.status}, 错误码: {e.code}, 详情: {e.message}")
else:
logger.warn(f"【115】{target_name} 上传失败,错误码: {result.status}")
return None
except oss2.exceptions.OssError as e:
logger.error(f"【115】{target_name} 上传失败: {e.status}, 错误码: {e.code}, 详情: {e.message}")
return None
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
"""