mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-02-03 02:25:32 +08:00
refactor:优化目录监控
This commit is contained in:
318
app/monitor.py
318
app/monitor.py
@@ -1,6 +1,7 @@
|
||||
import json
|
||||
import platform
|
||||
import re
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
@@ -114,7 +115,7 @@ class Monitor(metaclass=Singleton):
|
||||
'snapshot': snapshot
|
||||
}
|
||||
with open(cache_file, 'w', encoding='utf-8') as f:
|
||||
json.dump(snapshot_data, f, ensure_ascii=False, indent=2)
|
||||
json.dump(snapshot_data, f, ensure_ascii=False, indent=2) # noqa
|
||||
logger.debug(f"快照已保存到 {cache_file}")
|
||||
except Exception as e:
|
||||
logger.error(f"保存快照失败: {e}")
|
||||
@@ -189,6 +190,159 @@ class Monitor(metaclass=Singleton):
|
||||
|
||||
return changes
|
||||
|
||||
@staticmethod
|
||||
def count_directory_files(directory: Path, max_check: int = 10000) -> int:
|
||||
"""
|
||||
统计目录下的文件数量(用于检测是否超过系统限制)
|
||||
:param directory: 目录路径
|
||||
:param max_check: 最大检查数量,避免长时间阻塞
|
||||
:return: 文件数量
|
||||
"""
|
||||
try:
|
||||
count = 0
|
||||
import os
|
||||
for root, dirs, files in os.walk(str(directory)):
|
||||
count += len(files)
|
||||
if count > max_check:
|
||||
return count
|
||||
return count
|
||||
except Exception as err:
|
||||
logger.debug(f"统计目录文件数量失败: {err}")
|
||||
return 0
|
||||
|
||||
@staticmethod
|
||||
def check_system_limits() -> Dict[str, Any]:
|
||||
"""
|
||||
检查系统限制
|
||||
:return: 系统限制信息
|
||||
"""
|
||||
limits = {
|
||||
'max_user_watches': 0,
|
||||
'max_user_instances': 0,
|
||||
'current_watches': 0,
|
||||
'warnings': []
|
||||
}
|
||||
|
||||
try:
|
||||
system = platform.system()
|
||||
if system == 'Linux':
|
||||
# 检查 inotify 限制
|
||||
try:
|
||||
with open('/proc/sys/fs/inotify/max_user_watches', 'r') as f:
|
||||
limits['max_user_watches'] = int(f.read().strip())
|
||||
except Exception as e:
|
||||
logger.debug(f"读取 inotify 限制失败: {e}")
|
||||
limits['max_user_watches'] = 8192 # 默认值
|
||||
|
||||
try:
|
||||
with open('/proc/sys/fs/inotify/max_user_instances', 'r') as f:
|
||||
limits['max_user_instances'] = int(f.read().strip())
|
||||
except Exception as e:
|
||||
logger.debug(f"读取 inotify 实例限制失败: {e}")
|
||||
|
||||
# 检查当前使用的watches
|
||||
try:
|
||||
import subprocess
|
||||
result = subprocess.run(['find', '/proc/*/fd', '-lname', 'anon_inode:inotify', '-printf', '%h\n'],
|
||||
capture_output=True, text=True, timeout=5)
|
||||
if result.returncode == 0:
|
||||
limits['current_watches'] = len(result.stdout.strip().split('\n'))
|
||||
except Exception as e:
|
||||
logger.debug(f"检查当前 inotify 使用失败: {e}")
|
||||
|
||||
except Exception as e:
|
||||
limits['warnings'].append(f"检查系统限制时出错: {e}")
|
||||
|
||||
return limits
|
||||
|
||||
@staticmethod
|
||||
def get_system_optimization_tips() -> List[str]:
|
||||
"""
|
||||
获取系统优化建议
|
||||
:return: 优化建议列表
|
||||
"""
|
||||
tips = []
|
||||
system = platform.system()
|
||||
|
||||
if system == 'Linux':
|
||||
tips.extend([
|
||||
"增加 inotify 监控数量限制:",
|
||||
"echo fs.inotify.max_user_watches=524288 | sudo tee -a /etc/sysctl.conf",
|
||||
"echo fs.inotify.max_user_instances=524288 | sudo tee -a /etc/sysctl.conf",
|
||||
"sudo sysctl -p",
|
||||
"",
|
||||
"如果在Docker中运行,请在宿主机上执行以上命令"
|
||||
])
|
||||
elif system == 'Darwin':
|
||||
tips.extend([
|
||||
"macOS 系统优化建议:",
|
||||
"sudo sysctl kern.maxfiles=65536",
|
||||
"sudo sysctl kern.maxfilesperproc=32768",
|
||||
"ulimit -n 32768"
|
||||
])
|
||||
elif system == 'Windows':
|
||||
tips.extend([
|
||||
"Windows 系统优化建议:",
|
||||
"1. 关闭不必要的实时保护软件对监控目录的扫描",
|
||||
"2. 将监控目录添加到Windows Defender排除列表",
|
||||
"3. 确保有足够的可用内存"
|
||||
])
|
||||
|
||||
return tips
|
||||
|
||||
def should_use_polling(self, directory: Path, monitor_mode: str,
|
||||
file_count: int, limits: dict) -> tuple[bool, str]:
|
||||
"""
|
||||
判断是否应该使用轮询模式
|
||||
:param directory: 监控目录
|
||||
:param monitor_mode: 配置的监控模式
|
||||
:param file_count: 目录文件数量
|
||||
:param limits: 系统限制信息
|
||||
:return: (是否使用轮询, 原因)
|
||||
"""
|
||||
if monitor_mode == "compatible":
|
||||
return True, "用户配置为兼容模式"
|
||||
|
||||
# 检查网络文件系统
|
||||
if self.is_network_filesystem(directory):
|
||||
return True, "检测到网络文件系统,建议使用兼容模式"
|
||||
|
||||
max_watches = limits.get('max_user_watches')
|
||||
if max_watches and file_count > max_watches * 0.8:
|
||||
return True, f"目录文件数量({file_count})接近系统限制({max_watches})"
|
||||
return False, "使用快速模式"
|
||||
|
||||
@staticmethod
|
||||
def is_network_filesystem(directory: Path) -> bool:
|
||||
"""
|
||||
检测是否为网络文件系统
|
||||
:param directory: 目录路径
|
||||
:return: 是否为网络文件系统
|
||||
"""
|
||||
try:
|
||||
system = platform.system()
|
||||
if system == 'Linux':
|
||||
# 检查挂载信息
|
||||
result = subprocess.run(['df', '-T', str(directory)],
|
||||
capture_output=True, text=True, timeout=5)
|
||||
if result.returncode == 0:
|
||||
output = result.stdout.lower()
|
||||
network_fs = ['nfs', 'cifs', 'smbfs', 'fuse', 'sshfs', 'ftpfs']
|
||||
return any(fs in output for fs in network_fs)
|
||||
elif system == 'Darwin':
|
||||
# macOS 检查
|
||||
result = subprocess.run(['df', '-T', str(directory)],
|
||||
capture_output=True, text=True, timeout=5)
|
||||
if result.returncode == 0:
|
||||
output = result.stdout.lower()
|
||||
return 'nfs' in output or 'smbfs' in output
|
||||
elif system == 'Windows':
|
||||
# Windows 检查网络驱动器
|
||||
return str(directory).startswith('\\\\')
|
||||
except Exception as e:
|
||||
logger.debug(f"检测网络文件系统时出错: {e}")
|
||||
return False
|
||||
|
||||
def init(self):
|
||||
"""
|
||||
启动监控
|
||||
@@ -199,10 +353,12 @@ class Monitor(metaclass=Singleton):
|
||||
# 读取目录配置
|
||||
monitor_dirs = DirectoryHelper().get_download_dirs()
|
||||
if not monitor_dirs:
|
||||
logger.info("未找到任何目录监控配置")
|
||||
return
|
||||
|
||||
# 按下载目录去重
|
||||
monitor_dirs = list({f"{d.storage}_{d.download_path}": d for d in monitor_dirs}.values())
|
||||
logger.info(f"找到 {len(monitor_dirs)} 个目录监控配置")
|
||||
|
||||
# 启动定时服务进程
|
||||
self._scheduler = BackgroundScheduler(timezone=settings.TZ)
|
||||
@@ -210,9 +366,12 @@ class Monitor(metaclass=Singleton):
|
||||
messagehelper = MessageHelper()
|
||||
for mon_dir in monitor_dirs:
|
||||
if not mon_dir.library_path:
|
||||
logger.warn(f"跳过监控配置 {mon_dir.download_path}:未设置媒体库目录")
|
||||
continue
|
||||
if mon_dir.monitor_type != "monitor":
|
||||
logger.debug(f"跳过监控配置 {mon_dir.download_path}:监控类型为 {mon_dir.monitor_type}")
|
||||
continue
|
||||
|
||||
# 检查媒体库目录是不是下载目录的子目录
|
||||
mon_path = Path(mon_dir.download_path)
|
||||
target_path = Path(mon_dir.library_path)
|
||||
@@ -224,31 +383,70 @@ class Monitor(metaclass=Singleton):
|
||||
# 启动监控
|
||||
if mon_dir.storage == "local":
|
||||
# 本地目录监控
|
||||
logger.info(f"正在启动本地目录监控: {mon_path}")
|
||||
logger.info("*** 重要提示:目录监控只处理新增和修改的文件,不会处理监控启动前已存在的文件 ***")
|
||||
|
||||
try:
|
||||
if mon_dir.monitor_mode == "fast":
|
||||
observer = self.__choose_observer()
|
||||
else:
|
||||
# 统计文件数量并给出提示
|
||||
file_count = self.count_directory_files(mon_path)
|
||||
logger.info(f"监控目录 {mon_path} 包含约 {file_count} 个文件")
|
||||
|
||||
# 检查系统限制
|
||||
limits = self.check_system_limits()
|
||||
|
||||
# 检查是否需要使用轮询模式
|
||||
use_polling, reason = self.should_use_polling(mon_path,
|
||||
monitor_mode=mon_dir.monitor_mode,
|
||||
file_count=file_count,
|
||||
limits=limits)
|
||||
logger.info(f"监控模式决策: {reason}")
|
||||
|
||||
if use_polling:
|
||||
observer = PollingObserver()
|
||||
logger.info(f"使用兼容模式(轮询)监控 {mon_path}")
|
||||
else:
|
||||
observer = self.__choose_observer()
|
||||
if observer is None:
|
||||
logger.warn(f"快速模式不可用,自动切换到兼容模式监控 {mon_path}")
|
||||
observer = PollingObserver()
|
||||
else:
|
||||
logger.info(f"使用快速模式监控 {mon_path}")
|
||||
if limits['warnings']:
|
||||
for warning in limits['warnings']:
|
||||
logger.warn(f"系统限制警告: {warning}")
|
||||
if limits['max_user_watches'] > 0:
|
||||
usage_percent = (file_count / limits['max_user_watches']) * 100
|
||||
logger.info(
|
||||
f"系统监控资源使用率: {usage_percent:.1f}% ({file_count}/{limits['max_user_watches']})")
|
||||
|
||||
self._observers.append(observer)
|
||||
observer.schedule(FileMonitorHandler(mon_path=mon_path, callback=self),
|
||||
path=str(mon_path),
|
||||
recursive=True)
|
||||
observer.daemon = True
|
||||
observer.start()
|
||||
logger.info(f"已启动 {mon_path} 的目录监控服务, 监控模式:{mon_dir.monitor_mode}")
|
||||
|
||||
mode_name = "兼容模式(轮询)" if use_polling else "快速模式"
|
||||
logger.info(f"✓ 本地目录监控已启动: {mon_path} [{mode_name}]")
|
||||
|
||||
except Exception as e:
|
||||
err_msg = str(e)
|
||||
if "inotify" in err_msg and "reached" in err_msg:
|
||||
logger.warn(
|
||||
f"目录监控服务启动出现异常:{err_msg},请在宿主机上(不是docker容器内)执行以下命令并重启:"
|
||||
+ """
|
||||
echo fs.inotify.max_user_watches=524288 | sudo tee -a /etc/sysctl.conf
|
||||
echo fs.inotify.max_user_instances=524288 | sudo tee -a /etc/sysctl.conf
|
||||
sudo sysctl -p
|
||||
""")
|
||||
logger.error(f"启动本地目录监控失败: {mon_path}")
|
||||
logger.error(f"错误详情: {err_msg}")
|
||||
|
||||
if "inotify" in err_msg.lower():
|
||||
logger.error("inotify 相关错误,这通常是由于系统监控数量限制导致的")
|
||||
logger.error("解决方案:")
|
||||
tips = self.get_system_optimization_tips()
|
||||
for tip in tips:
|
||||
logger.error(f" {tip}")
|
||||
logger.error("执行上述命令后重启 MoviePilot")
|
||||
elif "permission" in err_msg.lower():
|
||||
logger.error("权限错误,请检查 MoviePilot 是否有足够的权限访问监控目录")
|
||||
else:
|
||||
logger.error(f"{mon_path} 启动目录监控失败:{err_msg}")
|
||||
messagehelper.put(f"{mon_path} 启动目录监控失败:{err_msg}", title="目录监控")
|
||||
logger.error("建议尝试使用兼容模式进行监控")
|
||||
|
||||
messagehelper.put(f"启动本地目录监控失败: {mon_path}\n错误: {err_msg}", title="目录监控")
|
||||
else:
|
||||
# 远程目录监控 - 使用智能间隔
|
||||
# 先尝试加载已有快照获取文件数量
|
||||
@@ -256,6 +454,10 @@ class Monitor(metaclass=Singleton):
|
||||
file_count = snapshot_data.get('file_count', 0) if snapshot_data else 0
|
||||
interval = self.adjust_monitor_interval(file_count)
|
||||
|
||||
logger.info(f"正在启动远程目录监控: {mon_path} [{mon_dir.storage}]")
|
||||
logger.info("*** 重要提示:远程目录监控只处理新增和修改的文件,不会处理监控启动前已存在的文件 ***")
|
||||
logger.info(f"预估文件数量: {file_count}, 监控间隔: {interval}分钟")
|
||||
|
||||
self._scheduler.add_job(
|
||||
self.polling_observer,
|
||||
'interval',
|
||||
@@ -267,33 +469,76 @@ class Monitor(metaclass=Singleton):
|
||||
id=f"monitor_{mon_dir.storage}_{mon_dir.download_path}",
|
||||
replace_existing=True
|
||||
)
|
||||
logger.info(f"已启动 {mon_path} 的远程目录监控,存储:{mon_dir.storage},间隔:{interval}分钟")
|
||||
logger.info(f"✓ 远程目录监控已启动: {mon_path} [间隔: {interval}分钟]")
|
||||
|
||||
# 启动定时服务
|
||||
if self._scheduler.get_jobs():
|
||||
self._scheduler.print_jobs()
|
||||
self._scheduler.start()
|
||||
logger.info("定时监控服务已启动")
|
||||
|
||||
@staticmethod
|
||||
def __choose_observer() -> Any:
|
||||
# 输出监控总结
|
||||
local_count = len([d for d in monitor_dirs if d.storage == "local" and d.monitor_type == "monitor"])
|
||||
remote_count = len([d for d in monitor_dirs if d.storage != "local" and d.monitor_type == "monitor"])
|
||||
logger.info(f"目录监控启动完成: 本地监控 {local_count} 个,远程监控 {remote_count} 个")
|
||||
|
||||
def __choose_observer(self) -> Optional[Any]:
|
||||
"""
|
||||
选择最优的监控模式
|
||||
选择最优的监控模式(带错误处理和自动回退)
|
||||
"""
|
||||
system = platform.system()
|
||||
|
||||
observers_to_try = []
|
||||
|
||||
try:
|
||||
if system == 'Linux':
|
||||
from watchdog.observers.inotify import InotifyObserver
|
||||
return InotifyObserver()
|
||||
observers_to_try = [
|
||||
('InotifyObserver',
|
||||
lambda: self.__try_import_observer('watchdog.observers.inotify', 'InotifyObserver')),
|
||||
]
|
||||
elif system == 'Darwin':
|
||||
from watchdog.observers.fsevents import FSEventsObserver
|
||||
return FSEventsObserver()
|
||||
observers_to_try = [
|
||||
('FSEventsObserver',
|
||||
lambda: self.__try_import_observer('watchdog.observers.fsevents', 'FSEventsObserver')),
|
||||
]
|
||||
elif system == 'Windows':
|
||||
from watchdog.observers.read_directory_changes import WindowsApiObserver
|
||||
return WindowsApiObserver()
|
||||
except Exception as error:
|
||||
logger.warn(f"导入模块错误:{error},将使用 PollingObserver 监控目录")
|
||||
return PollingObserver()
|
||||
observers_to_try = [
|
||||
('WindowsApiObserver',
|
||||
lambda: self.__try_import_observer('watchdog.observers.read_directory_changes',
|
||||
'WindowsApiObserver')),
|
||||
]
|
||||
|
||||
# 尝试每个观察者
|
||||
for observer_name, observer_func in observers_to_try:
|
||||
try:
|
||||
observer_class = observer_func()
|
||||
if observer_class:
|
||||
# 尝试创建实例以验证是否可用
|
||||
test_observer = observer_class()
|
||||
test_observer.stop() # 立即停止测试实例
|
||||
logger.debug(f"成功初始化 {observer_name}")
|
||||
return observer_class()
|
||||
except Exception as e:
|
||||
logger.debug(f"初始化 {observer_name} 失败: {e}")
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"选择观察者时出错: {e}")
|
||||
|
||||
logger.debug("所有快速监控模式都不可用,将使用兼容模式")
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def __try_import_observer(module_name: str, class_name: str):
|
||||
"""
|
||||
尝试导入观察者类
|
||||
"""
|
||||
try:
|
||||
module = __import__(module_name, fromlist=[class_name])
|
||||
return getattr(module, class_name)
|
||||
except (ImportError, AttributeError) as e:
|
||||
logger.debug(f"导入 {module_name}.{class_name} 失败: {e}")
|
||||
return None
|
||||
|
||||
def polling_observer(self, storage: str, mon_path: Path):
|
||||
"""
|
||||
@@ -343,8 +588,11 @@ class Monitor(metaclass=Singleton):
|
||||
if changes['added'] or changes['modified']:
|
||||
logger.info(
|
||||
f"{storage}:{mon_path} 发现 {len(changes['added'])} 个新增文件,{len(changes['modified'])} 个修改文件")
|
||||
else:
|
||||
logger.debug(f"{storage}:{mon_path} 无文件变化")
|
||||
else:
|
||||
logger.info(f"{storage}:{mon_path} 首次快照,暂不处理文件")
|
||||
logger.info(f"{storage}:{mon_path} 首次快照完成,共 {file_count} 个文件")
|
||||
logger.info("*** 首次快照仅建立基准,不会处理现有文件。后续监控将处理新增和修改的文件 ***")
|
||||
|
||||
# 保存新快照
|
||||
self.save_snapshot(storage, new_snapshot, file_count)
|
||||
@@ -375,7 +623,7 @@ class Monitor(metaclass=Singleton):
|
||||
"""
|
||||
if not event.is_directory:
|
||||
# 文件发生变化
|
||||
logger.debug(f"文件 {event_path} 发生了 {text}")
|
||||
logger.debug(f"检测到文件变化: {event_path} [{text}]")
|
||||
# 整理文件
|
||||
self.__handle_file(storage="local", event_path=Path(event_path), file_size=file_size)
|
||||
|
||||
@@ -412,10 +660,12 @@ class Monitor(metaclass=Singleton):
|
||||
|
||||
# TTL缓存控重
|
||||
if self._cache.get(str(event_path)):
|
||||
logger.debug(f"文件 {event_path} 在缓存中,跳过处理")
|
||||
return
|
||||
self._cache[str(event_path)] = True
|
||||
|
||||
try:
|
||||
logger.info(f"开始整理文件: {event_path}")
|
||||
# 开始整理
|
||||
TransferChain().do_transfer(
|
||||
fileitem=FileItem(
|
||||
@@ -429,7 +679,7 @@ class Monitor(metaclass=Singleton):
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("目录监控发生错误:%s - %s" % (str(e), traceback.format_exc()))
|
||||
logger.error("目录监控整理文件发生错误:%s - %s" % (str(e), traceback.format_exc()))
|
||||
|
||||
def stop(self):
|
||||
"""
|
||||
@@ -437,20 +687,22 @@ class Monitor(metaclass=Singleton):
|
||||
"""
|
||||
self._event.set()
|
||||
if self._observers:
|
||||
logger.info("正在停止本地目录监控服务...")
|
||||
for observer in self._observers:
|
||||
try:
|
||||
logger.info(f"正在停止目录监控服务:{observer}...")
|
||||
observer.stop()
|
||||
observer.join()
|
||||
logger.info(f"{observer} 目录监控已停止")
|
||||
logger.debug(f"已停止监控服务: {observer}")
|
||||
except Exception as e:
|
||||
logger.error(f"停止目录监控服务出现了错误:{e}")
|
||||
self._observers = []
|
||||
logger.info("本地目录监控服务已停止")
|
||||
if self._scheduler:
|
||||
self._scheduler.remove_all_jobs()
|
||||
if self._scheduler.running:
|
||||
try:
|
||||
self._scheduler.shutdown()
|
||||
logger.info("定时监控服务已停止")
|
||||
except Exception as e:
|
||||
logger.error(f"停止定时服务出现了错误:{e}")
|
||||
self._scheduler = None
|
||||
|
||||
Reference in New Issue
Block a user