diff --git a/app/modules/filemanager/storages/u115.py b/app/modules/filemanager/storages/u115.py index 7d0772a4..746abef8 100644 --- a/app/modules/filemanager/storages/u115.py +++ b/app/modules/filemanager/storages/u115.py @@ -1,6 +1,7 @@ import base64 import hashlib import secrets +import threading import time from pathlib import Path from typing import List, Dict, Optional, Tuple, Union @@ -16,6 +17,8 @@ from app.schemas.types import StorageSchema from app.utils.singleton import Singleton from app.utils.string import StringUtils +lock = threading.Lock() + class U115Pan(StorageBase, metaclass=Singleton): """ @@ -54,27 +57,37 @@ class U115Pan(StorageBase, metaclass=Singleton): "Accept-Encoding": "gzip, deflate", "Content-Type": "application/x-www-form-urlencoded" }) - self.init_storage() + + def _check_session(self): + """ + 检查会话是否过期 + """ + if not self.access_token: + raise Exception("【115】请先扫码登录!") @property def access_token(self) -> Optional[str]: """ 访问token """ - tokens = self.get_conf() - refresh_token = tokens.get("refresh_token") - if not refresh_token: - return None - expires_in = tokens.get("expires_in", 0) - refresh_time = tokens.get("refresh_time", 0) - if expires_in and refresh_time + expires_in < int(time.time()): - tokens = self.__refresh_access_token(refresh_token) - if tokens: - self.set_config({ - "refresh_time": int(time.time()), - **tokens - }) - return tokens.get("access_token") + with lock: + tokens = self.get_conf() + refresh_token = tokens.get("refresh_token") + if not refresh_token: + return None + expires_in = tokens.get("expires_in", 0) + refresh_time = tokens.get("refresh_time", 0) + if expires_in and refresh_time + expires_in < int(time.time()): + tokens = self.__refresh_access_token(refresh_token) + if tokens: + self.set_config({ + "refresh_time": int(time.time()), + **tokens + }) + access_token = tokens.get("access_token") + if access_token: + self.session.headers.update({"Authorization": f"Bearer {access_token}"}) + return access_token def generate_qrcode(self) -> Tuple[dict, str]: """ @@ -112,6 +125,36 @@ class U115Pan(StorageBase, metaclass=Singleton): "codeContent": result['data']['qrcode'] }, "" + def check_login(self) -> Optional[Tuple[dict, str]]: + """ + 改进的带PKCE校验的登录状态检查 + """ + if not self._auth_state: + return {}, "生成二维码失败" + try: + resp = self.session.get( + "https://qrcodeapi.115.com/get/status/", + params={ + "uid": self._auth_state["uid"], + "time": self._auth_state["time"], + "sign": self._auth_state["sign"] + } + ) + if resp is None: + return {}, "网络错误" + result = resp.json() + if result.get("code") != 0 or not result.get("data"): + return {}, result.get("message") + if result["data"]["status"] == 2: + tokens = self.__get_access_token() + self.set_config({ + "refresh_time": int(time.time()), + **tokens + }) + return {"status": result["data"]["status"], "tip": result["data"]["msg"]}, "" + except Exception as e: + return {}, str(e) + def __get_access_token(self) -> dict: """ 确认登录后,获取相关token @@ -155,6 +198,9 @@ class U115Pan(StorageBase, metaclass=Singleton): """ 带错误处理和速率限制的API请求 """ + # 检查会话 + self._check_session() + resp = self.session.request( method, f"{self.base_url}{endpoint}", **kwargs @@ -174,26 +220,6 @@ class U115Pan(StorageBase, metaclass=Singleton): # 返回数据 ret_data = resp.json() - - # 处理refresh_token失效 - if ret_data.get("code") in [40140116, 40140119]: - self.set_config({}) - logger.warn("【115】refresh_token 失效,请重新扫描登录!") - return None - - # 处理access_token失效 - if ret_data.get("code") == 40140125: - refresh_token = self.get_conf().get("refresh_token") - if refresh_token: - tokens = self.__refresh_access_token(refresh_token) - if tokens: - self.set_config({ - "refresh_time": int(time.time()), - **tokens - }) - return self._request_api(method, endpoint, result_key, **kwargs) - return None - if ret_data.get("code") != 0: logger.warn(f"【115】{method} 请求 {endpoint} 出错:{ret_data.get('message')}!") @@ -284,43 +310,8 @@ class U115Pan(StorageBase, metaclass=Singleton): sha1.update(chunk) return sha1.hexdigest() - def check_login(self) -> Optional[Tuple[dict, str]]: - """ - 改进的带PKCE校验的登录状态检查 - """ - if not self._auth_state: - return {}, "生成二维码失败" - try: - resp = self.session.get( - "https://qrcodeapi.115.com/get/status/", - params={ - "uid": self._auth_state["uid"], - "time": self._auth_state["time"], - "sign": self._auth_state["sign"] - } - ) - if resp is None: - return {}, "网络错误" - result = resp.json() - if result.get("code") != 0 or not result.get("data"): - return {}, result.get("message") - if result["data"]["status"] == 2: - tokens = self.__get_access_token() - self.set_config({ - "refresh_time": int(time.time()), - **tokens - }) - return {"status": result["data"]["status"], "tip": result["data"]["msg"]}, "" - except Exception as e: - return {}, str(e) - def init_storage(self): - """ - 初始化存储连接 - """ - self.session.headers.update({ - "Authorization": f"Bearer {self.access_token}" - }) + pass def list(self, fileitem: schemas.FileItem) -> List[schemas.FileItem]: """ @@ -498,7 +489,7 @@ class U115Pan(StorageBase, metaclass=Singleton): access_key_secret=token_resp['AccessKeySecret'], security_token=token_resp['SecurityToken'] ) - bucket = oss2.Bucket(auth, endpoint, init_result['bucket']) # noqa + bucket = oss2.Bucket(auth, endpoint, init_result['bucket']) # noqa # 分片上传 headers = {