diff --git a/app/modules/filemanager/storages/alipan.py b/app/modules/filemanager/storages/alipan.py index 2d76c73e..ed01ed96 100644 --- a/app/modules/filemanager/storages/alipan.py +++ b/app/modules/filemanager/storages/alipan.py @@ -4,7 +4,6 @@ import io import secrets import threading import time -from concurrent.futures import ThreadPoolExecutor from pathlib import Path from typing import List, Dict, Optional, Tuple, Union @@ -56,13 +55,6 @@ class AliPan(StorageBase, metaclass=Singleton): # CID和路径缓存 _id_cache: Dict[str, Tuple[str, str]] = {} - # 最大线程数 - MAX_WORKERS = 10 - # 最大分片大小(1GB) - MAX_PART_SIZE = 1024 * 1024 * 1024 - # 最小分片大小(100MB) - MIN_PART_SIZE = 100 * 1024 * 1024 - def __init__(self): super().__init__() self.session = requests.Session() @@ -598,6 +590,13 @@ class AliPan(StorageBase, metaclass=Singleton): raise Exception(resp.get("message")) return resp.get('part_info_list', []) + @staticmethod + def _upload_part(upload_url: str, data: bytes): + """ + 上传单个分片 + """ + return requests.put(upload_url, data=data) + def _list_uploaded_parts(self, drive_id: str, file_id: str, upload_id: str) -> dict: """ 获取已上传分片列表 @@ -636,20 +635,6 @@ class AliPan(StorageBase, metaclass=Singleton): raise Exception(resp.get("message")) return resp - def _calc_parts(self, file_size: int) -> Tuple[int, int]: - """ - 计算最优分片大小和线程数,在最大分片大小和最小分片大小之间取最优值 - :param file_size: 文件大小 - :return: 分片大小,线程数 - """ - if file_size <= self.MIN_PART_SIZE: - return file_size, 1 - if file_size >= self.MAX_PART_SIZE: - part_size = self.MAX_PART_SIZE - else: - part_size = max(self.MIN_PART_SIZE, file_size // self.MAX_WORKERS) - return part_size, (file_size + part_size - 1) // part_size - @staticmethod def _log_progress(desc: str, total: int) -> tqdm: """ @@ -673,40 +658,22 @@ class AliPan(StorageBase, metaclass=Singleton): miniters=1 ) - @staticmethod - def _upload_part(upload_url: str, data: bytes, part_num: int) -> Tuple[int, str, int]: - """ - 上传单个分片 - """ - try: - response = requests.put(upload_url, data=data) - if response and response.status_code == 200: - logger.info(f"【阿里云盘】分片 {part_num} 上传完成") - return part_num, response.headers.get('ETag', ''), len(data) - else: - raise Exception(f"上传失败: {response.status_code if response else 'No Response'}") - except Exception as e: - logger.error(f"【阿里云盘】分片 {part_num} 上传失败: {str(e)}") - raise - def upload(self, target_dir: schemas.FileItem, local_path: Path, new_name: Optional[str] = None) -> Optional[schemas.FileItem]: """ - 文件上传:多线程分片、支持秒传 + 文件上传:分片、支持秒传 """ target_name = new_name or local_path.name target_path = Path(target_dir.path) / target_name file_size = local_path.stat().st_size - # 1. 计算分片大小和线程数 - part_size, workers = self._calc_parts(file_size) - - # 2. 创建文件并检查秒传 + # 1. 创建文件并检查秒传 + chunk_size = 100 * 1024 * 1024 # 分片大小 100M create_res = self._create_file(drive_id=target_dir.drive_id, parent_file_id=target_dir.fileid, file_name=target_name, file_path=local_path, - chunk_size=part_size) + chunk_size=chunk_size) if create_res.get('rapid_upload', False): logger.info(f"【阿里云盘】{target_name} 秒传完成!") return self.get_item(target_path) @@ -715,78 +682,87 @@ class AliPan(StorageBase, metaclass=Singleton): logger.info(f"【阿里云盘】{target_name} 已存在") return self.get_item(target_path) - # 3. 准备分片上传参数 + # 2. 准备分片上传参数 file_id = create_res.get('file_id') if not file_id: logger.warn(f"【阿里云盘】创建 {target_name} 文件失败!") return None upload_id = create_res.get('upload_id') part_info_list = create_res.get('part_info_list') - uploaded_parts = {} + uploaded_parts = set() - # 4. 获取已上传分片 + # 3. 获取已上传分片 uploaded_info = self._list_uploaded_parts(drive_id=target_dir.drive_id, file_id=file_id, upload_id=upload_id) for part in uploaded_info.get('uploaded_parts', []): - uploaded_parts[part['part_number']] = part.get('etag', '') + uploaded_parts.add(part['part_number']) - # 5. 初始化进度条 - logger.info(f"【阿里云盘】开始上传: {local_path} -> {target_path}," - f"分片大小:{StringUtils.str_filesize(part_size)},线程数:{workers}") + # 4. 初始化进度条 + logger.info(f"【阿里云盘】开始上传: {local_path} -> {target_path},分片数:{len(part_info_list)}") progress_bar = self._log_progress(f"【阿里云盘】{target_name} 上传进度", file_size) - # 7. 创建线程池 - with ThreadPoolExecutor(max_workers=workers) as pool: - futures = [] + # 5. 分片上传循环 + with open(local_path, 'rb') as f: + for part_info in part_info_list: + part_num = part_info['part_number'] - # 提交上传任务 - with open(local_path, 'rb') as f: - for part_info in part_info_list: - part_num = part_info['part_number'] + # 计算分片参数 + start = (part_num - 1) * chunk_size + end = min(start + chunk_size, file_size) + current_chunk_size = end - start - # 跳过已上传的分片 - if part_num in uploaded_parts: - start = (part_num - 1) * part_size - end = min(start + part_size, file_size) - progress_bar.update(end - start) - continue + # 更新进度条(已存在的分片) + if part_num in uploaded_parts: + progress_bar.update(current_chunk_size) + continue - # 准备分片数据 - start = (part_num - 1) * part_size - end = min(start + part_size, file_size) - f.seek(start) - data = f.read(end - start) + # 准备分片数据 + f.seek(start) + data = f.read(current_chunk_size) - # 提交上传任务 - future = pool.submit( - self._upload_part, - part_info['upload_url'], - data, - part_num - ) - futures.append((part_num, future)) + # 上传分片(带重试逻辑) + success = False + for attempt in range(3): # 最大重试次数 + try: + # 获取当前上传地址(可能刷新) + if attempt > 0: + new_urls = self._refresh_upload_urls(drive_id=target_dir.drive_id, file_id=file_id, + upload_id=upload_id, part_numbers=[part_num]) + upload_url = new_urls[0]['upload_url'] + else: + upload_url = part_info['upload_url'] - # 等待所有任务完成 - for part_num, future in futures: - try: - num, etag, uploaded = future.result() - uploaded_parts[num] = etag - progress_bar.update(uploaded) - except Exception as e: - logger.error(f"【阿里云盘】分片上传失败: {str(e)}") - progress_bar.close() - return None + # 执行上传 + logger.info( + f"【阿里云盘】开始 第{attempt + 1}次 上传 {target_name} 分片 {part_num} ...") + response = self._upload_part(upload_url=upload_url, data=data) + if response is None: + continue + if response.status_code == 200: + success = True + break + else: + logger.warn( + f"【阿里云盘】{target_name} 分片 {part_num} 第 {attempt + 1} 次上传失败:{response.text}!") + except Exception as e: + logger.warn(f"【阿里云盘】{target_name} 分片 {part_num} 上传异常: {str(e)}!") - # 8. 关闭进度条 - progress_bar.close() + # 处理上传结果 + if success: + uploaded_parts.add(part_num) + progress_bar.update(current_chunk_size) + else: + raise Exception(f"【阿里云盘】{target_name} 分片 {part_num} 上传失败!") - # 9. 完成上传 + # 6. 关闭进度条 + if progress_bar: + progress_bar.close() + + # 7. 完成上传 result = self._complete_upload(drive_id=target_dir.drive_id, file_id=file_id, upload_id=upload_id) if not result: raise Exception("【阿里云盘】完成上传失败!") if result.get("code"): logger.warn(f"【阿里云盘】{target_name} 上传失败:{result.get('message')}!") - return None - return self.__get_fileitem(result, parent=target_dir.path) def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]: diff --git a/app/modules/filemanager/storages/u115.py b/app/modules/filemanager/storages/u115.py index 3a0528ef..08a24aac 100644 --- a/app/modules/filemanager/storages/u115.py +++ b/app/modules/filemanager/storages/u115.py @@ -4,12 +4,12 @@ import io import secrets import threading import time -from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from typing import List, Dict, Optional, Tuple, Union import oss2 import requests +from oss2 import SizedFileAdapter, determine_part_size from oss2.models import PartInfo from tqdm import tqdm @@ -54,13 +54,6 @@ class U115Pan(StorageBase, metaclass=Singleton): # CID和路径缓存 _id_cache: Dict[str, str] = {} - # 最大线程数 - MAX_WORKERS = 10 - # 最大分片大小(1GB) - MAX_PART_SIZE = 1024 * 1024 * 1024 - # 最小分片大小(100MB) - MIN_PART_SIZE = 100 * 1024 * 1024 - def __init__(self): super().__init__() self.session = requests.Session() @@ -406,35 +399,6 @@ class U115Pan(StorageBase, metaclass=Singleton): modify_time=int(time.time()) ) - def _calc_parts(self, file_size: int) -> Tuple[int, int]: - """ - 计算最优分片大小和线程数,在最大分片大小和最小分片大小之间取最优值 - :param file_size: 文件大小 - :return: 分片大小,线程数 - """ - if file_size <= self.MIN_PART_SIZE: - return file_size, 1 - if file_size >= self.MAX_PART_SIZE: - part_size = self.MAX_PART_SIZE - else: - part_size = max(self.MIN_PART_SIZE, file_size // self.MAX_WORKERS) - return part_size, (file_size + part_size - 1) // part_size - - @staticmethod - def _upload_part(bucket: oss2.Bucket, object_name: str, upload_id: str, - part_number: int, part_data: bytes) -> Tuple[PartInfo, int]: - """ - 上传单个分片 - """ - try: - result = bucket.upload_part(object_name, upload_id, part_number, part_data) - part_info = PartInfo(part_number, result.etag) - logger.info(f"【115】分片 {part_number} 上传完成") - return part_info, len(part_data) - except Exception as e: - logger.error(f"【115】分片 {part_number} 上传失败: {str(e)}") - raise - @staticmethod def _log_progress(desc: str, total: int) -> tqdm: """ @@ -461,10 +425,10 @@ class U115Pan(StorageBase, metaclass=Singleton): def upload(self, target_dir: schemas.FileItem, local_path: Path, new_name: Optional[str] = None) -> Optional[schemas.FileItem]: """ - 实现带秒传、断点续传和多线程并发上传 + 实现带秒传、断点续传和二次认证的文件上传 """ - def encode_callback(cb: str): + def encode_callback(cb: str) -> str: return oss2.utils.b64encode_as_string(cb) target_name = new_name or local_path.name @@ -496,7 +460,6 @@ class U115Pan(StorageBase, metaclass=Singleton): if not init_resp.get("state"): logger.warn(f"【115】初始化上传失败: {init_resp.get('error')}") return None - # 结果 init_result = init_resp.get("data") logger.debug(f"【115】上传 Step 1 初始化结果: {init_result}") @@ -514,10 +477,15 @@ class U115Pan(StorageBase, metaclass=Singleton): sign_checks = sign_check.split("-") start = int(sign_checks[0]) end = int(sign_checks[1]) + # 计算指定区间的SHA1 + # sign_check (用下划线隔开,截取上传文内容的sha1)(单位是byte): "2392148-2392298" with open(local_path, "rb") as f: + # 取2392148-2392298之间的内容(包含2392148、2392298)的sha1 f.seek(start) chunk = f.read(end - start + 1) sign_val = hashlib.sha1(chunk).hexdigest().upper() + # 重新初始化请求 + # sign_key,sign_val(根据sign_check计算的值大写的sha1值) init_data.update({ "pick_code": pick_code, "sign_key": sign_key, @@ -530,6 +498,7 @@ class U115Pan(StorageBase, metaclass=Singleton): ) if not init_resp: return None + # 二次认证结果 init_result = init_resp.get("data") logger.debug(f"【115】上传 Step 2 二次认证结果: {init_result}") if not pick_code: @@ -556,7 +525,7 @@ class U115Pan(StorageBase, metaclass=Singleton): logger.warn("【115】获取上传凭证失败") return None logger.debug(f"【115】上传 Step 4 获取上传凭证结果: {token_resp}") - + # 上传凭证 endpoint = token_resp.get("endpoint") AccessKeyId = token_resp.get("AccessKeyId") AccessKeySecret = token_resp.get("AccessKeySecret") @@ -579,74 +548,60 @@ class U115Pan(StorageBase, metaclass=Singleton): if resume_resp.get("callback"): callback = resume_resp["callback"] - # Step 6: 多线程分片上传 + # Step 6: 对象存储上传 auth = oss2.StsAuth( access_key_id=AccessKeyId, access_key_secret=AccessKeySecret, security_token=SecurityToken ) bucket = oss2.Bucket(auth, endpoint, bucket_name) # noqa - - # 计算分片大小和线程数 - part_size, workers = self._calc_parts(file_size) - logger.info(f"【115】开始上传: {local_path} -> {target_path}," - f"分片大小:{StringUtils.str_filesize(part_size)},线程数:{workers}") + # determine_part_size方法用于确定分片大小,设置分片大小为 100M + part_size = determine_part_size(file_size, preferred_size=100 * 1024 * 1024) # 初始化进度条 - progress_bar = self._log_progress(f"【115】{target_name} 上传进度", file_size) + logger.info(f"【115】开始上传: {local_path} -> {target_path},分片大小:{StringUtils.str_filesize(part_size)}") + progress_bar = tqdm( + total=file_size, + unit='B', + unit_scale=True, + desc="上传进度", + ascii=True + ) - # 初始化分片上传 + # 初始化分片 upload_id = bucket.init_multipart_upload(object_name, params={ - "encoding-type": "url" + "encoding-type": "url", + "sequential": "" }).upload_id + parts = [] + # 逐个上传分片 + with open(local_path, 'rb') as fileobj: + part_number = 1 + offset = 0 + while offset < file_size: + num_to_upload = min(part_size, file_size - offset) + # 调用SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。 + logger.info(f"【115】开始上传 {target_name} 分片 {part_number}: {offset} -> {offset + num_to_upload}") + result = bucket.upload_part(object_name, upload_id, part_number, + data=SizedFileAdapter(fileobj, num_to_upload)) + parts.append(PartInfo(part_number, result.etag)) + logger.info(f"【115】{target_name} 分片 {part_number} 上传完成") + offset += num_to_upload + part_number += 1 + # 更新进度 + progress_bar.update(num_to_upload) - # 创建线程池 - with ThreadPoolExecutor(max_workers=workers) as pool: - futures = [] - parts = [] + # 关闭进度条 + if progress_bar: + progress_bar.close() - # 提交上传任务 - with open(local_path, 'rb') as fileobj: - part_number = 1 - offset = 0 - while offset < file_size: - size = min(part_size, file_size - offset) - fileobj.seek(offset) - part_data = fileobj.read(size) - future = pool.submit( - self._upload_part, - bucket, - object_name, - upload_id, - part_number, - part_data - ) - futures.append(future) - offset += size - part_number += 1 - - # 等待所有任务完成 - for future in as_completed(futures): - try: - part_info, uploaded = future.result() - parts.append(part_info) - progress_bar.update(uploaded) - except Exception as e: - logger.error(f"【115】分片上传失败: {str(e)}") - progress_bar.close() - return None - - # 按分片号排序 - parts.sort(key=lambda x: x.part_number) - - # 完成上传 + # 请求头 headers = { 'X-oss-callback': encode_callback(callback["callback"]), 'x-oss-callback-var': encode_callback(callback["callback_var"]), 'x-oss-forbid-overwrite': 'false' } - try: result = bucket.complete_multipart_upload(object_name, upload_id, parts, headers=headers) @@ -662,9 +617,6 @@ class U115Pan(StorageBase, metaclass=Singleton): else: logger.error(f"【115】{target_name} 上传失败: {e.status}, 错误码: {e.code}, 详情: {e.message}") return None - finally: - progress_bar.close() - # 返回结果 return self.get_item(target_path)