From ff75db310f1575ecd30507dc1c93aa6c942751f2 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Thu, 8 May 2025 12:03:39 +0800 Subject: [PATCH] fix upload parts --- app/modules/filemanager/storages/alipan.py | 45 +++++++----------- app/modules/filemanager/storages/u115.py | 53 ++++++++-------------- 2 files changed, 36 insertions(+), 62 deletions(-) diff --git a/app/modules/filemanager/storages/alipan.py b/app/modules/filemanager/storages/alipan.py index 4190e943..efadc885 100644 --- a/app/modules/filemanager/storages/alipan.py +++ b/app/modules/filemanager/storages/alipan.py @@ -6,7 +6,6 @@ import threading import time from concurrent.futures import ThreadPoolExecutor from pathlib import Path -from queue import Queue, Empty from typing import List, Dict, Optional, Tuple, Union import requests @@ -61,6 +60,8 @@ class AliPan(StorageBase, metaclass=Singleton): MAX_WORKERS = 10 # 最大分片大小(1GB) MAX_PART_SIZE = 1024 * 1024 * 1024 + # 最小分片大小(100MB) + MIN_PART_SIZE = 100 * 1024 * 1024 def __init__(self): super().__init__() @@ -635,17 +636,17 @@ class AliPan(StorageBase, metaclass=Singleton): def _calc_parts(self, file_size: int) -> Tuple[int, int]: """ - 计算最优分片大小和线程数 + 计算最优分片大小和线程数,在最大分片大小和最小分片大小之间取最优值 + :param file_size: 文件大小 + :return: 分片大小,线程数 """ - # 根据文件大小计算合适的分片数 - if file_size <= 100 * 1024 * 1024: # 小于100MB - return 10 * 1024 * 1024, min(3, self.MAX_WORKERS) # 10MB分片 - elif file_size <= 1024 * 1024 * 1024: # 小于1GB - return 100 * 1024 * 1024, min(5, self.MAX_WORKERS) # 100MB分片 + if file_size <= self.MIN_PART_SIZE: + return file_size, 1 + if file_size >= self.MAX_PART_SIZE: + part_size = self.MAX_PART_SIZE else: - # 文件较大,使用较大分片 - part_size = min(file_size // self.MAX_WORKERS, self.MAX_PART_SIZE) - return part_size, min(file_size // part_size + 1, self.MAX_WORKERS) + part_size = max(self.MIN_PART_SIZE, file_size // self.MAX_WORKERS) + return part_size, (file_size + part_size - 1) // part_size @staticmethod def _log_progress(desc: str, total: int) -> tqdm: @@ -671,18 +672,15 @@ class AliPan(StorageBase, metaclass=Singleton): ) @staticmethod - def _upload_part(upload_url: str, data: bytes, part_num: int, - progress_queue: Queue) -> Tuple[int, str]: + def _upload_part(upload_url: str, data: bytes, part_num: int) -> Tuple[int, str, int]: """ 上传单个分片 """ try: response = requests.put(upload_url, data=data) if response and response.status_code == 200: - # 将上传进度放入队列 - progress_queue.put(len(data)) logger.info(f"【阿里云盘】分片 {part_num} 上传完成") - return part_num, response.headers.get('ETag', '') + return part_num, response.headers.get('ETag', ''), len(data) else: raise Exception(f"上传失败: {response.status_code if response else 'No Response'}") except Exception as e: @@ -734,9 +732,6 @@ class AliPan(StorageBase, metaclass=Singleton): f"分片大小:{StringUtils.str_filesize(part_size)},线程数:{workers}") progress_bar = self._log_progress(f"【阿里云盘】{target_name} 上传进度", file_size) - # 6. 创建进度队列 - progress_queue = Queue() - # 7. 创建线程池 with ThreadPoolExecutor(max_workers=workers) as pool: futures = [] @@ -764,24 +759,16 @@ class AliPan(StorageBase, metaclass=Singleton): self._upload_part, part_info['upload_url'], data, - part_num, - progress_queue + part_num ) futures.append((part_num, future)) - # 更新进度条 - while len(uploaded_parts) < len(part_info_list): - try: - uploaded = progress_queue.get(timeout=1) - progress_bar.update(uploaded) - except Empty: - pass - # 等待所有任务完成 for part_num, future in futures: try: - num, etag = future.result() + num, etag, uploaded = future.result() uploaded_parts[num] = etag + progress_bar.update(uploaded) except Exception as e: logger.error(f"【阿里云盘】分片上传失败: {str(e)}") progress_bar.close() diff --git a/app/modules/filemanager/storages/u115.py b/app/modules/filemanager/storages/u115.py index 584b2c7b..e3aa8524 100644 --- a/app/modules/filemanager/storages/u115.py +++ b/app/modules/filemanager/storages/u115.py @@ -1,13 +1,11 @@ import base64 import hashlib import io -import json import secrets import threading import time from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path -from queue import Queue, Empty from typing import List, Dict, Optional, Tuple, Union import oss2 @@ -60,6 +58,8 @@ class U115Pan(StorageBase, metaclass=Singleton): MAX_WORKERS = 10 # 最大分片大小(1GB) MAX_PART_SIZE = 1024 * 1024 * 1024 + # 最小分片大小(100MB) + MIN_PART_SIZE = 100 * 1024 * 1024 def __init__(self): super().__init__() @@ -382,7 +382,7 @@ class U115Pan(StorageBase, metaclass=Singleton): "POST", "/open/folder/add", data={ - "pid": int(parent_item.fileid), + "pid": int(parent_item.fileid or "0"), "file_name": name } ) @@ -408,31 +408,29 @@ class U115Pan(StorageBase, metaclass=Singleton): def _calc_parts(self, file_size: int) -> Tuple[int, int]: """ - 计算最优分片大小和线程数 + 计算最优分片大小和线程数,在最大分片大小和最小分片大小之间取最优值 + :param file_size: 文件大小 + :return: 分片大小,线程数 """ - # 根据文件大小计算合适的分片数 - if file_size <= 100 * 1024 * 1024: # 小于100MB - return 10 * 1024 * 1024, min(3, self.MAX_WORKERS) # 10MB分片 - elif file_size <= 1024 * 1024 * 1024: # 小于1GB - return 100 * 1024 * 1024, min(5, self.MAX_WORKERS) # 100MB分片 + if file_size <= self.MIN_PART_SIZE: + return file_size, 1 + if file_size >= self.MAX_PART_SIZE: + part_size = self.MAX_PART_SIZE else: - # 文件较大,使用较大分片 - part_size = min(file_size // self.MAX_WORKERS, self.MAX_PART_SIZE) - return part_size, min(file_size // part_size + 1, self.MAX_WORKERS) + part_size = max(self.MIN_PART_SIZE, file_size // self.MAX_WORKERS) + return part_size, (file_size + part_size - 1) // part_size @staticmethod def _upload_part(bucket: oss2.Bucket, object_name: str, upload_id: str, - part_number: int, part_data: bytes, progress_queue: Queue) -> PartInfo: + part_number: int, part_data: bytes) -> Tuple[PartInfo, int]: """ 上传单个分片 """ try: result = bucket.upload_part(object_name, upload_id, part_number, part_data) part_info = PartInfo(part_number, result.etag) - # 将上传进度放入队列 - progress_queue.put(len(part_data)) logger.info(f"【115】分片 {part_number} 上传完成") - return part_info + return part_info, len(part_data) except Exception as e: logger.error(f"【115】分片 {part_number} 上传失败: {str(e)}") raise @@ -444,7 +442,7 @@ class U115Pan(StorageBase, metaclass=Singleton): """ class TqdmToLogger(io.StringIO): - def write(s, buf): # noqa + def write(s, buf): # noqa buf = buf.strip('\r\n\t ') if buf: logger.info(buf) @@ -466,8 +464,8 @@ class U115Pan(StorageBase, metaclass=Singleton): 实现带秒传、断点续传和多线程并发上传 """ - def encode_callback(cb: dict): - return oss2.utils.b64encode_as_string(json.dumps(cb).strip()) + def encode_callback(cb: str): + return oss2.utils.b64encode_as_string(cb) target_name = new_name or local_path.name target_path = Path(target_dir.path) / target_name @@ -604,9 +602,6 @@ class U115Pan(StorageBase, metaclass=Singleton): "sequential": "" }).upload_id - # 创建进度队列 - progress_queue = Queue() - # 创建线程池 with ThreadPoolExecutor(max_workers=workers) as pool: futures = [] @@ -626,26 +621,18 @@ class U115Pan(StorageBase, metaclass=Singleton): object_name, upload_id, part_number, - part_data, - progress_queue + part_data ) futures.append(future) offset += size part_number += 1 - # 更新进度条 - while len(parts) < len(futures): - try: - uploaded = progress_queue.get(timeout=1) - progress_bar.update(uploaded) - except Empty: - pass - # 等待所有任务完成 for future in as_completed(futures): try: - part_info = future.result() + part_info, uploaded = future.result() parts.append(part_info) + progress_bar.update(uploaded) except Exception as e: logger.error(f"【115】分片上传失败: {str(e)}") progress_bar.close()