mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-03-20 03:57:30 +08:00
让现有基于JWT令牌鉴权的接口也能支持API令牌鉴权
This commit is contained in:
@@ -24,7 +24,8 @@ pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
||||
ALGORITHM = "HS256"
|
||||
|
||||
# OAuth2PasswordBearer 用于 JWT Token 认证
|
||||
oauth2_scheme = OAuth2PasswordBearer(
|
||||
oauth2_scheme_manual_error = OAuth2PasswordBearer(
|
||||
auto_error=False, # 禁用自动错误处理,用以支持API令牌鉴权
|
||||
tokenUrl=f"{settings.API_V1_STR}/login/access-token"
|
||||
)
|
||||
|
||||
@@ -41,6 +42,57 @@ api_key_header = APIKeyHeader(name="X-API-KEY", auto_error=False, scheme_name="a
|
||||
api_key_query = APIKeyQuery(name="apikey", auto_error=False, scheme_name="api_key_query")
|
||||
|
||||
|
||||
def __get_api_token(
|
||||
token_query: Annotated[str | None, Security(api_token_query)] = None
|
||||
) -> str | None:
|
||||
"""
|
||||
从 URL 查询参数中获取 API Token
|
||||
:param token_query: 从 URL 中的 `token` 查询参数获取 API Token
|
||||
:return: 返回获取到的 API Token,若无则返回 None
|
||||
"""
|
||||
return token_query
|
||||
|
||||
|
||||
def __get_api_key(
|
||||
key_query: Annotated[str | None, Security(api_key_query)] = None,
|
||||
key_header: Annotated[str | None, Security(api_key_header)] = None
|
||||
) -> str | None:
|
||||
"""
|
||||
从 URL 查询参数或请求头部获取 API Key,优先使用请求头
|
||||
:param key_query: URL 中的 `apikey` 查询参数
|
||||
:param key_header: 请求头中的 `X-API-KEY` 参数
|
||||
:return: 返回从 URL 或请求头中获取的 API Key,若无则返回 None
|
||||
"""
|
||||
return key_header or key_query # 首选请求头
|
||||
|
||||
|
||||
def __create_superuser_token_payload() -> schemas.TokenPayload:
|
||||
"""
|
||||
创建管理员用户的TokenPayload
|
||||
|
||||
:return: 管理员TokenPayload
|
||||
"""
|
||||
# 延迟导入
|
||||
# pylint: disable=import-outside-toplevel
|
||||
# pylint: disable=no-name-in-module
|
||||
from app.db.user_oper import UserOper
|
||||
from app.helper.sites import SitesHelper # noqa
|
||||
|
||||
user = UserOper().get_by_name(settings.SUPERUSER)
|
||||
if not user or not user.is_superuser:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="用户权限不足",
|
||||
)
|
||||
return schemas.TokenPayload(
|
||||
sub=user.id,
|
||||
username=user.name,
|
||||
super_user=user.is_superuser,
|
||||
level=SitesHelper().auth_level,
|
||||
purpose="authentication",
|
||||
)
|
||||
|
||||
|
||||
def create_access_token(
|
||||
userid: Union[str, Any],
|
||||
username: str,
|
||||
@@ -176,23 +228,43 @@ def __verify_token(token: str, purpose: Optional[str] = "authentication") -> sch
|
||||
def verify_token(
|
||||
request: Request,
|
||||
response: Response,
|
||||
token: Annotated[str, Security(oauth2_scheme)]
|
||||
jwt_token: Annotated[str | None, Security(oauth2_scheme_manual_error)],
|
||||
api_key: Annotated[str | None, Security(__get_api_key)],
|
||||
api_token: Annotated[str | None, Security(__get_api_token)],
|
||||
) -> schemas.TokenPayload:
|
||||
"""
|
||||
验证 JWT 令牌并自动处理 resource_token 写入
|
||||
|
||||
如果缺少JWT令牌再尝试用API令牌鉴权
|
||||
|
||||
:param request: 请求对象,用于访问 Cookie 和请求信息
|
||||
:param response: 响应对象,用于设置 Cookie
|
||||
:param token: 从 Authorization 头部获取的 JWT 令牌
|
||||
:param jwt_token: 从 Authorization 头部获取的 JWT 令牌
|
||||
:param api_key: 从 查询参数`apikey` 或 请求头`X-API-KEY` 获取 API Token
|
||||
:param api_token: 从 查询参数`token` 获取 API Token
|
||||
:return: 解析后的 TokenPayload
|
||||
:raises HTTPException: 如果令牌无效或用途不匹配
|
||||
"""
|
||||
# 验证并解析 JWT 认证令牌
|
||||
payload = __verify_token(token=token, purpose="authentication")
|
||||
if jwt_token:
|
||||
# 验证并解析 JWT 认证令牌
|
||||
payload = __verify_token(token=jwt_token, purpose="authentication")
|
||||
|
||||
# 如果没有 resource_token,生成并写入到 Cookie
|
||||
__set_or_refresh_resource_token_cookie(request, response, payload)
|
||||
# 如果没有 resource_token,生成并写入到 Cookie
|
||||
__set_or_refresh_resource_token_cookie(request, response, payload)
|
||||
|
||||
return payload
|
||||
return payload
|
||||
elif api_key:
|
||||
verify_apikey(api_key)
|
||||
return __create_superuser_token_payload()
|
||||
elif api_token:
|
||||
verify_apitoken(api_token)
|
||||
return __create_superuser_token_payload()
|
||||
else:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Not authenticated",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
|
||||
def verify_resource_token(
|
||||
@@ -208,31 +280,7 @@ def verify_resource_token(
|
||||
return __verify_token(token=resource_token, purpose="resource")
|
||||
|
||||
|
||||
def __get_api_token(
|
||||
token_query: Annotated[str | None, Security(api_token_query)] = None
|
||||
) -> str:
|
||||
"""
|
||||
从 URL 查询参数中获取 API Token
|
||||
:param token_query: 从 URL 中的 `token` 查询参数获取 API Token
|
||||
:return: 返回获取到的 API Token,若无则返回 None
|
||||
"""
|
||||
return token_query
|
||||
|
||||
|
||||
def __get_api_key(
|
||||
key_query: Annotated[str | None, Security(api_key_query)] = None,
|
||||
key_header: Annotated[str | None, Security(api_key_header)] = None
|
||||
) -> str:
|
||||
"""
|
||||
从 URL 查询参数或请求头部获取 API Key,优先使用 URL 参数
|
||||
:param key_query: URL 中的 `apikey` 查询参数
|
||||
:param key_header: 请求头中的 `X-API-KEY` 参数
|
||||
:return: 返回从 URL 或请求头中获取的 API Key,若无则返回 None
|
||||
"""
|
||||
return key_query or key_header
|
||||
|
||||
|
||||
def __verify_key(key: str, expected_key: str, key_type: str) -> str:
|
||||
def __verify_key(key: str | None, expected_key: str, key_type: str) -> str:
|
||||
"""
|
||||
通用的 API Key 或 Token 验证函数
|
||||
:param key: 从请求中获取的 API Key 或 Token
|
||||
@@ -241,7 +289,7 @@ def __verify_key(key: str, expected_key: str, key_type: str) -> str:
|
||||
:return: 返回校验通过的 API Key 或 Token
|
||||
:raises HTTPException: 如果校验不通过,抛出 401 错误
|
||||
"""
|
||||
if key != expected_key:
|
||||
if not key or key != expected_key:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail=f"{key_type} 校验不通过"
|
||||
@@ -249,7 +297,7 @@ def __verify_key(key: str, expected_key: str, key_type: str) -> str:
|
||||
return key
|
||||
|
||||
|
||||
def verify_apitoken(token: Annotated[str, Security(__get_api_token)]) -> str:
|
||||
def verify_apitoken(token: Annotated[str | None, Security(__get_api_token)]) -> str:
|
||||
"""
|
||||
使用 API Token 进行身份认证
|
||||
:param token: API Token,从 URL 查询参数中获取 token=xxx
|
||||
@@ -258,7 +306,7 @@ def verify_apitoken(token: Annotated[str, Security(__get_api_token)]) -> str:
|
||||
return __verify_key(token, settings.API_TOKEN, "token")
|
||||
|
||||
|
||||
def verify_apikey(apikey: Annotated[str, Security(__get_api_key)]) -> str:
|
||||
def verify_apikey(apikey: Annotated[str | None, Security(__get_api_key)]) -> str:
|
||||
"""
|
||||
使用 API Key 进行身份认证
|
||||
:param apikey: API Key,从 URL 查询参数中获取 apikey=xxx,或请求头中获取 X-API-KEY=xxx
|
||||
|
||||
Reference in New Issue
Block a user