mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-02-02 18:22:39 +08:00
Merge pull request #5439 from DDSRem-Dev/dev
This commit is contained in:
@@ -31,6 +31,17 @@ def qrcode(name: str, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
|
||||
return schemas.Response(success=False, message=errmsg)
|
||||
|
||||
|
||||
@router.get("/auth_url/{name}", summary="获取 OAuth2 授权 URL", response_model=schemas.Response)
|
||||
def auth_url(name: str, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
|
||||
"""
|
||||
获取 OAuth2 授权 URL
|
||||
"""
|
||||
auth_data, errmsg = StorageChain().generate_auth_url(name)
|
||||
if auth_data:
|
||||
return schemas.Response(success=True, data=auth_data)
|
||||
return schemas.Response(success=False, message=errmsg)
|
||||
|
||||
|
||||
@router.get("/check/{name}", summary="二维码登录确认", response_model=schemas.Response)
|
||||
def check(name: str, ck: Optional[str] = None, t: Optional[str] = None,
|
||||
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
|
||||
|
||||
@@ -31,6 +31,12 @@ class StorageChain(ChainBase):
|
||||
"""
|
||||
return self.run_module("generate_qrcode", storage=storage)
|
||||
|
||||
def generate_auth_url(self, storage: str) -> Optional[Tuple[dict, str]]:
|
||||
"""
|
||||
生成 OAuth2 授权 URL
|
||||
"""
|
||||
return self.run_module("generate_auth_url", storage=storage)
|
||||
|
||||
def check_login(self, storage: str, **kwargs) -> Optional[Tuple[dict, str]]:
|
||||
"""
|
||||
登录确认
|
||||
|
||||
@@ -209,6 +209,8 @@ class ConfigModel(BaseModel):
|
||||
# ==================== 云盘配置 ====================
|
||||
# 115 AppId
|
||||
U115_APP_ID: str = "100196807"
|
||||
# 115 OAuth2 Server 地址
|
||||
U115_AUTH_SERVER: str = "https://movie-pilot.org"
|
||||
# Alipan AppId
|
||||
ALIPAN_APP_ID: str = "ac1bf04dc9fd4d9aaabb65b4a668d403"
|
||||
|
||||
|
||||
@@ -197,6 +197,16 @@ class FileManagerModule(_ModuleBase):
|
||||
return None
|
||||
return storage_oper.generate_qrcode()
|
||||
|
||||
def generate_auth_url(self, storage: str) -> Optional[Tuple[dict, str]]:
|
||||
"""
|
||||
生成 OAuth2 授权 URL
|
||||
"""
|
||||
storage_oper = self.__get_storage_oper(storage, "generate_auth_url")
|
||||
if not storage_oper:
|
||||
logger.error(f"不支持 {storage} 的 OAuth2 授权")
|
||||
return {}, f"不支持 {storage} 的 OAuth2 授权"
|
||||
return storage_oper.generate_auth_url()
|
||||
|
||||
def check_login(self, storage: str, **kwargs) -> Optional[Dict[str, str]]:
|
||||
"""
|
||||
登录确认
|
||||
|
||||
@@ -57,6 +57,12 @@ class StorageBase(metaclass=ABCMeta):
|
||||
def generate_qrcode(self, *args, **kwargs) -> Optional[Tuple[dict, str]]:
|
||||
pass
|
||||
|
||||
def generate_auth_url(self, *args, **kwargs) -> Optional[Tuple[dict, str]]:
|
||||
"""
|
||||
生成 OAuth2 授权 URL
|
||||
"""
|
||||
return {}, "此存储不支持 OAuth2 授权"
|
||||
|
||||
def check_login(self, *args, **kwargs) -> Optional[Dict[str, str]]:
|
||||
pass
|
||||
|
||||
|
||||
@@ -105,6 +105,33 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
|
||||
self.session.headers.update({"Authorization": f"Bearer {access_token}"})
|
||||
return access_token
|
||||
|
||||
def generate_auth_url(self) -> Tuple[dict, str]:
|
||||
"""
|
||||
生成 OAuth2 授权 URL
|
||||
"""
|
||||
try:
|
||||
resp = self.session.get(f"{settings.U115_AUTH_SERVER}/u115/auth_url")
|
||||
if resp is None:
|
||||
return {}, "无法连接到授权服务器"
|
||||
|
||||
result = resp.json()
|
||||
if not result.get("success"):
|
||||
return {}, result.get("message", "获取授权URL失败")
|
||||
|
||||
data = result.get("data", {})
|
||||
auth_url = data.get("auth_url")
|
||||
state = data.get("state")
|
||||
|
||||
if not auth_url or not state:
|
||||
return {}, "授权服务器返回数据不完整"
|
||||
|
||||
self._auth_state = {"state": state}
|
||||
|
||||
return {"authUrl": auth_url, "state": state}, ""
|
||||
except Exception as e:
|
||||
logger.error(f"【115】获取授权 URL 失败: {str(e)}")
|
||||
return {}, f"获取授权 URL 失败: {str(e)}"
|
||||
|
||||
def generate_qrcode(self) -> Tuple[dict, str]:
|
||||
"""
|
||||
实现PKCE规范的设备授权二维码生成
|
||||
@@ -141,8 +168,11 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
|
||||
|
||||
def check_login(self) -> Optional[Tuple[dict, str]]:
|
||||
"""
|
||||
改进的带PKCE校验的登录状态检查
|
||||
检查授权状态
|
||||
"""
|
||||
if self._auth_state and self._auth_state.get("state"):
|
||||
return self.__check_oauth_login()
|
||||
|
||||
if not self._auth_state:
|
||||
return {}, "生成二维码失败"
|
||||
try:
|
||||
@@ -169,6 +199,46 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
|
||||
except Exception as e:
|
||||
return {}, str(e)
|
||||
|
||||
def __check_oauth_login(self) -> Tuple[dict, str]:
|
||||
"""
|
||||
检查 OAuth2 授权状态
|
||||
"""
|
||||
state = self._auth_state.get("state")
|
||||
if not state:
|
||||
return {}, "state为空"
|
||||
|
||||
try:
|
||||
resp = self.session.get(
|
||||
f"{settings.U115_AUTH_SERVER}/u115/token",
|
||||
params={"state": state}
|
||||
)
|
||||
if resp is None:
|
||||
return {}, "无法连接到授权服务器"
|
||||
|
||||
result = resp.json()
|
||||
status = result.get("status", "pending")
|
||||
|
||||
if status == "completed":
|
||||
data = result.get("data", {})
|
||||
if data:
|
||||
self.set_config({
|
||||
"refresh_time": int(time.time()),
|
||||
"access_token": data.get("access_token"),
|
||||
"refresh_token": data.get("refresh_token"),
|
||||
"expires_in": data.get("expires_in"),
|
||||
})
|
||||
self._auth_state = {}
|
||||
return {"status": 2, "tip": "授权成功"}, ""
|
||||
return {}, "授权服务器返回数据不完整"
|
||||
elif status == "expired":
|
||||
self._auth_state = {}
|
||||
return {"status": -1, "tip": result.get("message", "授权已过期")}, ""
|
||||
else:
|
||||
return {"status": 0, "tip": "等待用户授权"}, ""
|
||||
except Exception as e:
|
||||
logger.error(f"【115】检查授权状态失败: {str(e)}")
|
||||
return {}, f"检查授权状态失败: {str(e)}"
|
||||
|
||||
def __get_access_token(self) -> dict:
|
||||
"""
|
||||
确认登录后,获取相关token
|
||||
|
||||
Reference in New Issue
Block a user