diff --git a/app/modules/filemanager/storages/alist.py b/app/modules/filemanager/storages/alist.py index 41aeba82..47e5fe32 100644 --- a/app/modules/filemanager/storages/alist.py +++ b/app/modules/filemanager/storages/alist.py @@ -1,4 +1,6 @@ +import json import logging +from datetime import datetime from pathlib import Path from typing import Optional, Tuple, List, Dict, Union @@ -6,6 +8,7 @@ from requests import Response from cachetools import cached, TTLCache from app import schemas +from app.core.config import settings from app.log import logger from app.modules.filemanager.storages import StorageBase from app.schemas.types import StorageSchema @@ -30,7 +33,6 @@ class Alist(StorageBase): def __init__(self): super().__init__() - self.req = RequestUtils(headers=self.__get_header_with_token()) @property def __get_base_url(self) -> str: @@ -68,12 +70,14 @@ class Alist(StorageBase): 缓存2天,提前5分钟更新 """ conf = self.get_conf() - resp: Response = self.req.post_res( + resp: Response = RequestUtils(headers={ + 'Content-Type': 'application/json' + }).post_res( self.__get_api_url("/api/auth/login"), - json={ + data=json.dumps({ "username": conf.get("username"), "password": conf.get("password"), - }, + }), ) """ { @@ -90,15 +94,21 @@ class Alist(StorageBase): } """ + if resp is None: + logger.warning("请求登录失败,无法连接alist服务") + return "" + if resp.status_code != 200: logger.warning(f"更新令牌请求发送失败,状态码:{resp.status_code}") + return "" result = resp.json() if result["code"] != 200: logger.critical(f'更新令牌,错误信息:{result["message"]}') + return "" - logger.debug("更新令牌成功") + logger.debug("AList获取令牌成功") return result["data"]["token"] def __get_header_with_token(self) -> dict: @@ -114,12 +124,12 @@ class Alist(StorageBase): pass def list( - self, - fileitem: schemas.FileItem, - password: str = "", - page: int = 1, - per_page: int = 0, - refresh: bool = False, + self, + fileitem: schemas.FileItem, + password: str = "", + page: int = 1, + per_page: int = 0, + refresh: bool = False, ) -> Optional[List[schemas.FileItem]]: """ 浏览文件 @@ -129,7 +139,14 @@ class Alist(StorageBase): :param per_page: 每页数量 :param refresh: 是否刷新 """ - resp: Response = self.req.post_res( + if fileitem.type == "file": + item = self.get_item(Path(fileitem.path)) + if item: + return [item] + return None + resp: Response = RequestUtils( + headers=self.__get_header_with_token() + ).post_res( self.__get_api_url("/api/fs/list"), json={ "path": fileitem.path, @@ -175,6 +192,9 @@ class Alist(StorageBase): } """ + if resp is None: + logging.warning(f"请求获取目录 {fileitem.path} 的文件列表失败,无法连接alist服务") + return if resp.status_code != 200: logging.warning( f"请求获取目录 {fileitem.path} 的文件列表失败,状态码:{resp.status_code}" @@ -193,27 +213,29 @@ class Alist(StorageBase): schemas.FileItem( storage=self.schema.value, type="dir" if item["is_dir"] else "file", - path=fileitem.path + "/" + item["name"], + path=(Path(fileitem.path) / item["name"]).as_posix() + ("/" if item["is_dir"] else ""), name=item["name"], basename=Path(item["name"]).stem, extension=Path(item["name"]).suffix, size=item["size"], - modify_time=item["modified"], + modify_time=self.__parse_timestamp(item["modified"]), thumbnail=item["thumb"], ) - for item in result["data"]["content"] + for item in result["data"]["content"] or [] ] def create_folder( - self, fileitem: schemas.FileItem, name: str + self, fileitem: schemas.FileItem, name: str ) -> Optional[schemas.FileItem]: """ 创建目录 """ path = Path(fileitem.path) / name - resp: Response = self.req.post_res( + resp: Response = RequestUtils( + headers=self.__get_header_with_token() + ).post_res( self.__get_api_url("/api/fs/mkdir"), - json={"path": path}, + json={"path": path.as_posix()}, ) """ { @@ -249,19 +271,19 @@ class Alist(StorageBase): folder = self.create_folder(self.get_parent(schemas.FileItem( storage=self.schema.value, type="dir", - path=path.as_posix(), + path=path.as_posix() + "/", name=path.name, basename=path.stem )), path.name) return folder def get_item( - self, - path: Path, - password: str = "", - page: int = 1, - per_page: int = 0, - refresh: bool = False, + self, + path: Path, + password: str = "", + page: int = 1, + per_page: int = 0, + refresh: bool = False, ) -> Optional[schemas.FileItem]: """ 获取文件或目录,不存在返回None @@ -271,7 +293,9 @@ class Alist(StorageBase): :param per_page: 每页数量 :param refresh: 是否刷新 """ - resp: Response = self.req.post_res( + resp: Response = RequestUtils( + headers=self.__get_header_with_token() + ).post_res( self.__get_api_url("/api/fs/get"), json={ "path": path.as_posix(), @@ -327,12 +351,12 @@ class Alist(StorageBase): return schemas.FileItem( storage=self.schema.value, type="dir" if result["data"]["is_dir"] else "file", - path=path, + path=path.as_posix() + ("/" if result["data"]["is_dir"] else ""), name=result["data"]["name"], basename=Path(result["data"]["name"]).stem, extension=Path(result["data"]["name"]).suffix, size=result["data"]["size"], - modify_time=result["data"]["modified"], + modify_time=self.__parse_timestamp(result["data"]["modified"]), thumbnail=result["data"]["thumb"], ) @@ -346,7 +370,9 @@ class Alist(StorageBase): """ 删除文件 """ - resp: Response = self.req.post_res( + resp: Response = RequestUtils( + headers=self.__get_header_with_token() + ).post_res( self.__get_api_url("/api/fs/delete"), json={ "dir": Path(fileitem.path).parent.as_posix(), @@ -388,7 +414,9 @@ class Alist(StorageBase): """ 重命名文件 """ - resp: Response = self.req.post_res( + resp: Response = RequestUtils( + headers=self.__get_header_with_token() + ).post_res( self.__get_api_url("/api/fs/rename"), json={ "name": name, @@ -426,20 +454,20 @@ class Alist(StorageBase): return True def download( - self, - fileitem: schemas.FileItem, - path: Path = None, - password: str = "", - raw_url: bool = False, + self, + fileitem: schemas.FileItem, + path: Path = None, + password: str = "", ) -> Optional[Path]: """ 下载文件,保存到本地,返回本地临时文件地址 :param fileitem: 文件项 :param path: 文件保存路径 :param password: 文件密码 - :param raw_url: 是否使用原始链接下载 """ - resp: Response = self.req.post_res( + resp: Response = RequestUtils( + headers=self.__get_header_with_token() + ).post_res( self.__get_api_url("/api/fs/get"), json={ "path": fileitem.path, @@ -484,14 +512,20 @@ class Alist(StorageBase): logging.warning(f'获取文件 {path} 失败,错误信息:{result["message"]}') return - if raw_url: + if result["data"]["raw_url"]: download_url = result["data"]["raw_url"] else: download_url = UrlUtils.adapt_request_url(self.__get_base_url, f"/d{fileitem.path}") if result["data"]["sign"]: download_url = download_url + "?sign=" + result["data"]["sign"] - resp = self.req.get_res(download_url) + resp = RequestUtils( + headers=self.__get_header_with_token() + ).get_res(download_url) + + if not path: + path = settings.TEMP_PATH / fileitem.name + with open(path, "wb") as f: f.write(resp.content) @@ -500,7 +534,7 @@ class Alist(StorageBase): return None def upload( - self, fileitem: schemas.FileItem, path: Path, task: bool = False + self, fileitem: schemas.FileItem, path: Path, task: bool = False ) -> Optional[schemas.FileItem]: """ 上传文件 @@ -555,7 +589,7 @@ class Alist(StorageBase): return src_dir, traget_dir, [name], True def copy( - self, fileitem: schemas.FileItem, target: Union[schemas.FileItem, Path] + self, fileitem: schemas.FileItem, target: Union[schemas.FileItem, Path] ) -> bool: """ 复制文件 @@ -568,7 +602,9 @@ class Alist(StorageBase): if not is_valid: return False - resp: Response = self.req.post_res( + resp: Response = RequestUtils( + headers=self.__get_header_with_token() + ).post_res( self.__get_api_url("/api/fs/copy"), json={ "src_dir": src_dir, @@ -611,7 +647,7 @@ class Alist(StorageBase): return True def move( - self, fileitem: schemas.FileItem, target: Union[schemas.FileItem, Path] + self, fileitem: schemas.FileItem, target: Union[schemas.FileItem, Path] ) -> bool: """ 移动文件 @@ -622,7 +658,9 @@ class Alist(StorageBase): if not is_valid: return False - resp: Response = self.req.post_res( + resp: Response = RequestUtils( + headers=self.__get_header_with_token() + ).post_res( self.__get_api_url("/api/fs/move"), json={ "src_dir": src_dir, @@ -705,3 +743,15 @@ class Alist(StorageBase): __snapshot_file(fileitem) return files_info + + @staticmethod + def __parse_timestamp(time_str: str) -> float: + try: + # 尝试解析带微秒的时间格式 + dt = datetime.strptime(time_str[:26], '%Y-%m-%dT%H:%M:%S.%f') + except ValueError: + # 如果失败,尝试解析不带微秒的时间格式 + dt = datetime.strptime(time_str, '%Y-%m-%dT%H:%M:%SZ') + + # 返回时间戳 + return dt.timestamp()