From ea31072ae59fa01aaa93ac102d9b346a1601df4c Mon Sep 17 00:00:00 2001 From: jxxghp Date: Thu, 8 May 2025 09:52:32 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96AliPan=E7=B1=BB=E7=9A=84?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E4=B8=8A=E4=BC=A0=E5=8A=9F=E8=83=BD=EF=BC=8C?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=A4=9A=E7=BA=BF=E7=A8=8B=E5=88=86=E7=89=87?= =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E5=92=8C=E5=8A=A8=E6=80=81=E5=88=86=E7=89=87?= =?UTF-8?q?=E8=AE=A1=E7=AE=97=EF=BC=8C=E6=8F=90=E5=8D=87=E4=B8=8A=E4=BC=A0?= =?UTF-8?q?=E6=95=88=E7=8E=87=E5=92=8C=E8=BF=9B=E5=BA=A6=E7=9B=91=E6=8E=A7?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/modules/filemanager/storages/alipan.py | 201 +++++++++++++-------- 1 file changed, 130 insertions(+), 71 deletions(-) diff --git a/app/modules/filemanager/storages/alipan.py b/app/modules/filemanager/storages/alipan.py index 1c3db540..9b02f4e1 100644 --- a/app/modules/filemanager/storages/alipan.py +++ b/app/modules/filemanager/storages/alipan.py @@ -3,8 +3,11 @@ import hashlib import secrets import threading import time +from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path +from queue import Queue from typing import List, Dict, Optional, Tuple, Union +import io import requests from tqdm import tqdm @@ -50,6 +53,11 @@ class AliPan(StorageBase, metaclass=Singleton): # CID和路径缓存 _id_cache: Dict[str, Tuple[str, str]] = {} + # 最大线程数 + MAX_WORKERS = 10 + # 最大分片大小(1GB) + MAX_PART_SIZE = 1024 * 1024 * 1024 + def __init__(self): super().__init__() self.session = requests.Session() @@ -628,22 +636,77 @@ class AliPan(StorageBase, metaclass=Singleton): raise Exception(resp.get("message")) return resp + def _calc_parts(self, file_size: int) -> Tuple[int, int]: + """ + 计算最优分片大小和线程数 + """ + # 根据文件大小计算合适的分片数 + if file_size <= 100 * 1024 * 1024: # 小于100MB + return 10 * 1024 * 1024, min(3, self.MAX_WORKERS) # 10MB分片 + elif file_size <= 1024 * 1024 * 1024: # 小于1GB + return 100 * 1024 * 1024, min(5, self.MAX_WORKERS) # 100MB分片 + else: + # 文件较大,使用较大分片 + part_size = min(file_size // self.MAX_WORKERS, self.MAX_PART_SIZE) + return part_size, min(file_size // part_size + 1, self.MAX_WORKERS) + + def _log_progress(self, desc: str, total: int) -> tqdm: + """ + 创建一个可以输出到日志的进度条 + """ + class TqdmToLogger(io.StringIO): + def write(s, buf): + buf = buf.strip('\r\n\t ') + if buf: + logger.info(buf) + + return tqdm( + total=total, + unit='B', + unit_scale=True, + desc=desc, + file=TqdmToLogger(), + mininterval=1.0, + maxinterval=5.0, + miniters=1 + ) + + def _upload_part(self, upload_url: str, data: bytes, part_num: int, + progress_queue: Queue) -> Tuple[int, str]: + """ + 上传单个分片 + """ + try: + response = requests.put(upload_url, data=data) + if response and response.status_code == 200: + # 将上传进度放入队列 + progress_queue.put(len(data)) + logger.info(f"【阿里云盘】分片 {part_num} 上传完成") + return part_num, response.headers.get('ETag', '') + else: + raise Exception(f"上传失败: {response.status_code if response else 'No Response'}") + except Exception as e: + logger.error(f"【阿里云盘】分片 {part_num} 上传失败: {str(e)}") + raise + def upload(self, target_dir: schemas.FileItem, local_path: Path, new_name: Optional[str] = None) -> Optional[schemas.FileItem]: """ - 文件上传:分片、支持秒传 + 文件上传:多线程分片、支持秒传 """ target_name = new_name or local_path.name target_path = Path(target_dir.path) / target_name file_size = local_path.stat().st_size - # 1. 创建文件并检查秒传 - chunk_size = 100 * 1024 * 1024 # 分片大小 100M + # 1. 计算分片大小和线程数 + part_size, workers = self._calc_parts(file_size) + + # 2. 创建文件并检查秒传 create_res = self._create_file(drive_id=target_dir.drive_id, - parent_file_id=target_dir.fileid, - file_name=target_name, - file_path=local_path, - chunk_size=chunk_size) + parent_file_id=target_dir.fileid, + file_name=target_name, + file_path=local_path, + chunk_size=part_size) if create_res.get('rapid_upload', False): logger.info(f"【阿里云盘】{target_name} 秒传完成!") return self.get_item(target_path) @@ -652,93 +715,89 @@ class AliPan(StorageBase, metaclass=Singleton): logger.info(f"【阿里云盘】{target_name} 已存在") return self.get_item(target_path) - # 2. 准备分片上传参数 + # 3. 准备分片上传参数 file_id = create_res.get('file_id') if not file_id: logger.warn(f"【阿里云盘】创建 {target_name} 文件失败!") return None upload_id = create_res.get('upload_id') part_info_list = create_res.get('part_info_list') - uploaded_parts = set() + uploaded_parts = {} - # 3. 获取已上传分片 + # 4. 获取已上传分片 uploaded_info = self._list_uploaded_parts(drive_id=target_dir.drive_id, file_id=file_id, upload_id=upload_id) for part in uploaded_info.get('uploaded_parts', []): - uploaded_parts.add(part['part_number']) + uploaded_parts[part['part_number']] = part.get('etag', '') - # 4. 初始化进度条 - logger.info(f"【阿里云盘】开始上传: {local_path} -> {target_path},分片数:{len(part_info_list)}") - progress_bar = tqdm( - total=file_size, - unit='B', - unit_scale=True, - desc="上传进度", - ascii=True - ) + # 5. 初始化进度条 + logger.info(f"【阿里云盘】开始上传: {local_path} -> {target_path}," + f"分片大小:{StringUtils.str_filesize(part_size)},线程数:{workers}") + progress_bar = self._log_progress(f"【阿里云盘】{target_name} 上传进度", file_size) - # 5. 分片上传循环 - with open(local_path, 'rb') as f: - for part_info in part_info_list: - part_num = part_info['part_number'] + # 6. 创建进度队列 + progress_queue = Queue() - # 计算分片参数 - start = (part_num - 1) * chunk_size - end = min(start + chunk_size, file_size) - current_chunk_size = end - start + # 7. 创建线程池 + with ThreadPoolExecutor(max_workers=workers) as pool: + futures = [] + + # 提交上传任务 + with open(local_path, 'rb') as f: + for part_info in part_info_list: + part_num = part_info['part_number'] + + # 跳过已上传的分片 + if part_num in uploaded_parts: + start = (part_num - 1) * part_size + end = min(start + part_size, file_size) + progress_bar.update(end - start) + continue - # 更新进度条(已存在的分片) - if part_num in uploaded_parts: - progress_bar.update(current_chunk_size) - continue + # 准备分片数据 + start = (part_num - 1) * part_size + end = min(start + part_size, file_size) + f.seek(start) + data = f.read(end - start) - # 准备分片数据 - f.seek(start) - data = f.read(current_chunk_size) + # 提交上传任务 + future = pool.submit( + self._upload_part, + part_info['upload_url'], + data, + part_num, + progress_queue + ) + futures.append((part_num, future)) - # 上传分片(带重试逻辑) - success = False - for attempt in range(3): # 最大重试次数 - try: - # 获取当前上传地址(可能刷新) - if attempt > 0: - new_urls = self._refresh_upload_urls(drive_id=target_dir.drive_id, file_id=file_id, - upload_id=upload_id, part_numbers=[part_num]) - upload_url = new_urls[0]['upload_url'] - else: - upload_url = part_info['upload_url'] + # 更新进度条 + while len(uploaded_parts) < len(part_info_list): + try: + uploaded = progress_queue.get(timeout=1) + progress_bar.update(uploaded) + except: + pass - # 执行上传 - logger.info( - f"【阿里云盘】开始 第{attempt + 1}次 上传 {target_name} 分片 {part_num} ...") - response = self._upload_part(upload_url=upload_url, data=data) - if response is None: - continue - if response.status_code == 200: - success = True - break - else: - logger.warn( - f"【阿里云盘】{target_name} 分片 {part_num} 第 {attempt + 1} 次上传失败:{response.text}!") - except Exception as e: - logger.warn(f"【阿里云盘】{target_name} 分片 {part_num} 上传异常: {str(e)}!") + # 等待所有任务完成 + for part_num, future in futures: + try: + num, etag = future.result() + uploaded_parts[num] = etag + except Exception as e: + logger.error(f"【阿里云盘】分片上传失败: {str(e)}") + progress_bar.close() + return None - # 处理上传结果 - if success: - uploaded_parts.add(part_num) - progress_bar.update(current_chunk_size) - else: - raise Exception(f"【阿里云盘】{target_name} 分片 {part_num} 上传失败!") + # 8. 关闭进度条 + progress_bar.close() - # 6. 关闭进度条 - if progress_bar: - progress_bar.close() - - # 7. 完成上传 + # 9. 完成上传 result = self._complete_upload(drive_id=target_dir.drive_id, file_id=file_id, upload_id=upload_id) if not result: raise Exception("【阿里云盘】完成上传失败!") if result.get("code"): logger.warn(f"【阿里云盘】{target_name} 上传失败:{result.get('message')}!") + return None + return self.__get_fileitem(result, parent=target_dir.path) def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]: