add 115 open api

This commit is contained in:
jxxghp
2025-03-21 13:27:31 +08:00
parent 0b59c95f63
commit ea61599589
2 changed files with 385 additions and 341 deletions

View File

@@ -1,14 +1,16 @@
import hashlib
import os
import time
from pathlib import Path
from typing import Optional, Tuple, List
from typing import List, Dict, Optional, Tuple
from p115 import P115Client, P115Path
import qrcode
import requests
from app import schemas
from app.core.config import settings
from app.log import logger
from app.modules.filemanager.storages import StorageBase
from app.modules.filemanager import StorageBase
from app.schemas.types import StorageSchema
from app.utils.http import RequestUtils
from app.utils.singleton import Singleton
@@ -26,374 +28,414 @@ class U115Pan(StorageBase, metaclass=Singleton):
"copy": "复制"
}
# 115二维码登录地址
qrcode_url = "https://qrcodeapi.115.com/api/1.0/web/1.0/token/"
# 115登录状态检查
login_check_url = "https://qrcodeapi.115.com/get/status/"
# 115登录完成 alipaymini
login_done_api = f"https://passportapi.115.com/app/1.0/alipaymini/1.0/login/qrcode/"
# 访问token
access_token = None
client: P115Client = None
session_info: dict = None
# 基础url
base_url = "https://api.115.com"
# CID和路径缓存
_cid_cache: Dict[str | int, str | int] = {}
def __init__(self):
super().__init__()
self.init_storage()
self.session = requests.Session()
self._init_session()
def _init_session(self):
"""
初始化带速率限制的会话
"""
adapter = requests.adapters.HTTPAdapter(
max_retries=3,
pool_connections=10,
pool_maxsize=50
)
self.session.mount('https://', adapter)
self.session.headers.update({
"User-Agent": "W115Storage/2.0",
"Accept-Encoding": "gzip, deflate"
})
def generate_qrcode(self) -> Tuple[dict, str]:
"""
生成设备授权二维码
"""
resp = self.session.post(
f"{self.base_url}/oauth/device",
data={"client_id": self.get_conf().get("app_id")}
).json()
qr = qrcode.make(f"115AUTH|{resp['device_code']}")
return resp, qr.png_as_base64_str()
def check_login(self, device_code: str) -> Optional[Dict]:
"""
检查授权状态
"""
try:
resp = self.session.post(f"{self.base_url}/oauth/token", data={
"grant_type": "device",
"device_code": device_code,
"client_secret": self.get_conf().get("app_secret")
}, timeout=10)
if resp.status_code == 200:
token_data = resp.json()
self.access_token = token_data["access_token"]
# 持久化配置
self.set_config({"access_token": self.access_token})
return {"status": "success"}
return {"status": "pending"}
except requests.exceptions.RequestException:
return {"status": "error"}
def init_storage(self):
"""
初始化Cloud
初始化存储连接
"""
if not self.__credential:
return
try:
self.client = P115Client(self.__credential, app="alipaymini",
check_for_relogin=False, console_qrcode=False)
except Exception as err:
logger.error(f"115连接失败请重新登录{str(err)}")
self.__clear_credential()
if conf := self.get_conf():
self.access_token = conf.get("access_token")
self.session.headers.update({"Authorization": f"Bearer {self.access_token}"})
@property
def __credential(self) -> Optional[str]:
def list(self, fileitem: schemas.FileItem) -> List[schemas.FileItem]:
"""
获取已保存的115 Cookie
目录遍历实现
"""
conf = self.get_config()
if not conf:
return None
if not conf.config:
return None
return conf.config.get("cookie")
cid = self._path_to_cid(fileitem.path)
items = []
offset = 0
def __save_credential(self, credential: dict):
"""
设置115认证参数
"""
self.set_config(credential)
while True:
resp = self._request_api(
"GET", "/files",
params={"cid": cid, "limit": 1000, "offset": offset}
)
batch = resp["data"]
for item in batch:
path = self._cid_to_path(item["cid"])
items.append(schemas.FileItem(
path=path,
name=item["name"],
type="dir" if item["is_dir"] else "file",
size=item["size"],
modify_time=item["modified"]
))
self._cid_cache[path] = item["cid"] # 更新缓存
def __clear_credential(self):
"""
清除115认证参数
"""
self.set_config({})
if len(batch) < 1000:
break
offset += len(batch)
def generate_qrcode(self) -> Optional[Tuple[dict, str]]:
"""
生成二维码
"""
res = RequestUtils(timeout=10).get_res(self.qrcode_url)
if res:
self.session_info = res.json().get("data")
qrcode_content = self.session_info.pop("qrcode")
if not qrcode_content:
logger.warn("115生成二维码失败未获取到二维码数据")
return {}, ""
return {
"codeContent": qrcode_content
}, ""
elif res is not None:
return {}, f"115生成二维码失败{res.status_code} - {res.reason}"
return {}, f"115生成二维码失败无法连接"
return items
def check_login(self) -> Optional[Tuple[dict, str]]:
"""
二维码登录确认
"""
if not self.session_info:
return {}, "请先生成二维码!"
try:
resp = RequestUtils(timeout=10).get_res(self.login_check_url, params=self.session_info)
if not resp:
return {}, "115登录确认失败无法连接"
result = resp.json()
match result["data"].get("status"):
case 0:
result = {
"status": 0,
"tip": "请使用微信或115客户端扫码"
}
case 1:
result = {
"status": 1,
"tip": "已扫码"
}
case 2:
# 确认完成,保存认证信息
resp = RequestUtils(timeout=10).post_res(self.login_done_api,
data={"account": self.session_info.get("uid")})
if not resp:
return {}, "115登录确认失败无法连接"
if resp:
# 保存认证信息
result = resp.json()
cookie_dict = result["data"]["cookie"]
cookie_str = "; ".join([f"{k}={v}" for k, v in cookie_dict.items()])
cookie_dict.update({"cookie": cookie_str})
self.__save_credential(cookie_dict)
self.init_storage()
result = {
"status": 2,
"tip": "登录成功!"
}
case -1:
result = {
"status": -1,
"tip": "二维码已过期,请重新刷新!"
}
case -2:
result = {
"status": -2,
"tip": "登录失败,请重试!"
}
case _:
result = {
"status": -3,
"tip": "未知错误,请重试!"
}
return result, ""
except Exception as e:
return {}, f"115登录确认失败{str(e)}"
def storage(self) -> Optional[Tuple[int, int]]:
"""
获取存储空间
"""
if not self.client:
return None
try:
usage = self.client.fs.space_summury()
if usage:
return usage['rt_space_info']['all_total']['size'], usage['rt_space_info']['all_remain']['size']
except Exception as e:
logger.error(f"115获取存储空间失败{str(e)}")
return None
def check(self) -> bool:
"""
检查存储是否可用
"""
return True if self.list(schemas.FileItem()) else False
def list(self, fileitem: schemas.FileItem) -> Optional[List[schemas.FileItem]]:
"""
浏览文件
"""
if not self.client:
return []
try:
if fileitem.type == "file":
return [fileitem]
items: List[P115Path] = self.client.fs.list(fileitem.path)
return [schemas.FileItem(
storage=self.schema.value,
type="dir" if item.is_dir() else "file",
path=item.path + ("/" if item.is_dir() else ""),
name=item.name,
basename=item.stem,
size=item.stat().st_size,
extension=item.suffix[1:] if not item.is_dir() else None,
modify_time=item.stat().st_mtime
) for item in items if item]
except Exception as e:
logger.error(f"115浏览文件失败{str(e)}")
return []
def create_folder(self, fileitem: schemas.FileItem, name: str) -> Optional[schemas.FileItem]:
def create_folder(self, parent_item: schemas.FileItem, name: str) -> schemas.FileItem:
"""
创建目录
"""
if not self.client:
return None
try:
result = self.client.fs.makedirs(Path(fileitem.path) / name, exist_ok=True)
if result:
return schemas.FileItem(
storage=self.schema.value,
type="dir",
path=f"{result.path}/",
name=name,
basename=Path(result.name).stem,
modify_time=result.mtime
parent_cid = self._path_to_cid(parent_item.path)
resp = self._request_api(
"POST", "/file/mkdir",
json={"cid": parent_cid, "name": name}
)
new_path = os.path.join(parent_item.path, name)
# 缓存新目录
self._cid_cache[new_path] = resp["cid"]
self._cid_cache[resp["cid"]] = new_path
return schemas.FileItem(
path=new_path,
name=name,
type="dir",
modify_time=int(time.time())
)
def upload(self, target_dir: schemas.FileItem, local_path: Path, new_name: str = None) -> schemas.FileItem:
"""
断点续传实现
"""
file_name = new_name or local_path.name
file_size = local_path.stat().st_size
file_hash = self._calc_sha1(local_path)
# 初始化上传任务
upload_info = self._request_api(
"POST", "/open/upload/init",
json={
"file_name": file_name,
"file_size": file_size,
"file_sha1": file_hash,
"target_dir": self._path_to_cid(target_dir.path)
}
)
# 分片上传
with open(local_path, "rb") as f:
offset = 0
# 4MB分片
while chunk := f.read(4 * 1024 * 1024):
self.session.put(
f"{self.base_url}/open/upload/{upload_info['upload_id']}",
data=chunk,
headers={"Content-Range": f"bytes {offset}-{offset + len(chunk) - 1}/{file_size}"}
)
offset += len(chunk)
return self.get_item(Path(target_dir.path) / file_name)
def download(self, fileitem: schemas.FileItem, save_path: Path = None) -> Path:
"""
带限速处理的下载
"""
download_url = self._request_api(
"GET", "/file/download",
params={"cid": self._path_to_cid(fileitem.path)}
)["url"]
local_path = save_path or Path("/tmp") / fileitem.name
with self.session.get(download_url, stream=True) as r:
r.raise_for_status()
with open(local_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
return local_path
def _request_api(self, method: str, endpoint: str, **kwargs):
"""
带错误处理和速率限制的API请求
"""
if not self.access_token:
raise Exception("未授权请先完成OAuth认证")
headers = kwargs.pop("headers", {})
headers["Authorization"] = f"Bearer {self.access_token}"
resp = self.session.request(
method, f"{self.base_url}{endpoint}",
headers=headers, **kwargs
)
if resp is None:
logger.error(f"请求 115 API 失败: {method} {endpoint}")
return None
# 处理速率限制
if resp.status_code == 429:
reset_time = int(resp.headers.get("X-RateLimit-Reset", 60))
time.sleep(reset_time + 5)
return self._request_api(method, endpoint, **kwargs)
resp.raise_for_status()
return resp.json()
def _path_to_cid(self, path: str) -> str:
"""
路径转CID带缓存机制
"""
if path in self._cid_cache:
return self._cid_cache[path]
# 递归解析路径
current_cid = "0" # 根目录CID
for part in Path(path).parts[1:]: # 忽略根目录
resp = self._request_api(
"GET", "/files",
params={"cid": current_cid, "search_value": part}
)
for item in resp["data"]:
if item["name"] == part:
current_cid = item["cid"]
break
else:
raise FileNotFoundError(f"路径不存在: {path}")
self._cid_cache[path] = current_cid
return current_cid
def _cid_to_path(self, cid: str) -> str:
"""
CID转路径带双向缓存
"""
# 根目录特殊处理
if cid == "0":
return "/"
# 优先从缓存读取
if cid in self._cid_cache.values():
return next(k for k, v in self._cid_cache.items() if v == cid)
# 递归构建路径
path_parts = []
current_cid = cid
while current_cid != "0":
# 从API获取当前节点信息
detail = self._request_api(
"GET", "/file/detail",
params={"cid": current_cid}
)
# 处理可能的空数据(如已删除文件)
if not detail:
raise FileNotFoundError(f"CID {current_cid} 不存在")
parent_cid = detail["parent_id"]
path_parts.append(detail["name"])
# 检查父节点缓存
if parent_cid in self._cid_cache.values():
parent_path = next(k for k, v in self._cid_cache.items() if v == parent_cid)
path_parts.reverse()
full_path = os.path.join(parent_path, *path_parts)
# 更新正向缓存
self._cid_cache[full_path] = cid
return str(full_path)
current_cid = parent_cid
# 构建完整路径
full_path = "/" + "/".join(reversed(path_parts))
# 缓存新路径
self._cid_cache[full_path] = cid
return full_path
@staticmethod
def _calc_sha1(filepath: Path) -> str:
"""
计算文件SHA1符合115规范
"""
sha1 = hashlib.sha1()
with open(filepath, 'rb') as f:
while chunk := f.read(8192):
sha1.update(chunk)
return sha1.hexdigest()
def check(self) -> bool:
return self.access_token is not None
def delete(self, fileitem: schemas.FileItem) -> bool:
try:
self._request_api(
"POST", "/file/delete",
json={"cid": self._path_to_cid(fileitem.path)}
)
return True
except requests.exceptions.HTTPError:
return False
def rename(self, fileitem: schemas.FileItem, name: str) -> bool:
new_path = Path(fileitem.path).parent / name
resp = self._request_api(
"POST", "/file/rename",
json={
"cid": self._path_to_cid(fileitem.path),
"new_name": name
}
)
if resp["state"]:
self._cid_cache[str(new_path)] = resp["cid"]
old_path = fileitem.path
new_path = Path(fileitem.path).parent / name
# 删除旧路径
del self._cid_cache[old_path]
self._cid_cache[new_path.as_posix()] = resp["cid"]
# 更新反向缓存
self._cid_cache[resp["cid"]] = new_path.as_posix()
return True
return False
def get_item(self, path: Path) -> Optional[schemas.FileItem]:
try:
cid = self._path_to_cid(str(path))
resp = self._request_api(
"GET", "/file/detail",
params={"cid": cid}
)
return schemas.FileItem(
path=str(path),
name=resp["name"],
type="dir" if resp["is_dir"] else "file",
size=resp["size"],
modify_time=resp["modified"]
)
except Exception as e:
logger.error(f"115创建目录失败{str(e)}")
return None
logger.debug(f"获取文件信息失败: {str(e)}")
return None
def get_folder(self, path: Path) -> Optional[schemas.FileItem]:
"""
根据文件路程获取目录,不存在则创建
获取指定路径的文件夹元数据
"""
if not self.client:
return None
folder = self.get_item(path)
if folder:
return folder
try:
result = self.client.fs.makedirs(path, exist_ok=True)
if result:
return schemas.FileItem(
storage=self.schema.value,
type="dir",
path=result.path + "/",
name=result.name,
basename=Path(result.name).stem,
modify_time=result.mtime
)
except Exception as e:
logger.error(f"115获取目录失败{str(e)}")
return None
def get_item(self, path: Path) -> Optional[schemas.FileItem]:
"""
获取文件或目录不存在返回None
"""
if not self.client:
return None
try:
try:
item = self.client.fs.attr(path)
except FileNotFoundError:
return None
if item:
return schemas.FileItem(
storage=self.schema.value,
type="dir" if item.is_directory else "file",
path=item.path + ("/" if item.is_directory else ""),
name=item.name,
size=item.size,
extension=Path(item.name).suffix[1:] if not item.is_directory else None,
modify_time=item.mtime,
thumbnail=item.get("thumb")
)
except Exception as e:
logger.info(f"115获取文件失败{str(e)}")
item = self.get_item(path)
if item and item.type == "dir":
return item
return None
def detail(self, fileitem: schemas.FileItem) -> Optional[schemas.FileItem]:
"""
获取文件详情
获取文件/目录详细信息
"""
if not self.client:
return None
try:
try:
item = self.client.fs.attr(fileitem.path)
except FileNotFoundError:
cid = self._path_to_cid(fileitem.path)
resp = self._request_api("GET", "/file/detail", params={"cid": cid})
return schemas.FileItem(
path=fileitem.path,
name=resp["name"],
type="dir" if resp["is_dir"] else "file",
size=resp["size"],
modify_time=resp["modified"],
pickcode=resp.get("pick_code")
)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
return None
if item:
return schemas.FileItem(
storage=self.schema.value,
type="dir" if item.is_directory else "file",
path=item.path + ("/" if item.is_directory else ""),
name=item.name,
size=item.size,
extension=Path(item.name).suffix[1:] if not item.is_directory else None,
modify_time=item.mtime,
thumbnail=item.get("thumb")
)
except Exception as e:
logger.error(f"115获取文件详情失败{str(e)}")
return None
def delete(self, fileitem: schemas.FileItem) -> bool:
"""
删除文件
"""
if not self.client:
return False
try:
self.client.fs.remove(fileitem.path)
return True
except Exception as e:
logger.error(f"115删除文件失败{str(e)}")
return False
def rename(self, fileitem: schemas.FileItem, name: str) -> bool:
"""
重命名文件
"""
if not self.client:
return False
try:
self.client.fs.rename(fileitem.path, Path(fileitem.path).with_name(name))
return True
except Exception as e:
logger.error(f"115重命名文件失败{str(e)}")
return False
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
"""
获取下载链接
"""
if not self.client:
return None
local_file = (path or settings.TEMP_PATH) / fileitem.name
try:
task = self.client.fs.download(fileitem.path, file=local_file)
if task:
return local_file
except Exception as e:
logger.error(f"115下载文件失败{str(e)}")
return None
def upload(self, fileitem: schemas.FileItem, path: Path, new_name: str = None) -> Optional[schemas.FileItem]:
"""
上传文件
:param fileitem: 上传目录项
:param path: 本地文件路径
:param new_name: 上传后文件名
"""
if not self.client:
return None
try:
new_path = Path(fileitem.path) / (new_name or path.name)
with open(path, "rb") as f:
result = self.client.fs.upload(f, new_path)
if result:
return schemas.FileItem(
storage=self.schema.value,
type="file",
path=str(path),
name=result.name,
basename=Path(result.name).stem,
size=result.size,
extension=Path(result.name).suffix[1:],
modify_time=result.mtime
)
except Exception as e:
logger.error(f"115上传文件失败{str(e)}")
return None
raise
def copy(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:
"""
复制文件
:param fileitem: 文件项
:param path: 目标目录
:param new_name: 新文件名
企业级复制实现(支持目录递归复制)
"""
if not self.client:
return False
try:
self.client.fs.copy(fileitem.path, path / new_name)
src_cid = self._path_to_cid(fileitem.path)
dest_cid = self._path_to_cid(str(path))
resp = self._request_api(
"POST", "/file/copy",
json={
"cid": src_cid,
"pid": dest_cid,
"name": new_name,
"overwrite": 0 # 0:不覆盖 1:覆盖
}
)
if resp["state"]:
# 更新目标路径缓存
new_path = str(Path(path) / new_name)
self._cid_cache[new_path] = resp["cid"]
return True
except Exception as e:
logger.error(f"115复制文件失败{str(e)}")
return False
def move(self, fileitem: schemas.FileItem, path: Path, new_name: str) -> bool:
"""
移动文件
:param fileitem: 文件项
:param path: 目标目录
:param new_name: 新文件名
原子性移动操作实现
"""
if not self.client:
return False
try:
self.client.fs.move(fileitem.path, path / new_name)
src_cid = self._path_to_cid(fileitem.path)
dest_cid = self._path_to_cid(str(path))
resp = self._request_api(
"POST", "/file/move",
json={
"cid": src_cid,
"pid": dest_cid,
"name": new_name,
"overwrite": 0
}
)
if resp["state"]:
# 更新缓存
old_path = fileitem.path
new_path = str(Path(path) / new_name)
del self._cid_cache[old_path]
self._cid_cache[new_path] = src_cid
return True
except Exception as e:
logger.error(f"115移动文件失败{str(e)}")
return False
def link(self, fileitem: schemas.FileItem, target_file: Path) -> bool:
@@ -403,14 +445,13 @@ class U115Pan(StorageBase, metaclass=Singleton):
pass
def usage(self) -> Optional[schemas.StorageUsage]:
"""
存储使用情况
"""
info = self.storage()
if info:
total, free = info
"""获取带有企业级配额信息的存储使用情况"""
try:
resp = self._request_api("GET", "/user/info")
space = resp["data"]["space_info"]
return schemas.StorageUsage(
total=total,
available=free
total=space["total"],
available=space["free"]
)
return schemas.StorageUsage()
except KeyError:
return None

View File

@@ -65,4 +65,7 @@ aiofiles~=24.1.0
jieba~=0.42.1
rsa~=4.9
redis~=5.2.1
async_timeout~=5.0.1; python_full_version < "3.11.3"
async_timeout~=5.0.1; python_full_version < "3.11.3"
packaging~=24.2
cf_clearance~=0.31.0
qrcode~=8.0