mirror of
https://github.com/EstrellaXD/Auto_Bangumi.git
synced 2026-04-13 14:39:44 +08:00
feat(auth): support usernameless passkey login (discoverable credentials)
- Change resident key requirement from PREFERRED to REQUIRED during registration - Add discoverable authentication options (empty allowCredentials) - Add verify_discoverable_authentication method in WebAuthn service - Update auth strategy to lookup user from credential when username not provided - Make username optional in frontend passkey login flow Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -5,6 +5,7 @@
|
||||
### Features
|
||||
|
||||
- 数据库迁移自动填充 NULL 值为模型默认值
|
||||
- Passkey 登录支持无用户名模式(可发现凭证)
|
||||
|
||||
### Fixes
|
||||
|
||||
@@ -18,6 +19,7 @@
|
||||
- 重新设计搜索面板,新增模态框和过滤系统
|
||||
- 重新设计登录面板,采用现代毛玻璃风格
|
||||
- 日志页面新增日志级别过滤功能
|
||||
- Passkey 登录支持无用户名模式(可发现凭证)
|
||||
|
||||
### Fixes
|
||||
|
||||
|
||||
@@ -160,9 +160,22 @@ async def get_passkey_login_options(
|
||||
"""
|
||||
生成 Passkey 登录选项(challenge)
|
||||
前端先调用此接口,再调用 navigator.credentials.get()
|
||||
|
||||
如果提供 username,返回该用户的 passkey 列表(allowCredentials)
|
||||
如果不提供 username,返回可发现凭证选项(浏览器显示所有可用 passkey)
|
||||
"""
|
||||
webauthn = _get_webauthn_from_request(request)
|
||||
|
||||
# Discoverable credentials mode (no username)
|
||||
if not auth_data.username:
|
||||
try:
|
||||
options = webauthn.generate_discoverable_authentication_options()
|
||||
return options
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to generate discoverable login options: {e}")
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
# Username-based mode
|
||||
async with async_session_factory() as session:
|
||||
try:
|
||||
# Get user
|
||||
@@ -201,6 +214,9 @@ async def login_with_passkey(
|
||||
):
|
||||
"""
|
||||
使用 Passkey 登录(替代密码登录)
|
||||
|
||||
如果提供 username,验证 passkey 属于该用户
|
||||
如果不提供 username(可发现凭证模式),从 credential 中提取用户信息
|
||||
"""
|
||||
webauthn = _get_webauthn_from_request(request)
|
||||
|
||||
@@ -208,12 +224,17 @@ async def login_with_passkey(
|
||||
resp = await strategy.authenticate(auth_data.username, auth_data.credential)
|
||||
|
||||
if resp.status:
|
||||
# Get username from response (may be discovered from credential)
|
||||
username = resp.data.get("username") if resp.data else auth_data.username
|
||||
if not username:
|
||||
raise HTTPException(status_code=500, detail="Failed to determine username")
|
||||
|
||||
token = create_access_token(
|
||||
data={"sub": auth_data.username}, expires_delta=timedelta(days=1)
|
||||
data={"sub": username}, expires_delta=timedelta(days=1)
|
||||
)
|
||||
response.set_cookie(key="token", value=token, httponly=True, max_age=86400)
|
||||
if auth_data.username not in active_user:
|
||||
active_user.append(auth_data.username)
|
||||
if username not in active_user:
|
||||
active_user.append(username)
|
||||
return {"access_token": token, "token_type": "bearer"}
|
||||
|
||||
raise HTTPException(status_code=resp.status_code, detail=resp.msg_en)
|
||||
|
||||
@@ -65,11 +65,11 @@ class PasskeyDelete(BaseModel):
|
||||
class PasskeyAuthStart(BaseModel):
|
||||
"""Passkey 认证开始请求"""
|
||||
|
||||
username: str
|
||||
username: Optional[str] = None # Optional for discoverable credentials
|
||||
|
||||
|
||||
class PasskeyAuthFinish(BaseModel):
|
||||
"""Passkey 认证完成请求"""
|
||||
|
||||
username: str
|
||||
username: Optional[str] = None # Optional for discoverable credentials
|
||||
credential: dict
|
||||
|
||||
@@ -16,12 +16,14 @@ class AuthStrategy(ABC):
|
||||
"""认证策略基类"""
|
||||
|
||||
@abstractmethod
|
||||
async def authenticate(self, username: str, credential: dict) -> ResponseModel:
|
||||
async def authenticate(
|
||||
self, username: str | None, credential: dict
|
||||
) -> ResponseModel:
|
||||
"""
|
||||
执行认证
|
||||
|
||||
Args:
|
||||
username: 用户名
|
||||
username: 用户名(可选,用于可发现凭证模式)
|
||||
credential: 认证凭证(密码或 WebAuthn 响应)
|
||||
|
||||
Returns:
|
||||
@@ -36,41 +38,28 @@ class PasskeyAuthStrategy(AuthStrategy):
|
||||
def __init__(self, webauthn_service):
|
||||
self.webauthn_service = webauthn_service
|
||||
|
||||
async def authenticate(self, username: str, credential: dict) -> ResponseModel:
|
||||
"""使用 WebAuthn Passkey 认证"""
|
||||
async with async_session_factory() as session:
|
||||
# 1. 查找用户
|
||||
try:
|
||||
result = await session.execute(
|
||||
select(User).where(User.username == username)
|
||||
)
|
||||
user = result.scalar_one_or_none()
|
||||
if not user:
|
||||
raise ValueError("User not found")
|
||||
except ValueError:
|
||||
return ResponseModel(
|
||||
status_code=401,
|
||||
status=False,
|
||||
msg_en="User not found",
|
||||
msg_zh="用户不存在",
|
||||
)
|
||||
async def authenticate(
|
||||
self, username: str | None, credential: dict
|
||||
) -> ResponseModel:
|
||||
"""
|
||||
使用 WebAuthn Passkey 认证
|
||||
|
||||
# 2. 提取 credential_id 并查找对应的 passkey
|
||||
Args:
|
||||
username: 用户名(可选)。如果为 None,使用可发现凭证模式
|
||||
credential: WebAuthn 凭证响应
|
||||
"""
|
||||
async with async_session_factory() as session:
|
||||
passkey_db = PasskeyDatabase(session)
|
||||
|
||||
# 1. 提取 credential_id
|
||||
try:
|
||||
raw_id = credential.get("rawId")
|
||||
if not raw_id:
|
||||
raise ValueError("Missing credential ID")
|
||||
|
||||
# 将 rawId 从 base64url 转换为标准格式
|
||||
credential_id_str = self.webauthn_service.base64url_encode(
|
||||
self.webauthn_service.base64url_decode(raw_id)
|
||||
)
|
||||
|
||||
passkey_db = PasskeyDatabase(session)
|
||||
passkey = await passkey_db.get_passkey_by_credential_id(credential_id_str)
|
||||
if not passkey or passkey.user_id != user.id:
|
||||
raise ValueError("Passkey not found or not owned by user")
|
||||
|
||||
except Exception:
|
||||
return ResponseModel(
|
||||
status_code=401,
|
||||
@@ -79,13 +68,54 @@ class PasskeyAuthStrategy(AuthStrategy):
|
||||
msg_zh="Passkey 凭证无效",
|
||||
)
|
||||
|
||||
# 3. 验证 WebAuthn 签名
|
||||
try:
|
||||
new_sign_count = self.webauthn_service.verify_authentication(
|
||||
username, credential, passkey
|
||||
# 2. 查找 passkey
|
||||
passkey = await passkey_db.get_passkey_by_credential_id(credential_id_str)
|
||||
if not passkey:
|
||||
return ResponseModel(
|
||||
status_code=401,
|
||||
status=False,
|
||||
msg_en="Passkey not found",
|
||||
msg_zh="未找到 Passkey",
|
||||
)
|
||||
|
||||
# 4. 更新使用记录
|
||||
# 3. 获取用户
|
||||
result = await session.execute(
|
||||
select(User).where(User.id == passkey.user_id)
|
||||
)
|
||||
user = result.scalar_one_or_none()
|
||||
if not user:
|
||||
return ResponseModel(
|
||||
status_code=401,
|
||||
status=False,
|
||||
msg_en="User not found",
|
||||
msg_zh="用户不存在",
|
||||
)
|
||||
|
||||
# 4. 如果提供了 username,验证一致性
|
||||
if username and user.username != username:
|
||||
return ResponseModel(
|
||||
status_code=401,
|
||||
status=False,
|
||||
msg_en="Passkey does not belong to specified user",
|
||||
msg_zh="Passkey 不属于指定用户",
|
||||
)
|
||||
|
||||
# 5. 验证 WebAuthn 签名
|
||||
try:
|
||||
if username:
|
||||
# Username-based mode
|
||||
new_sign_count = self.webauthn_service.verify_authentication(
|
||||
username, credential, passkey
|
||||
)
|
||||
else:
|
||||
# Discoverable credentials mode
|
||||
new_sign_count = (
|
||||
self.webauthn_service.verify_discoverable_authentication(
|
||||
credential, passkey
|
||||
)
|
||||
)
|
||||
|
||||
# 6. 更新使用记录
|
||||
await passkey_db.update_passkey_usage(passkey, new_sign_count)
|
||||
|
||||
return ResponseModel(
|
||||
@@ -93,6 +123,7 @@ class PasskeyAuthStrategy(AuthStrategy):
|
||||
status=True,
|
||||
msg_en="Login successfully with passkey",
|
||||
msg_zh="通过 Passkey 登录成功",
|
||||
data={"username": user.username},
|
||||
)
|
||||
|
||||
except ValueError as e:
|
||||
|
||||
@@ -81,7 +81,7 @@ class WebAuthnService:
|
||||
user_display_name=username,
|
||||
exclude_credentials=exclude_credentials if exclude_credentials else None,
|
||||
authenticator_selection=AuthenticatorSelectionCriteria(
|
||||
resident_key=ResidentKeyRequirement.PREFERRED,
|
||||
resident_key=ResidentKeyRequirement.REQUIRED, # Required for usernameless login
|
||||
user_verification=UserVerificationRequirement.PREFERRED,
|
||||
),
|
||||
supported_pub_key_algs=[
|
||||
@@ -191,6 +191,26 @@ class WebAuthnService:
|
||||
|
||||
return json.loads(options_to_json(options))
|
||||
|
||||
def generate_discoverable_authentication_options(self) -> dict:
|
||||
"""
|
||||
生成可发现凭证的认证选项(无需用户名)
|
||||
|
||||
Returns:
|
||||
JSON-serializable authentication options without allowCredentials
|
||||
"""
|
||||
options = generate_authentication_options(
|
||||
rp_id=self.rp_id,
|
||||
allow_credentials=None, # Empty = discoverable credentials mode
|
||||
user_verification=UserVerificationRequirement.PREFERRED,
|
||||
)
|
||||
|
||||
# Store challenge with a unique key for discoverable auth
|
||||
challenge_key = f"auth_discoverable_{self.base64url_encode(options.challenge)[:16]}"
|
||||
self._challenges[challenge_key] = options.challenge
|
||||
logger.debug("Generated discoverable authentication challenge")
|
||||
|
||||
return json.loads(options_to_json(options))
|
||||
|
||||
def verify_authentication(
|
||||
self, username: str, credential: dict, passkey: Passkey
|
||||
) -> int:
|
||||
@@ -236,6 +256,56 @@ class WebAuthnService:
|
||||
# 清理 challenge(无论成功或失败都清理,防止重放攻击)
|
||||
self._challenges.pop(challenge_key, None)
|
||||
|
||||
def verify_discoverable_authentication(
|
||||
self, credential: dict, passkey: Passkey
|
||||
) -> int:
|
||||
"""
|
||||
验证可发现凭证的认证响应(无需用户名)
|
||||
|
||||
Args:
|
||||
credential: 来自前端的 credential 响应
|
||||
passkey: 通过 credential_id 查找到的 Passkey 对象
|
||||
|
||||
Returns:
|
||||
新的 sign_count
|
||||
|
||||
Raises:
|
||||
ValueError: 验证失败
|
||||
"""
|
||||
# Find the challenge by checking all discoverable challenges
|
||||
expected_challenge = None
|
||||
challenge_key = None
|
||||
for key, challenge in list(self._challenges.items()):
|
||||
if key.startswith("auth_discoverable_"):
|
||||
expected_challenge = challenge
|
||||
challenge_key = key
|
||||
break
|
||||
|
||||
if not expected_challenge:
|
||||
raise ValueError("Challenge not found or expired")
|
||||
|
||||
try:
|
||||
credential_public_key = base64.b64decode(passkey.public_key)
|
||||
|
||||
verification = verify_authentication_response(
|
||||
credential=credential,
|
||||
expected_challenge=expected_challenge,
|
||||
expected_rp_id=self.rp_id,
|
||||
expected_origin=self.origin,
|
||||
credential_public_key=credential_public_key,
|
||||
credential_current_sign_count=passkey.sign_count,
|
||||
)
|
||||
|
||||
logger.info("Successfully verified discoverable authentication")
|
||||
return verification.new_sign_count
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Discoverable authentication verification failed: {e}")
|
||||
raise ValueError(f"Invalid authentication response: {str(e)}")
|
||||
finally:
|
||||
if challenge_key:
|
||||
self._challenges.pop(challenge_key, None)
|
||||
|
||||
# ============ 辅助方法 ============
|
||||
|
||||
def _parse_transports(
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import { createSharedComposable } from '@vueuse/core';
|
||||
import { apiPasskey } from '@/api/passkey';
|
||||
import {
|
||||
isWebAuthnSupported,
|
||||
registerPasskey,
|
||||
loginWithPasskey as webauthnLogin,
|
||||
isWebAuthnSupported,
|
||||
} from '@/services/webauthn';
|
||||
import type { PasskeyItem } from '#/passkey';
|
||||
|
||||
@@ -56,7 +56,7 @@ export const usePasskey = createSharedComposable(() => {
|
||||
}
|
||||
|
||||
// 使用 Passkey 登录
|
||||
async function loginWithPasskey(username: string): Promise<boolean> {
|
||||
async function loginWithPasskey(username?: string): Promise<boolean> {
|
||||
try {
|
||||
await webauthnLogin(username);
|
||||
isLoggedIn.value = true;
|
||||
|
||||
@@ -108,17 +108,20 @@ export async function registerPasskey(deviceName: string): Promise<void> {
|
||||
|
||||
/**
|
||||
* 使用 Passkey 登录
|
||||
* @param username 用户名
|
||||
* @param username 用户名(可选,不提供时使用可发现凭证模式)
|
||||
*/
|
||||
export async function loginWithPasskey(username: string): Promise<void> {
|
||||
export async function loginWithPasskey(username?: string): Promise<void> {
|
||||
// 1. 获取认证选项
|
||||
const options = await apiPasskey.getLoginOptions({ username });
|
||||
const options = await apiPasskey.getLoginOptions(
|
||||
username ? { username } : {}
|
||||
);
|
||||
|
||||
// 2. 转换选项
|
||||
const getOptions: PublicKeyCredentialRequestOptions = {
|
||||
challenge: base64UrlToBuffer(options.challenge),
|
||||
timeout: options.timeout || 60000,
|
||||
rpId: options.rpId,
|
||||
// allowCredentials is undefined for discoverable credentials mode
|
||||
allowCredentials: options.allowCredentials?.map((cred) => ({
|
||||
type: cred.type as PublicKeyCredentialType,
|
||||
id: base64UrlToBuffer(cred.id),
|
||||
@@ -154,7 +157,7 @@ export async function loginWithPasskey(username: string): Promise<void> {
|
||||
|
||||
// 5. 提交到后端验证并登录
|
||||
await apiPasskey.loginWithPasskey({
|
||||
username,
|
||||
...(username && { username }),
|
||||
credential: assertionResponse,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -60,11 +60,11 @@ export interface PasskeyDeleteRequest {
|
||||
|
||||
// 认证开始请求
|
||||
export interface PasskeyAuthStartRequest {
|
||||
username: string;
|
||||
username?: string; // Optional for discoverable credentials
|
||||
}
|
||||
|
||||
// 认证完成请求
|
||||
export interface PasskeyAuthFinishRequest {
|
||||
username: string;
|
||||
username?: string; // Optional for discoverable credentials
|
||||
credential: unknown;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user