Merge pull request #5362 from cddjr/feat_extended_api_token_support

This commit is contained in:
jxxghp
2026-01-15 13:33:56 +08:00
committed by GitHub

View File

@@ -17,6 +17,7 @@ from fastapi.security import OAuth2PasswordBearer, APIKeyHeader, APIKeyQuery, AP
from passlib.context import CryptContext
from app import schemas
from app.core.cache import cached
from app.core.config import settings
from app.log import logger
@@ -24,7 +25,8 @@ pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
ALGORITHM = "HS256"
# OAuth2PasswordBearer 用于 JWT Token 认证
oauth2_scheme = OAuth2PasswordBearer(
oauth2_scheme_manual_error = OAuth2PasswordBearer(
auto_error=False, # 禁用自动错误处理用以支持API令牌鉴权
tokenUrl=f"{settings.API_V1_STR}/login/access-token"
)
@@ -41,6 +43,58 @@ api_key_header = APIKeyHeader(name="X-API-KEY", auto_error=False, scheme_name="a
api_key_query = APIKeyQuery(name="apikey", auto_error=False, scheme_name="api_key_query")
def __get_api_token(
token_query: Annotated[str | None, Security(api_token_query)] = None
) -> str | None:
"""
从 URL 查询参数中获取 API Token
:param token_query: 从 URL 中的 `token` 查询参数获取 API Token
:return: 返回获取到的 API Token若无则返回 None
"""
return token_query
def __get_api_key(
key_query: Annotated[str | None, Security(api_key_query)] = None,
key_header: Annotated[str | None, Security(api_key_header)] = None
) -> str | None:
"""
从 URL 查询参数或请求头部获取 API Key优先使用请求头
:param key_query: URL 中的 `apikey` 查询参数
:param key_header: 请求头中的 `X-API-KEY` 参数
:return: 返回从 URL 或请求头中获取的 API Key若无则返回 None
"""
return key_header or key_query # 首选请求头
@cached(maxsize=1, ttl=600)
def __create_superuser_token_payload() -> schemas.TokenPayload:
"""
创建管理员用户的TokenPayload
:return: 管理员TokenPayload
"""
# 延迟导入
# pylint: disable=import-outside-toplevel
# pylint: disable=no-name-in-module
from app.db.user_oper import UserOper
from app.helper.sites import SitesHelper # noqa
user = UserOper().get_by_name(settings.SUPERUSER)
if not user or not user.is_superuser:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户权限不足",
)
return schemas.TokenPayload(
sub=user.id,
username=user.name,
super_user=user.is_superuser,
level=SitesHelper().auth_level,
purpose="authentication",
)
def create_access_token(
userid: Union[str, Any],
username: str,
@@ -176,23 +230,43 @@ def __verify_token(token: str, purpose: Optional[str] = "authentication") -> sch
def verify_token(
request: Request,
response: Response,
token: Annotated[str, Security(oauth2_scheme)]
jwt_token: Annotated[str | None, Security(oauth2_scheme_manual_error)],
api_key: Annotated[str | None, Security(__get_api_key)],
api_token: Annotated[str | None, Security(__get_api_token)],
) -> schemas.TokenPayload:
"""
验证 JWT 令牌并自动处理 resource_token 写入
如果缺少JWT令牌再尝试用API令牌鉴权
:param request: 请求对象,用于访问 Cookie 和请求信息
:param response: 响应对象,用于设置 Cookie
:param token: 从 Authorization 头部获取的 JWT 令牌
:param jwt_token: 从 Authorization 头部获取的 JWT 令牌
:param api_key: 从 查询参数`apikey` 或 请求头`X-API-KEY` 获取 API Token
:param api_token: 从 查询参数`token` 获取 API Token
:return: 解析后的 TokenPayload
:raises HTTPException: 如果令牌无效或用途不匹配
"""
# 验证并解析 JWT 认证令牌
payload = __verify_token(token=token, purpose="authentication")
if jwt_token:
# 验证并解析 JWT 认证令牌
payload = __verify_token(token=jwt_token, purpose="authentication")
# 如果没有 resource_token生成并写入到 Cookie
__set_or_refresh_resource_token_cookie(request, response, payload)
# 如果没有 resource_token生成并写入到 Cookie
__set_or_refresh_resource_token_cookie(request, response, payload)
return payload
return payload
elif api_key:
verify_apikey(api_key)
return __create_superuser_token_payload()
elif api_token:
verify_apitoken(api_token)
return __create_superuser_token_payload()
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
def verify_resource_token(
@@ -208,31 +282,7 @@ def verify_resource_token(
return __verify_token(token=resource_token, purpose="resource")
def __get_api_token(
token_query: Annotated[str | None, Security(api_token_query)] = None
) -> str:
"""
从 URL 查询参数中获取 API Token
:param token_query: 从 URL 中的 `token` 查询参数获取 API Token
:return: 返回获取到的 API Token若无则返回 None
"""
return token_query
def __get_api_key(
key_query: Annotated[str | None, Security(api_key_query)] = None,
key_header: Annotated[str | None, Security(api_key_header)] = None
) -> str:
"""
从 URL 查询参数或请求头部获取 API Key优先使用 URL 参数
:param key_query: URL 中的 `apikey` 查询参数
:param key_header: 请求头中的 `X-API-KEY` 参数
:return: 返回从 URL 或请求头中获取的 API Key若无则返回 None
"""
return key_query or key_header
def __verify_key(key: str, expected_key: str, key_type: str) -> str:
def __verify_key(key: str | None, expected_key: str, key_type: str) -> str:
"""
通用的 API Key 或 Token 验证函数
:param key: 从请求中获取的 API Key 或 Token
@@ -241,7 +291,7 @@ def __verify_key(key: str, expected_key: str, key_type: str) -> str:
:return: 返回校验通过的 API Key 或 Token
:raises HTTPException: 如果校验不通过,抛出 401 错误
"""
if key != expected_key:
if not key or key != expected_key:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"{key_type} 校验不通过"
@@ -249,7 +299,7 @@ def __verify_key(key: str, expected_key: str, key_type: str) -> str:
return key
def verify_apitoken(token: Annotated[str, Security(__get_api_token)]) -> str:
def verify_apitoken(token: Annotated[str | None, Security(__get_api_token)]) -> str:
"""
使用 API Token 进行身份认证
:param token: API Token从 URL 查询参数中获取 token=xxx
@@ -258,7 +308,7 @@ def verify_apitoken(token: Annotated[str, Security(__get_api_token)]) -> str:
return __verify_key(token, settings.API_TOKEN, "token")
def verify_apikey(apikey: Annotated[str, Security(__get_api_key)]) -> str:
def verify_apikey(apikey: Annotated[str | None, Security(__get_api_key)]) -> str:
"""
使用 API Key 进行身份认证
:param apikey: API Key从 URL 查询参数中获取 apikey=xxx或请求头中获取 X-API-KEY=xxx