From 570dddc1207ef8672027b3772ca902b478332385 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Thu, 8 May 2025 09:56:43 +0800 Subject: [PATCH] fix --- app/modules/filemanager/storages/alipan.py | 44 ++++++++++------------ app/modules/filemanager/storages/u115.py | 43 +++++++++++---------- 2 files changed, 43 insertions(+), 44 deletions(-) diff --git a/app/modules/filemanager/storages/alipan.py b/app/modules/filemanager/storages/alipan.py index 9b02f4e1..ead220f7 100644 --- a/app/modules/filemanager/storages/alipan.py +++ b/app/modules/filemanager/storages/alipan.py @@ -1,13 +1,13 @@ import base64 import hashlib +import io import secrets import threading import time -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import ThreadPoolExecutor from pathlib import Path -from queue import Queue +from queue import Queue, Empty from typing import List, Dict, Optional, Tuple, Union -import io import requests from tqdm import tqdm @@ -591,13 +591,6 @@ class AliPan(StorageBase, metaclass=Singleton): raise Exception(resp.get("message")) return resp.get('part_info_list', []) - @staticmethod - def _upload_part(upload_url: str, data: bytes): - """ - 上传单个分片 - """ - return requests.put(upload_url, data=data) - def _list_uploaded_parts(self, drive_id: str, file_id: str, upload_id: str) -> dict: """ 获取已上传分片列表 @@ -650,12 +643,14 @@ class AliPan(StorageBase, metaclass=Singleton): part_size = min(file_size // self.MAX_WORKERS, self.MAX_PART_SIZE) return part_size, min(file_size // part_size + 1, self.MAX_WORKERS) - def _log_progress(self, desc: str, total: int) -> tqdm: + @staticmethod + def _log_progress(desc: str, total: int) -> tqdm: """ 创建一个可以输出到日志的进度条 """ + class TqdmToLogger(io.StringIO): - def write(s, buf): + def write(s, buf): # noqa buf = buf.strip('\r\n\t ') if buf: logger.info(buf) @@ -671,8 +666,9 @@ class AliPan(StorageBase, metaclass=Singleton): miniters=1 ) - def _upload_part(self, upload_url: str, data: bytes, part_num: int, - progress_queue: Queue) -> Tuple[int, str]: + @staticmethod + def _upload_part(upload_url: str, data: bytes, part_num: int, + progress_queue: Queue) -> Tuple[int, str]: """ 上传单个分片 """ @@ -700,13 +696,13 @@ class AliPan(StorageBase, metaclass=Singleton): # 1. 计算分片大小和线程数 part_size, workers = self._calc_parts(file_size) - + # 2. 创建文件并检查秒传 create_res = self._create_file(drive_id=target_dir.drive_id, - parent_file_id=target_dir.fileid, - file_name=target_name, - file_path=local_path, - chunk_size=part_size) + parent_file_id=target_dir.fileid, + file_name=target_name, + file_path=local_path, + chunk_size=part_size) if create_res.get('rapid_upload', False): logger.info(f"【阿里云盘】{target_name} 秒传完成!") return self.get_item(target_path) @@ -731,7 +727,7 @@ class AliPan(StorageBase, metaclass=Singleton): # 5. 初始化进度条 logger.info(f"【阿里云盘】开始上传: {local_path} -> {target_path}," - f"分片大小:{StringUtils.str_filesize(part_size)},线程数:{workers}") + f"分片大小:{StringUtils.str_filesize(part_size)},线程数:{workers}") progress_bar = self._log_progress(f"【阿里云盘】{target_name} 上传进度", file_size) # 6. 创建进度队列 @@ -740,12 +736,12 @@ class AliPan(StorageBase, metaclass=Singleton): # 7. 创建线程池 with ThreadPoolExecutor(max_workers=workers) as pool: futures = [] - + # 提交上传任务 with open(local_path, 'rb') as f: for part_info in part_info_list: part_num = part_info['part_number'] - + # 跳过已上传的分片 if part_num in uploaded_parts: start = (part_num - 1) * part_size @@ -774,7 +770,7 @@ class AliPan(StorageBase, metaclass=Singleton): try: uploaded = progress_queue.get(timeout=1) progress_bar.update(uploaded) - except: + except Empty: pass # 等待所有任务完成 @@ -797,7 +793,7 @@ class AliPan(StorageBase, metaclass=Singleton): if result.get("code"): logger.warn(f"【阿里云盘】{target_name} 上传失败:{result.get('message')}!") return None - + return self.__get_fileitem(result, parent=target_dir.path) def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]: diff --git a/app/modules/filemanager/storages/u115.py b/app/modules/filemanager/storages/u115.py index 396024e1..584b2c7b 100644 --- a/app/modules/filemanager/storages/u115.py +++ b/app/modules/filemanager/storages/u115.py @@ -1,18 +1,17 @@ import base64 import hashlib +import io import json import secrets import threading import time from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path -from queue import Queue +from queue import Queue, Empty from typing import List, Dict, Optional, Tuple, Union -import io import oss2 import requests -from oss2 import SizedFileAdapter, determine_part_size from oss2.models import PartInfo from tqdm import tqdm @@ -421,8 +420,9 @@ class U115Pan(StorageBase, metaclass=Singleton): part_size = min(file_size // self.MAX_WORKERS, self.MAX_PART_SIZE) return part_size, min(file_size // part_size + 1, self.MAX_WORKERS) - def _upload_part(self, bucket: oss2.Bucket, object_name: str, upload_id: str, - part_number: int, part_data: bytes, progress_queue: Queue) -> PartInfo: + @staticmethod + def _upload_part(bucket: oss2.Bucket, object_name: str, upload_id: str, + part_number: int, part_data: bytes, progress_queue: Queue) -> PartInfo: """ 上传单个分片 """ @@ -437,12 +437,14 @@ class U115Pan(StorageBase, metaclass=Singleton): logger.error(f"【115】分片 {part_number} 上传失败: {str(e)}") raise - def _log_progress(self, desc: str, total: int) -> tqdm: + @staticmethod + def _log_progress(desc: str, total: int) -> tqdm: """ 创建一个可以输出到日志的进度条 """ + class TqdmToLogger(io.StringIO): - def write(s, buf): + def write(s, buf): # noqa buf = buf.strip('\r\n\t ') if buf: logger.info(buf) @@ -463,6 +465,7 @@ class U115Pan(StorageBase, metaclass=Singleton): """ 实现带秒传、断点续传和多线程并发上传 """ + def encode_callback(cb: dict): return oss2.utils.b64encode_as_string(json.dumps(cb).strip()) @@ -495,7 +498,7 @@ class U115Pan(StorageBase, metaclass=Singleton): if not init_resp.get("state"): logger.warn(f"【115】初始化上传失败: {init_resp.get('error')}") return None - + # 结果 init_result = init_resp.get("data") logger.debug(f"【115】上传 Step 1 初始化结果: {init_result}") @@ -555,7 +558,7 @@ class U115Pan(StorageBase, metaclass=Singleton): logger.warn("【115】获取上传凭证失败") return None logger.debug(f"【115】上传 Step 4 获取上传凭证结果: {token_resp}") - + endpoint = token_resp.get("endpoint") AccessKeyId = token_resp.get("AccessKeyId") AccessKeySecret = token_resp.get("AccessKeySecret") @@ -584,31 +587,31 @@ class U115Pan(StorageBase, metaclass=Singleton): access_key_secret=AccessKeySecret, security_token=SecurityToken ) - bucket = oss2.Bucket(auth, endpoint, bucket_name) + bucket = oss2.Bucket(auth, endpoint, bucket_name) # noqa # 计算分片大小和线程数 part_size, workers = self._calc_parts(file_size) logger.info(f"【115】开始上传: {local_path} -> {target_path}," - f"分片大小:{StringUtils.str_filesize(part_size)},线程数:{workers}") + f"分片大小:{StringUtils.str_filesize(part_size)},线程数:{workers}") # 初始化进度条 progress_bar = self._log_progress(f"【115】{target_name} 上传进度", file_size) # 初始化分片上传 upload_id = bucket.init_multipart_upload(object_name, - params={ - "encoding-type": "url", - "sequential": "" - }).upload_id + params={ + "encoding-type": "url", + "sequential": "" + }).upload_id # 创建进度队列 progress_queue = Queue() - + # 创建线程池 with ThreadPoolExecutor(max_workers=workers) as pool: futures = [] parts = [] - + # 提交上传任务 with open(local_path, 'rb') as fileobj: part_number = 1 @@ -635,7 +638,7 @@ class U115Pan(StorageBase, metaclass=Singleton): try: uploaded = progress_queue.get(timeout=1) progress_bar.update(uploaded) - except: + except Empty: pass # 等待所有任务完成 @@ -657,10 +660,10 @@ class U115Pan(StorageBase, metaclass=Singleton): 'x-oss-callback-var': encode_callback(callback["callback_var"]), 'x-oss-forbid-overwrite': 'false' } - + try: result = bucket.complete_multipart_upload(object_name, upload_id, parts, - headers=headers) + headers=headers) if result.status == 200: logger.debug(f"【115】上传 Step 6 回调结果:{result.resp.response.json()}") logger.info(f"【115】{target_name} 上传成功")