fix: 优化同名字幕附加整理

This commit is contained in:
jxxghp
2026-06-23 12:07:47 +08:00
parent d18c2d6f72
commit d7aa66853a
3 changed files with 703 additions and 154 deletions

View File

@@ -65,6 +65,48 @@ job_lock = threading.Lock()
# 任务锁
task_lock = threading.Lock()
# 字幕文件常见的语言/默认/强制标记,整理同名字幕时只允许剥离这些字幕专属尾缀。
SUBTITLE_STEM_TAGS = {
"cc",
"chi",
"chs",
"cht",
"cn",
"default",
"en",
"eng",
"english",
"forced",
"gb",
"gb2312",
"hk",
"ja",
"jap",
"japanese",
"jp",
"jpn",
"sc",
"sdh",
"tc",
"zh",
"zh-cn",
"zh-hans",
"zh-hant",
"zh-tw",
"zh_cn",
"zh_hans",
"zh_hant",
"zh_tw",
"zho",
"中英",
"中字",
"双语",
"简中",
"简体",
"繁中",
"繁体",
}
class JobManager:
"""
@@ -2274,69 +2316,67 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
return False
return True
def __get_sync_extra_fileitems(
@staticmethod
def __get_file_key(fileitem: FileItem) -> Tuple[str, str]:
"""
获取文件缓存键。
"""
normalized_path = Path(str(fileitem.path).replace("\\", "/")).as_posix()
return fileitem.storage or "local", normalized_path
@staticmethod
def __get_file_stem(fileitem: FileItem) -> str:
"""
获取文件主干名,用于判断同名附加文件。
"""
file_name = fileitem.name or Path(fileitem.path).name
return Path(file_name).stem.lower()
@classmethod
def __get_subtitle_media_stem(cls, subtitle_fileitem: FileItem) -> str:
"""
获取字幕对应主视频的候选主干名。
"""
current_stem = cls.__get_file_stem(subtitle_fileitem)
while current_stem:
media_stem, separator, suffix = current_stem.rpartition(".")
if not separator or suffix not in SUBTITLE_STEM_TAGS:
return current_stem
current_stem = media_stem
return current_stem
def __get_extra_media_stem(self, extra_fileitem: FileItem) -> str:
"""
获取附加文件对应主视频的候选主干名。
"""
if self.__is_subtitle_file(extra_fileitem):
return self.__get_subtitle_media_stem(extra_fileitem)
return self.__get_file_stem(extra_fileitem)
def __get_related_main_file_key(
self,
main_fileitem: FileItem,
main_meta: MetaBase,
meta_factory: Callable[[Path], Optional[MetaBase]],
predicate: Optional[Callable[[FileItem, bool], bool]] = None,
extra_cache: Optional[Dict[Tuple[str, str], List[FileItem]]] = None,
) -> List[Tuple[FileItem, bool]]:
extra_fileitem: FileItem,
main_fileitems: List[FileItem],
) -> Optional[Tuple[str, str]]:
"""
获取与当前主视频识别信息一致的同目录附加文件。
获取与附加文件名完全匹配的主视频键
"""
if (
not main_fileitem
or main_fileitem.type != "file"
or not self.__is_media_file(main_fileitem)
or not main_meta
if not (
self.__is_subtitle_file(extra_fileitem)
or self.__is_audio_file(extra_fileitem)
):
return []
return None
parent_key = self.__get_file_parent_key(main_fileitem)
if extra_cache is not None and parent_key in extra_cache:
extra_candidates = extra_cache[parent_key]
else:
storagechain = StorageChain()
parent_item = storagechain.get_parent_item(main_fileitem)
if not parent_item:
logger.debug(f"{main_fileitem.path} 未找到父目录,跳过同步整理附加文件")
return []
extra_media_stem = self.__get_extra_media_stem(extra_fileitem)
matched_items: List[FileItem] = []
for main_fileitem in main_fileitems:
main_stem = self.__get_file_stem(main_fileitem)
if main_stem and main_stem == extra_media_stem:
matched_items.append(main_fileitem)
parent_key = self.__get_dir_key(parent_item)
extra_candidates: List[FileItem] = []
for item in storagechain.list_files(parent_item, recursion=False) or []:
if (
not item
or item.type != "file"
or not (
self.__is_subtitle_file(item)
or self.__is_audio_file(item)
)
):
continue
if predicate and not predicate(item, False):
continue
extra_candidates.append(item)
if extra_cache is not None:
extra_cache[parent_key] = extra_candidates
extra_fileitems: List[Tuple[FileItem, bool]] = []
for item in extra_candidates:
if item.path == main_fileitem.path:
continue
extra_meta = meta_factory(Path(item.path))
# 不能直接按文件名判断归属,必须基于解析后的媒体身份和季集信息。
if self.__is_same_media_meta(main_meta, extra_meta):
extra_fileitems.append((item, False))
if extra_fileitems:
logger.info(
f"{main_fileitem.path} 同步匹配到 {len(extra_fileitems)} 个附加文件"
)
return extra_fileitems
if len(matched_items) != 1:
return None
return self.__get_file_key(matched_items[0])
@staticmethod
def __normalize_dir_path(dir_path: Union[str, Path]) -> str:
@@ -2561,6 +2601,234 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
return False
return True
def _build_main_meta(
main_fileitem: FileItem,
main_bluray_dir: bool,
download_history_oper: DownloadHistoryOper,
) -> Optional[MetaBase]:
"""
构建主视频元数据。
"""
main_path = Path(main_fileitem.path)
main_download_history = self._resolve_download_history(
downloadhis=download_history_oper,
file_path=main_path,
bluray_dir=main_bluray_dir,
download_hash=download_hash,
)
return _build_file_meta(
main_path,
custom_word_list=_get_subscribe_custom_words(main_download_history),
)
def _append_item(
planned_items: List[Tuple[FileItem, bool]],
seen_file_keys: set[Tuple[str, str]],
item: FileItem,
is_bluray_dir: bool,
) -> bool:
"""
添加待整理文件项并去重。
"""
file_key = self.__get_file_key(item)
if file_key in seen_file_keys:
return False
planned_items.append((item, is_bluray_dir))
seen_file_keys.add(file_key)
return True
def _build_directory_index(
items: List[Tuple[FileItem, bool]]
) -> Tuple[
Dict[Tuple[str, str], List[FileItem]],
Dict[Tuple[str, str], List[Tuple[FileItem, bool]]],
]:
"""
基于已遍历结果构建同目录主视频和附加文件索引。
"""
main_items_by_dir: Dict[Tuple[str, str], List[FileItem]] = {}
extra_items_by_dir: Dict[Tuple[str, str], List[Tuple[FileItem, bool]]] = {}
for item, is_bluray_dir in items:
if not item or item.type != "file":
continue
dir_key = self.__get_file_parent_key(item)
if not is_bluray_dir and self.__is_media_file(item):
main_items_by_dir.setdefault(dir_key, []).append(item)
elif self.__is_subtitle_file(item) or self.__is_audio_file(item):
extra_items_by_dir.setdefault(dir_key, []).append((item, is_bluray_dir))
return main_items_by_dir, extra_items_by_dir
def _get_single_file_sibling_items(
current_fileitem: FileItem,
) -> Tuple[List[FileItem], List[Tuple[FileItem, bool]]]:
"""
单文件整理时只额外读取一次父目录,收集同目录主视频和附加文件。
"""
storagechain = StorageChain()
if not hasattr(storagechain, "get_parent_item") or not hasattr(
storagechain, "list_files"
):
return [], []
parent_item = storagechain.get_parent_item(current_fileitem)
if not parent_item:
return [], []
main_fileitems: List[FileItem] = []
extra_items: List[Tuple[FileItem, bool]] = []
for item in storagechain.list_files(parent_item, recursion=False) or []:
if not item or item.type != "file":
continue
if self.__is_media_file(item):
main_fileitems.append(item)
continue
if not (self.__is_subtitle_file(item) or self.__is_audio_file(item)):
continue
if not _filter(item, False):
continue
extra_items.append((item, False))
return main_fileitems, extra_items
def _plan_file_items(
items: List[Tuple[FileItem, bool]]
) -> Tuple[List[Tuple[FileItem, bool]], Dict[Tuple[str, str], MetaBase]]:
"""
生成最终整理顺序:主视频优先,同名附加文件跟随,剩余附加文件最后处理。
"""
if not items:
return [], {}
download_history_oper = DownloadHistoryOper()
inherited_map: Dict[Tuple[str, str], MetaBase] = {}
main_items_by_dir, extra_items_by_dir = _build_directory_index(items)
main_items = [
(item, is_bluray_dir)
for item, is_bluray_dir in items
if item
and (is_bluray_dir or (item.type == "file" and self.__is_media_file(item)))
]
single_file_mode = len(items) == 1 and fileitem.type == "file"
if single_file_mode:
current_item, current_bluray_dir = items[0]
if current_item.type == "file":
sibling_main_items, sibling_extra_items = _get_single_file_sibling_items(
current_item
)
current_dir_key = self.__get_file_parent_key(current_item)
if not current_bluray_dir and self.__is_media_file(current_item):
main_items = [(current_item, current_bluray_dir)]
main_items_by_dir[current_dir_key] = [current_item]
extra_items_by_dir[current_dir_key] = sibling_extra_items
elif self.__is_subtitle_file(current_item) or self.__is_audio_file(current_item):
related_main_file_key = self.__get_related_main_file_key(
extra_fileitem=current_item,
main_fileitems=sibling_main_items,
)
related_main_fileitem = next(
(
main_item
for main_item in sibling_main_items
if self.__get_file_key(main_item) == related_main_file_key
),
None,
)
if related_main_fileitem:
main_meta = _build_main_meta(
related_main_fileitem,
False,
download_history_oper,
)
if main_meta:
inherited_map[self.__get_file_key(current_item)] = deepcopy(main_meta)
return list(items), inherited_map
if not main_items:
return list(items), inherited_map
planned_items: List[Tuple[FileItem, bool]] = []
seen_file_keys: set[Tuple[str, str]] = set()
extra_meta_cache: Dict[Tuple[str, Tuple[str, ...]], Optional[MetaBase]] = {}
def _get_cached_extra_meta(
extra_path: Path,
custom_word_list: Optional[List[str]],
) -> Optional[MetaBase]:
"""
同一组识别词下的附加文件只解析一次。
"""
custom_words_key = tuple(custom_word_list or [])
cache_key = (extra_path.as_posix(), custom_words_key)
if cache_key not in extra_meta_cache:
extra_meta_cache[cache_key] = _build_path_meta(
extra_path,
custom_word_list=list(custom_words_key) or None,
)
return extra_meta_cache[cache_key]
for main_item, main_bluray_dir in main_items:
_append_item(planned_items, seen_file_keys, main_item, main_bluray_dir)
if main_bluray_dir or not self.__is_media_file(main_item):
continue
main_path = Path(main_item.path)
main_download_history = self._resolve_download_history(
downloadhis=download_history_oper,
file_path=main_path,
bluray_dir=main_bluray_dir,
download_hash=download_hash,
)
subscribe_custom_words = _get_subscribe_custom_words(
main_download_history
)
main_meta = _build_file_meta(
main_path,
custom_word_list=subscribe_custom_words,
)
if not main_meta:
continue
dir_key = self.__get_file_parent_key(main_item)
main_fileitems = main_items_by_dir.get(dir_key) or [main_item]
main_file_key = self.__get_file_key(main_item)
for extra_item, extra_bluray_dir in extra_items_by_dir.get(dir_key, []):
if self.__get_file_key(extra_item) in seen_file_keys:
continue
related_main_file_key = self.__get_related_main_file_key(
extra_fileitem=extra_item,
main_fileitems=main_fileitems,
)
if related_main_file_key:
if related_main_file_key == main_file_key:
if _append_item(
planned_items,
seen_file_keys,
extra_item,
extra_bluray_dir,
):
inherited_map[self.__get_file_key(extra_item)] = deepcopy(main_meta)
continue
if single_file_mode or not sync_extra_files:
continue
extra_meta = _get_cached_extra_meta(
Path(extra_item.path),
subscribe_custom_words,
)
if not self.__is_same_media_meta(main_meta, extra_meta):
continue
if _append_item(
planned_items,
seen_file_keys,
extra_item,
extra_bluray_dir,
):
inherited_map[self.__get_file_key(extra_item)] = deepcopy(extra_meta)
for item, is_bluray_dir in items:
_append_item(planned_items, seen_file_keys, item, is_bluray_dir)
return planned_items, inherited_map
try:
# 获取经过筛选后的待整理文件项列表
file_items = self.__get_trans_fileitems(fileitem, predicate=_filter)
@@ -2571,97 +2839,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件")
return False, f"{fileitem.name} 没有找到可整理的媒体文件"
if sync_extra_files:
# 单文件和目录整理都按“主视频 -> 同媒体附加文件”补齐;目录场景会逐个视频处理。
extra_file_cache: Dict[Tuple[str, str], List[FileItem]] = {}
main_file_items: List[Tuple[FileItem, bool]] = []
for candidate_item, candidate_bluray_dir in file_items:
if not candidate_item:
continue
if candidate_bluray_dir or self.__is_media_file(candidate_item):
main_file_items.append((candidate_item, candidate_bluray_dir))
continue
if (
candidate_item.type == "file"
and (
self.__is_subtitle_file(candidate_item)
or self.__is_audio_file(candidate_item)
)
):
# 目录递归阶段已拿到附加文件时,直接填入父目录缓存,避免后续重复列目录。
extra_file_cache.setdefault(
self.__get_file_parent_key(candidate_item), []
).append(candidate_item)
if main_file_items:
file_items = list(main_file_items)
seen_file_keys = {
(item.storage, item.path)
for item, _ in file_items
if item and item.path
}
downloadhis = DownloadHistoryOper()
extra_meta_cache: Dict[
Tuple[str, Tuple[str, ...]], Optional[MetaBase]
] = {}
def _get_cached_extra_meta(
extra_path: Path, custom_words_key: Tuple[str, ...]
) -> Optional[MetaBase]:
"""
同一个父目录下的附加文件只解析一次,多个主视频只做内存匹配。
"""
cache_key = (extra_path.as_posix(), custom_words_key)
if cache_key not in extra_meta_cache:
extra_meta_cache[cache_key] = _build_path_meta(
extra_path,
custom_word_list=list(custom_words_key) or None,
)
return extra_meta_cache[cache_key]
def _build_extra_meta_factory(
custom_word_list: Optional[List[str]],
) -> Callable[[Path], Optional[MetaBase]]:
"""
将可变识别词列表转成不可变缓存键,避免闭包默认参数持有可变对象。
"""
custom_words_key = tuple(custom_word_list or [])
def _extra_meta_factory(extra_path: Path) -> Optional[MetaBase]:
return _get_cached_extra_meta(extra_path, custom_words_key)
return _extra_meta_factory
for main_item, main_bluray_dir in list(main_file_items):
if main_bluray_dir or not self.__is_media_file(main_item):
continue
main_path = Path(main_item.path)
main_download_history = self._resolve_download_history(
downloadhis=downloadhis,
file_path=main_path,
bluray_dir=main_bluray_dir,
download_hash=download_hash,
)
subscribe_custom_words = _get_subscribe_custom_words(
main_download_history
)
main_meta = _build_file_meta(
main_path, custom_word_list=subscribe_custom_words
)
extra_items = self.__get_sync_extra_fileitems(
main_fileitem=main_item,
main_meta=main_meta,
meta_factory=_build_extra_meta_factory(subscribe_custom_words),
predicate=_filter,
extra_cache=extra_file_cache,
)
for extra_item, extra_bluray_dir in extra_items:
extra_key = (extra_item.storage, extra_item.path)
if extra_key in seen_file_keys:
continue
file_items.append((extra_item, extra_bluray_dir))
seen_file_keys.add(extra_key)
file_items, inherited_meta_map = _plan_file_items(file_items)
planned_file_count = len(file_items)
if preview:
@@ -2713,10 +2891,16 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
if not meta:
# 文件元数据(优先使用订阅识别词)
file_meta = _build_file_meta(
file_path,
custom_word_list=_get_subscribe_custom_words(download_history),
inherited_meta = inherited_meta_map.get(
self.__get_file_key(file_item)
)
if inherited_meta:
file_meta = deepcopy(inherited_meta)
else:
file_meta = _build_file_meta(
file_path,
custom_word_list=_get_subscribe_custom_words(download_history),
)
else:
file_meta = _build_file_meta(file_path)

View File

@@ -616,7 +616,7 @@ class TransferJobManagerTest(unittest.TestCase):
self.assertEqual([("abc123", "qbittorrent")], completed)
self.assertEqual([], chain.jobview.list_jobs())
def test_do_transfer_does_not_sync_extra_files_by_default(self):
def test_do_transfer_syncs_same_stem_extra_files_by_default(self):
chain = make_transfer_chain()
planned = []
main_fileitem = make_fileitem(
@@ -680,7 +680,13 @@ class TransferJobManagerTest(unittest.TestCase):
self.assertTrue(state)
self.assertEqual("", errmsg)
self.assertEqual([main_fileitem.path], planned)
self.assertEqual(
[
main_fileitem.path,
subtitle_fileitem.path,
],
planned,
)
def test_manual_transfer_enables_sync_extra_files(self):
chain = make_transfer_chain()
@@ -956,10 +962,11 @@ class TransferJobManagerTest(unittest.TestCase):
self.assertEqual(
[
(main_ep1_fileitem.path, 1),
(main_ep2_fileitem.path, 2),
(ep1_subtitle_fileitem.path, 1),
(ep1_audio_fileitem.path, 1),
(main_ep2_fileitem.path, 2),
(ep2_subtitle_fileitem.path, 2),
(other_title_fileitem.path, 1),
],
planned,
)

View File

@@ -0,0 +1,358 @@
from pathlib import Path
from types import SimpleNamespace
from app.chain.transfer import JobManager, TransferChain
from app.core.config import settings
from app.schemas import FileItem
from app.schemas.types import MediaType
class FakeMeta:
"""
构造整理链路所需的最小剧集元数据。
"""
def __init__(self, episode: int):
self.name = "Test Show"
self.title = f"Test Show S01E{episode:02d}"
self.year = "2026"
self.type = MediaType.TV
self.begin_season = 1
self.end_season = None
self.total_season = 1
self.begin_episode = episode
self.end_episode = None
self.total_episode = 1
self.part = None
@property
def episode_list(self) -> list[int]:
"""
返回当前文件覆盖的集数列表。
"""
return [self.begin_episode]
def make_transfer_chain() -> TransferChain:
"""
构造不启动后台线程的整理链实例。
"""
chain = object.__new__(TransferChain)
chain.jobview = JobManager()
chain._media_exts = settings.RMT_MEDIAEXT
chain._subtitle_exts = settings.RMT_SUBEXT
chain._audio_exts = settings.RMT_AUDIOEXT
chain._allowed_exts = (
chain._media_exts + chain._audio_exts + chain._subtitle_exts
)
chain._success_target_files = {}
chain._scrape_batches = {}
return chain
def make_fileitem(path: str) -> FileItem:
"""
根据路径构造文件项。
"""
file_path = Path(path)
return FileItem(
storage="local",
path=file_path.as_posix(),
type="file",
name=file_path.name,
basename=file_path.stem,
extension=file_path.suffix.lstrip("."),
size=1024,
)
def test_sync_extra_subtitle_inherits_matching_video_episode(monkeypatch):
"""
同名随片字幕应继承对应视频集数,避免字幕自身识别错误时全部落到第一集。
"""
chain = make_transfer_chain()
planned = []
main_ep1_fileitem = make_fileitem(
"/downloads/Test Show (2026)/Test.Show.S01E01.2026.mkv"
)
main_ep2_fileitem = make_fileitem(
"/downloads/Test Show (2026)/Test.Show.S01E02.2026.mkv"
)
ep1_subtitle_fileitem = make_fileitem(
"/downloads/Test Show (2026)/Test.Show.S01E01.2026.zh-cn.srt"
)
ep2_subtitle_fileitem = make_fileitem(
"/downloads/Test Show (2026)/Test.Show.S01E02.2026.zh-cn.srt"
)
parent_fileitem = FileItem(
storage="local",
path="/downloads/Test Show (2026)/",
type="dir",
name="Test Show (2026)",
)
monkeypatch.setattr(
chain,
"_TransferChain__get_trans_fileitems",
lambda fileitem, predicate: [
(main_ep1_fileitem, False),
(main_ep2_fileitem, False),
(ep1_subtitle_fileitem, False),
(ep2_subtitle_fileitem, False),
],
)
monkeypatch.setattr(chain, "_TransferChain__put_to_jobview", lambda task: True)
monkeypatch.setattr(
chain,
"_TransferChain__register_scrape_batch_task",
lambda task: None,
)
monkeypatch.setattr(
chain,
"_TransferChain__close_scrape_batch",
lambda batch_id: None,
)
def fake_handle_transfer(task, callback=None):
"""
记录实际创建的整理任务集数。
"""
planned.append((task.fileitem.path, task.meta.begin_episode))
return True, ""
def fake_meta_info_path(path, custom_words=None):
"""
模拟字幕文件自身会被误识别为第一集的场景。
"""
file_name = Path(path).name
if file_name.endswith(".mkv") and "S01E02" in file_name:
return FakeMeta(2)
return FakeMeta(1)
monkeypatch.setattr(chain, "_TransferChain__handle_transfer", fake_handle_transfer)
monkeypatch.setattr(
"app.chain.transfer.TransferHistoryOper",
lambda: SimpleNamespace(get_by_src=lambda src, storage=None: None),
)
monkeypatch.setattr(
"app.chain.transfer.DownloadHistoryOper",
lambda: SimpleNamespace(
get_by_hash=lambda download_hash: None,
get_file_by_fullpath=lambda fullpath: None,
get_files_by_savepath=lambda savepath: [],
get_by_path=lambda path: None,
),
)
monkeypatch.setattr(
"app.chain.transfer.SystemConfigOper",
lambda: SimpleNamespace(get=lambda key: None),
)
monkeypatch.setattr("app.chain.transfer.MetaInfoPath", fake_meta_info_path)
state, errmsg = TransferChain.do_transfer(
chain,
fileitem=parent_fileitem,
background=False,
sync_extra_files=True,
)
assert state is True
assert errmsg == ""
assert planned == [
(main_ep1_fileitem.path, 1),
(ep1_subtitle_fileitem.path, 1),
(main_ep2_fileitem.path, 2),
(ep2_subtitle_fileitem.path, 2),
]
def test_single_subtitle_transfer_reuses_same_name_video_episode(monkeypatch):
"""
单独整理同名字幕时应复用主视频识别结果,不受 sync_extra_files 开关影响。
"""
chain = make_transfer_chain()
planned = []
main_fileitem = make_fileitem(
"/downloads/Test Show (2026)/Test.Show.S01E02.2026.mkv"
)
subtitle_fileitem = make_fileitem(
"/downloads/Test Show (2026)/Test.Show.S01E02.2026.zh-cn.srt"
)
parent_fileitem = FileItem(
storage="local",
path="/downloads/Test Show (2026)/",
type="dir",
name="Test Show (2026)",
)
monkeypatch.setattr(
chain,
"_TransferChain__get_trans_fileitems",
lambda fileitem, predicate: [(subtitle_fileitem, False)],
)
monkeypatch.setattr(chain, "_TransferChain__put_to_jobview", lambda task: True)
monkeypatch.setattr(
chain,
"_TransferChain__register_scrape_batch_task",
lambda task: None,
)
monkeypatch.setattr(
chain,
"_TransferChain__close_scrape_batch",
lambda batch_id: None,
)
def fake_handle_transfer(task, callback=None):
"""
记录单独字幕整理时使用的集数。
"""
planned.append((task.fileitem.path, task.meta.begin_episode))
return True, ""
def fake_meta_info_path(path, custom_words=None):
"""
模拟字幕自身会被误识别为第一集,主视频可正确识别为第二集。
"""
file_name = Path(path).name
if file_name.endswith(".mkv"):
return FakeMeta(2)
return FakeMeta(1)
monkeypatch.setattr(chain, "_TransferChain__handle_transfer", fake_handle_transfer)
monkeypatch.setattr(
"app.chain.transfer.TransferHistoryOper",
lambda: SimpleNamespace(get_by_src=lambda src, storage=None: None),
)
monkeypatch.setattr(
"app.chain.transfer.DownloadHistoryOper",
lambda: SimpleNamespace(
get_by_hash=lambda download_hash: None,
get_file_by_fullpath=lambda fullpath: None,
get_files_by_savepath=lambda savepath: [],
get_by_path=lambda path: None,
),
)
monkeypatch.setattr(
"app.chain.transfer.SystemConfigOper",
lambda: SimpleNamespace(get=lambda key: None),
)
monkeypatch.setattr(
"app.chain.transfer.StorageChain",
lambda: SimpleNamespace(
get_parent_item=lambda fileitem: parent_fileitem,
list_files=lambda fileitem, recursion=False: [
main_fileitem,
subtitle_fileitem,
],
),
)
monkeypatch.setattr("app.chain.transfer.MetaInfoPath", fake_meta_info_path)
state, errmsg = TransferChain.do_transfer(
chain,
fileitem=subtitle_fileitem,
background=False,
sync_extra_files=False,
)
assert state is True
assert errmsg == ""
assert planned == [(subtitle_fileitem.path, 2)]
def test_single_video_transfer_lists_parent_once_for_same_name_extra(monkeypatch):
"""
单文件视频整理只读取一次父目录,并只附带同名附加文件。
"""
chain = make_transfer_chain()
planned = []
list_files_calls = []
main_fileitem = make_fileitem(
"/downloads/Test Show (2026)/Test.Show.S01E02.2026.mkv"
)
subtitle_fileitem = make_fileitem(
"/downloads/Test Show (2026)/Test.Show.S01E02.2026.zh-cn.srt"
)
other_subtitle_fileitem = make_fileitem(
"/downloads/Test Show (2026)/Other.Show.S01E02.2026.zh-cn.srt"
)
parent_fileitem = FileItem(
storage="local",
path="/downloads/Test Show (2026)/",
type="dir",
name="Test Show (2026)",
)
monkeypatch.setattr(
chain,
"_TransferChain__get_trans_fileitems",
lambda fileitem, predicate: [(main_fileitem, False)],
)
monkeypatch.setattr(chain, "_TransferChain__put_to_jobview", lambda task: True)
monkeypatch.setattr(
chain,
"_TransferChain__register_scrape_batch_task",
lambda task: None,
)
monkeypatch.setattr(
chain,
"_TransferChain__close_scrape_batch",
lambda batch_id: None,
)
def fake_handle_transfer(task, callback=None):
"""
记录单视频整理时实际附带的文件。
"""
planned.append(task.fileitem.path)
return True, ""
def fake_list_files(fileitem, recursion=False):
"""
记录父目录读取次数。
"""
list_files_calls.append((fileitem.path, recursion))
return [
main_fileitem,
subtitle_fileitem,
other_subtitle_fileitem,
]
monkeypatch.setattr(chain, "_TransferChain__handle_transfer", fake_handle_transfer)
monkeypatch.setattr(
"app.chain.transfer.TransferHistoryOper",
lambda: SimpleNamespace(get_by_src=lambda src, storage=None: None),
)
monkeypatch.setattr(
"app.chain.transfer.DownloadHistoryOper",
lambda: SimpleNamespace(
get_by_hash=lambda download_hash: None,
get_file_by_fullpath=lambda fullpath: None,
get_files_by_savepath=lambda savepath: [],
get_by_path=lambda path: None,
),
)
monkeypatch.setattr(
"app.chain.transfer.SystemConfigOper",
lambda: SimpleNamespace(get=lambda key: None),
)
monkeypatch.setattr(
"app.chain.transfer.StorageChain",
lambda: SimpleNamespace(
get_parent_item=lambda fileitem: parent_fileitem,
list_files=fake_list_files,
),
)
monkeypatch.setattr("app.chain.transfer.MetaInfoPath", lambda path, custom_words=None: FakeMeta(2))
state, errmsg = TransferChain.do_transfer(
chain,
fileitem=main_fileitem,
background=False,
sync_extra_files=False,
)
assert state is True
assert errmsg == ""
assert planned == [main_fileitem.path, subtitle_fileitem.path]
assert list_files_calls == [(parent_fileitem.path, False)]