fix upload parts

This commit is contained in:
jxxghp
2025-05-08 12:03:39 +08:00
parent 5788520401
commit ff75db310f
2 changed files with 36 additions and 62 deletions

View File

@@ -6,7 +6,6 @@ import threading
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from queue import Queue, Empty
from typing import List, Dict, Optional, Tuple, Union
import requests
@@ -61,6 +60,8 @@ class AliPan(StorageBase, metaclass=Singleton):
MAX_WORKERS = 10
# 最大分片大小(1GB)
MAX_PART_SIZE = 1024 * 1024 * 1024
# 最小分片大小(100MB)
MIN_PART_SIZE = 100 * 1024 * 1024
def __init__(self):
super().__init__()
@@ -635,17 +636,17 @@ class AliPan(StorageBase, metaclass=Singleton):
def _calc_parts(self, file_size: int) -> Tuple[int, int]:
"""
计算最优分片大小和线程数
计算最优分片大小和线程数,在最大分片大小和最小分片大小之间取最优值
:param file_size: 文件大小
:return: 分片大小,线程数
"""
# 根据文件大小计算合适的分片数
if file_size <= 100 * 1024 * 1024: # 小于100MB
return 10 * 1024 * 1024, min(3, self.MAX_WORKERS) # 10MB分片
elif file_size <= 1024 * 1024 * 1024: # 小于1GB
return 100 * 1024 * 1024, min(5, self.MAX_WORKERS) # 100MB分片
if file_size <= self.MIN_PART_SIZE:
return file_size, 1
if file_size >= self.MAX_PART_SIZE:
part_size = self.MAX_PART_SIZE
else:
# 文件较大,使用较大分片
part_size = min(file_size // self.MAX_WORKERS, self.MAX_PART_SIZE)
return part_size, min(file_size // part_size + 1, self.MAX_WORKERS)
part_size = max(self.MIN_PART_SIZE, file_size // self.MAX_WORKERS)
return part_size, (file_size + part_size - 1) // part_size
@staticmethod
def _log_progress(desc: str, total: int) -> tqdm:
@@ -671,18 +672,15 @@ class AliPan(StorageBase, metaclass=Singleton):
)
@staticmethod
def _upload_part(upload_url: str, data: bytes, part_num: int,
progress_queue: Queue) -> Tuple[int, str]:
def _upload_part(upload_url: str, data: bytes, part_num: int) -> Tuple[int, str, int]:
"""
上传单个分片
"""
try:
response = requests.put(upload_url, data=data)
if response and response.status_code == 200:
# 将上传进度放入队列
progress_queue.put(len(data))
logger.info(f"【阿里云盘】分片 {part_num} 上传完成")
return part_num, response.headers.get('ETag', '')
return part_num, response.headers.get('ETag', ''), len(data)
else:
raise Exception(f"上传失败: {response.status_code if response else 'No Response'}")
except Exception as e:
@@ -734,9 +732,6 @@ class AliPan(StorageBase, metaclass=Singleton):
f"分片大小:{StringUtils.str_filesize(part_size)},线程数:{workers}")
progress_bar = self._log_progress(f"【阿里云盘】{target_name} 上传进度", file_size)
# 6. 创建进度队列
progress_queue = Queue()
# 7. 创建线程池
with ThreadPoolExecutor(max_workers=workers) as pool:
futures = []
@@ -764,24 +759,16 @@ class AliPan(StorageBase, metaclass=Singleton):
self._upload_part,
part_info['upload_url'],
data,
part_num,
progress_queue
part_num
)
futures.append((part_num, future))
# 更新进度条
while len(uploaded_parts) < len(part_info_list):
try:
uploaded = progress_queue.get(timeout=1)
progress_bar.update(uploaded)
except Empty:
pass
# 等待所有任务完成
for part_num, future in futures:
try:
num, etag = future.result()
num, etag, uploaded = future.result()
uploaded_parts[num] = etag
progress_bar.update(uploaded)
except Exception as e:
logger.error(f"【阿里云盘】分片上传失败: {str(e)}")
progress_bar.close()

View File

@@ -1,13 +1,11 @@
import base64
import hashlib
import io
import json
import secrets
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from queue import Queue, Empty
from typing import List, Dict, Optional, Tuple, Union
import oss2
@@ -60,6 +58,8 @@ class U115Pan(StorageBase, metaclass=Singleton):
MAX_WORKERS = 10
# 最大分片大小(1GB)
MAX_PART_SIZE = 1024 * 1024 * 1024
# 最小分片大小(100MB)
MIN_PART_SIZE = 100 * 1024 * 1024
def __init__(self):
super().__init__()
@@ -382,7 +382,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
"POST",
"/open/folder/add",
data={
"pid": int(parent_item.fileid),
"pid": int(parent_item.fileid or "0"),
"file_name": name
}
)
@@ -408,31 +408,29 @@ class U115Pan(StorageBase, metaclass=Singleton):
def _calc_parts(self, file_size: int) -> Tuple[int, int]:
"""
计算最优分片大小和线程数
计算最优分片大小和线程数,在最大分片大小和最小分片大小之间取最优值
:param file_size: 文件大小
:return: 分片大小,线程数
"""
# 根据文件大小计算合适的分片数
if file_size <= 100 * 1024 * 1024: # 小于100MB
return 10 * 1024 * 1024, min(3, self.MAX_WORKERS) # 10MB分片
elif file_size <= 1024 * 1024 * 1024: # 小于1GB
return 100 * 1024 * 1024, min(5, self.MAX_WORKERS) # 100MB分片
if file_size <= self.MIN_PART_SIZE:
return file_size, 1
if file_size >= self.MAX_PART_SIZE:
part_size = self.MAX_PART_SIZE
else:
# 文件较大,使用较大分片
part_size = min(file_size // self.MAX_WORKERS, self.MAX_PART_SIZE)
return part_size, min(file_size // part_size + 1, self.MAX_WORKERS)
part_size = max(self.MIN_PART_SIZE, file_size // self.MAX_WORKERS)
return part_size, (file_size + part_size - 1) // part_size
@staticmethod
def _upload_part(bucket: oss2.Bucket, object_name: str, upload_id: str,
                 part_number: int, part_data: bytes) -> Tuple[PartInfo, int]:
    """
    Upload one part of a multipart upload.

    :param bucket: bucket object whose ``upload_part`` method performs the upload
    :param object_name: name of the target object
    :param upload_id: identifier of the multipart upload this part belongs to
    :param part_number: number of this part
    :param part_data: raw bytes of this part
    :return: (PartInfo built from the part number and the returned etag,
             number of bytes uploaded)
    :raises Exception: any upload error is logged and then re-raised unchanged
    """
    try:
        response = bucket.upload_part(object_name, upload_id, part_number, part_data)
        uploaded = PartInfo(part_number, response.etag)
        logger.info(f"【115】分片 {part_number} 上传完成")
        # The byte count lets the caller advance its progress bar.
        return uploaded, len(part_data)
    except Exception as err:
        logger.error(f"【115】分片 {part_number} 上传失败: {str(err)}")
        raise
@@ -444,7 +442,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
"""
class TqdmToLogger(io.StringIO):
def write(s, buf): # noqa
def write(s, buf): # noqa
buf = buf.strip('\r\n\t ')
if buf:
logger.info(buf)
@@ -466,8 +464,8 @@ class U115Pan(StorageBase, metaclass=Singleton):
实现带秒传、断点续传和多线程并发上传
"""
def encode_callback(cb: dict):
return oss2.utils.b64encode_as_string(json.dumps(cb).strip())
def encode_callback(cb: str):
return oss2.utils.b64encode_as_string(cb)
target_name = new_name or local_path.name
target_path = Path(target_dir.path) / target_name
@@ -604,9 +602,6 @@ class U115Pan(StorageBase, metaclass=Singleton):
"sequential": ""
}).upload_id
# 创建进度队列
progress_queue = Queue()
# 创建线程池
with ThreadPoolExecutor(max_workers=workers) as pool:
futures = []
@@ -626,26 +621,18 @@ class U115Pan(StorageBase, metaclass=Singleton):
object_name,
upload_id,
part_number,
part_data,
progress_queue
part_data
)
futures.append(future)
offset += size
part_number += 1
# 更新进度条
while len(parts) < len(futures):
try:
uploaded = progress_queue.get(timeout=1)
progress_bar.update(uploaded)
except Empty:
pass
# 等待所有任务完成
for future in as_completed(futures):
try:
part_info = future.result()
part_info, uploaded = future.result()
parts.append(part_info)
progress_bar.update(uploaded)
except Exception as e:
logger.error(f"【115】分片上传失败: {str(e)}")
progress_bar.close()