feat:识别支持后台处理

This commit is contained in:
jxxghp
2024-12-27 17:45:04 +08:00
parent 75bb52ccca
commit e3552d4086
3 changed files with 216 additions and 137 deletions

View File

@@ -70,7 +70,10 @@ class JobManager:
获取作业ID
"""
if task:
return task.mediainfo.tmdb_id or task.mediainfo.douban_id, task.meta.begin_season
if task.mediainfo:
return task.mediainfo.tmdb_id or task.mediainfo.douban_id, task.meta.begin_season
else:
return task.meta.name, task.meta.begin_season
else:
return media.tmdb_id or media.douban_id, season
@@ -79,9 +82,20 @@ class JobManager:
"""
获取媒体信息
"""
mediainfo = deepcopy(task.mediainfo)
mediainfo.clear()
return schemas.MediaInfo(**mediainfo.to_dict())
if task.mediainfo:
# 有媒体信息
mediainfo = deepcopy(task.mediainfo)
mediainfo.clear()
return schemas.MediaInfo(**mediainfo.to_dict())
else:
# 没有媒体信息
meta: MetaBase = task.meta
return schemas.MediaInfo(
title=meta.name,
year=meta.year,
title_year=f"{meta.name} ({meta.year})",
type=meta.type.value if meta.type else None
)
@staticmethod
def __get_meta(task: TransferTask) -> schemas.MetaInfo:
@@ -90,11 +104,11 @@ class JobManager:
"""
return schemas.MetaInfo(**task.meta.to_dict())
def add_task(self, task: TransferTask):
def add_task(self, task: TransferTask, state: str = "waiting"):
"""
添加整理任务
"""
if not any([task, task.mediainfo, task.meta, task.fileitem]):
if not any([task, task.meta, task.fileitem]):
return
with job_lock:
__mediaid__ = self.__get_id(task)
@@ -103,14 +117,14 @@ class JobManager:
"media": self.__get_media(task),
"season": task.meta.begin_season,
"tasks": [{
"state": "waiting",
"state": state,
"fileitem": task.fileitem,
"meta": self.__get_meta(task)
}]
}
else:
self._job_view[__mediaid__]["tasks"].append({
"state": "waiting",
"state": state,
"fileitem": task.fileitem,
"meta": self.__get_meta(task)
})
@@ -153,8 +167,6 @@ class JobManager:
"""
任务失败
"""
if not task or not task.mediainfo or not task.meta:
return
with job_lock:
__mediaid__ = self.__get_id(task)
if __mediaid__ not in self._job_view:
@@ -170,7 +182,7 @@ class JobManager:
set(self._season_episodes[__mediaid__]) - set(task.meta.episode_list)
)
def remove_task(self, fileitem: FileItem):
def remove_task(self, fileitem: FileItem) -> Optional[Dict[str, Any]]:
"""
移除整理任务
"""
@@ -178,7 +190,7 @@ class JobManager:
for mediaid in list(self._job_view):
job = self._job_view[mediaid]
for task in job["tasks"]:
if task["fileitem"].path == fileitem.path and task["fileitem"].storage == fileitem.storage:
if task["fileitem"] == fileitem:
job["tasks"].remove(task)
# 如果没有作业了,则移除作业
if not job["tasks"]:
@@ -188,14 +200,12 @@ class JobManager:
self._season_episodes[mediaid] = list(
set(self._season_episodes[mediaid]) - set(task["meta"].episode_list)
)
break
return task
def remove_job(self, media: MediaInfo, season: int = None) -> Optional[Dict[str, Any]]:
"""
移除作业
"""
if not media:
return
__mediaid__ = self.__get_id(media=media, season=season)
with job_lock:
# 移除作业
@@ -205,17 +215,45 @@ class JobManager:
self._season_episodes.pop(__mediaid__)
return self._job_view.pop(__mediaid__)
def is_finished(self, media: MediaInfo, season: int = None) -> bool:
def is_done(self, media: MediaInfo, season: int = None) -> bool:
"""
任务检查某项任务是否全部为已完成状态
检查某项作业是否整理完成(不管成功还是失败)
"""
if not media:
return False
__mediaid__ = self.__get_id(media=media, season=season)
if __mediaid__ not in self._job_view:
return False
with job_lock:
return all([job["state"] in ["completed", "failed"] for job in self._job_view[__mediaid__]["tasks"]])
return all(
[task["state"] in ["completed", "failed"] for task in self._job_view[__mediaid__]["tasks"]]
)
def is_finished(self, media: MediaInfo, season: int = None) -> bool:
    """
    Check whether a job has finished and has at least one successful task.

    :param media: media info identifying the job
    :param season: season number that, together with the media, forms the job id
    :return: True only when every task of the job is in the "completed" or
             "failed" state and at least one task is "completed"; False when
             media is missing or the job is not being tracked
    """
    if not media:
        # The job id is derived from attributes of media, so without it
        # there is no job to look up
        return False
    __mediaid__ = self.__get_id(media=media, season=season)
    with job_lock:
        # Look the job up while holding the lock so that it cannot be removed
        # between the existence check and the read of its task list
        job = self._job_view.get(__mediaid__)
        if job is None:
            return False
        states = [task["state"] for task in job["tasks"]]
    # Every task is done (completed or failed) and at least one succeeded
    return "completed" in states and all(
        state in ("completed", "failed") for state in states
    )
def is_success(self, media: MediaInfo, season: int = None) -> bool:
    """
    Check whether every task of a job completed successfully.

    :param media: media info identifying the job
    :param season: season number that, together with the media, forms the job id
    :return: True when all tasks of the job are in the "completed" state;
             False when media is missing or the job is not being tracked
    """
    if not media:
        # The job id is derived from attributes of media, so without it
        # there is no job to look up
        return False
    __mediaid__ = self.__get_id(media=media, season=season)
    with job_lock:
        # Look the job up while holding the lock so that it cannot be removed
        # between the existence check and the read of its task list
        job = self._job_view.get(__mediaid__)
        if job is None:
            return False
        return all(task["state"] == "completed" for task in job["tasks"])
def count(self, media: MediaInfo, season: int = None) -> int:
"""
@@ -233,7 +271,7 @@ class JobManager:
获取所有task任务总数
"""
with job_lock:
return sum([len(jobs) for jobs in self._job_view.values()])
return sum([len(job["tasks"]) for job in self._job_view.values()])
def list_jobs(self) -> List[dict]:
"""
@@ -245,8 +283,6 @@ class JobManager:
"""
获取季集清单
"""
if not media:
return []
__mediaid__ = self.__get_id(media=media, season=season)
with job_lock:
return self._season_episodes.get(__mediaid__) or []
@@ -347,8 +383,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
'download_hash': task.download_hash,
})
# 整理完成一个媒体项
if self.jobview.is_finished(task.mediainfo, task.meta.begin_season):
# 全部整理成功
if self.jobview.is_success(task.mediainfo, task.meta.begin_season):
# 移动模式删除空目录
if transferinfo.transfer_type in ["move"]:
# 下载器hash
@@ -359,6 +395,8 @@ class TransferChain(ChainBase, metaclass=Singleton):
if task.fileitem:
self.storagechain.delete_media_file(task.fileitem, delete_self=False)
# 整理完成且有成功的任务时
if self.jobview.is_finished(task.mediainfo, task.meta.begin_season):
# 发送通知
if transferinfo.need_notify:
se_str = None
@@ -425,58 +463,60 @@ class TransferChain(ChainBase, metaclass=Singleton):
try:
item: TransferQueue = self._queue.get(timeout=self._transfer_interval)
if item:
callback = item.callback
task = item.task
if not task:
continue
mediainfo = task.mediainfo
meta = task.meta
fileitem = task.fileitem
# 正在处理
self.jobview.running_task(task)
# 文件信息
fileitem = task.fileitem
# 开始新队列
if __queue_start:
logger.info("开始整理队列处理...")
# 启动进度
self.progress.start(ProgressKey.FileTransfer)
# 重置计数
processed_num = 0
fail_num = 0
total_num = self.jobview.total()
__process_msg = f"开始整理队列处理,共 {total_num} 个文件或子目录 ..."
__process_msg = f"开始整理队列处理,当前{total_num} 个文件 ..."
logger.info(__process_msg)
self.progress.update(value=0,
text=__process_msg,
key=ProgressKey.FileTransfer)
# 队列已开始
__queue_start = False
# 重新计算总数
total_num = self.jobview.total()
# 更新进度
__process_msg = f"正在整理 {processed_num + 1}/{total_num}{fileitem.name} ..."
__process_msg = f"正在整理 {fileitem.name} ..."
logger.info(__process_msg)
self.progress.update(value=processed_num / total_num * 100,
text=__process_msg,
key=ProgressKey.FileTransfer)
# 整理
state, err_msg = self.__handle_transfer(task=task, callback=callback)
if state:
# 任务成功
processed_num += 1
else:
state, err_msg = self.__handle_transfer(task=task, callback=item.callback)
if not state:
# 任务失败
logger.warn(f"{fileitem.name} 整理失败:{err_msg}")
fail_num += 1
# 更新进度
processed_num += 1
__process_msg = f"{fileitem.name} 整理完成"
logger.info(__process_msg)
self.progress.update(value=processed_num / total_num * 100,
text=__process_msg,
key=ProgressKey.FileTransfer)
# 移除已完成的任务
if self.jobview.is_finished(mediainfo, meta.begin_season):
self.jobview.remove_job(mediainfo, meta.begin_season)
if self.jobview.is_done(task.mediainfo, task.meta.begin_season):
self.jobview.remove_job(task.mediainfo, task.meta.begin_season)
except queue.Empty:
if not __queue_start:
# 结束进度
__end_msg = f"整理队列处理完成,共整理 {total_num} 个文件,失败 {fail_num}"
__end_msg = f"整理队列处理完成,共整理 {processed_num} 个文件,失败 {fail_num}"
logger.info(__end_msg)
self.progress.update(value=100,
text=__end_msg,
key=ProgressKey.FileTransfer)
self.progress.end(ProgressKey.FileTransfer)
# 重置计数
total_num = 0
processed_num = 0
fail_num = 0
# 标记为新队列
@@ -490,6 +530,85 @@ class TransferChain(ChainBase, metaclass=Singleton):
"""
处理整理任务
"""
# 识别
if not task.mediainfo:
download_history = task.download_history
# 识别媒体信息
if download_history and (download_history.tmdbid or download_history.doubanid):
# 下载记录中已存在识别信息
mediainfo: MediaInfo = self.recognize_media(mtype=MediaType(download_history.type),
tmdbid=download_history.tmdbid,
doubanid=download_history.doubanid)
if mediainfo:
# 更新自定义媒体类别
if download_history.media_category:
mediainfo.category = download_history.media_category
else:
# 识别媒体信息
mediainfo = self.mediachain.recognize_by_meta(task.meta)
# 更新媒体图片
if mediainfo:
self.obtain_images(mediainfo=mediainfo)
if not mediainfo:
# 新增整理失败历史记录
his = self.transferhis.add_fail(
fileitem=task.fileitem,
mode=task.transfer_type,
meta=task.meta,
downloader=task.downloader,
download_hash=task.download_hash
)
self.post_message(Notification(
mtype=NotificationType.Manual,
title=f"{task.fileitem.name} 未识别到媒体信息,无法入库!",
text=f"回复:```\n/redo {his.id} [tmdbid]|[类型]\n``` 手动识别整理。",
link=settings.MP_DOMAIN('#/history')
))
# 任务失败
return False, "未识别到媒体信息"
# 如果未开启新增已入库媒体是否跟随TMDB信息变化则根据tmdbid查询之前的title
if not settings.SCRAP_FOLLOW_TMDB:
transfer_history = self.transferhis.get_by_type_tmdbid(tmdbid=mediainfo.tmdb_id,
mtype=mediainfo.type.value)
if transfer_history:
mediainfo.title = transfer_history.title
# 获取集数据
if not task.episodes_info and mediainfo.type == MediaType.TV:
if task.meta.begin_season is None:
task.meta.begin_season = 1
mediainfo.season = mediainfo.season or task.meta.begin_season
task.episodes_info = self.tmdbchain.tmdb_episodes(
tmdbid=mediainfo.tmdb_id,
season=mediainfo.season
)
# 更新任务信息
task.mediainfo = mediainfo
# 更新队列任务
curr_task = self.jobview.remove_task(task.fileitem)
self.jobview.add_task(task, state=curr_task['state'] if curr_task else "waiting")
# 查询整理目标目录
if not task.target_directory:
if task.src_match:
# 按源目录匹配,以便找到更合适的目录配置
task.target_directory = self.directoryhelper.get_dir(media=task.mediainfo,
storage=task.fileitem.storage,
src_path=Path(task.fileitem.path),
target_storage=task.target_storage)
elif task.target_path:
# 指定目标路径,`手动整理`场景下使用,忽略源目录匹配,使用指定目录匹配
task.target_directory = self.directoryhelper.get_dir(media=task.mediainfo,
dest_path=task.target_path,
target_storage=task.target_storage)
else:
# 未指定目标路径,根据媒体信息获取目标目录
task.target_directory = self.directoryhelper.get_dir(media=task.mediainfo,
storage=task.fileitem.storage,
target_storage=task.target_storage)
# 执行整理
transferinfo: TransferInfo = self.transfer(fileitem=task.fileitem,
meta=task.meta,
@@ -775,7 +894,21 @@ class TransferChain(ChainBase, metaclass=Singleton):
logger.warn(f"{fileitem.path} 没有找到可整理的媒体文件")
return False, f"{fileitem.name} 没有找到可整理的媒体文件"
logger.info(f"正在计划整理 {len(file_items)} 个文件...")
# 总数量
total_num = len(file_items)
# 已处理数量
processed_num = 0
# 失败数量
fail_num = 0
logger.info(f"正在计划整理 {total_num} 个文件...")
if not background:
# 启动进度
self.progress.start(ProgressKey.FileTransfer)
__process_msg = f"开始整理,共 {total_num} 个文件 ..."
logger.info(__process_msg)
self.progress.update(value=0,
text=__process_msg,
key=ProgressKey.FileTransfer)
# 整理所有文件
for file_item, bluray_dir in file_items:
@@ -801,17 +934,27 @@ class TransferChain(ChainBase, metaclass=Singleton):
is_blocked = True
break
if is_blocked:
fail_num += 1
continue
# 整理成功的不再处理
if not force:
transferd = self.transferhis.get_by_src(file_item.path, storage=file_item.storage)
if transferd and transferd.status:
if transferd:
all_success = False
logger.info(f"{file_item.path} 已整理过,如需重新处理,请删除整理记录。")
err_msgs.append(f"{file_item.name} 已整理过")
fail_num += 1
continue
# 更新进度
if not background:
__process_msg = f"正在整理 {processed_num + 1}/{total_num}{file_item.name} ..."
logger.info(__process_msg)
self.progress.update(value=processed_num / total_num * 100,
text=__process_msg,
key=ProgressKey.FileTransfer)
if not meta:
# 文件元数据
file_meta = MetaInfoPath(file_path)
@@ -826,6 +969,7 @@ class TransferChain(ChainBase, metaclass=Singleton):
all_success = False
logger.error(f"{file_path.name} 无法识别有效信息")
err_msgs.append(f"{file_path.name} 无法识别有效信息")
fail_num += 1
continue
# 自定义识别
@@ -854,102 +998,22 @@ class TransferChain(ChainBase, metaclass=Singleton):
downloader = download_history.downloader
download_hash = download_history.download_hash
if not mediainfo:
# 识别媒体信息
if download_history and (download_history.tmdbid or download_history.doubanid):
# 下载记录中已存在识别信息
file_mediainfo: MediaInfo = self.recognize_media(mtype=MediaType(download_history.type),
tmdbid=download_history.tmdbid,
doubanid=download_history.doubanid)
if file_mediainfo:
# 更新自定义媒体类别
if download_history.media_category:
file_mediainfo.category = download_history.media_category
else:
# 识别媒体信息
file_mediainfo = self.mediachain.recognize_by_meta(file_meta)
# 更新媒体图片
if file_mediainfo:
self.obtain_images(mediainfo=file_mediainfo)
else:
file_mediainfo = mediainfo
if not file_mediainfo:
all_success = False
logger.warn(f'{file_path.name} 未识别到媒体信息')
err_msgs.append(f"{file_path.name} 未识别到媒体信息")
# 新增整理失败历史记录
his = self.transferhis.add_fail(
fileitem=file_item,
mode=transfer_type,
meta=file_meta,
downloader=downloader,
download_hash=download_hash
)
self.post_message(Notification(
mtype=NotificationType.Manual,
title=f"{file_path.name} 未识别到媒体信息,无法入库!",
text=f"回复:```\n/redo {his.id} [tmdbid]|[类型]\n``` 手动识别整理。",
link=settings.MP_DOMAIN('#/history')
))
continue
# 如果未开启新增已入库媒体是否跟随TMDB信息变化则根据tmdbid查询之前的title
if not settings.SCRAP_FOLLOW_TMDB:
transfer_history = self.transferhis.get_by_type_tmdbid(tmdbid=file_mediainfo.tmdb_id,
mtype=file_mediainfo.type.value)
if transfer_history:
file_mediainfo.title = transfer_history.title
logger.info(f"{file_path.name} 识别为:{file_mediainfo.type.value} {file_mediainfo.title_year}")
# 获取集数据
if file_mediainfo.type == MediaType.TV:
if file_meta.begin_season is None:
file_meta.begin_season = 1
file_mediainfo.season = file_mediainfo.season or file_meta.begin_season
episodes_info = self.tmdbchain.tmdb_episodes(
tmdbid=file_mediainfo.tmdb_id,
season=file_mediainfo.season
)
else:
episodes_info = None
# 查询整理目标目录
dir_info = None
if not target_directory:
if src_match:
# 按源目录匹配,以便找到更合适的目录配置
dir_info = self.directoryhelper.get_dir(media=file_mediainfo,
storage=file_item.storage,
src_path=file_path,
target_storage=target_storage)
elif target_path:
# 指定目标路径,`手动整理`场景下使用,忽略源目录匹配,使用指定目录匹配
dir_info = self.directoryhelper.get_dir(media=file_mediainfo,
dest_path=target_path,
target_storage=target_storage)
else:
# 未指定目标路径,根据媒体信息获取目标目录
dir_info = self.directoryhelper.get_dir(file_mediainfo,
storage=file_item.storage,
target_storage=target_storage)
# 后台整理
transfer_task = TransferTask(
fileitem=file_item,
meta=file_meta,
mediainfo=file_mediainfo,
target_directory=target_directory or dir_info,
mediainfo=mediainfo,
target_directory=target_directory,
target_storage=target_storage,
target_path=target_path,
transfer_type=transfer_type,
episodes_info=episodes_info,
src_match=src_match,
scrape=scrape,
library_type_folder=library_type_folder,
library_category_folder=library_category_folder,
downloader=downloader,
download_hash=download_hash
download_hash=download_hash,
download_history=download_history
)
if background:
self.put_to_queue(
@@ -965,6 +1029,18 @@ class TransferChain(ChainBase, metaclass=Singleton):
all_success = False
logger.warn(f"{file_path.name} {err_msg}")
err_msgs.append(f"{file_path.name} {err_msg}")
fail_num += 1
# 完成计数
processed_num += 1
# 整理结束
if not background:
__end_msg = f"整理队列处理完成,共整理 {total_num} 个文件,失败 {fail_num}"
logger.info(__end_msg)
self.progress.update(value=100,
text=__end_msg,
key=ProgressKey.FileTransfer)
self.progress.end(ProgressKey.FileTransfer)
return all_success, "".join(err_msgs)

View File

@@ -46,6 +46,8 @@ class DownloadHistory(BaseModel):
date: Optional[str] = None
# 备注
note: Optional[Any] = None
# 自定义媒体类别
media_category: Optional[str] = None
class Config:
orm_mode = True

View File

@@ -3,7 +3,7 @@ from typing import Optional, List, Any, Callable
from pydantic import BaseModel, Field
from app.schemas import TmdbEpisode
from app.schemas import TmdbEpisode, DownloadHistory
from app.schemas.file import FileItem
from app.schemas.system import TransferDirectoryConf
@@ -46,20 +46,21 @@ class TransferTask(BaseModel):
"""
文件整理任务
"""
fileitem: Optional[FileItem] = None
file_path: Optional[Path] = None
meta: Optional[Any] = None
fileitem: FileItem = None
meta: Any = None
mediainfo: Optional[Any] = None
target_directory: Optional[TransferDirectoryConf] = None
target_storage: Optional[str] = None
target_path: Optional[Path] = None
transfer_type: Optional[str] = None
scrape: Optional[bool] = None
library_type_folder: Optional[bool] = None
library_category_folder: Optional[bool] = None
src_match: Optional[bool] = False
scrape: Optional[bool] = False
library_type_folder: Optional[bool] = False
library_category_folder: Optional[bool] = False
episodes_info: Optional[List[TmdbEpisode]] = None
downloader: Optional[str] = None
download_hash: Optional[str] = None
download_history: Optional[DownloadHistory] = None
def to_dict(self):
"""