fix alipan api

This commit is contained in:
jxxghp
2025-03-28 21:22:02 +08:00
parent b030317186
commit ac19b3b512
2 changed files with 150 additions and 371 deletions

View File

@@ -1,17 +1,11 @@
import base64
import hashlib
import json
import secrets
import threading
import time
from pathlib import Path
from typing import List, Dict, Optional, Tuple, Union
import oss2
import requests
from oss2 import SizedFileAdapter, determine_part_size
from oss2.models import PartInfo
from requests.packages import target
from app import schemas
from app.core.config import settings
@@ -48,7 +42,7 @@ class AliPan(StorageBase, metaclass=Singleton):
base_url = "https://openapi.alipan.com"
# CID和路径缓存
_id_cache: Dict[str, int] = {}
_id_cache: Dict[str, Tuple[str, str]] = {}
def __init__(self):
super().__init__()
@@ -158,6 +152,7 @@ class AliPan(StorageBase, metaclass=Singleton):
"refresh_time": int(time.time()),
**tokens
})
self.__get_drive_id()
return {"status": status, "tip": _status_text.get(status, "未知错误")}, ""
except Exception as e:
return {}, str(e)
@@ -204,6 +199,33 @@ class AliPan(StorageBase, metaclass=Singleton):
logger.warn(f"【阿里云盘】刷新 access_token 失败:{result.get('code')} - {result.get('message')}")
return result
def __get_drive_id(self):
"""
获取默认存储桶ID
"""
resp = self.session.post(
f"{self.base_url}/adrive/v1.0/user/getDriveInfo"
)
if resp is None:
logger.error("获取默认存储桶ID失败")
return None
result = resp.json()
if result.get("code"):
logger.warn(f"获取默认存储ID失败{result.get('code')} - {result.get('message')}")
return None
# 保存用户参数
"""
user_id string 是 用户ID具有唯一性
name string 是 昵称
avatar string 是 头像地址
default_drive_id string 是 默认drive
resource_drive_id string 否 资源库。用户选择了授权才会返回
backup_drive_id string 否 备份盘。用户选择了授权才会返回
"""
conf = self.get_conf()
conf.update(result)
self.set_config(conf)
def _request_api(self, method: str, endpoint: str,
result_key: Optional[str] = None, **kwargs) -> Optional[Union[dict, list]]:
"""
@@ -238,87 +260,95 @@ class AliPan(StorageBase, metaclass=Singleton):
return ret_data.get(result_key)
return ret_data
def _path_to_id(self, path: str) -> int:
def _path_to_id(self, drive_id: str, path: str) -> Tuple[str, str]:
"""
路径转FID(带缓存机制)
路径转drive_id, file_id(带缓存机制)
"""
# 根目录
if path == "/":
return 0
return drive_id, "root"
if len(path) > 1 and path.endswith("/"):
path = path[:-1]
# 检查缓存
if path in self._id_cache:
return self._id_cache[path]
# 逐级查找缓存
current_id = 0
parent_path = "/"
file_id = "root"
file_path = "/"
for p in Path(path).parents:
if str(p) in self._id_cache:
parent_path = str(p)
current_id = self._id_cache[parent_path]
file_path = str(p)
file_id = self._id_cache[file_path]
break
# 计算相对路径
rel_path = Path(path).relative_to(parent_path)
rel_path = Path(path).relative_to(file_path)
for part in Path(rel_path).parts:
offset = 0
find_part = False
next_marker = None
while True:
resp = self._request_api(
"GET",
"/open/ufile/files",
"data",
params={"cid": current_id, "limit": 1000, "offset": offset, "cur": True, "show_dir": 1}
"/adrive/v1.0/openFile/list",
params={
"drive_id": drive_id,
"limit": 100,
"marker": next_marker,
"parent_file_id": file_id,
}
)
if not resp:
break
for item in resp:
if item["fn"] == part:
current_id = item["fid"]
for item in resp.get("items", []):
if item["name"] == part:
file_id = item["file_id"]
find_part = True
break
if find_part:
break
if len(resp) < 1000:
if len(resp.get("items")) < 100:
break
offset += len(resp)
if not find_part:
raise FileNotFoundError(f"【阿里云盘】{path} 不存在")
if not current_id:
if file_id == "root":
raise FileNotFoundError(f"【阿里云盘】{path} 不存在")
# 缓存路径
self._id_cache[path] = current_id
return current_id
self._id_cache[path] = (drive_id, file_id)
return drive_id, file_id
def _id_to_path(self, fid: int) -> str:
def __get_fileitem(self, fileinfo: dict, parent: str = "/") -> schemas.FileItem:
"""
CID转路径带双向缓存
获取文件信息
"""
# 根目录特殊处理
if fid == 0:
return "/"
# 优先从缓存读取
if fid in self._id_cache.values():
return next(k for k, v in self._id_cache.items() if v == fid)
# 从API获取当前节点信息
detail = self._request_api(
"GET",
"/open/folder/get_info",
"data",
params={
"file_id": fid
}
)
# 处理可能的空数据(如已删除文件)
if not detail:
raise FileNotFoundError(f"【阿里云盘】{fid} 不存在")
paths = detail["paths"]
path_parts = [item["file_name"] for item in paths]
# 构建完整路径
full_path = "/" + "/".join(reversed(path_parts))
# 缓存新路径
self._id_cache[full_path] = fid
return full_path
if not fileinfo:
return schemas.FileItem()
if fileinfo.get("type") == "folder":
return schemas.FileItem(
storage=self.schema.value,
fileid=fileinfo.get("file_id"),
parent_fileid=fileinfo.get("parent_file_id"),
type="dir",
path=f"{parent}{fileinfo.get('name')}" + "/",
name=fileinfo.get("name"),
basename=fileinfo.get("name"),
size=fileinfo.get("size"),
modify_time=StringUtils.str_to_timestamp(fileinfo.get("updated_at")),
drive_id=fileinfo.get("drive_id"),
)
else:
return schemas.FileItem(
storage=self.schema.value,
fileid=fileinfo.get("file_id"),
parent_fileid=fileinfo.get("parent_file_id"),
type="file",
path=f"{parent}{fileinfo.get('name')}",
name=fileinfo.get("name"),
basename=Path(fileinfo.get("name")).stem,
size=fileinfo.get("size"),
extension=fileinfo.get("file_extension"),
modify_time=StringUtils.str_to_timestamp(fileinfo.get("updated_at")),
thumbnail=fileinfo.get("thumbnail"),
drive_id=fileinfo.get("drive_id"),
)
@staticmethod
def _calc_sha1(filepath: Path, size: Optional[int] = None) -> str:
@@ -343,330 +373,92 @@ class AliPan(StorageBase, metaclass=Singleton):
"""
目录遍历实现
"""
if fileitem.type == "file":
item = self.detail(fileitem)
if item:
return [item]
return []
cid = self._path_to_id(fileitem.path)
if fileitem.path == "/":
parent_file_id = "root"
else:
parent_file_id = fileitem.fileid
items = []
offset = 0
next_marker = None
while True:
resp = self._request_api(
"GET",
"/open/ufile/files",
"data",
params={"cid": cid, "limit": 1000, "offset": offset, "cur": True, "show_dir": 1}
"/adrive/v1.0/openFile/list",
params={
"drive_id": fileitem.drive_id,
"limit": 100,
"marker": next_marker,
"parent_file_id": parent_file_id,
}
)
if resp is None:
raise FileNotFoundError(f"【阿里云盘】{fileitem.path} 检索出错!")
if not resp:
break
for item in resp:
next_marker = resp.get("next_marker")
for item in resp.get("items", []):
# 更新缓存
path = f"{fileitem.path}{item['fn']}"
path = f"{fileitem.path}{item.get('name')}"
self._id_cache[path] = item["fid"]
file_path = path + ("/" if item["fc"] == "0" else "")
items.append(schemas.FileItem(
storage=self.schema.value,
fileid=item["fid"],
name=item["fn"],
basename=Path(item["fn"]).stem,
extension=item["ico"] if item["fc"] == "1" else None,
type="dir" if item["fc"] == "0" else "file",
path=file_path,
size=item["fs"] if item["fc"] == "1" else None,
modify_time=item["upt"],
pickcode=item["pc"]
))
if len(resp) < 1000:
items.append(self.__get_fileitem(item))
if len(resp.get("items")) < 100:
break
offset += len(resp)
return items
def create_folder(self, parent_item: schemas.FileItem, name: str) -> Optional[schemas.FileItem]:
"""
创建目录
"""
parent_id = self._path_to_id(parent_item.path)
new_path = Path(parent_item.path) / name
resp = self._request_api(
"POST",
"/open/folder/add",
"/adrive/v1.0/openFile/create",
json={
"pid": parent_id,
"file_name": name
"drive_id": parent_item.drive_id,
"parent_file_id": parent_item.fileid,
"name": name,
"type": "folder"
}
)
if not resp:
return None
if not resp.get("state"):
if resp.get("code") == 20004:
# 目录已存在
return self.get_item(new_path)
logger.warn(f"【阿里云盘】创建目录失败: {resp.get('error')}")
if resp.get("code"):
logger.warn(f"【阿里云盘】创建目录失败: {resp.get('message')}")
return None
# 缓存新目录
self._id_cache[str(new_path)] = resp["data"]["file_id"]
return schemas.FileItem(
storage=self.schema.value,
fileid=resp["data"]["file_id"],
path=str(new_path) + "/",
name=name,
basename=name,
type="dir",
modify_time=int(time.time())
)
new_path = Path(parent_item.path) / name
self._id_cache[str(new_path)] = (resp.get("drive_id"), resp.get("file_id"))
return self.get_item(new_path)
def upload(self, target_dir: schemas.FileItem, local_path: Path,
new_name: Optional[str] = None) -> Optional[schemas.FileItem]:
"""
实现带秒传、断点续传和二次认证的文件上传
TODO 文件上传
"""
def progress_callback(consumed_bytes: int, total_bytes: int):
"""
上传进度回调
"""
progress = round(consumed_bytes / total_bytes * 100)
if round(progress, -1) != self._last_progress:
logger.info(f"【阿里云盘】已上传: {StringUtils.str_filesize(consumed_bytes)}"
f" / {StringUtils.str_filesize(total_bytes)}, 进度: {progress}%")
self._last_progress = round(progress, -1)
def encode_callback(cb: dict):
"""
回调参数Base64编码函数
"""
return oss2.utils.b64encode_as_string(json.dumps(cb).strip())
target_name = new_name or local_path.name
target_path = str(Path(target_dir.path) / target_name)
# 计算文件特征值
file_size = local_path.stat().st_size
file_sha1 = self._calc_sha1(local_path)
file_preid = self._calc_sha1(local_path, 128 * 1024 * 1024)
# 获取目标目录CID
target_cid = self._path_to_id(target_dir.path)
target_param = f"U_1_{target_cid}"
# Step 1: 初始化上传
init_data = {
"file_name": target_name,
"file_size": file_size,
"target": target_param,
"fileid": file_sha1,
"preid": file_preid
}
init_resp = self._request_api(
"POST",
"/open/upload/init",
json=init_data
)
if not init_resp:
return None
if not init_resp.get("state"):
logger.warn(f"【阿里云盘】初始化上传失败: {init_resp.get('error')}")
return None
# 结果
init_result = init_resp.get("data")
logger.debug(f"【阿里云盘】上传 Step 1 初始化结果: {init_result}")
file_id = init_result.get("file_id")
# 回调信息
bucket_name = init_result.get("bucket")
object_name = init_result.get("object")
callback = init_result.get("callback")
# 二次认证信息
sign_check = init_result.get("sign_check")
pick_code = init_result.get("pick_code")
sign_key = init_result.get("sign_key")
# Step 2: 处理二次认证
if init_result.get("code") in [700, 701] and sign_check:
sign_checks = sign_check.split("-")
start = int(sign_checks[0])
end = int(sign_checks[1])
# 计算指定区间的SHA1
# sign_check (用下划线隔开,截取上传文内容的sha1(单位是byte): "2392148-2392298"
with open(local_path, "rb") as f:
# 取2392148-2392298之间的内容(包含2392148、2392298)的sha1
f.seek(start)
chunk = f.read(end - start + 1)
sign_val = hashlib.sha1(chunk).hexdigest().upper()
# 重新初始化请求
# sign_keysign_val(根据sign_check计算的值大写的sha1值)
init_data.update({
"pick_code": pick_code,
"sign_key": sign_key,
"sign_val": sign_val
})
init_resp = self._request_api(
"POST",
"/open/upload/init",
json=init_data
)
if not init_resp:
return None
# 二次认证结果
init_result = init_resp.get("data")
logger.debug(f"【阿里云盘】上传 Step 2 二次认证结果: {init_result}")
if not pick_code:
pick_code = init_result.get("pick_code")
if not bucket_name:
bucket_name = init_result.get("bucket")
if not object_name:
object_name = init_result.get("object")
if not file_id:
file_id = init_result.get("file_id")
if not callback:
callback = init_result.get("callback")
# Step 3: 秒传
if init_result.get("status") == 2:
logger.info(f"【阿里云盘】{target_name} 秒传成功")
return schemas.FileItem(
storage=self.schema.value,
fileid=file_id,
path=target_path,
name=target_name,
basename=Path(target_name).stem,
extension=Path(target_name).suffix[1:],
size=file_size,
type="file",
pickcode=pick_code,
modify_time=int(time.time())
)
# Step 4: 获取上传凭证
token_resp = self._request_api(
"GET",
"/open/upload/get_token",
"data"
)
if not token_resp:
logger.warn("【阿里云盘】获取上传凭证失败")
return None
logger.debug(f"【阿里云盘】上传 Step 4 获取上传凭证结果: {token_resp}")
# 上传凭证
endpoint = token_resp.get("endpoint")
AccessKeyId = token_resp.get("AccessKeyId")
AccessKeySecret = token_resp.get("AccessKeySecret")
SecurityToken = token_resp.get("SecurityToken")
# Step 5: 断点续传
resume_resp = self._request_api(
"POST",
"/open/upload/resume",
"data",
json={
"file_size": file_size,
"target": target_param,
"fileid": file_sha1,
"pick_code": pick_code
}
)
if resume_resp:
logger.debug(f"【阿里云盘】上传 Step 5 断点续传结果: {resume_resp}")
if resume_resp.get("callback"):
callback = resume_resp["callback"]
# Step 6: 对象存储上传
auth = oss2.StsAuth(
access_key_id=AccessKeyId,
access_key_secret=AccessKeySecret,
security_token=SecurityToken
)
bucket = oss2.Bucket(auth, endpoint, bucket_name) # noqa
# 处理oss请求回调
callback_dict = json.loads(callback.get("callback"))
callback_var_dict = json.loads(callback.get("callback_var"))
# 补充参数
logger.debug(f"【阿里云盘】上传 Step 6 回调参数:{callback_dict} {callback_var_dict}")
# 填写不能包含Bucket名称在内的Object完整路径例如exampledir/exampleobject.txt。
# determine_part_size方法用于确定分片大小设置分片大小为 1GB
part_size = determine_part_size(file_size, preferred_size=1 * 1024 * 1024 * 1024)
logger.info(f"【阿里云盘】开始上传: {local_path} -> {target_path},分片大小:{StringUtils.str_filesize(part_size)}")
# 初始化分片
upload_id = bucket.init_multipart_upload(object_name,
params={
"encoding-type": "url",
"sequential": ""
}).upload_id
parts = []
# 逐个上传分片
with open(local_path, 'rb') as fileobj:
part_number = 1
offset = 0
while offset < file_size:
num_to_upload = min(part_size, file_size - offset)
# 调用SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。
logger.info(f"【阿里云盘】开始上传 {target_name} 分片 {part_number}: {offset} -> {offset + num_to_upload}")
result = bucket.upload_part(object_name, upload_id, part_number,
json=SizedFileAdapter(fileobj, num_to_upload),
progress_callback=progress_callback)
parts.append(PartInfo(part_number, result.etag))
logger.info(f"【阿里云盘】{target_name} 分片 {part_number} 上传完成")
offset += num_to_upload
part_number += 1
# 请求头
headers = {
'X-oss-callback': encode_callback(callback_dict),
'x-oss-callback-var': encode_callback(callback_var_dict),
'x-oss-forbid-overwrite': 'false'
}
try:
result = bucket.complete_multipart_upload(object_name, upload_id, parts,
headers=headers)
if result.status == 200:
logger.debug(f"【阿里云盘】上传 Step 6 回调结果:{result.resp.response.json()}")
logger.info(f"【阿里云盘】{target_name} 上传成功")
else:
logger.warn(f"【阿里云盘】{target_name} 上传失败,错误码: {result.status}")
return None
except oss2.exceptions.OssError as e:
if e.code == "FileAlreadyExists":
logger.warn(f"【阿里云盘】{target_name} 已存在")
else:
logger.error(f"【阿里云盘】{target_name} 上传失败: {e.status}, 错误码: {e.code}, 详情: {e.message}")
return None
# 返回结果
return schemas.FileItem(
storage=self.schema.value,
fileid=file_id,
type="file",
path=target_path,
name=target_name,
basename=Path(target_name).stem,
extension=Path(target_name).suffix[1:],
size=file_size,
pickcode=pick_code,
modify_time=int(time.time())
)
pass
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
"""
带限速处理的下载
"""
detail = self.get_item(Path(fileitem.path))
local_path = path or settings.TEMP_PATH / fileitem.name
download_info = self._request_api(
"POST",
"/open/ufile/downurl",
"data",
"/adrive/v1.0/openFile/getDownloadUrl",
json={
"pick_code": detail.pickcode
"drive_id": fileitem.drive_id,
"file_id": fileitem.fileid,
}
)
if not download_info:
return None
download_url = list(download_info.values())[0].get("url", {}).get("url")
download_url = download_info.get("url")
local_path = path or settings.TEMP_PATH / fileitem.name
with self.session.get(download_url, stream=True) as r:
r.raise_for_status()
with open(local_path, "wb") as f:
@@ -684,9 +476,10 @@ class AliPan(StorageBase, metaclass=Singleton):
try:
self._request_api(
"POST",
"/open/ufile/delete",
"/adrive/v1.0/openFile/recyclebin/trash",
json={
"file_ids": self._path_to_id(fileitem.path)
"drive_id": fileitem.drive_id,
"file_id": fileitem.fileid
}
)
return True
@@ -697,58 +490,46 @@ class AliPan(StorageBase, metaclass=Singleton):
"""
重命名文件/目录
"""
file_id = self._path_to_id(fileitem.path)
resp = self._request_api(
"POST",
"/open/ufile/update",
"/adrive/v1.0/openFile/update",
json={
"file_id": file_id,
"file_name": name
"drive_id": fileitem.drive_id,
"file_id": fileitem.fileid,
"name": name
}
)
if not resp:
return False
if resp["state"]:
if fileitem.path in self._id_cache:
del self._id_cache[fileitem.path]
for key in list(self._id_cache.keys()):
if key.startswith(fileitem.path):
del self._id_cache[key]
new_path = Path(fileitem.path).parent / name
self._id_cache[str(new_path)] = file_id
return True
return False
if resp.get("code"):
logger.warn(f"【阿里云盘】重命名失败: {resp.get('message')}")
return False
if fileitem.path in self._id_cache:
del self._id_cache[fileitem.path]
for key in list(self._id_cache.keys()):
if key.startswith(fileitem.path):
del self._id_cache[key]
self._id_cache[str(Path(fileitem.path).parent / name)] = (resp.get("drive_id"), resp.get("file_id"))
return True
def get_item(self, path: Path) -> Optional[schemas.FileItem]:
"""
获取指定路径的文件/目录项
"""
try:
file_id = self._path_to_id(str(path))
if not file_id:
return None
resp = self._request_api(
"GET",
"/open/folder/get_info",
"data",
"/adrive/v1.0/openFile/get_by_path",
params={
"file_id": file_id
"file_path": str(path)
}
)
if not resp:
return None
return schemas.FileItem(
storage=self.schema.value,
fileid=resp["file_id"],
path=str(path) + ("/" if resp["file_category"] == "1" else ""),
type="file" if resp["file_category"] == "1" else "dir",
name=resp["file_name"],
basename=Path(resp["file_name"]).stem,
extension=Path(resp["file_name"]).suffix[1:],
pickcode=resp["pick_code"],
size=StringUtils.num_filesize(resp['size']) if resp["file_category"] == "1" else None,
modify_time=resp["utime"]
)
if resp.get("code"):
logger.debug(f"【阿里云盘】获取文件信息失败: {resp.get('message')}")
return None
return self.__get_fileitem(resp)
except Exception as e:
logger.debug(f"【阿里云盘】获取文件信息失败: {str(e)}")
return None
@@ -797,15 +578,13 @@ class AliPan(StorageBase, metaclass=Singleton):
"""
企业级复制实现(支持目录递归复制)
"""
src_fid = self._path_to_id(fileitem.path)
dest_cid = self._path_to_id(str(path))
dest_cid = self._path_to_id(fileitem.drive_id, str(path))
resp = self._request_api(
"POST",
"/adrive/v1.0/openFile/copy",
json={
"drive_id": fileitem.drive_id,
"file_id": src_fid,
"file_id": fileitem.fileid,
"to_drive_id": fileitem.drive_id,
"to_parent_file_id": dest_cid
}
@@ -822,15 +601,15 @@ class AliPan(StorageBase, metaclass=Singleton):
# 更新缓存
del self._id_cache[fileitem.path]
rename_new_path = Path(path) / new_name
self._id_cache[str(rename_new_path)] = int(new_file.fileid)
self._id_cache[str(rename_new_path)] = (resp.get("drive_id"), resp.get("file_id"))
return True
def move(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:
"""
原子性移动操作实现
"""
src_fid = self._path_to_id(fileitem.path)
target_id = self._path_to_id(str(path))
src_fid = fileitem.fileid
target_id = self._path_to_id(fileitem.drive_id, str(path))
resp = self._request_api(
"POST",
@@ -850,7 +629,7 @@ class AliPan(StorageBase, metaclass=Singleton):
# 更新缓存
del self._id_cache[fileitem.path]
rename_new_path = Path(path) / new_name
self._id_cache[str(rename_new_path)] = src_fid
self._id_cache[str(rename_new_path)] = (resp.get("drive_id"), resp.get("file_id"))
return True
def link(self, fileitem: schemas.FileItem, target_file: Path) -> bool:

View File

@@ -1,4 +1,4 @@
from typing import Optional
from typing import Optional, Union
from pydantic import BaseModel, Field
@@ -23,7 +23,7 @@ class FileItem(BaseModel):
# 子节点
children: Optional[list] = Field(default_factory=list)
# ID
fileid: Optional[str] = None
fileid: Optional[Union[str, int]] = None
# 父ID
parent_fileid: Optional[str] = None
# 缩略图