fix alipan upload

This commit is contained in:
jxxghp
2025-03-29 10:26:59 +08:00
parent 9b7d57a853
commit 7ffafb49c4

View File

@@ -449,12 +449,214 @@ class AliPan(StorageBase, metaclass=Singleton):
self._id_cache[str(new_path)] = (resp.get("drive_id"), resp.get("file_id"))
return self.get_item(new_path)
@staticmethod
def _calculate_pre_hash(file_path: Path):
    """
    Compute the pre_hash of a file: the SHA1 of its first 1 KB.

    :param file_path: path of the local file to read
    :return: hexadecimal SHA1 digest of the first 1024 bytes (of the whole
             content when the file is shorter than that)
    """
    with open(file_path, 'rb') as stream:
        head = stream.read(1024)
    return hashlib.sha1(head).hexdigest()
@staticmethod
def _calculate_content_hash(file_path: Path):
    """
    Compute the content_hash of a file: the SHA1 of its entire content.

    The file is read in 8192-byte pieces, so it is never held in memory as a whole.

    :param file_path: path of the local file to hash
    :return: hexadecimal SHA1 digest of the whole file
    """
    digest = hashlib.sha1()
    with open(file_path, 'rb') as stream:
        # iter() with a sentinel stops at the first empty read, i.e. at end of file
        for piece in iter(lambda: stream.read(8192), b''):
            digest.update(piece)
    return digest.hexdigest()
def _create_file(self, drive_id: str, parent_file_id: str,
                 file_name: str, file_path: Path, check_name_mode="auto_rename",
                 chunk_size: int = 1 * 1024 * 1024 * 1024):
    """
    Send the create-file request, which also lets the server try an instant upload.

    :param drive_id: drive the file is created in
    :param parent_file_id: id of the parent directory
    :param file_name: name of the file to create
    :param file_path: local file; its size and hashes are sent with the request
    :param check_name_mode: value sent as `check_name_mode`
    :param chunk_size: size of one part in bytes, used to derive the number of parts
    :return: the response of the create request
    :raises Exception: when the request returns nothing, or the response carries a `code`
    """
    file_size = file_path.stat().st_size
    head_digest = self._calculate_pre_hash(file_path)
    full_digest = self._calculate_content_hash(file_path)
    # Ceiling division: a trailing partial chunk still needs a part of its own
    part_count = (file_size + chunk_size - 1) // chunk_size
    # Parts are numbered from 1
    parts = [{"part_number": number} for number in range(1, part_count + 1)]
    payload = {
        "drive_id": drive_id,
        "parent_file_id": parent_file_id,
        "name": file_name,
        "type": "file",
        "check_name_mode": check_name_mode,
        "size": file_size,
        "pre_hash": head_digest,
        "content_hash": full_digest,
        "content_hash_name": "sha1",
        "part_info_list": parts,
    }
    resp = self._request_api("POST", "/adrive/v1.0/openFile/create", json=payload)
    if not resp:
        raise Exception("创建文件失败！")
    if resp.get("code"):
        raise Exception(resp.get("message"))
    return resp
def _refresh_upload_urls(self, drive_id: str, file_id: str, upload_id: str, part_numbers: List[int]):
    """
    Request fresh upload URLs for the given parts of an upload.

    :param drive_id: drive the file belongs to
    :param file_id: id of the file being uploaded
    :param upload_id: id of the upload session
    :param part_numbers: numbers of the parts that need a new URL
    :return: the `part_info_list` of the response, or an empty list when it has none
    :raises Exception: when the request returns nothing, or the response carries a `code`
    """
    wanted_parts = [{"part_number": number} for number in part_numbers]
    resp = self._request_api(
        "POST",
        "/adrive/v1.0/openFile/getUploadUrl",
        json={
            "drive_id": drive_id,
            "file_id": file_id,
            "upload_id": upload_id,
            "part_info_list": wanted_parts,
        },
    )
    if not resp:
        raise Exception("刷新分片上传地址失败！")
    if resp.get("code"):
        raise Exception(resp.get("message"))
    return resp.get('part_info_list', [])
@staticmethod
def _upload_part(upload_url: str, data: bytes):
    """
    Upload a single part with an HTTP PUT request.

    :param upload_url: URL the part is sent to
    :param data: raw bytes of the part
    :return: the response object returned by `requests.put`
    """
    return requests.put(
        upload_url,
        data=data,
        headers={
            'Content-Length': str(len(data)),
            'Content-Type': 'application/octet-stream',
        },
    )
def _list_uploaded_parts(self, drive_id: str, file_id: str, upload_id: str) -> dict:
    """
    Fetch the list of parts that have already been uploaded.

    :param drive_id: drive the file belongs to
    :param file_id: id of the file being uploaded
    :param upload_id: id of the upload session
    :return: the response of the listUploadedParts request
    :raises Exception: when the request returns nothing, or the response carries a `code`
    """
    query = dict(drive_id=drive_id, file_id=file_id, upload_id=upload_id)
    resp = self._request_api("POST", "/adrive/v1.0/openFile/listUploadedParts", json=query)
    # Success path first: a non-empty response without an error code
    if resp and not resp.get("code"):
        return resp
    if not resp:
        raise Exception("获取已上传分片失败！")
    raise Exception(resp.get("message"))
def _complete_upload(self, drive_id: str, file_id: str, upload_id: str):
    """
    Mark an upload as complete.

    :param drive_id: drive the file belongs to
    :param file_id: id of the uploaded file
    :param upload_id: id of the upload session
    :return: the response of the complete request
    :raises Exception: when the request returns nothing, or the response carries a `code`
    """
    resp = self._request_api(
        "POST",
        "/adrive/v1.0/openFile/complete",
        json={"drive_id": drive_id, "file_id": file_id, "upload_id": upload_id},
    )
    if not resp:
        raise Exception("完成上传失败！")
    failed = resp.get("code")
    if failed:
        raise Exception(resp.get("message"))
    return resp
def upload(self, target_dir: schemas.FileItem, local_path: Path,
           new_name: Optional[str] = None) -> Optional[schemas.FileItem]:
    """
    Upload a local file into a directory: multipart upload with instant-upload support.

    :param target_dir: directory item the file is uploaded into
    :param local_path: path of the local file
    :param new_name: name for the uploaded file; the local file name is used when omitted
    :return: the item looked up at the target path when the instant upload succeeded,
             otherwise the item built from the completion response
    :raises Exception: when creating the file, uploading a part (after one retry with a
                       refreshed URL) or completing the upload fails
    """
    target_name = new_name or local_path.name
    target_path = Path(target_dir.path) / target_name
    # 1. Create the file and check for an instant upload
    chunk_size = 1 * 1024 * 1024 * 1024
    create_res = self._create_file(drive_id=target_dir.drive_id,
                                   parent_file_id=target_dir.fileid,
                                   # The resolved name, not new_name, which is None
                                   # when the caller keeps the local file name
                                   file_name=target_name,
                                   file_path=local_path,
                                   chunk_size=chunk_size)
    if create_res.get('rapid_upload', False):
        logger.info(f"{target_name} 秒传完成！")
        return self.get_item(target_path)
    # 2. Prepare the multipart upload parameters
    file_id = create_res['file_id']
    upload_id = create_res['upload_id']
    part_info_list = create_res['part_info_list']
    file_size = local_path.stat().st_size
    # 3. Collect the parts the server already has
    uploaded_info = self._list_uploaded_parts(drive_id=target_dir.drive_id, file_id=file_id,
                                              upload_id=upload_id)
    uploaded_parts = {part['part_number'] for part in uploaded_info.get('uploaded_parts', [])}
    # 4. Walk through the parts and upload the missing ones
    with open(local_path, 'rb') as f:
        for part_info in part_info_list:
            part_num = part_info['part_number']
            if part_num in uploaded_parts:
                logger.info(f"分片 {part_num} 已存在，跳过上传")
                continue
            # Byte range of this part; part numbers start at 1
            start = (part_num - 1) * chunk_size
            end = min(start + chunk_size, file_size)
            f.seek(start)
            # NOTE: the whole part (up to chunk_size bytes) is held in memory
            data = f.read(end - start)
            response = self._upload_part(upload_url=part_info['upload_url'], data=data)
            if response.status_code == 200:
                logger.info(f"分片 {part_num} 上传成功")
                uploaded_parts.add(part_num)
                continue
            # The first attempt failed: refresh the URL and retry once
            logger.info(f"分片 {part_num} 上传失败，刷新地址...")
            new_urls = self._refresh_upload_urls(drive_id=target_dir.drive_id, file_id=file_id,
                                                 upload_id=upload_id,
                                                 part_numbers=[part_num])
            if not new_urls:
                raise Exception("无法获取新的上传地址")
            response = self._upload_part(upload_url=new_urls[0]['upload_url'], data=data)
            if response.status_code != 200:
                raise Exception(f"分片 {part_num} 上传失败: {response.text}")
            logger.info(f"分片 {part_num} 重传成功")
            uploaded_parts.add(part_num)
    # 5. Complete the upload; _complete_upload raises on an empty response or an
    #    error code, so the result is a successful response here
    result = self._complete_upload(drive_id=target_dir.drive_id, file_id=file_id, upload_id=upload_id)
    return self.__get_fileitem(result)
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
"""