Rollback upload API

This commit is contained in:
jxxghp
2025-05-09 08:16:44 +08:00
parent ef03989c3f
commit 324ae5c883
2 changed files with 113 additions and 185 deletions

View File

@@ -4,7 +4,6 @@ import io
import secrets
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import List, Dict, Optional, Tuple, Union
@@ -56,13 +55,6 @@ class AliPan(StorageBase, metaclass=Singleton):
# CID和路径缓存
_id_cache: Dict[str, Tuple[str, str]] = {}
# 最大线程数
MAX_WORKERS = 10
# 最大分片大小(1GB)
MAX_PART_SIZE = 1024 * 1024 * 1024
# 最小分片大小(100MB)
MIN_PART_SIZE = 100 * 1024 * 1024
def __init__(self):
super().__init__()
self.session = requests.Session()
@@ -598,6 +590,13 @@ class AliPan(StorageBase, metaclass=Singleton):
raise Exception(resp.get("message"))
return resp.get('part_info_list', [])
@staticmethod
def _upload_part(upload_url: str, data: bytes):
    """
    Upload a single part via HTTP PUT to the pre-signed part URL.

    :param upload_url: pre-signed upload URL for this part
    :param data: raw bytes of the part
    :return: the requests.Response (caller inspects status_code / text)
    """
    # NOTE(review): no timeout is set — a stalled PUT can hang the upload loop; confirm intended
    return requests.put(upload_url, data=data)
def _list_uploaded_parts(self, drive_id: str, file_id: str, upload_id: str) -> dict:
"""
获取已上传分片列表
@@ -636,20 +635,6 @@ class AliPan(StorageBase, metaclass=Singleton):
raise Exception(resp.get("message"))
return resp
def _calc_parts(self, file_size: int) -> Tuple[int, int]:
"""
计算最优分片大小和线程数,在最大分片大小和最小分片大小之间取最优值
:param file_size: 文件大小
:return: 分片大小,线程数
"""
if file_size <= self.MIN_PART_SIZE:
return file_size, 1
if file_size >= self.MAX_PART_SIZE:
part_size = self.MAX_PART_SIZE
else:
part_size = max(self.MIN_PART_SIZE, file_size // self.MAX_WORKERS)
return part_size, (file_size + part_size - 1) // part_size
@staticmethod
def _log_progress(desc: str, total: int) -> tqdm:
"""
@@ -673,40 +658,22 @@ class AliPan(StorageBase, metaclass=Singleton):
miniters=1
)
@staticmethod
def _upload_part(upload_url: str, data: bytes, part_num: int) -> Tuple[int, str, int]:
    """
    Upload a single part and return (part_number, ETag, bytes_uploaded).

    :param upload_url: pre-signed upload URL for this part
    :param data: raw bytes of the part
    :param part_num: 1-based part number (used for logging and result tracking)
    :raises Exception: when the PUT fails, returns no response, or a non-200 status
    """
    try:
        response = requests.put(upload_url, data=data)
        if response and response.status_code == 200:
            logger.info(f"【阿里云盘】分片 {part_num} 上传完成")
            # The ETag header is collected so the caller can complete the multipart upload.
            return part_num, response.headers.get('ETag', ''), len(data)
        else:
            raise Exception(f"上传失败: {response.status_code if response else 'No Response'}")
    except Exception as e:
        # Log and re-raise so the caller (thread-pool future) sees the failure.
        logger.error(f"【阿里云盘】分片 {part_num} 上传失败: {str(e)}")
        raise
def upload(self, target_dir: schemas.FileItem, local_path: Path,
new_name: Optional[str] = None) -> Optional[schemas.FileItem]:
"""
文件上传:多线程分片、支持秒传
文件上传:分片、支持秒传
"""
target_name = new_name or local_path.name
target_path = Path(target_dir.path) / target_name
file_size = local_path.stat().st_size
# 1. 计算分片大小和线程数
part_size, workers = self._calc_parts(file_size)
# 2. 创建文件并检查秒传
# 1. 创建文件并检查秒传
chunk_size = 100 * 1024 * 1024 # 分片大小 100M
create_res = self._create_file(drive_id=target_dir.drive_id,
parent_file_id=target_dir.fileid,
file_name=target_name,
file_path=local_path,
chunk_size=part_size)
chunk_size=chunk_size)
if create_res.get('rapid_upload', False):
logger.info(f"【阿里云盘】{target_name} 秒传完成!")
return self.get_item(target_path)
@@ -715,78 +682,87 @@ class AliPan(StorageBase, metaclass=Singleton):
logger.info(f"【阿里云盘】{target_name} 已存在")
return self.get_item(target_path)
# 3. 准备分片上传参数
# 2. 准备分片上传参数
file_id = create_res.get('file_id')
if not file_id:
logger.warn(f"【阿里云盘】创建 {target_name} 文件失败!")
return None
upload_id = create_res.get('upload_id')
part_info_list = create_res.get('part_info_list')
uploaded_parts = {}
uploaded_parts = set()
# 4. 获取已上传分片
# 3. 获取已上传分片
uploaded_info = self._list_uploaded_parts(drive_id=target_dir.drive_id, file_id=file_id, upload_id=upload_id)
for part in uploaded_info.get('uploaded_parts', []):
uploaded_parts[part['part_number']] = part.get('etag', '')
uploaded_parts.add(part['part_number'])
# 5. 初始化进度条
logger.info(f"【阿里云盘】开始上传: {local_path} -> {target_path}"
f"分片大小:{StringUtils.str_filesize(part_size)},线程数:{workers}")
# 4. 初始化进度条
logger.info(f"【阿里云盘】开始上传: {local_path} -> {target_path}分片数:{len(part_info_list)}")
progress_bar = self._log_progress(f"【阿里云盘】{target_name} 上传进度", file_size)
# 7. 创建线程池
with ThreadPoolExecutor(max_workers=workers) as pool:
futures = []
# 5. 分片上传循环
with open(local_path, 'rb') as f:
for part_info in part_info_list:
part_num = part_info['part_number']
# 提交上传任务
with open(local_path, 'rb') as f:
for part_info in part_info_list:
part_num = part_info['part_number']
# 计算分片参数
start = (part_num - 1) * chunk_size
end = min(start + chunk_size, file_size)
current_chunk_size = end - start
# 跳过已上传的分片
if part_num in uploaded_parts:
start = (part_num - 1) * part_size
end = min(start + part_size, file_size)
progress_bar.update(end - start)
continue
# 更新进度条(已存在的分片
if part_num in uploaded_parts:
progress_bar.update(current_chunk_size)
continue
# 准备分片数据
start = (part_num - 1) * part_size
end = min(start + part_size, file_size)
f.seek(start)
data = f.read(end - start)
# 准备分片数据
f.seek(start)
data = f.read(current_chunk_size)
# 提交上传任务
future = pool.submit(
self._upload_part,
part_info['upload_url'],
data,
part_num
)
futures.append((part_num, future))
# 上传分片(带重试逻辑)
success = False
for attempt in range(3): # 最大重试次数
try:
# 获取当前上传地址(可能刷新)
if attempt > 0:
new_urls = self._refresh_upload_urls(drive_id=target_dir.drive_id, file_id=file_id,
upload_id=upload_id, part_numbers=[part_num])
upload_url = new_urls[0]['upload_url']
else:
upload_url = part_info['upload_url']
# 等待所有任务完成
for part_num, future in futures:
try:
num, etag, uploaded = future.result()
uploaded_parts[num] = etag
progress_bar.update(uploaded)
except Exception as e:
logger.error(f"【阿里云盘】分片上传失败: {str(e)}")
progress_bar.close()
return None
# 执行上传
logger.info(
f"【阿里云盘】开始 第{attempt + 1}次 上传 {target_name} 分片 {part_num} ...")
response = self._upload_part(upload_url=upload_url, data=data)
if response is None:
continue
if response.status_code == 200:
success = True
break
else:
logger.warn(
f"【阿里云盘】{target_name} 分片 {part_num}{attempt + 1} 次上传失败:{response.text}")
except Exception as e:
logger.warn(f"【阿里云盘】{target_name} 分片 {part_num} 上传异常: {str(e)}")
# 8. 关闭进度条
progress_bar.close()
# 处理上传结果
if success:
uploaded_parts.add(part_num)
progress_bar.update(current_chunk_size)
else:
raise Exception(f"【阿里云盘】{target_name} 分片 {part_num} 上传失败!")
# 9. 完成上传
# 6. 关闭进度条
if progress_bar:
progress_bar.close()
# 7. 完成上传
result = self._complete_upload(drive_id=target_dir.drive_id, file_id=file_id, upload_id=upload_id)
if not result:
raise Exception("【阿里云盘】完成上传失败!")
if result.get("code"):
logger.warn(f"【阿里云盘】{target_name} 上传失败:{result.get('message')}")
return None
return self.__get_fileitem(result, parent=target_dir.path)
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:

View File

@@ -4,12 +4,12 @@ import io
import secrets
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import List, Dict, Optional, Tuple, Union
import oss2
import requests
from oss2 import SizedFileAdapter, determine_part_size
from oss2.models import PartInfo
from tqdm import tqdm
@@ -54,13 +54,6 @@ class U115Pan(StorageBase, metaclass=Singleton):
# CID和路径缓存
_id_cache: Dict[str, str] = {}
# 最大线程数
MAX_WORKERS = 10
# 最大分片大小(1GB)
MAX_PART_SIZE = 1024 * 1024 * 1024
# 最小分片大小(100MB)
MIN_PART_SIZE = 100 * 1024 * 1024
def __init__(self):
super().__init__()
self.session = requests.Session()
@@ -406,35 +399,6 @@ class U115Pan(StorageBase, metaclass=Singleton):
modify_time=int(time.time())
)
def _calc_parts(self, file_size: int) -> Tuple[int, int]:
"""
计算最优分片大小和线程数,在最大分片大小和最小分片大小之间取最优值
:param file_size: 文件大小
:return: 分片大小,线程数
"""
if file_size <= self.MIN_PART_SIZE:
return file_size, 1
if file_size >= self.MAX_PART_SIZE:
part_size = self.MAX_PART_SIZE
else:
part_size = max(self.MIN_PART_SIZE, file_size // self.MAX_WORKERS)
return part_size, (file_size + part_size - 1) // part_size
@staticmethod
def _upload_part(bucket: oss2.Bucket, object_name: str, upload_id: str,
                 part_number: int, part_data: bytes) -> Tuple[PartInfo, int]:
    """
    Upload a single part to OSS and return (PartInfo, bytes_uploaded).

    :param bucket: authenticated oss2.Bucket for the target bucket
    :param object_name: OSS object key being uploaded
    :param upload_id: multipart-upload session id from init_multipart_upload
    :param part_number: 1-based part number
    :param part_data: raw bytes of the part
    :raises Exception: re-raised from oss2 on any upload failure
    """
    try:
        result = bucket.upload_part(object_name, upload_id, part_number, part_data)
        # Keep (part_number, etag) — required later by complete_multipart_upload.
        part_info = PartInfo(part_number, result.etag)
        logger.info(f"【115】分片 {part_number} 上传完成")
        return part_info, len(part_data)
    except Exception as e:
        # Log and re-raise so the caller (thread-pool future) sees the failure.
        logger.error(f"【115】分片 {part_number} 上传失败: {str(e)}")
        raise
@staticmethod
def _log_progress(desc: str, total: int) -> tqdm:
"""
@@ -461,10 +425,10 @@ class U115Pan(StorageBase, metaclass=Singleton):
def upload(self, target_dir: schemas.FileItem, local_path: Path,
new_name: Optional[str] = None) -> Optional[schemas.FileItem]:
"""
实现带秒传、断点续传和多线程并发上传
实现带秒传、断点续传和二次认证的文件上传
"""
def encode_callback(cb: str):
def encode_callback(cb: str) -> str:
return oss2.utils.b64encode_as_string(cb)
target_name = new_name or local_path.name
@@ -496,7 +460,6 @@ class U115Pan(StorageBase, metaclass=Singleton):
if not init_resp.get("state"):
logger.warn(f"【115】初始化上传失败: {init_resp.get('error')}")
return None
# 结果
init_result = init_resp.get("data")
logger.debug(f"【115】上传 Step 1 初始化结果: {init_result}")
@@ -514,10 +477,15 @@ class U115Pan(StorageBase, metaclass=Singleton):
sign_checks = sign_check.split("-")
start = int(sign_checks[0])
end = int(sign_checks[1])
# 计算指定区间的SHA1
# sign_check (用下划线隔开,截取上传文内容的sha1(单位是byte): "2392148-2392298"
with open(local_path, "rb") as f:
# 取2392148-2392298之间的内容(包含2392148、2392298)的sha1
f.seek(start)
chunk = f.read(end - start + 1)
sign_val = hashlib.sha1(chunk).hexdigest().upper()
# 重新初始化请求
# sign_keysign_val(根据sign_check计算的值大写的sha1值)
init_data.update({
"pick_code": pick_code,
"sign_key": sign_key,
@@ -530,6 +498,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
)
if not init_resp:
return None
# 二次认证结果
init_result = init_resp.get("data")
logger.debug(f"【115】上传 Step 2 二次认证结果: {init_result}")
if not pick_code:
@@ -556,7 +525,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
logger.warn("【115】获取上传凭证失败")
return None
logger.debug(f"【115】上传 Step 4 获取上传凭证结果: {token_resp}")
# 上传凭证
endpoint = token_resp.get("endpoint")
AccessKeyId = token_resp.get("AccessKeyId")
AccessKeySecret = token_resp.get("AccessKeySecret")
@@ -579,74 +548,60 @@ class U115Pan(StorageBase, metaclass=Singleton):
if resume_resp.get("callback"):
callback = resume_resp["callback"]
# Step 6: 多线程分片上传
# Step 6: 对象存储上传
auth = oss2.StsAuth(
access_key_id=AccessKeyId,
access_key_secret=AccessKeySecret,
security_token=SecurityToken
)
bucket = oss2.Bucket(auth, endpoint, bucket_name) # noqa
# 计算分片大小和线程数
part_size, workers = self._calc_parts(file_size)
logger.info(f"【115】开始上传: {local_path} -> {target_path}"
f"分片大小:{StringUtils.str_filesize(part_size)},线程数:{workers}")
# determine_part_size方法用于确定分片大小设置分片大小为 100M
part_size = determine_part_size(file_size, preferred_size=100 * 1024 * 1024)
# 初始化进度条
progress_bar = self._log_progress(f"【115】{target_name} 上传进度", file_size)
logger.info(f"【115】开始上传: {local_path} -> {target_path},分片大小:{StringUtils.str_filesize(part_size)}")
progress_bar = tqdm(
total=file_size,
unit='B',
unit_scale=True,
desc="上传进度",
ascii=True
)
# 初始化分片上传
# 初始化分片
upload_id = bucket.init_multipart_upload(object_name,
params={
"encoding-type": "url"
"encoding-type": "url",
"sequential": ""
}).upload_id
parts = []
# 逐个上传分片
with open(local_path, 'rb') as fileobj:
part_number = 1
offset = 0
while offset < file_size:
num_to_upload = min(part_size, file_size - offset)
# 调用SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。
logger.info(f"【115】开始上传 {target_name} 分片 {part_number}: {offset} -> {offset + num_to_upload}")
result = bucket.upload_part(object_name, upload_id, part_number,
data=SizedFileAdapter(fileobj, num_to_upload))
parts.append(PartInfo(part_number, result.etag))
logger.info(f"【115】{target_name} 分片 {part_number} 上传完成")
offset += num_to_upload
part_number += 1
# 更新进度
progress_bar.update(num_to_upload)
# 创建线程池
with ThreadPoolExecutor(max_workers=workers) as pool:
futures = []
parts = []
# 关闭进度条
if progress_bar:
progress_bar.close()
# 提交上传任务
with open(local_path, 'rb') as fileobj:
part_number = 1
offset = 0
while offset < file_size:
size = min(part_size, file_size - offset)
fileobj.seek(offset)
part_data = fileobj.read(size)
future = pool.submit(
self._upload_part,
bucket,
object_name,
upload_id,
part_number,
part_data
)
futures.append(future)
offset += size
part_number += 1
# 等待所有任务完成
for future in as_completed(futures):
try:
part_info, uploaded = future.result()
parts.append(part_info)
progress_bar.update(uploaded)
except Exception as e:
logger.error(f"【115】分片上传失败: {str(e)}")
progress_bar.close()
return None
# 按分片号排序
parts.sort(key=lambda x: x.part_number)
# 完成上传
# 请求头
headers = {
'X-oss-callback': encode_callback(callback["callback"]),
'x-oss-callback-var': encode_callback(callback["callback_var"]),
'x-oss-forbid-overwrite': 'false'
}
try:
result = bucket.complete_multipart_upload(object_name, upload_id, parts,
headers=headers)
@@ -662,9 +617,6 @@ class U115Pan(StorageBase, metaclass=Singleton):
else:
logger.error(f"【115】{target_name} 上传失败: {e.status}, 错误码: {e.code}, 详情: {e.message}")
return None
finally:
progress_bar.close()
# 返回结果
return self.get_item(target_path)