diff --git a/app/modules/filemanager/storages/alipan.py b/app/modules/filemanager/storages/alipan.py index 52f92107..9b0490f6 100644 --- a/app/modules/filemanager/storages/alipan.py +++ b/app/modules/filemanager/storages/alipan.py @@ -1,3 +1,4 @@ +import base64 import hashlib import secrets import threading @@ -6,6 +7,7 @@ from pathlib import Path from typing import List, Dict, Optional, Tuple, Union import requests +from tqdm import tqdm from app import schemas from app.core.config import settings @@ -72,7 +74,7 @@ class AliPan(StorageBase, metaclass=Singleton): conf = self.get_conf() drive_id = conf.get("resource_drive_id") or conf.get("backup_drive_id") or conf.get("default_drive_id") if not drive_id: - raise Exception("请先登录阿里云盘!") + raise Exception("【阿里云盘】请先扫码登录!") return drive_id @property @@ -171,7 +173,7 @@ class AliPan(StorageBase, metaclass=Singleton): 确认登录后,获取相关token """ if not self._auth_state: - raise Exception("请先生成二维码") + raise Exception("【阿里云盘】请先生成二维码") resp = self.session.post( f"{self.base_url}/oauth/access_token", json={ @@ -182,10 +184,10 @@ class AliPan(StorageBase, metaclass=Singleton): } ) if resp is None: - raise Exception("获取 access_token 失败") + raise Exception("【阿里云盘】获取 access_token 失败") result = resp.json() if result.get("code"): - raise Exception(f"{result.get('code')} - {result.get('message')}!") + raise Exception(f"【阿里云盘】{result.get('code')} - {result.get('message')}!") return result def __refresh_access_token(self, refresh_token: str) -> Optional[dict]: @@ -193,7 +195,7 @@ class AliPan(StorageBase, metaclass=Singleton): 刷新access_token """ if not refresh_token: - raise Exception("会话失效,请重新扫码登录!") + raise Exception("【阿里云盘】会话失效,请重新扫码登录!") resp = self.session.post( f"{self.base_url}/oauth/access_token", json={ @@ -259,13 +261,10 @@ class AliPan(StorageBase, metaclass=Singleton): time.sleep(reset_time + 5) return self._request_api(method, endpoint, result_key, **kwargs) - # 处理请求错误 - resp.raise_for_status() - # 返回数据 ret_data = resp.json() if ret_data.get("code"): - logger.warn(f"【阿里云盘】{method} 请求 {endpoint} 出错:{ret_data.get('message')}!") + logger.warn(f"【阿里云盘】{method} {endpoint} 返回:{ret_data.get('code')} {ret_data.get('message')}") if result_key: return ret_data.get(result_key) @@ -460,6 +459,38 @@ class AliPan(StorageBase, metaclass=Singleton): sha1.update(data) return sha1.hexdigest() + def _calculate_proof_code(self, file_path: Path): + """ + 计算秒传所需的proof_code + """ + file_size = file_path.stat().st_size + if file_size == 0: + return "" + + # Step 1-3: 计算access_token的MD5并取前16位 + md5 = hashlib.md5(self.access_token.encode()).hexdigest() + hex_str = md5[:16] + + # Step 4: 转换为无符号int64 + try: + tmp_int = int(hex_str, 16) + except ValueError: + raise ValueError("【阿里云盘】Invalid hex string for proof code calculation") + + # Step 5-7: 计算读取范围 + index = tmp_int % file_size + start = index + end = index + 8 + if end > file_size: + end = file_size + + # Step 8: 读取文件范围数据并编码 + with open(file_path, 'rb') as f: + f.seek(start) + chunk = f.read(end - start) + + return base64.b64encode(chunk).decode() + @staticmethod def _calculate_content_hash(file_path: Path): """ @@ -475,19 +506,19 @@ class AliPan(StorageBase, metaclass=Singleton): return sha1.hexdigest() def _create_file(self, drive_id: str, parent_file_id: str, - file_name: str, file_path: Path, check_name_mode="auto_rename", + file_name: str, file_path: Path, check_name_mode="refuse", chunk_size: int = 1 * 1024 * 1024 * 1024): """ 创建文件请求,尝试秒传 """ file_size = file_path.stat().st_size pre_hash = self._calculate_pre_hash(file_path) - content_hash = self._calculate_content_hash(file_path) num_parts = (file_size + chunk_size - 1) // chunk_size # 构建分片信息 part_info_list = [{"part_number": i + 1} for i in range(num_parts)] + # 确定是否能秒传 data = { "drive_id": drive_id, "parent_file_id": parent_file_id, @@ -496,19 +527,36 @@ class AliPan(StorageBase, metaclass=Singleton): "check_name_mode": check_name_mode, "size": file_size, "pre_hash": pre_hash, - "content_hash": content_hash, - "content_hash_name": "sha1", "part_info_list": part_info_list } - resp = self._request_api( "POST", "/adrive/v1.0/openFile/create", json=data ) if not resp: - raise Exception("创建文件失败!") - if resp.get("code"): + raise Exception("【阿里云盘】创建文件失败!") + if resp.get("code") == "PreHashMatched": + # 可以秒传 + proof_code = self._calculate_proof_code(file_path) + content_hash = self._calculate_content_hash(file_path) + data.pop("pre_hash") + data.update({ + "proof_code": proof_code, + "proof_version": "v1", + "content_hash": content_hash, + "content_hash_name": "sha1", + }) + resp = self._request_api( + "POST", + "/adrive/v1.0/openFile/create", + json=data + ) + if not resp: + raise Exception("【阿里云盘】创建文件失败!") + if resp.get("code"): + raise Exception(resp.get("message")) + else: raise Exception(resp.get("message")) return resp @@ -528,7 +576,7 @@ class AliPan(StorageBase, metaclass=Singleton): json=data ) if not resp: - raise Exception("刷新分片上传地址失败!") + raise Exception("【阿里云盘】刷新分片上传地址失败!") if resp.get("code"): raise Exception(resp.get("message")) return resp.get('part_info_list', []) @@ -542,8 +590,7 @@ class AliPan(StorageBase, metaclass=Singleton): 'Content-Length': str(len(data)), 'Content-Type': 'application/octet-stream' } - response = requests.put(upload_url, data=data, headers=headers) - return response + return requests.put(upload_url, data=data, headers=headers) def _list_uploaded_parts(self, drive_id: str, file_id: str, upload_id: str) -> dict: """ @@ -560,7 +607,7 @@ class AliPan(StorageBase, metaclass=Singleton): json=data ) if not resp: - raise Exception("获取已上传分片失败!") + raise Exception("【阿里云盘】获取已上传分片失败!") if resp.get("code"): raise Exception(resp.get("message")) return resp @@ -578,7 +625,7 @@ class AliPan(StorageBase, metaclass=Singleton): json=data ) if not resp: - raise Exception("完成上传失败!") + raise Exception("【阿里云盘】完成上传失败!") if resp.get("code"): raise Exception(resp.get("message")) return resp @@ -590,23 +637,23 @@ class AliPan(StorageBase, metaclass=Singleton): """ target_name = new_name or local_path.name target_path = Path(target_dir.path) / target_name + file_size = local_path.stat().st_size # 1. 创建文件并检查秒传 - chunk_size = 1 * 1024 * 1024 * 1024 + chunk_size = 100 * 1024 * 1024 # 分片大小 100M create_res = self._create_file(drive_id=target_dir.drive_id, parent_file_id=target_dir.fileid, file_name=new_name, file_path=local_path, chunk_size=chunk_size) if create_res.get('rapid_upload', False): - logger.info(f"{target_name} 秒传完成!") + logger.info(f"【阿里云盘】{target_name} 秒传完成!") return self.get_item(target_path) # 2. 准备分片上传参数 - file_id = create_res['file_id'] - upload_id = create_res['upload_id'] - part_info_list = create_res['part_info_list'] - file_size = local_path.stat().st_size + file_id = create_res.get('file_id') + upload_id = create_res.get('upload_id') + part_info_list = create_res.get('part_info_list') uploaded_parts = set() # 3. 获取已上传分片 @@ -614,48 +661,79 @@ class AliPan(StorageBase, metaclass=Singleton): for part in uploaded_info.get('uploaded_parts', []): uploaded_parts.add(part['part_number']) - # 4. 遍历并上传分片 + # 4. 初始化进度条 + logger.info(f"【阿里云盘】开始上传: {local_path} -> {target_path},分片数:{len(part_info_list)}") + progress_bar = tqdm( + total=file_size, + unit='B', + unit_scale=True, + desc="上传进度", + ascii=True + ) + + # 5. 分片上传循环 with open(local_path, 'rb') as f: for part_info in part_info_list: part_num = part_info['part_number'] - if part_num in uploaded_parts: - logger.info(f"分片 {part_num} 已存在,跳过上传") - continue - # 计算分片偏移量 + # 计算分片参数 start = (part_num - 1) * chunk_size end = min(start + chunk_size, file_size) + current_chunk_size = end - start + + # 更新进度条(已存在的分片) + if part_num in uploaded_parts: + progress_bar.update(current_chunk_size) + continue + + # 准备分片数据 f.seek(start) - data = f.read(end - start) + data = f.read(current_chunk_size) - # 尝试上传分片 - response = self._upload_part(upload_url=part_info['upload_url'], data=data) - if response.status_code == 200: - logger.info(f"分片 {part_num} 上传成功") - uploaded_parts.add(part_num) - else: - # 刷新地址后重试 - logger.info(f"分片 {part_num} 上传失败,刷新地址...") - new_urls = self._refresh_upload_urls(drive_id=target_dir.drive_id, file_id=file_id, - upload_id=upload_id, - part_numbers=[part_num]) - if new_urls: - new_url = new_urls[0]['upload_url'] - response = self._upload_part(upload_url=new_url, data=data) - if response.status_code == 200: - logger.info(f"分片 {part_num} 重传成功") - uploaded_parts.add(part_num) + # 上传分片(带重试逻辑) + success = False + for attempt in range(3): # 最大重试次数 + try: + # 获取当前上传地址(可能刷新) + if attempt > 0: + new_urls = self._refresh_upload_urls(drive_id=target_dir.drive_id, file_id=file_id, + upload_id=upload_id, part_numbers=[part_num]) + upload_url = new_urls[0]['upload_url'] else: - raise Exception(f"分片 {part_num} 上传失败: {response.text}") - else: - raise Exception("无法获取新的上传地址") + upload_url = part_info['upload_url'] - # 5. 完成上传 + # 执行上传 + logger.info( + f"【阿里云盘】开始 第{attempt + 1}次 上传 {target_name} 分片 {part_num} ...") + response = self._upload_part(upload_url=upload_url, data=data) + if response is None: + continue + if response.status_code == 200: + success = True + break + else: + logger.warn( + f"【阿里云盘】{target_name} 分片 {part_num} 第 {attempt + 1} 次上传失败:{response.text}!") + except Exception as e: + logger.warn(f"【阿里云盘】{target_name} 分片 {part_num} 上传异常: {str(e)}!") + + # 处理上传结果 + if success: + uploaded_parts.add(part_num) + progress_bar.update(current_chunk_size) + else: + raise Exception(f"【阿里云盘】{target_name} 分片 {part_num} 上传失败!") + + # 6. 关闭进度条 + if progress_bar: + progress_bar.close() + + # 7. 完成上传 result = self._complete_upload(drive_id=target_dir.drive_id, file_id=file_id, upload_id=upload_id) if not result: - raise Exception("完成上传失败!") + raise Exception("【阿里云盘】完成上传失败!") if result.get("code"): - logger.warn(f"{target_name} 上传失败:{result.get('message')}") + logger.warn(f"【阿里云盘】{target_name} 上传失败:{result.get('message')}!") return self.__get_fileitem(result) def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]: diff --git a/app/modules/filemanager/storages/u115.py b/app/modules/filemanager/storages/u115.py index fca8d18b..eda7edb0 100644 --- a/app/modules/filemanager/storages/u115.py +++ b/app/modules/filemanager/storages/u115.py @@ -11,6 +11,7 @@ import oss2 import requests from oss2 import SizedFileAdapter, determine_part_size from oss2.models import PartInfo +from tqdm import tqdm from app import schemas from app.core.config import settings @@ -166,7 +167,7 @@ class U115Pan(StorageBase, metaclass=Singleton): 确认登录后,获取相关token """ if not self._auth_state: - raise Exception("请先生成二维码") + raise Exception("【115】请先生成二维码") resp = self.session.post( "https://passportapi.115.com/open/deviceCodeToToken", data={ @@ -398,16 +399,6 @@ class U115Pan(StorageBase, metaclass=Singleton): 实现带秒传、断点续传和二次认证的文件上传 """ - def progress_callback(consumed_bytes: int, total_bytes: int): - """ - 上传进度回调 - """ - progress = round(consumed_bytes / total_bytes * 100) - if round(progress, -1) != self._last_progress: - logger.info(f"【115】已上传: {StringUtils.str_filesize(consumed_bytes)}" - f" / {StringUtils.str_filesize(total_bytes)}, 进度: {progress}%") - self._last_progress = round(progress, -1) - def encode_callback(cb: dict): """ 回调参数Base64编码函数 @@ -559,9 +550,19 @@ class U115Pan(StorageBase, metaclass=Singleton): # 补充参数 logger.debug(f"【115】上传 Step 6 回调参数:{callback_dict} {callback_var_dict}") # 填写不能包含Bucket名称在内的Object完整路径,例如exampledir/exampleobject.txt。 - # determine_part_size方法用于确定分片大小,设置分片大小为 1GB - part_size = determine_part_size(file_size, preferred_size=1 * 1024 * 1024 * 1024) + # determine_part_size方法用于确定分片大小,设置分片大小为 100M + part_size = determine_part_size(file_size, preferred_size=100 * 1024 * 1024) + + # 初始化进度条 logger.info(f"【115】开始上传: {local_path} -> {target_path},分片大小:{StringUtils.str_filesize(part_size)}") + progress_bar = tqdm( + total=file_size, + unit='B', + unit_scale=True, + desc="上传进度", + ascii=True + ) + # 初始化分片 upload_id = bucket.init_multipart_upload(object_name, params={ @@ -578,12 +579,18 @@ class U115Pan(StorageBase, metaclass=Singleton): # 调用SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。 logger.info(f"【115】开始上传 {target_name} 分片 {part_number}: {offset} -> {offset + num_to_upload}") result = bucket.upload_part(object_name, upload_id, part_number, - data=SizedFileAdapter(fileobj, num_to_upload), - progress_callback=progress_callback) + data=SizedFileAdapter(fileobj, num_to_upload)) parts.append(PartInfo(part_number, result.etag)) logger.info(f"【115】{target_name} 分片 {part_number} 上传完成") offset += num_to_upload part_number += 1 + # 更新进度 + progress_bar.update(num_to_upload) + + # 关闭进度条 + if progress_bar: + progress_bar.close() + # 请求头 headers = { 'X-oss-callback': encode_callback(callback_dict), diff --git a/requirements.in b/requirements.in index 9896ed44..228e4109 100644 --- a/requirements.in +++ b/requirements.in @@ -68,4 +68,6 @@ redis~=5.2.1 async_timeout~=5.0.1; python_full_version < "3.11.3" packaging~=24.2 cf_clearance~=0.31.0 -oss2~=2.19.1 \ No newline at end of file +oss2~=2.19.1 +tqdm~=4.67.1 +setuptools~=65.5.0 \ No newline at end of file