feat: 在构造待整理文件列表时引入过滤逻辑以简化后续处理

This commit is contained in:
景大侠
2026-02-06 17:05:12 +08:00
parent 32afe6445f
commit 40395b2999
3 changed files with 99 additions and 49 deletions

View File

@@ -156,7 +156,7 @@ class StorageChain(ChainBase):
""" """
判断是否包含蓝光必备的文件夹 判断是否包含蓝光必备的文件夹
""" """
required_files = ("BDMV", "CERTIFICATE") required_files = {"BDMV", "CERTIFICATE"}
return any( return any(
item.type == "dir" and item.name in required_files item.type == "dir" and item.name in required_files
for item in fileitems or [] for item in fileitems or []

View File

@@ -29,6 +29,7 @@ from app.log import logger
from app.schemas import StorageOperSelectionEventData from app.schemas import StorageOperSelectionEventData
from app.schemas import TransferInfo, Notification, EpisodeFormat, FileItem, TransferDirectoryConf, \ from app.schemas import TransferInfo, Notification, EpisodeFormat, FileItem, TransferDirectoryConf, \
TransferTask, TransferQueue, TransferJob, TransferJobTask TransferTask, TransferQueue, TransferJob, TransferJobTask
from app.schemas.exception import OperationInterrupted
from app.schemas.types import TorrentStatus, EventType, MediaType, ProgressKey, NotificationType, MessageChannel, \ from app.schemas.types import TorrentStatus, EventType, MediaType, ProgressKey, NotificationType, MessageChannel, \
SystemConfigKey, ChainEventType, ContentType SystemConfigKey, ChainEventType, ContentType
from app.utils.mixins import ConfigReloadMixin from app.utils.mixins import ConfigReloadMixin
@@ -345,11 +346,13 @@ class JobManager:
检查指定种子的所有任务是否都已完成 检查指定种子的所有任务是否都已完成
""" """
with job_lock: with job_lock:
for job in self._job_view.values(): if any(
for task in job.tasks: task.state not in {"completed", "failed"}
if task.download_hash == download_hash: for job in self._job_view.values()
if task.state not in ["completed", "failed"]: for task in job.tasks
return False if task.download_hash == download_hash
):
return False
return True return True
def is_torrent_success(self, download_hash: str) -> bool: def is_torrent_success(self, download_hash: str) -> bool:
@@ -357,11 +360,13 @@ class JobManager:
检查指定种子的所有任务是否都已成功 检查指定种子的所有任务是否都已成功
""" """
with job_lock: with job_lock:
for job in self._job_view.values(): if any(
for task in job.tasks: task.state != "completed"
if task.download_hash == download_hash: for job in self._job_view.values()
if task.state not in ["completed"]: for task in job.tasks
return False if task.download_hash == download_hash
):
return False
return True return True
def has_tasks(self, meta: MetaBase, mediainfo: Optional[MediaInfo] = None, season: Optional[int] = None) -> bool: def has_tasks(self, meta: MetaBase, mediainfo: Optional[MediaInfo] = None, season: Optional[int] = None) -> bool:
@@ -947,7 +952,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
# 如果未开启新增已入库媒体是否跟随TMDB信息变化则根据tmdbid查询之前的title # 如果未开启新增已入库媒体是否跟随TMDB信息变化则根据tmdbid查询之前的title
if not settings.SCRAP_FOLLOW_TMDB: if not settings.SCRAP_FOLLOW_TMDB:
transfer_history = transferhis.get_by_type_tmdbid(tmdbid=mediainfo.tmdb_id, transfer_history = transferhis.get_by_type_tmdbid(tmdbid=mediainfo.tmdb_id,
mtype=mediainfo.type.value) mtype=mediainfo.type.value)
if transfer_history and mediainfo.title != transfer_history.title: if transfer_history and mediainfo.title != transfer_history.title:
mediainfo.title = transfer_history.title mediainfo.title = transfer_history.title
@@ -1169,14 +1174,29 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
return True return True
def __get_trans_fileitems( def __get_trans_fileitems(
self, fileitem: FileItem, check: bool = True self,
fileitem: FileItem,
predicate: Optional[Callable[[FileItem, bool], bool]],
verify_file_exists: bool = True,
) -> List[Tuple[FileItem, bool]]: ) -> List[Tuple[FileItem, bool]]:
""" """
获取整理目录或文件列表 获取整理文件列表
:param fileitem: 文件项 :param fileitem: 文件项
:param check: 检查文件是否存在默认为True :param predicate: 用于筛选目录或文件项
该函数接收两个参数:
- `file_item`: 需要判断的文件项(类型为 `FileItem`
- `is_bluray_dir`: 表示该项是否为蓝光原盘目录(布尔值)
函数应返回 `True` 表示保留该项,`False` 表示过滤掉
若 `predicate` 为 `None`,则默认保留所有项
:param verify_file_exists: 验证目录或文件是否存在,默认值为 `True`
""" """
if global_vars.is_system_stopped:
raise OperationInterrupted()
storagechain = StorageChain() storagechain = StorageChain()
def __is_bluray_sub(_path: str) -> bool: def __is_bluray_sub(_path: str) -> bool:
@@ -1194,7 +1214,12 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
return storagechain.get_file_item(storage=_storage, path=p.parent) return storagechain.get_file_item(storage=_storage, path=p.parent)
return None return None
if check: def _apply_predicate(file_item: FileItem, is_bluray_dir: bool) -> List[Tuple[FileItem, bool]]:
if predicate is None or predicate(file_item, is_bluray_dir):
return [(file_item, is_bluray_dir)]
return []
if verify_file_exists:
latest_fileitem = storagechain.get_item(fileitem) latest_fileitem = storagechain.get_item(fileitem)
if not latest_fileitem: if not latest_fileitem:
logger.warn(f"目录或文件不存在:{fileitem.path}") logger.warn(f"目录或文件不存在:{fileitem.path}")
@@ -1204,28 +1229,30 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
# 是否蓝光原盘子目录或文件 # 是否蓝光原盘子目录或文件
if __is_bluray_sub(fileitem.path): if __is_bluray_sub(fileitem.path):
if dir_item := __get_bluray_dir(fileitem.storage, Path(fileitem.path)): if bluray_dir := __get_bluray_dir(fileitem.storage, Path(fileitem.path)):
# 返回该文件所在的原盘根目录 # 返回该文件所在的原盘根目录
return [(dir_item, True)] return _apply_predicate(bluray_dir, True)
# 单文件 # 单文件
if fileitem.type == "file": if fileitem.type == "file":
return [(fileitem, False)] return _apply_predicate(fileitem, False)
# 是否蓝光原盘根目录 # 是否蓝光原盘根目录
sub_items = storagechain.list_files(fileitem, recursion=False) or [] sub_items = storagechain.list_files(fileitem, recursion=False) or []
if storagechain.contains_bluray_subdirectories(sub_items): if storagechain.contains_bluray_subdirectories(sub_items):
# 当前目录是原盘根目录,不需要递归 # 当前目录是原盘根目录,不需要递归
return [(fileitem, True)] return _apply_predicate(fileitem, True)
# 不是原盘根目录 递归获取目录内需要整理的文件项列表 # 不是原盘根目录 递归获取目录内需要整理的文件项列表
return [ return [
item item
for sub_item in sub_items for sub_item in sub_items
for item in ( for item in (
self.__get_trans_fileitems(sub_item, check=False) self.__get_trans_fileitems(
sub_item, predicate, verify_file_exists=False
)
if sub_item.type == "dir" if sub_item.type == "dir"
else [(sub_item, False)] else _apply_predicate(sub_item, False)
) )
] ]
@@ -1275,22 +1302,47 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
transfer_exclude_words = SystemConfigOper().get(SystemConfigKey.TransferExcludeWords) transfer_exclude_words = SystemConfigOper().get(SystemConfigKey.TransferExcludeWords)
# 汇总错误信息 # 汇总错误信息
err_msgs: List[str] = [] err_msgs: List[str] = []
# 递归获取待整理的文件/目录列表
file_items = self.__get_trans_fileitems(fileitem)
if not file_items: def _filter(file_item: FileItem, is_bluray_dir: bool) -> bool:
logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件") """
return False, f"{fileitem.name} 没有找到可整理的媒体文件" 过滤文件
# 有集自定义格式,过滤文件 :return: True 表示保留False 表示排除
if formaterHandler: """
file_items = [f for f in file_items if formaterHandler.match(f[0].name)] if continue_callback and not continue_callback():
raise OperationInterrupted()
# 有集自定义格式,过滤文件
if formaterHandler and not formaterHandler.match(file_item.name):
return False
# 过滤后缀和大小(蓝光目录、附加文件不过滤)
if (
not is_bluray_dir
and not self.__is_subtitle_file(file_item)
and not self.__is_audio_file(file_item)
):
if not self.__is_media_file(file_item):
return False
if not self.__is_allow_filesize(file_item, min_filesize):
return False
# 回收站及隐藏的文件不处理
if (
file_item.path.find("/@Recycle/") != -1
or file_item.path.find("/#recycle/") != -1
or file_item.path.find("/.") != -1
or file_item.path.find("/@eaDir") != -1
):
logger.debug(f"{file_item.path} 是回收站或隐藏的文件")
return False
# 整理屏蔽词不处理
if self._is_blocked_by_exclude_words(file_item.path, transfer_exclude_words):
return False
return True
# 过滤后缀和大小(蓝光目录、附加文件不过滤大小) try:
file_items = [f for f in file_items if f[1] or # 获取经过筛选后的待整理文件项列表
self.__is_subtitle_file(f[0]) or file_items = self.__get_trans_fileitems(fileitem, predicate=_filter)
self.__is_audio_file(f[0]) or except OperationInterrupted:
(self.__is_media_file(f[0]) and self.__is_allow_filesize(f[0], min_filesize))] return False, f"{fileitem.name} 已取消"
if not file_items: if not file_items:
logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件") logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件")
@@ -1303,21 +1355,10 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
try: try:
for file_item, bluray_dir in file_items: for file_item, bluray_dir in file_items:
if global_vars.is_system_stopped: if global_vars.is_system_stopped:
break raise OperationInterrupted()
if continue_callback and not continue_callback(): if continue_callback and not continue_callback():
break raise OperationInterrupted()
file_path = Path(file_item.path) file_path = Path(file_item.path)
# 回收站及隐藏的文件不处理
if file_item.path.find('/@Recycle/') != -1 \
or file_item.path.find('/#recycle/') != -1 \
or file_item.path.find('/.') != -1 \
or file_item.path.find('/@eaDir') != -1:
logger.debug(f"{file_item.path} 是回收站或隐藏的文件")
continue
# 整理屏蔽词不处理
if self._is_blocked_by_exclude_words(file_item.path, transfer_exclude_words):
continue
# 整理成功的不再处理 # 整理成功的不再处理
if not force: if not force:
@@ -1415,6 +1456,8 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
transfer_tasks.append(transfer_task) transfer_tasks.append(transfer_task)
else: else:
logger.debug(f"{file_path.name} 已在整理列表中,跳过") logger.debug(f"{file_path.name} 已在整理列表中,跳过")
except OperationInterrupted:
return False, f"{fileitem.name} 已取消"
finally: finally:
file_items.clear() file_items.clear()
del file_items del file_items

View File

@@ -29,3 +29,10 @@ class RateLimitExceededException(LimitException):
这个异常通常用于本地限流逻辑(例如 RateLimiter当系统检测到函数调用频率过高时触发限流并抛出该异常。 这个异常通常用于本地限流逻辑(例如 RateLimiter当系统检测到函数调用频率过高时触发限流并抛出该异常。
""" """
pass pass
class OperationInterrupted(KeyboardInterrupt):
"""
用于表示操作被中断
"""
pass