fix 上传进度条

This commit is contained in:
jxxghp
2025-03-29 14:33:45 +08:00
parent 7ffafb49c4
commit c44aa50ef5
3 changed files with 159 additions and 72 deletions

View File

@@ -1,3 +1,4 @@
import base64
import hashlib
import secrets
import threading
@@ -6,6 +7,7 @@ from pathlib import Path
from typing import List, Dict, Optional, Tuple, Union
import requests
from tqdm import tqdm
from app import schemas
from app.core.config import settings
@@ -72,7 +74,7 @@ class AliPan(StorageBase, metaclass=Singleton):
conf = self.get_conf()
drive_id = conf.get("resource_drive_id") or conf.get("backup_drive_id") or conf.get("default_drive_id")
if not drive_id:
raise Exception("请先登录阿里云盘")
raise Exception("【阿里云盘】请先扫码登录")
return drive_id
@property
@@ -171,7 +173,7 @@ class AliPan(StorageBase, metaclass=Singleton):
确认登录后获取相关token
"""
if not self._auth_state:
raise Exception("请先生成二维码")
raise Exception("【阿里云盘】请先生成二维码")
resp = self.session.post(
f"{self.base_url}/oauth/access_token",
json={
@@ -182,10 +184,10 @@ class AliPan(StorageBase, metaclass=Singleton):
}
)
if resp is None:
raise Exception("获取 access_token 失败")
raise Exception("【阿里云盘】获取 access_token 失败")
result = resp.json()
if result.get("code"):
raise Exception(f"{result.get('code')} - {result.get('message')}")
raise Exception(f"【阿里云盘】{result.get('code')} - {result.get('message')}")
return result
def __refresh_access_token(self, refresh_token: str) -> Optional[dict]:
@@ -193,7 +195,7 @@ class AliPan(StorageBase, metaclass=Singleton):
刷新access_token
"""
if not refresh_token:
raise Exception("会话失效,请重新扫码登录!")
raise Exception("【阿里云盘】会话失效,请重新扫码登录!")
resp = self.session.post(
f"{self.base_url}/oauth/access_token",
json={
@@ -259,13 +261,10 @@ class AliPan(StorageBase, metaclass=Singleton):
time.sleep(reset_time + 5)
return self._request_api(method, endpoint, result_key, **kwargs)
# 处理请求错误
resp.raise_for_status()
# 返回数据
ret_data = resp.json()
if ret_data.get("code"):
logger.warn(f"【阿里云盘】{method} 请求 {endpoint} 出错{ret_data.get('message')}")
logger.warn(f"【阿里云盘】{method} {endpoint} 返回{ret_data.get('code')} {ret_data.get('message')}")
if result_key:
return ret_data.get(result_key)
@@ -460,6 +459,38 @@ class AliPan(StorageBase, metaclass=Singleton):
sha1.update(data)
return sha1.hexdigest()
def _calculate_proof_code(self, file_path: Path) -> str:
    """
    Compute the proof_code required for a rapid (instant) upload.

    A window of up to 8 bytes is read from the file at an offset derived from
    the MD5 of the current access_token, and returned Base64-encoded.

    :param file_path: local file the sample is taken from
    :return: Base64 text of the sampled bytes, or "" when the file is empty
    """
    file_size = file_path.stat().st_size
    if file_size == 0:
        # An empty file has nothing to sample (and would divide by zero below)
        return ""

    # The first 16 hex digits of MD5(access_token) are read as an unsigned
    # 64-bit integer. hexdigest() only ever yields hex digits, so int()
    # cannot fail here and needs no error handling.
    token_digest = hashlib.md5(self.access_token.encode()).hexdigest()
    seed = int(token_digest[:16], 16)

    # The sample window starts at seed % size and is clipped at end of file
    start = seed % file_size
    end = min(start + 8, file_size)

    with open(file_path, 'rb') as f:
        f.seek(start)
        chunk = f.read(end - start)
    return base64.b64encode(chunk).decode()
@staticmethod
def _calculate_content_hash(file_path: Path):
"""
@@ -475,19 +506,19 @@ class AliPan(StorageBase, metaclass=Singleton):
return sha1.hexdigest()
def _create_file(self, drive_id: str, parent_file_id: str,
file_name: str, file_path: Path, check_name_mode="auto_rename",
file_name: str, file_path: Path, check_name_mode="refuse",
chunk_size: int = 1 * 1024 * 1024 * 1024):
"""
创建文件请求,尝试秒传
"""
file_size = file_path.stat().st_size
pre_hash = self._calculate_pre_hash(file_path)
content_hash = self._calculate_content_hash(file_path)
num_parts = (file_size + chunk_size - 1) // chunk_size
# 构建分片信息
part_info_list = [{"part_number": i + 1} for i in range(num_parts)]
# 确定是否能秒传
data = {
"drive_id": drive_id,
"parent_file_id": parent_file_id,
@@ -496,19 +527,36 @@ class AliPan(StorageBase, metaclass=Singleton):
"check_name_mode": check_name_mode,
"size": file_size,
"pre_hash": pre_hash,
"content_hash": content_hash,
"content_hash_name": "sha1",
"part_info_list": part_info_list
}
resp = self._request_api(
"POST",
"/adrive/v1.0/openFile/create",
json=data
)
if not resp:
raise Exception("创建文件失败!")
if resp.get("code"):
raise Exception("【阿里云盘】创建文件失败!")
if resp.get("code") == "PreHashMatched":
# 可以秒传
proof_code = self._calculate_proof_code(file_path)
content_hash = self._calculate_content_hash(file_path)
data.pop("pre_hash")
data.update({
"proof_code": proof_code,
"proof_version": "v1",
"content_hash": content_hash,
"content_hash_name": "sha1",
})
resp = self._request_api(
"POST",
"/adrive/v1.0/openFile/create",
json=data
)
if not resp:
raise Exception("【阿里云盘】创建文件失败!")
if resp.get("code"):
raise Exception(resp.get("message"))
else:
raise Exception(resp.get("message"))
return resp
@@ -528,7 +576,7 @@ class AliPan(StorageBase, metaclass=Singleton):
json=data
)
if not resp:
raise Exception("刷新分片上传地址失败!")
raise Exception("【阿里云盘】刷新分片上传地址失败!")
if resp.get("code"):
raise Exception(resp.get("message"))
return resp.get('part_info_list', [])
@@ -542,8 +590,7 @@ class AliPan(StorageBase, metaclass=Singleton):
'Content-Length': str(len(data)),
'Content-Type': 'application/octet-stream'
}
response = requests.put(upload_url, data=data, headers=headers)
return response
return requests.put(upload_url, data=data, headers=headers)
def _list_uploaded_parts(self, drive_id: str, file_id: str, upload_id: str) -> dict:
"""
@@ -560,7 +607,7 @@ class AliPan(StorageBase, metaclass=Singleton):
json=data
)
if not resp:
raise Exception("获取已上传分片失败!")
raise Exception("【阿里云盘】获取已上传分片失败!")
if resp.get("code"):
raise Exception(resp.get("message"))
return resp
@@ -578,7 +625,7 @@ class AliPan(StorageBase, metaclass=Singleton):
json=data
)
if not resp:
raise Exception("完成上传失败!")
raise Exception("【阿里云盘】完成上传失败!")
if resp.get("code"):
raise Exception(resp.get("message"))
return resp
@@ -590,23 +637,23 @@ class AliPan(StorageBase, metaclass=Singleton):
"""
target_name = new_name or local_path.name
target_path = Path(target_dir.path) / target_name
file_size = local_path.stat().st_size
# 1. 创建文件并检查秒传
chunk_size = 1 * 1024 * 1024 * 1024
chunk_size = 100 * 1024 * 1024 # 分片大小 100M
create_res = self._create_file(drive_id=target_dir.drive_id,
parent_file_id=target_dir.fileid,
file_name=new_name,
file_path=local_path,
chunk_size=chunk_size)
if create_res.get('rapid_upload', False):
logger.info(f"{target_name} 秒传完成!")
logger.info(f"【阿里云盘】{target_name} 秒传完成!")
return self.get_item(target_path)
# 2. 准备分片上传参数
file_id = create_res['file_id']
upload_id = create_res['upload_id']
part_info_list = create_res['part_info_list']
file_size = local_path.stat().st_size
file_id = create_res.get('file_id')
upload_id = create_res.get('upload_id')
part_info_list = create_res.get('part_info_list')
uploaded_parts = set()
# 3. 获取已上传分片
@@ -614,48 +661,79 @@ class AliPan(StorageBase, metaclass=Singleton):
for part in uploaded_info.get('uploaded_parts', []):
uploaded_parts.add(part['part_number'])
# 4. 遍历并上传分片
# 4. 初始化进度条
logger.info(f"【阿里云盘】开始上传: {local_path} -> {target_path},分片数:{len(part_info_list)}")
progress_bar = tqdm(
total=file_size,
unit='B',
unit_scale=True,
desc="上传进度",
ascii=True
)
# 5. 分片上传循环
with open(local_path, 'rb') as f:
for part_info in part_info_list:
part_num = part_info['part_number']
if part_num in uploaded_parts:
logger.info(f"分片 {part_num} 已存在,跳过上传")
continue
# 计算分片偏移量
# 计算分片参数
start = (part_num - 1) * chunk_size
end = min(start + chunk_size, file_size)
current_chunk_size = end - start
# 更新进度条(已存在的分片)
if part_num in uploaded_parts:
progress_bar.update(current_chunk_size)
continue
# 准备分片数据
f.seek(start)
data = f.read(end - start)
data = f.read(current_chunk_size)
# 尝试上传分片
response = self._upload_part(upload_url=part_info['upload_url'], data=data)
if response.status_code == 200:
logger.info(f"分片 {part_num} 上传成功")
uploaded_parts.add(part_num)
else:
# 刷新地址后重试
logger.info(f"分片 {part_num} 上传失败,刷新地址...")
new_urls = self._refresh_upload_urls(drive_id=target_dir.drive_id, file_id=file_id,
upload_id=upload_id,
part_numbers=[part_num])
if new_urls:
new_url = new_urls[0]['upload_url']
response = self._upload_part(upload_url=new_url, data=data)
if response.status_code == 200:
logger.info(f"分片 {part_num} 重传成功")
uploaded_parts.add(part_num)
# 上传分片(带重试逻辑)
success = False
for attempt in range(3): # 最大重试次数
try:
# 获取当前上传地址(可能刷新)
if attempt > 0:
new_urls = self._refresh_upload_urls(drive_id=target_dir.drive_id, file_id=file_id,
upload_id=upload_id, part_numbers=[part_num])
upload_url = new_urls[0]['upload_url']
else:
raise Exception(f"分片 {part_num} 上传失败: {response.text}")
else:
raise Exception("无法获取新的上传地址")
upload_url = part_info['upload_url']
# 5. 完成上传
# 执行上传
logger.info(
f"【阿里云盘】开始 第{attempt + 1}次 上传 {target_name} 分片 {part_num} ...")
response = self._upload_part(upload_url=upload_url, data=data)
if response is None:
continue
if response.status_code == 200:
success = True
break
else:
logger.warn(
f"【阿里云盘】{target_name} 分片 {part_num}{attempt + 1} 次上传失败:{response.text}")
except Exception as e:
logger.warn(f"【阿里云盘】{target_name} 分片 {part_num} 上传异常: {str(e)}")
# 处理上传结果
if success:
uploaded_parts.add(part_num)
progress_bar.update(current_chunk_size)
else:
raise Exception(f"【阿里云盘】{target_name} 分片 {part_num} 上传失败!")
# 6. 关闭进度条
if progress_bar:
progress_bar.close()
# 7. 完成上传
result = self._complete_upload(drive_id=target_dir.drive_id, file_id=file_id, upload_id=upload_id)
if not result:
raise Exception("完成上传失败!")
raise Exception("【阿里云盘】完成上传失败!")
if result.get("code"):
logger.warn(f"{target_name} 上传失败:{result.get('message')}")
logger.warn(f"【阿里云盘】{target_name} 上传失败:{result.get('message')}")
return self.__get_fileitem(result)
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:

View File

@@ -11,6 +11,7 @@ import oss2
import requests
from oss2 import SizedFileAdapter, determine_part_size
from oss2.models import PartInfo
from tqdm import tqdm
from app import schemas
from app.core.config import settings
@@ -166,7 +167,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
确认登录后获取相关token
"""
if not self._auth_state:
raise Exception("请先生成二维码")
raise Exception("【115】请先生成二维码")
resp = self.session.post(
"https://passportapi.115.com/open/deviceCodeToToken",
data={
@@ -398,16 +399,6 @@ class U115Pan(StorageBase, metaclass=Singleton):
实现带秒传、断点续传和二次认证的文件上传
"""
def progress_callback(consumed_bytes: int, total_bytes: int):
    """
    Upload progress callback.

    Logs the uploaded size and percentage whenever the percentage, rounded to
    the nearest ten, differs from the last value that was logged.

    :param consumed_bytes: number of bytes uploaded so far
    :param total_bytes: total number of bytes to upload
    """
    if not total_bytes:
        # No percentage can be computed against a zero/unknown total
        # (presumably an empty file or unknown length — confirm with caller)
        return
    progress = round(consumed_bytes / total_bytes * 100)
    # Bucket to the nearest 10% so the log is not flooded on every chunk
    bucket = round(progress, -1)
    if bucket != self._last_progress:
        logger.info(f"【115】已上传: {StringUtils.str_filesize(consumed_bytes)}"
                    f" / {StringUtils.str_filesize(total_bytes)}, 进度: {progress}%")
        self._last_progress = bucket
def encode_callback(cb: dict):
"""
回调参数Base64编码函数
@@ -559,9 +550,19 @@ class U115Pan(StorageBase, metaclass=Singleton):
# 补充参数
logger.debug(f"【115】上传 Step 6 回调参数:{callback_dict} {callback_var_dict}")
# 填写不能包含Bucket名称在内的Object完整路径例如exampledir/exampleobject.txt。
# determine_part_size方法用于确定分片大小设置分片大小为 1GB
part_size = determine_part_size(file_size, preferred_size=1 * 1024 * 1024 * 1024)
# determine_part_size方法用于确定分片大小设置分片大小为 100M
part_size = determine_part_size(file_size, preferred_size=100 * 1024 * 1024)
# 初始化进度条
logger.info(f"【115】开始上传: {local_path} -> {target_path},分片大小:{StringUtils.str_filesize(part_size)}")
progress_bar = tqdm(
total=file_size,
unit='B',
unit_scale=True,
desc="上传进度",
ascii=True
)
# 初始化分片
upload_id = bucket.init_multipart_upload(object_name,
params={
@@ -578,12 +579,18 @@ class U115Pan(StorageBase, metaclass=Singleton):
# 调用SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。
logger.info(f"【115】开始上传 {target_name} 分片 {part_number}: {offset} -> {offset + num_to_upload}")
result = bucket.upload_part(object_name, upload_id, part_number,
data=SizedFileAdapter(fileobj, num_to_upload),
progress_callback=progress_callback)
data=SizedFileAdapter(fileobj, num_to_upload))
parts.append(PartInfo(part_number, result.etag))
logger.info(f"【115】{target_name} 分片 {part_number} 上传完成")
offset += num_to_upload
part_number += 1
# 更新进度
progress_bar.update(num_to_upload)
# 关闭进度条
if progress_bar:
progress_bar.close()
# 请求头
headers = {
'X-oss-callback': encode_callback(callback_dict),

View File

@@ -68,4 +68,6 @@ redis~=5.2.1
async_timeout~=5.0.1; python_full_version < "3.11.3"
packaging~=24.2
cf_clearance~=0.31.0
oss2~=2.19.1
oss2~=2.19.1
tqdm~=4.67.1
setuptools~=65.5.0