feat(monitor): enhance file handling with transfer history checks

This commit is contained in:
jxxghp
2026-06-26 07:02:59 +08:00
parent 52c5f2900f
commit 1b6a548dee
2 changed files with 271 additions and 160 deletions

View File

@@ -17,6 +17,7 @@ from app.chain.storage import StorageChain
from app.chain.transfer import TransferChain
from app.core.cache import TTLCache, FileCache
from app.core.config import settings
from app.db.transferhistory_oper import TransferHistoryOper
from app.helper.directory import DirectoryHelper
from app.helper.message import MessageHelper
from app.log import logger
@@ -328,10 +329,9 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass):
try:
if not self.__is_transfer_candidate_path(Path(file_path)):
continue
logger.info(f"处理文件:{file_path}")
file_size = file_info.get('size', 0) if isinstance(file_info, dict) else file_info
self.__handle_file(storage=storage, event_path=Path(file_path), file_size=file_size)
processed_count += 1
if self.__handle_file(storage=storage, event_path=Path(file_path), file_size=file_size):
processed_count += 1
except Exception as e:
logger.error(f"处理文件 {file_path} 失败: {e}")
continue
@@ -453,6 +453,26 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass):
return False
return self.__has_suffix_in(file_path, self.all_exts)
@staticmethod
def __build_transfer_src_path(event_path: Path, is_bluray_folder: bool) -> str:
"""
生成整理记录使用的源路径。
"""
if is_bluray_folder:
return f"{event_path.as_posix()}/"
return event_path.as_posix()
@staticmethod
def __has_transfer_history(storage: str, src_path: str) -> Optional[bool]:
"""
判断源文件是否已经存在整理记录。
"""
try:
return bool(TransferHistoryOper().get_by_src(src_path, storage=storage))
except Exception as err:
logger.error(f"查询整理历史失败: {src_path} - {err}")
return None
@staticmethod
def count_directory_files(directory: Path, max_check: int = 10000) -> int:
"""
@@ -761,22 +781,24 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass):
]
# 处理新增文件
handled_added_count = 0
for new_file in added_files:
logger.info(f"发现新增文件:{new_file}")
file_info = new_snapshot.get(new_file, {})
file_size = file_info.get('size', 0) if isinstance(file_info, dict) else file_info
self.__handle_file(storage=storage, event_path=Path(new_file), file_size=file_size)
if self.__handle_file(storage=storage, event_path=Path(new_file), file_size=file_size):
handled_added_count += 1
# 处理修改文件
handled_modified_count = 0
for modified_file in modified_files:
logger.info(f"发现修改文件:{modified_file}")
file_info = new_snapshot.get(modified_file, {})
file_size = file_info.get('size', 0) if isinstance(file_info, dict) else file_info
self.__handle_file(storage=storage, event_path=Path(modified_file), file_size=file_size)
if self.__handle_file(storage=storage, event_path=Path(modified_file), file_size=file_size):
handled_modified_count += 1
if added_files or modified_files:
if handled_added_count or handled_modified_count:
logger.info(
f"{storage} 发现 {len(added_files)} 个新增文件,{len(modified_files)} 个修改文件")
f"{storage} 发现 {handled_added_count} 个新增文件,{handled_modified_count} 个修改文件")
else:
logger.debug(f"{storage} 无文件变化")
else:
@@ -813,17 +835,16 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass):
if not event.is_directory:
if not self.__is_transfer_candidate_path(Path(event_path)):
return
# 文件发生变化
logger.debug(f"检测到文件变化: {event_path} [{text}]")
# 整理文件
self.__handle_file(storage="local", event_path=Path(event_path), file_size=file_size)
def __handle_file(self, storage: str, event_path: Path, file_size: float = None):
def __handle_file(self, storage: str, event_path: Path, file_size: float = None) -> bool:
"""
整理一个文件
:param storage: 存储
:param event_path: 事件文件路径
:param file_size: 文件大小
:return: 是否进入整理链
"""
# 全程加锁
with lock:
@@ -832,17 +853,27 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass):
if self.__is_bluray_sub(event_path):
event_path = self.__get_bluray_dir(event_path)
if not event_path:
return
return False
is_bluray_folder = True
elif not self.__is_transfer_candidate_path(event_path):
return
return False
# TTL缓存控重
if self._cache.get(str(event_path)):
logger.debug(f"文件 {event_path} 在缓存中,跳过处理")
return
return False
self._cache[str(event_path)] = True
src_path = self.__build_transfer_src_path(
event_path=event_path,
is_bluray_folder=is_bluray_folder,
)
has_transfer_history = self.__has_transfer_history(
storage=storage,
src_path=src_path,
)
if has_transfer_history is not False:
return False
try:
if is_bluray_folder:
logger.info(f"开始整理蓝光原盘: {event_path}")
@@ -852,11 +883,7 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass):
TransferChain().do_transfer(
fileitem=FileItem(
storage=storage,
path=(
event_path.as_posix()
if not is_bluray_folder
else event_path.as_posix() + "/"
),
path=src_path,
type="file" if not is_bluray_folder else "dir",
name=event_path.name,
basename=event_path.stem,
@@ -864,8 +891,10 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass):
size=file_size
)
)
return True
except Exception as e:
logger.error("目录监控整理文件发生错误:%s - %s" % (str(e), traceback.format_exc()))
return False
def stop(self):
"""

View File

@@ -1,5 +1,3 @@
import tempfile
import unittest
from pathlib import Path
from unittest.mock import MagicMock
@@ -30,160 +28,244 @@ class CallbackRecorder:
self.events.append((event, text, event_path, file_size))
class LocalDirectoryWatcherTest(unittest.TestCase):
def test_handle_changes_dispatches_added_and_modified_files(tmp_path):
"""
watchfiles 本地目录监控测试
新增和修改文件应转换成目录监控整理回调
"""
added_file = tmp_path / "a_added.mkv"
modified_file = tmp_path / "b_modified.mkv"
skipped_dir = tmp_path / "c_dir"
added_file.write_bytes(b"added")
modified_file.write_bytes(b"modified")
skipped_dir.mkdir()
def test_handle_changes_dispatches_added_and_modified_files(self):
"""
新增和修改文件应转换成目录监控整理回调。
"""
with tempfile.TemporaryDirectory() as temp_dir:
watch_dir = Path(temp_dir)
added_file = watch_dir / "a_added.mkv"
modified_file = watch_dir / "b_modified.mkv"
skipped_dir = watch_dir / "c_dir"
added_file.write_bytes(b"added")
modified_file.write_bytes(b"modified")
skipped_dir.mkdir()
callback = CallbackRecorder()
watcher = LocalDirectoryWatcher(tmp_path, callback=callback, force_polling=True)
watcher._handle_changes({
(Change.added, added_file.as_posix()),
(Change.modified, modified_file.as_posix()),
(Change.deleted, added_file.as_posix()),
(Change.added, skipped_dir.as_posix()),
})
callback = CallbackRecorder()
watcher = LocalDirectoryWatcher(watch_dir, callback=callback, force_polling=True)
watcher._handle_changes({
(Change.added, added_file.as_posix()),
(Change.modified, modified_file.as_posix()),
(Change.deleted, added_file.as_posix()),
(Change.added, skipped_dir.as_posix()),
})
self.assertEqual(2, len(callback.events))
self.assertEqual((Change.added, "新增", added_file.as_posix(), 5),
(callback.events[0][0].change_type,
callback.events[0][1],
callback.events[0][2],
callback.events[0][3]))
self.assertEqual((Change.modified, "修改", modified_file.as_posix(), 8),
(callback.events[1][0].change_type,
callback.events[1][1],
callback.events[1][2],
callback.events[1][3]))
def test_handle_changes_skips_missing_paths(self):
"""
事件到达时已经消失的路径不应触发整理。
"""
with tempfile.TemporaryDirectory() as temp_dir:
watch_dir = Path(temp_dir)
missing_file = watch_dir / "missing.mkv"
callback = CallbackRecorder()
watcher = LocalDirectoryWatcher(watch_dir, callback=callback, force_polling=True)
watcher._handle_changes({(Change.added, missing_file.as_posix())})
self.assertEqual([], callback.events)
assert len(callback.events) == 2
assert (
Change.added,
"新增",
added_file.as_posix(),
5,
) == (
callback.events[0][0].change_type,
callback.events[0][1],
callback.events[0][2],
callback.events[0][3],
)
assert (
Change.modified,
"修改",
modified_file.as_posix(),
8,
) == (
callback.events[1][0].change_type,
callback.events[1][1],
callback.events[1][2],
callback.events[1][3],
)
class MonitorWatchfilesEventTest(unittest.TestCase):
def test_handle_changes_skips_missing_paths(tmp_path):
"""
Monitor 对 watchfiles 事件的兼容处理测试
事件到达时已经消失的路径不应触发整理
"""
missing_file = tmp_path / "missing.mkv"
def test_event_handler_routes_file_events_to_transfer_handler(self):
callback = CallbackRecorder()
watcher = LocalDirectoryWatcher(tmp_path, callback=callback, force_polling=True)
watcher._handle_changes({(Change.added, missing_file.as_posix())})
assert callback.events == []
def test_event_handler_routes_file_events_to_transfer_handler():
"""
文件事件应继续按 local 存储交给整理流程。
"""
monitor = object.__new__(Monitor)
monitor.all_exts = [".mkv"]
handle_file = MagicMock()
setattr(monitor, "_Monitor__handle_file", handle_file)
event_path = Path("/downloads/movie.mkv")
event = DirectoryChangeEvent(
change_type=Change.added,
src_path=event_path.as_posix(),
is_directory=False
)
monitor.event_handler(
event=event,
text="新增",
event_path=event_path.as_posix(),
file_size=1024
)
handle_file.assert_called_once_with(
storage="local",
event_path=event_path,
file_size=1024
)
def test_event_handler_ignores_directory_events():
"""
目录事件不应进入文件整理流程。
"""
monitor = object.__new__(Monitor)
monitor.all_exts = [".mkv"]
handle_file = MagicMock()
setattr(monitor, "_Monitor__handle_file", handle_file)
event_path = Path("/downloads/folder")
event = DirectoryChangeEvent(
change_type=Change.added,
src_path=event_path.as_posix(),
is_directory=True
)
monitor.event_handler(
event=event,
text="新增",
event_path=event_path.as_posix()
)
handle_file.assert_not_called()
def test_event_handler_ignores_download_temp_files():
"""
下载器临时文件不应进入整理流程。
"""
monitor = object.__new__(Monitor)
monitor.all_exts = [".mkv"]
handle_file = MagicMock()
setattr(monitor, "_Monitor__handle_file", handle_file)
event_path = Path("/downloads/movie.mkv.!qB")
event = DirectoryChangeEvent(
change_type=Change.modified,
src_path=event_path.as_posix(),
is_directory=False
)
monitor.event_handler(
event=event,
text="修改",
event_path=event_path.as_posix(),
file_size=1024
)
handle_file.assert_not_called()
def test_event_handler_ignores_non_transferable_files():
"""
非可整理后缀文件不应进入整理流程。
"""
monitor = object.__new__(Monitor)
monitor.all_exts = [".mkv"]
handle_file = MagicMock()
setattr(monitor, "_Monitor__handle_file", handle_file)
event_path = Path("/downloads/movie.nfo")
event = DirectoryChangeEvent(
change_type=Change.added,
src_path=event_path.as_posix(),
is_directory=False
)
monitor.event_handler(
event=event,
text="新增",
event_path=event_path.as_posix(),
file_size=1024
)
handle_file.assert_not_called()
def test_handle_file_skips_transfer_when_history_exists(monkeypatch):
"""
已有整理记录的源文件不应再次进入整理链。
"""
monitor = object.__new__(Monitor)
monitor.all_exts = [".mkv"]
monitor._cache = {}
event_path = Path("/downloads/movie.mkv")
lookups = []
class FakeTransferHistoryOper:
"""
文件事件应继续按 local 存储交给整理流程
测试用整理历史查询
"""
monitor = object.__new__(Monitor)
monitor.all_exts = [".mkv"]
handle_file = MagicMock()
setattr(monitor, "_Monitor__handle_file", handle_file)
event_path = Path("/downloads/movie.mkv")
event = DirectoryChangeEvent(
change_type=Change.added,
src_path=event_path.as_posix(),
is_directory=False
)
monitor.event_handler(
event=event,
text="新增",
event_path=event_path.as_posix(),
file_size=1024
)
def get_by_src(self, src: str, storage: str = None):
"""
记录查询参数并返回已有记录。
"""
lookups.append((src, storage))
return object()
handle_file.assert_called_once_with(
storage="local",
event_path=event_path,
file_size=1024
)
transfer_chain = MagicMock()
logger_info = MagicMock()
logger_debug = MagicMock()
monkeypatch.setattr("app.monitor.TransferHistoryOper", FakeTransferHistoryOper)
monkeypatch.setattr("app.monitor.TransferChain", transfer_chain)
monkeypatch.setattr("app.monitor.logger.info", logger_info)
monkeypatch.setattr("app.monitor.logger.debug", logger_debug)
def test_event_handler_ignores_directory_events(self):
handled = monitor._Monitor__handle_file(
storage="local",
event_path=event_path,
file_size=1024,
)
assert not handled
assert lookups == [(event_path.as_posix(), "local")]
transfer_chain.assert_not_called()
logger_info.assert_not_called()
logger_debug.assert_not_called()
def test_handle_file_invokes_transfer_when_history_missing(monkeypatch):
"""
没有整理记录的源文件应继续进入整理链。
"""
monitor = object.__new__(Monitor)
monitor.all_exts = [".mkv"]
monitor._cache = {}
event_path = Path("/downloads/movie.mkv")
class FakeTransferHistoryOper:
"""
目录事件不应进入文件整理流程
测试用空整理历史查询
"""
monitor = object.__new__(Monitor)
monitor.all_exts = [".mkv"]
handle_file = MagicMock()
setattr(monitor, "_Monitor__handle_file", handle_file)
event_path = Path("/downloads/folder")
event = DirectoryChangeEvent(
change_type=Change.added,
src_path=event_path.as_posix(),
is_directory=True
)
monitor.event_handler(
event=event,
text="新增",
event_path=event_path.as_posix()
)
def get_by_src(self, src: str, storage: str = None):
"""
返回空整理记录。
"""
return None
handle_file.assert_not_called()
transfer_chain_instance = MagicMock()
transfer_chain = MagicMock(return_value=transfer_chain_instance)
monkeypatch.setattr("app.monitor.TransferHistoryOper", FakeTransferHistoryOper)
monkeypatch.setattr("app.monitor.TransferChain", transfer_chain)
def test_event_handler_ignores_download_temp_files(self):
"""
下载器临时文件不应进入整理流程。
"""
monitor = object.__new__(Monitor)
monitor.all_exts = [".mkv"]
handle_file = MagicMock()
setattr(monitor, "_Monitor__handle_file", handle_file)
event_path = Path("/downloads/movie.mkv.!qB")
event = DirectoryChangeEvent(
change_type=Change.modified,
src_path=event_path.as_posix(),
is_directory=False
)
handled = monitor._Monitor__handle_file(
storage="local",
event_path=event_path,
file_size=1024,
)
monitor.event_handler(
event=event,
text="修改",
event_path=event_path.as_posix(),
file_size=1024
)
handle_file.assert_not_called()
def test_event_handler_ignores_non_transferable_files(self):
"""
非可整理后缀文件不应进入整理流程。
"""
monitor = object.__new__(Monitor)
monitor.all_exts = [".mkv"]
handle_file = MagicMock()
setattr(monitor, "_Monitor__handle_file", handle_file)
event_path = Path("/downloads/movie.nfo")
event = DirectoryChangeEvent(
change_type=Change.added,
src_path=event_path.as_posix(),
is_directory=False
)
monitor.event_handler(
event=event,
text="新增",
event_path=event_path.as_posix(),
file_size=1024
)
handle_file.assert_not_called()
assert handled
transfer_chain_instance.do_transfer.assert_called_once()
fileitem = transfer_chain_instance.do_transfer.call_args.kwargs["fileitem"]
assert fileitem.storage == "local"
assert fileitem.path == event_path.as_posix()
assert fileitem.size == 1024