Files
MoviePilot/app/utils/system.py
2026-01-12 21:36:17 +08:00

702 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import datetime
import hashlib
import os
import platform
import re
import shutil
import subprocess
import sys
import uuid
from pathlib import Path
from typing import List, Optional, Tuple, Union
import psutil
from app import schemas
class SystemUtils:
"""
系统工具类,提供系统相关的操作和信息获取方法。
"""
@staticmethod
def execute(cmd: str) -> str:
"""
执行命令,获得返回结果
"""
try:
with os.popen(cmd) as p:
return p.readline().strip()
except Exception as err:
print(str(err))
return ""
@staticmethod
def execute_with_subprocess(pip_command: list) -> Tuple[bool, str]:
"""
执行命令并捕获标准输出和错误输出,记录日志。
:param pip_command: 要执行的命令,以列表形式提供
:return: (命令是否成功, 输出信息或错误信息)
"""
try:
# 使用 subprocess.run 捕获标准输出和标准错误
result = subprocess.run(pip_command, check=True, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# 合并 stdout 和 stderr
output = result.stdout + result.stderr
return True, output
except subprocess.CalledProcessError as e:
error_message = f"命令:{' '.join(pip_command)},执行失败,错误信息:{e.stderr.strip()}"
return False, error_message
except Exception as e:
error_message = f"未知错误,命令:{' '.join(pip_command)},错误:{str(e)}"
return False, error_message
@staticmethod
def is_docker() -> bool:
"""
判断是否为Docker环境
"""
return Path("/.dockerenv").exists()
@staticmethod
def is_synology() -> bool:
"""
判断是否为群晖系统
"""
if SystemUtils.is_windows():
return False
return "synology" in SystemUtils.execute('uname -a')
@staticmethod
def is_windows() -> bool:
"""
判断是否为Windows系统
"""
return os.name == "nt"
@staticmethod
def is_frozen() -> bool:
"""
判断是否为冻结的二进制文件
"""
return getattr(sys, 'frozen', False)
@staticmethod
def is_macos() -> bool:
"""
判断是否为MacOS系统
"""
return platform.system() == 'Darwin'
@staticmethod
def is_aarch64() -> bool:
"""
判断是否为ARM64架构
"""
return platform.machine().lower() in ('aarch64', 'arm64')
@staticmethod
def is_aarch() -> bool:
"""
判断是否为ARM32架构
"""
arch_name = platform.machine().lower()
return arch_name.startswith(('arm', 'aarch')) and arch_name not in ('aarch64', 'arm64')
@staticmethod
def is_x86_64() -> bool:
"""
判断是否为AMD64架构
"""
return platform.machine().lower() in ('amd64', 'x86_64')
@staticmethod
def is_x86_32() -> bool:
"""
判断是否为AMD32架构
"""
return platform.machine().lower() in ('i386', 'i686', 'x86', '386', 'x86_32')
@staticmethod
def platform() -> str:
"""
获取系统平台
"""
if SystemUtils.is_windows():
return "Windows"
elif SystemUtils.is_macos():
return "MacOS"
elif SystemUtils.is_aarch64():
return "Arm64"
else:
return "Linux"
@staticmethod
def cpu_arch() -> str:
"""
获取CPU架构
"""
if SystemUtils.is_x86_64():
return "x86_64"
elif SystemUtils.is_x86_32():
return "x86_32"
elif SystemUtils.is_aarch64():
return "Arm64"
elif SystemUtils.is_aarch():
return "Arm32"
else:
return platform.machine()
@staticmethod
def copy(src: Path, dest: Path) -> Tuple[int, str]:
"""
复制
"""
try:
shutil.copy2(src, dest)
return 0, ""
except Exception as err:
return -1, str(err)
@staticmethod
def move(src: Path, dest: Path) -> Tuple[int, str]:
"""
移动
"""
try:
# 当前目录改名
temp = src.replace(src.parent / dest.name)
# 移动到目标目录
shutil.move(temp, dest)
return 0, ""
except Exception as err:
return -1, str(err)
@staticmethod
def link(src: Path, dest: Path) -> Tuple[int, str]:
"""
硬链接
"""
try:
# 准备目标路径,增加后缀 .mp
tmp_path = dest.with_suffix(dest.suffix + ".mp")
# 检查目标路径是否已存在如果存在则先unlink
if tmp_path.exists():
tmp_path.unlink()
tmp_path.hardlink_to(src)
# 硬链接完成,移除 .mp 后缀
shutil.move(tmp_path, dest)
return 0, ""
except Exception as err:
return -1, str(err)
@staticmethod
def softlink(src: Path, dest: Path) -> Tuple[int, str]:
"""
软链接
"""
try:
dest.symlink_to(src)
return 0, ""
except Exception as err:
return -1, str(err)
@staticmethod
def list_files(directory: Path, extensions: list = None,
min_filesize: int = 0, recursive: bool = True) -> List[Path]:
"""
获取目录下所有指定扩展名的文件(包括子目录)
:param directory: 指定的父目录
:param extensions: 需要包含的扩展名列表,例如 ['mkv', 'mp4']
:param min_filesize: 文件最低大小,单位 MB
:param recursive: 是否递归查找,可选参数,默认 True
:return: 文件 Path 列表
"""
if not min_filesize:
min_filesize = 0
if not directory.exists():
return []
if directory.is_file():
return [directory]
files = []
# 预编译正则表达式
if extensions:
pattern = re.compile(r".*(" + "|".join(extensions) + r")$", re.IGNORECASE)
else:
pattern = re.compile(r".*")
def _scan_directory(dir_path: Path, is_recursive: bool):
try:
with os.scandir(dir_path) as entries:
for entry in entries:
try:
if entry.is_file(follow_symlinks=False):
entry_path = Path(entry.path)
if (pattern.match(entry.name) and
(min_filesize <= 0 or entry.stat().st_size >= min_filesize * 1024 * 1024)):
files.append(entry_path)
elif entry.is_dir() and is_recursive:
_scan_directory(Path(entry.path), is_recursive)
except (OSError, PermissionError):
continue
except (OSError, PermissionError):
pass
_scan_directory(directory, recursive)
return files
@staticmethod
def exits_files(directory: Path, extensions: list, min_filesize: int = 0, recursive: bool = True) -> bool:
"""
判断目录下是否存在指定扩展名的文件
:param directory: 指定的父目录
:param extensions: 需要包含的扩展名列表,例如 ['mkv', 'mp4']
:param min_filesize: 文件最低大小,单位 MB
:param recursive: 是否递归查找,可选参数,默认 True
:return: True存在 False不存在
"""
if not directory.exists():
return False
# 预编译正则表达式
if extensions:
pattern = re.compile(r".*(" + "|".join(extensions) + r")$", re.IGNORECASE)
else:
pattern = re.compile(r".*")
if directory.is_file():
# 检查单个文件是否符合条件
if extensions and not pattern.match(directory.name):
return False
if min_filesize > 0 and directory.stat().st_size < min_filesize * 1024 * 1024:
return False
return True
def _search_files(dir_path: Path, is_recursive: bool) -> bool:
try:
with os.scandir(dir_path) as entries:
for entry in entries:
try:
if entry.is_file(follow_symlinks=False):
# 检查文件是否符合条件
if (pattern.match(entry.name) and
(min_filesize <= 0 or entry.stat().st_size >= min_filesize * 1024 * 1024)):
return True
elif entry.is_dir() and is_recursive:
# 递归搜索子目录
if _search_files(Path(entry.path), is_recursive):
return True
except (OSError, PermissionError):
continue
except (OSError, PermissionError):
pass
return False
return _search_files(directory, recursive)
@staticmethod
def list_sub_files(directory: Path, extensions: list) -> List[Path]:
"""
列出当前目录下的所有指定扩展名的文件(不包括子目录)
"""
if not directory.exists():
return []
if directory.is_file():
return [directory]
files = []
# 预编译正则表达式
if extensions:
pattern = re.compile(r".*(" + "|".join(extensions) + r")$", re.IGNORECASE)
else:
pattern = re.compile(r".*")
try:
with os.scandir(directory) as entries:
for entry in entries:
if entry.is_file() and pattern.match(entry.name):
files.append(Path(entry.path))
except OSError:
pass
return files
@staticmethod
def list_sub_directory(directory: Path) -> List[Path]:
"""
列出当前目录下的所有子目录(不递归)
"""
if not directory.exists():
return []
if directory.is_file():
return []
dirs = []
# 遍历目录
for path in directory.iterdir():
if path.is_dir():
if not SystemUtils.is_windows() and path.name.startswith("."):
continue
if path.name == "@eaDir":
continue
dirs.append(path)
return dirs
@staticmethod
def list_sub_file(directory: Path) -> List[Path]:
"""
列出当前目录下的所有子目录和文件(不递归)
"""
if not directory.exists():
return []
if directory.is_file():
return [directory]
items = []
# 遍历目录
for path in directory.iterdir():
if path.is_file():
items.append(path)
return items
@staticmethod
def get_directory_size(path: Path) -> int:
"""
计算目录的大小
参数:
directory_path (Path): 目录路径
返回:
int: 目录的大小(以字节为单位)
"""
if not path or not path.exists():
return 0
def _calc_dir_size(dir_path):
total = 0
try:
with os.scandir(dir_path) as entries:
for entry in entries:
if entry.is_file():
total += entry.stat().st_size
elif entry.is_dir():
total += _calc_dir_size(entry.path)
except OSError:
pass
return total
return _calc_dir_size(path) if path.is_dir() else path.stat().st_size
@staticmethod
def space_usage(dir_list: Union[Path, List[Path]]) -> Tuple[float, float]:
"""
计算多个目录的总可用空间/剩余空间单位Byte并去除重复磁盘
"""
if not dir_list:
return 0.0, 0.0
if not isinstance(dir_list, list):
dir_list = [dir_list]
# 存储不重复的磁盘
disk_set = set()
# 存储总剩余空间
total_free_space = 0.0
# 存储总空间
total_space = 0.0
for dir_path in dir_list:
if not dir_path:
continue
if not dir_path.exists():
continue
# 获取目录所在磁盘
if os.name == "nt":
disk = dir_path.drive
else:
disk = os.stat(dir_path).st_dev
# 如果磁盘未出现过,则计算其剩余空间并加入总剩余空间中
if disk not in disk_set:
disk_set.add(disk)
total_space += SystemUtils.total_space(dir_path)
total_free_space += SystemUtils.free_space(dir_path)
return total_space, total_free_space
@staticmethod
def free_space(path: Path) -> float:
"""
获取指定路径的剩余空间单位Byte
"""
if not os.path.exists(path):
return 0.0
return psutil.disk_usage(str(path)).free
@staticmethod
def total_space(path: Path) -> float:
"""
获取指定路径的总空间单位Byte
"""
if not os.path.exists(path):
return 0.0
return psutil.disk_usage(str(path)).total
@staticmethod
def processes() -> List[schemas.ProcessInfo]:
"""
获取所有进程
"""
processes = []
for proc in psutil.process_iter(['pid', 'name', 'create_time', 'memory_info', 'status']):
try:
if proc.status() != psutil.STATUS_ZOMBIE:
runtime = datetime.datetime.now() - datetime.datetime.fromtimestamp(
int(getattr(proc, 'create_time', 0)()))
mem_info = getattr(proc, 'memory_info', None)()
if mem_info is not None:
mem_mb = round(mem_info.rss / (1024 * 1024), 1)
processes.append(schemas.ProcessInfo(
pid=proc.pid, name=proc.name(), run_time=runtime.seconds, memory=mem_mb
))
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return processes
@staticmethod
def is_bluray_dir(dir_path: Path) -> bool:
"""
判断是否为蓝光原盘目录
(该方法已弃用,改用`StorageChain().is_bluray_folder)`
"""
if not dir_path.is_dir():
return False
# 蓝光原盘目录必备的文件或文件夹
required_files = ['BDMV', 'CERTIFICATE']
# 检查目录下是否存在所需文件或文件夹
for item in required_files:
if (dir_path / item).exists():
return True
return False
@staticmethod
def get_windows_drives():
"""
获取Windows所有盘符
"""
vols = []
for i in range(65, 91):
vol = chr(i) + ':'
if os.path.isdir(vol):
vols.append(vol)
return vols
@staticmethod
def cpu_usage():
"""
获取CPU使用率
"""
return psutil.cpu_percent()
@staticmethod
def memory_usage() -> List[int]:
"""
获取当前程序的内存使用量和使用率
"""
current_process = psutil.Process()
process_memory = current_process.memory_info().rss
system_memory = psutil.virtual_memory().total
process_memory_percent = (process_memory / system_memory) * 100
return [process_memory, int(process_memory_percent)]
@staticmethod
def network_usage() -> List[int]:
"""
获取当前网络流量上行和下行流量单位bytes/s
"""
import time
# 获取初始网络统计
net_io_1 = psutil.net_io_counters()
time.sleep(1) # 等待1秒
# 获取1秒后的网络统计
net_io_2 = psutil.net_io_counters()
# 计算1秒内的流量变化
upload_speed = net_io_2.bytes_sent - net_io_1.bytes_sent
download_speed = net_io_2.bytes_recv - net_io_1.bytes_recv
return [upload_speed, download_speed]
@staticmethod
def is_hardlink(src: Path, dest: Path) -> bool:
"""
判断是否为硬链接可能无法支持宿主机挂载smb盘符映射docker的场景
"""
try:
if not src.exists() or not dest.exists():
return False
if src.is_file():
# 如果是文件,直接比较文件
return src.samefile(dest)
else:
for src_file in src.glob("**/*"):
if src_file.is_dir():
continue
# 计算目标文件路径
relative_path = src_file.relative_to(src)
target_file = dest.joinpath(relative_path)
# 检查是否是硬链接
if not target_file.exists() or not src_file.samefile(target_file):
return False
return True
except Exception as e:
print(f"Error occurred: {e}")
return False
@staticmethod
def is_network_filesystem(directory: Path) -> bool:
"""
检测是否为网络文件系统
:param directory: 目录路径
:return: 是否为网络文件系统
"""
try:
system = platform.system()
if system == 'Linux':
# 检查挂载信息
result = subprocess.run(['df', '-T', str(directory)],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
output = result.stdout.lower()
# 以下本地文件系统含有fuse关键字
local_fs = [
"fuse.shfs", # Unraid
"zfuse.zfsv", # 极空间(zfuse.zfsv2、zfuse.zfsv3、...)
# TBD
]
if any(fs in output for fs in local_fs):
return False
network_fs = ['nfs', 'cifs', 'smbfs', 'fuse', 'sshfs', 'ftpfs']
return any(fs in output for fs in network_fs)
elif system == 'Darwin':
# macOS 检查
result = subprocess.run(['df', '-T', str(directory)],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
output = result.stdout.lower()
return 'nfs' in output or 'smbfs' in output
elif system == 'Windows':
# Windows 检查网络驱动器
return str(directory).startswith('\\\\')
except Exception as e:
print(f"Error occurred: {e}")
return False
@staticmethod
def is_same_disk(src: Path, dest: Path) -> bool:
"""
判断两个路径是否在同一磁盘
"""
if not src.exists() or not dest.exists():
return False
if os.name == "nt":
return src.drive == dest.drive
return os.stat(src).st_dev == os.stat(dest).st_dev
@staticmethod
def get_config_path(config_dir: Optional[str] = None) -> Path:
"""
获取配置路径
"""
if not config_dir:
config_dir = os.getenv("CONFIG_DIR")
if config_dir:
return Path(config_dir)
if SystemUtils.is_docker():
return Path("/config")
elif SystemUtils.is_frozen():
return Path(sys.executable).parent / "config"
else:
return Path(__file__).parents[2] / "config"
@staticmethod
def get_env_path() -> Path:
"""
获取配置路径
"""
return SystemUtils.get_config_path() / "app.env"
@staticmethod
def clear(temp_path: Path, days: int):
"""
清理指定目录中指定天数前的文件,递归删除子文件及空文件夹
"""
if not temp_path.exists():
return
# 遍历目录及子目录中的所有文件和文件夹
for file in temp_path.rglob('*'):
# 如果是文件并且符合时间条件,则删除
if file.is_file() and (
datetime.datetime.now() - datetime.datetime.fromtimestamp(file.stat().st_mtime)).days > days:
file.unlink()
# 删除空的文件夹
for folder in sorted(temp_path.rglob('*'), reverse=True):
# 确保是空文件夹
if folder.is_dir() and not any(folder.iterdir()):
folder.rmdir()
@staticmethod
def generate_user_unique_id():
"""
根据优先级依次尝试生成稳定唯一ID
1. 文件系统唯一标识符。
2. MAC 地址。
3. 主机名。
"""
def get_filesystem_unique_id():
"""
获取文件系统的唯一标识符。
使用根目录的设备号和 inode。
"""
try:
stat_info = os.stat("/")
fs_id = f"{stat_info.st_dev}-{stat_info.st_ino}"
return hashlib.sha256(fs_id.encode("utf-8")).hexdigest()
except Exception as e:
print(str(e))
return None
def get_mac_address_id():
"""
获取设备的 MAC 地址并生成唯一标识符。
"""
try:
mac_address = uuid.getnode()
if (mac_address >> 40) % 2: # 检查是否是虚拟MAC地址
raise ValueError("MAC地址可能是虚拟地址")
mac_str = f"{mac_address:012x}"
return hashlib.sha256(mac_str.encode("utf-8")).hexdigest()
except Exception as e:
print(str(e))
return None
for method in [get_filesystem_unique_id, get_mac_address_id]:
unique_id = method()
if unique_id:
return unique_id
return None