add aiopath

This commit is contained in:
jxxghp
2025-07-30 19:49:59 +08:00
parent 49647e3bb5
commit c8749b3c9c
4 changed files with 65 additions and 21 deletions

View File

@@ -3,6 +3,8 @@ from pathlib import Path
from typing import List, Optional, Set, Union
from urllib.parse import quote, urlparse
from aiopath import AsyncPath
from app.log import logger
@@ -39,6 +41,37 @@ class SecurityUtils:
logger.debug(f"Error occurred while validating paths: {e}")
return False
@staticmethod
async def async_is_safe_path(base_path: AsyncPath, user_path: AsyncPath,
allowed_suffixes: Optional[Union[Set[str], List[str]]] = None) -> bool:
"""
异步验证用户提供的路径是否在基准目录内,并检查文件类型是否合法,防止目录遍历攻击
:param base_path: 基准目录,允许访问的根目录
:param user_path: 用户提供的路径,需检查其是否位于基准目录内
:param allowed_suffixes: 允许的文件后缀名集合,用于验证文件类型
:return: 如果用户路径安全且位于基准目录内,且文件类型合法,返回 True否则返回 False
:raises Exception: 如果解析路径时发生错误,则捕获并记录异常
"""
try:
# resolve() 将相对路径转换为绝对路径,并处理符号链接和'..'
base_path_resolved = await base_path.resolve()
user_path_resolved = await user_path.resolve()
# 检查用户路径是否在基准目录或基准目录的子目录内
if base_path_resolved != user_path_resolved and base_path_resolved not in user_path_resolved.parents:
return False
if allowed_suffixes is not None:
allowed_suffixes = set(allowed_suffixes)
if user_path.suffix.lower() not in allowed_suffixes:
return False
return True
except Exception as e:
logger.debug(f"Error occurred while validating paths: {e}")
return False
@staticmethod
def is_safe_url(url: str, allowed_domains: Union[Set[str], List[str]], strict: bool = False) -> bool:
"""