From 1b6a548dee3df14d860406f55685fc88e49cb086 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Fri, 26 Jun 2026 07:02:59 +0800 Subject: [PATCH] feat(monitor): enhance file handling with transfer history checks --- app/monitor.py | 71 ++++-- tests/test_monitor_watchfiles.py | 360 +++++++++++++++++++------------ 2 files changed, 271 insertions(+), 160 deletions(-) diff --git a/app/monitor.py b/app/monitor.py index 11d61af5..63d28786 100644 --- a/app/monitor.py +++ b/app/monitor.py @@ -17,6 +17,7 @@ from app.chain.storage import StorageChain from app.chain.transfer import TransferChain from app.core.cache import TTLCache, FileCache from app.core.config import settings +from app.db.transferhistory_oper import TransferHistoryOper from app.helper.directory import DirectoryHelper from app.helper.message import MessageHelper from app.log import logger @@ -328,10 +329,9 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass): try: if not self.__is_transfer_candidate_path(Path(file_path)): continue - logger.info(f"处理文件:{file_path}") file_size = file_info.get('size', 0) if isinstance(file_info, dict) else file_info - self.__handle_file(storage=storage, event_path=Path(file_path), file_size=file_size) - processed_count += 1 + if self.__handle_file(storage=storage, event_path=Path(file_path), file_size=file_size): + processed_count += 1 except Exception as e: logger.error(f"处理文件 {file_path} 失败: {e}") continue @@ -453,6 +453,26 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass): return False return self.__has_suffix_in(file_path, self.all_exts) + @staticmethod + def __build_transfer_src_path(event_path: Path, is_bluray_folder: bool) -> str: + """ + 生成整理记录使用的源路径。 + """ + if is_bluray_folder: + return f"{event_path.as_posix()}/" + return event_path.as_posix() + + @staticmethod + def __has_transfer_history(storage: str, src_path: str) -> Optional[bool]: + """ + 判断源文件是否已经存在整理记录。 + """ + try: + return bool(TransferHistoryOper().get_by_src(src_path, storage=storage)) + except Exception as err: + logger.error(f"查询整理历史失败: {src_path} - {err}") + return None + @staticmethod def count_directory_files(directory: Path, max_check: int = 10000) -> int: """ @@ -761,22 +781,24 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass): ] # 处理新增文件 + handled_added_count = 0 for new_file in added_files: - logger.info(f"发现新增文件:{new_file}") file_info = new_snapshot.get(new_file, {}) file_size = file_info.get('size', 0) if isinstance(file_info, dict) else file_info - self.__handle_file(storage=storage, event_path=Path(new_file), file_size=file_size) + if self.__handle_file(storage=storage, event_path=Path(new_file), file_size=file_size): + handled_added_count += 1 # 处理修改文件 + handled_modified_count = 0 for modified_file in modified_files: - logger.info(f"发现修改文件:{modified_file}") file_info = new_snapshot.get(modified_file, {}) file_size = file_info.get('size', 0) if isinstance(file_info, dict) else file_info - self.__handle_file(storage=storage, event_path=Path(modified_file), file_size=file_size) + if self.__handle_file(storage=storage, event_path=Path(modified_file), file_size=file_size): + handled_modified_count += 1 - if added_files or modified_files: + if handled_added_count or handled_modified_count: logger.info( - f"{storage} 发现 {len(added_files)} 个新增文件,{len(modified_files)} 个修改文件") + f"{storage} 发现 {handled_added_count} 个新增文件,{handled_modified_count} 个修改文件") else: logger.debug(f"{storage} 无文件变化") else: @@ -813,17 +835,16 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass): if not event.is_directory: if not self.__is_transfer_candidate_path(Path(event_path)): return - # 文件发生变化 - logger.debug(f"检测到文件变化: {event_path} [{text}]") # 整理文件 self.__handle_file(storage="local", event_path=Path(event_path), file_size=file_size) - def __handle_file(self, storage: str, event_path: Path, file_size: float = None): + def __handle_file(self, storage: str, event_path: Path, file_size: float = None) -> bool: """ 整理一个文件 :param storage: 存储 :param event_path: 事件文件路径 :param file_size: 文件大小 + :return: 是否进入整理链 """ # 全程加锁 with lock: @@ -832,17 +853,27 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass): if self.__is_bluray_sub(event_path): event_path = self.__get_bluray_dir(event_path) if not event_path: - return + return False is_bluray_folder = True elif not self.__is_transfer_candidate_path(event_path): - return + return False # TTL缓存控重 if self._cache.get(str(event_path)): - logger.debug(f"文件 {event_path} 在缓存中,跳过处理") - return + return False self._cache[str(event_path)] = True + src_path = self.__build_transfer_src_path( + event_path=event_path, + is_bluray_folder=is_bluray_folder, + ) + has_transfer_history = self.__has_transfer_history( + storage=storage, + src_path=src_path, + ) + if has_transfer_history is not False: + return False + try: if is_bluray_folder: logger.info(f"开始整理蓝光原盘: {event_path}") @@ -852,11 +883,7 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass): TransferChain().do_transfer( fileitem=FileItem( storage=storage, - path=( - event_path.as_posix() - if not is_bluray_folder - else event_path.as_posix() + "/" - ), + path=src_path, type="file" if not is_bluray_folder else "dir", name=event_path.name, basename=event_path.stem, @@ -864,8 +891,10 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass): size=file_size ) ) + return True except Exception as e: logger.error("目录监控整理文件发生错误:%s - %s" % (str(e), traceback.format_exc())) + return False def stop(self): """ diff --git a/tests/test_monitor_watchfiles.py b/tests/test_monitor_watchfiles.py index 0417f5e9..c02d9e73 100644 --- a/tests/test_monitor_watchfiles.py +++ b/tests/test_monitor_watchfiles.py @@ -1,5 +1,3 @@ -import tempfile -import unittest from pathlib import Path from unittest.mock import MagicMock @@ -30,160 +28,244 @@ class CallbackRecorder: self.events.append((event, text, event_path, file_size)) -class LocalDirectoryWatcherTest(unittest.TestCase): +def test_handle_changes_dispatches_added_and_modified_files(tmp_path): """ - watchfiles 本地目录监控测试。 + 新增和修改文件应转换成目录监控整理回调。 """ + added_file = tmp_path / "a_added.mkv" + modified_file = tmp_path / "b_modified.mkv" + skipped_dir = tmp_path / "c_dir" + added_file.write_bytes(b"added") + modified_file.write_bytes(b"modified") + skipped_dir.mkdir() - def test_handle_changes_dispatches_added_and_modified_files(self): - """ - 新增和修改文件应转换成目录监控整理回调。 - """ - with tempfile.TemporaryDirectory() as temp_dir: - watch_dir = Path(temp_dir) - added_file = watch_dir / "a_added.mkv" - modified_file = watch_dir / "b_modified.mkv" - skipped_dir = watch_dir / "c_dir" - added_file.write_bytes(b"added") - modified_file.write_bytes(b"modified") - skipped_dir.mkdir() + callback = CallbackRecorder() + watcher = LocalDirectoryWatcher(tmp_path, callback=callback, force_polling=True) + watcher._handle_changes({ + (Change.added, added_file.as_posix()), + (Change.modified, modified_file.as_posix()), + (Change.deleted, added_file.as_posix()), + (Change.added, skipped_dir.as_posix()), + }) - callback = CallbackRecorder() - watcher = LocalDirectoryWatcher(watch_dir, callback=callback, force_polling=True) - watcher._handle_changes({ - (Change.added, added_file.as_posix()), - (Change.modified, modified_file.as_posix()), - (Change.deleted, added_file.as_posix()), - (Change.added, skipped_dir.as_posix()), - }) - - self.assertEqual(2, len(callback.events)) - self.assertEqual((Change.added, "新增", added_file.as_posix(), 5), - (callback.events[0][0].change_type, - callback.events[0][1], - callback.events[0][2], - callback.events[0][3])) - self.assertEqual((Change.modified, "修改", modified_file.as_posix(), 8), - (callback.events[1][0].change_type, - callback.events[1][1], - callback.events[1][2], - callback.events[1][3])) - - def test_handle_changes_skips_missing_paths(self): - """ - 事件到达时已经消失的路径不应触发整理。 - """ - with tempfile.TemporaryDirectory() as temp_dir: - watch_dir = Path(temp_dir) - missing_file = watch_dir / "missing.mkv" - - callback = CallbackRecorder() - watcher = LocalDirectoryWatcher(watch_dir, callback=callback, force_polling=True) - watcher._handle_changes({(Change.added, missing_file.as_posix())}) - - self.assertEqual([], callback.events) + assert len(callback.events) == 2 + assert ( + Change.added, + "新增", + added_file.as_posix(), + 5, + ) == ( + callback.events[0][0].change_type, + callback.events[0][1], + callback.events[0][2], + callback.events[0][3], + ) + assert ( + Change.modified, + "修改", + modified_file.as_posix(), + 8, + ) == ( + callback.events[1][0].change_type, + callback.events[1][1], + callback.events[1][2], + callback.events[1][3], + ) -class MonitorWatchfilesEventTest(unittest.TestCase): +def test_handle_changes_skips_missing_paths(tmp_path): """ - Monitor 对 watchfiles 事件的兼容处理测试。 + 事件到达时已经消失的路径不应触发整理。 """ + missing_file = tmp_path / "missing.mkv" - def test_event_handler_routes_file_events_to_transfer_handler(self): + callback = CallbackRecorder() + watcher = LocalDirectoryWatcher(tmp_path, callback=callback, force_polling=True) + watcher._handle_changes({(Change.added, missing_file.as_posix())}) + + assert callback.events == [] + + +def test_event_handler_routes_file_events_to_transfer_handler(): + """ + 文件事件应继续按 local 存储交给整理流程。 + """ + monitor = object.__new__(Monitor) + monitor.all_exts = [".mkv"] + handle_file = MagicMock() + setattr(monitor, "_Monitor__handle_file", handle_file) + event_path = Path("/downloads/movie.mkv") + event = DirectoryChangeEvent( + change_type=Change.added, + src_path=event_path.as_posix(), + is_directory=False + ) + + monitor.event_handler( + event=event, + text="新增", + event_path=event_path.as_posix(), + file_size=1024 + ) + + handle_file.assert_called_once_with( + storage="local", + event_path=event_path, + file_size=1024 + ) + + +def test_event_handler_ignores_directory_events(): + """ + 目录事件不应进入文件整理流程。 + """ + monitor = object.__new__(Monitor) + monitor.all_exts = [".mkv"] + handle_file = MagicMock() + setattr(monitor, "_Monitor__handle_file", handle_file) + event_path = Path("/downloads/folder") + event = DirectoryChangeEvent( + change_type=Change.added, + src_path=event_path.as_posix(), + is_directory=True + ) + + monitor.event_handler( + event=event, + text="新增", + event_path=event_path.as_posix() + ) + + handle_file.assert_not_called() + + +def test_event_handler_ignores_download_temp_files(): + """ + 下载器临时文件不应进入整理流程。 + """ + monitor = object.__new__(Monitor) + monitor.all_exts = [".mkv"] + handle_file = MagicMock() + setattr(monitor, "_Monitor__handle_file", handle_file) + event_path = Path("/downloads/movie.mkv.!qB") + event = DirectoryChangeEvent( + change_type=Change.modified, + src_path=event_path.as_posix(), + is_directory=False + ) + + monitor.event_handler( + event=event, + text="修改", + event_path=event_path.as_posix(), + file_size=1024 + ) + + handle_file.assert_not_called() + + +def test_event_handler_ignores_non_transferable_files(): + """ + 非可整理后缀文件不应进入整理流程。 + """ + monitor = object.__new__(Monitor) + monitor.all_exts = [".mkv"] + handle_file = MagicMock() + setattr(monitor, "_Monitor__handle_file", handle_file) + event_path = Path("/downloads/movie.nfo") + event = DirectoryChangeEvent( + change_type=Change.added, + src_path=event_path.as_posix(), + is_directory=False + ) + + monitor.event_handler( + event=event, + text="新增", + event_path=event_path.as_posix(), + file_size=1024 + ) + + handle_file.assert_not_called() + + +def test_handle_file_skips_transfer_when_history_exists(monkeypatch): + """ + 已有整理记录的源文件不应再次进入整理链。 + """ + monitor = object.__new__(Monitor) + monitor.all_exts = [".mkv"] + monitor._cache = {} + event_path = Path("/downloads/movie.mkv") + lookups = [] + + class FakeTransferHistoryOper: """ - 文件事件应继续按 local 存储交给整理流程。 + 测试用整理历史查询。 """ - monitor = object.__new__(Monitor) - monitor.all_exts = [".mkv"] - handle_file = MagicMock() - setattr(monitor, "_Monitor__handle_file", handle_file) - event_path = Path("/downloads/movie.mkv") - event = DirectoryChangeEvent( - change_type=Change.added, - src_path=event_path.as_posix(), - is_directory=False - ) - monitor.event_handler( - event=event, - text="新增", - event_path=event_path.as_posix(), - file_size=1024 - ) + def get_by_src(self, src: str, storage: str = None): + """ + 记录查询参数并返回已有记录。 + """ + lookups.append((src, storage)) + return object() - handle_file.assert_called_once_with( - storage="local", - event_path=event_path, - file_size=1024 - ) + transfer_chain = MagicMock() + logger_info = MagicMock() + logger_debug = MagicMock() + monkeypatch.setattr("app.monitor.TransferHistoryOper", FakeTransferHistoryOper) + monkeypatch.setattr("app.monitor.TransferChain", transfer_chain) + monkeypatch.setattr("app.monitor.logger.info", logger_info) + monkeypatch.setattr("app.monitor.logger.debug", logger_debug) - def test_event_handler_ignores_directory_events(self): + handled = monitor._Monitor__handle_file( + storage="local", + event_path=event_path, + file_size=1024, + ) + + assert not handled + assert lookups == [(event_path.as_posix(), "local")] + transfer_chain.assert_not_called() + logger_info.assert_not_called() + logger_debug.assert_not_called() + + +def test_handle_file_invokes_transfer_when_history_missing(monkeypatch): + """ + 没有整理记录的源文件应继续进入整理链。 + """ + monitor = object.__new__(Monitor) + monitor.all_exts = [".mkv"] + monitor._cache = {} + event_path = Path("/downloads/movie.mkv") + + class FakeTransferHistoryOper: """ - 目录事件不应进入文件整理流程。 + 测试用空整理历史查询。 """ - monitor = object.__new__(Monitor) - monitor.all_exts = [".mkv"] - handle_file = MagicMock() - setattr(monitor, "_Monitor__handle_file", handle_file) - event_path = Path("/downloads/folder") - event = DirectoryChangeEvent( - change_type=Change.added, - src_path=event_path.as_posix(), - is_directory=True - ) - monitor.event_handler( - event=event, - text="新增", - event_path=event_path.as_posix() - ) + def get_by_src(self, src: str, storage: str = None): + """ + 返回空整理记录。 + """ + return None - handle_file.assert_not_called() + transfer_chain_instance = MagicMock() + transfer_chain = MagicMock(return_value=transfer_chain_instance) + monkeypatch.setattr("app.monitor.TransferHistoryOper", FakeTransferHistoryOper) + monkeypatch.setattr("app.monitor.TransferChain", transfer_chain) - def test_event_handler_ignores_download_temp_files(self): - """ - 下载器临时文件不应进入整理流程。 - """ - monitor = object.__new__(Monitor) - monitor.all_exts = [".mkv"] - handle_file = MagicMock() - setattr(monitor, "_Monitor__handle_file", handle_file) - event_path = Path("/downloads/movie.mkv.!qB") - event = DirectoryChangeEvent( - change_type=Change.modified, - src_path=event_path.as_posix(), - is_directory=False - ) + handled = monitor._Monitor__handle_file( + storage="local", + event_path=event_path, + file_size=1024, + ) - monitor.event_handler( - event=event, - text="修改", - event_path=event_path.as_posix(), - file_size=1024 - ) - - handle_file.assert_not_called() - - def test_event_handler_ignores_non_transferable_files(self): - """ - 非可整理后缀文件不应进入整理流程。 - """ - monitor = object.__new__(Monitor) - monitor.all_exts = [".mkv"] - handle_file = MagicMock() - setattr(monitor, "_Monitor__handle_file", handle_file) - event_path = Path("/downloads/movie.nfo") - event = DirectoryChangeEvent( - change_type=Change.added, - src_path=event_path.as_posix(), - is_directory=False - ) - - monitor.event_handler( - event=event, - text="新增", - event_path=event_path.as_posix(), - file_size=1024 - ) - - handle_file.assert_not_called() + assert handled + transfer_chain_instance.do_transfer.assert_called_once() + fileitem = transfer_chain_instance.do_transfer.call_args.kwargs["fileitem"] + assert fileitem.storage == "local" + assert fileitem.path == event_path.as_posix() + assert fileitem.size == 1024