diff --git a/app/api/endpoints/system.py b/app/api/endpoints/system.py index cfb439b5..7f81cb24 100644 --- a/app/api/endpoints/system.py +++ b/app/api/endpoints/system.py @@ -1,18 +1,18 @@ import json import time from datetime import datetime -from typing import Union, Any +from typing import Any, Union import tailer -from fastapi import APIRouter, Depends, Response +from fastapi import APIRouter, Depends, HTTPException, Response from fastapi.responses import StreamingResponse from app import schemas from app.chain.search import SearchChain from app.chain.system import SystemChain -from app.core.config import settings, global_vars +from app.core.config import global_vars, settings from app.core.module import ModuleManager -from app.core.security import verify_token, verify_apitoken, verify_resource_token +from app.core.security import verify_apitoken, verify_resource_token, verify_token from app.db.models import User from app.db.systemconfig_oper import SystemConfigOper from app.db.user_oper import get_current_active_superuser @@ -24,6 +24,7 @@ from app.monitor import Monitor from app.scheduler import Scheduler from app.schemas.types import SystemConfigKey from app.utils.http import RequestUtils +from app.utils.security import SecurityUtils from app.utils.system import SystemUtils from version import APP_VERSION @@ -214,6 +215,12 @@ def get_logging(length: int = 50, logfile: str = "moviepilot.log", """ log_path = settings.LOG_PATH / logfile + if not SecurityUtils.is_safe_path(settings.LOG_PATH, log_path, allowed_suffixes={".log"}): + raise HTTPException(status_code=404, detail="Not Found") + + if not log_path.exists() or not log_path.is_file(): + raise HTTPException(status_code=404, detail="Not Found") + def log_generator(): # 读取文件末尾50行,不使用tailer模块 with open(log_path, 'r', encoding='utf-8') as f: diff --git a/app/utils/security.py b/app/utils/security.py new file mode 100644 index 00000000..4b10df09 --- /dev/null +++ b/app/utils/security.py @@ -0,0 +1,36 @@ +from pathlib import Path +from typing import Optional, Set + + +class SecurityUtils: + + @staticmethod + def is_safe_path(base_path: Path, user_path: Path, allowed_suffixes: Optional[Set[str]] = None) -> bool: + """ + 验证用户提供的路径是否在基准目录内,并检查文件类型是否合法,防止目录遍历攻击 + + :param base_path: 基准目录,允许访问的根目录 + :param user_path: 用户提供的路径,需检查其是否位于基准目录内 + :param allowed_suffixes: 允许的文件后缀名集合,用于验证文件类型 + :return: 如果用户路径安全且位于基准目录内,且文件类型合法,返回 True;否则返回 False + :raises Exception: 如果解析路径时发生错误,则捕获并记录异常 + """ + try: + # resolve() 将相对路径转换为绝对路径,并处理符号链接和'..' + base_path_resolved = base_path.resolve() + user_path_resolved = user_path.resolve() + + # 检查用户路径是否在基准目录或基准目录的子目录内 + if base_path_resolved != user_path_resolved and base_path_resolved not in user_path_resolved.parents: + return False + + # 如果指定了 allowed_suffixes,进一步检查文件后缀 + if allowed_suffixes and user_path.is_file() and user_path.suffix not in allowed_suffixes: + return False + + # 所有检查通过 + return True + except Exception as e: + # 捕获并记录路径解析时的异常 + print(f"Error occurred while resolving paths: {e}") + return False