fix(security): rebind resource cookie on user changes (#6049)

This commit is contained in:
InfinityPacer
2026-07-05 09:24:27 +08:00
committed by GitHub
parent d14d401c86
commit cab2ac400a
2 changed files with 118 additions and 3 deletions

View File

@@ -169,6 +169,15 @@ def set_or_refresh_resource_token_cookie(
# 根据剩余时长提前刷新令牌
if remaining_time < timedelta(seconds=(settings.RESOURCE_ACCESS_TOKEN_EXPIRE_SECONDS / 3)):
raise jwt.ExpiredSignatureError
expected_claims = {
"sub": str(payload.sub),
"username": payload.username,
"super_user": payload.super_user,
"level": payload.level,
"purpose": "resource",
}
if any(decoded_token.get(claim) != value for claim, value in expected_claims.items()):
raise jwt.InvalidTokenError("资源令牌身份或权限上下文不匹配")
except jwt.PyJWTError:
logger.debug(f"Token error occurred. refreshing token")
except Exception as e:

View File

@@ -1,9 +1,12 @@
import datetime
from unittest import TestCase
import jwt
from fastapi import Response
from app import schemas
from app.core.security import set_or_refresh_resource_token_cookie
from app.core.config import settings
from app.core.security import ALGORITHM, create_access_token, set_or_refresh_resource_token_cookie
class FakeURL:
@@ -16,13 +19,62 @@ class FakeRequest:
最小化的请求桩对象,仅提供 set_or_refresh_resource_token_cookie 所需属性。
"""
def __init__(self, scheme: str, headers: dict | None = None) -> None:
def __init__(self, scheme: str, headers: dict | None = None, cookies: dict | None = None) -> None:
self.url = FakeURL(scheme)
self.headers = headers or {}
self.cookies: dict = {}
self.cookies: dict = cookies or {}
class ResourceTokenCookieSecureFlagTest(TestCase):
def _build_request_with_resource_cookie(
self,
*,
userid: int = 1,
username: str = "test",
super_user: bool = False,
level: int = 1,
purpose: str = "resource"
) -> FakeRequest:
resource_token = create_access_token(
userid=userid,
username=username,
super_user=super_user,
level=level,
purpose=purpose,
)
return FakeRequest(
scheme="https",
cookies={settings.PROJECT_NAME: resource_token},
)
def _build_request_with_resource_secret_cookie(
self,
*,
userid: int = 1,
username: str = "test",
super_user: bool = False,
level: int = 1,
purpose: str = "authentication"
) -> FakeRequest:
now = datetime.datetime.now(datetime.UTC)
resource_token = jwt.encode(
{
"exp": now + datetime.timedelta(seconds=settings.RESOURCE_ACCESS_TOKEN_EXPIRE_SECONDS),
"iat": now,
"sub": str(userid),
"username": username,
"super_user": super_user,
"level": level,
"purpose": purpose,
},
settings.RESOURCE_SECRET_KEY,
algorithm=ALGORITHM,
)
return FakeRequest(
scheme="https",
cookies={settings.PROJECT_NAME: resource_token},
)
def test_secure_flag_set_when_https_terminated_at_reverse_proxy(self):
"""
当反向代理(如 nginx终止 HTTPS 并以 HTTP 转发给后端时,
@@ -36,3 +88,57 @@ class ResourceTokenCookieSecureFlagTest(TestCase):
set_cookie_header = response.headers.get("set-cookie", "")
self.assertIn("Secure", set_cookie_header)
def test_existing_matching_resource_cookie_is_reused(self):
request = self._build_request_with_resource_cookie()
response = Response()
payload = schemas.TokenPayload(sub=1, username="test", super_user=False, level=1)
set_or_refresh_resource_token_cookie(request, response, payload)
self.assertIsNone(response.headers.get("set-cookie"))
def test_existing_resource_cookie_with_different_sub_is_replaced(self):
request = self._build_request_with_resource_cookie(userid=2)
response = Response()
payload = schemas.TokenPayload(sub=1, username="test", super_user=False, level=1)
set_or_refresh_resource_token_cookie(request, response, payload)
self.assertIn(f"{settings.PROJECT_NAME}=", response.headers.get("set-cookie", ""))
def test_existing_resource_cookie_with_different_username_is_replaced(self):
request = self._build_request_with_resource_cookie(username="other")
response = Response()
payload = schemas.TokenPayload(sub=1, username="test", super_user=False, level=1)
set_or_refresh_resource_token_cookie(request, response, payload)
self.assertIn(f"{settings.PROJECT_NAME}=", response.headers.get("set-cookie", ""))
def test_existing_resource_cookie_with_different_super_user_is_replaced(self):
request = self._build_request_with_resource_cookie(super_user=True)
response = Response()
payload = schemas.TokenPayload(sub=1, username="test", super_user=False, level=1)
set_or_refresh_resource_token_cookie(request, response, payload)
self.assertIn(f"{settings.PROJECT_NAME}=", response.headers.get("set-cookie", ""))
def test_existing_resource_cookie_with_different_level_is_replaced(self):
request = self._build_request_with_resource_cookie(level=2)
response = Response()
payload = schemas.TokenPayload(sub=1, username="test", super_user=False, level=1)
set_or_refresh_resource_token_cookie(request, response, payload)
self.assertIn(f"{settings.PROJECT_NAME}=", response.headers.get("set-cookie", ""))
def test_existing_resource_signed_cookie_with_wrong_purpose_is_replaced(self):
request = self._build_request_with_resource_secret_cookie(purpose="authentication")
response = Response()
payload = schemas.TokenPayload(sub=1, username="test", super_user=False, level=1)
set_or_refresh_resource_token_cookie(request, response, payload)
self.assertIn(f"{settings.PROJECT_NAME}=", response.headers.get("set-cookie", ""))