diff --git a/app/monitor.py b/app/monitor.py index 710dac75..58990b75 100644 --- a/app/monitor.py +++ b/app/monitor.py @@ -1,6 +1,7 @@ import json import platform import re +import subprocess import threading import time import traceback @@ -114,7 +115,7 @@ class Monitor(metaclass=Singleton): 'snapshot': snapshot } with open(cache_file, 'w', encoding='utf-8') as f: - json.dump(snapshot_data, f, ensure_ascii=False, indent=2) + json.dump(snapshot_data, f, ensure_ascii=False, indent=2) # noqa logger.debug(f"快照已保存到 {cache_file}") except Exception as e: logger.error(f"保存快照失败: {e}") @@ -189,6 +190,159 @@ class Monitor(metaclass=Singleton): return changes + @staticmethod + def count_directory_files(directory: Path, max_check: int = 10000) -> int: + """ + 统计目录下的文件数量(用于检测是否超过系统限制) + :param directory: 目录路径 + :param max_check: 最大检查数量,避免长时间阻塞 + :return: 文件数量 + """ + try: + count = 0 + import os + for root, dirs, files in os.walk(str(directory)): + count += len(files) + if count > max_check: + return count + return count + except Exception as err: + logger.debug(f"统计目录文件数量失败: {err}") + return 0 + + @staticmethod + def check_system_limits() -> Dict[str, Any]: + """ + 检查系统限制 + :return: 系统限制信息 + """ + limits = { + 'max_user_watches': 0, + 'max_user_instances': 0, + 'current_watches': 0, + 'warnings': [] + } + + try: + system = platform.system() + if system == 'Linux': + # 检查 inotify 限制 + try: + with open('/proc/sys/fs/inotify/max_user_watches', 'r') as f: + limits['max_user_watches'] = int(f.read().strip()) + except Exception as e: + logger.debug(f"读取 inotify 限制失败: {e}") + limits['max_user_watches'] = 8192 # 默认值 + + try: + with open('/proc/sys/fs/inotify/max_user_instances', 'r') as f: + limits['max_user_instances'] = int(f.read().strip()) + except Exception as e: + logger.debug(f"读取 inotify 实例限制失败: {e}") + + # 检查当前使用的watches + try: + import subprocess + result = subprocess.run(['find', '/proc/*/fd', '-lname', 'anon_inode:inotify', '-printf', '%h\n'], + capture_output=True, text=True, timeout=5) + if result.returncode == 0: + limits['current_watches'] = len(result.stdout.strip().split('\n')) + except Exception as e: + logger.debug(f"检查当前 inotify 使用失败: {e}") + + except Exception as e: + limits['warnings'].append(f"检查系统限制时出错: {e}") + + return limits + + @staticmethod + def get_system_optimization_tips() -> List[str]: + """ + 获取系统优化建议 + :return: 优化建议列表 + """ + tips = [] + system = platform.system() + + if system == 'Linux': + tips.extend([ + "增加 inotify 监控数量限制:", + "echo fs.inotify.max_user_watches=524288 | sudo tee -a /etc/sysctl.conf", + "echo fs.inotify.max_user_instances=524288 | sudo tee -a /etc/sysctl.conf", + "sudo sysctl -p", + "", + "如果在Docker中运行,请在宿主机上执行以上命令" + ]) + elif system == 'Darwin': + tips.extend([ + "macOS 系统优化建议:", + "sudo sysctl kern.maxfiles=65536", + "sudo sysctl kern.maxfilesperproc=32768", + "ulimit -n 32768" + ]) + elif system == 'Windows': + tips.extend([ + "Windows 系统优化建议:", + "1. 关闭不必要的实时保护软件对监控目录的扫描", + "2. 将监控目录添加到Windows Defender排除列表", + "3. 确保有足够的可用内存" + ]) + + return tips + + def should_use_polling(self, directory: Path, monitor_mode: str, + file_count: int, limits: dict) -> tuple[bool, str]: + """ + 判断是否应该使用轮询模式 + :param directory: 监控目录 + :param monitor_mode: 配置的监控模式 + :param file_count: 目录文件数量 + :param limits: 系统限制信息 + :return: (是否使用轮询, 原因) + """ + if monitor_mode == "compatible": + return True, "用户配置为兼容模式" + + # 检查网络文件系统 + if self.is_network_filesystem(directory): + return True, "检测到网络文件系统,建议使用兼容模式" + + max_watches = limits.get('max_user_watches') + if max_watches and file_count > max_watches * 0.8: + return True, f"目录文件数量({file_count})接近系统限制({max_watches})" + return False, "使用快速模式" + + @staticmethod + def is_network_filesystem(directory: Path) -> bool: + """ + 检测是否为网络文件系统 + :param directory: 目录路径 + :return: 是否为网络文件系统 + """ + try: + system = platform.system() + if system == 'Linux': + # 检查挂载信息 + result = subprocess.run(['df', '-T', str(directory)], + capture_output=True, text=True, timeout=5) + if result.returncode == 0: + output = result.stdout.lower() + network_fs = ['nfs', 'cifs', 'smbfs', 'fuse', 'sshfs', 'ftpfs'] + return any(fs in output for fs in network_fs) + elif system == 'Darwin': + # macOS 检查 + result = subprocess.run(['df', '-T', str(directory)], + capture_output=True, text=True, timeout=5) + if result.returncode == 0: + output = result.stdout.lower() + return 'nfs' in output or 'smbfs' in output + elif system == 'Windows': + # Windows 检查网络驱动器 + return str(directory).startswith('\\\\') + except Exception as e: + logger.debug(f"检测网络文件系统时出错: {e}") + return False + def init(self): """ 启动监控 @@ -199,10 +353,12 @@ class Monitor(metaclass=Singleton): # 读取目录配置 monitor_dirs = DirectoryHelper().get_download_dirs() if not monitor_dirs: + logger.info("未找到任何目录监控配置") return # 按下载目录去重 monitor_dirs = list({f"{d.storage}_{d.download_path}": d for d in monitor_dirs}.values()) + logger.info(f"找到 {len(monitor_dirs)} 个目录监控配置") # 启动定时服务进程 self._scheduler = BackgroundScheduler(timezone=settings.TZ) @@ -210,9 +366,12 @@ class Monitor(metaclass=Singleton): messagehelper = MessageHelper() for mon_dir in monitor_dirs: if not mon_dir.library_path: + logger.warn(f"跳过监控配置 {mon_dir.download_path}:未设置媒体库目录") continue if mon_dir.monitor_type != "monitor": + logger.debug(f"跳过监控配置 {mon_dir.download_path}:监控类型为 {mon_dir.monitor_type}") continue + # 检查媒体库目录是不是下载目录的子目录 mon_path = Path(mon_dir.download_path) target_path = Path(mon_dir.library_path) @@ -224,31 +383,70 @@ class Monitor(metaclass=Singleton): # 启动监控 if mon_dir.storage == "local": # 本地目录监控 + logger.info(f"正在启动本地目录监控: {mon_path}") + logger.info("*** 重要提示:目录监控只处理新增和修改的文件,不会处理监控启动前已存在的文件 ***") + try: - if mon_dir.monitor_mode == "fast": - observer = self.__choose_observer() - else: + # 统计文件数量并给出提示 + file_count = self.count_directory_files(mon_path) + logger.info(f"监控目录 {mon_path} 包含约 {file_count} 个文件") + + # 检查系统限制 + limits = self.check_system_limits() + + # 检查是否需要使用轮询模式 + use_polling, reason = self.should_use_polling(mon_path, + monitor_mode=mon_dir.monitor_mode, + file_count=file_count, + limits=limits) + logger.info(f"监控模式决策: {reason}") + + if use_polling: observer = PollingObserver() + logger.info(f"使用兼容模式(轮询)监控 {mon_path}") + else: + observer = self.__choose_observer() + if observer is None: + logger.warn(f"快速模式不可用,自动切换到兼容模式监控 {mon_path}") + observer = PollingObserver() + else: + logger.info(f"使用快速模式监控 {mon_path}") + if limits['warnings']: + for warning in limits['warnings']: + logger.warn(f"系统限制警告: {warning}") + if limits['max_user_watches'] > 0: + usage_percent = (file_count / limits['max_user_watches']) * 100 + logger.info( + f"系统监控资源使用率: {usage_percent:.1f}% ({file_count}/{limits['max_user_watches']})") + self._observers.append(observer) observer.schedule(FileMonitorHandler(mon_path=mon_path, callback=self), path=str(mon_path), recursive=True) observer.daemon = True observer.start() - logger.info(f"已启动 {mon_path} 的目录监控服务, 监控模式:{mon_dir.monitor_mode}") + + mode_name = "兼容模式(轮询)" if use_polling else "快速模式" + logger.info(f"✓ 本地目录监控已启动: {mon_path} [{mode_name}]") + except Exception as e: err_msg = str(e) - if "inotify" in err_msg and "reached" in err_msg: - logger.warn( - f"目录监控服务启动出现异常:{err_msg},请在宿主机上(不是docker容器内)执行以下命令并重启:" - + """ - echo fs.inotify.max_user_watches=524288 | sudo tee -a /etc/sysctl.conf - echo fs.inotify.max_user_instances=524288 | sudo tee -a /etc/sysctl.conf - sudo sysctl -p - """) + logger.error(f"启动本地目录监控失败: {mon_path}") + logger.error(f"错误详情: {err_msg}") + + if "inotify" in err_msg.lower(): + logger.error("inotify 相关错误,这通常是由于系统监控数量限制导致的") + logger.error("解决方案:") + tips = self.get_system_optimization_tips() + for tip in tips: + logger.error(f" {tip}") + logger.error("执行上述命令后重启 MoviePilot") + elif "permission" in err_msg.lower(): + logger.error("权限错误,请检查 MoviePilot 是否有足够的权限访问监控目录") else: - logger.error(f"{mon_path} 启动目录监控失败:{err_msg}") - messagehelper.put(f"{mon_path} 启动目录监控失败:{err_msg}", title="目录监控") + logger.error("建议尝试使用兼容模式进行监控") + + messagehelper.put(f"启动本地目录监控失败: {mon_path}\n错误: {err_msg}", title="目录监控") else: # 远程目录监控 - 使用智能间隔 # 先尝试加载已有快照获取文件数量 @@ -256,6 +454,10 @@ class Monitor(metaclass=Singleton): file_count = snapshot_data.get('file_count', 0) if snapshot_data else 0 interval = self.adjust_monitor_interval(file_count) + logger.info(f"正在启动远程目录监控: {mon_path} [{mon_dir.storage}]") + logger.info("*** 重要提示:远程目录监控只处理新增和修改的文件,不会处理监控启动前已存在的文件 ***") + logger.info(f"预估文件数量: {file_count}, 监控间隔: {interval}分钟") + self._scheduler.add_job( self.polling_observer, 'interval', @@ -267,33 +469,76 @@ class Monitor(metaclass=Singleton): id=f"monitor_{mon_dir.storage}_{mon_dir.download_path}", replace_existing=True ) - logger.info(f"已启动 {mon_path} 的远程目录监控,存储:{mon_dir.storage},间隔:{interval}分钟") + logger.info(f"✓ 远程目录监控已启动: {mon_path} [间隔: {interval}分钟]") # 启动定时服务 if self._scheduler.get_jobs(): self._scheduler.print_jobs() self._scheduler.start() + logger.info("定时监控服务已启动") - @staticmethod - def __choose_observer() -> Any: + # 输出监控总结 + local_count = len([d for d in monitor_dirs if d.storage == "local" and d.monitor_type == "monitor"]) + remote_count = len([d for d in monitor_dirs if d.storage != "local" and d.monitor_type == "monitor"]) + logger.info(f"目录监控启动完成: 本地监控 {local_count} 个,远程监控 {remote_count} 个") + + def __choose_observer(self) -> Optional[Any]: """ - 选择最优的监控模式 + 选择最优的监控模式(带错误处理和自动回退) """ system = platform.system() + observers_to_try = [] + try: if system == 'Linux': - from watchdog.observers.inotify import InotifyObserver - return InotifyObserver() + observers_to_try = [ + ('InotifyObserver', + lambda: self.__try_import_observer('watchdog.observers.inotify', 'InotifyObserver')), + ] elif system == 'Darwin': - from watchdog.observers.fsevents import FSEventsObserver - return FSEventsObserver() + observers_to_try = [ + ('FSEventsObserver', + lambda: self.__try_import_observer('watchdog.observers.fsevents', 'FSEventsObserver')), + ] elif system == 'Windows': - from watchdog.observers.read_directory_changes import WindowsApiObserver - return WindowsApiObserver() - except Exception as error: - logger.warn(f"导入模块错误:{error},将使用 PollingObserver 监控目录") - return PollingObserver() + observers_to_try = [ + ('WindowsApiObserver', + lambda: self.__try_import_observer('watchdog.observers.read_directory_changes', + 'WindowsApiObserver')), + ] + + # 尝试每个观察者 + for observer_name, observer_func in observers_to_try: + try: + observer_class = observer_func() + if observer_class: + # 尝试创建实例以验证是否可用 + test_observer = observer_class() + test_observer.stop() # 立即停止测试实例 + logger.debug(f"成功初始化 {observer_name}") + return observer_class() + except Exception as e: + logger.debug(f"初始化 {observer_name} 失败: {e}") + continue + + except Exception as e: + logger.debug(f"选择观察者时出错: {e}") + + logger.debug("所有快速监控模式都不可用,将使用兼容模式") + return None + + @staticmethod + def __try_import_observer(module_name: str, class_name: str): + """ + 尝试导入观察者类 + """ + try: + module = __import__(module_name, fromlist=[class_name]) + return getattr(module, class_name) + except (ImportError, AttributeError) as e: + logger.debug(f"导入 {module_name}.{class_name} 失败: {e}") + return None def polling_observer(self, storage: str, mon_path: Path): """ @@ -343,8 +588,11 @@ class Monitor(metaclass=Singleton): if changes['added'] or changes['modified']: logger.info( f"{storage}:{mon_path} 发现 {len(changes['added'])} 个新增文件,{len(changes['modified'])} 个修改文件") + else: + logger.debug(f"{storage}:{mon_path} 无文件变化") else: - logger.info(f"{storage}:{mon_path} 首次快照,暂不处理文件") + logger.info(f"{storage}:{mon_path} 首次快照完成,共 {file_count} 个文件") + logger.info("*** 首次快照仅建立基准,不会处理现有文件。后续监控将处理新增和修改的文件 ***") # 保存新快照 self.save_snapshot(storage, new_snapshot, file_count) @@ -375,7 +623,7 @@ class Monitor(metaclass=Singleton): """ if not event.is_directory: # 文件发生变化 - logger.debug(f"文件 {event_path} 发生了 {text}") + logger.debug(f"检测到文件变化: {event_path} [{text}]") # 整理文件 self.__handle_file(storage="local", event_path=Path(event_path), file_size=file_size) @@ -412,10 +660,12 @@ class Monitor(metaclass=Singleton): # TTL缓存控重 if self._cache.get(str(event_path)): + logger.debug(f"文件 {event_path} 在缓存中,跳过处理") return self._cache[str(event_path)] = True try: + logger.info(f"开始整理文件: {event_path}") # 开始整理 TransferChain().do_transfer( fileitem=FileItem( @@ -429,7 +679,7 @@ class Monitor(metaclass=Singleton): ) ) except Exception as e: - logger.error("目录监控发生错误:%s - %s" % (str(e), traceback.format_exc())) + logger.error("目录监控整理文件发生错误:%s - %s" % (str(e), traceback.format_exc())) def stop(self): """ @@ -437,20 +687,22 @@ class Monitor(metaclass=Singleton): """ self._event.set() if self._observers: + logger.info("正在停止本地目录监控服务...") for observer in self._observers: try: - logger.info(f"正在停止目录监控服务:{observer}...") observer.stop() observer.join() - logger.info(f"{observer} 目录监控已停止") + logger.debug(f"已停止监控服务: {observer}") except Exception as e: logger.error(f"停止目录监控服务出现了错误:{e}") self._observers = [] + logger.info("本地目录监控服务已停止") if self._scheduler: self._scheduler.remove_all_jobs() if self._scheduler.running: try: self._scheduler.shutdown() + logger.info("定时监控服务已停止") except Exception as e: logger.error(f"停止定时服务出现了错误:{e}") self._scheduler = None