mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-02-03 10:35:15 +08:00
196 lines
6.7 KiB
Python
196 lines
6.7 KiB
Python
import base64
|
||
import re
|
||
from typing import Annotated, Any, List, Union
|
||
|
||
from fastapi import APIRouter, Body, Depends, HTTPException, UploadFile, File
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app import schemas
|
||
from app.core.security import get_password_hash
|
||
from app.db import get_async_db
|
||
from app.db.models.user import User
|
||
from app.db.user_oper import get_current_active_superuser_async, \
|
||
get_current_active_user_async, get_current_active_user
|
||
from app.db.userconfig_oper import UserConfigOper
|
||
from app.utils.otp import OtpUtils
|
||
|
||
# Router on which the endpoint functions in this module register themselves via decorators.
router = APIRouter()
|
||
|
||
|
||
@router.get("/", summary="所有用户", response_model=List[schemas.User])
async def list_users(
    db: AsyncSession = Depends(get_async_db),
    current_user: User = Depends(get_current_active_superuser_async),
) -> Any:
    """
    Return the list of users.

    :param db: async database session injected by FastAPI
    :param current_user: caller resolved through the
        ``get_current_active_superuser_async`` dependency
    :return: the result of ``async_list``, serialized as ``List[schemas.User]``
    """
    all_users = await current_user.async_list(db)
    return all_users
|
||
|
||
|
||
@router.post("/", summary="新增用户", response_model=schemas.Response)
async def create_user(
    *,
    db: AsyncSession = Depends(get_async_db),
    user_in: schemas.UserCreate,
    current_user: User = Depends(get_current_active_superuser_async),
) -> Any:
    """
    Create a new user.

    :param db: async database session injected by FastAPI
    :param user_in: payload describing the user to create
    :param current_user: caller resolved through the
        ``get_current_active_superuser_async`` dependency
    :return: ``schemas.Response``; ``success`` is False with a message when a
        user with the same name already exists, otherwise it reflects whether
        ``async_create`` returned a truthy value
    """
    existing = await current_user.async_get_by_name(db, name=user_in.name)
    if existing:
        return schemas.Response(success=False, message="用户已存在")

    user_info = user_in.model_dump()
    # Always strip the plaintext password so it can never be forwarded to the
    # model constructor, even when it is empty or None; only a non-empty
    # password produces a hash.
    plain_password = user_info.pop("password", None)
    if plain_password:
        user_info["hashed_password"] = get_password_hash(plain_password)

    created = await User(**user_info).async_create(db)
    return schemas.Response(success=bool(created))
|
||
|
||
|
||
@router.put("/", summary="更新用户", response_model=schemas.Response)
async def update_user(
    *,
    db: AsyncSession = Depends(get_async_db),
    user_in: schemas.UserUpdate,
    current_user: User = Depends(get_current_active_superuser_async),
) -> Any:
    """
    Update an existing user.

    Checks run in this order, each returning a failed ``schemas.Response``
    with its own message: password strength (only when a password is
    supplied), non-empty user name, user name not used by a different user,
    and finally existence of the target user.

    :param db: async database session injected by FastAPI
    :param user_in: payload with the new field values; ``id`` selects the user
    :param current_user: caller resolved through the
        ``get_current_active_superuser_async`` dependency
    :return: ``schemas.Response`` with ``success=True`` once the update ran
    """
    user_info = user_in.model_dump()

    # Always strip the plaintext password so it is never handed to the
    # persistence layer, even when it is empty or None.
    new_password = user_info.pop("password", None)
    if new_password:
        # 6-50 characters, and not made up solely of letters, solely of digits,
        # or solely of non-alphanumeric, non-whitespace characters.
        pattern = r'^(?![a-zA-Z]+$)(?!\d+$)(?![^\da-zA-Z\s]+$).{6,50}$'
        if not re.match(pattern, new_password):
            return schemas.Response(success=False,
                                    message="密码需要同时包含字母、数字、特殊字符中的至少两项,且长度大于6位")
        user_info["hashed_password"] = get_password_hash(new_password)

    target_id = user_info["id"]
    user = await current_user.async_get_by_id(db, user_id=target_id)

    user_name = user_info.get("name")
    if not user_name:
        return schemas.Response(success=False, message="用户名不能为空")

    # Reject the new name if any *other* user already carries it.
    users = await current_user.async_list(db)
    if any(u.name == user_name and u.id != target_id for u in users):
        return schemas.Response(success=False, message="用户名已被使用")

    if not user:
        return schemas.Response(success=False, message="用户不存在")

    await user.async_update(db, user_info)
    return schemas.Response(success=True)
|
||
|
||
|
||
@router.get("/current", summary="当前登录用户信息", response_model=schemas.User)
async def read_current_user(
    current_user: User = Depends(get_current_active_user_async),
) -> Any:
    """
    Return the user object resolved for the current request.

    :param current_user: caller resolved through the
        ``get_current_active_user_async`` dependency
    :return: that same user, serialized as ``schemas.User``
    """
    return current_user
|
||
|
||
|
||
@router.post("/avatar/{user_id}", summary="上传用户头像", response_model=schemas.Response)
async def upload_avatar(user_id: int, db: AsyncSession = Depends(get_async_db), file: UploadFile = File(...),
                        _: User = Depends(get_current_active_user_async)):
    """
    Store an uploaded file as the avatar of the given user.

    The file content is Base64-encoded and saved in the user's ``avatar``
    field as a ``data:image/ico;base64,...`` URI.

    :param user_id: id of the user whose avatar is replaced
    :param db: async database session injected by FastAPI
    :param file: the uploaded avatar file
    :param _: unused; present only so the request must pass the
        ``get_current_active_user_async`` dependency
    :return: ``schemas.Response``; on success ``message`` carries the uploaded
        file name, on failure it reports that the user does not exist
    """
    # NOTE(review): nothing here ties ``user_id`` to the authenticated caller,
    # so any user passing the dependency can set any user's avatar - confirm
    # this is intended.
    user = await User.async_get(db, user_id)
    if not user:
        return schemas.Response(success=False, message="用户不存在")

    # Use the async read so the event loop is not blocked by file I/O.
    raw = await file.read()
    # b64encode returns bytes; decode so the data URI does not end up
    # containing the bytes repr (b'...').
    encoded = base64.b64encode(raw).decode("ascii")

    await user.async_update(db, {
        "avatar": f"data:image/ico;base64,{encoded}"
    })
    return schemas.Response(success=True, message=file.filename)
|
||
|
||
|
||
@router.get("/config/{key}", summary="查询用户配置", response_model=schemas.Response)
def get_config(key: str,
               current_user: User = Depends(get_current_active_user)):
    """
    Look up one configuration entry of the current user.

    :param key: configuration key taken from the URL path
    :param current_user: caller resolved through the
        ``get_current_active_user`` dependency; its ``name`` scopes the lookup
    :return: ``schemas.Response`` whose ``data`` is ``{"value": <stored value>}``
    """
    stored = UserConfigOper().get(username=current_user.name, key=key)
    payload = {"value": stored}
    return schemas.Response(success=True, data=payload)
|
||
|
||
|
||
@router.post("/config/{key}", summary="更新用户配置", response_model=schemas.Response)
def set_config(
    key: str,
    value: Annotated[Union[list, dict, bool, int, str] | None, Body()] = None,
    current_user: User = Depends(get_current_active_user),
):
    """
    Write one configuration entry for the current user.

    :param key: configuration key taken from the URL path
    :param value: new value read from the request body; defaults to None
    :param current_user: caller resolved through the
        ``get_current_active_user`` dependency; its ``name`` scopes the write
    :return: ``schemas.Response`` with ``success=True``
    """
    config_oper = UserConfigOper()
    config_oper.set(username=current_user.name, key=key, value=value)
    return schemas.Response(success=True)
|
||
|
||
|
||
@router.delete("/id/{user_id}", summary="删除用户", response_model=schemas.Response)
async def delete_user_by_id(
    *,
    db: AsyncSession = Depends(get_async_db),
    user_id: int,
    current_user: User = Depends(get_current_active_superuser_async),
) -> Any:
    """
    Delete a user identified by its unique id.

    :param db: async database session injected by FastAPI
    :param user_id: id of the user to delete, taken from the URL path
    :param current_user: caller resolved through the
        ``get_current_active_superuser_async`` dependency
    :return: ``schemas.Response``; failed with a message when no such user
        exists, successful after the delete call otherwise
    """
    target = await current_user.async_get_by_id(db, user_id=user_id)
    if target:
        await current_user.async_delete(db, user_id)
        return schemas.Response(success=True)
    return schemas.Response(success=False, message="用户不存在")
|
||
|
||
|
||
@router.delete("/name/{user_name}", summary="删除用户", response_model=schemas.Response)
async def delete_user_by_name(
    *,
    db: AsyncSession = Depends(get_async_db),
    user_name: str,
    current_user: User = Depends(get_current_active_superuser_async),
) -> Any:
    """
    Delete a user identified by its user name.

    :param db: async database session injected by FastAPI
    :param user_name: name of the user to delete, taken from the URL path
    :param current_user: caller resolved through the
        ``get_current_active_superuser_async`` dependency
    :return: ``schemas.Response``; failed with a message when no such user
        exists, successful after the delete call otherwise
    """
    target = await current_user.async_get_by_name(db, name=user_name)
    if target:
        # The delete call is keyed by id, so use the id of the record found by name.
        await current_user.async_delete(db, target.id)
        return schemas.Response(success=True)
    return schemas.Response(success=False, message="用户不存在")
|
||
|
||
|
||
@router.get("/{username}", summary="用户详情", response_model=schemas.User)
async def read_user_by_name(
    username: str,
    current_user: User = Depends(get_current_active_user_async),
    db: AsyncSession = Depends(get_async_db),
) -> Any:
    """
    Return the details of the user with the given name.

    A caller may always read their own record; reading another user's record
    requires ``current_user.is_superuser``.

    :param username: name of the user to look up, taken from the URL path
    :param current_user: caller resolved through the
        ``get_current_active_user_async`` dependency
    :param db: async database session injected by FastAPI
    :return: the matching user, serialized as ``schemas.User``
    :raises HTTPException: 404 when no user has that name; 400 when the caller
        is neither that user nor a superuser
    """
    user = await current_user.async_get_by_name(db, name=username)
    if not user:
        raise HTTPException(
            status_code=404,
            detail="用户不存在",
        )

    # Object equality on ORM instances only holds when both were loaded through
    # the same session; also compare ids so a caller is still recognised as
    # themselves if the two instances are distinct objects.
    is_self = user == current_user or user.id == current_user.id
    if not is_self and not current_user.is_superuser:
        raise HTTPException(
            status_code=400,
            detail="用户权限不足"
        )
    return user
|