feat: 后台整理队列

This commit is contained in:
jxxghp
2024-12-24 13:14:17 +08:00
parent c9949581ef
commit 48c289edf2
2 changed files with 261 additions and 87 deletions

View File

@@ -30,7 +30,164 @@ from app.utils.singleton import Singleton
from app.utils.string import StringUtils
downloader_lock = threading.Lock()
tree_lock = threading.Lock()
job_lock = threading.Lock()
class JobManager:
"""
作业管理器
"""
# 整理中的作业
"""
{
meidaid: [{
"task": TransferTask,
"state": "running" | "waiting" | "failed" | "completed",
}]
}
"""
_job_view: Dict[Tuple, List[Dict[str, Any]]] = {}
# 汇总季集清单
_season_episodes: Dict[Tuple, List[int]] = {}
def __init__(self):
self._job_view = {}
self._season_episodes = {}
@staticmethod
def __get_id(task: TransferTask = None, media: MediaInfo = None, season: int = None) -> Tuple:
"""
获取作业ID
"""
if task:
return task.mediainfo.tmdb_id or task.mediainfo.douban_id, task.meta.begin_season
else:
return media.tmdb_id or media.douban_id, season
def add_task(self, task: TransferTask):
"""
添加整理任务
"""
if not task or not task.mediainfo or not task.meta:
return
with job_lock:
__mediaid__ = self.__get_id(task)
if __mediaid__ not in self._job_view:
self._job_view[__mediaid__] = []
# 添加任务
self._job_view[__mediaid__].append({
"task": task,
"state": "waiting"
})
# 添加季集信息
if self._season_episodes.get(__mediaid__):
self._season_episodes[__mediaid__].extend(task.meta.episode_list)
self._season_episodes[__mediaid__] = list(set(self._season_episodes[__mediaid__]))
else:
self._season_episodes[__mediaid__] = task.meta.episode_list
def running_task(self, task: TransferTask):
"""
任务运行中
"""
if not task or not task.mediainfo or not task.meta:
return
with job_lock:
__mediaid__ = self.__get_id(task)
if __mediaid__ not in self._job_view:
return
# 更新状态
for job in self._job_view[__mediaid__]:
if job["task"] == task:
job["state"] = "running"
break
def finish_task(self, task: TransferTask):
"""
任务完成
"""
if not task or not task.mediainfo or not task.meta:
return
with job_lock:
__mediaid__ = self.__get_id(task)
if __mediaid__ not in self._job_view:
return
# 更新状态
for job in self._job_view[__mediaid__]:
if job["task"] == task:
job["state"] = "completed"
break
def fail_task(self, task: TransferTask):
"""
任务失败
"""
if not task or not task.mediainfo or not task.meta:
return
with job_lock:
__mediaid__ = self.__get_id(task)
if __mediaid__ not in self._job_view:
return
# 更新状态
for job in self._job_view[__mediaid__]:
if job["task"] == task:
job["state"] = "failed"
break
# 移除剧集信息
if __mediaid__ in self._season_episodes:
self._season_episodes[__mediaid__] = list(
set(self._season_episodes[__mediaid__]) - set(task.meta.episode_list))
def remove_job(self, media: MediaInfo, season: int = None) -> Optional[List[Dict[str, Any]]]:
"""
移除作业
"""
if not media:
return
__mediaid__ = self.__get_id(media=media, season=season)
with job_lock:
# 移除作业
if __mediaid__ in self._job_view:
return self._job_view.pop(__mediaid__)
# 移除季集信息
if __mediaid__ in self._season_episodes:
self._season_episodes.pop(__mediaid__)
def is_finished(self, media: MediaInfo, season: int = None):
"""
任务检查某项任务是否全部为已完成状态
"""
if not media:
return False
__mediaid__ = self.__get_id(media=media, season=season)
if __mediaid__ not in self._job_view:
return True
with job_lock:
return all([job["state"] in ["completed", "failed"] for job in self._job_view[__mediaid__]])
def total(self):
"""
获取所有task任务总数
"""
with job_lock:
return sum([len(jobs) for jobs in self._job_view.values()])
def list_jobs(self) -> Dict:
"""
获取任务列表
"""
return self._job_view
def season_episodes(self, media: MediaInfo, season: int = None) -> List[int]:
"""
获取季集清单
"""
if not media:
return []
__mediaid__ = self.__get_id(media=media, season=season)
with job_lock:
return self._season_episodes.get(__mediaid__) or []
class TransferChain(ChainBase, metaclass=Singleton):
@@ -44,9 +201,6 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 待整理任务队列
_queue = Queue()
# 整理中的目录树
_job_tree: Dict[str, Dict[str, Any]] = {}
# 文件整理线程
_transfer_thread = None
@@ -63,6 +217,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
self.storagechain = StorageChain()
self.systemconfig = SystemConfigOper()
self.directoryhelper = DirectoryHelper()
self.jobview = JobManager()
# 启动整理任务
self.__init()
@@ -104,6 +259,11 @@ class TransferChain(ChainBase, metaclass=Singleton):
image=task.mediainfo.get_message_image(),
link=settings.MP_DOMAIN('#/history')
))
# 任务失败
self.jobview.fail_task(task)
# 移除已完成的任务
if self.jobview.is_finished(task.mediainfo, task.meta.begin_season):
self.jobview.remove_job(task.mediainfo, task.meta.begin_season)
return
# 转移成功
@@ -130,8 +290,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
'download_hash': task.download_hash,
})
# TODO 整理完成一个媒体项时
if True:
# 整理完成一个媒体项时
if self.jobview.is_finished(task.mediainfo, task.meta.begin_season):
# 移动模式删除空目录
if transferinfo.transfer_type in ["move"]:
# 下载器hash
@@ -142,24 +302,24 @@ class TransferChain(ChainBase, metaclass=Singleton):
if task.fileitem:
self.storagechain.delete_media_file(task.fileitem, delete_self=False)
# TODO 发送通知
"""
# 发送通知
if transferinfo.need_notify:
se_str = None
if task.media.type == MediaType.TV:
se_str = f"{transfer_meta.season} {StringUtils.format_ep(season_episodes[mkey])}"
self.send_transfer_message(meta=transfer_meta,
mediainfo=media,
transferinfo=transfer_info,
if task.mediainfo.type == MediaType.TV:
se_str = f"{task.meta.season} {StringUtils.format_ep(self.jobview.season_episodes(task.mediainfo, task.meta.begin_season))}"
self.send_transfer_message(meta=task.meta,
mediainfo=task.mediainfo,
transferinfo=transferinfo,
season_episode=se_str)
# TODO 刮削事件
if scrape or transfer_info.need_scrape:
# 刮削事件
if transferinfo.need_scrape:
self.eventmanager.send_event(EventType.MetadataScrape, {
'meta': transfer_meta,
'mediainfo': media,
'fileitem': transfer_info.target_diritem
'meta': task.meta,
'mediainfo': task.mediainfo,
'fileitem': transferinfo.target_diritem
})
"""
# 清除作业
self.jobview.remove_job(task.mediainfo, task.meta.begin_season)
def put_to_queue(self, task: TransferTask, callback: Optional[Callable] = None):
"""
@@ -169,8 +329,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
"""
if not task:
return
# TODO 维护目录树
# 维护整理任务视图
self.jobview.add_task(task)
# 添加到队列
self._queue.put(TransferQueue(
task=task,
@@ -181,34 +341,32 @@ class TransferChain(ChainBase, metaclass=Singleton):
"""
处理队列
"""
# 队列开始标识
__queue_start = True
# 任务总数
total_num = 0
# 已处理总数
processed_num = 0
# 失败数量
fail_num = 0
# 跳过数量
skip_num = 0
# 队列开始标识
__queue_start = True
while not global_vars.is_system_stopped:
try:
item: TransferQueue = self._queue.get(timeout=self._transfer_interval)
if item:
if item and item.task:
self.jobview.running_task(item.task)
if __queue_start:
logger.info("开始整理队列处理...")
# 启动进度
self.progress.start(ProgressKey.FileTransfer)
# TODO 计算总数
total_num = 0
total_num = self.jobview.total()
self.progress.update(value=0,
text=f"开始整理队列处理,共 {total_num} 个文件或子目录 ...",
key=ProgressKey.FileTransfer)
# 队列已开始
__queue_start = False
# TODO 重新计算总数
total_num = 0
# 重新计算总数
total_num = self.jobview.total()
# 更新进度
__process_msg = f"正在整理 {processed_num + 1}/{total_num}{item.task.fileitem.name} ..."
logger.info(__process_msg)
@@ -216,11 +374,14 @@ class TransferChain(ChainBase, metaclass=Singleton):
text=__process_msg,
key=ProgressKey.FileTransfer)
# 整理
self.__handle_transfer(task=item.task, callback=item.callback)
if self.__handle_transfer(task=item.task, callback=item.callback):
processed_num += 1
else:
fail_num += 1
except queue.Empty:
if not __queue_start:
# 结束进度
__end_msg = f"整理队列处理完成,共整理 {total_num} 个文件,失败 {fail_num} 个,跳过 {skip_num}"
__end_msg = f"整理队列处理完成,共整理 {total_num} 个文件,失败 {fail_num}"
logger.info(__end_msg)
self.progress.update(value=100,
text=__end_msg,
@@ -230,19 +391,18 @@ class TransferChain(ChainBase, metaclass=Singleton):
total_num = 0
processed_num = 0
fail_num = 0
skip_num = 0
# 标记为新队列
__queue_start = True
continue
except Exception as e:
logger.error(f"整理队列处理出现错误:{e}")
def __handle_transfer(self, task: TransferTask, callback: Optional[Callable] = None):
def __handle_transfer(self, task: TransferTask, callback: Optional[Callable] = None) -> bool:
"""
处理整理任务
"""
if not task:
return
return False
# 执行整理
transferinfo: TransferInfo = self.transfer(fileitem=task.fileitem,
meta=task.meta,
@@ -257,10 +417,11 @@ class TransferChain(ChainBase, metaclass=Singleton):
library_category_folder=task.library_category_folder)
if not transferinfo:
logger.error("文件整理模块运行失败")
return
return False
# 回调,位置传参:任务、整理结果
if callback:
callback(task, transferinfo)
return callback(task, transferinfo)
return False
def recommend_name(self, meta: MetaBase, mediainfo: MediaInfo) -> Optional[str]:
"""
@@ -363,6 +524,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
def __get_trans_fileitems(self, fileitem: FileItem) -> List[Tuple[FileItem, bool]]:
"""
获取整理目录或文件列表
:param fileitem: 文件项
"""
def __is_bluray_dir(_fileitem: FileItem) -> bool:
@@ -397,16 +559,18 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 蓝光原盘子目录或文件
if __is_bluray_sub(fileitem.path):
return [(__get_bluray_dir(Path(fileitem.path)), True)]
# 蓝光原盘根目录
if __is_bluray_dir(fileitem):
return [(fileitem, True)]
dir_item = __get_bluray_dir(fileitem.storage, Path(fileitem.path))
if dir_item:
return [(dir_item, True)]
# 单文件
if fileitem.type == "file":
return [(fileitem, False)]
# 蓝光原盘根目录
if __is_bluray_dir(fileitem):
return [(fileitem, True)]
# 需要整理的文件项列表
trans_items = []
# 先检查当前目录的下级目录,以支持合集的情况
@@ -461,6 +625,18 @@ class TransferChain(ChainBase, metaclass=Singleton):
返回:成功标识,错误信息
"""
def __is_allow_extensions(_ext: str) -> bool:
"""
判断是否允许的扩展名
"""
return True if not self.all_exts or f".{_ext.lower()}" in self.all_exts else False
def __is_allow_filesize(_size: int, _min_filesize: int) -> bool:
"""
判断是否满足最小文件大小
"""
return True if not _min_filesize or _size > _min_filesize * 1024 * 1024 else False
# 自定义格式
formaterHandler = FormatParser(eformat=epformat.format,
details=epformat.detail,
@@ -469,19 +645,18 @@ class TransferChain(ChainBase, metaclass=Singleton):
# 整理屏蔽词
transfer_exclude_words = self.systemconfig.get(SystemConfigKey.TransferExcludeWords)
# 待整理文件列表
file_items: List[Tuple[FileItem, bool]] = []
# 汇总错误信息
err_msgs: List[str] = []
# 获取待整理路径清单
# 待整理目录或文件项
trans_items = self.__get_trans_fileitems(fileitem)
# 待整理的文件列表
file_items: List[Tuple[FileItem, bool]] = []
if not trans_items:
logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件")
return False, f"{fileitem.name} 没有找到可整理的媒体文件"
# 处理所有待整理目录或文件
# 转换为所有待处理的文件清单
for trans_item, bluray_dir in trans_items:
# 如果是目录且不是⼀蓝光原盘,获取所有文件并整理
if trans_item.type == "dir" and not bluray_dir:
@@ -491,21 +666,18 @@ class TransferChain(ChainBase, metaclass=Singleton):
else:
file_items.append((trans_item, bluray_dir))
# 有集自定义格式,过滤文件
if formaterHandler:
# 有集自定义格式,过滤文件
file_items = [f for f in file_items if formaterHandler.match(f[0].name)]
# 过滤后缀和大小
file_items = [f for f in file_items
if f[0].extension and (f".{f[0].extension.lower()}" in self.all_exts
and (not min_filesize or f[0].size > min_filesize * 1024 * 1024))]
if __is_allow_extensions(f[0].extension) and __is_allow_filesize(f[0].size, min_filesize)]
if not file_items:
logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件")
return False, f"{fileitem.name} 没有找到可整理的媒体文件"
# 更新总文件数
total_num = len(file_items)
logger.info(f"正在整理 {total_num} 个文件...")
logger.info(f"正在计划整理 {len(file_items)} 个文件...")
# 整理所有文件
for file_item, bluray_dir in file_items:
@@ -589,15 +761,16 @@ class TransferChain(ChainBase, metaclass=Singleton):
file_mediainfo: MediaInfo = self.recognize_media(mtype=MediaType(download_history.type),
tmdbid=download_history.tmdbid,
doubanid=download_history.doubanid)
if mediainfo:
if file_mediainfo:
# 更新自定义媒体类别
if download_history.media_category:
mediainfo.category = download_history.media_category
file_mediainfo.category = download_history.media_category
else:
# 识别媒体信息
file_mediainfo = self.mediachain.recognize_by_meta(file_meta)
# 更新媒体图片
self.obtain_images(mediainfo=file_mediainfo)
if file_mediainfo:
self.obtain_images(mediainfo=file_mediainfo)
else:
file_mediainfo = mediainfo
@@ -661,21 +834,24 @@ class TransferChain(ChainBase, metaclass=Singleton):
target_storage=target_storage)
# 后台整理
self.__handle_transfer(task=TransferTask(
fileitem=file_item,
meta=file_meta,
mediainfo=file_mediainfo,
target_directory=target_directory or dir_info,
target_storage=target_storage,
target_path=target_path,
transfer_type=transfer_type,
episodes_info=episodes_info,
scrape=scrape,
library_type_folder=library_type_folder,
library_category_folder=library_category_folder,
downloader=downloader,
download_hash=download_hash
))
self.put_to_queue(
task=TransferTask(
fileitem=file_item,
meta=file_meta,
mediainfo=file_mediainfo,
target_directory=target_directory or dir_info,
target_storage=target_storage,
target_path=target_path,
transfer_type=transfer_type,
episodes_info=episodes_info,
scrape=scrape,
library_type_folder=library_type_folder,
library_category_folder=library_category_folder,
downloader=downloader,
download_hash=download_hash
)
)
logger.info(f"{file_path.name} 已添加到整理队列")
return True, "\n".join(err_msgs)

View File

@@ -21,7 +21,7 @@ from app.db.transferhistory_oper import TransferHistoryOper
from app.helper.directory import DirectoryHelper
from app.helper.message import MessageHelper
from app.log import logger
from app.schemas import FileItem, TransferTask
from app.schemas import FileItem
from app.utils.singleton import Singleton
lock = Lock()
@@ -43,10 +43,12 @@ class FileMonitorHandler(FileSystemEventHandler):
self.callback = callback
def on_created(self, event: FileSystemEvent):
self.callback.event_handler(event=event, text="创建", event_path=Path(event.src_path))
self.callback.event_handler(event=event, text="创建", event_path=event.src_path,
size=Path(event.src_path).stat().st_size)
def on_moved(self, event: FileSystemMovedEvent):
self.callback.event_handler(event=event, text="移动", event_path=Path(event.dest_path))
self.callback.event_handler(event=event, text="移动", event_path=event.dest_path,
size=Path(event.dest_path).stat().st_size)
class Monitor(metaclass=Singleton):
@@ -194,41 +196,36 @@ class Monitor(metaclass=Singleton):
new_files = new_snapshot.keys() - old_snapshot.keys()
for new_file in new_files:
# 添加到待整理队列
self.__handle_file(storage=storage, event_path=Path(new_file))
self.__handle_file(storage=storage, event_path=Path(new_file),
file_size=new_snapshot.get(new_file))
# 更新快照
self._storage_snapshot[storage] = new_snapshot
def event_handler(self, event, text: str, event_path: Path):
def event_handler(self, event, text: str, event_path: str, file_size: float = None):
"""
处理文件变化
:param event: 事件
:param text: 事件描述
:param event_path: 事件文件路径
:param file_size: 文件大小
"""
if not event.is_directory:
# 文件发生变化
logger.debug(f"文件 {event_path} 发生了 {text}")
# 整理文件
self.__handle_file(storage="local", event_path=event_path)
self.__handle_file(storage="local", event_path=Path(event_path), file_size=file_size)
def __transfer_queue(self, task: TransferTask):
"""
添加到整理队列
"""
# 加入整理队列,使用默认的回调函数
self.transferchain.put_to_queue(task=task)
def __handle_file(self, storage: str, event_path: Path):
def __handle_file(self, storage: str, event_path: Path, file_size: float = None):
"""
整理一个文件
:param storage: 存储
:param event_path: 事件文件路径
:param file_size: 文件大小
"""
# 全程加锁
with lock:
try:
# 开始整理
# TODO 缺少文件大小
self.transferchain.do_transfer(
fileitem=FileItem(
storage=storage,
@@ -236,7 +233,8 @@ class Monitor(metaclass=Singleton):
type="file",
name=event_path.name,
basename=event_path.stem,
extension=event_path.suffix[1:]
extension=event_path.suffix[1:],
size=file_size
),
src_match=True
)