This commit is contained in:
jxxghp
2025-03-25 12:58:03 +08:00
parent e83fe0aabe
commit d9ed135be4

View File

@@ -1,6 +1,7 @@
import base64
import hashlib
import secrets
import threading
import time
from pathlib import Path
from typing import List, Dict, Optional, Tuple, Union
@@ -16,6 +17,8 @@ from app.schemas.types import StorageSchema
from app.utils.singleton import Singleton
from app.utils.string import StringUtils
lock = threading.Lock()
class U115Pan(StorageBase, metaclass=Singleton):
"""
@@ -54,27 +57,37 @@ class U115Pan(StorageBase, metaclass=Singleton):
"Accept-Encoding": "gzip, deflate",
"Content-Type": "application/x-www-form-urlencoded"
})
self.init_storage()
def _check_session(self):
"""
检查会话是否过期
"""
if not self.access_token:
raise Exception("【115】请先扫码登录")
@property
def access_token(self) -> Optional[str]:
"""
访问token
"""
tokens = self.get_conf()
refresh_token = tokens.get("refresh_token")
if not refresh_token:
return None
expires_in = tokens.get("expires_in", 0)
refresh_time = tokens.get("refresh_time", 0)
if expires_in and refresh_time + expires_in < int(time.time()):
tokens = self.__refresh_access_token(refresh_token)
if tokens:
self.set_config({
"refresh_time": int(time.time()),
**tokens
})
return tokens.get("access_token")
with lock:
tokens = self.get_conf()
refresh_token = tokens.get("refresh_token")
if not refresh_token:
return None
expires_in = tokens.get("expires_in", 0)
refresh_time = tokens.get("refresh_time", 0)
if expires_in and refresh_time + expires_in < int(time.time()):
tokens = self.__refresh_access_token(refresh_token)
if tokens:
self.set_config({
"refresh_time": int(time.time()),
**tokens
})
access_token = tokens.get("access_token")
if access_token:
self.session.headers.update({"Authorization": f"Bearer {access_token}"})
return access_token
def generate_qrcode(self) -> Tuple[dict, str]:
"""
@@ -112,6 +125,36 @@ class U115Pan(StorageBase, metaclass=Singleton):
"codeContent": result['data']['qrcode']
}, ""
def check_login(self) -> Optional[Tuple[dict, str]]:
"""
改进的带PKCE校验的登录状态检查
"""
if not self._auth_state:
return {}, "生成二维码失败"
try:
resp = self.session.get(
"https://qrcodeapi.115.com/get/status/",
params={
"uid": self._auth_state["uid"],
"time": self._auth_state["time"],
"sign": self._auth_state["sign"]
}
)
if resp is None:
return {}, "网络错误"
result = resp.json()
if result.get("code") != 0 or not result.get("data"):
return {}, result.get("message")
if result["data"]["status"] == 2:
tokens = self.__get_access_token()
self.set_config({
"refresh_time": int(time.time()),
**tokens
})
return {"status": result["data"]["status"], "tip": result["data"]["msg"]}, ""
except Exception as e:
return {}, str(e)
def __get_access_token(self) -> dict:
"""
确认登录后获取相关token
@@ -155,6 +198,9 @@ class U115Pan(StorageBase, metaclass=Singleton):
"""
带错误处理和速率限制的API请求
"""
# 检查会话
self._check_session()
resp = self.session.request(
method, f"{self.base_url}{endpoint}",
**kwargs
@@ -174,26 +220,6 @@ class U115Pan(StorageBase, metaclass=Singleton):
# 返回数据
ret_data = resp.json()
# 处理refresh_token失效
if ret_data.get("code") in [40140116, 40140119]:
self.set_config({})
logger.warn("【115】refresh_token 失效,请重新扫描登录!")
return None
# 处理access_token失效
if ret_data.get("code") == 40140125:
refresh_token = self.get_conf().get("refresh_token")
if refresh_token:
tokens = self.__refresh_access_token(refresh_token)
if tokens:
self.set_config({
"refresh_time": int(time.time()),
**tokens
})
return self._request_api(method, endpoint, result_key, **kwargs)
return None
if ret_data.get("code") != 0:
logger.warn(f"【115】{method} 请求 {endpoint} 出错:{ret_data.get('message')}")
@@ -284,43 +310,8 @@ class U115Pan(StorageBase, metaclass=Singleton):
sha1.update(chunk)
return sha1.hexdigest()
def check_login(self) -> Optional[Tuple[dict, str]]:
"""
改进的带PKCE校验的登录状态检查
"""
if not self._auth_state:
return {}, "生成二维码失败"
try:
resp = self.session.get(
"https://qrcodeapi.115.com/get/status/",
params={
"uid": self._auth_state["uid"],
"time": self._auth_state["time"],
"sign": self._auth_state["sign"]
}
)
if resp is None:
return {}, "网络错误"
result = resp.json()
if result.get("code") != 0 or not result.get("data"):
return {}, result.get("message")
if result["data"]["status"] == 2:
tokens = self.__get_access_token()
self.set_config({
"refresh_time": int(time.time()),
**tokens
})
return {"status": result["data"]["status"], "tip": result["data"]["msg"]}, ""
except Exception as e:
return {}, str(e)
def init_storage(self):
"""
初始化存储连接
"""
self.session.headers.update({
"Authorization": f"Bearer {self.access_token}"
})
pass
def list(self, fileitem: schemas.FileItem) -> List[schemas.FileItem]:
"""
@@ -498,7 +489,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
access_key_secret=token_resp['AccessKeySecret'],
security_token=token_resp['SecurityToken']
)
bucket = oss2.Bucket(auth, endpoint, init_result['bucket']) # noqa
bucket = oss2.Bucket(auth, endpoint, init_result['bucket']) # noqa
# 分片上传
headers = {