Merge pull request #5473 from cddjr/feat_transfer_files_filter

This commit is contained in:
jxxghp
2026-02-06 21:04:52 +08:00
committed by GitHub
4 changed files with 100 additions and 50 deletions

View File

@@ -156,7 +156,7 @@ class StorageChain(ChainBase):
"""
判断是否包含蓝光必备的文件夹
"""
required_files = ("BDMV", "CERTIFICATE")
required_files = {"BDMV", "CERTIFICATE"}
return any(
item.type == "dir" and item.name in required_files
for item in fileitems or []

View File

@@ -29,6 +29,7 @@ from app.log import logger
from app.schemas import StorageOperSelectionEventData
from app.schemas import TransferInfo, Notification, EpisodeFormat, FileItem, TransferDirectoryConf, \
TransferTask, TransferQueue, TransferJob, TransferJobTask
from app.schemas.exception import OperationInterrupted
from app.schemas.types import TorrentStatus, EventType, MediaType, ProgressKey, NotificationType, MessageChannel, \
SystemConfigKey, ChainEventType, ContentType
from app.utils.mixins import ConfigReloadMixin
@@ -345,11 +346,13 @@ class JobManager:
检查指定种子的所有任务是否都已完成
"""
with job_lock:
for job in self._job_view.values():
for task in job.tasks:
if task.download_hash == download_hash:
if task.state not in ["completed", "failed"]:
return False
if any(
task.state not in {"completed", "failed"}
for job in self._job_view.values()
for task in job.tasks
if task.download_hash == download_hash
):
return False
return True
def is_torrent_success(self, download_hash: str) -> bool:
@@ -357,11 +360,13 @@ class JobManager:
检查指定种子的所有任务是否都已成功
"""
with job_lock:
for job in self._job_view.values():
for task in job.tasks:
if task.download_hash == download_hash:
if task.state not in ["completed"]:
return False
if any(
task.state != "completed"
for job in self._job_view.values()
for task in job.tasks
if task.download_hash == download_hash
):
return False
return True
def has_tasks(self, meta: MetaBase, mediainfo: Optional[MediaInfo] = None, season: Optional[int] = None) -> bool:
@@ -947,7 +952,7 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
# 如果未开启新增已入库媒体是否跟随TMDB信息变化则根据tmdbid查询之前的title
if not settings.SCRAP_FOLLOW_TMDB:
transfer_history = transferhis.get_by_type_tmdbid(tmdbid=mediainfo.tmdb_id,
transfer_history = transferhis.get_by_type_tmdbid(tmdbid=mediainfo.tmdb_id,
mtype=mediainfo.type.value)
if transfer_history and mediainfo.title != transfer_history.title:
mediainfo.title = transfer_history.title
@@ -1169,14 +1174,29 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
return True
def __get_trans_fileitems(
self, fileitem: FileItem, check: bool = True
self,
fileitem: FileItem,
predicate: Optional[Callable[[FileItem, bool], bool]],
verify_file_exists: bool = True,
) -> List[Tuple[FileItem, bool]]:
"""
获取整理目录或文件列表
获取整理文件列表
:param fileitem: 文件项
:param check: 检查文件是否存在默认为True
:param fileitem: 文件项
:param predicate: 用于筛选目录或文件项
该函数接收两个参数:
- `file_item`: 需要判断的文件项(类型为 `FileItem`
- `is_bluray_dir`: 表示该项是否为蓝光原盘目录(布尔值)
函数应返回 `True` 表示保留该项,`False` 表示过滤掉
若 `predicate` 为 `None`,则默认保留所有项
:param verify_file_exists: 验证目录或文件是否存在,默认值为 `True`
"""
if global_vars.is_system_stopped:
raise OperationInterrupted()
storagechain = StorageChain()
def __is_bluray_sub(_path: str) -> bool:
@@ -1194,7 +1214,12 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
return storagechain.get_file_item(storage=_storage, path=p.parent)
return None
if check:
def _apply_predicate(file_item: FileItem, is_bluray_dir: bool) -> List[Tuple[FileItem, bool]]:
if predicate is None or predicate(file_item, is_bluray_dir):
return [(file_item, is_bluray_dir)]
return []
if verify_file_exists:
latest_fileitem = storagechain.get_item(fileitem)
if not latest_fileitem:
logger.warn(f"目录或文件不存在:{fileitem.path}")
@@ -1204,28 +1229,30 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
# 是否蓝光原盘子目录或文件
if __is_bluray_sub(fileitem.path):
if dir_item := __get_bluray_dir(fileitem.storage, Path(fileitem.path)):
if bluray_dir := __get_bluray_dir(fileitem.storage, Path(fileitem.path)):
# 返回该文件所在的原盘根目录
return [(dir_item, True)]
return _apply_predicate(bluray_dir, True)
# 单文件
if fileitem.type == "file":
return [(fileitem, False)]
return _apply_predicate(fileitem, False)
# 是否蓝光原盘根目录
sub_items = storagechain.list_files(fileitem, recursion=False) or []
if storagechain.contains_bluray_subdirectories(sub_items):
# 当前目录是原盘根目录,不需要递归
return [(fileitem, True)]
return _apply_predicate(fileitem, True)
# 不是原盘根目录 递归获取目录内需要整理的文件项列表
return [
item
for sub_item in sub_items
for item in (
self.__get_trans_fileitems(sub_item, check=False)
self.__get_trans_fileitems(
sub_item, predicate, verify_file_exists=False
)
if sub_item.type == "dir"
else [(sub_item, False)]
else _apply_predicate(sub_item, False)
)
]
@@ -1275,22 +1302,47 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
transfer_exclude_words = SystemConfigOper().get(SystemConfigKey.TransferExcludeWords)
# 汇总错误信息
err_msgs: List[str] = []
# 递归获取待整理的文件/目录列表
file_items = self.__get_trans_fileitems(fileitem)
if not file_items:
logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件")
return False, f"{fileitem.name} 没有找到可整理的媒体文件"
def _filter(file_item: FileItem, is_bluray_dir: bool) -> bool:
"""
过滤文件
# 有集自定义格式,过滤文件
if formaterHandler:
file_items = [f for f in file_items if formaterHandler.match(f[0].name)]
:return: True 表示保留False 表示排除
"""
if continue_callback and not continue_callback():
raise OperationInterrupted()
# 有集自定义格式,过滤文件
if formaterHandler and not formaterHandler.match(file_item.name):
return False
# 过滤后缀和大小(蓝光目录、附加文件不过滤)
if (
not is_bluray_dir
and not self.__is_subtitle_file(file_item)
and not self.__is_audio_file(file_item)
):
if not self.__is_media_file(file_item):
return False
if not self.__is_allow_filesize(file_item, min_filesize):
return False
# 回收站及隐藏的文件不处理
if (
file_item.path.find("/@Recycle/") != -1
or file_item.path.find("/#recycle/") != -1
or file_item.path.find("/.") != -1
or file_item.path.find("/@eaDir") != -1
):
logger.debug(f"{file_item.path} 是回收站或隐藏的文件")
return False
# 整理屏蔽词不处理
if self._is_blocked_by_exclude_words(file_item.path, transfer_exclude_words):
return False
return True
# 过滤后缀和大小(蓝光目录、附加文件不过滤大小)
file_items = [f for f in file_items if f[1] or
self.__is_subtitle_file(f[0]) or
self.__is_audio_file(f[0]) or
(self.__is_media_file(f[0]) and self.__is_allow_filesize(f[0], min_filesize))]
try:
# 获取经过筛选后的待整理文件项列表
file_items = self.__get_trans_fileitems(fileitem, predicate=_filter)
except OperationInterrupted:
return False, f"{fileitem.name} 已取消"
if not file_items:
logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件")
@@ -1303,21 +1355,10 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
try:
for file_item, bluray_dir in file_items:
if global_vars.is_system_stopped:
break
raise OperationInterrupted()
if continue_callback and not continue_callback():
break
raise OperationInterrupted()
file_path = Path(file_item.path)
# 回收站及隐藏的文件不处理
if file_item.path.find('/@Recycle/') != -1 \
or file_item.path.find('/#recycle/') != -1 \
or file_item.path.find('/.') != -1 \
or file_item.path.find('/@eaDir') != -1:
logger.debug(f"{file_item.path} 是回收站或隐藏的文件")
continue
# 整理屏蔽词不处理
if self._is_blocked_by_exclude_words(file_item.path, transfer_exclude_words):
continue
# 整理成功的不再处理
if not force:
@@ -1415,6 +1456,8 @@ class TransferChain(ChainBase, ConfigReloadMixin, metaclass=Singleton):
transfer_tasks.append(transfer_task)
else:
logger.debug(f"{file_path.name} 已在整理列表中,跳过")
except OperationInterrupted:
return False, f"{fileitem.name} 已取消"
finally:
file_items.clear()
del file_items

View File

@@ -125,7 +125,7 @@ class TransferHistoryOper(DbOper):
"""
新增转移成功历史记录
"""
self.add_force(
return self.add_force(
src=fileitem.path,
src_storage=fileitem.storage,
src_fileitem=fileitem.model_dump(),

View File

@@ -29,3 +29,10 @@ class RateLimitExceededException(LimitException):
这个异常通常用于本地限流逻辑(例如 RateLimiter当系统检测到函数调用频率过高时触发限流并抛出该异常。
"""
pass
class OperationInterrupted(KeyboardInterrupt):
"""
用于表示操作被中断
"""
pass