fix alist

This commit is contained in:
jxxghp
2024-11-14 14:54:22 +08:00
parent dfbd9f3b30
commit 5da28f702f

View File

@@ -1,4 +1,6 @@
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple, List, Dict, Union
@@ -6,6 +8,7 @@ from requests import Response
from cachetools import cached, TTLCache
from app import schemas
from app.core.config import settings
from app.log import logger
from app.modules.filemanager.storages import StorageBase
from app.schemas.types import StorageSchema
@@ -30,7 +33,6 @@ class Alist(StorageBase):
def __init__(self):
super().__init__()
self.req = RequestUtils(headers=self.__get_header_with_token())
@property
def __get_base_url(self) -> str:
@@ -68,12 +70,14 @@ class Alist(StorageBase):
缓存2天提前5分钟更新
"""
conf = self.get_conf()
resp: Response = self.req.post_res(
resp: Response = RequestUtils(headers={
'Content-Type': 'application/json'
}).post_res(
self.__get_api_url("/api/auth/login"),
json={
data=json.dumps({
"username": conf.get("username"),
"password": conf.get("password"),
},
}),
)
"""
{
@@ -90,15 +94,21 @@ class Alist(StorageBase):
}
"""
if resp is None:
logger.warning("请求登录失败无法连接alist服务")
return ""
if resp.status_code != 200:
logger.warning(f"更新令牌请求发送失败,状态码:{resp.status_code}")
return ""
result = resp.json()
if result["code"] != 200:
logger.critical(f'更新令牌,错误信息:{result["message"]}')
return ""
logger.debug("更新令牌成功")
logger.debug("AList获取令牌成功")
return result["data"]["token"]
def __get_header_with_token(self) -> dict:
@@ -114,12 +124,12 @@ class Alist(StorageBase):
pass
def list(
self,
fileitem: schemas.FileItem,
password: str = "",
page: int = 1,
per_page: int = 0,
refresh: bool = False,
self,
fileitem: schemas.FileItem,
password: str = "",
page: int = 1,
per_page: int = 0,
refresh: bool = False,
) -> Optional[List[schemas.FileItem]]:
"""
浏览文件
@@ -129,7 +139,14 @@ class Alist(StorageBase):
:param per_page: 每页数量
:param refresh: 是否刷新
"""
resp: Response = self.req.post_res(
if fileitem.type == "file":
item = self.get_item(Path(fileitem.path))
if item:
return [item]
return None
resp: Response = RequestUtils(
headers=self.__get_header_with_token()
).post_res(
self.__get_api_url("/api/fs/list"),
json={
"path": fileitem.path,
@@ -175,6 +192,9 @@ class Alist(StorageBase):
}
"""
if resp is None:
logging.warning(f"请求获取目录 {fileitem.path} 的文件列表失败无法连接alist服务")
return
if resp.status_code != 200:
logging.warning(
f"请求获取目录 {fileitem.path} 的文件列表失败,状态码:{resp.status_code}"
@@ -193,27 +213,29 @@ class Alist(StorageBase):
schemas.FileItem(
storage=self.schema.value,
type="dir" if item["is_dir"] else "file",
path=fileitem.path + "/" + item["name"],
path=(Path(fileitem.path) / item["name"]).as_posix() + ("/" if item["is_dir"] else ""),
name=item["name"],
basename=Path(item["name"]).stem,
extension=Path(item["name"]).suffix,
size=item["size"],
modify_time=item["modified"],
modify_time=self.__parse_timestamp(item["modified"]),
thumbnail=item["thumb"],
)
for item in result["data"]["content"]
for item in result["data"]["content"] or []
]
def create_folder(
self, fileitem: schemas.FileItem, name: str
self, fileitem: schemas.FileItem, name: str
) -> Optional[schemas.FileItem]:
"""
创建目录
"""
path = Path(fileitem.path) / name
resp: Response = self.req.post_res(
resp: Response = RequestUtils(
headers=self.__get_header_with_token()
).post_res(
self.__get_api_url("/api/fs/mkdir"),
json={"path": path},
json={"path": path.as_posix()},
)
"""
{
@@ -249,19 +271,19 @@ class Alist(StorageBase):
folder = self.create_folder(self.get_parent(schemas.FileItem(
storage=self.schema.value,
type="dir",
path=path.as_posix(),
path=path.as_posix() + "/",
name=path.name,
basename=path.stem
)), path.name)
return folder
def get_item(
self,
path: Path,
password: str = "",
page: int = 1,
per_page: int = 0,
refresh: bool = False,
self,
path: Path,
password: str = "",
page: int = 1,
per_page: int = 0,
refresh: bool = False,
) -> Optional[schemas.FileItem]:
"""
获取文件或目录不存在返回None
@@ -271,7 +293,9 @@ class Alist(StorageBase):
:param per_page: 每页数量
:param refresh: 是否刷新
"""
resp: Response = self.req.post_res(
resp: Response = RequestUtils(
headers=self.__get_header_with_token()
).post_res(
self.__get_api_url("/api/fs/get"),
json={
"path": path.as_posix(),
@@ -327,12 +351,12 @@ class Alist(StorageBase):
return schemas.FileItem(
storage=self.schema.value,
type="dir" if result["data"]["is_dir"] else "file",
path=path,
path=path.as_posix() + ("/" if result["data"]["is_dir"] else ""),
name=result["data"]["name"],
basename=Path(result["data"]["name"]).stem,
extension=Path(result["data"]["name"]).suffix,
size=result["data"]["size"],
modify_time=result["data"]["modified"],
modify_time=self.__parse_timestamp(result["data"]["modified"]),
thumbnail=result["data"]["thumb"],
)
@@ -346,7 +370,9 @@ class Alist(StorageBase):
"""
删除文件
"""
resp: Response = self.req.post_res(
resp: Response = RequestUtils(
headers=self.__get_header_with_token()
).post_res(
self.__get_api_url("/api/fs/delete"),
json={
"dir": Path(fileitem.path).parent.as_posix(),
@@ -388,7 +414,9 @@ class Alist(StorageBase):
"""
重命名文件
"""
resp: Response = self.req.post_res(
resp: Response = RequestUtils(
headers=self.__get_header_with_token()
).post_res(
self.__get_api_url("/api/fs/rename"),
json={
"name": name,
@@ -426,20 +454,20 @@ class Alist(StorageBase):
return True
def download(
self,
fileitem: schemas.FileItem,
path: Path = None,
password: str = "",
raw_url: bool = False,
self,
fileitem: schemas.FileItem,
path: Path = None,
password: str = "",
) -> Optional[Path]:
"""
下载文件,保存到本地,返回本地临时文件地址
:param fileitem: 文件项
:param path: 文件保存路径
:param password: 文件密码
:param raw_url: 是否使用原始链接下载
"""
resp: Response = self.req.post_res(
resp: Response = RequestUtils(
headers=self.__get_header_with_token()
).post_res(
self.__get_api_url("/api/fs/get"),
json={
"path": fileitem.path,
@@ -484,14 +512,20 @@ class Alist(StorageBase):
logging.warning(f'获取文件 {path} 失败,错误信息:{result["message"]}')
return
if raw_url:
if result["data"]["raw_url"]:
download_url = result["data"]["raw_url"]
else:
download_url = UrlUtils.adapt_request_url(self.__get_base_url, f"/d{fileitem.path}")
if result["data"]["sign"]:
download_url = download_url + "?sign=" + result["data"]["sign"]
resp = self.req.get_res(download_url)
resp = RequestUtils(
headers=self.__get_header_with_token()
).get_res(download_url)
if not path:
path = settings.TEMP_PATH / fileitem.name
with open(path, "wb") as f:
f.write(resp.content)
@@ -500,7 +534,7 @@ class Alist(StorageBase):
return None
def upload(
self, fileitem: schemas.FileItem, path: Path, task: bool = False
self, fileitem: schemas.FileItem, path: Path, task: bool = False
) -> Optional[schemas.FileItem]:
"""
上传文件
@@ -555,7 +589,7 @@ class Alist(StorageBase):
return src_dir, traget_dir, [name], True
def copy(
self, fileitem: schemas.FileItem, target: Union[schemas.FileItem, Path]
self, fileitem: schemas.FileItem, target: Union[schemas.FileItem, Path]
) -> bool:
"""
复制文件
@@ -568,7 +602,9 @@ class Alist(StorageBase):
if not is_valid:
return False
resp: Response = self.req.post_res(
resp: Response = RequestUtils(
headers=self.__get_header_with_token()
).post_res(
self.__get_api_url("/api/fs/copy"),
json={
"src_dir": src_dir,
@@ -611,7 +647,7 @@ class Alist(StorageBase):
return True
def move(
self, fileitem: schemas.FileItem, target: Union[schemas.FileItem, Path]
self, fileitem: schemas.FileItem, target: Union[schemas.FileItem, Path]
) -> bool:
"""
移动文件
@@ -622,7 +658,9 @@ class Alist(StorageBase):
if not is_valid:
return False
resp: Response = self.req.post_res(
resp: Response = RequestUtils(
headers=self.__get_header_with_token()
).post_res(
self.__get_api_url("/api/fs/move"),
json={
"src_dir": src_dir,
@@ -705,3 +743,15 @@ class Alist(StorageBase):
__snapshot_file(fileitem)
return files_info
@staticmethod
def __parse_timestamp(time_str: str) -> float:
try:
# 尝试解析带微秒的时间格式
dt = datetime.strptime(time_str[:26], '%Y-%m-%dT%H:%M:%S.%f')
except ValueError:
# 如果失败,尝试解析不带微秒的时间格式
dt = datetime.strptime(time_str, '%Y-%m-%dT%H:%M:%SZ')
# 返回时间戳
return dt.timestamp()