fix warnings

This commit is contained in:
jxxghp
2025-12-24 18:54:38 +08:00
parent a70bf18770
commit 0f8ceb0fac
3 changed files with 28 additions and 23 deletions

View File

@@ -3,8 +3,9 @@ MFA (Multi-Factor Authentication) API 端点
包含 OTP 和 PassKey 相关功能
"""
from datetime import timedelta
from typing import Any, Annotated, Optional, List, Union
from typing import Any, Annotated, Optional
from app.helper.sites import SitesHelper
from fastapi import APIRouter, Depends, HTTPException, Body
from sqlalchemy.ext.asyncio import AsyncSession
@@ -14,10 +15,9 @@ from app.core.config import settings
from app.db import get_async_db
from app.db.models.passkey import PassKey
from app.db.models.user import User
from app.db.user_oper import get_current_active_user, get_current_active_user_async
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_user, get_current_active_user_async
from app.helper.passkey import PassKeyHelper
from app.helper.sites import SitesHelper
from app.log import logger
from app.schemas.types import SystemConfigKey
from app.utils.otp import OtpUtils
@@ -134,7 +134,6 @@ class PassKeyAuthenticationFinish(schemas.BaseModel):
@router.post("/passkey/register/start", summary="开始注册 PassKey", response_model=schemas.Response)
def passkey_register_start(
passkey_req: PassKeyRegistrationStart,
current_user: Annotated[User, Depends(get_current_active_user)]
) -> Any:
"""开始注册 PassKey - 生成注册选项"""
@@ -351,7 +350,7 @@ def passkey_list(
try:
passkeys = PassKey.get_by_user_id(db=None, user_id=current_user.id)
passkey_list = [
key_list = [
{
'id': pk.id,
'name': pk.name,
@@ -365,7 +364,7 @@ def passkey_list(
return schemas.Response(
success=True,
data=passkey_list
data=key_list
)
except Exception as e:
logger.error(f"获取PassKey列表失败: {e}")

View File

@@ -91,8 +91,8 @@ class DiscordModule(_ModuleBase, _MessageBase[Discord]):
return None
try:
msg_json: dict = json.loads(body)
except Exception as err:
logger.debug(f"解析 Discord 消息失败:{str(err)}")
except Exception as e:
logger.debug(f"解析 Discord 消息失败:{str(e)}")
return None
if not msg_json:

View File

@@ -56,7 +56,7 @@ class Discord:
def _to_int(val: Optional[Union[str, int]]) -> Optional[int]:
try:
return int(val) if val is not None and str(val).strip() else None
except Exception:
except ValueError:
return None
def _register_events(self):
@@ -96,8 +96,8 @@ class Discord:
return
try:
await interaction.response.defer(ephemeral=True)
except Exception:
pass
except Exception as e:
logger.error(f"处理 Discord 交互响应失败:{e}")
username = (interaction.user.display_name or interaction.user.global_name or interaction.user.name) \
if interaction.user else None
@@ -126,8 +126,8 @@ class Discord:
finally:
try:
self._loop.run_until_complete(self._client.close())
except Exception:
pass
except Exception as err:
logger.debug(f"Discord Bot 关闭失败:{err}")
self._thread = threading.Thread(target=runner, daemon=True)
self._thread.start()
@@ -142,8 +142,8 @@ class Discord:
finally:
try:
self._loop.call_soon_threadsafe(self._loop.stop)
except Exception:
pass
except Exception as err:
logger.error(f"停止 Discord 事件循环失败:{err}")
self._ready_event.clear()
def get_state(self) -> bool:
@@ -313,7 +313,8 @@ class Discord:
logger.error(f"删除 Discord 消息失败:{err}")
return False
def _build_embed(self, title: str, text: Optional[str], image: Optional[str],
@staticmethod
def _build_embed(title: str, text: Optional[str], image: Optional[str],
link: Optional[str]) -> discord.Embed:
description = ""
fields: List[Dict[str, str]] = []
@@ -337,7 +338,8 @@ class Discord:
embed.set_image(url=image)
return embed
def _build_media_embeds(self, medias: List[MediaInfo], title: str) -> List[discord.Embed]:
@staticmethod
def _build_media_embeds(medias: List[MediaInfo], title: str) -> List[discord.Embed]:
embeds: List[discord.Embed] = []
for index, media in enumerate(medias[:10], start=1):
overview = media.get_overview_string(80)
@@ -358,7 +360,8 @@ class Discord:
embeds[0].set_author(name=title)
return embeds
def _build_torrent_embeds(self, torrents: List[Context], title: str) -> List[discord.Embed]:
@staticmethod
def _build_torrent_embeds(torrents: List[Context], title: str) -> List[discord.Embed]:
embeds: List[discord.Embed] = []
for index, context in enumerate(torrents[:10], start=1):
torrent = context.torrent_info
@@ -384,7 +387,8 @@ class Discord:
embeds[0].set_author(name=title)
return embeds
def _build_default_buttons(self, count: int) -> List[List[dict]]:
@staticmethod
def _build_default_buttons(count: int) -> List[List[dict]]:
buttons: List[List[dict]] = []
max_rows = 5
max_per_row = 5
@@ -398,7 +402,8 @@ class Discord:
logger.warn(f"按钮数量超过 Discord 限制,仅展示前 {capped}")
return buttons
def _build_view(self, buttons: Optional[List[List[dict]]], link: Optional[str] = None) -> Optional[discord.ui.View]:
@staticmethod
def _build_view(buttons: Optional[List[List[dict]]], link: Optional[str] = None) -> Optional[discord.ui.View]:
has_buttons = buttons and any(buttons)
if not has_buttons and not link:
return None
@@ -429,8 +434,8 @@ class Discord:
return channel
try:
return await self._client.fetch_channel(int(chat_id))
except Exception:
pass
except Exception as err:
logger.warn(f"通过 chat_id 获取 Discord 频道失败:{err}")
# 私聊
if userid:
@@ -446,7 +451,8 @@ class Discord:
if not channel:
try:
channel = await self._client.fetch_channel(self._channel_id)
except Exception:
except Exception as err:
logger.warn(f"通过配置的频道ID获取 Discord 频道失败:{err}")
channel = None
self._broadcast_channel = channel
if channel: