feat(security): add safe path check for log file access and validation

This commit is contained in:
InfinityPacer
2024-10-13 21:59:22 +08:00
parent f67ee27618
commit f9e06e4381
2 changed files with 47 additions and 4 deletions

View File

@@ -1,18 +1,18 @@
import json
import time
from datetime import datetime
from typing import Union, Any
from typing import Any, Union
import tailer
from fastapi import APIRouter, Depends, Response
from fastapi import APIRouter, Depends, HTTPException, Response
from fastapi.responses import StreamingResponse
from app import schemas
from app.chain.search import SearchChain
from app.chain.system import SystemChain
from app.core.config import settings, global_vars
from app.core.config import global_vars, settings
from app.core.module import ModuleManager
from app.core.security import verify_token, verify_apitoken, verify_resource_token
from app.core.security import verify_apitoken, verify_resource_token, verify_token
from app.db.models import User
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_superuser
@@ -24,6 +24,7 @@ from app.monitor import Monitor
from app.scheduler import Scheduler
from app.schemas.types import SystemConfigKey
from app.utils.http import RequestUtils
from app.utils.security import SecurityUtils
from app.utils.system import SystemUtils
from version import APP_VERSION
@@ -214,6 +215,12 @@ def get_logging(length: int = 50, logfile: str = "moviepilot.log",
"""
log_path = settings.LOG_PATH / logfile
if not SecurityUtils.is_safe_path(settings.LOG_PATH, log_path, allowed_suffixes={".log"}):
raise HTTPException(status_code=404, detail="Not Found")
if not log_path.exists() or not log_path.is_file():
raise HTTPException(status_code=404, detail="Not Found")
def log_generator():
# 读取文件末尾50行不使用tailer模块
with open(log_path, 'r', encoding='utf-8') as f:

36
app/utils/security.py Normal file
View File

@@ -0,0 +1,36 @@
from pathlib import Path
from typing import Optional, Set
class SecurityUtils:
@staticmethod
def is_safe_path(base_path: Path, user_path: Path, allowed_suffixes: Optional[Set[str]] = None) -> bool:
"""
验证用户提供的路径是否在基准目录内,并检查文件类型是否合法,防止目录遍历攻击
:param base_path: 基准目录,允许访问的根目录
:param user_path: 用户提供的路径,需检查其是否位于基准目录内
:param allowed_suffixes: 允许的文件后缀名集合,用于验证文件类型
:return: 如果用户路径安全且位于基准目录内,且文件类型合法,返回 True否则返回 False
:raises Exception: 如果解析路径时发生错误,则捕获并记录异常
"""
try:
# resolve() 将相对路径转换为绝对路径,并处理符号链接和'..'
base_path_resolved = base_path.resolve()
user_path_resolved = user_path.resolve()
# 检查用户路径是否在基准目录或基准目录的子目录内
if base_path_resolved != user_path_resolved and base_path_resolved not in user_path_resolved.parents:
return False
# 如果指定了 allowed_suffixes进一步检查文件后缀
if allowed_suffixes and user_path.is_file() and user_path.suffix not in allowed_suffixes:
return False
# 所有检查通过
return True
except Exception as e:
# 捕获并记录路径解析时的异常
print(f"Error occurred while resolving paths: {e}")
return False