From 7ffafb49c417bcc102247554aa65b432610afdf0 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Sat, 29 Mar 2025 10:26:59 +0800 Subject: [PATCH] fix alipan upload --- app/modules/filemanager/storages/alipan.py | 206 ++++++++++++++++++++- 1 file changed, 204 insertions(+), 2 deletions(-) diff --git a/app/modules/filemanager/storages/alipan.py b/app/modules/filemanager/storages/alipan.py index 6f154030..52f92107 100644 --- a/app/modules/filemanager/storages/alipan.py +++ b/app/modules/filemanager/storages/alipan.py @@ -449,12 +449,214 @@ class AliPan(StorageBase, metaclass=Singleton): self._id_cache[str(new_path)] = (resp.get("drive_id"), resp.get("file_id")) return self.get_item(new_path) + @staticmethod + def _calculate_pre_hash(file_path: Path): + """ + 计算文件前1KB的SHA1作为pre_hash + """ + sha1 = hashlib.sha1() + with open(file_path, 'rb') as f: + data = f.read(1024) + sha1.update(data) + return sha1.hexdigest() + + @staticmethod + def _calculate_content_hash(file_path: Path): + """ + 计算整个文件的SHA1作为content_hash + """ + sha1 = hashlib.sha1() + with open(file_path, 'rb') as f: + while True: + chunk = f.read(8192) + if not chunk: + break + sha1.update(chunk) + return sha1.hexdigest() + + def _create_file(self, drive_id: str, parent_file_id: str, + file_name: str, file_path: Path, check_name_mode="auto_rename", + chunk_size: int = 1 * 1024 * 1024 * 1024): + """ + 创建文件请求,尝试秒传 + """ + file_size = file_path.stat().st_size + pre_hash = self._calculate_pre_hash(file_path) + content_hash = self._calculate_content_hash(file_path) + num_parts = (file_size + chunk_size - 1) // chunk_size + + # 构建分片信息 + part_info_list = [{"part_number": i + 1} for i in range(num_parts)] + + data = { + "drive_id": drive_id, + "parent_file_id": parent_file_id, + "name": file_name, + "type": "file", + "check_name_mode": check_name_mode, + "size": file_size, + "pre_hash": pre_hash, + "content_hash": content_hash, + "content_hash_name": "sha1", + "part_info_list": part_info_list + } + + resp = self._request_api( + "POST", + "/adrive/v1.0/openFile/create", + json=data + ) + if not resp: + raise Exception("创建文件失败!") + if resp.get("code"): + raise Exception(resp.get("message")) + return resp + + def _refresh_upload_urls(self, drive_id: str, file_id: str, upload_id: str, part_numbers: List[int]): + """ + 刷新分片上传地址 + """ + data = { + "drive_id": drive_id, + "file_id": file_id, + "upload_id": upload_id, + "part_info_list": [{"part_number": num} for num in part_numbers] + } + resp = self._request_api( + "POST", + "/adrive/v1.0/openFile/getUploadUrl", + json=data + ) + if not resp: + raise Exception("刷新分片上传地址失败!") + if resp.get("code"): + raise Exception(resp.get("message")) + return resp.get('part_info_list', []) + + @staticmethod + def _upload_part(upload_url: str, data: bytes): + """ + 上传单个分片 + """ + headers = { + 'Content-Length': str(len(data)), + 'Content-Type': 'application/octet-stream' + } + response = requests.put(upload_url, data=data, headers=headers) + return response + + def _list_uploaded_parts(self, drive_id: str, file_id: str, upload_id: str) -> dict: + """ + 获取已上传分片列表 + """ + data = { + "drive_id": drive_id, + "file_id": file_id, + "upload_id": upload_id + } + resp = self._request_api( + "POST", + "/adrive/v1.0/openFile/listUploadedParts", + json=data + ) + if not resp: + raise Exception("获取已上传分片失败!") + if resp.get("code"): + raise Exception(resp.get("message")) + return resp + + def _complete_upload(self, drive_id: str, file_id: str, upload_id: str): + """标记上传完成""" + data = { + "drive_id": drive_id, + "file_id": file_id, + "upload_id": upload_id + } + resp = self._request_api( + "POST", + "/adrive/v1.0/openFile/complete", + json=data + ) + if not resp: + raise Exception("完成上传失败!") + if resp.get("code"): + raise Exception(resp.get("message")) + return resp + def upload(self, target_dir: schemas.FileItem, local_path: Path, new_name: Optional[str] = None) -> Optional[schemas.FileItem]: """ - TODO 文件上传 + 文件上传:分片、支持秒传 """ - pass + target_name = new_name or local_path.name + target_path = Path(target_dir.path) / target_name + + # 1. 创建文件并检查秒传 + chunk_size = 1 * 1024 * 1024 * 1024 + create_res = self._create_file(drive_id=target_dir.drive_id, + parent_file_id=target_dir.fileid, + file_name=new_name, + file_path=local_path, + chunk_size=chunk_size) + if create_res.get('rapid_upload', False): + logger.info(f"{target_name} 秒传完成!") + return self.get_item(target_path) + + # 2. 准备分片上传参数 + file_id = create_res['file_id'] + upload_id = create_res['upload_id'] + part_info_list = create_res['part_info_list'] + file_size = local_path.stat().st_size + uploaded_parts = set() + + # 3. 获取已上传分片 + uploaded_info = self._list_uploaded_parts(drive_id=target_dir.drive_id, file_id=file_id, upload_id=upload_id) + for part in uploaded_info.get('uploaded_parts', []): + uploaded_parts.add(part['part_number']) + + # 4. 遍历并上传分片 + with open(local_path, 'rb') as f: + for part_info in part_info_list: + part_num = part_info['part_number'] + if part_num in uploaded_parts: + logger.info(f"分片 {part_num} 已存在,跳过上传") + continue + + # 计算分片偏移量 + start = (part_num - 1) * chunk_size + end = min(start + chunk_size, file_size) + f.seek(start) + data = f.read(end - start) + + # 尝试上传分片 + response = self._upload_part(upload_url=part_info['upload_url'], data=data) + if response.status_code == 200: + logger.info(f"分片 {part_num} 上传成功") + uploaded_parts.add(part_num) + else: + # 刷新地址后重试 + logger.info(f"分片 {part_num} 上传失败,刷新地址...") + new_urls = self._refresh_upload_urls(drive_id=target_dir.drive_id, file_id=file_id, + upload_id=upload_id, + part_numbers=[part_num]) + if new_urls: + new_url = new_urls[0]['upload_url'] + response = self._upload_part(upload_url=new_url, data=data) + if response.status_code == 200: + logger.info(f"分片 {part_num} 重传成功") + uploaded_parts.add(part_num) + else: + raise Exception(f"分片 {part_num} 上传失败: {response.text}") + else: + raise Exception("无法获取新的上传地址") + + # 5. 完成上传 + result = self._complete_upload(drive_id=target_dir.drive_id, file_id=file_id, upload_id=upload_id) + if not result: + raise Exception("完成上传失败!") + if result.get("code"): + logger.warn(f"{target_name} 上传失败:{result.get('message')}") + return self.__get_fileitem(result) def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]: """