From 8593a6cdd08daf01d10da898a41a1b2022e9f7dd Mon Sep 17 00:00:00 2001 From: jxxghp Date: Mon, 30 Jun 2025 12:40:37 +0800 Subject: [PATCH] =?UTF-8?q?refactor=EF=BC=9A=E4=BC=98=E5=8C=96=E7=9B=AE?= =?UTF-8?q?=E5=BD=95=E7=9B=91=E6=8E=A7=E5=BF=AB=E7=85=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/chain/storage.py | 10 +- app/modules/filemanager/__init__.py | 9 +- app/modules/filemanager/storages/__init__.py | 41 +++- app/modules/filemanager/storages/alist.py | 26 +-- app/monitor.py | 204 ++++++++++++++++--- 5 files changed, 230 insertions(+), 60 deletions(-) diff --git a/app/chain/storage.py b/app/chain/storage.py index 50d9d25b..c5ad52ea 100644 --- a/app/chain/storage.py +++ b/app/chain/storage.py @@ -110,11 +110,17 @@ class StorageChain(ChainBase): """ return self.run_module("get_parent_item", fileitem=fileitem) - def snapshot_storage(self, storage: str, path: Path) -> Optional[Dict[str, float]]: + def snapshot_storage(self, storage: str, path: Path, + last_snapshot_time: float = None, max_depth: int = 5) -> Optional[Dict[str, Dict]]: """ 快照存储 + :param storage: 存储类型 + :param path: 路径 + :param last_snapshot_time: 上次快照时间,用于增量快照 + :param max_depth: 最大递归深度,避免过深遍历 """ - return self.run_module("snapshot_storage", storage=storage, path=path) + return self.run_module("snapshot_storage", storage=storage, path=path, + last_snapshot_time=last_snapshot_time, max_depth=max_depth) def storage_usage(self, storage: str) -> Optional[schemas.StorageUsage]: """ diff --git a/app/modules/filemanager/__init__.py b/app/modules/filemanager/__init__.py index 54fd5fac..3ed87abe 100644 --- a/app/modules/filemanager/__init__.py +++ b/app/modules/filemanager/__init__.py @@ -344,9 +344,14 @@ class FileManagerModule(_ModuleBase): return None return storage_oper.get_parent(fileitem) - def snapshot_storage(self, storage: str, path: Path) -> Optional[Dict[str, float]]: + def snapshot_storage(self, storage: str, path: Path, + last_snapshot_time: float = None, max_depth: int = 5) -> Optional[Dict[str, Dict]]: """ 快照存储 + :param storage: 存储类型 + :param path: 路径 + :param last_snapshot_time: 上次快照时间,用于增量快照 + :param max_depth: 最大递归深度,避免过深遍历 """ if storage not in self._support_storages: return None @@ -354,7 +359,7 @@ class FileManagerModule(_ModuleBase): if not storage_oper: logger.error(f"不支持 {storage} 的快照处理") return None - return storage_oper.snapshot(path) + return storage_oper.snapshot(path, last_snapshot_time=last_snapshot_time, max_depth=max_depth) def storage_usage(self, storage: str) -> Optional[StorageUsage]: """ diff --git a/app/modules/filemanager/storages/__init__.py b/app/modules/filemanager/storages/__init__.py index a6302d2e..c00ae5b9 100644 --- a/app/modules/filemanager/storages/__init__.py +++ b/app/modules/filemanager/storages/__init__.py @@ -4,6 +4,7 @@ from typing import Optional, List, Dict, Tuple from app import schemas from app.helper.storage import StorageHelper +from app.log import logger class StorageBase(metaclass=ABCMeta): @@ -135,7 +136,8 @@ class StorageBase(metaclass=ABCMeta): pass @abstractmethod - def upload(self, fileitem: schemas.FileItem, path: Path, new_name: Optional[str] = None) -> Optional[schemas.FileItem]: + def upload(self, fileitem: schemas.FileItem, path: Path, + new_name: Optional[str] = None) -> Optional[schemas.FileItem]: """ 上传文件 :param fileitem: 上传目录项 @@ -192,21 +194,44 @@ class StorageBase(metaclass=ABCMeta): """ pass - def snapshot(self, path: Path) -> Dict[str, float]: + def snapshot(self, path: Path, last_snapshot_time: float = None, max_depth: int = 5) -> Dict[str, Dict]: """ 快照文件系统,输出所有层级文件信息(不含目录) + :param path: 路径 + :param last_snapshot_time: 上次快照时间,用于增量快照 + :param max_depth: 最大递归深度,避免过深遍历 """ files_info = {} - def __snapshot_file(_fileitm: schemas.FileItem): + def __snapshot_file(_fileitm: schemas.FileItem, current_depth: int = 0): """ 递归获取文件信息 """ - if _fileitm.type == "dir": - for sub_file in self.list(_fileitm): - __snapshot_file(sub_file) - else: - files_info[_fileitm.path] = _fileitm.size + try: + if _fileitm.type == "dir": + # 检查递归深度限制 + if current_depth >= max_depth: + return + + # 增量检查:如果目录修改时间早于上次快照,跳过 + if (last_snapshot_time and + _fileitm.modify_time and + _fileitm.modify_time <= last_snapshot_time): + return + + # 遍历子文件 + sub_files = self.list(_fileitm) + for sub_file in sub_files: + __snapshot_file(sub_file, current_depth + 1) + else: + # 记录文件的完整信息用于比对 + files_info[_fileitm.path] = { + 'size': _fileitm.size or 0, + 'modify_time': getattr(_fileitm, 'modify_time', 0), + 'type': _fileitm.type + } + except Exception as e: + logger.debug(f"Snapshot error for {_fileitm.path}: {e}") fileitem = self.get_item(path) if not fileitem: diff --git a/app/modules/filemanager/storages/alist.py b/app/modules/filemanager/storages/alist.py index 76bdd7df..a356c490 100644 --- a/app/modules/filemanager/storages/alist.py +++ b/app/modules/filemanager/storages/alist.py @@ -1,7 +1,7 @@ import json from datetime import datetime from pathlib import Path -from typing import Optional, List, Dict +from typing import Optional, List import requests @@ -710,30 +710,6 @@ class Alist(StorageBase, metaclass=Singleton): """ pass - def snapshot(self, path: Path) -> Dict[str, float]: - """ - 快照文件系统,输出所有层级文件信息(不含目录) - """ - files_info = {} - - def __snapshot_file(_fileitm: schemas.FileItem): - """ - 递归获取文件信息 - """ - if _fileitm.type == "dir": - for sub_file in self.list(_fileitm): - __snapshot_file(sub_file) - else: - files_info[_fileitm.path] = _fileitm.size - - fileitem = self.get_item(path) - if not fileitem: - return {} - - __snapshot_file(fileitem) - - return files_info - @staticmethod def __parse_timestamp(time_str: str) -> float: """ diff --git a/app/monitor.py b/app/monitor.py index 8417d2fa..710dac75 100644 --- a/app/monitor.py +++ b/app/monitor.py @@ -1,10 +1,12 @@ +import json import platform import re import threading +import time import traceback from pathlib import Path from threading import Lock -from typing import Any, Optional +from typing import Any, Optional, Dict, List from apscheduler.schedulers.background import BackgroundScheduler from cachetools import TTLCache @@ -65,8 +67,8 @@ class Monitor(metaclass=Singleton): # 定时服务 _scheduler = None - # 存储快照 - _storage_snapshot = {} + # 存储快照缓存目录 + _snapshot_cache_dir = None # 存储过照间隔(分钟) _snapshot_interval = 5 @@ -77,6 +79,9 @@ class Monitor(metaclass=Singleton): def __init__(self): super().__init__() self.all_exts = settings.RMT_MEDIAEXT + # 初始化快照缓存目录 + self._snapshot_cache_dir = settings.TEMP_PATH / "snapshots" + self._snapshot_cache_dir.mkdir(exist_ok=True) # 启动目录监控和文件整理 self.init() @@ -94,6 +99,96 @@ class Monitor(metaclass=Singleton): logger.info("配置变更事件触发,重新初始化目录监控...") self.init() + def save_snapshot(self, storage: str, snapshot: Dict, file_count: int = 0): + """ + 保存快照到文件 + :param storage: 存储名称 + :param snapshot: 快照数据 + :param file_count: 文件数量,用于调整监控间隔 + """ + try: + cache_file = self._snapshot_cache_dir / f"{storage}_snapshot.json" + snapshot_data = { + 'timestamp': time.time(), + 'file_count': file_count, + 'snapshot': snapshot + } + with open(cache_file, 'w', encoding='utf-8') as f: + json.dump(snapshot_data, f, ensure_ascii=False, indent=2) + logger.debug(f"快照已保存到 {cache_file}") + except Exception as e: + logger.error(f"保存快照失败: {e}") + + def load_snapshot(self, storage: str) -> Optional[Dict]: + """ + 从文件加载快照 + :param storage: 存储名称 + :return: 快照数据或None + """ + try: + cache_file = self._snapshot_cache_dir / f"{storage}_snapshot.json" + if cache_file.exists(): + with open(cache_file, 'r', encoding='utf-8') as f: + data = json.load(f) + return data + return None + except Exception as e: + logger.error(f"加载快照失败: {e}") + return None + + @staticmethod + def adjust_monitor_interval(file_count: int) -> int: + """ + 根据文件数量动态调整监控间隔 + :param file_count: 文件数量 + :return: 监控间隔(分钟) + """ + if file_count < 100: + return 5 # 5分钟 + elif file_count < 500: + return 10 # 10分钟 + elif file_count < 1000: + return 15 # 15分钟 + else: + return 30 # 30分钟 + + @staticmethod + def compare_snapshots(old_snapshot: Dict, new_snapshot: Dict) -> Dict[str, List]: + """ + 比对快照,找出变化的文件(只处理新增和修改,不处理删除) + :param old_snapshot: 旧快照 + :param new_snapshot: 新快照 + :return: 变化信息 + """ + changes = { + 'added': [], + 'modified': [] + } + + old_files = set(old_snapshot.keys()) + new_files = set(new_snapshot.keys()) + + # 新增文件 + changes['added'] = list(new_files - old_files) + + # 修改文件(大小或时间变化) + for file_path in old_files & new_files: + old_info = old_snapshot[file_path] + new_info = new_snapshot[file_path] + + # 检查文件大小变化 + old_size = old_info.get('size', 0) if isinstance(old_info, dict) else old_info + new_size = new_info.get('size', 0) if isinstance(new_info, dict) else new_info + + # 检查修改时间变化(如果有的话) + old_time = old_info.get('modify_time', 0) if isinstance(old_info, dict) else 0 + new_time = new_info.get('modify_time', 0) if isinstance(new_info, dict) else 0 + + if old_size != new_size or (old_time and new_time and old_time != new_time): + changes['modified'].append(file_path) + + return changes + def init(self): """ 启动监控 @@ -155,12 +250,25 @@ class Monitor(metaclass=Singleton): logger.error(f"{mon_path} 启动目录监控失败:{err_msg}") messagehelper.put(f"{mon_path} 启动目录监控失败:{err_msg}", title="目录监控") else: - # 远程目录监控 - self._scheduler.add_job(self.polling_observer, 'interval', minutes=self._snapshot_interval, - kwargs={ - 'storage': mon_dir.storage, - 'mon_path': mon_path - }) + # 远程目录监控 - 使用智能间隔 + # 先尝试加载已有快照获取文件数量 + snapshot_data = self.load_snapshot(mon_dir.storage) + file_count = snapshot_data.get('file_count', 0) if snapshot_data else 0 + interval = self.adjust_monitor_interval(file_count) + + self._scheduler.add_job( + self.polling_observer, + 'interval', + minutes=interval, + kwargs={ + 'storage': mon_dir.storage, + 'mon_path': mon_path + }, + id=f"monitor_{mon_dir.storage}_{mon_dir.download_path}", + replace_existing=True + ) + logger.info(f"已启动 {mon_path} 的远程目录监控,存储:{mon_dir.storage},间隔:{interval}分钟") + # 启动定时服务 if self._scheduler.get_jobs(): self._scheduler.print_jobs() @@ -189,23 +297,73 @@ class Monitor(metaclass=Singleton): def polling_observer(self, storage: str, mon_path: Path): """ - 轮询监控 + 轮询监控(改进版) """ with snapshot_lock: - # 快照存储 - new_snapshot = StorageChain().snapshot_storage(storage=storage, path=mon_path) - if new_snapshot: - # 比较快照 - old_snapshot = self._storage_snapshot.get(storage) + try: + logger.debug(f"开始对 {storage}:{mon_path} 进行快照...") + + # 加载上次快照数据 + old_snapshot_data = self.load_snapshot(storage) + old_snapshot = old_snapshot_data.get('snapshot', {}) if old_snapshot_data else {} + last_snapshot_time = old_snapshot_data.get('timestamp', 0) if old_snapshot_data else 0 + + # 生成新快照(增量模式) + new_snapshot = StorageChain().snapshot_storage( + storage=storage, + path=mon_path, + last_snapshot_time=last_snapshot_time + ) + + if new_snapshot is None: + logger.warn(f"获取 {storage}:{mon_path} 快照失败") + return + + file_count = len(new_snapshot) + logger.info(f"{storage}:{mon_path} 快照完成,发现 {file_count} 个文件") + if old_snapshot: - # 新增的文件 - new_files = new_snapshot.keys() - old_snapshot.keys() - for new_file in new_files: - # 添加到待整理队列 - self.__handle_file(storage=storage, event_path=Path(new_file), - file_size=new_snapshot.get(new_file)) - # 更新快照 - self._storage_snapshot[storage] = new_snapshot + # 比较快照找出变化 + changes = self.compare_snapshots(old_snapshot, new_snapshot) + + # 处理新增文件 + for new_file in changes['added']: + logger.info(f"发现新增文件:{new_file}") + file_info = new_snapshot.get(new_file, {}) + file_size = file_info.get('size', 0) if isinstance(file_info, dict) else file_info + self.__handle_file(storage=storage, event_path=Path(new_file), file_size=file_size) + + # 处理修改文件 + for modified_file in changes['modified']: + logger.info(f"发现修改文件:{modified_file}") + file_info = new_snapshot.get(modified_file, {}) + file_size = file_info.get('size', 0) if isinstance(file_info, dict) else file_info + self.__handle_file(storage=storage, event_path=Path(modified_file), file_size=file_size) + + if changes['added'] or changes['modified']: + logger.info( + f"{storage}:{mon_path} 发现 {len(changes['added'])} 个新增文件,{len(changes['modified'])} 个修改文件") + else: + logger.info(f"{storage}:{mon_path} 首次快照,暂不处理文件") + + # 保存新快照 + self.save_snapshot(storage, new_snapshot, file_count) + + # 动态调整监控间隔 + new_interval = self.adjust_monitor_interval(file_count) + current_job = self._scheduler.get_job(f"monitor_{storage}_{mon_path}") + if current_job and current_job.trigger.interval.total_seconds() / 60 != new_interval: + # 重新安排任务 + self._scheduler.modify_job( + f"monitor_{storage}_{mon_path}", + trigger='interval', + minutes=new_interval + ) + logger.info(f"{storage}:{mon_path} 监控间隔已调整为 {new_interval} 分钟") + + except Exception as e: + logger.error(f"轮询监控 {storage}:{mon_path} 出现错误:{e}") + logger.debug(traceback.format_exc()) def event_handler(self, event, text: str, event_path: str, file_size: float = None): """