fix:只有媒体文件整完成才触发事件,以保持与历史一致

This commit is contained in:
jxxghp
2026-01-21 20:07:18 +08:00
parent 298a6ba8ab
commit e1b3e6ef01
2 changed files with 67 additions and 41 deletions

View File

@@ -4,7 +4,6 @@ import threading
import traceback
from copy import deepcopy
from pathlib import Path
from time import sleep
from typing import List, Optional, Tuple, Union, Dict, Callable
from app import schemas
@@ -367,10 +366,12 @@ class TransferChain(ChainBase, metaclass=Singleton):
def __init__(self):
super().__init__()
# 可处理的文件后缀(视频文件、字幕、音频文件)
self._allowed_exts = settings.RMT_MEDIAEXT + settings.RMT_SUBEXT + settings.RMT_AUDIOEXT
# 主要媒体文件后缀
self._media_exts = settings.RMT_MEDIAEXT
# 附加文件后缀
self._extra_exts = settings.RMT_SUBEXT + settings.RMT_AUDIOEXT
# 可处理的文件后缀(视频文件、字幕、音频文件)
self._allowed_exts = self._media_exts + self._extra_exts
# 待整理任务队列
self._queue = queue.Queue()
# 文件整理线程
@@ -402,6 +403,37 @@ class TransferChain(ChainBase, metaclass=Singleton):
daemon=True)
thread.start()
def __is_allowed_file(self, fileitem: FileItem) -> bool:
"""
判断是否允许的扩展名
"""
if not fileitem.extension:
return False
return True if f".{fileitem.extension.lower()}" in self._allowed_exts else False
def __is_extra_file(self, fileitem: FileItem) -> bool:
"""
判断是否额外的扩展名
"""
if not fileitem.extension:
return False
return True if f".{fileitem.extension.lower()}" in self._extra_exts else False
def __is_media_file(self, fileitem: FileItem) -> bool:
"""
判断是否为主要媒体文件
"""
if not fileitem.extension:
return False
return True if f".{fileitem.extension.lower()}" in self._media_exts else False
@staticmethod
def __is_allow_filesize(fileitem: FileItem, min_filesize: int) -> bool:
"""
判断是否满足最小文件大小
"""
return True if not min_filesize or (fileitem.size or 0) > min_filesize * 1024 * 1024 else False
def __default_callback(self, task: TransferTask,
transferinfo: TransferInfo, /) -> Tuple[bool, str]:
"""
@@ -434,7 +466,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
season_episode=se_str,
username=task.username)
# 刮削事件
if transferinfo.need_scrape:
if transferinfo.need_scrape and self.__is_media_file(task.fileitem):
self.eventmanager.send_event(EventType.MetadataScrape, {
'meta': task.meta,
'mediainfo': task.mediainfo,
@@ -459,6 +491,16 @@ class TransferChain(ChainBase, metaclass=Singleton):
mediainfo=task.mediainfo,
transferinfo=transferinfo
)
# 整理失败事件
if self.__is_media_file(task.fileitem):
self.eventmanager.send_event(EventType.TransferFailed, {
'fileitem': task.fileitem,
'meta': task.meta,
'mediainfo': task.mediainfo,
'transferinfo': transferinfo,
'downloader': task.downloader,
'download_hash': task.download_hash,
})
# 发送失败消息
self.post_message(Notification(
mtype=NotificationType.Manual,
@@ -492,14 +534,15 @@ class TransferChain(ChainBase, metaclass=Singleton):
)
# 整理完成事件
self.eventmanager.send_event(EventType.TransferComplete, {
'fileitem': task.fileitem,
'meta': task.meta,
'mediainfo': task.mediainfo,
'transferinfo': transferinfo,
'downloader': task.downloader,
'download_hash': task.download_hash,
})
if self.__is_media_file(task.fileitem):
self.eventmanager.send_event(EventType.TransferComplete, {
'fileitem': task.fileitem,
'meta': task.meta,
'mediainfo': task.mediainfo,
'transferinfo': transferinfo,
'downloader': task.downloader,
'download_hash': task.download_hash,
})
with task_lock:
# 登记转移成功文件清单
@@ -605,8 +648,9 @@ class TransferChain(ChainBase, metaclass=Singleton):
__process_msg = f"正在整理 {fileitem.name} ..."
logger.info(__process_msg)
with task_lock:
self._progress.update(value=(self._processed_num / self._total_num * 100) if self._total_num else 0,
text=__process_msg)
self._progress.update(
value=(self._processed_num / self._total_num * 100) if self._total_num else 0,
text=__process_msg)
# 整理
state, err_msg = self.__handle_transfer(task=task, callback=item.callback)
@@ -618,8 +662,9 @@ class TransferChain(ChainBase, metaclass=Singleton):
self._processed_num += 1
__process_msg = f"{fileitem.name} 整理完成"
logger.info(__process_msg)
self._progress.update(value=(self._processed_num / self._total_num * 100) if self._total_num else 100,
text=__process_msg)
self._progress.update(
value=(self._processed_num / self._total_num * 100) if self._total_num else 100,
text=__process_msg)
except Exception as e:
logger.error(f"{fileitem.name} 整理任务处理出现错误:{e} - {traceback.format_exc()}")
with task_lock:
@@ -916,7 +961,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
return True
def __get_trans_fileitems(
self, fileitem: FileItem, check: bool = True
self, fileitem: FileItem, check: bool = True
) -> List[Tuple[FileItem, bool]]:
"""
获取整理目录或文件列表
@@ -1009,25 +1054,6 @@ class TransferChain(ChainBase, metaclass=Singleton):
:param continue_callback: 继续处理回调
返回:成功标识,错误信息
"""
def __is_allowed_file(_ext: str) -> bool:
"""
判断是否允许的扩展名
"""
return True if f".{_ext.lower()}" in self._allowed_exts else False
def __is_extra_file(_ext: str) -> bool:
"""
判断是否额外的扩展名
"""
return True if f".{_ext.lower()}" in self._extra_exts else False
def __is_allow_filesize(_ext: str, _size: int, _min_filesize: int) -> bool:
"""
判断是否满足最小文件大小
"""
return True if not _min_filesize or _size > _min_filesize * 1024 * 1024 else False
# 是否全部成功
all_success = True
@@ -1054,10 +1080,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 过滤后缀和大小(蓝光目录、附加文件不过滤大小)
file_items = [f for f in file_items if f[1] or
__is_extra_file(f[0].extension) or
(__is_allowed_file(f[0].extension) and __is_allow_filesize(_ext=f[0].extension,
_size=f[0].size or 0,
_min_filesize=min_filesize))]
self.__is_extra_file(f[0]) or
(self.__is_allowed_file(f[0]) and self.__is_allow_filesize(f[0], min_filesize))]
if not file_items:
logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件")

View File

@@ -38,8 +38,10 @@ class EventType(Enum):
SiteUpdated = "site.updated"
# 站点已刷新
SiteRefreshed = "site.refreshed"
# 转移完成
# 整理完成
TransferComplete = "transfer.complete"
# 整理失败
TransferFailed = "transfer.failed"
# 下载已添加
DownloadAdded = "download.added"
# 删除历史记录