diff --git a/app/core/security.py b/app/core/security.py index 5d67555e..b9596c43 100644 --- a/app/core/security.py +++ b/app/core/security.py @@ -17,6 +17,7 @@ from fastapi.security import OAuth2PasswordBearer, APIKeyHeader, APIKeyQuery, AP from passlib.context import CryptContext from app import schemas +from app.core.cache import cached from app.core.config import settings from app.log import logger @@ -24,7 +25,8 @@ pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") ALGORITHM = "HS256" # OAuth2PasswordBearer 用于 JWT Token 认证 -oauth2_scheme = OAuth2PasswordBearer( +oauth2_scheme_manual_error = OAuth2PasswordBearer( + auto_error=False, # 禁用自动错误处理,用以支持API令牌鉴权 tokenUrl=f"{settings.API_V1_STR}/login/access-token" ) @@ -41,6 +43,58 @@ api_key_header = APIKeyHeader(name="X-API-KEY", auto_error=False, scheme_name="a api_key_query = APIKeyQuery(name="apikey", auto_error=False, scheme_name="api_key_query") +def __get_api_token( + token_query: Annotated[str | None, Security(api_token_query)] = None +) -> str | None: + """ + 从 URL 查询参数中获取 API Token + :param token_query: 从 URL 中的 `token` 查询参数获取 API Token + :return: 返回获取到的 API Token,若无则返回 None + """ + return token_query + + +def __get_api_key( + key_query: Annotated[str | None, Security(api_key_query)] = None, + key_header: Annotated[str | None, Security(api_key_header)] = None +) -> str | None: + """ + 从 URL 查询参数或请求头部获取 API Key,优先使用请求头 + :param key_query: URL 中的 `apikey` 查询参数 + :param key_header: 请求头中的 `X-API-KEY` 参数 + :return: 返回从 URL 或请求头中获取的 API Key,若无则返回 None + """ + return key_header or key_query # 首选请求头 + + +@cached(maxsize=1, ttl=600) +def __create_superuser_token_payload() -> schemas.TokenPayload: + """ + 创建管理员用户的TokenPayload + + :return: 管理员TokenPayload + """ + # 延迟导入 + # pylint: disable=import-outside-toplevel + # pylint: disable=no-name-in-module + from app.db.user_oper import UserOper + from app.helper.sites import SitesHelper # noqa + + user = UserOper().get_by_name(settings.SUPERUSER) + if not user or not user.is_superuser: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="用户权限不足", + ) + return schemas.TokenPayload( + sub=user.id, + username=user.name, + super_user=user.is_superuser, + level=SitesHelper().auth_level, + purpose="authentication", + ) + + def create_access_token( userid: Union[str, Any], username: str, @@ -176,23 +230,43 @@ def __verify_token(token: str, purpose: Optional[str] = "authentication") -> sch def verify_token( request: Request, response: Response, - token: Annotated[str, Security(oauth2_scheme)] + jwt_token: Annotated[str | None, Security(oauth2_scheme_manual_error)], + api_key: Annotated[str | None, Security(__get_api_key)], + api_token: Annotated[str | None, Security(__get_api_token)], ) -> schemas.TokenPayload: """ 验证 JWT 令牌并自动处理 resource_token 写入 + + 如果缺少JWT令牌再尝试用API令牌鉴权 + :param request: 请求对象,用于访问 Cookie 和请求信息 :param response: 响应对象,用于设置 Cookie - :param token: 从 Authorization 头部获取的 JWT 令牌 + :param jwt_token: 从 Authorization 头部获取的 JWT 令牌 + :param api_key: 从 查询参数`apikey` 或 请求头`X-API-KEY` 获取 API Token + :param api_token: 从 查询参数`token` 获取 API Token :return: 解析后的 TokenPayload :raises HTTPException: 如果令牌无效或用途不匹配 """ - # 验证并解析 JWT 认证令牌 - payload = __verify_token(token=token, purpose="authentication") + if jwt_token: + # 验证并解析 JWT 认证令牌 + payload = __verify_token(token=jwt_token, purpose="authentication") - # 如果没有 resource_token,生成并写入到 Cookie - __set_or_refresh_resource_token_cookie(request, response, payload) + # 如果没有 resource_token,生成并写入到 Cookie + __set_or_refresh_resource_token_cookie(request, response, payload) - return payload + return payload + elif api_key: + verify_apikey(api_key) + return __create_superuser_token_payload() + elif api_token: + verify_apitoken(api_token) + return __create_superuser_token_payload() + else: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Not authenticated", + headers={"WWW-Authenticate": "Bearer"}, + ) def verify_resource_token( @@ -208,31 +282,7 @@ def verify_resource_token( return __verify_token(token=resource_token, purpose="resource") -def __get_api_token( - token_query: Annotated[str | None, Security(api_token_query)] = None -) -> str: - """ - 从 URL 查询参数中获取 API Token - :param token_query: 从 URL 中的 `token` 查询参数获取 API Token - :return: 返回获取到的 API Token,若无则返回 None - """ - return token_query - - -def __get_api_key( - key_query: Annotated[str | None, Security(api_key_query)] = None, - key_header: Annotated[str | None, Security(api_key_header)] = None -) -> str: - """ - 从 URL 查询参数或请求头部获取 API Key,优先使用 URL 参数 - :param key_query: URL 中的 `apikey` 查询参数 - :param key_header: 请求头中的 `X-API-KEY` 参数 - :return: 返回从 URL 或请求头中获取的 API Key,若无则返回 None - """ - return key_query or key_header - - -def __verify_key(key: str, expected_key: str, key_type: str) -> str: +def __verify_key(key: str | None, expected_key: str, key_type: str) -> str: """ 通用的 API Key 或 Token 验证函数 :param key: 从请求中获取的 API Key 或 Token @@ -241,7 +291,7 @@ def __verify_key(key: str, expected_key: str, key_type: str) -> str: :return: 返回校验通过的 API Key 或 Token :raises HTTPException: 如果校验不通过,抛出 401 错误 """ - if key != expected_key: + if not key or key != expected_key: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=f"{key_type} 校验不通过" @@ -249,7 +299,7 @@ def __verify_key(key: str, expected_key: str, key_type: str) -> str: return key -def verify_apitoken(token: Annotated[str, Security(__get_api_token)]) -> str: +def verify_apitoken(token: Annotated[str | None, Security(__get_api_token)]) -> str: """ 使用 API Token 进行身份认证 :param token: API Token,从 URL 查询参数中获取 token=xxx @@ -258,7 +308,7 @@ def verify_apitoken(token: Annotated[str, Security(__get_api_token)]) -> str: return __verify_key(token, settings.API_TOKEN, "token") -def verify_apikey(apikey: Annotated[str, Security(__get_api_key)]) -> str: +def verify_apikey(apikey: Annotated[str | None, Security(__get_api_key)]) -> str: """ 使用 API Key 进行身份认证 :param apikey: API Key,从 URL 查询参数中获取 apikey=xxx,或请求头中获取 X-API-KEY=xxx