Files

196 lines
6.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import base64
import re
from typing import Annotated, Any, List, Union
from fastapi import APIRouter, Body, Depends, HTTPException, UploadFile, File
from sqlalchemy.ext.asyncio import AsyncSession
from app import schemas
from app.core.security import get_password_hash
from app.db import get_async_db
from app.db.models.user import User
from app.db.user_oper import get_current_active_superuser_async, \
get_current_active_user_async, get_current_active_user
from app.db.userconfig_oper import UserConfigOper
from app.utils.otp import OtpUtils
router = APIRouter()
@router.get("/", summary="所有用户", response_model=List[schemas.User])
async def list_users(
db: AsyncSession = Depends(get_async_db),
current_user: User = Depends(get_current_active_superuser_async),
) -> Any:
"""
查询用户列表
"""
return await current_user.async_list(db)
@router.post("/", summary="新增用户", response_model=schemas.Response)
async def create_user(
*,
db: AsyncSession = Depends(get_async_db),
user_in: schemas.UserCreate,
current_user: User = Depends(get_current_active_superuser_async),
) -> Any:
"""
新增用户
"""
user = await current_user.async_get_by_name(db, name=user_in.name)
if user:
return schemas.Response(success=False, message="用户已存在")
user_info = user_in.model_dump()
if user_info.get("password"):
user_info["hashed_password"] = get_password_hash(user_info["password"])
user_info.pop("password")
user = await User(**user_info).async_create(db)
return schemas.Response(success=True if user else False)
@router.put("/", summary="更新用户", response_model=schemas.Response)
async def update_user(
*,
db: AsyncSession = Depends(get_async_db),
user_in: schemas.UserUpdate,
current_user: User = Depends(get_current_active_superuser_async),
) -> Any:
"""
更新用户
"""
user_info = user_in.model_dump()
if user_info.get("password"):
# 正则表达式匹配密码包含字母、数字、特殊字符中的至少两项
pattern = r'^(?![a-zA-Z]+$)(?!\d+$)(?![^\da-zA-Z\s]+$).{6,50}$'
if not re.match(pattern, user_info.get("password")):
return schemas.Response(success=False,
message="密码需要同时包含字母、数字、特殊字符中的至少两项且长度大于6位")
user_info["hashed_password"] = get_password_hash(user_info["password"])
user_info.pop("password")
user = await current_user.async_get_by_id(db, user_id=user_info["id"])
user_name = user_info.get("name")
if not user_name:
return schemas.Response(success=False, message="用户名不能为空")
# 新用户名去重
users = await current_user.async_list(db)
for u in users:
if u.name == user_name and u.id != user_info["id"]:
return schemas.Response(success=False, message="用户名已被使用")
if not user:
return schemas.Response(success=False, message="用户不存在")
await user.async_update(db, user_info)
return schemas.Response(success=True)
@router.get("/current", summary="当前登录用户信息", response_model=schemas.User)
async def read_current_user(
current_user: User = Depends(get_current_active_user_async)
) -> Any:
"""
当前登录用户信息
"""
return current_user
@router.post("/avatar/{user_id}", summary="上传用户头像", response_model=schemas.Response)
async def upload_avatar(user_id: int, db: AsyncSession = Depends(get_async_db), file: UploadFile = File(...),
_: User = Depends(get_current_active_user_async)):
"""
上传用户头像
"""
# 将文件转换为Base64
file_base64 = base64.b64encode(file.file.read())
# 更新到用户表
user = await User.async_get(db, user_id)
if not user:
return schemas.Response(success=False, message="用户不存在")
await user.async_update(db, {
"avatar": f"data:image/ico;base64,{file_base64}"
})
return schemas.Response(success=True, message=file.filename)
@router.get("/config/{key}", summary="查询用户配置", response_model=schemas.Response)
def get_config(key: str,
current_user: User = Depends(get_current_active_user)):
"""
查询用户配置
"""
value = UserConfigOper().get(username=current_user.name, key=key)
return schemas.Response(success=True, data={
"value": value
})
@router.post("/config/{key}", summary="更新用户配置", response_model=schemas.Response)
def set_config(
key: str,
value: Annotated[Union[list, dict, bool, int, str] | None, Body()] = None,
current_user: User = Depends(get_current_active_user),
):
"""
更新用户配置
"""
UserConfigOper().set(username=current_user.name, key=key, value=value)
return schemas.Response(success=True)
@router.delete("/id/{user_id}", summary="删除用户", response_model=schemas.Response)
async def delete_user_by_id(
*,
db: AsyncSession = Depends(get_async_db),
user_id: int,
current_user: User = Depends(get_current_active_superuser_async),
) -> Any:
"""
通过唯一ID删除用户
"""
user = await current_user.async_get_by_id(db, user_id=user_id)
if not user:
return schemas.Response(success=False, message="用户不存在")
await current_user.async_delete(db, user_id)
return schemas.Response(success=True)
@router.delete("/name/{user_name}", summary="删除用户", response_model=schemas.Response)
async def delete_user_by_name(
*,
db: AsyncSession = Depends(get_async_db),
user_name: str,
current_user: User = Depends(get_current_active_superuser_async),
) -> Any:
"""
通过用户名删除用户
"""
user = await current_user.async_get_by_name(db, name=user_name)
if not user:
return schemas.Response(success=False, message="用户不存在")
await current_user.async_delete(db, user.id)
return schemas.Response(success=True)
@router.get("/{username}", summary="用户详情", response_model=schemas.User)
async def read_user_by_name(
username: str,
current_user: User = Depends(get_current_active_user_async),
db: AsyncSession = Depends(get_async_db),
) -> Any:
"""
查询用户详情
"""
user = await current_user.async_get_by_name(db, name=username)
if not user:
raise HTTPException(
status_code=404,
detail="用户不存在",
)
if user == current_user:
return user
if not current_user.is_superuser:
raise HTTPException(
status_code=400,
detail="用户权限不足"
)
return user