fix 115 open api

This commit is contained in:
jxxghp
2025-03-21 18:53:26 +08:00
parent ea61599589
commit f2cbb8d2f7
3 changed files with 314 additions and 205 deletions

View File

@@ -109,6 +109,8 @@ class ConfigModel(BaseModel):
FANART_ENABLE: bool = True
# Fanart API Key
FANART_API_KEY: str = "d2d31f9ecabea050fc7d68aa3146015f"
# 115 AppId
U115_APP_ID: str = ""
# 元数据识别缓存过期时间(小时)
META_CACHE_EXPIRE: int = 0
# 电视剧动漫的分类genre_ids

View File

@@ -1,13 +1,14 @@
import base64
import hashlib
import os
import secrets
import time
from pathlib import Path
from typing import List, Dict, Optional, Tuple
from typing import List, Dict, Optional, Tuple, Union
import qrcode
import requests
from app import schemas
from app.core.config import settings
from app.log import logger
from app.modules.filemanager import StorageBase
from app.schemas.types import StorageSchema
@@ -28,14 +29,14 @@ class U115Pan(StorageBase, metaclass=Singleton):
"copy": "复制"
}
# 访问token
access_token = None
# 验证参数
_auth_state = {}
# 基础url
base_url = "https://api.115.com"
base_url = "https://proapi.115.com"
# CID和路径缓存
_cid_cache: Dict[str | int, str | int] = {}
_id_cache: Dict[str, int] = {}
def __init__(self):
super().__init__()
@@ -46,84 +47,171 @@ class U115Pan(StorageBase, metaclass=Singleton):
"""
初始化带速率限制的会话
"""
adapter = requests.adapters.HTTPAdapter(
max_retries=3,
pool_connections=10,
pool_maxsize=50
)
self.session.mount('https://', adapter)
self.session.headers.update({
"User-Agent": "W115Storage/2.0",
"Accept-Encoding": "gzip, deflate"
"Accept-Encoding": "gzip, deflate",
"Content-Type": "application/x-www-form-urlencoded"
})
@property
def access_token(self) -> Optional[str]:
"""
访问token
"""
tokens = self.get_conf()
refresh_token = tokens.get("refresh_token")
if not refresh_token:
return None
expires_in = tokens.get("expires_in", 0)
refresh_time = tokens.get("refresh_time", 0)
if expires_in and refresh_time + expires_in >= int(time.time()):
tokens = self.__refresh_access_token(refresh_token)
if tokens:
self.set_config({
"refresh_time": int(time.time()),
**tokens
})
return tokens.get("access_token")
def generate_qrcode(self) -> Tuple[dict, str]:
"""
生成设备授权二维码
实现PKCE规范的设备授权二维码生成
"""
# 生成PKCE参数
code_verifier = secrets.token_urlsafe(96)[:128]
code_challenge = base64.urlsafe_b64encode(
hashlib.sha256(code_verifier.encode()).digest()
).decode().replace("=", "")
# 请求设备码
resp = self.session.post(
"https://passportapi.115.com/open/authDeviceCode",
data={
"client_id": settings.U115_APP_ID,
"code_challenge": code_challenge,
"code_challenge_method": "sha256"
}
)
if resp is None:
return {}, "网络错误"
result = resp.json()
if result.get("code") != 0:
return {}, result.get("message")
# 持久化验证参数
self._auth_state = {
"code_verifier": code_verifier,
"uid": result["data"]["uid"],
"time": result["data"]["time"],
"sign": result["data"]["sign"]
}
# 生成二维码内容
return {
"codeContent": result['data']['qrcode']
}, ""
def __get_access_token(self) -> dict:
"""
确认登录后获取相关token
"""
if not self._auth_state:
raise Exception("请先调用生成二维码方法")
resp = self.session.post(
"https://passportapi.115.com/open/deviceCodeToToken",
data={
"uid": self._auth_state["uid"],
"code_verifier": self._auth_state["code_verifier"]
}
)
if resp is None:
raise Exception("获取 access_token 失败")
result = resp.json()
if result.get("code") != 0:
raise Exception(result.get("message"))
return result["data"]
def __refresh_access_token(self, refresh_token: str) -> dict:
"""
刷新access_token
"""
resp = self.session.post(
f"{self.base_url}/oauth/device",
data={"client_id": self.get_conf().get("app_id")}
).json()
qr = qrcode.make(f"115AUTH|{resp['device_code']}")
return resp, qr.png_as_base64_str()
"https://passportapi.115.com/open/refreshToken",
data={
"refresh_token": refresh_token
}
)
if resp is None:
raise Exception(f"刷新 access_token 失败refresh_token={refresh_token}")
result = resp.json()
if result.get("code") != 0:
raise Exception(result.get("message"))
return result.get("data")
def check_login(self, device_code: str) -> Optional[Dict]:
def check_login(self) -> Optional[Dict]:
"""
检查授权状态
改进的带PKCE校验的登录状态检查
"""
if not self._auth_state:
return {"status": -1, "tip": "生成二维码失败"}
try:
resp = self.session.post(f"{self.base_url}/oauth/token", data={
"grant_type": "device",
"device_code": device_code,
"client_secret": self.get_conf().get("app_secret")
}, timeout=10)
if resp.status_code == 200:
token_data = resp.json()
self.access_token = token_data["access_token"]
# 持久化配置
self.set_config({"access_token": self.access_token})
return {"status": "success"}
return {"status": "pending"}
except requests.exceptions.RequestException:
return {"status": "error"}
resp = self.session.post(
"https://passportapi.115.com/open/checkDeviceCode",
data={
"uid": self._auth_state["uid"],
"time": self._auth_state["time"],
"sign": self._auth_state["sign"]
}
)
if resp is None:
return {"status": -1, "tip": "网络错误"}
result = resp.json()
if result.get("code") != 0 or not result.get("data"):
return {"status": -1, "tip": result.get("message")}
if result["data"]["status"] == 2:
tokens = self.__get_access_token()
self.set_config({
"refresh_time": int(time.time()),
**tokens
})
return {"status": result["data"]["status"], "tip": result["data"]["msg"]}
except requests.exceptions.RequestException as e:
return {"status": -1, "tip": str(e)}
def init_storage(self):
"""
初始化存储连接
"""
if conf := self.get_conf():
self.access_token = conf.get("access_token")
self.session.headers.update({"Authorization": f"Bearer {self.access_token}"})
self.session.headers.update({
"Authorization": f"Bearer {self.access_token}"
})
def list(self, fileitem: schemas.FileItem) -> List[schemas.FileItem]:
"""
目录遍历实现
"""
cid = self._path_to_cid(fileitem.path)
cid = self._path_to_id(fileitem.path)
items = []
offset = 0
while True:
resp = self._request_api(
"GET", "/files",
"GET",
"/open/ufile/files",
"data",
params={"cid": cid, "limit": 1000, "offset": offset}
)
batch = resp["data"]
for item in batch:
path = self._cid_to_path(item["cid"])
items.append(schemas.FileItem(
path=path,
name=item["name"],
type="dir" if item["is_dir"] else "file",
size=item["size"],
modify_time=item["modified"]
))
self._cid_cache[path] = item["cid"] # 更新缓存
if len(batch) < 1000:
if not resp:
break
offset += len(batch)
for item in resp:
path = self._id_to_path(item.get("fid"))
items.append(schemas.FileItem(
fileid=item["fid"],
))
# 更新缓存
self._id_cache[path] = item["cid"]
if len(resp) < 1000:
break
offset += len(resp)
return items
@@ -131,17 +219,22 @@ class U115Pan(StorageBase, metaclass=Singleton):
"""
创建目录
"""
parent_cid = self._path_to_cid(parent_item.path)
parent_id = self._path_to_id(parent_item.path)
resp = self._request_api(
"POST", "/file/mkdir",
json={"cid": parent_cid, "name": name}
"POST",
"/open/folder/add",
"data",
data={
"pid": parent_id,
"name": name
}
)
new_path = os.path.join(parent_item.path, name)
new_path = Path(parent_item.path) / name
# 缓存新目录
self._cid_cache[new_path] = resp["cid"]
self._cid_cache[resp["cid"]] = new_path
self._id_cache[str(new_path)] = resp["file_id"]
return schemas.FileItem(
path=new_path,
fileid=resp["file_id"],
path=str(new_path),
name=name,
type="dir",
modify_time=int(time.time())
@@ -149,7 +242,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
def upload(self, target_dir: schemas.FileItem, local_path: Path, new_name: str = None) -> schemas.FileItem:
"""
断点续传实现
FIXME 断点续传实现
"""
file_name = new_name or local_path.name
file_size = local_path.stat().st_size
@@ -157,12 +250,13 @@ class U115Pan(StorageBase, metaclass=Singleton):
# 初始化上传任务
upload_info = self._request_api(
"POST", "/open/upload/init",
json={
"POST",
"/open/upload/init",
data={
"file_name": file_name,
"file_size": file_size,
"file_sha1": file_hash,
"target_dir": self._path_to_cid(target_dir.path)
"target_dir": self._path_to_id(target_dir.path)
}
)
@@ -184,12 +278,17 @@ class U115Pan(StorageBase, metaclass=Singleton):
"""
带限速处理的下载
"""
download_url = self._request_api(
"GET", "/file/download",
params={"cid": self._path_to_cid(fileitem.path)}
)["url"]
local_path = save_path or Path("/tmp") / fileitem.name
detail = self.get_item(Path(fileitem.path))
local_path = save_path or settings.TEMP_PATH / fileitem.name
download_info = self._request_api(
"POST",
"/open/ufile/downurl",
"data",
data={
"pick_code": detail.pickcode
}
)
download_url = download_info["url"]
with self.session.get(download_url, stream=True) as r:
r.raise_for_status()
with open(local_path, "wb") as f:
@@ -197,103 +296,97 @@ class U115Pan(StorageBase, metaclass=Singleton):
f.write(chunk)
return local_path
def _request_api(self, method: str, endpoint: str, **kwargs):
def _request_api(self, method: str, endpoint: str,
result_key: str = None, **kwargs) -> Optional[Union[dict, list]]:
"""
带错误处理和速率限制的API请求
"""
if not self.access_token:
raise Exception("未授权请先完成OAuth认证")
headers = kwargs.pop("headers", {})
headers["Authorization"] = f"Bearer {self.access_token}"
resp = self.session.request(
method, f"{self.base_url}{endpoint}",
headers=headers, **kwargs
**kwargs
)
if resp is None:
logger.error(f"请求 115 API 失败: {method} {endpoint}")
return None
return {}
# 处理速率限制
if resp.status_code == 429:
reset_time = int(resp.headers.get("X-RateLimit-Reset", 60))
time.sleep(reset_time + 5)
return self._request_api(method, endpoint, **kwargs)
return self._request_api(method, endpoint, result_key, **kwargs)
resp.raise_for_status()
if result_key:
result = resp.json().get(result_key)
if not result:
raise FileNotFoundError(f"请求 115 API 失败: {method} {endpoint}")
return result
return resp.json()
def _path_to_cid(self, path: str) -> str:
def _path_to_id(self, path: str) -> int:
"""
路径转CID带缓存机制
路径转FID带缓存机制
"""
if path in self._cid_cache:
return self._cid_cache[path]
# 递归解析路径
current_cid = "0" # 根目录CID
for part in Path(path).parts[1:]: # 忽略根目录
# 命中缓存
if path in self._id_cache:
return self._id_cache[path]
# 逐级查找缓存
current_id = 0
parent_path = "/"
for p in Path(path).parents:
if str(p) in self._id_cache:
parent_path = str(p)
current_id = self._id_cache[parent_path]
break
# 计算相对路径
rel_path = Path(path).relative_to(parent_path)
for part in Path(rel_path).parts:
resp = self._request_api(
"GET", "/files",
params={"cid": current_cid, "search_value": part}
"GET",
"/open/ufile/files",
"data",
params={
"cid": current_id
}
)
for item in resp["data"]:
for item in resp:
if item["name"] == part:
current_cid = item["cid"]
current_id = item["fid"]
break
else:
raise FileNotFoundError(f"路径不存在: {path}")
self._cid_cache[path] = current_cid
return current_cid
self._id_cache[path] = current_id
return current_id
def _cid_to_path(self, cid: str) -> str:
def _id_to_path(self, fid: int) -> str:
"""
CID转路径带双向缓存
"""
# 根目录特殊处理
if cid == "0":
if fid == 0:
return "/"
# 优先从缓存读取
if cid in self._cid_cache.values():
return next(k for k, v in self._cid_cache.items() if v == cid)
# 递归构建路径
path_parts = []
current_cid = cid
while current_cid != "0":
# 从API获取当前节点信息
detail = self._request_api(
"GET", "/file/detail",
params={"cid": current_cid}
)
# 处理可能的空数据(如已删除文件)
if not detail:
raise FileNotFoundError(f"CID {current_cid} 不存在")
parent_cid = detail["parent_id"]
path_parts.append(detail["name"])
# 检查父节点缓存
if parent_cid in self._cid_cache.values():
parent_path = next(k for k, v in self._cid_cache.items() if v == parent_cid)
path_parts.reverse()
full_path = os.path.join(parent_path, *path_parts)
# 更新正向缓存
self._cid_cache[full_path] = cid
return str(full_path)
current_cid = parent_cid
if fid in self._id_cache.values():
return next(k for k, v in self._id_cache.items() if v == fid)
# 从API获取当前节点信息
detail = self._request_api(
"GET",
"/open/folder/get_info",
"data",
params={
"file_id": fid
}
)
# 处理可能的空数据(如已删除文件)
if not detail:
raise FileNotFoundError(f"{fid} 不存在")
paths = detail["paths"]
path_parts = [item["file_name"] for item in paths]
# 构建完整路径
full_path = "/" + "/".join(reversed(path_parts))
# 缓存新路径
self._cid_cache[full_path] = cid
self._id_cache[full_path] = fid
return full_path
@staticmethod
@@ -311,49 +404,68 @@ class U115Pan(StorageBase, metaclass=Singleton):
return self.access_token is not None
def delete(self, fileitem: schemas.FileItem) -> bool:
"""
删除文件/目录
"""
try:
self._request_api(
"POST", "/file/delete",
json={"cid": self._path_to_cid(fileitem.path)}
"POST",
"/open/ufile/delete",
data={
"file_ids": self._path_to_id(fileitem.path)
}
)
return True
except requests.exceptions.HTTPError:
return False
def rename(self, fileitem: schemas.FileItem, name: str) -> bool:
new_path = Path(fileitem.path).parent / name
"""
重命名文件/目录
"""
file_id = self._path_to_id(fileitem.path)
resp = self._request_api(
"POST", "/file/rename",
json={
"cid": self._path_to_cid(fileitem.path),
"new_name": name
"POST",
"/open/ufile/update",
data={
"file_id": file_id,
"file_name": name
}
)
if resp["state"]:
self._cid_cache[str(new_path)] = resp["cid"]
old_path = fileitem.path
if fileitem.path in self._id_cache:
del self._id_cache[fileitem.path]
new_path = Path(fileitem.path).parent / name
# 删除旧路径
del self._cid_cache[old_path]
self._cid_cache[new_path.as_posix()] = resp["cid"]
# 更新反向缓存
self._cid_cache[resp["cid"]] = new_path.as_posix()
self._id_cache[str(new_path)] = file_id
return True
return False
def get_item(self, path: Path) -> Optional[schemas.FileItem]:
"""
获取指定路径的文件/目录项
"""
try:
cid = self._path_to_cid(str(path))
file_id = self._path_to_id(str(path))
if not file_id:
return None
resp = self._request_api(
"GET", "/file/detail",
params={"cid": cid}
"GET",
"/open/folder/get_info",
"data",
params={
"file_id": file_id
}
)
return schemas.FileItem(
path=str(path),
name=resp["name"],
type="dir" if resp["is_dir"] else "file",
size=resp["size"],
modify_time=resp["modified"]
fileid=resp["file_id"],
type="file" if resp["file_category"] == "1" else "dir",
name=resp["file_name"],
basename=Path(resp["file_name"]).stem,
extension=Path(resp["file_name"]).suffix[1:],
pickcode=resp["pick_code"],
size=resp["size"] if resp["file_category"] == "1" else None,
modify_time=resp["utime"]
)
except Exception as e:
logger.debug(f"获取文件信息失败: {str(e)}")
@@ -361,54 +473,43 @@ class U115Pan(StorageBase, metaclass=Singleton):
def get_folder(self, path: Path) -> Optional[schemas.FileItem]:
"""
获取指定路径的文件夹元数据
获取指定路径的文件夹,如不存在则创建
"""
item = self.get_item(path)
if item and item.type == "dir":
return item
return None
try:
return self.get_item(path)
except FileNotFoundError:
return self.create_folder(self.get_item(path.parent), path.name)
def detail(self, fileitem: schemas.FileItem) -> Optional[schemas.FileItem]:
"""
获取文件/目录详细信息
"""
try:
cid = self._path_to_cid(fileitem.path)
resp = self._request_api("GET", "/file/detail", params={"cid": cid})
return schemas.FileItem(
path=fileitem.path,
name=resp["name"],
type="dir" if resp["is_dir"] else "file",
size=resp["size"],
modify_time=resp["modified"],
pickcode=resp.get("pick_code")
)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
return None
raise
return self.get_item(Path(fileitem.path))
def copy(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:
"""
企业级复制实现(支持目录递归复制)
"""
src_cid = self._path_to_cid(fileitem.path)
dest_cid = self._path_to_cid(str(path))
src_fid = self._path_to_id(fileitem.path)
dest_cid = self._path_to_id(str(path))
resp = self._request_api(
"POST", "/file/copy",
json={
"cid": src_cid,
"pid": dest_cid,
"name": new_name,
"overwrite": 0 # 0:不覆盖 1:覆盖
"POST",
"/open/ufile/copy",
data={
"file_id": src_fid,
"pid": dest_cid
}
)
if resp["state"]:
# 更新目标路径缓存
new_path = str(Path(path) / new_name)
self._cid_cache[new_path] = resp["cid"]
new_path = Path(path) / fileitem.name
new_file = self.get_item(new_path)
self.rename(new_file, new_name)
# 更新缓存
del self._id_cache[fileitem.path]
rename_new_path = Path(path) / new_name
self._id_cache[str(rename_new_path)] = int(new_file.fileid)
return True
return False
@@ -416,25 +517,26 @@ class U115Pan(StorageBase, metaclass=Singleton):
"""
原子性移动操作实现
"""
src_cid = self._path_to_cid(fileitem.path)
dest_cid = self._path_to_cid(str(path))
src_fid = self._path_to_id(fileitem.path)
dest_cid = self._path_to_id(str(path))
resp = self._request_api(
"POST", "/file/move",
json={
"cid": src_cid,
"pid": dest_cid,
"name": new_name,
"overwrite": 0
"POST",
"/open/ufile/move",
data={
"file_ids": src_fid,
"to_cid": dest_cid
}
)
if resp["state"]:
new_path = Path(path) / fileitem.name
new_file = self.get_item(new_path)
self.rename(new_file, new_name)
# 更新缓存
old_path = fileitem.path
new_path = str(Path(path) / new_name)
del self._cid_cache[old_path]
self._cid_cache[new_path] = src_cid
del self._id_cache[fileitem.path]
rename_new_path = Path(path) / new_name
self._id_cache[str(rename_new_path)] = src_fid
return True
return False
@@ -445,13 +547,19 @@ class U115Pan(StorageBase, metaclass=Singleton):
pass
def usage(self) -> Optional[schemas.StorageUsage]:
"""获取带有企业级配额信息的存储使用情况"""
"""
获取带有企业级配额信息的存储使用情况
"""
try:
resp = self._request_api("GET", "/user/info")
space = resp["data"]["space_info"]
resp = self._request_api(
"GET",
"/open/user/info",
"data"
)
space = resp["rt_space_info"]
return schemas.StorageUsage(
total=space["total"],
available=space["free"]
total=space["all_total"]["size"],
available=space["all_remain"]["size"]
)
except KeyError:
return None

View File

@@ -67,5 +67,4 @@ rsa~=4.9
redis~=5.2.1
async_timeout~=5.0.1; python_full_version < "3.11.3"
packaging~=24.2
cf_clearance~=0.31.0
qrcode~=8.0
cf_clearance~=0.31.0