mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-06-29 00:06:27 +08:00
89 lines
2.8 KiB
Python
89 lines
2.8 KiB
Python
"""Agent 命令工具的安全校验逻辑。"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import os.path
|
||
import re
|
||
import shlex
|
||
|
||
|
||
COMMAND_FORBIDDEN_KEYWORDS = (
|
||
":(){ :|:& };:",
|
||
"dd if=/dev/zero",
|
||
"mkfs",
|
||
"reboot",
|
||
"shutdown",
|
||
)
|
||
|
||
COMMAND_DANGEROUS_PATTERNS = (
|
||
re.compile(r"\brm\s+[^;&|]*-[^\s;&|]*[rR][fF]?[^\s;&|]*\s+/(?:\s|$|[;&|])"),
|
||
re.compile(r"\bdd\s+[^;&|]*(?:of=/dev/(?:sd[a-z]\d*|nvme\d+n\d+p?\d*|disk\d+)|if=/dev/zero)"),
|
||
re.compile(r"\b(?:mkfs|fdisk|parted|diskutil)\b"),
|
||
re.compile(r"\b(?:chmod|chown)\s+[^;&|]*-R[^;&|]*\s+/(?:\s|$|[;&|])"),
|
||
re.compile(r"\b(?:reboot|shutdown|halt|poweroff)\b"),
|
||
)
|
||
|
||
|
||
def _command_tokens(command: str) -> list[str]:
|
||
"""尽力解析 shell 命令 token,解析失败时退回空白分割。"""
|
||
try:
|
||
return shlex.split(command, posix=True)
|
||
except ValueError:
|
||
return re.split(r"\s+", command.strip())
|
||
|
||
|
||
def _contains_recursive_root_delete(command: str) -> bool:
|
||
"""识别递归删除根目录或一级目录的 rm 命令。"""
|
||
tokens = _command_tokens(command)
|
||
if not any(token == "rm" or token.endswith("/rm") for token in tokens):
|
||
return False
|
||
has_recursive = any(
|
||
token.startswith("-") and ("r" in token or "R" in token)
|
||
for token in tokens
|
||
)
|
||
if not has_recursive:
|
||
return False
|
||
|
||
for token in tokens:
|
||
clean_token = re.match(r"^([^;|&><]+)", token)
|
||
if not clean_token:
|
||
continue
|
||
path_value = clean_token.group(1).strip("\"'")
|
||
if not path_value.startswith("/"):
|
||
continue
|
||
norm_path = os.path.normpath(path_value)
|
||
if norm_path == "/" or re.match(r"^/[^/]+$", norm_path):
|
||
return True
|
||
return False
|
||
|
||
|
||
def detect_dangerous_command(command: str) -> str:
|
||
"""返回危险命令原因,安全时返回空字符串。"""
|
||
normalized = str(command or "").strip()
|
||
if not normalized:
|
||
return "命令不能为空"
|
||
for keyword in COMMAND_FORBIDDEN_KEYWORDS:
|
||
if keyword in normalized:
|
||
return f"命令包含禁止使用的关键字 '{keyword}'"
|
||
if _contains_recursive_root_delete(normalized):
|
||
return "命令疑似递归删除根目录或一级目录"
|
||
for pattern in COMMAND_DANGEROUS_PATTERNS:
|
||
if pattern.search(normalized):
|
||
return "命令匹配高危系统操作模式"
|
||
return ""
|
||
|
||
|
||
def validate_command_safety(command: str, *, confirmed: bool = False) -> None:
|
||
"""
|
||
校验 shell 命令安全性。
|
||
|
||
:param command: 待执行命令
|
||
:param confirmed: 是否已经通过显式参数确认高危操作
|
||
"""
|
||
reason = detect_dangerous_command(command)
|
||
if not reason:
|
||
return
|
||
if confirmed and reason != "命令不能为空":
|
||
return
|
||
raise ValueError(f"{reason}。如确认需要执行,请设置 confirm_dangerous=true")
|