This commit is contained in:
jxxghp
2025-05-08 09:56:43 +08:00
parent ea31072ae5
commit 570dddc120
2 changed files with 43 additions and 44 deletions

View File

@@ -1,13 +1,13 @@
import base64
import hashlib
import io
import secrets
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from queue import Queue
from queue import Queue, Empty
from typing import List, Dict, Optional, Tuple, Union
import io
import requests
from tqdm import tqdm
@@ -591,13 +591,6 @@ class AliPan(StorageBase, metaclass=Singleton):
raise Exception(resp.get("message"))
return resp.get('part_info_list', [])
@staticmethod
def _upload_part(upload_url: str, data: bytes):
"""
上传单个分片
"""
return requests.put(upload_url, data=data)
def _list_uploaded_parts(self, drive_id: str, file_id: str, upload_id: str) -> dict:
"""
获取已上传分片列表
@@ -650,12 +643,14 @@ class AliPan(StorageBase, metaclass=Singleton):
part_size = min(file_size // self.MAX_WORKERS, self.MAX_PART_SIZE)
return part_size, min(file_size // part_size + 1, self.MAX_WORKERS)
def _log_progress(self, desc: str, total: int) -> tqdm:
@staticmethod
def _log_progress(desc: str, total: int) -> tqdm:
"""
创建一个可以输出到日志的进度条
"""
class TqdmToLogger(io.StringIO):
def write(s, buf):
def write(s, buf): # noqa
buf = buf.strip('\r\n\t ')
if buf:
logger.info(buf)
@@ -671,8 +666,9 @@ class AliPan(StorageBase, metaclass=Singleton):
miniters=1
)
def _upload_part(self, upload_url: str, data: bytes, part_num: int,
progress_queue: Queue) -> Tuple[int, str]:
@staticmethod
def _upload_part(upload_url: str, data: bytes, part_num: int,
progress_queue: Queue) -> Tuple[int, str]:
"""
上传单个分片
"""
@@ -700,13 +696,13 @@ class AliPan(StorageBase, metaclass=Singleton):
# 1. 计算分片大小和线程数
part_size, workers = self._calc_parts(file_size)
# 2. 创建文件并检查秒传
create_res = self._create_file(drive_id=target_dir.drive_id,
parent_file_id=target_dir.fileid,
file_name=target_name,
file_path=local_path,
chunk_size=part_size)
parent_file_id=target_dir.fileid,
file_name=target_name,
file_path=local_path,
chunk_size=part_size)
if create_res.get('rapid_upload', False):
logger.info(f"【阿里云盘】{target_name} 秒传完成!")
return self.get_item(target_path)
@@ -731,7 +727,7 @@ class AliPan(StorageBase, metaclass=Singleton):
# 5. 初始化进度条
logger.info(f"【阿里云盘】开始上传: {local_path} -> {target_path}"
f"分片大小:{StringUtils.str_filesize(part_size)},线程数:{workers}")
f"分片大小:{StringUtils.str_filesize(part_size)},线程数:{workers}")
progress_bar = self._log_progress(f"【阿里云盘】{target_name} 上传进度", file_size)
# 6. 创建进度队列
@@ -740,12 +736,12 @@ class AliPan(StorageBase, metaclass=Singleton):
# 7. 创建线程池
with ThreadPoolExecutor(max_workers=workers) as pool:
futures = []
# 提交上传任务
with open(local_path, 'rb') as f:
for part_info in part_info_list:
part_num = part_info['part_number']
# 跳过已上传的分片
if part_num in uploaded_parts:
start = (part_num - 1) * part_size
@@ -774,7 +770,7 @@ class AliPan(StorageBase, metaclass=Singleton):
try:
uploaded = progress_queue.get(timeout=1)
progress_bar.update(uploaded)
except:
except Empty:
pass
# 等待所有任务完成
@@ -797,7 +793,7 @@ class AliPan(StorageBase, metaclass=Singleton):
if result.get("code"):
logger.warn(f"【阿里云盘】{target_name} 上传失败:{result.get('message')}")
return None
return self.__get_fileitem(result, parent=target_dir.path)
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:

View File

@@ -1,18 +1,17 @@
import base64
import hashlib
import io
import json
import secrets
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from queue import Queue
from queue import Queue, Empty
from typing import List, Dict, Optional, Tuple, Union
import io
import oss2
import requests
from oss2 import SizedFileAdapter, determine_part_size
from oss2.models import PartInfo
from tqdm import tqdm
@@ -421,8 +420,9 @@ class U115Pan(StorageBase, metaclass=Singleton):
part_size = min(file_size // self.MAX_WORKERS, self.MAX_PART_SIZE)
return part_size, min(file_size // part_size + 1, self.MAX_WORKERS)
def _upload_part(self, bucket: oss2.Bucket, object_name: str, upload_id: str,
part_number: int, part_data: bytes, progress_queue: Queue) -> PartInfo:
@staticmethod
def _upload_part(bucket: oss2.Bucket, object_name: str, upload_id: str,
part_number: int, part_data: bytes, progress_queue: Queue) -> PartInfo:
"""
上传单个分片
"""
@@ -437,12 +437,14 @@ class U115Pan(StorageBase, metaclass=Singleton):
logger.error(f"【115】分片 {part_number} 上传失败: {str(e)}")
raise
def _log_progress(self, desc: str, total: int) -> tqdm:
@staticmethod
def _log_progress(desc: str, total: int) -> tqdm:
"""
创建一个可以输出到日志的进度条
"""
class TqdmToLogger(io.StringIO):
def write(s, buf):
def write(s, buf): # noqa
buf = buf.strip('\r\n\t ')
if buf:
logger.info(buf)
@@ -463,6 +465,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
"""
实现带秒传、断点续传和多线程并发上传
"""
def encode_callback(cb: dict):
return oss2.utils.b64encode_as_string(json.dumps(cb).strip())
@@ -495,7 +498,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
if not init_resp.get("state"):
logger.warn(f"【115】初始化上传失败: {init_resp.get('error')}")
return None
# 结果
init_result = init_resp.get("data")
logger.debug(f"【115】上传 Step 1 初始化结果: {init_result}")
@@ -555,7 +558,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
logger.warn("【115】获取上传凭证失败")
return None
logger.debug(f"【115】上传 Step 4 获取上传凭证结果: {token_resp}")
endpoint = token_resp.get("endpoint")
AccessKeyId = token_resp.get("AccessKeyId")
AccessKeySecret = token_resp.get("AccessKeySecret")
@@ -584,31 +587,31 @@ class U115Pan(StorageBase, metaclass=Singleton):
access_key_secret=AccessKeySecret,
security_token=SecurityToken
)
bucket = oss2.Bucket(auth, endpoint, bucket_name)
bucket = oss2.Bucket(auth, endpoint, bucket_name) # noqa
# 计算分片大小和线程数
part_size, workers = self._calc_parts(file_size)
logger.info(f"【115】开始上传: {local_path} -> {target_path}"
f"分片大小:{StringUtils.str_filesize(part_size)},线程数:{workers}")
f"分片大小:{StringUtils.str_filesize(part_size)},线程数:{workers}")
# 初始化进度条
progress_bar = self._log_progress(f"【115】{target_name} 上传进度", file_size)
# 初始化分片上传
upload_id = bucket.init_multipart_upload(object_name,
params={
"encoding-type": "url",
"sequential": ""
}).upload_id
params={
"encoding-type": "url",
"sequential": ""
}).upload_id
# 创建进度队列
progress_queue = Queue()
# 创建线程池
with ThreadPoolExecutor(max_workers=workers) as pool:
futures = []
parts = []
# 提交上传任务
with open(local_path, 'rb') as fileobj:
part_number = 1
@@ -635,7 +638,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
try:
uploaded = progress_queue.get(timeout=1)
progress_bar.update(uploaded)
except:
except Empty:
pass
# 等待所有任务完成
@@ -657,10 +660,10 @@ class U115Pan(StorageBase, metaclass=Singleton):
'x-oss-callback-var': encode_callback(callback["callback_var"]),
'x-oss-forbid-overwrite': 'false'
}
try:
result = bucket.complete_multipart_upload(object_name, upload_id, parts,
headers=headers)
headers=headers)
if result.status == 200:
logger.debug(f"【115】上传 Step 6 回调结果:{result.resp.response.json()}")
logger.info(f"【115】{target_name} 上传成功")