mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-03-19 19:46:55 +08:00
refactor: 优化登录安全性并重构 PassKey 逻辑
- 统一登录失败返回信息,防止信息泄露 - 提取 PassKeyHelper 公共函数,简化 Base64 和凭证处理 - 重构 mfa.py 端点代码,提升可读性和维护性 - 移除冗余的 origin 验证逻辑
This commit is contained in:
@@ -90,6 +90,79 @@ class PassKeyHelper:
|
||||
logger.error(f"标准化凭证ID失败: {e}")
|
||||
return credential_id
|
||||
|
||||
@staticmethod
|
||||
def _base64_encode_urlsafe(data: bytes) -> str:
|
||||
"""
|
||||
Base64 URL Safe 编码(不带填充)
|
||||
|
||||
:param data: 要编码的字节数据
|
||||
:return: Base64 URL Safe 编码的字符串
|
||||
"""
|
||||
return base64.urlsafe_b64encode(data).decode('utf-8').rstrip('=')
|
||||
|
||||
@staticmethod
|
||||
def _base64_decode_urlsafe(data: str) -> bytes:
|
||||
"""
|
||||
Base64 URL Safe 解码(自动添加填充)
|
||||
|
||||
:param data: Base64 URL Safe 编码的字符串
|
||||
:return: 解码后的字节数据
|
||||
"""
|
||||
return base64.urlsafe_b64decode(data + '==')
|
||||
|
||||
@staticmethod
|
||||
def _parse_credential_list(credentials: List[Dict[str, Any]]) -> List[PublicKeyCredentialDescriptor]:
|
||||
"""
|
||||
解析凭证列表为 PublicKeyCredentialDescriptor 列表
|
||||
|
||||
:param credentials: 凭证字典列表
|
||||
:return: PublicKeyCredentialDescriptor 列表
|
||||
"""
|
||||
result = []
|
||||
for cred in credentials:
|
||||
try:
|
||||
result.append(
|
||||
PublicKeyCredentialDescriptor(
|
||||
id=PassKeyHelper._base64_decode_urlsafe(cred['credential_id']),
|
||||
transports=[
|
||||
AuthenticatorTransport(t) for t in cred.get('transports', '').split(',') if t
|
||||
] if cred.get('transports') else None
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"解析凭证失败: {e}")
|
||||
continue
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def _get_user_verification_requirement(user_verification: Optional[str] = None) -> UserVerificationRequirement:
|
||||
"""
|
||||
获取用户验证要求
|
||||
|
||||
:param user_verification: 指定的用户验证要求,如果不指定则从配置中读取
|
||||
:return: UserVerificationRequirement
|
||||
"""
|
||||
if user_verification:
|
||||
return UserVerificationRequirement(user_verification)
|
||||
return UserVerificationRequirement.REQUIRED if settings.PASSKEY_REQUIRE_UV \
|
||||
else UserVerificationRequirement.PREFERRED
|
||||
|
||||
@staticmethod
|
||||
def _get_verification_params(
|
||||
expected_origin: Optional[str] = None,
|
||||
expected_rp_id: Optional[str] = None
|
||||
) -> Tuple[str, str]:
|
||||
"""
|
||||
获取验证参数(origin 和 rp_id)
|
||||
|
||||
:param expected_origin: 期望的源地址
|
||||
:param expected_rp_id: 期望的RP ID
|
||||
:return: (origin, rp_id)
|
||||
"""
|
||||
origin = expected_origin or PassKeyHelper.get_origin()
|
||||
rp_id = expected_rp_id or PassKeyHelper.get_rp_id()
|
||||
return origin, rp_id
|
||||
|
||||
@staticmethod
|
||||
def generate_registration_options(
|
||||
user_id: int,
|
||||
@@ -109,27 +182,13 @@ class PassKeyHelper:
|
||||
try:
|
||||
# 用户信息
|
||||
user_id_bytes = str(user_id).encode('utf-8')
|
||||
|
||||
|
||||
# 排除已有的凭证
|
||||
exclude_credentials = []
|
||||
if existing_credentials:
|
||||
for cred in existing_credentials:
|
||||
try:
|
||||
exclude_credentials.append(
|
||||
PublicKeyCredentialDescriptor(
|
||||
id=base64.urlsafe_b64decode(cred['credential_id'] + '=='),
|
||||
transports=[
|
||||
AuthenticatorTransport(t) for t in cred.get('transports', '').split(',') if t
|
||||
] if cred.get('transports') else None
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"解析凭证失败: {e}")
|
||||
continue
|
||||
exclude_credentials = PassKeyHelper._parse_credential_list(existing_credentials) \
|
||||
if existing_credentials else None
|
||||
|
||||
# 用户验证要求
|
||||
uv_requirement = UserVerificationRequirement.REQUIRED if settings.PASSKEY_REQUIRE_UV \
|
||||
else UserVerificationRequirement.PREFERRED
|
||||
uv_requirement = PassKeyHelper._get_user_verification_requirement()
|
||||
|
||||
# 生成注册选项
|
||||
options = generate_registration_options(
|
||||
@@ -138,7 +197,7 @@ class PassKeyHelper:
|
||||
user_id=user_id_bytes,
|
||||
user_name=username,
|
||||
user_display_name=display_name or username,
|
||||
exclude_credentials=exclude_credentials if exclude_credentials else None,
|
||||
exclude_credentials=exclude_credentials,
|
||||
authenticator_selection=AuthenticatorSelectionCriteria(
|
||||
authenticator_attachment=None,
|
||||
resident_key=ResidentKeyRequirement.REQUIRED,
|
||||
@@ -152,9 +211,9 @@ class PassKeyHelper:
|
||||
|
||||
# 转换为JSON
|
||||
options_json = options_to_json(options)
|
||||
|
||||
|
||||
# 提取challenge(用于后续验证)
|
||||
challenge = base64.urlsafe_b64encode(options.challenge).decode('utf-8').rstrip('=')
|
||||
challenge = PassKeyHelper._base64_encode_urlsafe(options.challenge)
|
||||
|
||||
return options_json, challenge
|
||||
|
||||
@@ -162,29 +221,6 @@ class PassKeyHelper:
|
||||
logger.error(f"生成注册选项失败: {e}")
|
||||
raise
|
||||
|
||||
@staticmethod
|
||||
def _get_verified_origin(credential: Dict[str, Any], rp_id: str, default_origin: str) -> str:
|
||||
"""
|
||||
在 localhost 环境下获取并验证实际 Origin,否则返回默认值
|
||||
"""
|
||||
if not settings.APP_DOMAIN and rp_id == 'localhost':
|
||||
try:
|
||||
# 解析 clientDataJSON 获取实际的 origin
|
||||
client_data_json = json.loads(
|
||||
base64.urlsafe_b64decode(
|
||||
credential['response']['clientDataJSON'].replace('-', '+').replace('_', '/') + '=='
|
||||
).decode('utf-8')
|
||||
)
|
||||
actual_origin = client_data_json.get('origin', '')
|
||||
hostname = urlparse(actual_origin).hostname
|
||||
|
||||
if hostname in ['localhost', '127.0.0.1']:
|
||||
logger.info(f"本地环境,使用动态 origin: {actual_origin}")
|
||||
return actual_origin
|
||||
except Exception as e:
|
||||
logger.warning(f"无法提取动态 origin: {e}")
|
||||
return default_origin
|
||||
|
||||
@staticmethod
|
||||
def verify_registration_response(
|
||||
credential: Dict[str, Any],
|
||||
@@ -203,18 +239,13 @@ class PassKeyHelper:
|
||||
"""
|
||||
try:
|
||||
# 准备验证参数
|
||||
origin = expected_origin or PassKeyHelper.get_origin()
|
||||
rp_id = expected_rp_id or PassKeyHelper.get_rp_id()
|
||||
|
||||
origin, rp_id = PassKeyHelper._get_verification_params(expected_origin, expected_rp_id)
|
||||
# 解码challenge
|
||||
challenge_bytes = base64.urlsafe_b64decode(expected_challenge + '==')
|
||||
challenge_bytes = PassKeyHelper._base64_decode_urlsafe(expected_challenge)
|
||||
|
||||
# 构建RegistrationCredential对象
|
||||
registration_credential = parse_registration_credential_json(json.dumps(credential))
|
||||
|
||||
# 获取并验证 Origin
|
||||
origin = PassKeyHelper._get_verified_origin(credential, rp_id, origin)
|
||||
|
||||
# 验证注册响应
|
||||
verification = verify_registration_response(
|
||||
credential=registration_credential,
|
||||
@@ -225,8 +256,8 @@ class PassKeyHelper:
|
||||
)
|
||||
|
||||
# 提取信息
|
||||
credential_id = base64.urlsafe_b64encode(verification.credential_id).decode('utf-8').rstrip('=')
|
||||
public_key = base64.urlsafe_b64encode(verification.credential_public_key).decode('utf-8').rstrip('=')
|
||||
credential_id = PassKeyHelper._base64_encode_urlsafe(verification.credential_id)
|
||||
public_key = PassKeyHelper._base64_encode_urlsafe(verification.credential_public_key)
|
||||
sign_count = verification.sign_count
|
||||
# aaguid 可能已经是字符串格式,也可能是bytes
|
||||
if verification.aaguid:
|
||||
@@ -257,41 +288,24 @@ class PassKeyHelper:
|
||||
"""
|
||||
try:
|
||||
# 允许的凭证
|
||||
allow_credentials = []
|
||||
if existing_credentials:
|
||||
for cred in existing_credentials:
|
||||
try:
|
||||
allow_credentials.append(
|
||||
PublicKeyCredentialDescriptor(
|
||||
id=base64.urlsafe_b64decode(cred['credential_id'] + '=='),
|
||||
transports=[
|
||||
AuthenticatorTransport(t) for t in cred.get('transports', '').split(',') if t
|
||||
] if cred.get('transports') else None
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"解析凭证失败: {e}")
|
||||
continue
|
||||
allow_credentials = PassKeyHelper._parse_credential_list(existing_credentials) \
|
||||
if existing_credentials else None
|
||||
|
||||
# 用户验证要求
|
||||
if not user_verification:
|
||||
uv_requirement = UserVerificationRequirement.REQUIRED if settings.PASSKEY_REQUIRE_UV \
|
||||
else UserVerificationRequirement.PREFERRED
|
||||
else:
|
||||
uv_requirement = UserVerificationRequirement(user_verification)
|
||||
uv_requirement = PassKeyHelper._get_user_verification_requirement(user_verification)
|
||||
|
||||
# 生成认证选项
|
||||
options = generate_authentication_options(
|
||||
rp_id=PassKeyHelper.get_rp_id(),
|
||||
allow_credentials=allow_credentials if allow_credentials else None,
|
||||
allow_credentials=allow_credentials,
|
||||
user_verification=uv_requirement
|
||||
)
|
||||
|
||||
# 转换为JSON
|
||||
options_json = options_to_json(options)
|
||||
|
||||
|
||||
# 提取challenge
|
||||
challenge = base64.urlsafe_b64encode(options.challenge).decode('utf-8').rstrip('=')
|
||||
challenge = PassKeyHelper._base64_encode_urlsafe(options.challenge)
|
||||
|
||||
return options_json, challenge
|
||||
|
||||
@@ -321,19 +335,14 @@ class PassKeyHelper:
|
||||
"""
|
||||
try:
|
||||
# 准备验证参数
|
||||
origin = expected_origin or PassKeyHelper.get_origin()
|
||||
rp_id = expected_rp_id or PassKeyHelper.get_rp_id()
|
||||
|
||||
origin, rp_id = PassKeyHelper._get_verification_params(expected_origin, expected_rp_id)
|
||||
# 解码
|
||||
challenge_bytes = base64.urlsafe_b64decode(expected_challenge + '==')
|
||||
public_key_bytes = base64.urlsafe_b64decode(credential_public_key + '==')
|
||||
challenge_bytes = PassKeyHelper._base64_decode_urlsafe(expected_challenge)
|
||||
public_key_bytes = PassKeyHelper._base64_decode_urlsafe(credential_public_key)
|
||||
|
||||
# 构建AuthenticationCredential对象
|
||||
authentication_credential = parse_authentication_credential_json(json.dumps(credential))
|
||||
|
||||
# 获取并验证 Origin
|
||||
origin = PassKeyHelper._get_verified_origin(credential, rp_id, origin)
|
||||
|
||||
# 验证认证响应
|
||||
verification = verify_authentication_response(
|
||||
credential=authentication_credential,
|
||||
|
||||
Reference in New Issue
Block a user