Files
MoviePilot/scripts/local_setup.py
jxxghp a05ffc07d4 refactor: remove legacy LLM_DISABLE_THINKING and LLM_REASONING_EFFORT config, unify thinking_level handling
- Eliminate support for LLM_DISABLE_THINKING and LLM_REASONING_EFFORT in config, code, and tests
- Simplify LLM thinking level logic to rely solely on LLM_THINKING_LEVEL
- Refactor LLMHelper and related endpoints to remove legacy parameter handling
- Update system API and test utilities to match new configuration structure
- Minor code cleanup and formatting improvements
2026-04-25 10:42:03 +08:00

3475 lines
114 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import asyncio
import getpass
import json
import os
import platform
import secrets
import re
import shlex
import shutil
import subprocess
import sys
import tarfile
import textwrap
import uuid
import zipfile
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, Optional
ROOT = Path(__file__).resolve().parents[1]
LEGACY_CONFIG_DIR = ROOT / "config"
HELPER_DIR = ROOT / "app" / "helper"
PUBLIC_DIR = ROOT / "public"
RUNTIME_DIR = ROOT / ".runtime"
NODE_DIR = RUNTIME_DIR / "node"
INSTALL_ENV_FILE = ROOT / ".moviepilot.env"
MIN_PYTHON_VERSION = (3, 11)
SUPPORTED_PYTHON_TEXT = (
f"Python {MIN_PYTHON_VERSION[0]}.{MIN_PYTHON_VERSION[1]} 或更高版本"
)
CONFIG_DIR = LEGACY_CONFIG_DIR
LOG_DIR = CONFIG_DIR / "logs"
CACHE_DIR = CONFIG_DIR / "cache"
TEMP_DIR = CONFIG_DIR / "temp"
COOKIE_DIR = CONFIG_DIR / "cookies"
ENV_FILE = CONFIG_DIR / "app.env"
DEFAULT_NODE_VERSION = "20.12.1"
FRONTEND_LATEST_API = (
"https://api.github.com/repos/jxxghp/MoviePilot-Frontend/releases/latest"
)
FRONTEND_TAG_API = (
"https://api.github.com/repos/jxxghp/MoviePilot-Frontend/releases/tags/{tag}"
)
RESOURCES_MAIN_ZIP = (
"https://github.com/jxxghp/MoviePilot-Resources/archive/refs/heads/main.zip"
)
LLM_PROVIDER_DEFAULTS = {
"deepseek": {
"model": "deepseek-chat",
"base_url": "https://api.deepseek.com",
},
"openai": {
"model": "gpt-4o-mini",
"base_url": "https://api.openai.com/v1",
},
"google": {
"model": "gemini-2.5-flash",
"base_url": "",
},
}
RUNTIME_PACKAGE = {
"name": "moviepilot-frontend-runtime",
"private": True,
"license": "UNLICENSED",
"dependencies": {
"express": "^4.18.2",
"express-http-proxy": "^2.0.0",
},
}
LOCAL_FRONTEND_SERVICE_SCRIPT = textwrap.dedent(
"""
const http = require('node:http')
const path = require('node:path')
const express = require('express')
const proxy = require('express-http-proxy')
const app = express()
const backendHost = process.env.MOVIEPILOT_BACKEND_HOST || '127.0.0.1'
const backendPort = Number(process.env.PORT || 3001)
const frontendPort = Number(process.env.NGINX_PORT || 3000)
const backendHealthPath = '/api/v1/system/global?token=moviepilot'
const backendHealthTimeoutMs = Number(process.env.MOVIEPILOT_FRONTEND_HEALTH_TIMEOUT_MS || 3000)
const backendHealthIntervalMs = Number(process.env.MOVIEPILOT_FRONTEND_HEALTH_INTERVAL_MS || 15000)
const backendMaxFailures = Math.max(
Number(process.env.MOVIEPILOT_FRONTEND_MAX_FAILURES || 4),
1
)
function sleep (ms) {
return new Promise(resolve => setTimeout(resolve, ms))
}
function checkBackendHealth () {
return new Promise(resolve => {
const request = http.request(
{
host: backendHost,
port: backendPort,
path: backendHealthPath,
method: 'GET',
timeout: backendHealthTimeoutMs
},
response => {
let body = ''
response.setEncoding('utf8')
response.on('data', chunk => {
body += chunk
})
response.on('end', () => {
if (response.statusCode !== 200) {
resolve(false)
return
}
try {
const payload = JSON.parse(body)
resolve(payload?.success !== false)
} catch (error) {
// 健康检查接口只要返回 200就允许继续提供前端服务。
resolve(true)
}
})
}
)
request.on('timeout', () => {
request.destroy(new Error('backend health check timeout'))
})
request.on('error', () => {
resolve(false)
})
request.end()
})
}
async function waitForBackendReady () {
for (let attempt = 1; attempt <= backendMaxFailures; attempt += 1) {
if (await checkBackendHealth()) {
return true
}
if (attempt < backendMaxFailures) {
await sleep(1000)
}
}
return false
}
function startBackendWatchdog (server) {
let consecutiveFailures = 0
let checking = false
const timer = setInterval(async () => {
if (checking) {
return
}
checking = true
try {
const healthy = await checkBackendHealth()
if (healthy) {
consecutiveFailures = 0
return
}
consecutiveFailures += 1
console.warn(
`Backend health check failed (${consecutiveFailures}/${backendMaxFailures})`
)
if (consecutiveFailures < backendMaxFailures) {
return
}
clearInterval(timer)
console.error('Backend is unavailable, stopping frontend service')
server.close(() => process.exit(1))
setTimeout(() => process.exit(1), 1000).unref()
} finally {
checking = false
}
}, backendHealthIntervalMs)
timer.unref()
const shutdown = signal => {
clearInterval(timer)
console.log(`Received ${signal}, shutting down frontend service`)
server.close(() => process.exit(0))
setTimeout(() => process.exit(0), 1000).unref()
}
process.on('SIGINT', () => shutdown('SIGINT'))
process.on('SIGTERM', () => shutdown('SIGTERM'))
}
// 静态文件服务目录
app.use(express.static(__dirname))
// 配置代理中间件将请求转发给后端 API。
app.use(
'/api',
proxy(`${backendHost}:${backendPort}`, {
proxyReqPathResolver: req => `/api${req.url}`
})
)
// 配置代理中间件将 CookieCloud 请求转发给后端 API。
app.use(
'/cookiecloud',
proxy(`${backendHost}:${backendPort}`, {
proxyReqPathResolver: req => `/cookiecloud${req.url}`
})
)
// 处理根路径的请求。
app.get('/', (req, res) => {
res.sendFile(path.join(__dirname, 'index.html'))
})
// 处理所有其他请求,重定向到前端入口文件。
app.get('*', (req, res) => {
res.sendFile(path.join(__dirname, 'index.html'))
})
async function bootstrap () {
// 前端本地代理不再允许单独存活,避免设备重启后只剩前端进程。
const backendReady = await waitForBackendReady()
if (!backendReady) {
console.error('Backend is unavailable, skip starting frontend service')
process.exit(1)
}
const server = app.listen(frontendPort, () => {
console.log(`Server is running on port ${frontendPort}`)
})
startBackendWatchdog(server)
}
bootstrap().catch(error => {
console.error(`Failed to start frontend service: ${error?.message || error}`)
process.exit(1)
})
"""
).lstrip()
NOTIFICATION_SWITCH_TYPES = [
"资源下载",
"整理入库",
"订阅",
"站点",
"媒体服务器",
"手动处理",
"插件",
"智能体",
"其它",
]
UNINSTALL_CONFIRM_TEXT = "UNINSTALL"
RESOURCE_FILE_PATTERNS = ("sites*", "user.sites*.bin")
AUTOSTART_ENV_KEY = "MOVIEPILOT_AUTO_START"
AUTOSTART_RUNTIME_DIR = RUNTIME_DIR / "startup"
AUTOSTART_UNIX_LAUNCHER = AUTOSTART_RUNTIME_DIR / "moviepilot-start.sh"
AUTOSTART_WINDOWS_LAUNCHER = AUTOSTART_RUNTIME_DIR / "moviepilot-start.cmd"
AUTOSTART_TIMEOUT = 120
MACOS_LAUNCH_AGENT_LABEL = "org.moviepilot.localcli"
LINUX_SYSTEMD_UNIT_NAME = "moviepilot-autostart.service"
LINUX_XDG_AUTOSTART_FILENAME = "moviepilot.desktop"
WINDOWS_STARTUP_FILENAME = "MoviePilot Startup.cmd"
def _default_config_dir() -> Path:
if platform.system() == "Darwin":
return Path.home() / "Library" / "Application Support" / "MoviePilot"
return (
Path(os.getenv("XDG_CONFIG_HOME") or (Path.home() / ".config")) / "moviepilot"
)
def _legacy_runtime_config_exists() -> bool:
markers = [
LEGACY_CONFIG_DIR / "app.env",
LEGACY_CONFIG_DIR / "user.db",
LEGACY_CONFIG_DIR / "logs",
LEGACY_CONFIG_DIR / "temp",
LEGACY_CONFIG_DIR / "cache",
LEGACY_CONFIG_DIR / "cookies",
LEGACY_CONFIG_DIR / "sites",
]
return any(marker.exists() for marker in markers)
def _read_install_env_config_dir() -> Optional[Path]:
if not INSTALL_ENV_FILE.exists():
return None
for line in INSTALL_ENV_FILE.read_text(encoding="utf-8").splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#") or "=" not in stripped:
continue
key, value = stripped.split("=", 1)
if key.strip() != "CONFIG_DIR":
continue
return Path(value.strip().strip('"').strip("'")).expanduser()
return None
def _set_config_dir(config_dir: Path) -> Path:
global CONFIG_DIR, LOG_DIR, CACHE_DIR, TEMP_DIR, COOKIE_DIR, ENV_FILE
CONFIG_DIR = config_dir.expanduser().resolve()
LOG_DIR = CONFIG_DIR / "logs"
CACHE_DIR = CONFIG_DIR / "cache"
TEMP_DIR = CONFIG_DIR / "temp"
COOKIE_DIR = CONFIG_DIR / "cookies"
ENV_FILE = CONFIG_DIR / "app.env"
os.environ["CONFIG_DIR"] = str(CONFIG_DIR)
return CONFIG_DIR
def _write_install_env(config_dir: Path) -> None:
INSTALL_ENV_FILE.write_text(
f"CONFIG_DIR={shlex.quote(str(config_dir.expanduser().resolve()))}\n",
encoding="utf-8",
)
def _seed_default_config_files(target_dir: Path) -> None:
for name in ("category.yaml",):
source = LEGACY_CONFIG_DIR / name
target = target_dir / name
if source.exists() and not target.exists():
target.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(source, target)
def _migrate_legacy_config_if_needed(target_dir: Path) -> None:
target_dir = target_dir.expanduser().resolve()
if target_dir == LEGACY_CONFIG_DIR.resolve():
return
if not _legacy_runtime_config_exists():
_seed_default_config_files(target_dir)
return
target_dir.mkdir(parents=True, exist_ok=True)
for source in sorted(LEGACY_CONFIG_DIR.iterdir()):
target = target_dir / source.name
if target.exists():
continue
if source.is_dir():
shutil.copytree(source, target)
else:
shutil.copy2(source, target)
print_step(f"已将现有本地配置迁移到 {target_dir}")
def configure_config_dir(
explicit: Optional[Path] = None,
*,
persist: bool = False,
prefer_external: bool = False,
) -> Path:
if explicit:
config_dir = explicit.expanduser().resolve()
elif os.getenv("CONFIG_DIR"):
config_dir = Path(os.environ["CONFIG_DIR"]).expanduser().resolve()
else:
install_env_dir = _read_install_env_config_dir()
if install_env_dir:
config_dir = install_env_dir.resolve()
elif prefer_external:
config_dir = _default_config_dir().resolve()
elif _legacy_runtime_config_exists():
config_dir = LEGACY_CONFIG_DIR.resolve()
else:
config_dir = _default_config_dir().resolve()
_set_config_dir(config_dir)
if prefer_external:
_migrate_legacy_config_if_needed(config_dir)
if persist:
_write_install_env(config_dir)
return config_dir
def resolve_config_dir(
explicit: Optional[Path] = None,
*,
prefer_external: bool = False,
) -> Path:
"""
解析当前命令应使用的配置目录,但不写入环境变量或安装元数据。
该函数用于交互式命令在真正持久化配置目录前,先给用户展示默认值。
"""
if explicit:
return explicit.expanduser().resolve()
if os.getenv("CONFIG_DIR"):
return Path(os.environ["CONFIG_DIR"]).expanduser().resolve()
install_env_dir = _read_install_env_config_dir()
if install_env_dir:
return install_env_dir.resolve()
if prefer_external:
return _default_config_dir().resolve()
if _legacy_runtime_config_exists():
return LEGACY_CONFIG_DIR.resolve()
return _default_config_dir().resolve()
configure_config_dir()
def print_step(message: str) -> None:
print(f"==> {message}")
def run(command: list[str], cwd: Optional[Path] = None) -> None:
pretty = " ".join(command)
print(f"+ {pretty}")
subprocess.run(command, cwd=str(cwd or ROOT), check=True)
def capture(command: list[str], cwd: Optional[Path] = None) -> str:
return subprocess.check_output(command, cwd=str(cwd or ROOT), text=True).strip()
def command_exists(name: str) -> bool:
return shutil.which(name) is not None
def get_python_version(python_bin: str) -> tuple[int, int, int]:
version_json = capture(
[
python_bin,
"-c",
"import json, sys; print(json.dumps(list(sys.version_info[:3])))",
]
)
version_info = json.loads(version_json)
if not isinstance(version_info, list) or len(version_info) < 3:
raise RuntimeError(f"无法识别 Python 版本信息:{python_bin}")
return int(version_info[0]), int(version_info[1]), int(version_info[2])
def discover_supported_python() -> Optional[str]:
candidates = [
f"python3.{minor}" for minor in range(20, MIN_PYTHON_VERSION[1] - 1, -1)
]
if sys.executable:
candidates.append(sys.executable)
candidates.extend(["python3", "python"])
seen: set[str] = set()
for candidate in candidates:
if not candidate or candidate in seen:
continue
seen.add(candidate)
python_path = (
candidate if os.sep in candidate else (shutil.which(candidate) or "")
)
if not python_path:
continue
try:
version = get_python_version(python_path)
except (OSError, subprocess.CalledProcessError, json.JSONDecodeError):
continue
if version >= MIN_PYTHON_VERSION:
return python_path
return None
DEFAULT_BOOTSTRAP_PYTHON = discover_supported_python() or sys.executable
def get_venv_python(venv_dir: Path) -> Path:
if os.name == "nt":
return venv_dir / "Scripts" / "python.exe"
return venv_dir / "bin" / "python"
def ensure_supported_python(python_bin: str) -> None:
version = get_python_version(python_bin)
if version < MIN_PYTHON_VERSION:
raise RuntimeError(
f"MoviePilot 本地安装需要 {SUPPORTED_PYTHON_TEXT},当前解释器为 {python_bin} "
f"({version[0]}.{version[1]}.{version[2]})"
)
def ensure_local_dirs() -> None:
for path in (CONFIG_DIR, LOG_DIR, CACHE_DIR, TEMP_DIR, COOKIE_DIR, RUNTIME_DIR):
path.mkdir(parents=True, exist_ok=True)
_seed_default_config_files(CONFIG_DIR)
def _load_env_lines() -> list[str]:
if not ENV_FILE.exists():
return []
return ENV_FILE.read_text(encoding="utf-8").splitlines(keepends=True)
def _serialize_env_value(value: Any) -> str:
if isinstance(value, Path):
value = str(value)
if value is None:
return '""'
return json.dumps(value, ensure_ascii=False)
def read_env_value(key: str) -> Optional[str]:
for line in _load_env_lines():
stripped = line.strip()
if not stripped or stripped.startswith("#") or "=" not in line:
continue
env_key, value = line.split("=", 1)
if env_key.strip() == key:
return value.strip().strip('"').strip("'")
return None
def write_env_value(key: str, value: str) -> None:
ensure_local_dirs()
lines = _load_env_lines()
new_line = f"{key}={_serialize_env_value(value)}\n"
for index, line in enumerate(lines):
stripped = line.strip()
if not stripped or stripped.startswith("#") or "=" not in line:
continue
env_key, _ = line.split("=", 1)
if env_key.strip() == key:
lines[index] = new_line
break
else:
if lines and not lines[-1].endswith("\n"):
lines[-1] += "\n"
lines.append(new_line)
ENV_FILE.write_text("".join(lines), encoding="utf-8")
def write_env_values(values: dict[str, Any]) -> None:
for key, value in values.items():
write_env_value(key, value)
def ensure_api_token(force_token: bool = False, token: Optional[str] = None) -> str:
ensure_local_dirs()
current_token = read_env_value("API_TOKEN") or ""
if token is not None:
token = str(token).strip()
if len(token) < 16:
raise ValueError("API_TOKEN 长度不能少于 16 个字符")
write_env_value("API_TOKEN", token)
print_step(f"已写入 API_TOKEN 到 {ENV_FILE}")
return token
if current_token and len(current_token) >= 16 and not force_token:
print_step("保留现有 API_TOKEN")
return current_token
new_token = secrets.token_urlsafe(16)
write_env_value("API_TOKEN", new_token)
print_step(f"已写入 API_TOKEN 到 {ENV_FILE}")
return new_token
def _download_to_stdout(url: str) -> str:
headers = [
"-H",
"Accept: application/vnd.github+json",
"-H",
"User-Agent: MoviePilot-CLI",
]
if command_exists("curl"):
return capture(["curl", "-fsSL", *headers, url])
if command_exists("wget"):
return capture(
[
"wget",
"-qO-",
"--header=Accept: application/vnd.github+json",
"--header=User-Agent: MoviePilot-CLI",
url,
]
)
raise RuntimeError("未找到可用的下载工具,请先安装 curl 或 wget")
def download_file(url: str, target: Path) -> None:
target.parent.mkdir(parents=True, exist_ok=True)
if command_exists("curl"):
run(["curl", "-fsSL", url, "-o", str(target)])
return
if command_exists("wget"):
run(["wget", "-qO", str(target), url])
return
raise RuntimeError("未找到可用的下载工具,请先安装 curl 或 wget")
def fetch_json(url: str) -> dict[str, Any]:
payload = _download_to_stdout(url)
try:
data = json.loads(payload)
except json.JSONDecodeError as exc:
raise RuntimeError(f"无法解析远程响应:{url}") from exc
if (
isinstance(data, dict)
and data.get("message")
and isinstance(data.get("message"), str)
and "API rate limit" in data["message"]
):
raise RuntimeError(f"访问 GitHub API 失败:{data['message']}")
if not isinstance(data, dict):
raise RuntimeError(f"接口返回格式异常:{url}")
return data
def extract_archive(archive_path: Path, target_dir: Path) -> None:
target_dir.mkdir(parents=True, exist_ok=True)
if archive_path.suffix == ".zip":
with zipfile.ZipFile(archive_path) as zip_file:
zip_file.extractall(target_dir)
return
with tarfile.open(archive_path, "r:*") as tar_file:
extract_kwargs: dict[str, Any] = {}
if sys.version_info >= (3, 12):
extract_kwargs["filter"] = "data"
tar_file.extractall(target_dir, **extract_kwargs)
def _remove_path(path: Path) -> None:
if not path.exists():
return
if path.is_symlink() or path.is_file():
path.unlink()
return
shutil.rmtree(path)
def _resolve_frontend_release(frontend_version: str) -> tuple[str, str]:
if frontend_version == "latest":
release = fetch_json(FRONTEND_LATEST_API)
else:
release = fetch_json(FRONTEND_TAG_API.format(tag=frontend_version))
tag_name = str(release.get("tag_name") or "").strip()
if not tag_name:
raise RuntimeError("未能获取前端版本号")
for asset in release.get("assets") or []:
if asset.get("name") == "dist.zip" and asset.get("browser_download_url"):
return tag_name, str(asset["browser_download_url"])
raise RuntimeError(f"前端版本 {tag_name} 未找到 dist.zip 发布资产")
def _frontend_runtime_ready(frontend_version: str) -> bool:
version_file = PUBLIC_DIR / "version.txt"
if not version_file.exists() or not (PUBLIC_DIR / "service.js").exists():
return False
if not (PUBLIC_DIR / "node_modules" / "express").exists():
return False
try:
return version_file.read_text(encoding="utf-8").strip() == frontend_version
except OSError:
return False
def _write_local_frontend_service_script(target_dir: Path) -> None:
"""
覆盖前端 release 自带的 service.js统一使用本地 CLI 的受控代理脚本。
"""
(target_dir / "service.js").write_text(
LOCAL_FRONTEND_SERVICE_SCRIPT,
encoding="utf-8",
)
def _node_platform() -> tuple[str, str]:
system_name = platform.system().lower()
machine = platform.machine().lower()
if system_name == "darwin":
if machine in {"arm64", "aarch64"}:
return "darwin-arm64", "tar.gz"
if machine in {"x86_64", "amd64"}:
return "darwin-x64", "tar.gz"
elif system_name == "linux":
if machine in {"aarch64", "arm64"}:
return "linux-arm64", "tar.xz"
if machine in {"x86_64", "amd64"}:
return "linux-x64", "tar.xz"
raise RuntimeError(
f"当前系统暂不支持自动安装本地 Node 运行时:{platform.system()} / {platform.machine()}"
)
def get_node_bin(node_dir: Path = NODE_DIR) -> Path:
if os.name == "nt":
return node_dir / "node.exe"
return node_dir / "bin" / "node"
def get_npm_bin(node_dir: Path = NODE_DIR) -> Path:
if os.name == "nt":
return node_dir / "npm.cmd"
return node_dir / "bin" / "npm"
def install_node_runtime(node_version: str) -> Path:
node_bin = get_node_bin()
if node_bin.exists():
try:
current_version = capture([str(node_bin), "--version"]).lstrip("v")
except subprocess.CalledProcessError:
current_version = ""
if current_version == node_version:
return node_bin
_remove_path(NODE_DIR)
platform_tag, archive_ext = _node_platform()
archive_name = f"node-v{node_version}-{platform_tag}.{archive_ext}"
download_url = f"https://nodejs.org/dist/v{node_version}/{archive_name}"
print_step(f"下载本地 Node 运行时 v{node_version} ({platform_tag})")
with TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
archive_path = temp_path / archive_name
extract_dir = temp_path / "extract"
download_file(download_url, archive_path)
extract_archive(archive_path, extract_dir)
extracted_roots = [item for item in extract_dir.iterdir() if item.is_dir()]
if not extracted_roots:
raise RuntimeError("Node 运行时解压失败")
_remove_path(NODE_DIR)
shutil.move(str(extracted_roots[0]), str(NODE_DIR))
node_bin = get_node_bin()
if not node_bin.exists():
raise RuntimeError("Node 运行时安装失败,未找到 node 可执行文件")
print_step(f"Node 运行时已安装到 {NODE_DIR}")
return node_bin
def install_frontend(frontend_version: str, node_version: str) -> dict[str, str]:
version_tag, download_url = _resolve_frontend_release(frontend_version)
node_bin = install_node_runtime(node_version)
if _frontend_runtime_ready(version_tag):
_write_local_frontend_service_script(PUBLIC_DIR)
print_step(f"前端发布包已是最新版本:{version_tag}")
return {"version": version_tag, "node": str(node_bin)}
print_step(f"下载前端发布包:{version_tag}")
with TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
archive_path = temp_path / "dist.zip"
extract_dir = temp_path / "extract"
download_file(download_url, archive_path)
extract_archive(archive_path, extract_dir)
dist_dir = extract_dir / "dist"
if not dist_dir.exists():
raise RuntimeError("前端发布包中未找到 dist 目录")
_remove_path(PUBLIC_DIR)
shutil.move(str(dist_dir), str(PUBLIC_DIR))
_write_local_frontend_service_script(PUBLIC_DIR)
runtime_package = dict(RUNTIME_PACKAGE)
runtime_package["version"] = version_tag
(PUBLIC_DIR / "package.json").write_text(
json.dumps(runtime_package, ensure_ascii=False, indent=2) + "\n",
encoding="utf-8",
)
npm_bin = get_npm_bin()
if not npm_bin.exists():
raise RuntimeError("未找到 npm 可执行文件Node 运行时可能损坏")
print_step("安装前端运行依赖")
run(
[
str(npm_bin),
"install",
"--no-fund",
"--no-audit",
"--omit=dev",
],
cwd=PUBLIC_DIR,
)
return {"version": version_tag, "node": str(node_bin)}
def local_resource_status() -> bool:
return (HELPER_DIR / "user.sites.v2.bin").exists() and bool(
list(HELPER_DIR.glob("sites*"))
)
def copy_resource_files(source_dir: Path) -> list[str]:
if not source_dir.is_dir():
raise FileNotFoundError(f"资源目录不存在:{source_dir}")
copied: list[str] = []
for source in sorted(source_dir.iterdir()):
if source.is_dir():
continue
target = HELPER_DIR / source.name
shutil.copy2(source, target)
copied.append(source.name)
if not copied:
raise RuntimeError(f"资源目录中未找到可复制文件:{source_dir}")
print_step(f"已同步资源文件到 {HELPER_DIR}")
return copied
def _get_platform_tag() -> str:
system = platform.system().lower()
machine = platform.machine().lower()
if system == "darwin":
if machine in {"arm64", "aarch64"}:
return "darwin", "arm64"
if machine in {"x86_64", "amd64"}:
return "darwin", "x86_64"
elif system == "linux":
if machine in {"aarch64", "arm64"}:
return "linux", "aarch64"
if machine in {"x86_64", "amd64"}:
return "linux", "x86_64"
elif system == "windows":
return "windows", "amd64"
raise RuntimeError(f"不支持的平台:{system} / {machine}")
def _get_python_version_tag() -> str:
version = sys.version_info
return f"cp{version.major}{version.minor}"
def _filter_resources_files(
source_dir: Path, platform_tag: str, python_version: str
) -> list[Path]:
matched_files: list[Path] = []
for file in source_dir.iterdir():
if not file.is_file():
continue
filename = file.name
if filename == "user.sites.v2.bin":
matched_files.append(file)
continue
if not filename.startswith("sites."):
continue
if platform_tag == "windows":
if filename == f"sites.cp{python_version.replace('cp', '')}-win_amd64.pyd":
matched_files.append(file)
elif platform_tag == "darwin":
if (
filename
== f"sites.cpython-{python_version.replace('cp', '')}-darwin.so"
):
matched_files.append(file)
elif platform_tag == "linux":
if (
f"cpython-{python_version.replace('cp', '')}" in filename
and "linux-gnu" in filename
):
matched_files.append(file)
return matched_files
def _download_resources_dir() -> Path:
with TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
archive_path = temp_path / "resources.zip"
extract_dir = temp_path / "extract"
print_step("下载资源包")
download_file(RESOURCES_MAIN_ZIP, archive_path)
extract_archive(archive_path, extract_dir)
source_dir = extract_dir / "MoviePilot-Resources-main" / "resources.v2"
if not source_dir.exists():
raise RuntimeError("资源压缩包中未找到 resources.v2 目录")
platform_name, machine = _get_platform_tag()
python_version = _get_python_version_tag()
print_step(
f"当前平台:{platform_name}-{machine}Python 版本:{python_version}"
)
matched_files = _filter_resources_files(
source_dir, platform_name, python_version
)
if not matched_files:
raise RuntimeError(
f"未找到匹配的 sites 资源文件:{platform_name} / {python_version}"
)
staging_dir = temp_path / "staging"
staging_dir.mkdir(parents=True, exist_ok=True)
for file in matched_files:
target = staging_dir / file.name
shutil.copy2(file, target)
persisted = TEMP_DIR / "resources.v2"
_remove_path(persisted)
shutil.copytree(staging_dir, persisted)
print_step(f"已筛选对应平台的资源文件,共 {len(matched_files)}")
return persisted
def _resolve_local_resource_dir(
resources_repo: Optional[Path], resource_dir: Optional[Path]
) -> Optional[Path]:
if resource_dir:
resolved = resource_dir.expanduser().resolve()
if resolved.is_dir():
return resolved
raise FileNotFoundError(f"资源目录不存在:{resolved}")
if resources_repo:
repo_dir = resources_repo.expanduser().resolve()
candidates = [
repo_dir / "resources.v2",
repo_dir / "resources" / "resources.v2",
repo_dir / "resources" / "v2",
repo_dir / "resources.v2",
]
for candidate in candidates:
if candidate.is_dir():
return candidate
raise FileNotFoundError(f"未在 {repo_dir} 下找到 resources.v2 目录")
return None
def install_resources(
resources_repo: Optional[Path], resource_dir: Optional[Path]
) -> list[str]:
ensure_local_dirs()
source_dir = _resolve_local_resource_dir(resources_repo, resource_dir)
if source_dir is None:
source_dir = _download_resources_dir()
copied = copy_resource_files(source_dir)
print_step(f"资源初始化完成,共处理 {len(copied)} 个文件")
return copied
def _is_interactive() -> bool:
return sys.stdin.isatty() and sys.stdout.isatty()
def _normalize_choice(value: str) -> str:
return value.strip().lower().replace("_", "").replace("-", "")
def _env_default(key: str, default: str = "") -> str:
value = read_env_value(key)
if value is None or value == "":
return default
return value
def _env_bool(key: str, default: bool) -> bool:
value = read_env_value(key)
if value is None or value == "":
return default
return value.strip().lower() in {"1", "true", "yes", "y", "on"}
def _env_int(key: str, default: int) -> int:
value = read_env_value(key)
if value is None or value == "":
return default
try:
return int(value)
except (TypeError, ValueError):
return default
def _prompt_text(
label: str,
*,
default: Optional[str] = None,
allow_empty: bool = False,
secret: bool = False,
) -> str:
while True:
suffix = f" [{default}]" if default not in (None, "") and not secret else ""
prompt = f"{label}{suffix}: "
value = getpass.getpass(prompt) if secret else input(prompt)
value = value.strip()
if not value and default is not None:
return str(default)
if value:
return value
if allow_empty:
return ""
print("请输入有效内容,或使用回车接受默认值。")
def _prompt_secret_text(
label: str,
*,
current_value: Optional[str] = None,
allow_empty: bool = False,
required: bool = False,
) -> str:
while True:
suffix = " [留空保持现有值]" if current_value not in (None, "") else ""
prompt = f"{label}{suffix}: "
value = getpass.getpass(prompt).strip()
if value:
return value
if current_value is not None and current_value != "":
return current_value
if allow_empty and not required:
return ""
if not required:
return ""
print("请输入有效内容。")
def _prompt_yes_no(label: str, default: bool = True) -> bool:
suffix = "Y/n" if default else "y/N"
while True:
raw = input(f"{label} [{suffix}]: ").strip().lower()
if not raw:
return default
if raw in {"y", "yes"}:
return True
if raw in {"n", "no"}:
return False
print("请输入 y 或 n。")
def _prompt_choice(label: str, choices: dict[str, str], default: str) -> str:
labels = []
normalized_map: dict[str, str] = {}
for key, desc in choices.items():
labels.append(f"{key}({desc})")
normalized_map[_normalize_choice(key)] = key
while True:
raw = input(f"{label} [{'/'.join(labels)}] (默认 {default}): ").strip()
if not raw:
return default
normalized = _normalize_choice(raw)
if normalized in normalized_map:
return normalized_map[normalized]
print("请输入列表中的可选值。")
def _env_llm_thinking_level_default() -> str:
value = _normalize_choice(_env_default("LLM_THINKING_LEVEL", ""))
alias_map = {
"none": "off",
"disabled": "off",
"disable": "off",
"enabled": "auto",
"enable": "auto",
"default": "auto",
"dynamic": "auto",
}
normalized = alias_map.get(value, value)
if normalized in {
"off",
"auto",
"minimal",
"low",
"medium",
"high",
"max",
"xhigh",
}:
return normalized
return "auto"
def _prompt_path(label: str, *, default: Path, allow_empty: bool = False) -> str:
value = _prompt_text(label, default=str(default), allow_empty=allow_empty)
if not value:
return ""
return str(Path(value).expanduser().resolve())
def _resolve_interactive_config_dir(
command: str, explicit_config_dir: Optional[Path]
) -> Optional[Path]:
"""
`setup` / `init` 是最常见的本地安装入口。
当用户没有显式传入 `--config-dir` 且当前终端可交互时,先询问一次配置目录,
并把程序外默认路径展示出来,避免用户安装后才发现配置写到了别处。
"""
if explicit_config_dir or command not in {"init", "setup"} or not _is_interactive():
return explicit_config_dir
default_config_dir = resolve_config_dir(prefer_external=True)
print_step("安装将使用程序目录外的配置目录,直接回车可接受默认值")
selected_path = _prompt_path("配置目录", default=default_config_dir)
return Path(selected_path) if selected_path else default_config_dir
def _validate_superuser_name(username: str) -> Optional[str]:
if not username:
return "超级管理员用户名不能为空。"
if any(char.isspace() for char in username):
return "超级管理员用户名不能包含空白字符。"
if len(username) > 64:
return "超级管理员用户名长度不能超过 64 个字符。"
return None
def _validate_superuser_password(password: str) -> Optional[str]:
if len(password) < 6 or len(password) > 50:
return "超级管理员密码长度需为 6 到 50 位。"
categories = 0
if re.search(r"[A-Za-z]", password):
categories += 1
if re.search(r"\d", password):
categories += 1
if re.search(r"[^\w\s]", password):
categories += 1
if categories < 2:
return "超级管理员密码需至少包含字母、数字、特殊字符中的两类。"
return None
def _collect_superuser_config(
*,
preset_username: Optional[str] = None,
preset_password: Optional[str] = None,
) -> dict[str, str]:
print_step("超级管理员配置")
default_username = (
preset_username or _env_default("SUPERUSER", "admin")
).strip() or "admin"
while True:
username = _prompt_text("超级管理员用户名", default=default_username).strip()
error = _validate_superuser_name(username)
if not error:
break
print(error)
if preset_password is not None:
password = preset_password.strip()
if not password:
return {"SUPERUSER": username}
error = _validate_superuser_password(password)
if error:
raise RuntimeError(error)
return {
"SUPERUSER": username,
"SUPERUSER_PASSWORD": password,
}
current_password = read_env_value("SUPERUSER_PASSWORD")
while True:
password = _prompt_secret_text(
"超级管理员密码(留空则保留现有值或首次启动时随机生成)",
current_value=current_password,
allow_empty=True,
).strip()
if not password:
return {"SUPERUSER": username}
error = _validate_superuser_password(password)
if error:
print(error)
continue
confirmed = _prompt_secret_text(
"请再次输入超级管理员密码", required=True
).strip()
if password != confirmed:
print("两次输入的超级管理员密码不一致,请重新输入。")
continue
return {
"SUPERUSER": username,
"SUPERUSER_PASSWORD": password,
}
def _collect_path_mapping() -> list[tuple[str, str]]:
if not _prompt_yes_no("是否配置下载器路径映射", default=False):
return []
storage_path = _prompt_path(
"MoviePilot 可访问的下载目录根路径", default=ROOT.parent / "downloads"
)
download_path = _prompt_text("下载器中对应的目录根路径", default="/downloads")
return [(storage_path, download_path)]
def _collect_directory_config() -> dict[str, Any]:
default_download_dir = ROOT.parent / "downloads"
default_library_dir = ROOT.parent / "media"
print_step("目录配置")
download_path = _prompt_path("下载目录", default=default_download_dir)
library_path = _prompt_path("媒体库目录", default=default_library_dir)
transfer_type = _prompt_choice(
"整理方式",
{
"link": "硬链接",
"softlink": "软链接",
"copy": "复制",
"move": "移动",
},
default="link",
)
return {
"name": "默认目录",
"priority": 0,
"storage": "local",
"download_path": download_path,
"monitor_type": "downloader",
"monitor_mode": "fast",
"transfer_type": transfer_type,
"overwrite_mode": "latest",
"library_path": library_path,
"library_storage": "local",
"renaming": False,
"scraping": False,
"notify": True,
"download_type_folder": False,
"download_category_folder": False,
"library_type_folder": True,
"library_category_folder": False,
}
def _collect_database_config() -> dict[str, Any]:
print_step("数据库配置")
current_db_type = _env_default("DB_TYPE", "sqlite").lower()
if current_db_type not in {"sqlite", "postgresql"}:
current_db_type = "sqlite"
db_type = _prompt_choice(
"选择数据库类型",
{
"sqlite": "SQLite",
"postgresql": "PostgreSQL",
},
default=current_db_type,
)
config: dict[str, Any] = {
"DB_TYPE": db_type,
}
if db_type == "sqlite":
return config
config.update(
{
"DB_POSTGRESQL_HOST": _prompt_text(
"PostgreSQL 主机地址",
default=_env_default("DB_POSTGRESQL_HOST", "localhost"),
),
"DB_POSTGRESQL_PORT": _prompt_text(
"PostgreSQL 端口",
default=str(_env_int("DB_POSTGRESQL_PORT", 5432)),
),
"DB_POSTGRESQL_DATABASE": _prompt_text(
"PostgreSQL 数据库名(需已创建)",
default=_env_default("DB_POSTGRESQL_DATABASE", "moviepilot"),
),
"DB_POSTGRESQL_USERNAME": _prompt_text(
"PostgreSQL 用户名",
default=_env_default("DB_POSTGRESQL_USERNAME", "moviepilot"),
),
"DB_POSTGRESQL_PASSWORD": _prompt_secret_text(
"PostgreSQL 密码",
current_value=read_env_value("DB_POSTGRESQL_PASSWORD"),
allow_empty=True,
),
}
)
return config
def _collect_downloader_config() -> Optional[dict[str, Any]]:
print_step("下载器配置")
downloader_type = _prompt_choice(
"选择下载器类型",
{
"skip": "跳过",
"qbittorrent": "qBittorrent",
"transmission": "Transmission",
},
default="skip",
)
if downloader_type == "skip":
return None
config_name = _prompt_text("下载器名称", default=downloader_type)
if downloader_type == "qbittorrent":
host = _prompt_text("qBittorrent 地址", default="http://127.0.0.1:8080")
username = _prompt_text("qBittorrent 用户名", default="admin")
password = _prompt_text("qBittorrent 密码", secret=True)
category = _prompt_yes_no("是否启用 qBittorrent 分类", default=False)
return {
"name": config_name,
"type": "qbittorrent",
"default": True,
"enabled": True,
"config": {
"host": host,
"username": username,
"password": password,
"category": category,
},
"path_mapping": _collect_path_mapping(),
}
host = _prompt_text("Transmission RPC 地址", default="http://127.0.0.1:9091")
username = _prompt_text("Transmission 用户名", allow_empty=True, default="")
password = _prompt_text("Transmission 密码", allow_empty=True, secret=True)
return {
"name": config_name,
"type": "transmission",
"default": True,
"enabled": True,
"config": {
"host": host,
"username": username,
"password": password,
},
"path_mapping": _collect_path_mapping(),
}
def _collect_media_server_config() -> Optional[dict[str, Any]]:
print_step("媒体服务器配置")
server_type = _prompt_choice(
"选择媒体服务器类型",
{
"skip": "跳过",
"emby": "Emby",
"jellyfin": "Jellyfin",
"plex": "Plex",
},
default="skip",
)
if server_type == "skip":
return None
config_name = _prompt_text("媒体服务器名称", default=server_type)
default_host = {
"emby": "http://127.0.0.1:8096",
"jellyfin": "http://127.0.0.1:8096",
"plex": "http://127.0.0.1:32400",
}[server_type]
host = _prompt_text("媒体服务器地址", default=default_host)
play_host = _prompt_text("外部访问地址(可选)", default="", allow_empty=True)
if server_type == "plex":
config = {
"host": host,
"token": _prompt_text("Plex Token", secret=True),
}
else:
config = {
"host": host,
"apikey": _prompt_text("媒体服务器 API Key", secret=True),
}
if server_type == "emby":
username = _prompt_text(
"Emby 管理员用户名(可选)", default="", allow_empty=True
)
if username:
config["username"] = username
if play_host:
config["play_host"] = play_host
return {
"name": config_name,
"type": server_type,
"enabled": True,
"config": config,
"sync_libraries": [],
}
def _collect_notification_config() -> Optional[dict[str, Any]]:
print_step("消息通知配置")
notification_type = _prompt_choice(
"选择通知渠道类型",
{
"skip": "跳过",
"telegram": "Telegram",
"wechat": "企业微信机器人",
"slack": "Slack",
},
default="skip",
)
if notification_type == "skip":
return None
config_name = _prompt_text("通知渠道名称", default=notification_type)
if notification_type == "telegram":
config = {
"TELEGRAM_TOKEN": _prompt_text("Telegram Bot Token", secret=True),
"TELEGRAM_CHAT_ID": _prompt_text("Telegram Chat ID"),
}
api_url = _prompt_text(
"自定义 Telegram API 地址(可选)", default="", allow_empty=True
)
if api_url:
config["API_URL"] = api_url
elif notification_type == "wechat":
config = {
"WECHAT_MODE": "bot",
"WECHAT_BOT_ID": _prompt_text("企业微信机器人 ID"),
"WECHAT_BOT_SECRET": _prompt_text("企业微信机器人 Secret", secret=True),
}
chat_id = _prompt_text("默认发送对象(可选)", default="", allow_empty=True)
admins = _prompt_text(
"管理员用户列表,多个逗号分隔(可选)", default="", allow_empty=True
)
if chat_id:
config["WECHAT_BOT_CHAT_ID"] = chat_id
if admins:
config["WECHAT_ADMINS"] = admins
else:
config = {
"SLACK_OAUTH_TOKEN": _prompt_text("Slack OAuth Token", secret=True),
"SLACK_APP_TOKEN": _prompt_text("Slack App Token", secret=True),
}
channel = _prompt_text("Slack 默认频道(可选)", default="", allow_empty=True)
if channel:
config["SLACK_CHANNEL"] = channel
return {
"name": config_name,
"type": notification_type,
"enabled": True,
"config": config,
"switchs": list(NOTIFICATION_SWITCH_TYPES),
}
def _collect_agent_config() -> dict[str, Any]:
print_step("AI Agent 配置")
enabled = _prompt_yes_no(
"是否启用 AI 智能体",
default=_env_bool("AI_AGENT_ENABLE", False),
)
if not enabled:
return {
"AI_AGENT_ENABLE": False,
"AI_AGENT_GLOBAL": False,
}
current_provider = _env_default("LLM_PROVIDER", "deepseek").lower()
if current_provider not in LLM_PROVIDER_DEFAULTS:
current_provider = "deepseek"
provider = _prompt_choice(
"选择 LLM 提供商",
{
"deepseek": "DeepSeek",
"openai": "OpenAI",
"google": "Google",
},
default=current_provider,
)
defaults = LLM_PROVIDER_DEFAULTS[provider]
current_model = _env_default("LLM_MODEL", defaults["model"])
current_base_url = _env_default("LLM_BASE_URL", defaults["base_url"])
config: dict[str, Any] = {
"AI_AGENT_ENABLE": True,
"AI_AGENT_GLOBAL": _prompt_yes_no(
"是否启用全局 AI 智能体",
default=_env_bool("AI_AGENT_GLOBAL", False),
),
"LLM_PROVIDER": provider,
"LLM_MODEL": _prompt_text(
"LLM 模型名称",
default=current_model,
),
"LLM_API_KEY": _prompt_secret_text(
"LLM API Key",
current_value=read_env_value("LLM_API_KEY"),
required=True,
),
"LLM_THINKING_LEVEL": _prompt_choice(
"LLM 思考模式/深度",
choices={
"off": "关闭思考",
"auto": "自动",
"minimal": "最小",
"low": "",
"medium": "",
"high": "",
"max": "极高",
"xhigh": "超高",
},
default=_env_llm_thinking_level_default(),
),
"LLM_SUPPORT_IMAGE_INPUT": _prompt_yes_no(
"是否启用图片输入支持",
default=_env_bool("LLM_SUPPORT_IMAGE_INPUT", True),
),
}
if provider == "google":
config["LLM_BASE_URL"] = _prompt_text(
"自定义 Google API Base URL可选",
default=current_base_url,
allow_empty=True,
)
else:
config["LLM_BASE_URL"] = _prompt_text(
"LLM Base URL",
default=current_base_url,
allow_empty=True,
)
return config
def _load_auth_site_definitions_inner() -> dict[str, Any]:
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from app.helper.sites import SitesHelper # noqa
auth_sites = SitesHelper().get_authsites() or {}
definitions: dict[str, Any] = {}
for site_key, site_conf in auth_sites.items():
site_name = str(site_conf.get("name") or site_key).strip()
params: dict[str, Any] = {}
for param_key, param_conf in (site_conf.get("params") or {}).items():
params[param_key] = {
"name": str(param_conf.get("name") or param_key).strip(),
"type": str(param_conf.get("type") or "text").strip().lower(),
"placeholder": str(param_conf.get("placeholder") or "").strip(),
"tooltip": str(param_conf.get("tooltip") or "").strip(),
"convert": str(param_conf.get("convert") or "").strip().lower(),
}
if params:
definitions[site_key] = {
"name": site_name,
"params": params,
}
return definitions
def _load_auth_site_definitions(
runtime_python: Optional[Path] = None,
) -> dict[str, Any]:
try:
return _load_auth_site_definitions_inner()
except Exception as exc:
if runtime_python and not _current_python_matches(runtime_python):
try:
with TemporaryDirectory() as temp_dir:
output_path = Path(temp_dir) / "auth-sites.json"
subprocess.run(
[
str(runtime_python),
str(Path(__file__).resolve()),
"query-auth-sites",
"--output-json-file",
str(output_path),
],
cwd=str(ROOT),
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
data = json.loads(output_path.read_text(encoding="utf-8"))
if isinstance(data, dict):
return data
except Exception as runtime_exc:
print_step(
f"当前环境暂时无法读取站点认证资源,已跳过站点认证配置:{runtime_exc}"
)
return {}
print_step(f"当前环境暂时无法读取站点认证资源,已跳过站点认证配置:{exc}")
return {}
def _print_auth_sites(auth_sites: dict[str, Any]) -> None:
print("可用认证站点:")
items = [
f"{site_key}({site_conf.get('name') or site_key})"
for site_key, site_conf in sorted(auth_sites.items())
]
line: list[str] = []
for item in items:
line.append(item)
if len(line) >= 4:
print(f" {' '.join(line)}")
line = []
if line:
print(f" {' '.join(line)}")
def _prompt_auth_param(param_key: str, param_meta: dict[str, Any]) -> Any:
label = str(param_meta.get("name") or param_key).strip()
placeholder = str(param_meta.get("placeholder") or "").strip()
tooltip = str(param_meta.get("tooltip") or "").strip()
prompt_label = label if not placeholder else f"{label} ({placeholder})"
if tooltip:
print(f"{prompt_label}{tooltip}")
while True:
if str(param_meta.get("type") or "text").strip().lower() == "password":
value = _prompt_secret_text(prompt_label, required=True)
else:
value = _prompt_text(prompt_label)
if str(param_meta.get("convert") or "").strip().lower() != "int":
return value
try:
return int(value)
except ValueError:
print("请输入有效数字。")
def _collect_site_auth_config(
runtime_python: Optional[Path] = None,
) -> Optional[dict[str, Any]]:
print_step("用户站点认证配置")
if not _prompt_yes_no("是否配置用户站点认证", default=False):
return None
auth_sites = _load_auth_site_definitions(runtime_python=runtime_python)
if not auth_sites:
print_step("未能读取可用站点认证清单,已跳过用户站点认证配置")
return None
_print_auth_sites(auth_sites)
while True:
selected_site = _prompt_text("请输入认证站点代号").strip().lower()
if selected_site in auth_sites:
break
print("请输入上面列表中的站点代号。")
site_conf = auth_sites[selected_site]
print_step(f"正在配置站点认证:{site_conf.get('name') or selected_site}")
params = {
param_key: _prompt_auth_param(param_key, param_meta)
for param_key, param_meta in (site_conf.get("params") or {}).items()
}
return {
"site": selected_site,
"params": params,
}
def _collect_autostart_config() -> dict[str, Any]:
print_step("开机自启配置")
current_status = _autostart_status()
default_enabled = bool(current_status.get("enabled")) or _env_bool(
AUTOSTART_ENV_KEY, False
)
if current_status.get("enabled"):
print(
f"当前已检测到开机自启:{current_status.get('label') or _startup_platform_name()}"
)
else:
print(f"当前系统将使用:{_startup_platform_name()}")
enabled = _prompt_yes_no("是否设置开机自启", default=default_enabled)
return {"enabled": enabled}
def run_setup_wizard(
force_token: bool,
runtime_python: Optional[Path] = None,
preset_superuser: Optional[str] = None,
preset_superuser_password: Optional[str] = None,
) -> dict[str, Any]:
if not _is_interactive():
raise RuntimeError(
"交互式向导需要在终端中运行,请直接执行 moviepilot setup --wizard 或 moviepilot init --wizard"
)
print_step("启动本地初始化向导,直接回车可接受默认值,部分步骤可选择跳过")
existing_token = read_env_value("API_TOKEN") or ""
if existing_token and len(existing_token) >= 16 and not force_token:
if _prompt_yes_no("检测到现有 API_TOKEN是否继续使用", default=True):
api_token = ensure_api_token(force_token=False)
else:
if _prompt_yes_no("是否自动生成新的 API_TOKEN", default=True):
api_token = ensure_api_token(force_token=True)
else:
while True:
custom_token = _prompt_text(
"请输入新的 API_TOKEN至少 16 位)", secret=True
)
if len(custom_token) >= 16:
api_token = ensure_api_token(
force_token=True, token=custom_token
)
break
print("API_TOKEN 长度不能少于 16 个字符。")
else:
if _prompt_yes_no("是否自动生成 API_TOKEN", default=True):
api_token = ensure_api_token(
force_token=force_token or bool(existing_token)
)
else:
while True:
custom_token = _prompt_text(
"请输入 API_TOKEN至少 16 位)", secret=True
)
if len(custom_token) >= 16:
api_token = ensure_api_token(force_token=True, token=custom_token)
break
print("API_TOKEN 长度不能少于 16 个字符。")
return {
"api_token": api_token,
"env_settings": {
**_collect_superuser_config(
preset_username=preset_superuser,
preset_password=preset_superuser_password,
),
**_collect_database_config(),
**_collect_agent_config(),
},
"directories": [_collect_directory_config()],
"downloader": _collect_downloader_config(),
"mediaserver": _collect_media_server_config(),
"notification": _collect_notification_config(),
"site_auth": _collect_site_auth_config(runtime_python=runtime_python),
"autostart": _collect_autostart_config(),
}
def _merge_named_item(existing_items: list[dict], new_item: dict) -> list[dict]:
merged = list(existing_items or [])
new_name = new_item.get("name")
for index, item in enumerate(merged):
if item.get("name") == new_name:
merged[index] = new_item
return merged
merged.append(new_item)
return merged
def _merge_directory_item(existing_items: list[dict], new_item: dict) -> list[dict]:
merged = list(existing_items or [])
for index, item in enumerate(merged):
if item.get("name") == new_item.get("name") or (
item.get("download_path") == new_item.get("download_path")
and item.get("library_path") == new_item.get("library_path")
):
new_copy = dict(new_item)
new_copy["priority"] = item.get("priority", new_item.get("priority", 0))
merged[index] = new_copy
return merged
new_copy = dict(new_item)
max_priority = max(
(int(item.get("priority", 0) or 0) for item in merged), default=-1
)
new_copy["priority"] = (
max_priority + 1 if merged else int(new_item.get("priority", 0) or 0)
)
merged.append(new_copy)
return merged
def _merge_notification_switches(existing_items: list[dict]) -> list[dict]:
merged: dict[str, dict] = {}
for item in existing_items or []:
switch_type = str(item.get("type") or "").strip()
if switch_type:
merged[switch_type] = dict(item)
for switch_type in NOTIFICATION_SWITCH_TYPES:
merged.setdefault(
switch_type,
{
"type": switch_type,
"action": "all",
},
)
preferred_order = [
switch for switch in NOTIFICATION_SWITCH_TYPES if switch in merged
]
extras = [key for key in merged if key not in preferred_order]
return [merged[key] for key in [*preferred_order, *extras]]
def _apply_local_system_config_inner(config_payload: dict[str, Any]) -> None:
for directory in config_payload.get("directories") or []:
download_path = directory.get("download_path")
library_path = directory.get("library_path")
if download_path:
Path(download_path).mkdir(parents=True, exist_ok=True)
if library_path:
Path(library_path).mkdir(parents=True, exist_ok=True)
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
try:
from app.db.init import init_db, update_db
from app.db.systemconfig_oper import SystemConfigOper
from app.schemas.types import SystemConfigKey
except ModuleNotFoundError as exc:
raise RuntimeError(
"当前环境尚未安装 MoviePilot 运行依赖,请先执行 moviepilot install deps 或 moviepilot setup"
) from exc
init_db()
generated_password = _prepare_superuser_password_for_bootstrap()
update_db()
_ensure_superuser_account_inner()
if generated_password:
print_step(f"超级管理员初始密码:{generated_password}")
system_config = SystemConfigOper()
directory_items = config_payload.get("directories") or []
if directory_items:
current_directories = system_config.get(SystemConfigKey.Directories) or []
for item in directory_items:
current_directories = _merge_directory_item(current_directories, item)
system_config.set(SystemConfigKey.Directories, current_directories)
downloader_item = config_payload.get("downloader")
if downloader_item:
current_downloaders = system_config.get(SystemConfigKey.Downloaders) or []
current_downloaders = _merge_named_item(current_downloaders, downloader_item)
system_config.set(SystemConfigKey.Downloaders, current_downloaders)
mediaserver_item = config_payload.get("mediaserver")
if mediaserver_item:
current_servers = system_config.get(SystemConfigKey.MediaServers) or []
current_servers = _merge_named_item(current_servers, mediaserver_item)
system_config.set(SystemConfigKey.MediaServers, current_servers)
notification_item = config_payload.get("notification")
if notification_item:
current_notifications = system_config.get(SystemConfigKey.Notifications) or []
current_notifications = _merge_named_item(
current_notifications, notification_item
)
system_config.set(SystemConfigKey.Notifications, current_notifications)
current_switches = system_config.get(SystemConfigKey.NotificationSwitchs) or []
system_config.set(
SystemConfigKey.NotificationSwitchs,
_merge_notification_switches(current_switches),
)
site_auth_item = config_payload.get("site_auth")
if (
isinstance(site_auth_item, dict)
and site_auth_item.get("site")
and site_auth_item.get("params")
):
system_config.set(SystemConfigKey.UserSiteAuthParams, site_auth_item)
try:
from app.helper.sites import SitesHelper # noqa
status, msg = SitesHelper().check_user(
site_auth_item.get("site"), site_auth_item.get("params")
)
if status:
print_step(f"站点认证校验成功:{msg}")
else:
print_step(f"已保存站点认证配置,当前校验未通过:{msg}")
except Exception as exc:
print_step(f"已保存站点认证配置,当前未完成校验:{exc}")
system_config.set(SystemConfigKey.SetupWizardState, True)
print_step("已写入本地系统配置")
def _current_python_matches(target_python: Optional[Path]) -> bool:
if not target_python:
return True
current_python = Path(sys.executable).expanduser()
target_python = target_python.expanduser()
if not current_python.is_absolute():
current_python = (ROOT / current_python).absolute()
if not target_python.is_absolute():
target_python = (ROOT / target_python).absolute()
return str(current_python) == str(target_python)
def _ensure_superuser_account_inner() -> None:
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from app.core.config import settings
from app.core.security import get_password_hash
from app.db.user_oper import UserOper
username = str(settings.SUPERUSER or "").strip()
username_error = _validate_superuser_name(username)
if username_error:
raise RuntimeError(username_error)
password = str(settings.SUPERUSER_PASSWORD or "").strip()
if password:
password_error = _validate_superuser_password(password)
if password_error:
raise RuntimeError(password_error)
user_oper = UserOper()
user = user_oper.get_by_name(username)
if not user:
init_password = password or secrets.token_urlsafe(16)
user_oper.add(
name=username,
email="admin@movie-pilot.org",
hashed_password=get_password_hash(init_password),
is_active=True,
is_superuser=True,
avatar="",
)
print_step(f"已创建超级管理员用户:{username}")
if not password:
print_step(f"超级管理员初始密码:{init_password}")
return
update_payload: dict[str, Any] = {}
if not user.is_active:
update_payload["is_active"] = True
if not user.is_superuser:
update_payload["is_superuser"] = True
if password:
update_payload["hashed_password"] = get_password_hash(password)
if update_payload:
user.update(user_oper._db, update_payload)
if password:
print_step(f"已同步超级管理员账号与密码:{username}")
else:
print_step(f"已同步超级管理员账号权限:{username}")
else:
print_step(f"已确认超级管理员账号:{username}")
def _prepare_superuser_password_for_bootstrap() -> Optional[str]:
from app.core.config import settings
from app.db.user_oper import UserOper
username = str(settings.SUPERUSER or "").strip()
username_error = _validate_superuser_name(username)
if username_error:
raise RuntimeError(username_error)
if str(settings.SUPERUSER_PASSWORD or "").strip():
return None
if UserOper().get_by_name(username):
return None
generated_password = secrets.token_urlsafe(16)
settings.SUPERUSER_PASSWORD = generated_password
return generated_password
def _sync_superuser_account_inner() -> None:
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
try:
from app.db.init import init_db, update_db
except ModuleNotFoundError as exc:
raise RuntimeError(
"当前环境尚未安装 MoviePilot 运行依赖,请先执行 moviepilot install deps 或 moviepilot setup"
) from exc
init_db()
generated_password = _prepare_superuser_password_for_bootstrap()
update_db()
_ensure_superuser_account_inner()
if generated_password:
print_step(f"超级管理员初始密码:{generated_password}")
def sync_superuser_account(runtime_python: Optional[Path] = None) -> None:
if _current_python_matches(runtime_python):
_sync_superuser_account_inner()
return
run(
[
str(runtime_python),
str(Path(__file__).resolve()),
"sync-superuser",
],
cwd=ROOT,
)
def apply_local_system_config(
config_payload: dict[str, Any], runtime_python: Optional[Path] = None
) -> None:
if _current_python_matches(runtime_python):
_apply_local_system_config_inner(config_payload)
return
with TemporaryDirectory() as temp_dir:
payload_path = Path(temp_dir) / "moviepilot-config.json"
payload_path.write_text(
json.dumps(config_payload, ensure_ascii=False, indent=2),
encoding="utf-8",
)
run(
[
str(runtime_python),
str(Path(__file__).resolve()),
"apply-config",
"--config-json-file",
str(payload_path),
],
cwd=ROOT,
)
def _apply_autostart_choice(
autostart_payload: Optional[dict[str, Any]],
*,
config_dir: Path,
runtime_python: Optional[Path],
venv_dir: Optional[Path],
) -> None:
if not isinstance(autostart_payload, dict):
return
if autostart_payload.get("enabled"):
result = enable_autostart(
config_dir=config_dir,
runtime_python=runtime_python,
venv_dir=venv_dir,
)
print_step(f"已启用开机自启:{result.get('method')}")
if result.get("artifact"):
print(f" 注册文件:{result['artifact']}")
if result.get("note"):
print(f" 说明:{result['note']}")
return
result = disable_autostart()
removed_paths = result.get("removed_paths") or []
if removed_paths:
print_step("已取消开机自启注册")
else:
print_step("当前未配置开机自启,无需取消")
def init_local(
*,
resources_repo: Optional[Path],
resource_dir: Optional[Path],
skip_resources: bool,
resources_ready: bool,
force_token: bool,
wizard: bool,
superuser: Optional[str],
superuser_password: Optional[str],
runtime_python: Optional[Path] = None,
venv_dir: Optional[Path] = None,
) -> None:
ensure_local_dirs()
wizard_payload: Optional[dict[str, Any]] = None
direct_env_settings: dict[str, str] = {}
if superuser:
superuser = superuser.strip()
error = _validate_superuser_name(superuser)
if error:
raise RuntimeError(error)
direct_env_settings["SUPERUSER"] = superuser
if superuser_password is not None:
superuser_password = superuser_password.strip()
if superuser_password:
error = _validate_superuser_password(superuser_password)
if error:
raise RuntimeError(error)
direct_env_settings["SUPERUSER_PASSWORD"] = superuser_password
if wizard:
wizard_payload = run_setup_wizard(
force_token=force_token,
runtime_python=runtime_python,
preset_superuser=direct_env_settings.get("SUPERUSER"),
preset_superuser_password=direct_env_settings.get("SUPERUSER_PASSWORD"),
)
else:
ensure_api_token(force_token=force_token)
if direct_env_settings:
write_env_values(direct_env_settings)
print_step(f"已写入环境配置到 {ENV_FILE}")
if wizard_payload and wizard_payload.get("env_settings"):
write_env_values(wizard_payload["env_settings"])
print_step(f"已写入环境配置到 {ENV_FILE}")
if skip_resources:
if resources_ready:
print_step("资源文件已完成同步")
else:
print_step("已跳过资源初始化")
else:
install_resources(resources_repo=resources_repo, resource_dir=resource_dir)
if wizard_payload:
apply_local_system_config(wizard_payload, runtime_python=runtime_python)
elif direct_env_settings:
sync_superuser_account(runtime_python=runtime_python)
if wizard_payload:
try:
_apply_autostart_choice(
wizard_payload.get("autostart"),
config_dir=CONFIG_DIR,
runtime_python=runtime_python,
venv_dir=venv_dir,
)
except Exception as exc:
print_step(f"开机自启配置未完成:{exc}")
def install_deps(*, python_bin: str, venv_dir: Path, recreate: bool) -> Path:
ensure_supported_python(python_bin)
venv_dir = venv_dir.expanduser().resolve()
venv_python = get_venv_python(venv_dir)
print_step(f"使用 Python 解释器:{python_bin}")
if recreate and venv_dir.exists():
print_step(f"删除已有虚拟环境:{venv_dir}")
shutil.rmtree(venv_dir)
if not venv_python.exists():
print_step(f"创建虚拟环境:{venv_dir}")
run([python_bin, "-m", "venv", str(venv_dir)])
else:
print_step(f"复用已有虚拟环境:{venv_dir}")
print_step("升级 pip")
run([str(venv_python), "-m", "pip", "install", "--upgrade", "pip"])
print_step("安装项目依赖")
run(
[str(venv_python), "-m", "pip", "install", "-r", str(ROOT / "requirements.txt")]
)
return venv_python
def _startup_platform_name() -> str:
system = platform.system()
if system == "Darwin":
return "macOS LaunchAgent"
if system == "Linux":
return "Linux systemd/XDG"
if system == "Windows":
return "Windows Startup"
return system or "unknown"
def _runtime_python_candidates(
runtime_python: Optional[Path], venv_dir: Optional[Path]
) -> list[Path]:
candidates: list[Path] = []
seen: set[str] = set()
raw_candidates = [
runtime_python,
get_venv_python((venv_dir or (ROOT / "venv")).expanduser().resolve()),
Path(sys.executable) if sys.executable else None,
]
for candidate in raw_candidates:
if not candidate:
continue
resolved = Path(candidate).expanduser().resolve()
key = str(resolved)
if key in seen:
continue
seen.add(key)
candidates.append(resolved)
return candidates
def _can_run_moviepilot_cli(python_bin: Path) -> bool:
if not python_bin.exists():
return False
result = subprocess.run(
[str(python_bin), "-m", "app.cli", "--help"],
cwd=str(ROOT),
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=False,
)
return result.returncode == 0
def _resolve_runtime_python_for_startup(
runtime_python: Optional[Path], venv_dir: Optional[Path]
) -> Path:
for candidate in _runtime_python_candidates(runtime_python, venv_dir):
if _can_run_moviepilot_cli(candidate):
return candidate
raise RuntimeError(
"未找到可用于启动 MoviePilot 的 Python 运行环境,请先执行 moviepilot install deps 或 moviepilot setup"
)
def _linux_user_systemd_dir() -> Path:
return (
Path(os.getenv("XDG_CONFIG_HOME") or (Path.home() / ".config"))
/ "systemd"
/ "user"
)
def _linux_xdg_autostart_dir() -> Path:
return Path(os.getenv("XDG_CONFIG_HOME") or (Path.home() / ".config")) / "autostart"
def _macos_launch_agent_path() -> Path:
return Path.home() / "Library" / "LaunchAgents" / f"{MACOS_LAUNCH_AGENT_LABEL}.plist"
def _linux_systemd_unit_path() -> Path:
return _linux_user_systemd_dir() / LINUX_SYSTEMD_UNIT_NAME
def _linux_xdg_autostart_path() -> Path:
return _linux_xdg_autostart_dir() / LINUX_XDG_AUTOSTART_FILENAME
def _windows_startup_dir() -> Path:
appdata = os.getenv("APPDATA")
if appdata:
return (
Path(appdata)
/ "Microsoft"
/ "Windows"
/ "Start Menu"
/ "Programs"
/ "Startup"
)
return (
Path.home()
/ "AppData"
/ "Roaming"
/ "Microsoft"
/ "Windows"
/ "Start Menu"
/ "Programs"
/ "Startup"
)
def _windows_startup_path() -> Path:
return _windows_startup_dir() / WINDOWS_STARTUP_FILENAME
def _launcher_paths_for_platform(system_name: Optional[str] = None) -> list[Path]:
system_name = system_name or platform.system()
if system_name == "Windows":
return [AUTOSTART_WINDOWS_LAUNCHER]
return [AUTOSTART_UNIX_LAUNCHER]
def _cleanup_startup_launchers(system_name: Optional[str] = None) -> None:
for path in _launcher_paths_for_platform(system_name):
if path.exists():
_remove_path(path)
if AUTOSTART_RUNTIME_DIR.exists() and not any(AUTOSTART_RUNTIME_DIR.iterdir()):
AUTOSTART_RUNTIME_DIR.rmdir()
def _write_unix_startup_launcher(config_dir: Path, python_bin: Path) -> Path:
AUTOSTART_RUNTIME_DIR.mkdir(parents=True, exist_ok=True)
launcher_content = textwrap.dedent(
f"""\
#!/usr/bin/env bash
set -euo pipefail
export CONFIG_DIR={shlex.quote(str(config_dir))}
cd {shlex.quote(str(ROOT))}
exec {shlex.quote(str(python_bin))} -m app.cli start --timeout {AUTOSTART_TIMEOUT}
"""
)
AUTOSTART_UNIX_LAUNCHER.write_text(launcher_content, encoding="utf-8")
AUTOSTART_UNIX_LAUNCHER.chmod(0o755)
return AUTOSTART_UNIX_LAUNCHER
def _write_windows_startup_launcher(config_dir: Path, python_bin: Path) -> Path:
AUTOSTART_RUNTIME_DIR.mkdir(parents=True, exist_ok=True)
launcher_content = textwrap.dedent(
f"""\
@echo off
setlocal
set "CONFIG_DIR={config_dir}"
cd /d "{ROOT}"
"{python_bin}" -m app.cli start --timeout {AUTOSTART_TIMEOUT}
endlocal
"""
)
AUTOSTART_WINDOWS_LAUNCHER.write_text(launcher_content, encoding="utf-8")
return AUTOSTART_WINDOWS_LAUNCHER
def _double_quote(value: Any) -> str:
escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
return f'"{escaped}"'
def _run_optional_command(command: list[str]) -> subprocess.CompletedProcess[str]:
return subprocess.run(
command,
cwd=str(ROOT),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
encoding="utf-8",
errors="ignore",
check=False,
)
def _last_command_line(result: subprocess.CompletedProcess[str]) -> str:
lines = [line.strip() for line in (result.stdout or "").splitlines() if line.strip()]
return lines[-1] if lines else "命令未返回更多信息"
def _linux_linger_enabled() -> Optional[bool]:
loginctl_bin = shutil.which("loginctl")
if not loginctl_bin:
return None
result = _run_optional_command(
[loginctl_bin, "show-user", getpass.getuser(), "-p", "Linger", "--value"]
)
if result.returncode != 0:
return None
value = (result.stdout or "").strip().lower()
if value in {"yes", "no"}:
return value == "yes"
return None
def _autostart_status() -> dict[str, Any]:
system_name = platform.system()
if system_name == "Darwin":
artifact = _macos_launch_agent_path()
return {
"enabled": artifact.exists(),
"method": "launchagent",
"label": "LaunchAgent",
"artifact": artifact,
}
if system_name == "Linux":
systemd_unit = _linux_systemd_unit_path()
if systemd_unit.exists():
return {
"enabled": True,
"method": "systemd-user",
"label": "systemd --user",
"artifact": systemd_unit,
"linger_enabled": _linux_linger_enabled(),
}
desktop_file = _linux_xdg_autostart_path()
return {
"enabled": desktop_file.exists(),
"method": "xdg-autostart" if desktop_file.exists() else "none",
"label": "XDG autostart" if desktop_file.exists() else "not-configured",
"artifact": desktop_file if desktop_file.exists() else None,
}
if system_name == "Windows":
artifact = _windows_startup_path()
return {
"enabled": artifact.exists(),
"method": "startup-folder",
"label": "Startup Folder",
"artifact": artifact,
}
return {
"enabled": False,
"method": "unsupported",
"label": _startup_platform_name(),
"artifact": None,
}
def _enable_autostart_macos(config_dir: Path, python_bin: Path) -> dict[str, Any]:
launcher = _write_unix_startup_launcher(config_dir=config_dir, python_bin=python_bin)
agent_path = _macos_launch_agent_path()
agent_path.parent.mkdir(parents=True, exist_ok=True)
LOG_DIR.mkdir(parents=True, exist_ok=True)
plist_content = textwrap.dedent(
f"""\
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>{MACOS_LAUNCH_AGENT_LABEL}</string>
<key>ProgramArguments</key>
<array>
<string>/bin/bash</string>
<string>{launcher}</string>
</array>
<key>WorkingDirectory</key>
<string>{ROOT}</string>
<key>RunAtLoad</key>
<true/>
<key>StandardOutPath</key>
<string>{LOG_DIR / "moviepilot.launchagent.stdout.log"}</string>
<key>StandardErrorPath</key>
<string>{LOG_DIR / "moviepilot.launchagent.stderr.log"}</string>
</dict>
</plist>
"""
)
agent_path.write_text(plist_content, encoding="utf-8")
uid = str(os.getuid())
_run_optional_command(["launchctl", "bootout", f"gui/{uid}", str(agent_path)])
bootstrap_result = _run_optional_command(
["launchctl", "bootstrap", f"gui/{uid}", str(agent_path)]
)
if bootstrap_result.returncode != 0:
note = _last_command_line(bootstrap_result)
else:
enable_result = _run_optional_command(
["launchctl", "enable", f"gui/{uid}/{MACOS_LAUNCH_AGENT_LABEL}"]
)
note = (
_last_command_line(enable_result)
if enable_result.returncode != 0
else "已加载到当前登录会话"
)
write_env_value(AUTOSTART_ENV_KEY, "true")
return {
"method": "LaunchAgent",
"artifact": agent_path,
"note": note,
}
def _enable_autostart_linux_systemd(
config_dir: Path, python_bin: Path
) -> Optional[dict[str, Any]]:
systemctl_bin = shutil.which("systemctl")
if not systemctl_bin:
return None
launcher = _write_unix_startup_launcher(config_dir=config_dir, python_bin=python_bin)
unit_path = _linux_systemd_unit_path()
unit_path.parent.mkdir(parents=True, exist_ok=True)
unit_content = textwrap.dedent(
f"""\
[Unit]
Description=MoviePilot local autostart
Wants=network-online.target
After=network-online.target
[Service]
Type=oneshot
WorkingDirectory={ROOT}
ExecStart=/bin/bash {_double_quote(launcher)}
[Install]
WantedBy=default.target
"""
)
unit_path.write_text(unit_content, encoding="utf-8")
_run_optional_command([systemctl_bin, "--user", "daemon-reload"])
enable_result = _run_optional_command(
[systemctl_bin, "--user", "enable", LINUX_SYSTEMD_UNIT_NAME]
)
if enable_result.returncode != 0:
_remove_path(unit_path)
_run_optional_command([systemctl_bin, "--user", "daemon-reload"])
return None
start_result = _run_optional_command(
[systemctl_bin, "--user", "start", LINUX_SYSTEMD_UNIT_NAME]
)
desktop_path = _linux_xdg_autostart_path()
if desktop_path.exists():
_remove_path(desktop_path)
note = (
_last_command_line(start_result)
if start_result.returncode != 0
else "已注册 systemd --user 并尝试在当前会话执行一次"
)
linger_enabled = _linux_linger_enabled()
if linger_enabled is False:
note += ";如需无人登录时随系统启动,请手动执行 sudo loginctl enable-linger $USER"
write_env_value(AUTOSTART_ENV_KEY, "true")
return {
"method": "systemd --user",
"artifact": unit_path,
"note": note,
}
def _enable_autostart_linux_xdg(config_dir: Path, python_bin: Path) -> dict[str, Any]:
launcher = _write_unix_startup_launcher(config_dir=config_dir, python_bin=python_bin)
desktop_path = _linux_xdg_autostart_path()
desktop_path.parent.mkdir(parents=True, exist_ok=True)
unit_path = _linux_systemd_unit_path()
if unit_path.exists():
_remove_path(unit_path)
systemctl_bin = shutil.which("systemctl")
if systemctl_bin:
_run_optional_command([systemctl_bin, "--user", "daemon-reload"])
desktop_content = textwrap.dedent(
f"""\
[Desktop Entry]
Type=Application
Version=1.0
Name=MoviePilot
Comment=Start MoviePilot on login
Exec=/bin/bash {_double_quote(launcher)}
Path={ROOT}
Terminal=false
X-GNOME-Autostart-enabled=true
"""
)
desktop_path.write_text(desktop_content, encoding="utf-8")
write_env_value(AUTOSTART_ENV_KEY, "true")
return {
"method": "XDG autostart",
"artifact": desktop_path,
"note": "当前环境未启用 systemd --user已回退为图形会话登录自启动",
}
def _enable_autostart_windows(config_dir: Path, python_bin: Path) -> dict[str, Any]:
launcher = _write_windows_startup_launcher(config_dir=config_dir, python_bin=python_bin)
startup_path = _windows_startup_path()
startup_path.parent.mkdir(parents=True, exist_ok=True)
startup_content = textwrap.dedent(
f"""\
@echo off
call "{launcher}"
"""
)
startup_path.write_text(startup_content, encoding="utf-8")
write_env_value(AUTOSTART_ENV_KEY, "true")
return {
"method": "Startup Folder",
"artifact": startup_path,
"note": "将在当前用户登录 Windows 后自动启动",
}
def enable_autostart(
*, config_dir: Path, runtime_python: Optional[Path], venv_dir: Optional[Path]
) -> dict[str, Any]:
config_dir = config_dir.expanduser().resolve()
python_bin = _resolve_runtime_python_for_startup(runtime_python, venv_dir)
system_name = platform.system()
if system_name == "Darwin":
return _enable_autostart_macos(config_dir=config_dir, python_bin=python_bin)
if system_name == "Linux":
return _enable_autostart_linux_systemd(
config_dir=config_dir, python_bin=python_bin
) or _enable_autostart_linux_xdg(config_dir=config_dir, python_bin=python_bin)
if system_name == "Windows":
return _enable_autostart_windows(config_dir=config_dir, python_bin=python_bin)
raise RuntimeError(f"当前系统暂不支持自动注册开机自启:{platform.system()}")
def disable_autostart() -> dict[str, Any]:
system_name = platform.system()
removed_paths: list[Path] = []
if system_name == "Darwin":
agent_path = _macos_launch_agent_path()
uid = str(os.getuid())
_run_optional_command(["launchctl", "bootout", f"gui/{uid}", str(agent_path)])
if agent_path.exists():
_remove_path(agent_path)
removed_paths.append(agent_path)
_cleanup_startup_launchers(system_name)
elif system_name == "Linux":
systemctl_bin = shutil.which("systemctl")
unit_path = _linux_systemd_unit_path()
desktop_path = _linux_xdg_autostart_path()
if systemctl_bin:
_run_optional_command(
[systemctl_bin, "--user", "disable", LINUX_SYSTEMD_UNIT_NAME]
)
_run_optional_command([systemctl_bin, "--user", "daemon-reload"])
for path in (unit_path, desktop_path):
if path.exists():
_remove_path(path)
removed_paths.append(path)
_cleanup_startup_launchers(system_name)
elif system_name == "Windows":
startup_path = _windows_startup_path()
for path in (startup_path, AUTOSTART_WINDOWS_LAUNCHER):
if path.exists():
_remove_path(path)
removed_paths.append(path)
_cleanup_startup_launchers(system_name)
else:
raise RuntimeError(f"当前系统暂不支持自动取消开机自启:{platform.system()}")
write_env_value(AUTOSTART_ENV_KEY, "false")
return {"removed_paths": removed_paths}
def print_autostart_status() -> None:
status = _autostart_status()
if not status.get("enabled"):
print_step(f"当前未启用开机自启({_startup_platform_name()}")
return
print_step(
f"当前已启用开机自启:{status.get('label') or _startup_platform_name()}"
)
artifact = status.get("artifact")
if artifact:
print(f" 注册文件:{artifact}")
linger_enabled = status.get("linger_enabled")
if linger_enabled is False:
print(
" 说明:当前为 systemd --user 模式,通常会在用户登录后启动;如需无人登录即启动,请手动启用 linger。"
)
def _read_runtime_file(path: Path) -> Optional[dict[str, Any]]:
if not path.exists():
return None
try:
return json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return None
def _pid_exists(pid: int) -> bool:
try:
os.kill(pid, 0)
except ProcessLookupError:
return False
except PermissionError:
return True
return True
def _read_process_start_time(pid: int) -> Optional[float]:
try:
output = capture(["ps", "-p", str(pid), "-o", "lstart="])
except (OSError, subprocess.CalledProcessError):
return None
started = output.strip()
if not started:
return None
try:
return datetime.strptime(started, "%a %b %d %H:%M:%S %Y").timestamp()
except ValueError:
return None
def _services_running() -> list[str]:
running: list[str] = []
runtime_files = {
"backend": TEMP_DIR / "moviepilot.runtime.json",
"frontend": TEMP_DIR / "moviepilot.frontend.runtime.json",
}
for name, runtime_file in runtime_files.items():
payload = _read_runtime_file(runtime_file)
pid = payload.get("pid") if isinstance(payload, dict) else None
if not pid:
continue
try:
pid_int = int(pid)
except (TypeError, ValueError):
continue
if not _pid_exists(pid_int):
continue
runtime_start_time = payload.get("create_time") if isinstance(payload, dict) else None
process_start_time = _read_process_start_time(pid_int)
if runtime_start_time is not None and process_start_time is not None:
try:
if abs(process_start_time - float(runtime_start_time)) > 3:
continue
except (TypeError, ValueError):
pass
running.append(name)
return running
def ensure_services_stopped() -> None:
running = _services_running()
if running:
raise RuntimeError(
"检测到本地服务仍在运行(%s),请先执行 `moviepilot stop` 后再更新。"
% ", ".join(running)
)
def _stop_managed_services(venv_dir: Path) -> None:
venv_dir = venv_dir.expanduser().resolve()
venv_python = get_venv_python(venv_dir)
if venv_python.exists():
print_step("停止本地前后端服务")
run(
[str(venv_python), "-m", "app.cli", "stop", "--timeout", "30", "--force"],
cwd=ROOT,
)
return
running = _services_running()
if running:
raise RuntimeError(
"检测到本地服务仍在运行(%s),但当前未找到虚拟环境 %s,无法安全停止。"
" 请先执行 `moviepilot stop`,或在卸载时通过 `--venv PATH` 指定正确的虚拟环境目录。"
% (", ".join(running), venv_dir)
)
def _collect_cli_link_candidates(
*, command_path: Optional[str] = None, launch_path: Optional[str] = None
) -> list[Path]:
candidates: list[Path] = []
seen: set[str] = set()
raw_candidates = [
launch_path,
command_path,
os.getenv("MOVIEPILOT_LAUNCH_PATH"),
os.getenv("MOVIEPILOT_COMMAND_PATH"),
shutil.which("moviepilot"),
]
for raw_value in raw_candidates:
if not raw_value:
continue
candidate = Path(raw_value).expanduser()
if not candidate.is_absolute():
candidate = (ROOT / candidate).resolve()
try:
key = str(candidate.resolve())
except OSError:
key = str(candidate)
if key in seen:
continue
seen.add(key)
candidates.append(candidate)
return candidates
def _remove_cli_symlinks(
*, command_path: Optional[str] = None, launch_path: Optional[str] = None
) -> list[Path]:
removed: list[Path] = []
script_path = (ROOT / "moviepilot").resolve()
for candidate in _collect_cli_link_candidates(
command_path=command_path, launch_path=launch_path
):
if not candidate.is_symlink():
continue
try:
if candidate.resolve() != script_path:
continue
except OSError:
continue
candidate.unlink()
removed.append(candidate)
return removed
def _remove_runtime_state_files() -> list[Path]:
removed: list[Path] = []
for path in (
TEMP_DIR / "moviepilot.runtime.json",
TEMP_DIR / "moviepilot.frontend.runtime.json",
):
if not path.exists():
continue
_remove_path(path)
removed.append(path)
return removed
def _remove_installed_resource_files() -> list[Path]:
removed: list[Path] = []
seen: set[Path] = set()
for pattern in RESOURCE_FILE_PATTERNS:
for path in sorted(HELPER_DIR.glob(pattern)):
if path in seen or not path.exists() or path.is_dir():
continue
_remove_path(path)
removed.append(path)
seen.add(path)
return removed
def _remove_config_data(config_dir: Path) -> list[Path]:
config_dir = config_dir.expanduser().resolve()
removed: list[Path] = []
if config_dir.exists():
_remove_path(config_dir)
removed.append(config_dir)
return removed
def uninstall_local(
*,
venv_dir: Path,
config_dir: Path,
command_path: Optional[str] = None,
launch_path: Optional[str] = None,
) -> dict[str, Any]:
if not _is_interactive():
raise RuntimeError("卸载命令需要在交互式终端中运行,以完成两次确认。")
venv_dir = venv_dir.expanduser().resolve()
config_dir = config_dir.expanduser().resolve()
cli_links = _collect_cli_link_candidates(
command_path=command_path, launch_path=launch_path
)
script_path = (ROOT / "moviepilot").resolve()
linked_cli_paths = [
path
for path in cli_links
if path.is_symlink() and path.exists() and path.resolve() == script_path
]
autostart_status = _autostart_status()
delete_config = _prompt_yes_no(
f"是否同时删除配置目录 {config_dir}", default=False
)
print_step("卸载将执行以下操作")
print(f" - 保留源码目录:{ROOT}")
print(f" - 删除虚拟环境:{venv_dir}")
print(f" - 删除前端运行时目录:{PUBLIC_DIR}")
print(f" - 删除本地 Node 运行时目录:{RUNTIME_DIR}")
print(f" - 删除资源文件:{HELPER_DIR}/sites*、{HELPER_DIR}/user.sites*.bin")
if linked_cli_paths:
print(" - 删除全局 CLI 软链接:")
for path in linked_cli_paths:
print(f" {path}")
else:
print(" - 未检测到指向当前仓库的全局 CLI 软链接")
if autostart_status.get("enabled"):
print(
f" - 取消开机自启:{autostart_status.get('label') or _startup_platform_name()}"
)
else:
print(" - 当前未配置开机自启")
if delete_config:
print(f" - 删除配置目录:{config_dir}")
if config_dir == LEGACY_CONFIG_DIR.resolve():
print(" 包括 legacy config 目录中的 category.yaml 等配置文件")
else:
print(f" - 保留配置目录:{config_dir}")
if not _prompt_yes_no("第一次确认:是否继续卸载 MoviePilot", default=False):
print_step("已取消卸载")
return {"cancelled": True}
confirm_text = _prompt_text(
f"第二次确认:请输入 {UNINSTALL_CONFIRM_TEXT} 以继续",
allow_empty=False,
)
if confirm_text != UNINSTALL_CONFIRM_TEXT:
print_step("确认文本不匹配,已取消卸载")
return {"cancelled": True}
_stop_managed_services(venv_dir=venv_dir)
if autostart_status.get("enabled"):
disable_autostart()
removed_paths: list[Path] = []
removed_paths.extend(
_remove_cli_symlinks(command_path=command_path, launch_path=launch_path)
)
removed_paths.extend(_remove_runtime_state_files())
removed_paths.extend(_remove_installed_resource_files())
for path in (venv_dir, RUNTIME_DIR, PUBLIC_DIR):
if not path.exists():
continue
_remove_path(path)
removed_paths.append(path)
removed_config_paths: list[Path] = []
if delete_config:
removed_config_paths = _remove_config_data(config_dir)
removed_paths.extend(removed_config_paths)
if INSTALL_ENV_FILE.exists():
_remove_path(INSTALL_ENV_FILE)
removed_paths.append(INSTALL_ENV_FILE)
print_step("卸载完成")
if delete_config:
print_step(f"已删除配置目录:{config_dir}")
else:
print_step(f"已保留配置目录:{config_dir}")
print_step(f"源码目录仍保留在:{ROOT}")
return {
"cancelled": False,
"config_deleted": delete_config,
"removed_paths": [str(path) for path in removed_paths],
"removed_config_paths": [str(path) for path in removed_config_paths],
}
def _git_output(*args: str) -> str:
return capture(["git", *args], cwd=ROOT)
def _ensure_git_clean() -> None:
status = _git_output("status", "--porcelain", "--untracked-files=no")
if not status.strip():
return
changed_files: list[str] = []
for line in status.splitlines():
if len(line) < 4:
continue
changed_files.append(line[3:].strip())
detail = ""
if changed_files:
preview = "".join(changed_files[:5])
if len(changed_files) > 5:
preview += ""
detail = f"{preview}"
raise RuntimeError(
f"检测到当前仓库有未提交的源码改动{detail},请先提交或清理后再执行更新。"
)
def _update_backend_ref(ref: str) -> str:
if not (ROOT / ".git").exists():
raise RuntimeError("当前目录不是 Git 仓库,无法更新后端代码。")
_ensure_git_clean()
print_step("获取远端更新")
run(["git", "fetch", "--tags", "origin"], cwd=ROOT)
current_branch = _git_output("rev-parse", "--abbrev-ref", "HEAD")
if ref == "latest":
if current_branch == "HEAD":
raise RuntimeError(
"当前仓库处于 detached HEAD 状态,请使用 `moviepilot update backend --ref <tag|branch>` 指定版本。"
)
print_step(f"更新后端代码到当前分支最新版本:{current_branch}")
run(["git", "pull", "--ff-only", "origin", current_branch], cwd=ROOT)
return current_branch
print_step(f"切换后端代码到指定版本:{ref}")
run(["git", "checkout", ref], cwd=ROOT)
return ref
def update_backend(
*, ref: str, python_bin: str, venv_dir: Path, recreate: bool
) -> Path:
ensure_services_stopped()
resolved_ref = _update_backend_ref(ref=ref)
venv_python = install_deps(
python_bin=python_bin, venv_dir=venv_dir, recreate=recreate
)
print_step(f"后端更新完成:{resolved_ref}")
return venv_python
def handle_startup_command(
*,
action: str,
config_dir: Path,
runtime_python: Optional[Path],
venv_dir: Optional[Path],
) -> None:
if action == "status":
print_autostart_status()
return
if action == "enable":
result = enable_autostart(
config_dir=config_dir,
runtime_python=runtime_python,
venv_dir=venv_dir,
)
print_step(f"已启用开机自启:{result.get('method')}")
if result.get("artifact"):
print(f"注册文件:{result['artifact']}")
if result.get("note"):
print(f"说明:{result['note']}")
return
if action == "disable":
result = disable_autostart()
removed_paths = result.get("removed_paths") or []
if removed_paths:
print_step("已取消开机自启注册")
for path in removed_paths:
print(f"已移除:{path}")
else:
print_step("当前未配置开机自启,无需取消")
return
raise RuntimeError(f"未知的 startup 动作:{action}")
def run_agent_request(
*, message: str, session_id: Optional[str], new_session: bool, user_id: str
) -> dict[str, str]:
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
try:
from app.db.init import init_db, update_db
from app.agent import MoviePilotAgent
from app.core.config import settings
except ModuleNotFoundError as exc:
raise RuntimeError(
"当前环境尚未安装 MoviePilot 运行依赖,请先执行 moviepilot install deps 或 moviepilot setup"
) from exc
if not settings.AI_AGENT_ENABLE:
raise RuntimeError("MoviePilot 智能体未启用,请先在配置中打开 AI_AGENT_ENABLE")
init_db()
update_db()
session = (session_id or "").strip()
if new_session or not session:
session = f"cli-{uuid.uuid4().hex[:12]}"
async def _run_agent() -> dict[str, str]:
agent = MoviePilotAgent(session_id=session, user_id=user_id or "cli")
agent.suppress_user_reply = True
await agent.process(message.strip())
return {
"session_id": session,
"result": (agent._streamed_output or "").strip(),
}
return asyncio.run(_run_agent())
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="MoviePilot 本地安装与初始化工具")
subparsers = parser.add_subparsers(dest="command", required=True)
install_parser = subparsers.add_parser(
"install-deps", help="创建虚拟环境并安装后端依赖"
)
install_parser.add_argument(
"--python",
default=DEFAULT_BOOTSTRAP_PYTHON,
help="用于创建虚拟环境的 Python 解释器,默认自动选择本地 3.11+ 版本",
)
install_parser.add_argument(
"--venv", default=str(ROOT / "venv"), help="虚拟环境目录"
)
install_parser.add_argument(
"--recreate", action="store_true", help="删除并重建虚拟环境"
)
install_parser.add_argument(
"--config-dir", help="配置目录,默认使用程序目录外的系统配置目录"
)
frontend_parser = subparsers.add_parser(
"install-frontend", help="下载前端 release 并安装本地运行时"
)
frontend_parser.add_argument(
"--version", default="latest", help="前端版本,默认 latest"
)
frontend_parser.add_argument(
"--node-version", default=DEFAULT_NODE_VERSION, help="本地 Node 运行时版本"
)
frontend_parser.add_argument(
"--config-dir", help="配置目录,默认使用程序目录外的系统配置目录"
)
resources_parser = subparsers.add_parser(
"install-resources", help="下载资源文件并同步到 app/helper"
)
resources_parser.add_argument(
"--resources-repo", help="本地 MoviePilot-Resources 仓库路径"
)
resources_parser.add_argument("--resource-dir", help="直接指定 resources.v2 目录")
resources_parser.add_argument(
"--config-dir", help="配置目录,默认使用程序目录外的系统配置目录"
)
init_parser = subparsers.add_parser("init", help="初始化本地配置与资源文件")
init_parser.add_argument(
"--resources-repo", help="本地 MoviePilot-Resources 仓库路径"
)
init_parser.add_argument("--resource-dir", help="直接指定 resources.v2 目录")
init_parser.add_argument(
"--skip-resources", action="store_true", help="只初始化配置,不同步资源文件"
)
init_parser.add_argument(
"--force-token", action="store_true", help="强制重置 API_TOKEN"
)
init_parser.add_argument(
"--wizard", action="store_true", help="启动交互式初始化向导"
)
init_parser.add_argument("--superuser", help="预设超级管理员用户名")
init_parser.add_argument("--superuser-password", help="预设超级管理员密码")
init_parser.add_argument(
"--config-dir", help="配置目录,默认使用程序目录外的系统配置目录"
)
setup_parser = subparsers.add_parser(
"setup", help="执行 install-deps、install-frontend、install-resources 和 init"
)
setup_parser.add_argument(
"--python",
default=DEFAULT_BOOTSTRAP_PYTHON,
help="用于创建虚拟环境的 Python 解释器,默认自动选择本地 3.11+ 版本",
)
setup_parser.add_argument("--venv", default=str(ROOT / "venv"), help="虚拟环境目录")
setup_parser.add_argument(
"--recreate", action="store_true", help="删除并重建虚拟环境"
)
setup_parser.add_argument(
"--frontend-version", default="latest", help="前端版本,默认 latest"
)
setup_parser.add_argument(
"--node-version", default=DEFAULT_NODE_VERSION, help="本地 Node 运行时版本"
)
setup_parser.add_argument(
"--resources-repo", help="本地 MoviePilot-Resources 仓库路径"
)
setup_parser.add_argument("--resource-dir", help="直接指定 resources.v2 目录")
setup_parser.add_argument(
"--skip-resources", action="store_true", help="只初始化配置,不同步资源文件"
)
setup_parser.add_argument(
"--force-token", action="store_true", help="强制重置 API_TOKEN"
)
setup_parser.add_argument(
"--wizard", action="store_true", help="安装完成后启动交互式初始化向导"
)
setup_parser.add_argument("--superuser", help="预设超级管理员用户名")
setup_parser.add_argument("--superuser-password", help="预设超级管理员密码")
setup_parser.add_argument(
"--config-dir", help="配置目录,默认使用程序目录外的系统配置目录"
)
uninstall_parser = subparsers.add_parser(
"uninstall", help="卸载本地安装产物,并可选删除配置目录"
)
uninstall_parser.add_argument(
"--venv", default=str(ROOT / "venv"), help="虚拟环境目录"
)
uninstall_parser.add_argument(
"--config-dir", help="配置目录,默认使用当前安装配置"
)
agent_parser = subparsers.add_parser(
"agent", help="直接向 MoviePilot 智能体发送一次请求"
)
agent_parser.add_argument("message", nargs="+", help="发给智能体的文本请求")
agent_parser.add_argument("--session", help="会话 ID默认自动生成")
agent_parser.add_argument(
"--new-session", action="store_true", help="忽略传入会话,强制创建新会话"
)
agent_parser.add_argument(
"--user-id", default="cli", help="智能体上下文中的用户 ID"
)
agent_parser.add_argument(
"--config-dir", help="配置目录,默认使用程序目录外的系统配置目录"
)
update_parser = subparsers.add_parser("update", help="更新本地后端、前端或全部组件")
update_parser.add_argument(
"target", choices=["backend", "frontend", "all"], help="更新目标"
)
update_parser.add_argument(
"--ref", default="latest", help="后端 Git 版本,默认 latest"
)
update_parser.add_argument(
"--frontend-version", default="latest", help="前端版本,默认 latest"
)
update_parser.add_argument(
"--node-version", default=DEFAULT_NODE_VERSION, help="本地 Node 运行时版本"
)
update_parser.add_argument(
"--python",
default=DEFAULT_BOOTSTRAP_PYTHON,
help="用于安装后端依赖的 Python 解释器,默认自动选择本地 3.11+ 版本",
)
update_parser.add_argument(
"--venv", default=str(ROOT / "venv"), help="虚拟环境目录"
)
update_parser.add_argument(
"--recreate", action="store_true", help="删除并重建虚拟环境"
)
update_parser.add_argument(
"--skip-resources", action="store_true", help="更新 all 时跳过资源同步"
)
update_parser.add_argument(
"--config-dir", help="配置目录,默认使用程序目录外的系统配置目录"
)
startup_parser = subparsers.add_parser(
"startup", help="注册、取消或查看本地开机自启"
)
startup_parser.add_argument(
"action", choices=["enable", "disable", "status"], help="开机自启动作"
)
startup_parser.add_argument(
"--venv", default=str(ROOT / "venv"), help="虚拟环境目录"
)
startup_parser.add_argument(
"--config-dir", help="配置目录,默认使用当前安装配置"
)
apply_config_parser = subparsers.add_parser("apply-config", help=argparse.SUPPRESS)
apply_config_parser.add_argument(
"--config-json-file", required=True, help=argparse.SUPPRESS
)
sync_superuser_parser = subparsers.add_parser(
"sync-superuser", help=argparse.SUPPRESS
)
sync_superuser_parser.add_argument("--config-dir", help=argparse.SUPPRESS)
query_auth_sites_parser = subparsers.add_parser(
"query-auth-sites", help=argparse.SUPPRESS
)
query_auth_sites_parser.add_argument(
"--output-json-file", required=True, help=argparse.SUPPRESS
)
return parser
def main() -> int:
parser = build_parser()
args = parser.parse_args()
explicit_config_dir = (
Path(args.config_dir) if getattr(args, "config_dir", None) else None
)
explicit_config_dir = _resolve_interactive_config_dir(
args.command, explicit_config_dir
)
persist_config_commands = {
"install-deps",
"install-frontend",
"install-resources",
"init",
"setup",
"agent",
"update",
}
config_dir = configure_config_dir(
explicit=explicit_config_dir,
persist=args.command in persist_config_commands,
prefer_external=args.command in persist_config_commands,
)
try:
if args.command == "install-deps":
venv_python = install_deps(
python_bin=args.python,
venv_dir=Path(args.venv),
recreate=args.recreate,
)
print_step(f"后端依赖安装完成,可执行:{venv_python} -m app.cli")
print_step(f"当前配置目录:{config_dir}")
return 0
if args.command == "install-frontend":
result = install_frontend(
frontend_version=args.version, node_version=args.node_version
)
print_step(f"前端安装完成,版本:{result['version']}")
return 0
if args.command == "install-resources":
install_resources(
resources_repo=Path(args.resources_repo)
if args.resources_repo
else None,
resource_dir=Path(args.resource_dir) if args.resource_dir else None,
)
return 0
if args.command == "init":
init_local(
resources_repo=Path(args.resources_repo)
if args.resources_repo
else None,
resource_dir=Path(args.resource_dir) if args.resource_dir else None,
skip_resources=args.skip_resources,
resources_ready=False,
force_token=args.force_token,
wizard=args.wizard,
superuser=args.superuser,
superuser_password=args.superuser_password,
runtime_python=None,
venv_dir=ROOT / "venv",
)
print_step("初始化完成")
print_step(f"当前配置目录:{config_dir}")
return 0
if args.command == "setup":
venv_python = install_deps(
python_bin=args.python,
venv_dir=Path(args.venv),
recreate=args.recreate,
)
install_frontend(
frontend_version=args.frontend_version, node_version=args.node_version
)
resources_installed = False
if not args.skip_resources:
install_resources(
resources_repo=Path(args.resources_repo)
if args.resources_repo
else None,
resource_dir=Path(args.resource_dir) if args.resource_dir else None,
)
resources_installed = True
init_local(
resources_repo=Path(args.resources_repo)
if args.resources_repo
else None,
resource_dir=Path(args.resource_dir) if args.resource_dir else None,
skip_resources=args.skip_resources or resources_installed,
resources_ready=resources_installed,
force_token=args.force_token,
wizard=args.wizard,
superuser=args.superuser,
superuser_password=args.superuser_password,
runtime_python=venv_python,
venv_dir=Path(args.venv),
)
print_step(f"本地环境已完成安装与初始化:{venv_python}")
print_step(f"当前配置目录:{config_dir}")
return 0
if args.command == "uninstall":
uninstall_local(
venv_dir=Path(args.venv),
config_dir=config_dir,
command_path=os.getenv("MOVIEPILOT_COMMAND_PATH"),
launch_path=os.getenv("MOVIEPILOT_LAUNCH_PATH"),
)
return 0
if args.command == "agent":
result = run_agent_request(
message=" ".join(args.message),
session_id=args.session,
new_session=args.new_session,
user_id=args.user_id,
)
if result.get("session_id"):
print_step(f"智能体会话:{result['session_id']}")
print(result.get("result") or "")
return 0
if args.command == "update":
ensure_services_stopped()
if args.target in {"backend", "all"}:
update_backend(
ref=args.ref,
python_bin=args.python,
venv_dir=Path(args.venv),
recreate=args.recreate,
)
if args.target in {"frontend", "all"}:
frontend_result = install_frontend(
frontend_version=args.frontend_version,
node_version=args.node_version,
)
print_step(f"前端更新完成,版本:{frontend_result['version']}")
if args.target == "all" and not args.skip_resources:
install_resources(resources_repo=None, resource_dir=None)
print_step("资源文件已同步到最新")
print_step(f"更新完成,当前配置目录:{config_dir}")
return 0
if args.command == "startup":
runtime_python = None
if args.action == "enable":
runtime_python = _resolve_runtime_python_for_startup(
None, Path(args.venv)
)
handle_startup_command(
action=args.action,
config_dir=config_dir,
runtime_python=runtime_python,
venv_dir=Path(args.venv),
)
return 0
if args.command == "apply-config":
payload = json.loads(
Path(args.config_json_file).read_text(encoding="utf-8")
)
if not isinstance(payload, dict):
raise RuntimeError("配置负载格式错误")
_apply_local_system_config_inner(payload)
return 0
if args.command == "sync-superuser":
_sync_superuser_account_inner()
return 0
if args.command == "query-auth-sites":
payload = _load_auth_site_definitions_inner()
Path(args.output_json_file).write_text(
json.dumps(payload, ensure_ascii=False, indent=2),
encoding="utf-8",
)
return 0
except subprocess.CalledProcessError as exc:
print(f"命令执行失败,退出码:{exc.returncode}", file=sys.stderr)
return exc.returncode
except Exception as exc:
print(str(exc), file=sys.stderr)
return 1
parser.print_help()
return 1
if __name__ == "__main__":
sys.exit(main())